1use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::OncePortHandle;
22use hyperactor::PortHandle;
23use hyperactor::Proc;
24use hyperactor::RemoteSpawn;
25use hyperactor::actor::ActorError;
26use hyperactor::actor::ActorErrorKind;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Signal;
29use hyperactor::context::Actor as ContextActor;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::Undeliverable;
32use hyperactor::message::Bind;
33use hyperactor::message::Bindings;
34use hyperactor::message::IndexedErasedUnbound;
35use hyperactor::message::Unbind;
36use hyperactor::supervision::ActorSupervisionEvent;
37use hyperactor_config::Flattrs;
38use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
39use hyperactor_mesh::comm::multicast::CAST_POINT;
40use hyperactor_mesh::comm::multicast::CastInfo;
41use hyperactor_mesh::supervision::MeshFailure;
42use hyperactor_mesh::transport::default_bind_spec;
43use hyperactor_mesh::value_mesh::ValueOverlay;
44use monarch_types::PickledPyObject;
45use monarch_types::SerializablePyErr;
46use monarch_types::py_global;
47use ndslice::Point;
48use ndslice::extent;
49use pyo3::IntoPyObjectExt;
50use pyo3::exceptions::PyBaseException;
51use pyo3::exceptions::PyRuntimeError;
52use pyo3::exceptions::PyValueError;
53use pyo3::prelude::*;
54use pyo3::types::PyDict;
55use pyo3::types::PyList;
56use pyo3::types::PyType;
57use serde::Deserialize;
58use serde::Serialize;
59use serde_multipart::Part;
60use tokio::sync::oneshot;
61use typeuri::Named;
62
63use crate::buffers::FrozenBuffer;
64use crate::config::ACTOR_QUEUE_DISPATCH;
65use crate::config::SHARED_ASYNCIO_RUNTIME;
66use crate::context::PyInstance;
67use crate::local_state_broker::BrokerId;
68use crate::local_state_broker::LocalStateBrokerMessage;
69use crate::mailbox::EitherPortRef;
70use crate::mailbox::PyMailbox;
71use crate::mailbox::PythonPortHandle;
72use crate::mailbox::PythonUndeliverableMessageEnvelope;
73use crate::metrics::ENDPOINT_ACTOR_COUNT;
74use crate::metrics::ENDPOINT_ACTOR_ERROR;
75use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
76use crate::metrics::ENDPOINT_ACTOR_PANIC;
77use crate::pickle::pickle_to_part;
78use crate::proc::PyActorId;
79use crate::pympsc;
80use crate::pytokio::PythonTask;
81use crate::runtime::get_proc_runtime;
82use crate::runtime::get_tokio_runtime;
83use crate::runtime::monarch_with_gil;
84use crate::runtime::monarch_with_gil_blocking;
85use crate::supervision::PyMeshFailure;
86
87py_global!(
88 unhandled_fault_hook_exception,
89 "monarch._src.actor.supervision",
90 "UnhandledFaultHookException"
91);
92
93#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
94#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
95pub enum UnflattenArg {
96 Mailbox,
97 PyObject,
98}
99
100#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
101#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
102pub enum MethodSpecifier {
103 ReturnsResponse { name: String },
105 ExplicitPort { name: String },
107 Init {},
109}
110
111impl std::fmt::Display for MethodSpecifier {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}", self.name())
114 }
115}
116
117#[pymethods]
118impl MethodSpecifier {
119 #[getter(name)]
120 fn py_name(&self) -> &str {
121 self.name()
122 }
123}
124
125impl MethodSpecifier {
126 pub(crate) fn name(&self) -> &str {
127 match self {
128 MethodSpecifier::ReturnsResponse { name } => name,
129 MethodSpecifier::ExplicitPort { name } => name,
130 MethodSpecifier::Init {} => "__init__",
131 }
132 }
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
142pub enum PythonResponseMessage {
143 Result(serde_multipart::Part),
144 Exception(serde_multipart::Part),
145}
146
147wirevalue::register_type!(PythonResponseMessage);
148wirevalue::register_type!(ValueOverlay<PythonResponseMessage>);
149
150#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
155#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
156pub struct AccumulatedResponses(ValueOverlay<PythonResponseMessage>);
157
158#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
159#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
160pub enum PythonMessageKind {
161 CallMethod {
162 name: MethodSpecifier,
163 response_port: Option<EitherPortRef>,
164 },
165 Result {
166 rank: Option<usize>,
167 },
168 Exception {
169 rank: Option<usize>,
170 },
171 Uninit {},
172 CallMethodIndirect {
173 name: MethodSpecifier,
174 local_state_broker: (String, usize),
175 id: usize,
176 unflatten_args: Vec<UnflattenArg>,
179 },
180 AccumulatedResponses(AccumulatedResponses),
181}
182wirevalue::register_type!(PythonMessageKind);
183
184impl Default for PythonMessageKind {
185 fn default() -> Self {
186 PythonMessageKind::Uninit {}
187 }
188}
189
190fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
191 let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
192 mailbox.into_bound_py_any(py).unwrap()
193}
194
195#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
196#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
197pub struct PythonMessage {
198 pub kind: PythonMessageKind,
199 pub message: Part,
200}
201
202fn python_message_endpoint_name(msg: &PythonMessage) -> Option<String> {
204 match &msg.kind {
205 PythonMessageKind::CallMethod { name, .. }
206 | PythonMessageKind::CallMethodIndirect { name, .. } => Some(name.name().to_string()),
207 _ => None,
208 }
209}
210
211wirevalue::submit! {
216 wirevalue::TypeInfo {
217 typename: <PythonMessage as wirevalue::Named>::typename,
218 typehash: <PythonMessage as wirevalue::Named>::typehash,
219 typeid: <PythonMessage as wirevalue::Named>::typeid,
220 port: <PythonMessage as wirevalue::Named>::port,
221 dump: Some(<PythonMessage as wirevalue::NamedDumpable>::dump),
222 arm_unchecked: <PythonMessage as wirevalue::Named>::arm_unchecked,
223 endpoint_name: |ptr| {
224 let msg = unsafe { &*(ptr as *const PythonMessage) };
226 python_message_endpoint_name(msg)
227 },
228 }
229}
230
231wirevalue::submit! {
239 wirevalue::TypeInfo {
240 typename: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typename,
241 typehash: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typehash,
242 typeid: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typeid,
243 port: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::port,
244 dump: None,
245 arm_unchecked: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::arm_unchecked,
246 endpoint_name: |ptr| {
247 let erased = unsafe { &*(ptr as *const IndexedErasedUnbound<PythonMessage>) };
249 erased
250 .inner_any()
251 .deserialized_unchecked::<PythonMessage>()
252 .ok()
253 .and_then(|msg| python_message_endpoint_name(&msg))
254 },
255 }
256}
257
258impl From<ValueOverlay<PythonResponseMessage>> for PythonMessage {
259 fn from(overlay: ValueOverlay<PythonResponseMessage>) -> Self {
260 PythonMessage {
261 kind: PythonMessageKind::AccumulatedResponses(AccumulatedResponses(overlay)),
262 message: Default::default(),
263 }
264 }
265}
266
267impl PythonMessage {
268 pub fn into_overlay(self) -> anyhow::Result<ValueOverlay<PythonResponseMessage>> {
273 match self.kind {
274 PythonMessageKind::AccumulatedResponses(overlay) => Ok(overlay.0),
275 PythonMessageKind::Result { rank, .. } => {
276 let rank = rank.expect("accumulated response should have a rank");
277 let mut overlay = ValueOverlay::new();
278 overlay.push_run(rank..rank + 1, PythonResponseMessage::Result(self.message))?;
279 Ok(overlay)
280 }
281 PythonMessageKind::Exception { rank, .. } => {
282 let rank = rank.expect("accumulated exception should have a rank");
283 let mut overlay = ValueOverlay::new();
284 overlay.push_run(
285 rank..rank + 1,
286 PythonResponseMessage::Exception(self.message),
287 )?;
288 Ok(overlay)
289 }
290 other => {
291 anyhow::bail!(
292 "unexpected message kind {:?} in collected responses reducer",
293 other
294 );
295 }
296 }
297 }
298}
299
300struct ResolvedCallMethod {
301 method: MethodSpecifier,
302 bytes: FrozenBuffer,
303 local_state: Option<Py<PyAny>>,
304 response_port: ResponsePort,
307}
308
309enum ResponsePort {
310 Dropping,
311 Port(Port),
312 Local(LocalPort),
313}
314
315impl ResponsePort {
316 fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
317 match self {
318 ResponsePort::Dropping => DroppingPort.into_py_any(py),
319 ResponsePort::Port(port) => port.into_py_any(py),
320 ResponsePort::Local(port) => port.into_py_any(py),
321 }
322 }
323}
324
325#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
328pub struct QueuedMessage {
329 #[pyo3(get)]
330 pub context: Py<crate::context::PyContext>,
331 #[pyo3(get)]
332 pub method: MethodSpecifier,
333 #[pyo3(get)]
334 pub bytes: FrozenBuffer,
335 #[pyo3(get)]
336 pub local_state: Py<PyAny>,
337 #[pyo3(get)]
338 pub response_port: Py<PyAny>,
339}
340
341impl PythonMessage {
342 pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
343 Self {
344 kind,
345 message: message.into(),
346 }
347 }
348
349 pub fn into_rank(self, rank: usize) -> Self {
350 let rank = Some(rank);
351 match self.kind {
352 PythonMessageKind::Result { .. } => PythonMessage {
353 kind: PythonMessageKind::Result { rank },
354 message: self.message,
355 },
356 PythonMessageKind::Exception { .. } => PythonMessage {
357 kind: PythonMessageKind::Exception { rank },
358 message: self.message,
359 },
360 _ => panic!("PythonMessage is not a response but {:?}", self),
361 }
362 }
363 async fn resolve_indirect_call(
364 self,
365 cx: &Context<'_, PythonActor>,
366 ) -> anyhow::Result<ResolvedCallMethod> {
367 match self.kind {
368 PythonMessageKind::CallMethodIndirect {
369 name,
370 local_state_broker,
371 id,
372 unflatten_args,
373 } => {
374 let broker = BrokerId::new(local_state_broker).resolve(cx).await;
375 let (send, recv) = cx.open_once_port();
376 broker.send(cx, LocalStateBrokerMessage::Get(id, send))?;
377 let state = recv.recv().await?;
378 let mut state_it = state.state.into_iter();
379 monarch_with_gil(|py| {
380 let mailbox = mailbox(py, cx);
381 let local_state = Some(
382 PyList::new(
383 py,
384 unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
385 match x {
386 UnflattenArg::Mailbox => mailbox.clone(),
387 UnflattenArg::PyObject => {
388 state_it.next().unwrap().into_bound(py)
389 }
390 }
391 }),
392 )
393 .unwrap()
394 .into(),
395 );
396 let response_port = ResponsePort::Local(LocalPort {
397 instance: cx.into(),
398 inner: Some(state.response_port),
399 });
400 Ok(ResolvedCallMethod {
401 method: name,
402 bytes: FrozenBuffer {
403 inner: self.message.into_bytes(),
404 },
405 local_state,
406 response_port,
407 })
408 })
409 .await
410 }
411 PythonMessageKind::CallMethod {
412 name,
413 response_port,
414 } => {
415 let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
416 let point = cx.cast_point();
417 ResponsePort::Port(Port {
418 port_ref,
419 instance: cx.instance().clone_for_py(),
420 rank: Some(point.rank()),
421 })
422 });
423 Ok(ResolvedCallMethod {
424 method: name,
425 bytes: FrozenBuffer {
426 inner: self.message.into_bytes(),
427 },
428 local_state: None,
429 response_port,
430 })
431 }
432 _ => {
433 panic!("unexpected message kind {:?}", self.kind)
434 }
435 }
436 }
437}
438
439impl std::fmt::Debug for PythonMessage {
440 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441 f.debug_struct("PythonMessage")
442 .field("kind", &self.kind)
443 .field(
444 "message",
445 &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
446 )
447 .finish()
448 }
449}
450
451impl Unbind for PythonMessage {
452 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
453 match &self.kind {
454 PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
455 _ => Ok(()),
456 }
457 }
458}
459
460impl Bind for PythonMessage {
461 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
462 match &mut self.kind {
463 PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
464 _ => Ok(()),
465 }
466 }
467}
468
469#[pymethods]
470impl PythonMessage {
471 #[new]
472 #[pyo3(signature = (kind, message))]
473 pub fn new<'py>(kind: PythonMessageKind, message: PyRef<'py, FrozenBuffer>) -> PyResult<Self> {
474 Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
475 }
476
477 #[getter]
478 fn kind(&self) -> PythonMessageKind {
479 self.kind.clone()
480 }
481
482 #[getter]
483 fn message(&self) -> FrozenBuffer {
484 FrozenBuffer {
485 inner: self.message.to_bytes(),
486 }
487 }
488}
489
490#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
491pub(super) struct PythonActorHandle {
492 pub(super) inner: ActorHandle<PythonActor>,
493}
494
495#[pymethods]
496impl PythonActorHandle {
497 fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
499 self.inner
500 .send(instance.deref(), message.clone())
501 .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
502 Ok(())
503 }
504
505 fn bind(&self) -> PyActorId {
506 self.inner.bind::<PythonActor>().into_actor_id().into()
507 }
508}
509
510#[derive(Debug)]
512pub enum PythonActorDispatchMode {
513 Direct,
515 Queue {
517 sender: pympsc::Sender,
519 receiver: Option<pympsc::PyReceiver>,
521 },
522}
523
524#[derive(Debug)]
526#[hyperactor::export(
527 spawn = true,
528 handlers = [
529 PythonMessage { cast = true },
530 MeshFailure { cast = true },
531 ],
532)]
533pub struct PythonActor {
534 actor: Py<PyAny>,
536 task_locals: Option<pyo3_async_runtimes::TaskLocals>,
539 instance: Option<Py<crate::context::PyInstance>>,
542 dispatch_mode: PythonActorDispatchMode,
544 spawn_point: OnceLock<Option<Point>>,
546 init_message: Option<PythonMessage>,
548}
549
impl PythonActor {
    /// Builds a `PythonActor` by unpickling `actor_type` and calling it with no
    /// arguments.
    ///
    /// Task locals:
    /// - When `SHARED_ASYNCIO_RUNTIME` is disabled, the actor gets its own event
    ///   loop (see `create_task_locals`).
    /// - Otherwise `task_locals` is `None` and `get_task_locals` falls back to the
    ///   shared loop.
    ///
    /// When `ACTOR_QUEUE_DISPATCH` is enabled, a `pympsc` channel is created for
    /// queue-based dispatch. `init_message` and `spawn_point` are stored for
    /// `Actor::init`.
    ///
    /// # Errors
    /// Returns an error if any of these fail: unpickling, downcasting to a Python
    /// type, calling the class, or creating the dispatch channel.
    pub(crate) fn new(
        actor_type: PickledPyObject,
        init_message: Option<PythonMessage>,
        spawn_point: Option<Point>,
    ) -> Result<Self, anyhow::Error> {
        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);

        Ok(monarch_with_gil_blocking(
            |py| -> Result<Self, SerializablePyErr> {
                let unpickled = actor_type.unpickle(py)?;
                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;

                // A private event loop is created (with the GIL released) only
                // when the shared asyncio runtime is disabled.
                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
                    .then(|| Python::detach(py, create_task_locals));

                let dispatch_mode = if use_queue_dispatch {
                    let (sender, receiver) = pympsc::channel().map_err(|e| {
                        let py_err = PyRuntimeError::new_err(e.to_string());
                        SerializablePyErr::from(py, &py_err)
                    })?;
                    PythonActorDispatchMode::Queue {
                        sender,
                        receiver: Some(receiver),
                    }
                } else {
                    PythonActorDispatchMode::Direct
                };

                Ok(Self {
                    actor,
                    task_locals,
                    instance: None,
                    dispatch_mode,
                    spawn_point: OnceLock::from(spawn_point),
                    init_message,
                })
            },
        )?)
    }

    /// Returns this actor's private task locals, or the process-wide shared ones
    /// when it has none.
    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
        self.task_locals
            .as_ref()
            .unwrap_or_else(|| shared_task_locals(py))
    }

    /// Creates the root client actor for this process.
    ///
    /// The actor lives on a new direct-addressed proc named
    /// `mesh_root_client_proc`, and its instance is stored in a function-local
    /// static.
    ///
    /// # Panics
    /// Panics if the proc cannot be created, or if the static was already
    /// initialized (i.e. on a second call).
    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();

        let client_proc = Proc::direct(
            default_bind_spec().binding_addr(),
            "mesh_root_client_proc".into(),
        )
        .unwrap();

        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
    }

    /// Bootstraps a root client actor inside `client_proc` and stores its
    /// instance in `root_client_instance`.
    ///
    /// The Python object is `monarch._src.actor.actor_mesh._Actor`. Its `Init`
    /// message carries `RootClientActor._pickled_init_args()`, and its spawn point
    /// is rank 0 of `extent!()` (presumably a zero-dimensional extent — confirm).
    ///
    /// Instead of spawning through the proc, this creates a bare actor instance
    /// and drives its message loop (work items, signals, supervision events) on a
    /// tokio task spawned here. When the loop ends:
    /// - with an error: the error is logged and Python's `_thread.interrupt_main()`
    ///   is invoked; if that call fails, the process exits with status 1;
    /// - cleanly: the instance status becomes `Stopped`.
    ///
    /// # Panics
    /// Panics if any Python import, pickle, or extract step fails, if actor or
    /// instance creation fails, or if `root_client_instance` is already set.
    /// `actor.init` failing panics inside the spawned task.
    pub(crate) fn bootstrap_client_inner(
        py: Python<'_>,
        client_proc: Proc,
        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
        let actor_mesh_mod = py
            .import("monarch._src.actor.actor_mesh")
            .expect("import actor_mesh");
        let root_client_class = actor_mesh_mod
            .getattr("RootClientActor")
            .expect("get RootClientActor");

        let actor_type =
            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
                .expect("pickle _Actor");

        let init_frozen_buffer: FrozenBuffer = root_client_class
            .call_method0("_pickled_init_args")
            .expect("call RootClientActor._pickled_init_args")
            .extract()
            .expect("extract FrozenBuffer from _pickled_init_args");
        let init_message = PythonMessage::new_from_buf(
            PythonMessageKind::CallMethod {
                name: MethodSpecifier::Init {},
                response_port: None,
            },
            init_frozen_buffer,
        );

        let mut actor = PythonActor::new(
            actor_type,
            Some(init_message),
            Some(extent!().point_of_rank(0).unwrap()),
        )
        .expect("create client PythonActor");

        // The instance name comes from `RootClientActor.name` on the Python side.
        let ai = client_proc
            .actor_instance(
                root_client_class
                    .getattr("name")
                    .expect("get RootClientActor.name")
                    .extract()
                    .expect("extract RootClientActor.name"),
            )
            .expect("root instance create");

        let handle = ai.handle;
        let signal_rx = ai.signal;
        let supervision_rx = ai.supervision;
        let work_rx = ai.work;

        // Move the instance into the 'static cell so the spawned loop below and
        // the caller can both hold `&'static` references to it.
        root_client_instance
            .set(ai.instance)
            .map_err(|_| "already initialized root client instance")
            .unwrap();
        let instance = root_client_instance.get().unwrap();

        instance.set_system();

        // The returned ref is unused; the call is kept for its effect on `handle`.
        // NOTE(review): confirm what `bind` registers.
        let _client_ref = handle.bind::<PythonActor>();

        get_tokio_runtime().spawn(async move {
            actor.init(instance).await.unwrap();

            let mut signal_rx = signal_rx;
            let mut supervision_rx = supervision_rx;
            let mut work_rx = work_rx;
            // Set when a DrainAndStop signal ends the loop.
            let mut need_drain = false;
            // The loop yields `Some(error)` on failure and `None` on a clean stop.
            let mut err = 'messages: loop {
                tokio::select! {
                    work = work_rx.recv() => {
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut actor, instance).await {
                            // Errors caused by a Python `UnhandledFaultHookException`
                            // end the loop immediately, bypassing supervision.
                            let is_hook_exception = monarch_with_gil(|py| {
                                err.downcast_ref::<pyo3::PyErr>()
                                    .is_some_and(|pyerr| {
                                        pyerr.is_instance(
                                            py,
                                            &unhandled_fault_hook_exception(py),
                                        )
                                    })
                            }).await;

                            let kind = ActorErrorKind::processing(err);
                            let err = ActorError {
                                actor_id: Box::new(instance.self_id().clone()),
                                kind: Box::new(kind),
                            };

                            if is_hook_exception {
                                break Some(err);
                            }

                            // Every other handler error is routed through this
                            // actor's own supervision handling.
                            let supervision_event = actor_error_to_event(instance, &actor, err);
                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                // Supervision itself failed. First process any
                                // already-queued supervision events, stopping at
                                // the first failure, then end the loop.
                                for supervision_event in supervision_rx.drain() {
                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                        break 'messages Some(err);
                                    }
                                }
                                break Some(err);
                            }
                        }
                    }
                    signal = signal_rx.recv() => {
                        let signal = signal.map_err(ActorError::from);
                        tracing::info!(actor_id = %instance.self_id(), "client received signal {signal:?}");
                        match signal {
                            // Stop ends the loop. DrainAndStop also processes the
                            // already-queued work below.
                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
                                need_drain = matches!(signal, Signal::DrainAndStop(_));
                                break None;
                            },
                            Ok(Signal::ChildStopped(_)) => {},
                            Ok(Signal::Abort(reason)) => {
                                break Some(ActorError { actor_id: Box::new(instance.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
                            },
                            Err(err) => break Some(err),
                        }
                    }
                    // This branch only matches successful receives.
                    Ok(supervision_event) = supervision_rx.recv() => {
                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                            break Some(err);
                        }
                    }
                };
            };
            if need_drain {
                // Handle work that is already queued, stopping at the first
                // handler error, which becomes the loop's final error.
                let mut n = 0;
                while let Ok(work) = work_rx.try_recv() {
                    if let Err(e) = work.handle(&mut actor, instance).await {
                        err = Some(ActorError {
                            actor_id: Box::new(instance.self_id().clone()),
                            kind: Box::new(ActorErrorKind::processing(e)),
                        });
                        break;
                    }
                    n += 1;
                }
                tracing::debug!(actor_id = %instance.self_id(), "client drained {} messages before stopping", n);
            }
            if let Some(err) = err {
                // The root client has nowhere to escalate to. Surface the failure
                // by interrupting the Python main thread.
                let event = actor_error_to_event(instance, &actor, err);
                tracing::error!(
                    actor_id = %instance.self_id(),
                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
                    event,
                );

                monarch_with_gil_blocking(|py| {
                    let thread_mod = py.import("_thread").expect("import _thread");
                    let interrupt_main = thread_mod
                        .getattr("interrupt_main")
                        .expect("get interrupt_main");

                    // If the interrupt cannot be delivered, fall back to exiting.
                    if let Err(e) = interrupt_main.call0() {
                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
                        std::process::exit(1);
                    }
                });
            } else {
                tracing::info!(actor_id = %instance.self_id(), "client stopped");
                instance.change_status(hyperactor::actor::ActorStatus::Stopped("client stopped".into()));
            }
        });

        (root_client_instance.get().unwrap(), handle)
    }
}
821
822fn actor_error_to_event(
823 instance: &Instance<PythonActor>,
824 actor: &PythonActor,
825 err: ActorError,
826) -> ActorSupervisionEvent {
827 match *err.kind {
828 ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
829 _ => {
830 let status = ActorStatus::generic_failure(err.kind.to_string());
831 ActorSupervisionEvent::new(
832 instance.self_id().clone(),
833 actor.display_name(),
834 status,
835 None,
836 )
837 }
838 }
839}
840
841pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
842 static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
843
844 py.detach(|| {
849 ROOT_CLIENT_ACTOR.get_or_init(|| {
850 monarch_with_gil_blocking(|py| {
851 let (client, _handle) = PythonActor::bootstrap_client(py);
852 client
853 })
854 })
855 })
856}
857
858#[async_trait]
859impl Actor for PythonActor {
860 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
861 if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
862 let receiver = receiver.take().unwrap();
863
864 let error_port: hyperactor::PortHandle<PythonMessage> =
867 this.port::<Signal>().contramap(|msg: PythonMessage| {
868 monarch_with_gil_blocking(|py| {
869 let err = match msg.kind {
870 PythonMessageKind::Exception { .. } => {
871 let cloudpickle = py.import("cloudpickle").unwrap();
873 let err_obj = cloudpickle
874 .call_method1("loads", (msg.message.to_bytes().as_ref(),))
875 .unwrap();
876 let py_err = pyo3::PyErr::from_value(err_obj);
877 SerializablePyErr::from(py, &py_err)
878 }
879 _ => {
880 let py_err = PyRuntimeError::new_err(format!(
881 "expected Exception, got {:?}",
882 msg.kind
883 ));
884 SerializablePyErr::from(py, &py_err)
885 }
886 };
887 Signal::Abort(err.to_string())
888 })
889 });
890
891 let error_port_handle = PythonPortHandle::new(error_port);
892
893 monarch_with_gil(|py| {
894 let tl = self
895 .task_locals
896 .as_ref()
897 .unwrap_or_else(|| shared_task_locals(py));
898 let awaitable = self.actor.call_method(
899 py,
900 "_dispatch_loop",
901 (receiver, error_port_handle),
902 None,
903 )?;
904 let future =
905 pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
906 tokio::spawn(async move {
907 if let Err(e) = future.await {
908 tracing::error!("message loop error: {}", e);
909 }
910 });
911 Ok::<_, anyhow::Error>(())
912 })
913 .await?;
914 }
915
916 if let Some(init_message) = self.init_message.take() {
917 let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
918 let mut headers = Flattrs::new();
919 headers.set(CAST_POINT, spawn_point);
920 let cx = Context::new(this, headers);
921 <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
922 }
923
924 Ok(())
925 }
926
927 async fn cleanup(
928 &mut self,
929 this: &Instance<Self>,
930 err: Option<&ActorError>,
931 ) -> anyhow::Result<()> {
932 let cx = Context::new(this, Flattrs::new());
936 let err_as_str = err.map(|e| e.to_string());
940 let future = monarch_with_gil(|py| {
941 let py_cx = match &self.instance {
942 Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
943 None => {
944 let py_instance: crate::context::PyInstance = this.into();
945 crate::context::PyContext::new(
946 &cx,
947 py_instance
948 .into_py_any(py)?
949 .downcast_bound(py)
950 .map_err(PyErr::from)?
951 .clone()
952 .unbind(),
953 )
954 }
955 }
956 .into_bound_py_any(py)?;
957 let actor = self.actor.bind(py);
958 match actor.hasattr("__cleanup__") {
961 Ok(false) | Err(_) => {
962 return Ok(None);
964 }
965 _ => {}
966 }
967 let awaitable = actor
968 .call_method("__cleanup__", (&py_cx, err_as_str), None)
969 .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
970 if awaitable.is_none() {
971 Ok(None)
972 } else {
973 pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
974 .map(Some)
975 .map_err(anyhow::Error::from)
976 }
977 })
978 .await?;
979 if let Some(future) = future {
980 future.await.map_err(anyhow::Error::from)?;
981 }
982 Ok(())
983 }
984
985 fn display_name(&self) -> Option<String> {
986 self.instance.as_ref().and_then(|instance| {
987 monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
988 })
989 }
990
991 async fn handle_undeliverable_message(
992 &mut self,
993 ins: &Instance<Self>,
994 mut envelope: Undeliverable<MessageEnvelope>,
995 ) -> Result<(), anyhow::Error> {
996 if envelope.0.sender() != ins.self_id() {
997 envelope = update_undeliverable_envelope_for_casting(envelope);
999 }
1000 assert_eq!(
1001 envelope.0.sender(),
1002 ins.self_id(),
1003 "undeliverable message was returned to the wrong actor. \
1004 Return address = {}, src actor = {}, dest actor port = {}, message type = {}, envelope headers = {}",
1005 envelope.0.sender(),
1006 ins.self_id(),
1007 envelope.0.dest(),
1008 envelope.0.data().typename().unwrap_or("unknown"),
1009 envelope.0.headers()
1010 );
1011
1012 let cx = Context::new(ins, envelope.0.headers().clone());
1013
1014 let (envelope, handled) = monarch_with_gil(|py| {
1015 let py_cx = match &self.instance {
1016 Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
1017 None => {
1018 let py_instance: crate::context::PyInstance = ins.into();
1019 crate::context::PyContext::new(
1020 &cx,
1021 py_instance
1022 .into_py_any(py)?
1023 .downcast_bound(py)
1024 .map_err(PyErr::from)?
1025 .clone()
1026 .unbind(),
1027 )
1028 }
1029 }
1030 .into_bound_py_any(py)?;
1031 let py_envelope = PythonUndeliverableMessageEnvelope {
1032 inner: Some(envelope),
1033 }
1034 .into_bound_py_any(py)?;
1035 let handled = self
1036 .actor
1037 .call_method(
1038 py,
1039 "_handle_undeliverable_message",
1040 (&py_cx, &py_envelope),
1041 None,
1042 )
1043 .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
1044 .extract::<bool>(py)?;
1045 Ok::<_, anyhow::Error>((
1046 py_envelope
1047 .downcast::<PythonUndeliverableMessageEnvelope>()
1048 .map_err(PyErr::from)?
1049 .try_borrow_mut()
1050 .map_err(PyErr::from)?
1051 .take()?,
1052 handled,
1053 ))
1054 })
1055 .await?;
1056
1057 if !handled {
1058 hyperactor::actor::handle_undeliverable_message(ins, envelope)
1059 } else {
1060 Ok(())
1061 }
1062 }
1063
1064 async fn handle_supervision_event(
1065 &mut self,
1066 this: &Instance<Self>,
1067 event: &ActorSupervisionEvent,
1068 ) -> Result<bool, anyhow::Error> {
1069 let cx = Context::new(this, Flattrs::new());
1070 self.handle(
1071 &cx,
1072 MeshFailure {
1073 actor_mesh_name: None,
1074 event: event.clone(),
1075 crashed_ranks: vec![],
1076 },
1077 )
1078 .await
1079 .map(|_| true)
1080 }
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize, Named)]
1084pub struct PythonActorParams {
1085 actor_type: PickledPyObject,
1087 init_message: Option<PythonMessage>,
1089}
1090
1091impl PythonActorParams {
1092 pub(crate) fn new(actor_type: PickledPyObject, init_message: Option<PythonMessage>) -> Self {
1093 Self {
1094 actor_type,
1095 init_message,
1096 }
1097 }
1098}
1099
1100#[async_trait]
1101impl RemoteSpawn for PythonActor {
1102 type Params = PythonActorParams;
1103
1104 async fn new(
1105 PythonActorParams {
1106 actor_type,
1107 init_message,
1108 }: PythonActorParams,
1109 environment: Flattrs,
1110 ) -> Result<Self, anyhow::Error> {
1111 let spawn_point = environment.get(CAST_POINT);
1112 Self::new(actor_type, init_message, spawn_point)
1113 }
1114}
1115
1116fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
1118 monarch_with_gil_blocking(|py| {
1119 let asyncio = Python::import(py, "asyncio").unwrap();
1120 let event_loop = asyncio.call_method0("new_event_loop").unwrap();
1121 let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
1122 .copy_context(py)
1123 .unwrap();
1124
1125 let kwargs = PyDict::new(py);
1126 let target = event_loop.getattr("run_forever").unwrap();
1127 kwargs.set_item("target", target).unwrap();
1128 kwargs.set_item("daemon", true).unwrap();
1130 let thread = py
1131 .import("threading")
1132 .unwrap()
1133 .call_method("Thread", (), Some(&kwargs))
1134 .unwrap();
1135 thread.call_method0("start").unwrap();
1136 task_locals
1137 })
1138}
1139
1140fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
1142 static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
1143 Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
1144}
1145
1146#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1182struct PanicFlag {
1183 sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
1184}
1185
1186#[pymethods]
1187impl PanicFlag {
1188 fn signal_panic(&mut self, ex: Py<PyAny>) {
1189 self.sender.take().unwrap().send(ex).unwrap();
1190 }
1191}
1192
1193#[async_trait]
1194impl Handler<PythonMessage> for PythonActor {
1195 #[tracing::instrument(level = "debug", skip_all)]
1196 async fn handle(
1197 &mut self,
1198 cx: &Context<PythonActor>,
1199 message: PythonMessage,
1200 ) -> anyhow::Result<()> {
1201 match &self.dispatch_mode {
1202 PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1203 PythonActorDispatchMode::Queue { sender, .. } => {
1204 let sender = sender.clone();
1205 self.handle_queue(cx, sender, message).await
1206 }
1207 }
1208 }
1209}
1210
1211impl PythonActor {
1212 async fn handle_direct(
1214 &mut self,
1215 cx: &Context<'_, PythonActor>,
1216 message: PythonMessage,
1217 ) -> anyhow::Result<()> {
1218 let resolved = message.resolve_indirect_call(cx).await?;
1219 let endpoint = resolved.method.to_string();
1220
1221 let (sender, receiver) = oneshot::channel();
1224
1225 let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1226 let inst = self.instance.get_or_insert_with(|| {
1227 let inst: crate::context::PyInstance = cx.into();
1228 inst.into_pyobject(py).unwrap().into()
1229 });
1230
1231 let awaitable = self.actor.call_method(
1232 py,
1233 "handle",
1234 (
1235 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1236 resolved.method,
1237 resolved.bytes,
1238 PanicFlag {
1239 sender: Some(sender),
1240 },
1241 resolved
1242 .local_state
1243 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1244 resolved.response_port.into_py_any(py)?,
1245 ),
1246 None,
1247 )?;
1248
1249 let tl = self
1250 .task_locals
1251 .as_ref()
1252 .unwrap_or_else(|| shared_task_locals(py));
1253
1254 pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1255 .map_err(|err| err.into())
1256 })
1257 .await?;
1258
1259 tokio::spawn(handle_async_endpoint_panic(
1261 cx.port(),
1262 PythonTask::new(future)?,
1263 receiver,
1264 cx.self_id().to_string(),
1265 endpoint,
1266 ));
1267 Ok(())
1268 }
1269
1270 async fn handle_queue(
1273 &mut self,
1274 cx: &Context<'_, PythonActor>,
1275 sender: pympsc::Sender,
1276 message: PythonMessage,
1277 ) -> anyhow::Result<()> {
1278 let resolved = message.resolve_indirect_call(cx).await?;
1279
1280 let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1281 let inst = self.instance.get_or_insert_with(|| {
1282 let inst: crate::context::PyInstance = cx.into();
1283 inst.into_pyobject(py).unwrap().into()
1284 });
1285
1286 let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1287 let py_context_obj = Py::new(py, py_context)?;
1288
1289 Ok(QueuedMessage {
1290 context: py_context_obj,
1291 method: resolved.method,
1292 bytes: resolved.bytes,
1293 local_state: resolved
1294 .local_state
1295 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1296 response_port: resolved.response_port.into_py_any(py)?,
1297 })
1298 })
1299 .await?;
1300
1301 sender
1302 .send(queued_msg)
1303 .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1304
1305 Ok(())
1306 }
1307}
1308
1309#[async_trait]
1310impl Handler<MeshFailure> for PythonActor {
1311 async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1312 if !message.event.actor_status.is_failed() {
1316 tracing::info!(
1317 "ignoring non-failure supervision event from child: {}",
1318 message
1319 );
1320 return Ok(());
1321 }
1322 monarch_with_gil(|py| {
1326 let inst = self.instance.get_or_insert_with(|| {
1327 let inst: crate::context::PyInstance = cx.into();
1328 inst.into_pyobject(py).unwrap().into()
1329 });
1330 let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1332 let actor_bound = self.actor.bind(py);
1333 if !actor_bound.hasattr("__supervise__")? {
1336 return Err(anyhow::anyhow!(
1337 "no __supervise__ method on {:?}",
1338 actor_bound
1339 ));
1340 }
1341 let result = actor_bound.call_method(
1342 "__supervise__",
1343 (
1344 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1345 PyMeshFailure::from(message.clone()),
1346 ),
1347 None,
1348 );
1349 match result {
1350 Ok(s) => {
1351 if s.is_truthy()? {
1352 tracing::info!(
1357 name = "ActorMeshStatus",
1358 status = "SupervisionError::Handled",
1359 actor_name = message.actor_mesh_name,
1361 event = %message.event,
1362 "__supervise__ on {} handled a supervision event, not reporting any further",
1363 cx.self_id(),
1364 );
1365 Ok(())
1366 } else {
1367 for (actor_name, status) in [
1376 (
1377 message
1378 .actor_mesh_name
1379 .as_deref()
1380 .unwrap_or_else(|| message.event.actor_id.name()),
1381 "SupervisionError::Unhandled",
1382 ),
1383 (cx.self_id().name(), "UnhandledSupervisionEvent"),
1384 ] {
1385 tracing::info!(
1386 name = "ActorMeshStatus",
1387 status,
1388 actor_name,
1389 event = %message.event,
1390 "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1391 cx.self_id(),
1392 );
1393 }
1394 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1395 ActorSupervisionEvent::new(
1396 cx.self_id().clone(),
1397 display_name.clone(),
1398 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1399 Box::new(message.event.clone()),
1400 )),
1401 None,
1402 ),
1403 ));
1404 Err(anyhow::Error::new(err))
1405 }
1406 }
1407 Err(err) => {
1408 if err.is_instance(
1413 py,
1414 &unhandled_fault_hook_exception(py),
1415 ) {
1416 return Err(err.into());
1417 }
1418
1419 for (actor_name, status) in [
1425 (
1426 message
1427 .actor_mesh_name
1428 .as_deref()
1429 .unwrap_or_else(|| message.event.actor_id.name()),
1430 "SupervisionError::__supervise__::exception",
1431 ),
1432 (cx.self_id().name(), "UnhandledSupervisionEvent"),
1433 ] {
1434 tracing::info!(
1435 name = "ActorMeshStatus",
1436 status,
1437 actor_name,
1438 event = %message.event,
1439 "__supervise__ on {} threw an exception",
1440 cx.self_id(),
1441 );
1442 }
1443 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1444 ActorSupervisionEvent::new(
1445 cx.self_id().clone(),
1446 display_name,
1447 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1448 err.to_string(),
1449 Box::new(message.event.clone()),
1450 )),
1451 None,
1452 ),
1453 ));
1454 Err(anyhow::Error::new(err))
1455 }
1456 }
1457 })
1458 .await
1459 }
1460}
1461
1462async fn handle_async_endpoint_panic(
1463 panic_sender: PortHandle<Signal>,
1464 task: PythonTask,
1465 side_channel: oneshot::Receiver<Py<PyAny>>,
1466 actor_id: String,
1467 endpoint: String,
1468) {
1469 let attributes =
1471 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);
1472
1473 let start_time = std::time::Instant::now();
1475
1476 ENDPOINT_ACTOR_COUNT.add(1, attributes);
1478
1479 let err_or_never = async {
1480 match side_channel.await {
1483 Ok(value) => {
1484 monarch_with_gil(|py| -> Option<SerializablePyErr> {
1485 let err: PyErr = value
1486 .downcast_bound::<PyBaseException>(py)
1487 .unwrap()
1488 .clone()
1489 .into();
1490 ENDPOINT_ACTOR_PANIC.add(1, attributes);
1491 Some(err.into())
1492 })
1493 .await
1494 }
1495 Err(_) => pending().await,
1500 }
1501 };
1502 let future = task.take();
1503 if let Some(panic) = tokio::select! {
1504 result = future => {
1505 match result {
1506 Ok(_) => None,
1507 Err(e) => Some(e.into()),
1508 }
1509 },
1510 result = err_or_never => {
1511 result
1512 }
1513 } {
1514 ENDPOINT_ACTOR_ERROR.add(1, attributes);
1516 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1517 let client = &CLIENT
1518 .get_or_init(|| {
1519 get_proc_runtime()
1520 .instance("async_endpoint_handler")
1521 .unwrap()
1522 })
1523 .0;
1524 panic_sender
1525 .send(&client, Signal::Abort(panic.to_string()))
1526 .expect("Unable to send panic message");
1527 }
1528
1529 let elapsed_micros = start_time.elapsed().as_micros() as f64;
1531 ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
1532}
1533
1534#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1535struct LocalPort {
1536 instance: PyInstance,
1537 inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
1538}
1539
1540impl Debug for LocalPort {
1541 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1542 f.debug_struct("LocalPort")
1543 .field("inner", &self.inner)
1544 .finish()
1545 }
1546}
1547
1548pub(crate) fn to_py_error<T>(e: T) -> PyErr
1549where
1550 T: Error,
1551{
1552 PyErr::new::<PyValueError, _>(e.to_string())
1553}
1554
1555#[pymethods]
1556impl LocalPort {
1557 fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1558 let port = self.inner.take().expect("use local port once");
1559 port.send(self.instance.deref(), Ok(obj))
1560 .map_err(to_py_error)
1561 }
1562 fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1563 let port = self.inner.take().expect("use local port once");
1564 port.send(self.instance.deref(), Err(e))
1565 .map_err(to_py_error)
1566 }
1567}
1568
1569#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1573#[derive(Debug)]
1574pub struct DroppingPort;
1575
1576#[pymethods]
1577impl DroppingPort {
1578 #[new]
1579 fn new() -> Self {
1580 DroppingPort
1581 }
1582
1583 fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1584 Ok(())
1585 }
1586
1587 fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1588 let exc = if let Ok(inner) = e.getattr("exception") {
1590 inner
1591 } else {
1592 e
1593 };
1594 Err(PyErr::from_value(exc))
1595 }
1596
1597 #[getter]
1598 fn get_return_undeliverable(&self) -> bool {
1599 true
1600 }
1601
1602 #[setter]
1603 fn set_return_undeliverable(&self, _value: bool) {}
1604}
1605
1606#[pyclass(module = "monarch._src.actor.actor_mesh")]
1609pub struct Port {
1610 port_ref: EitherPortRef,
1611 instance: Instance<PythonActor>,
1612 rank: Option<usize>,
1613}
1614
1615#[pymethods]
1616impl Port {
1617 #[new]
1618 fn new(
1619 port_ref: EitherPortRef,
1620 instance: &crate::context::PyInstance,
1621 rank: Option<usize>,
1622 ) -> Self {
1623 Self {
1624 port_ref,
1625 instance: instance.clone().into_instance(),
1626 rank,
1627 }
1628 }
1629
1630 #[getter("_port_ref")]
1631 fn port_ref_py(&self) -> EitherPortRef {
1632 self.port_ref.clone()
1633 }
1634
1635 #[getter("_rank")]
1636 fn rank_py(&self) -> Option<usize> {
1637 self.rank
1638 }
1639
1640 #[getter]
1641 fn get_return_undeliverable(&self) -> bool {
1642 self.port_ref.get_return_undeliverable()
1643 }
1644
1645 #[setter]
1646 fn set_return_undeliverable(&mut self, value: bool) {
1647 self.port_ref.set_return_undeliverable(value);
1648 }
1649
1650 fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1651 let message = PythonMessage::new_from_buf(
1652 PythonMessageKind::Result { rank: self.rank },
1653 pickle_to_part(py, &obj)?,
1654 );
1655
1656 self.port_ref
1657 .send(&self.instance, message)
1658 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1659 }
1660
1661 fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1662 let message = PythonMessage::new_from_buf(
1663 PythonMessageKind::Exception { rank: self.rank },
1664 pickle_to_part(py, &e)?,
1665 );
1666
1667 self.port_ref
1668 .send(&self.instance, message)
1669 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1670 }
1671}
1672
1673pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
1674 hyperactor_mod.add_class::<PythonActorHandle>()?;
1675 hyperactor_mod.add_class::<PythonMessage>()?;
1676 hyperactor_mod.add_class::<PythonMessageKind>()?;
1677 hyperactor_mod.add_class::<MethodSpecifier>()?;
1678 hyperactor_mod.add_class::<UnflattenArg>()?;
1679 hyperactor_mod.add_class::<PanicFlag>()?;
1680 hyperactor_mod.add_class::<QueuedMessage>()?;
1681 hyperactor_mod.add_class::<DroppingPort>()?;
1682 hyperactor_mod.add_class::<Port>()?;
1683 Ok(())
1684}
1685
#[cfg(test)]
mod tests {
    use hyperactor::accum::ReducerSpec;
    use hyperactor::accum::StreamingReducerOpts;
    use hyperactor::message::ErasedUnbound;
    use hyperactor::message::Unbound;
    use hyperactor::reference;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_mesh::Error as MeshError;
    use hyperactor_mesh::Name;
    use hyperactor_mesh::host_mesh::host_agent::ProcState;
    use hyperactor_mesh::resource::Status;
    use hyperactor_mesh::resource::{self};
    use pyo3::PyTypeInfo;

    use super::*;
    use crate::actor::to_py_error;

    /// Erases `message` and returns every `UnboundPort` found inside it, in visit order.
    fn unbound_ports_of(message: &PythonMessage) -> Vec<reference::UnboundPort> {
        let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
        let mut ports = Vec::new();
        erased
            .visit_mut::<reference::UnboundPort>(|port| {
                ports.push(port.clone());
                Ok(())
            })
            .unwrap();
        ports
    }

    /// Asserts that unbinding then re-binding `message` reproduces it exactly.
    fn assert_rebinds_to_self(message: &PythonMessage) {
        let unbound = Unbound::try_from_message(message.clone()).unwrap();
        assert_eq!(*message, unbound.bind().unwrap());
    }

    #[test]
    fn test_python_message_bind_unbind() {
        let reducer_spec = ReducerSpec {
            typehash: 123,
            builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
        };
        let port_ref = reference::PortRef::<PythonMessage>::attest_reducible(
            test_port_id("world_0", "client", 123),
            Some(reducer_spec),
            StreamingReducerOpts::default(),
        );
        let with_port = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: Some(EitherPortRef::Unbounded(port_ref.clone().into())),
            },
            message: Part::from(vec![1, 2, 3]),
        };

        // A message carrying a response port exposes exactly that port as a binding.
        assert_eq!(
            unbound_ports_of(&with_port),
            vec![reference::UnboundPort::from(&port_ref)]
        );
        assert_rebinds_to_self(&with_port);

        // Without a response port there is nothing to bind.
        let without_port = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: None,
            },
            ..with_port
        };
        assert!(unbound_ports_of(&without_port).is_empty());
        assert_rebinds_to_self(&without_port);
    }

    #[test]
    fn to_py_error_preserves_proc_creation_message() {
        let failed_state: resource::State<ProcState> = resource::State {
            name: Name::new("my_proc").unwrap(),
            status: Status::Failed("boom".into()),
            state: None,
            generation: 0,
            timestamp: std::time::SystemTime::now(),
        };

        let mesh_agent: reference::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
            reference::ActorRef::attest(test_port_id("hello_0", "actor", 0).actor_id().clone());
        let expected_prefix = format!(
            "error creating proc (host rank 0) on host mesh agent {}",
            mesh_agent
        );
        let mesh_error = MeshError::ProcCreationError {
            host_rank: 0,
            mesh_agent,
            state: Box::new(failed_state),
        };

        let rust_msg = mesh_error.to_string();
        let pyerr = to_py_error(mesh_error);

        pyo3::Python::initialize();
        monarch_with_gil_blocking(|py| {
            // The Python error must be a ValueError carrying the Rust message verbatim.
            assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
            let py_msg = pyerr.value(py).to_string();

            assert_eq!(py_msg, rust_msg);
            assert!(py_msg.contains(", state: "));
            assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
            assert!(py_msg.starts_with(&expected_prefix));
        });
    }
}