1use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Endpoint as _;
20use hyperactor::Handler;
21use hyperactor::Instance;
22use hyperactor::OncePortHandle;
23use hyperactor::PortHandle;
24use hyperactor::Proc;
25use hyperactor::RemoteSpawn;
26use hyperactor::actor::ActorError;
27use hyperactor::actor::ActorErrorKind;
28use hyperactor::actor::ActorStatus;
29use hyperactor::actor::Signal;
30use hyperactor::context::Actor as ContextActor;
31use hyperactor::mailbox::MessageEnvelope;
32use hyperactor::mailbox::Undeliverable;
33use hyperactor::mailbox::UndeliverableMessageError;
34use hyperactor::message::Bind;
35use hyperactor::message::Bindings;
36use hyperactor::message::IndexedErasedUnbound;
37use hyperactor::message::Unbind;
38use hyperactor::supervision::ActorSupervisionEvent;
39use hyperactor_config::Flattrs;
40use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
41use hyperactor_mesh::comm::multicast::CAST_POINT;
42use hyperactor_mesh::comm::multicast::CastInfo;
43use hyperactor_mesh::supervision::MeshFailure;
44use hyperactor_mesh::transport::default_bind_spec;
45use hyperactor_mesh::value_mesh::ValueOverlay;
46use monarch_types::PickledPyObject;
47use monarch_types::SerializablePyErr;
48use monarch_types::py_global;
49use ndslice::Point;
50use ndslice::extent;
51use pyo3::IntoPyObjectExt;
52use pyo3::exceptions::PyBaseException;
53use pyo3::exceptions::PyRuntimeError;
54use pyo3::exceptions::PyValueError;
55use pyo3::prelude::*;
56use pyo3::types::PyDict;
57use pyo3::types::PyList;
58use pyo3::types::PyType;
59use serde::Deserialize;
60use serde::Serialize;
61use serde_multipart::Part;
62use tokio::sync::oneshot;
63use typeuri::Named;
64
65use crate::buffers::FrozenBuffer;
66use crate::config::ACTOR_QUEUE_DISPATCH;
67use crate::config::SHARED_ASYNCIO_RUNTIME;
68use crate::context::PyInstance;
69use crate::local_state_broker::BrokerId;
70use crate::local_state_broker::LocalStateBrokerMessage;
71use crate::mailbox::EitherPortRef;
72use crate::mailbox::PyMailbox;
73use crate::mailbox::PythonPortHandle;
74use crate::mailbox::PythonUndeliverableMessageEnvelope;
75use crate::metrics::ENDPOINT_ACTOR_COUNT;
76use crate::metrics::ENDPOINT_ACTOR_ERROR;
77use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
78use crate::metrics::ENDPOINT_ACTOR_PANIC;
79use crate::pickle::pickle_to_part;
80use crate::proc::PyActorAddr;
81use crate::pympsc;
82use crate::pytokio::PythonTask;
83use crate::runtime::get_proc_runtime;
84use crate::runtime::get_tokio_runtime;
85use crate::runtime::monarch_with_gil;
86use crate::runtime::monarch_with_gil_blocking;
87use crate::supervision::PyMeshFailure;
88
// Defines `unhandled_fault_hook_exception(py)`, an accessor for the Python class
// `monarch._src.actor.supervision.UnhandledFaultHookException`. The root client
// loop in `bootstrap_client_inner` checks handler errors against this class to
// decide whether to stop immediately instead of raising a supervision event.
py_global!(
    unhandled_fault_hook_exception,
    "monarch._src.actor.supervision",
    "UnhandledFaultHookException"
);
94
/// Describes how to rebuild one entry of the local-state list for a
/// `CallMethodIndirect` call (see `PythonMessage::resolve_indirect_call`).
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum UnflattenArg {
    /// Substitute the receiving actor's mailbox.
    Mailbox,
    /// Take the next object from the state fetched from the local state broker.
    PyObject,
}
101
/// Identifies which Python endpoint a call message targets.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum MethodSpecifier {
    /// A named endpoint (presumably its return value becomes the response —
    /// TODO confirm against the Python side).
    ReturnsResponse { name: String },
    /// A named endpoint (presumably it writes to an explicit response port —
    /// TODO confirm against the Python side).
    ExplicitPort { name: String },
    /// The actor's constructor; its endpoint name is always `"__init__"`.
    Init {},
}
112
113impl std::fmt::Display for MethodSpecifier {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 write!(f, "{}", self.name())
116 }
117}
118
119#[pymethods]
120impl MethodSpecifier {
121 #[getter(name)]
122 fn py_name(&self) -> &str {
123 self.name()
124 }
125}
126
127impl MethodSpecifier {
128 pub(crate) fn name(&self) -> &str {
129 match self {
130 MethodSpecifier::ReturnsResponse { name } => name,
131 MethodSpecifier::ExplicitPort { name } => name,
132 MethodSpecifier::Init {} => "__init__",
133 }
134 }
135}
136
/// A single rank's reply to an endpoint call, as stored in a `ValueOverlay`
/// when responses are accumulated (see `PythonMessage::into_overlay`).
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
pub enum PythonResponseMessage {
    /// Body of a `PythonMessageKind::Result` message.
    Result(serde_multipart::Part),
    /// Body of a `PythonMessageKind::Exception` message.
    Exception(serde_multipart::Part),
}
148
// Register the response type and its accumulated overlay form via
// `wirevalue::register_type!`.
wirevalue::register_type!(PythonResponseMessage);
wirevalue::register_type!(ValueOverlay<PythonResponseMessage>);
151
/// Python-visible newtype around a `ValueOverlay` of per-rank responses; carried
/// by `PythonMessageKind::AccumulatedResponses`.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct AccumulatedResponses(ValueOverlay<PythonResponseMessage>);
159
/// What a `PythonMessage` is, plus the routing metadata it needs.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
pub enum PythonMessageKind {
    /// Invoke endpoint `name`. Replies go to `response_port` when present;
    /// otherwise they are dropped (see `resolve_indirect_call`).
    CallMethod {
        name: MethodSpecifier,
        response_port: Option<EitherPortRef>,
    },
    /// A successful response; `rank` is set by `PythonMessage::into_rank`.
    Result {
        rank: Option<usize>,
    },
    /// An exception response; `rank` is set by `PythonMessage::into_rank`.
    Exception {
        rank: Option<usize>,
    },
    /// The default kind (see the `Default` impl).
    Uninit {},
    /// Invoke endpoint `name` using local state held by a local state broker.
    /// `local_state_broker` identifies the broker, `id` selects the state entry,
    /// and `unflatten_args` says how to rebuild the local-state list.
    CallMethodIndirect {
        name: MethodSpecifier,
        local_state_broker: (String, usize),
        id: usize,
        unflatten_args: Vec<UnflattenArg>,
    },
    /// Responses from several ranks, already merged into an overlay.
    AccumulatedResponses(AccumulatedResponses),
}
// Register `PythonMessageKind` via `wirevalue::register_type!`.
wirevalue::register_type!(PythonMessageKind);
185
186impl Default for PythonMessageKind {
187 fn default() -> Self {
188 PythonMessageKind::Uninit {}
189 }
190}
191
192fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
193 let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
194 mailbox.into_bound_py_any(py).unwrap()
195}
196
/// A message exchanged with Python actors: a `kind` that says how to interpret
/// it, plus an opaque serialized body.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
pub struct PythonMessage {
    /// Message kind and routing metadata.
    pub kind: PythonMessageKind,
    /// Opaque payload bytes, interpreted by the Python side.
    pub message: Part,
}
203
204fn python_message_endpoint_name(msg: &PythonMessage) -> Option<String> {
206 match &msg.kind {
207 PythonMessageKind::CallMethod { name, .. }
208 | PythonMessageKind::CallMethodIndirect { name, .. } => Some(name.name().to_string()),
209 _ => None,
210 }
211}
212
// Hand-written wirevalue registration for `PythonMessage` (other types in this
// file use `register_type!`). In addition to the standard `Named` hooks and a
// `dump` implementation, it supplies an `endpoint_name` extractor that reports
// the endpoint targeted by call messages.
wirevalue::submit! {
    wirevalue::TypeInfo {
        typename: <PythonMessage as wirevalue::Named>::typename,
        typehash: <PythonMessage as wirevalue::Named>::typehash,
        typeid: <PythonMessage as wirevalue::Named>::typeid,
        port: <PythonMessage as wirevalue::Named>::port,
        dump: Some(<PythonMessage as wirevalue::NamedDumpable>::dump),
        arm_unchecked: <PythonMessage as wirevalue::Named>::arm_unchecked,
        endpoint_name: |ptr| {
            // SAFETY (assumed): wirevalue only calls `endpoint_name` with a pointer to
            // a live value of the type this `TypeInfo` describes, i.e. a
            // `PythonMessage`. NOTE(review): confirm against wirevalue's `TypeInfo`
            // contract.
            let msg = unsafe { &*(ptr as *const PythonMessage) };
            python_message_endpoint_name(msg)
        },
    }
}
232
// Wirevalue registration for the erased, unbound form of `PythonMessage` used
// for casting. It has no `dump`. Its `endpoint_name` deserializes the erased
// payload as a `PythonMessage` and reports that message's endpoint, or `None`
// if deserialization fails.
wirevalue::submit! {
    wirevalue::TypeInfo {
        typename: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typename,
        typehash: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typehash,
        typeid: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typeid,
        port: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::port,
        dump: None,
        arm_unchecked: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::arm_unchecked,
        endpoint_name: |ptr| {
            // SAFETY (assumed): wirevalue only calls `endpoint_name` with a pointer to
            // a live `IndexedErasedUnbound<PythonMessage>`, the type this `TypeInfo`
            // describes. NOTE(review): confirm against wirevalue's contract.
            let erased = unsafe { &*(ptr as *const IndexedErasedUnbound<PythonMessage>) };
            erased
                .inner_any()
                .deserialized_unchecked::<PythonMessage>()
                .ok()
                .and_then(|msg| python_message_endpoint_name(&msg))
        },
    }
}
259
260impl From<ValueOverlay<PythonResponseMessage>> for PythonMessage {
261 fn from(overlay: ValueOverlay<PythonResponseMessage>) -> Self {
262 PythonMessage {
263 kind: PythonMessageKind::AccumulatedResponses(AccumulatedResponses(overlay)),
264 message: Default::default(),
265 }
266 }
267}
268
269impl PythonMessage {
270 pub fn into_overlay(self) -> anyhow::Result<ValueOverlay<PythonResponseMessage>> {
275 match self.kind {
276 PythonMessageKind::AccumulatedResponses(overlay) => Ok(overlay.0),
277 PythonMessageKind::Result { rank, .. } => {
278 let rank = rank.expect("accumulated response should have a rank");
279 let mut overlay = ValueOverlay::new();
280 overlay.push_run(rank..rank + 1, PythonResponseMessage::Result(self.message))?;
281 Ok(overlay)
282 }
283 PythonMessageKind::Exception { rank, .. } => {
284 let rank = rank.expect("accumulated exception should have a rank");
285 let mut overlay = ValueOverlay::new();
286 overlay.push_run(
287 rank..rank + 1,
288 PythonResponseMessage::Exception(self.message),
289 )?;
290 Ok(overlay)
291 }
292 other => {
293 anyhow::bail!(
294 "unexpected message kind {:?} in collected responses reducer",
295 other
296 );
297 }
298 }
299 }
300}
301
/// A call message resolved into the arguments for the Python actor's endpoint
/// invocation (produced by `PythonMessage::resolve_indirect_call`).
struct ResolvedCallMethod {
    /// Endpoint to invoke.
    method: MethodSpecifier,
    /// The call's message body.
    bytes: FrozenBuffer,
    /// Rebuilt local-state list for `CallMethodIndirect`; `None` for `CallMethod`.
    local_state: Option<Py<PyAny>>,
    /// Where replies for this call are sent.
    response_port: ResponsePort,
}
310
/// The reply destination for a resolved call.
enum ResponsePort {
    /// No response port was supplied; exposed to Python as a `DroppingPort`.
    Dropping,
    /// A port reference from a `CallMethod` message, tagged with this actor's
    /// cast rank and reply headers.
    Port(Port),
    /// The response port obtained from the local state broker for a
    /// `CallMethodIndirect` message.
    Local(LocalPort),
}
316
317impl ResponsePort {
318 fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
319 match self {
320 ResponsePort::Dropping => DroppingPort.into_py_any(py),
321 ResponsePort::Port(port) => port.into_py_any(py),
322 ResponsePort::Local(port) => port.into_py_any(py),
323 }
324 }
325}
326
/// A resolved call packaged for queue-mode dispatch; built by `handle_queue`
/// (presumably consumed by the Python `_dispatch_loop` — TODO confirm). All
/// fields are readable from Python.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub struct QueuedMessage {
    /// Python context for handling this message.
    #[pyo3(get)]
    pub context: Py<crate::context::PyContext>,
    /// Endpoint to invoke.
    #[pyo3(get)]
    pub method: MethodSpecifier,
    /// The call's message body.
    #[pyo3(get)]
    pub bytes: FrozenBuffer,
    /// Local-state object for the call.
    #[pyo3(get)]
    pub local_state: Py<PyAny>,
    /// Python response-port object for replies.
    #[pyo3(get)]
    pub response_port: Py<PyAny>,
}
342
343impl PythonMessage {
344 pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
345 Self {
346 kind,
347 message: message.into(),
348 }
349 }
350
351 pub fn into_rank(self, rank: usize) -> Self {
352 let rank = Some(rank);
353 match self.kind {
354 PythonMessageKind::Result { .. } => PythonMessage {
355 kind: PythonMessageKind::Result { rank },
356 message: self.message,
357 },
358 PythonMessageKind::Exception { .. } => PythonMessage {
359 kind: PythonMessageKind::Exception { rank },
360 message: self.message,
361 },
362 _ => panic!("PythonMessage is not a response but {:?}", self),
363 }
364 }
365 async fn resolve_indirect_call(
366 self,
367 cx: &Context<'_, PythonActor>,
368 ) -> anyhow::Result<ResolvedCallMethod> {
369 match self.kind {
370 PythonMessageKind::CallMethodIndirect {
371 name,
372 local_state_broker,
373 id,
374 unflatten_args,
375 } => {
376 let broker = BrokerId::new(local_state_broker).resolve(cx).await;
377 let (send, recv) = cx.open_once_port();
378 broker.post(cx, LocalStateBrokerMessage::Get(id, send));
379 let state = recv.recv().await?;
380 let mut state_it = state.state.into_iter();
381 monarch_with_gil(|py| {
382 let mailbox = mailbox(py, cx);
383 let local_state = Some(
384 PyList::new(
385 py,
386 unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
387 match x {
388 UnflattenArg::Mailbox => mailbox.clone(),
389 UnflattenArg::PyObject => {
390 state_it.next().unwrap().into_bound(py)
391 }
392 }
393 }),
394 )
395 .unwrap()
396 .into(),
397 );
398 let response_port = ResponsePort::Local(LocalPort {
399 instance: cx.into(),
400 inner: Some(state.response_port),
401 });
402 Ok(ResolvedCallMethod {
403 method: name,
404 bytes: FrozenBuffer {
405 inner: self.message.into_bytes(),
406 },
407 local_state,
408 response_port,
409 })
410 })
411 .await
412 }
413 PythonMessageKind::CallMethod {
414 name,
415 response_port,
416 } => {
417 let method_name = name.name().to_string();
418 let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
419 let point = cx.cast_point();
420 let mut reply_headers = hyperactor_config::Flattrs::new();
425 hyperactor_config::attrs::copy_marked_flattrs(
426 &mut reply_headers,
427 cx.headers(),
428 hyperactor_config::attrs::OPERATION_CONTEXT_HEADER,
429 );
430 if reply_headers
431 .get(hyperactor::mailbox::headers::OPERATION_ENDPOINT)
432 .is_none()
433 {
434 reply_headers.set(
435 hyperactor::mailbox::headers::OPERATION_ENDPOINT,
436 format!("{}()", method_name),
437 );
438 }
439 ResponsePort::Port(Port::with_reply_headers(
440 port_ref,
441 cx.instance().clone_for_py(),
442 Some(point.rank()),
443 reply_headers,
444 ))
445 });
446 Ok(ResolvedCallMethod {
447 method: name,
448 bytes: FrozenBuffer {
449 inner: self.message.into_bytes(),
450 },
451 local_state: None,
452 response_port,
453 })
454 }
455 _ => {
456 panic!("unexpected message kind {:?}", self.kind)
457 }
458 }
459 }
460}
461
462impl std::fmt::Debug for PythonMessage {
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 f.debug_struct("PythonMessage")
465 .field("kind", &self.kind)
466 .field(
467 "message",
468 &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
469 )
470 .finish()
471 }
472}
473
474impl Unbind for PythonMessage {
475 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
476 match &self.kind {
477 PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
478 _ => Ok(()),
479 }
480 }
481}
482
483impl Bind for PythonMessage {
484 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
485 match &mut self.kind {
486 PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
487 _ => Ok(()),
488 }
489 }
490}
491
492#[pymethods]
493impl PythonMessage {
494 #[new]
495 #[pyo3(signature = (kind, message))]
496 pub fn new(kind: PythonMessageKind, message: PyRef<'_, FrozenBuffer>) -> PyResult<Self> {
497 Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
498 }
499
500 #[getter]
501 fn kind(&self) -> PythonMessageKind {
502 self.kind.clone()
503 }
504
505 #[getter]
506 fn message(&self) -> FrozenBuffer {
507 FrozenBuffer {
508 inner: self.message.to_bytes(),
509 }
510 }
511}
512
/// Python-visible wrapper around an `ActorHandle<PythonActor>`.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub(super) struct PythonActorHandle {
    /// The wrapped actor handle.
    pub(super) inner: ActorHandle<PythonActor>,
}
517
518#[pymethods]
519impl PythonActorHandle {
520 fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
522 self.inner.post(instance.deref(), message.clone());
523 Ok(())
524 }
525
526 fn bind(&self) -> PyActorAddr {
527 self.inner.bind::<PythonActor>().into_actor_addr().into()
528 }
529}
530
/// How a `PythonActor` hands incoming `PythonMessage`s to Python.
#[derive(Debug)]
pub enum PythonActorDispatchMode {
    /// Each message invokes the Python actor's `handle` coroutine directly
    /// (see `handle_direct`).
    Direct,
    /// Messages go through a `pympsc` channel whose receiving end is handed to
    /// the Python actor's `_dispatch_loop`.
    Queue {
        /// Sending half, used by `handle_queue`.
        sender: pympsc::Sender,
        /// Receiving half; taken in `Actor::init` and passed to `_dispatch_loop`.
        receiver: Option<pympsc::PyReceiver>,
    },
}
544
/// Rust-side actor that hosts a user-defined Python actor object and forwards
/// `PythonMessage` and `MeshFailure` messages to it.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        PythonMessage { cast = true },
        MeshFailure { cast = true },
    ],
)]
#[hyperactor::spawnable]
pub struct PythonActor {
    /// The Python actor object (an instance of the unpickled actor class).
    actor: Py<PyAny>,
    /// Dedicated event-loop task locals; `None` when the shared asyncio runtime
    /// is used (see `get_task_locals`).
    task_locals: Option<pyo3_async_runtimes::TaskLocals>,
    /// Cached Python `PyInstance` for this actor, created lazily when the first
    /// message is handled.
    instance: Option<Py<crate::context::PyInstance>>,
    /// How incoming messages are delivered to Python.
    dispatch_mode: PythonActorDispatchMode,
    /// The point this actor was spawned at, if known; used as the cast point
    /// when handling `init_message`.
    spawn_point: OnceLock<Option<Point>>,
    /// Optional message handled once during `init`.
    init_message: Option<PythonMessage>,
    /// Mesh name reported in `MeshFailure`s raised from supervision events.
    mesh_base_name: Option<String>,
}
579
impl PythonActor {
    /// Constructs a `PythonActor` by unpickling `actor_type` (expected to be a
    /// Python class) and instantiating it with no arguments.
    ///
    /// Two global config values are read:
    /// * `ACTOR_QUEUE_DISPATCH` selects `PythonActorDispatchMode::Queue` (with a
    ///   fresh `pympsc` channel) instead of `Direct`.
    /// * When `SHARED_ASYNCIO_RUNTIME` is false, the actor gets its own event loop
    ///   from `create_task_locals`.
    ///
    /// `init_message`, if present, is handled in `Actor::init` with `spawn_point`
    /// as the cast point.
    ///
    /// # Errors
    /// Returns an error if any of these fail: unpickling, the downcast to a type,
    /// instantiation, or channel creation.
    pub(crate) fn new(
        actor_type: PickledPyObject,
        init_message: Option<PythonMessage>,
        spawn_point: Option<Point>,
        mesh_base_name: Option<String>,
    ) -> Result<Self, anyhow::Error> {
        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);

        Ok(monarch_with_gil_blocking(
            |py| -> Result<Self, SerializablePyErr> {
                let unpickled = actor_type.unpickle(py)?;
                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;

                // Only build a dedicated event loop when the shared runtime is
                // disabled; the GIL is released (`Python::detach`) while doing so.
                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
                    .then(|| Python::detach(py, create_task_locals));

                let dispatch_mode = if use_queue_dispatch {
                    let (sender, receiver) = pympsc::channel().map_err(|e| {
                        let py_err = PyRuntimeError::new_err(e.to_string());
                        SerializablePyErr::from(py, &py_err)
                    })?;
                    PythonActorDispatchMode::Queue {
                        sender,
                        receiver: Some(receiver),
                    }
                } else {
                    PythonActorDispatchMode::Direct
                };

                Ok(Self {
                    actor,
                    task_locals,
                    instance: None,
                    dispatch_mode,
                    spawn_point: OnceLock::from(spawn_point),
                    init_message,
                    mesh_base_name,
                })
            },
        )?)
    }

    /// Returns this actor's task locals, falling back to the process-wide shared
    /// event loop when no dedicated one was created.
    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
        self.task_locals
            .as_ref()
            .unwrap_or_else(|| shared_task_locals(py))
    }

    /// Bootstraps the root client actor on a new direct proc named
    /// `mesh_root_client_proc`, storing its instance in a function-local static.
    ///
    /// # Panics
    /// Panics if the proc cannot be created, or if it is called a second time
    /// (the static instance slot can only be set once).
    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();

        let client_proc = Proc::direct(
            default_bind_spec().binding_addr(),
            "mesh_root_client_proc".into(),
        )
        .unwrap();

        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
    }

    /// Creates the root client `PythonActor` on `client_proc` and starts its loop.
    ///
    /// The actor is a Python `_Actor` initialized with
    /// `RootClientActor._pickled_init_args()`. Its instance is stored in
    /// `root_client_instance`, and a tokio task is spawned to run the client's
    /// message loop.
    ///
    /// The loop processes work, signals and supervision events until it is
    /// stopped or fails; on `DrainAndStop` it then drains already-queued work.
    /// When the loop ends:
    /// * with an error, the error is converted to a supervision event and logged,
    ///   then `_thread.interrupt_main()` is called (exiting the process with code 1
    ///   if that call fails);
    /// * otherwise, the instance status is set to `Stopped`.
    ///
    /// # Panics
    /// Panics on any failure to import or look up the Python classes, to create
    /// the actor or its instance, or if `root_client_instance` is already set.
    pub(crate) fn bootstrap_client_inner(
        py: Python<'_>,
        client_proc: Proc,
        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
        let actor_mesh_mod = py
            .import("monarch._src.actor.actor_mesh")
            .expect("import actor_mesh");
        let root_client_class = actor_mesh_mod
            .getattr("RootClientActor")
            .expect("get RootClientActor");

        let actor_type =
            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
                .expect("pickle _Actor");

        let init_frozen_buffer: FrozenBuffer = root_client_class
            .call_method0("_pickled_init_args")
            .expect("call RootClientActor._pickled_init_args")
            .extract()
            .expect("extract FrozenBuffer from _pickled_init_args");
        let init_message = PythonMessage::new_from_buf(
            PythonMessageKind::CallMethod {
                name: MethodSpecifier::Init {},
                response_port: None,
            },
            init_frozen_buffer,
        );

        // Spawn point is rank 0 of `extent!()` (no dimensions — presumably a
        // single-point extent); there is no mesh base name for the root client.
        let mut actor = PythonActor::new(
            actor_type,
            Some(init_message),
            Some(extent!().point_of_rank(0).unwrap()),
            None,
        )
        .expect("create client PythonActor");

        let ai = client_proc
            .actor_instance(
                root_client_class
                    .getattr("name")
                    .expect("get RootClientActor.name")
                    .extract()
                    .expect("extract RootClientActor.name"),
            )
            .expect("root instance create");

        let handle = ai.handle;
        let signal_rx = ai.signal;
        let supervision_rx = ai.supervision;
        let work_rx = ai.work;

        root_client_instance
            .set(ai.instance)
            .map_err(|_| "already initialized root client instance")
            .unwrap();
        let instance = root_client_instance.get().unwrap();

        instance.set_system();

        let _client_ref = handle.bind::<PythonActor>();

        get_tokio_runtime().spawn(async move {
            actor.init(instance).await.unwrap();

            let mut signal_rx = signal_rx;
            let mut supervision_rx = supervision_rx;
            let mut work_rx = work_rx;
            let mut need_drain = false;
            // Client message loop: yields `None` on a clean stop or `Some(error)`
            // when the client must stop because of a failure.
            let mut err = 'messages: loop {
                tokio::select! {
                    work = work_rx.recv() => {
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut actor, instance).await {
                            // An `UnhandledFaultHookException` stops the loop right away;
                            // other handler errors are routed through supervision.
                            let is_hook_exception = monarch_with_gil(|py| {
                                err.downcast_ref::<pyo3::PyErr>()
                                    .is_some_and(|pyerr| {
                                        pyerr.is_instance(
                                            py,
                                            &unhandled_fault_hook_exception(py),
                                        )
                                    })
                            }).await;

                            let kind = ActorErrorKind::processing(err);
                            let err = ActorError {
                                actor_id: Box::new(instance.self_addr().clone()),
                                kind: Box::new(kind),
                            };

                            if is_hook_exception {
                                break Some(err);
                            }

                            let supervision_event = actor_error_to_event(instance, &actor, err);
                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                // Before stopping, process supervision events that are already
                                // queued; if one of those also fails, stop with that error.
                                while let Ok(supervision_event) = supervision_rx.try_recv() {
                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                        break 'messages Some(err);
                                    }
                                }
                                break Some(err);
                            }
                        }
                    }
                    signal = signal_rx.recv() => {
                        let signal = signal.map_err(ActorError::from);
                        tracing::info!(actor_id = %instance.self_addr(), "client received signal {signal:?}");
                        // Stop / DrainAndStop / ExitRequested end the loop cleanly
                        // (DrainAndStop also drains pending work); Kill ends it with an
                        // `Aborted` error; ChildStopped is ignored.
                        match signal {
                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
                                need_drain = matches!(signal, Signal::DrainAndStop(_));
                                break None;
                            },
                            Ok(Signal::ExitRequested(_)) => break None,
                            Ok(Signal::ChildStopped(_)) => {},
                            Ok(Signal::Kill(reason)) => {
                                break Some(ActorError { actor_id: Box::new(instance.self_addr().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
                            },
                            Err(err) => break Some(err),
                        }
                    }
                    Some(supervision_event) = supervision_rx.recv() => {
                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                            break Some(err);
                        }
                    }
                };
            };
            if need_drain {
                // Handle work that is already queued (without waiting for more); the
                // first failure becomes the loop's error.
                let mut n = 0;
                while let Ok(work) = work_rx.try_recv() {
                    if let Err(e) = work.handle(&mut actor, instance).await {
                        err = Some(ActorError {
                            actor_id: Box::new(instance.self_addr().clone()),
                            kind: Box::new(ActorErrorKind::processing(e)),
                        });
                        break;
                    }
                    n += 1;
                }
                tracing::debug!(actor_id = %instance.self_addr(), "client drained {} messages before stopping", n);
            }
            if let Some(err) = err {
                let event = actor_error_to_event(instance, &actor, err);
                tracing::error!(
                    actor_id = %instance.self_addr(),
                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
                    event,
                );

                // Interrupt the Python main thread; if that is impossible, exit.
                monarch_with_gil_blocking(|py| {
                    let thread_mod = py.import("_thread").expect("import _thread");
                    let interrupt_main = thread_mod
                        .getattr("interrupt_main")
                        .expect("get interrupt_main");

                    if let Err(e) = interrupt_main.call0() {
                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
                        std::process::exit(1);
                    }
                });
            } else {
                tracing::info!(actor_id = %instance.self_addr(), "client stopped");
                instance.change_status(hyperactor::actor::ActorStatus::Stopped("client stopped".into()));
            }
        });

        (root_client_instance.get().unwrap(), handle)
    }
}
854
855fn actor_error_to_event(
856 instance: &Instance<PythonActor>,
857 actor: &PythonActor,
858 err: ActorError,
859) -> ActorSupervisionEvent {
860 match *err.kind {
861 ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
862 _ => {
863 let status = ActorStatus::generic_failure(err.kind.to_string());
864 ActorSupervisionEvent::new(
865 instance.self_addr().clone(),
866 actor.display_name(),
867 status,
868 None,
869 )
870 }
871 }
872}
873
874pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
875 static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
876
877 py.detach(|| {
882 ROOT_CLIENT_ACTOR.get_or_init(|| {
883 monarch_with_gil_blocking(|py| {
884 let (client, _handle) = PythonActor::bootstrap_client(py);
885 client
886 })
887 })
888 })
889}
890
#[async_trait]
impl Actor for PythonActor {
    /// Actor initialization.
    ///
    /// In queue dispatch mode:
    /// * the receiver is taken and the Python actor's
    ///   `_dispatch_loop(receiver, error_port)` coroutine is started on this
    ///   actor's event loop in a detached task;
    /// * messages sent to the error port become a `Signal::Kill` on the actor's
    ///   signal port.
    ///
    /// Then, if an init message was provided, it is handled with a context whose
    /// `CAST_POINT` header is the spawn point.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
            let receiver = receiver.take().unwrap();

            // Exception messages on this port kill the actor: the body is unpickled
            // with cloudpickle and its string form becomes the kill reason. Any
            // other message kind is reported as an "expected Exception" error.
            let error_port: hyperactor::PortHandle<PythonMessage> =
                this.signal_port().contramap(|msg: PythonMessage| {
                    monarch_with_gil_blocking(|py| {
                        let err = match msg.kind {
                            PythonMessageKind::Exception { .. } => {
                                let cloudpickle = py.import("cloudpickle").unwrap();
                                let err_obj = cloudpickle
                                    .call_method1("loads", (msg.message.to_bytes().as_ref(),))
                                    .unwrap();
                                let py_err = pyo3::PyErr::from_value(err_obj);
                                SerializablePyErr::from(py, &py_err)
                            }
                            _ => {
                                let py_err = PyRuntimeError::new_err(format!(
                                    "expected Exception, got {:?}",
                                    msg.kind
                                ));
                                SerializablePyErr::from(py, &py_err)
                            }
                        };
                        Signal::Kill(err.to_string())
                    })
                });

            let error_port_handle = PythonPortHandle::new(error_port);

            monarch_with_gil(|py| {
                let tl = self
                    .task_locals
                    .as_ref()
                    .unwrap_or_else(|| shared_task_locals(py));
                let awaitable = self.actor.call_method(
                    py,
                    "_dispatch_loop",
                    (receiver, error_port_handle),
                    None,
                )?;
                let future =
                    pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
                // The dispatch loop runs detached; if it fails, the error is only logged.
                tokio::spawn(async move {
                    if let Err(e) = future.await {
                        tracing::error!("message loop error: {}", e);
                    }
                });
                Ok::<_, anyhow::Error>(())
            })
            .await?;
        }

        if let Some(init_message) = self.init_message.take() {
            // `spawn_point` is always initialized (built with `OnceLock::from`); the
            // inner `Option` must be `Some` whenever an init message exists.
            let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
            let mut headers = Flattrs::new();
            headers.set(CAST_POINT, spawn_point);
            let cx = Context::new(this, headers);
            <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
        }

        Ok(())
    }

    /// Runs the Python actor's optional `__cleanup__(context, error)` hook.
    ///
    /// `error` is the actor error's string form, or `None`. Nothing happens if the
    /// actor has no `__cleanup__` (or `hasattr` fails). If the hook returns a
    /// non-`None` value, it is awaited as a coroutine on this actor's event loop.
    async fn cleanup(
        &mut self,
        this: &Instance<Self>,
        err: Option<&ActorError>,
    ) -> anyhow::Result<()> {
        let cx = Context::new(this, Flattrs::new());
        let err_as_str = err.map(|e| e.to_string());
        let future = monarch_with_gil(|py| {
            // Reuse the cached Python instance if one exists; otherwise build one
            // just for this call.
            let py_cx = match &self.instance {
                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
                None => {
                    let py_instance: crate::context::PyInstance = this.into();
                    crate::context::PyContext::new(
                        &cx,
                        py_instance
                            .into_py_any(py)?
                            .downcast_bound(py)
                            .map_err(PyErr::from)?
                            .clone()
                            .unbind(),
                    )
                }
            }
            .into_bound_py_any(py)?;
            let actor = self.actor.bind(py);
            match actor.hasattr("__cleanup__") {
                Ok(false) | Err(_) => {
                    return Ok(None);
                }
                _ => {}
            }
            let awaitable = actor
                .call_method("__cleanup__", (&py_cx, err_as_str), None)
                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
            if awaitable.is_none() {
                Ok(None)
            } else {
                pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
                    .map(Some)
                    .map_err(anyhow::Error::from)
            }
        })
        .await?;
        if let Some(future) = future {
            future.await.map_err(anyhow::Error::from)?;
        }
        Ok(())
    }

    /// Returns `str()` of the cached Python instance, or `None` if no instance
    /// has been created yet (or `str()` fails).
    fn display_name(&self) -> Option<String> {
        self.instance.as_ref().and_then(|instance| {
            monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
        })
    }

    /// Handles a message returned as undeliverable.
    ///
    /// * If the sender is not this actor, the envelope is first rewritten with
    ///   `update_undeliverable_envelope_for_casting`.
    /// * Lost messages are returned as errors.
    /// * Otherwise the sender must be this actor (asserted), and the envelope is
    ///   offered to the Python actor's `_handle_undeliverable_message`. If that
    ///   returns false, hyperactor's default handling is applied.
    async fn handle_undeliverable_message(
        &mut self,
        ins: &Instance<Self>,
        mut envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        if envelope
            .as_message()
            .is_some_and(|envelope| envelope.sender() != ins.self_addr())
        {
            envelope = update_undeliverable_envelope_for_casting(envelope);
        }
        let envelope = match envelope {
            Undeliverable::Message(envelope) => envelope,
            Undeliverable::Lost(lost) => {
                return Err(UndeliverableMessageError::Lost { lost }.into());
            }
        };
        assert_eq!(
            envelope.sender(),
            ins.self_addr(),
            "undeliverable message was returned to the wrong actor. \
             Return address = {}, src actor = {}, dest handler port = {}, message type = {}, envelope headers = {}",
            envelope.sender(),
            ins.self_addr(),
            envelope.dest(),
            envelope.data().typename().unwrap_or("unknown"),
            envelope.headers()
        );

        let cx = Context::new(ins, envelope.headers().clone());

        let (envelope, handled) = monarch_with_gil(|py| {
            let py_cx = match &self.instance {
                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
                None => {
                    let py_instance: crate::context::PyInstance = ins.into();
                    crate::context::PyContext::new(
                        &cx,
                        py_instance
                            .into_py_any(py)?
                            .downcast_bound(py)
                            .map_err(PyErr::from)?
                            .clone()
                            .unbind(),
                    )
                }
            }
            .into_bound_py_any(py)?;
            let py_envelope = PythonUndeliverableMessageEnvelope {
                inner: Some(Undeliverable::Message(envelope)),
            }
            .into_bound_py_any(py)?;
            let handled = self
                .actor
                .call_method(
                    py,
                    "_handle_undeliverable_message",
                    (&py_cx, &py_envelope),
                    None,
                )
                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
                .extract::<bool>(py)?;
            // Take the envelope back out of the Python wrapper so it can be passed
            // to the default handler below.
            Ok::<_, anyhow::Error>((
                py_envelope
                    .downcast::<PythonUndeliverableMessageEnvelope>()
                    .map_err(PyErr::from)?
                    .try_borrow_mut()
                    .map_err(PyErr::from)?
                    .take()?,
                handled,
            ))
        })
        .await?;

        if !handled {
            hyperactor::actor::handle_undeliverable_message(ins, envelope)
        } else {
            Ok(())
        }
    }

    /// Forwards a supervision event to this actor's `MeshFailure` handler.
    ///
    /// The `MeshFailure` is tagged with `mesh_base_name` and has no crashed ranks.
    /// Returns `true` (handled) if the handler succeeds and propagates its error
    /// otherwise.
    async fn handle_supervision_event(
        &mut self,
        this: &Instance<Self>,
        event: &ActorSupervisionEvent,
    ) -> Result<bool, anyhow::Error> {
        let cx = Context::new(this, Flattrs::new());
        self.handle(
            &cx,
            MeshFailure {
                actor_mesh_name: self.mesh_base_name.clone(),
                event: event.clone(),
                crashed_ranks: vec![],
            },
        )
        .await
        .map(|_| true)
    }
}
1127
/// Serializable parameters for spawning a `PythonActor` (see the `RemoteSpawn`
/// impl).
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct PythonActorParams {
    /// The pickled Python actor class.
    actor_type: PickledPyObject,
    /// Optional message handled during actor `init`.
    init_message: Option<PythonMessage>,
    /// Mesh name reported in `MeshFailure`s raised from supervision events.
    mesh_base_name: Option<String>,
}
1146
1147impl PythonActorParams {
1148 pub(crate) fn new(
1149 actor_type: PickledPyObject,
1150 init_message: Option<PythonMessage>,
1151 mesh_base_name: Option<String>,
1152 ) -> Self {
1153 Self {
1154 actor_type,
1155 init_message,
1156 mesh_base_name,
1157 }
1158 }
1159}
1160
1161#[async_trait]
1162impl RemoteSpawn for PythonActor {
1163 type Params = PythonActorParams;
1164
1165 async fn new(
1166 PythonActorParams {
1167 actor_type,
1168 init_message,
1169 mesh_base_name,
1170 }: PythonActorParams,
1171 environment: Flattrs,
1172 ) -> Result<Self, anyhow::Error> {
1173 let spawn_point = environment.get(CAST_POINT);
1174 Self::new(actor_type, init_message, spawn_point, mesh_base_name)
1175 }
1176}
1177
1178fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
1180 monarch_with_gil_blocking(|py| {
1181 let asyncio = Python::import(py, "asyncio").unwrap();
1182 let event_loop = asyncio.call_method0("new_event_loop").unwrap();
1183 let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
1184 .copy_context(py)
1185 .unwrap();
1186
1187 let kwargs = PyDict::new(py);
1188 let target = event_loop.getattr("run_forever").unwrap();
1189 kwargs.set_item("target", target).unwrap();
1190 kwargs.set_item("daemon", true).unwrap();
1192 let thread = py
1193 .import("threading")
1194 .unwrap()
1195 .call_method("Thread", (), Some(&kwargs))
1196 .unwrap();
1197 thread.call_method0("start").unwrap();
1198 task_locals
1199 })
1200}
1201
1202fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
1204 static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
1205 Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
1206}
1207
/// Passed to the Python actor's `handle` call so that an endpoint panic (an
/// exception object) can be reported back to Rust. The receiving end is given to
/// `handle_async_endpoint_panic` in `handle_direct`.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
struct PanicFlag {
    /// One-shot sender; becomes `None` once a panic has been signaled.
    sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
}
1247
1248#[pymethods]
1249impl PanicFlag {
1250 fn signal_panic(&mut self, ex: Py<PyAny>) {
1251 self.sender.take().unwrap().send(ex).unwrap();
1252 }
1253}
1254
1255#[async_trait]
1256impl Handler<PythonMessage> for PythonActor {
1257 #[tracing::instrument(level = "debug", skip_all)]
1258 async fn handle(
1259 &mut self,
1260 cx: &Context<PythonActor>,
1261 message: PythonMessage,
1262 ) -> anyhow::Result<()> {
1263 match &self.dispatch_mode {
1264 PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1265 PythonActorDispatchMode::Queue { sender, .. } => {
1266 let sender = sender.clone();
1267 self.handle_queue(cx, sender, message).await
1268 }
1269 }
1270 }
1271}
1272
1273impl PythonActor {
1274 async fn handle_direct(
1276 &mut self,
1277 cx: &Context<'_, PythonActor>,
1278 message: PythonMessage,
1279 ) -> anyhow::Result<()> {
1280 let resolved = message.resolve_indirect_call(cx).await?;
1281 let endpoint = resolved.method.to_string();
1282
1283 let (sender, receiver) = oneshot::channel();
1286
1287 let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1288 let inst = self.instance.get_or_insert_with(|| {
1289 let inst: crate::context::PyInstance = cx.into();
1290 inst.into_pyobject(py).unwrap().into()
1291 });
1292
1293 let awaitable = self.actor.call_method(
1294 py,
1295 "handle",
1296 (
1297 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1298 resolved.method,
1299 resolved.bytes,
1300 PanicFlag {
1301 sender: Some(sender),
1302 },
1303 resolved
1304 .local_state
1305 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1306 resolved.response_port.into_py_any(py)?,
1307 ),
1308 None,
1309 )?;
1310
1311 let tl = self
1312 .task_locals
1313 .as_ref()
1314 .unwrap_or_else(|| shared_task_locals(py));
1315
1316 pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1317 .map_err(|err| err.into())
1318 })
1319 .await?;
1320
1321 tokio::spawn(handle_async_endpoint_panic(
1323 cx.signal_port(),
1324 PythonTask::new(future)?,
1325 receiver,
1326 cx.self_addr().to_string(),
1327 endpoint,
1328 ));
1329 Ok(())
1330 }
1331
1332 async fn handle_queue(
1335 &mut self,
1336 cx: &Context<'_, PythonActor>,
1337 sender: pympsc::Sender,
1338 message: PythonMessage,
1339 ) -> anyhow::Result<()> {
1340 let resolved = message.resolve_indirect_call(cx).await?;
1341
1342 let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1343 let inst = self.instance.get_or_insert_with(|| {
1344 let inst: crate::context::PyInstance = cx.into();
1345 inst.into_pyobject(py).unwrap().into()
1346 });
1347
1348 let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1349 let py_context_obj = Py::new(py, py_context)?;
1350
1351 Ok(QueuedMessage {
1352 context: py_context_obj,
1353 method: resolved.method,
1354 bytes: resolved.bytes,
1355 local_state: resolved
1356 .local_state
1357 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1358 response_port: resolved.response_port.into_py_any(py)?,
1359 })
1360 })
1361 .await?;
1362
1363 sender
1364 .send(queued_msg)
1365 .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1366
1367 Ok(())
1368 }
1369}
1370
1371#[async_trait]
1372impl Handler<MeshFailure> for PythonActor {
1373 async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1374 if !message.event.actor_status.is_failed() {
1378 tracing::info!(
1379 "ignoring non-failure supervision event from child: {}",
1380 message
1381 );
1382 return Ok(());
1383 }
1384 let (display_name, fut) = monarch_with_gil(|py| {
1393 let inst = self.instance.get_or_insert_with(|| {
1394 let inst: crate::context::PyInstance = cx.into();
1395 inst.into_pyobject(py).unwrap().into()
1396 });
1397 let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1399 let actor_bound = self.actor.bind(py);
1400 if !actor_bound.hasattr("__supervise__")? {
1403 return Err(anyhow::anyhow!(
1404 "no __supervise__ method on {:?}",
1405 actor_bound
1406 ));
1407 }
1408 let awaitable = actor_bound.call_method(
1409 "__supervise__",
1410 (
1411 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1412 PyMeshFailure::from(message.clone()),
1413 ),
1414 None,
1415 )?;
1416 let fut =
1417 pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)?;
1418 anyhow::Ok((display_name, fut))
1419 })
1420 .await?;
1421
1422 let awaited = fut.await;
1423
1424 monarch_with_gil(|py| match awaited {
1425 Ok(s) => {
1426 if s.bind(py).is_truthy()? {
1427 tracing::info!(
1432 name = "ActorMeshStatus",
1433 status = "SupervisionError::Handled",
1434 actor_name = message.actor_mesh_name,
1436 event = %message.event,
1437 "__supervise__ on {} handled a supervision event, not reporting any further",
1438 cx.self_addr(),
1439 );
1440 Ok(())
1441 } else {
1442 for (actor_name, status) in [
1451 (
1452 message
1453 .actor_mesh_name
1454 .as_deref()
1455 .unwrap_or_else(|| message.event.actor_id.log_name()),
1456 "SupervisionError::Unhandled",
1457 ),
1458 (cx.self_addr().log_name(), "UnhandledSupervisionEvent"),
1459 ] {
1460 tracing::info!(
1461 name = "ActorMeshStatus",
1462 status,
1463 actor_name,
1464 event = %message.event,
1465 "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1466 cx.self_addr(),
1467 );
1468 }
1469 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1470 ActorSupervisionEvent::new(
1471 cx.self_addr().clone(),
1472 display_name.clone(),
1473 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1474 Box::new(message.event.clone()),
1475 )),
1476 None,
1477 ),
1478 ));
1479 Err(anyhow::Error::new(err))
1480 }
1481 }
1482 Err(err) => {
1483 if err.is_instance(py, &unhandled_fault_hook_exception(py)) {
1488 return Err(err.into());
1489 }
1490
1491 for (actor_name, status) in [
1497 (
1498 message
1499 .actor_mesh_name
1500 .as_deref()
1501 .unwrap_or_else(|| message.event.actor_id.log_name()),
1502 "SupervisionError::__supervise__::exception",
1503 ),
1504 (cx.self_addr().log_name(), "UnhandledSupervisionEvent"),
1505 ] {
1506 tracing::info!(
1507 name = "ActorMeshStatus",
1508 status,
1509 actor_name,
1510 event = %message.event,
1511 "__supervise__ on {} threw an exception",
1512 cx.self_addr(),
1513 );
1514 }
1515 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1516 ActorSupervisionEvent::new(
1517 cx.self_addr().clone(),
1518 display_name,
1519 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1520 err.to_string(),
1521 Box::new(message.event.clone()),
1522 )),
1523 None,
1524 ),
1525 ));
1526 Err(anyhow::Error::new(err))
1527 }
1528 })
1529 .await
1530 }
1531}
1532
1533async fn handle_async_endpoint_panic(
1534 panic_sender: PortHandle<Signal>,
1535 task: PythonTask,
1536 side_channel: oneshot::Receiver<Py<PyAny>>,
1537 actor_id: String,
1538 endpoint: String,
1539) {
1540 let attributes =
1542 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);
1543
1544 let start_time = std::time::Instant::now();
1546
1547 ENDPOINT_ACTOR_COUNT.add(1, attributes);
1549
1550 let err_or_never = async {
1551 match side_channel.await {
1554 Ok(value) => {
1555 monarch_with_gil(|py| -> Option<SerializablePyErr> {
1556 let err: PyErr = value
1557 .downcast_bound::<PyBaseException>(py)
1558 .unwrap()
1559 .clone()
1560 .into();
1561 ENDPOINT_ACTOR_PANIC.add(1, attributes);
1562 Some(err.into())
1563 })
1564 .await
1565 }
1566 Err(_) => pending().await,
1571 }
1572 };
1573 let future = task.take();
1574 if let Some(panic) = tokio::select! {
1575 result = future => {
1576 match result {
1577 Ok(_) => None,
1578 Err(e) => Some(e.into()),
1579 }
1580 },
1581 result = err_or_never => {
1582 result
1583 }
1584 } {
1585 ENDPOINT_ACTOR_ERROR.add(1, attributes);
1587 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1588 let client = &CLIENT
1589 .get_or_init(|| get_proc_runtime().client("async_endpoint_handler").unwrap())
1590 .0;
1591 panic_sender.post(&client, Signal::Kill(panic.to_string()));
1592 }
1593
1594 let elapsed_micros = start_time.elapsed().as_micros() as f64;
1596 ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
1597}
1598
1599#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1600struct LocalPort {
1601 instance: PyInstance,
1602 inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
1603}
1604
1605impl Debug for LocalPort {
1606 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1607 f.debug_struct("LocalPort")
1608 .field("inner", &self.inner)
1609 .finish()
1610 }
1611}
1612
1613pub(crate) fn to_py_error<T>(e: T) -> PyErr
1614where
1615 T: Error,
1616{
1617 PyErr::new::<PyValueError, _>(e.to_string())
1618}
1619
1620#[pymethods]
1621impl LocalPort {
1622 fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1623 let port = self.inner.take().expect("use local port once");
1624 port.post(self.instance.deref(), Ok(obj));
1625 Ok(())
1626 }
1627 fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1628 let port = self.inner.take().expect("use local port once");
1629 port.post(self.instance.deref(), Err(e));
1630 Ok(())
1631 }
1632}
1633
1634#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1638#[derive(Debug)]
1639pub struct DroppingPort;
1640
1641#[pymethods]
1642impl DroppingPort {
1643 #[new]
1644 fn new() -> Self {
1645 DroppingPort
1646 }
1647
1648 fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1649 Ok(())
1650 }
1651
1652 fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1653 let exc = if let Ok(inner) = e.getattr("exception") {
1655 inner
1656 } else {
1657 e
1658 };
1659 Err(PyErr::from_value(exc))
1660 }
1661
1662 #[getter]
1663 fn get_return_undeliverable(&self) -> bool {
1664 true
1665 }
1666
1667 #[setter]
1668 fn set_return_undeliverable(&self, _value: bool) {}
1669}
1670
1671#[pyclass(module = "monarch._src.actor.actor_mesh")]
1674pub struct Port {
1675 port_ref: EitherPortRef,
1676 instance: Instance<PythonActor>,
1677 rank: Option<usize>,
1678 reply_headers: hyperactor_config::Flattrs,
1682}
1683
1684#[pymethods]
1685impl Port {
1686 #[new]
1687 fn new(
1688 port_ref: EitherPortRef,
1689 instance: &crate::context::PyInstance,
1690 rank: Option<usize>,
1691 ) -> Self {
1692 Self {
1693 port_ref,
1694 instance: instance.clone().into_instance(),
1695 rank,
1696 reply_headers: hyperactor_config::Flattrs::new(),
1697 }
1698 }
1699
1700 #[getter("_port_ref")]
1701 fn port_ref_py(&self) -> EitherPortRef {
1702 self.port_ref.clone()
1703 }
1704
1705 #[getter("_rank")]
1706 fn rank_py(&self) -> Option<usize> {
1707 self.rank
1708 }
1709
1710 #[getter]
1711 fn get_return_undeliverable(&self) -> bool {
1712 self.port_ref.get_return_undeliverable()
1713 }
1714
1715 #[setter]
1716 fn set_return_undeliverable(&mut self, value: bool) {
1717 self.port_ref.set_return_undeliverable(value);
1718 }
1719
1720 fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1721 let message = PythonMessage::new_from_buf(
1722 PythonMessageKind::Result { rank: self.rank },
1723 pickle_to_part(py, &obj)?,
1724 );
1725
1726 self.port_ref
1727 .post_with_headers(&self.instance, self.reply_headers.clone(), message)
1728 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1729 }
1730
1731 fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1732 let message = PythonMessage::new_from_buf(
1733 PythonMessageKind::Exception { rank: self.rank },
1734 pickle_to_part(py, &e)?,
1735 );
1736
1737 self.port_ref
1738 .post_with_headers(&self.instance, self.reply_headers.clone(), message)
1739 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1740 }
1741}
1742
1743impl Port {
1744 pub(crate) fn with_reply_headers(
1748 port_ref: EitherPortRef,
1749 instance: Instance<PythonActor>,
1750 rank: Option<usize>,
1751 reply_headers: hyperactor_config::Flattrs,
1752 ) -> Self {
1753 Self {
1754 port_ref,
1755 instance,
1756 rank,
1757 reply_headers,
1758 }
1759 }
1760}
1761
1762pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
1763 hyperactor_mod.add_class::<PythonActorHandle>()?;
1764 hyperactor_mod.add_class::<PythonMessage>()?;
1765 hyperactor_mod.add_class::<PythonMessageKind>()?;
1766 hyperactor_mod.add_class::<MethodSpecifier>()?;
1767 hyperactor_mod.add_class::<UnflattenArg>()?;
1768 hyperactor_mod.add_class::<PanicFlag>()?;
1769 hyperactor_mod.add_class::<QueuedMessage>()?;
1770 hyperactor_mod.add_class::<DroppingPort>()?;
1771 hyperactor_mod.add_class::<Port>()?;
1772 Ok(())
1773}
1774
#[cfg(test)]
mod tests {
    use hyperactor as reference;
    use hyperactor::accum::ReducerSpec;
    use hyperactor::accum::StreamingReducerOpts;
    use hyperactor::id::Label;
    use hyperactor::message::ErasedUnbound;
    use hyperactor::message::Unbound;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_mesh::Error as MeshError;
    use hyperactor_mesh::host_mesh::host_agent::ProcState;
    use hyperactor_mesh::mesh_id::ResourceId;
    use hyperactor_mesh::resource::Status;
    use hyperactor_mesh::resource::{self};
    use pyo3::PyTypeInfo;

    use super::*;
    use crate::actor::to_py_error;

    /// Returns every `UnboundPort` visited in the erased form of `message`.
    fn collect_unbound_ports(message: &PythonMessage) -> Vec<reference::UnboundPort> {
        let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
        let mut found = Vec::new();
        erased
            .visit_mut::<reference::UnboundPort>(|port| {
                found.push(port.clone());
                Ok(())
            })
            .unwrap();
        found
    }

    /// Asserts that unbinding and re-binding `message` yields an equal message.
    fn assert_bind_round_trip(message: &PythonMessage) {
        let unbound = Unbound::try_from_message(message.clone()).unwrap();
        assert_eq!(*message, unbound.bind().unwrap());
    }

    #[test]
    fn test_python_message_bind_unbind() {
        let reducer_spec = ReducerSpec {
            typehash: 123,
            builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
        };
        let port_ref = hyperactor::PortRef::<PythonMessage>::attest_reducible(
            test_port_id("world_0", "client", 123),
            Some(reducer_spec),
            StreamingReducerOpts::default(),
        );
        let message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: Some(EitherPortRef::Unbounded(port_ref.clone().into())),
            },
            message: Part::from(vec![1, 2, 3]),
        };

        // A message with a response port exposes exactly that port for binding.
        assert_eq!(
            collect_unbound_ports(&message),
            vec![reference::UnboundPort::from(&port_ref)]
        );
        assert_bind_round_trip(&message);

        // Without a response port there is nothing to bind.
        let no_port_message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: None,
            },
            ..message
        };
        assert!(collect_unbound_ports(&no_port_message).is_empty());
        assert_bind_round_trip(&no_port_message);
    }

    #[test]
    fn to_py_error_preserves_proc_creation_message() {
        let failed_state: resource::State<ProcState> = resource::State {
            id: ResourceId::instance(Label::new("my-proc").unwrap()),
            status: Status::Failed("boom".into()),
            state: None,
            generation: 0,
            timestamp: std::time::SystemTime::now(),
        };

        let mesh_agent: hyperactor::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
            hyperactor::ActorRef::attest(test_port_id("hello_0", "actor", 0).actor_addr());
        let expected_prefix = format!(
            "error creating proc (host rank 0) on host mesh agent {}",
            mesh_agent
        );
        let mesh_err = MeshError::ProcCreationError {
            host_rank: 0,
            mesh_agent,
            state: Box::new(failed_state),
        };

        let rust_msg = mesh_err.to_string();
        let pyerr = to_py_error(mesh_err);

        pyo3::Python::initialize();
        monarch_with_gil_blocking(|py| {
            assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
            let py_msg = pyerr.value(py).to_string();

            // The Python message must be the Rust `Display` text verbatim,
            // including the serialized resource state.
            assert_eq!(py_msg, rust_msg);
            assert!(py_msg.contains(", state: "));
            assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
            assert!(py_msg.starts_with(&expected_prefix));
        });
    }
}