1use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::OncePortHandle;
22use hyperactor::PortHandle;
23use hyperactor::Proc;
24use hyperactor::RemoteSpawn;
25use hyperactor::actor::ActorError;
26use hyperactor::actor::ActorErrorKind;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Signal;
29use hyperactor::context::Actor as ContextActor;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::Undeliverable;
32use hyperactor::message::Bind;
33use hyperactor::message::Bindings;
34use hyperactor::message::Unbind;
35use hyperactor::supervision::ActorSupervisionEvent;
36use hyperactor_config::Flattrs;
37use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
38use hyperactor_mesh::comm::multicast::CAST_POINT;
39use hyperactor_mesh::comm::multicast::CastInfo;
40use hyperactor_mesh::supervision::MeshFailure;
41use hyperactor_mesh::transport::default_bind_spec;
42use monarch_types::PickledPyObject;
43use monarch_types::SerializablePyErr;
44use ndslice::Point;
45use ndslice::extent;
46use pyo3::IntoPyObjectExt;
47use pyo3::exceptions::PyBaseException;
48use pyo3::exceptions::PyRuntimeError;
49use pyo3::exceptions::PyValueError;
50use pyo3::prelude::*;
51use pyo3::types::PyDict;
52use pyo3::types::PyList;
53use pyo3::types::PyType;
54use serde::Deserialize;
55use serde::Serialize;
56use serde_multipart::Part;
57use tokio::sync::oneshot;
58use typeuri::Named;
59
60use crate::buffers::FrozenBuffer;
61use crate::config::ACTOR_QUEUE_DISPATCH;
62use crate::config::SHARED_ASYNCIO_RUNTIME;
63use crate::context::PyInstance;
64use crate::local_state_broker::BrokerId;
65use crate::local_state_broker::LocalStateBrokerMessage;
66use crate::mailbox::EitherPortRef;
67use crate::mailbox::PyMailbox;
68use crate::mailbox::PythonPortHandle;
69use crate::mailbox::PythonUndeliverableMessageEnvelope;
70use crate::metrics::ENDPOINT_ACTOR_COUNT;
71use crate::metrics::ENDPOINT_ACTOR_ERROR;
72use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
73use crate::metrics::ENDPOINT_ACTOR_PANIC;
74use crate::pickle::pickle_to_part;
75use crate::proc::PyActorId;
76use crate::pympsc;
77use crate::pytokio::PythonTask;
78use crate::runtime::get_proc_runtime;
79use crate::runtime::get_tokio_runtime;
80use crate::runtime::monarch_with_gil;
81use crate::runtime::monarch_with_gil_blocking;
82use crate::supervision::PyMeshFailure;
83
/// Tag describing how one slot of an indirect call's local-state list is
/// filled in when the call is resolved.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum UnflattenArg {
    /// The slot receives the handling actor's mailbox.
    Mailbox,
    /// The slot receives the next object from the state returned by the
    /// local state broker.
    PyObject,
}
90
/// Identifies which Python method a call message targets.
///
/// NOTE(review): the variant names suggest that `ReturnsResponse` and
/// `ExplicitPort` differ in how the reply is delivered; that distinction is
/// not visible in this part of the file — confirm against the Python side.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum MethodSpecifier {
    /// A method referred to by `name`.
    ReturnsResponse { name: String },
    /// A method referred to by `name`.
    ExplicitPort { name: String },
    /// The constructor; its name is reported as `"__init__"`.
    Init {},
}
101
102impl std::fmt::Display for MethodSpecifier {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 write!(f, "{}", self.name())
105 }
106}
107
108#[pymethods]
109impl MethodSpecifier {
110 #[getter(name)]
111 fn py_name(&self) -> &str {
112 self.name()
113 }
114}
115
116impl MethodSpecifier {
117 pub(crate) fn name(&self) -> &str {
118 match self {
119 MethodSpecifier::ReturnsResponse { name } => name,
120 MethodSpecifier::ExplicitPort { name } => name,
121 MethodSpecifier::Init {} => "__init__",
122 }
123 }
124}
125
/// The kind of a [`PythonMessage`]: either a method call or a response.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
pub enum PythonMessageKind {
    /// Call the method `name`. When `response_port` is `None`, the call is
    /// resolved with a dropping response port.
    CallMethod {
        name: MethodSpecifier,
        response_port: Option<EitherPortRef>,
    },
    /// A response; `rank` is filled in by `PythonMessage::into_rank`.
    Result {
        rank: Option<usize>,
    },
    /// An exception response; `rank` is filled in by
    /// `PythonMessage::into_rank`.
    Exception {
        rank: Option<usize>,
    },
    /// Placeholder kind; this is the `Default` value.
    Uninit {},
    /// Call the method `name`, fetching local state and the response port
    /// from a local state broker.
    CallMethodIndirect {
        name: MethodSpecifier,
        // Identifies the broker to resolve (passed to `BrokerId::new`).
        local_state_broker: (String, usize),
        // Key sent to the broker in `LocalStateBrokerMessage::Get`.
        id: usize,
        // One entry per slot of the local-state list handed to Python.
        unflatten_args: Vec<UnflattenArg>,
    },
}
wirevalue::register_type!(PythonMessageKind);
150
151impl Default for PythonMessageKind {
152 fn default() -> Self {
153 PythonMessageKind::Uninit {}
154 }
155}
156
157fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
158 let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
159 mailbox.into_bound_py_any(py).unwrap()
160}
161
/// A message exchanged with a Python actor: a [`PythonMessageKind`] plus an
/// opaque payload.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
pub struct PythonMessage {
    /// What this message is (call, result, exception, ...).
    pub kind: PythonMessageKind,
    /// The payload bytes; handed to Python as a `FrozenBuffer`.
    pub message: Part,
}

wirevalue::register_type!(PythonMessage);
170
/// A call message after resolution: everything needed to invoke the Python
/// handler.
struct ResolvedCallMethod {
    // The method to invoke.
    method: MethodSpecifier,
    // The message payload.
    bytes: FrozenBuffer,
    // Local-state list for indirect calls; `None` for plain `CallMethod`.
    local_state: Option<Py<PyAny>>,
    // Where the handler's response goes.
    response_port: ResponsePort,
}
179
/// The response port handed to a Python handler.
enum ResponsePort {
    /// Used when the call carried no response port.
    Dropping,
    /// Built from the port reference carried by a `CallMethod` message.
    Port(Port),
    /// Built from the response port supplied by the local state broker.
    Local(LocalPort),
}
185
186impl ResponsePort {
187 fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
188 match self {
189 ResponsePort::Dropping => DroppingPort.into_py_any(py),
190 ResponsePort::Port(port) => port.into_py_any(py),
191 ResponsePort::Local(port) => port.into_py_any(py),
192 }
193 }
194}
195
/// A resolved call packaged for delivery to Python through the dispatch
/// queue. All fields are readable from Python.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub struct QueuedMessage {
    /// The Python context the call is handled in.
    #[pyo3(get)]
    pub context: Py<crate::context::PyContext>,
    /// The method to invoke.
    #[pyo3(get)]
    pub method: MethodSpecifier,
    /// The message payload.
    #[pyo3(get)]
    pub bytes: FrozenBuffer,
    /// The local-state list (an empty list when the call had none).
    #[pyo3(get)]
    pub local_state: Py<PyAny>,
    /// The Python response port object.
    #[pyo3(get)]
    pub response_port: Py<PyAny>,
}
211
212impl PythonMessage {
213 pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
214 Self {
215 kind,
216 message: message.into(),
217 }
218 }
219
220 pub fn into_rank(self, rank: usize) -> Self {
221 let rank = Some(rank);
222 match self.kind {
223 PythonMessageKind::Result { .. } => PythonMessage {
224 kind: PythonMessageKind::Result { rank },
225 message: self.message,
226 },
227 PythonMessageKind::Exception { .. } => PythonMessage {
228 kind: PythonMessageKind::Exception { rank },
229 message: self.message,
230 },
231 _ => panic!("PythonMessage is not a response but {:?}", self),
232 }
233 }
234 async fn resolve_indirect_call(
235 self,
236 cx: &Context<'_, PythonActor>,
237 ) -> anyhow::Result<ResolvedCallMethod> {
238 match self.kind {
239 PythonMessageKind::CallMethodIndirect {
240 name,
241 local_state_broker,
242 id,
243 unflatten_args,
244 } => {
245 let broker = BrokerId::new(local_state_broker).resolve(cx).await;
246 let (send, recv) = cx.open_once_port();
247 broker.send(cx, LocalStateBrokerMessage::Get(id, send))?;
248 let state = recv.recv().await?;
249 let mut state_it = state.state.into_iter();
250 monarch_with_gil(|py| {
251 let mailbox = mailbox(py, cx);
252 let local_state = Some(
253 PyList::new(
254 py,
255 unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
256 match x {
257 UnflattenArg::Mailbox => mailbox.clone(),
258 UnflattenArg::PyObject => {
259 state_it.next().unwrap().into_bound(py)
260 }
261 }
262 }),
263 )
264 .unwrap()
265 .into(),
266 );
267 let response_port = ResponsePort::Local(LocalPort {
268 instance: cx.into(),
269 inner: Some(state.response_port),
270 });
271 Ok(ResolvedCallMethod {
272 method: name,
273 bytes: FrozenBuffer {
274 inner: self.message.into_bytes(),
275 },
276 local_state,
277 response_port,
278 })
279 })
280 .await
281 }
282 PythonMessageKind::CallMethod {
283 name,
284 response_port,
285 } => {
286 let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
287 let point = cx.cast_point();
288 ResponsePort::Port(Port {
289 port_ref,
290 instance: cx.instance().clone_for_py(),
291 rank: Some(point.rank()),
292 })
293 });
294 Ok(ResolvedCallMethod {
295 method: name,
296 bytes: FrozenBuffer {
297 inner: self.message.into_bytes(),
298 },
299 local_state: None,
300 response_port,
301 })
302 }
303 _ => {
304 panic!("unexpected message kind {:?}", self.kind)
305 }
306 }
307 }
308}
309
310impl std::fmt::Debug for PythonMessage {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 f.debug_struct("PythonMessage")
313 .field("kind", &self.kind)
314 .field(
315 "message",
316 &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
317 )
318 .finish()
319 }
320}
321
322impl Unbind for PythonMessage {
323 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
324 match &self.kind {
325 PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
326 _ => Ok(()),
327 }
328 }
329}
330
331impl Bind for PythonMessage {
332 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
333 match &mut self.kind {
334 PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
335 _ => Ok(()),
336 }
337 }
338}
339
340#[pymethods]
341impl PythonMessage {
342 #[new]
343 #[pyo3(signature = (kind, message))]
344 pub fn new<'py>(kind: PythonMessageKind, message: PyRef<'py, FrozenBuffer>) -> PyResult<Self> {
345 Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
346 }
347
348 #[getter]
349 fn kind(&self) -> PythonMessageKind {
350 self.kind.clone()
351 }
352
353 #[getter]
354 fn message(&self) -> FrozenBuffer {
355 FrozenBuffer {
356 inner: self.message.to_bytes(),
357 }
358 }
359}
360
/// Python-visible wrapper around an `ActorHandle<PythonActor>`.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub(super) struct PythonActorHandle {
    // The wrapped handle.
    pub(super) inner: ActorHandle<PythonActor>,
}
365
366#[pymethods]
367impl PythonActorHandle {
368 fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
370 self.inner
371 .send(instance.deref(), message.clone())
372 .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
373 Ok(())
374 }
375
376 fn bind(&self) -> PyActorId {
377 self.inner.bind::<PythonActor>().into_actor_id().into()
378 }
379}
380
/// How incoming `PythonMessage`s are delivered to the Python actor.
#[derive(Debug)]
pub enum PythonActorDispatchMode {
    /// Each message is handled by calling the actor's `handle` method.
    Direct,
    /// Each message is packaged as a `QueuedMessage` and pushed onto a
    /// channel consumed by the actor's `_dispatch_loop`.
    Queue {
        // Sending half; cloned for every handled message.
        sender: pympsc::Sender,
        // Receiving half; taken (leaving `None`) when `init` hands it to
        // `_dispatch_loop`.
        receiver: Option<pympsc::PyReceiver>,
    },
}
394
/// An actor whose behavior is implemented by a Python object.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [
        PythonMessage { cast = true },
        MeshFailure { cast = true },
    ],
)]
pub struct PythonActor {
    // The Python actor object, created by calling the unpickled class.
    actor: Py<PyAny>,
    // Per-actor asyncio task locals; `None` means the process-wide shared
    // task locals are used instead.
    task_locals: Option<pyo3_async_runtimes::TaskLocals>,
    // Python `PyInstance` for this actor, created lazily on first use and
    // then reused.
    instance: Option<Py<crate::context::PyInstance>>,
    // Whether messages are handled directly or through a queue.
    dispatch_mode: PythonActorDispatchMode,
    // The point supplied at construction; used as the `CAST_POINT` header
    // when the init message is handled.
    spawn_point: OnceLock<Option<Point>>,
    // Optional message handled once at the end of `init`.
    init_message: Option<PythonMessage>,
}
420
impl PythonActor {
    /// Creates a `PythonActor` by unpickling `actor_type` and calling the
    /// resulting class with no arguments.
    ///
    /// The dispatch mode is chosen from the `ACTOR_QUEUE_DISPATCH` config
    /// value, and dedicated task locals are created unless
    /// `SHARED_ASYNCIO_RUNTIME` is set.
    ///
    /// # Errors
    ///
    /// Returns an error if unpickling, the downcast to a Python type, the
    /// constructor call, or channel creation fails.
    pub(crate) fn new(
        actor_type: PickledPyObject,
        init_message: Option<PythonMessage>,
        spawn_point: Option<Point>,
    ) -> Result<Self, anyhow::Error> {
        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);

        Ok(monarch_with_gil_blocking(
            |py| -> Result<Self, SerializablePyErr> {
                let unpickled = actor_type.unpickle(py)?;
                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;

                // `create_task_locals` acquires the GIL itself, so it is run
                // detached from the current GIL scope.
                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
                    .then(|| Python::detach(py, create_task_locals));

                let dispatch_mode = if use_queue_dispatch {
                    let (sender, receiver) = pympsc::channel().map_err(|e| {
                        let py_err = PyRuntimeError::new_err(e.to_string());
                        SerializablePyErr::from(py, &py_err)
                    })?;
                    PythonActorDispatchMode::Queue {
                        sender,
                        receiver: Some(receiver),
                    }
                } else {
                    PythonActorDispatchMode::Direct
                };

                Ok(Self {
                    actor,
                    task_locals,
                    instance: None,
                    dispatch_mode,
                    spawn_point: OnceLock::from(spawn_point),
                    init_message,
                })
            },
        )?)
    }

    /// Returns this actor's own task locals, falling back to the shared
    /// process-wide task locals when it has none.
    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
        self.task_locals
            .as_ref()
            .unwrap_or_else(|| shared_task_locals(py))
    }

    /// Bootstraps the root client actor on a fresh direct proc named
    /// `"mesh_root_client_proc"`, storing its instance in a function-local
    /// static.
    ///
    /// # Panics
    ///
    /// Panics if the proc cannot be created, or for any reason
    /// `bootstrap_client_inner` panics (including being called a second
    /// time, since the static instance is then already set).
    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();

        let client_proc = Proc::direct(
            default_bind_spec().binding_addr(),
            "mesh_root_client_proc".into(),
        )
        .unwrap();

        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
    }

    /// Creates the root client actor on `client_proc`, stores its instance in
    /// `root_client_instance`, and spawns a tokio task that runs the actor's
    /// message loop.
    ///
    /// The actor is built from Python's `_Actor` class with an init message
    /// taken from `RootClientActor._pickled_init_args()`.
    ///
    /// # Panics
    ///
    /// Panics if any Python lookup or extraction fails, if the actor or its
    /// instance cannot be created, or if `root_client_instance` was already
    /// set.
    pub(crate) fn bootstrap_client_inner(
        py: Python<'_>,
        client_proc: Proc,
        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
        let actor_mesh_mod = py
            .import("monarch._src.actor.actor_mesh")
            .expect("import actor_mesh");
        let root_client_class = actor_mesh_mod
            .getattr("RootClientActor")
            .expect("get RootClientActor");

        let actor_type =
            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
                .expect("pickle _Actor");

        let init_frozen_buffer: FrozenBuffer = root_client_class
            .call_method0("_pickled_init_args")
            .expect("call RootClientActor._pickled_init_args")
            .extract()
            .expect("extract FrozenBuffer from _pickled_init_args");
        let init_message = PythonMessage::new_from_buf(
            PythonMessageKind::CallMethod {
                name: MethodSpecifier::Init {},
                response_port: None,
            },
            init_frozen_buffer,
        );

        // The root client is spawned at rank 0 of an empty extent.
        let mut actor = PythonActor::new(
            actor_type,
            Some(init_message),
            Some(extent!().point_of_rank(0).unwrap()),
        )
        .expect("create client PythonActor");

        let ai = client_proc
            .actor_instance(
                root_client_class
                    .getattr("name")
                    .expect("get RootClientActor.name")
                    .extract()
                    .expect("extract RootClientActor.name"),
            )
            .expect("root instance create");

        let handle = ai.handle;
        let signal_rx = ai.signal;
        let supervision_rx = ai.supervision;
        let work_rx = ai.work;

        root_client_instance
            .set(ai.instance)
            .map_err(|_| "already initialized root client instance")
            .unwrap();
        let instance = root_client_instance.get().unwrap();

        instance.set_system();

        let _client_ref = handle.bind::<PythonActor>();

        // Drive the actor by hand: init it, then loop over work items,
        // signals and supervision events until told to stop or an error
        // escapes.
        get_tokio_runtime().spawn(async move {
            actor.init(instance).await.unwrap();

            let mut signal_rx = signal_rx;
            let mut supervision_rx = supervision_rx;
            let mut work_rx = work_rx;
            let mut need_drain = false;
            // The loop's value is the error (if any) that ended it.
            let mut err = 'messages: loop {
                tokio::select! {
                    work = work_rx.recv() => {
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut actor, instance).await {
                            // A failed handler is turned into a supervision
                            // event and offered to the actor itself first.
                            let kind = ActorErrorKind::processing(err);
                            let err = ActorError {
                                actor_id: Box::new(instance.self_id().clone()),
                                kind: Box::new(kind),
                            };
                            let supervision_event = actor_error_to_event(instance, &actor, err);
                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                // Still unhandled: process pending supervision
                                // events before leaving; an error from one of
                                // those takes precedence.
                                for supervision_event in supervision_rx.drain() {
                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                        break 'messages Some(err);
                                    }
                                }
                                break Some(err);
                            }
                        }
                    }
                    signal = signal_rx.recv() => {
                        let signal = signal.map_err(ActorError::from);
                        tracing::info!(actor_id = %instance.self_id(), "client received signal {signal:?}");
                        match signal {
                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
                                need_drain = matches!(signal, Signal::DrainAndStop(_));
                                break None;
                            },
                            Ok(Signal::ChildStopped(_)) => {},
                            Ok(Signal::Abort(reason)) => {
                                break Some(ActorError { actor_id: Box::new(instance.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
                            },
                            Err(err) => break Some(err),
                        }
                    }
                    Ok(supervision_event) = supervision_rx.recv() => {
                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                            break Some(err);
                        }
                    }
                };
            };
            if need_drain {
                // `DrainAndStop`: handle whatever work is already queued,
                // stopping at the first failure.
                let mut n = 0;
                while let Ok(work) = work_rx.try_recv() {
                    if let Err(e) = work.handle(&mut actor, instance).await {
                        err = Some(ActorError {
                            actor_id: Box::new(instance.self_id().clone()),
                            kind: Box::new(ActorErrorKind::processing(e)),
                        });
                        break;
                    }
                    n += 1;
                }
                tracing::debug!(actor_id = %instance.self_id(), "client drained {} messages before stopping", n);
            }
            if let Some(err) = err {
                let event = actor_error_to_event(instance, &actor, err);
                tracing::error!(
                    actor_id = %instance.self_id(),
                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
                    event,
                );

                monarch_with_gil_blocking(|py| {
                    let thread_mod = py.import("_thread").expect("import _thread");
                    let interrupt_main = thread_mod
                        .getattr("interrupt_main")
                        .expect("get interrupt_main");

                    // If the main thread cannot be interrupted, exit the
                    // process so the failure is not silently lost.
                    if let Err(e) = interrupt_main.call0() {
                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
                        std::process::exit(1);
                    }
                });
            } else {
                tracing::info!(actor_id = %instance.self_id(), "client stopped");
            }
        });

        (root_client_instance.get().unwrap(), handle)
    }
}
670
671fn actor_error_to_event(
672 instance: &Instance<PythonActor>,
673 actor: &PythonActor,
674 err: ActorError,
675) -> ActorSupervisionEvent {
676 match *err.kind {
677 ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
678 _ => {
679 let status = ActorStatus::generic_failure(err.kind.to_string());
680 ActorSupervisionEvent::new(
681 instance.self_id().clone(),
682 actor.display_name(),
683 status,
684 None,
685 )
686 }
687 }
688}
689
690pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
691 static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
692
693 py.detach(|| {
698 ROOT_CLIENT_ACTOR.get_or_init(|| {
699 monarch_with_gil_blocking(|py| {
700 let (client, _handle) = PythonActor::bootstrap_client(py);
701 client
702 })
703 })
704 })
705}
706
707#[async_trait]
708impl Actor for PythonActor {
709 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
710 if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
711 let receiver = receiver.take().unwrap();
712
713 let error_port: hyperactor::PortHandle<PythonMessage> =
716 this.port::<Signal>().contramap(|msg: PythonMessage| {
717 monarch_with_gil_blocking(|py| {
718 let err = match msg.kind {
719 PythonMessageKind::Exception { .. } => {
720 let cloudpickle = py.import("cloudpickle").unwrap();
722 let err_obj = cloudpickle
723 .call_method1("loads", (msg.message.to_bytes().as_ref(),))
724 .unwrap();
725 let py_err = pyo3::PyErr::from_value(err_obj);
726 SerializablePyErr::from(py, &py_err)
727 }
728 _ => {
729 let py_err = PyRuntimeError::new_err(format!(
730 "expected Exception, got {:?}",
731 msg.kind
732 ));
733 SerializablePyErr::from(py, &py_err)
734 }
735 };
736 Signal::Abort(err.to_string())
737 })
738 });
739
740 let error_port_handle = PythonPortHandle::new(error_port);
741
742 monarch_with_gil(|py| {
743 let tl = self
744 .task_locals
745 .as_ref()
746 .unwrap_or_else(|| shared_task_locals(py));
747 let awaitable = self.actor.call_method(
748 py,
749 "_dispatch_loop",
750 (receiver, error_port_handle),
751 None,
752 )?;
753 let future =
754 pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
755 tokio::spawn(async move {
756 if let Err(e) = future.await {
757 tracing::error!("message loop error: {}", e);
758 }
759 });
760 Ok::<_, anyhow::Error>(())
761 })
762 .await?;
763 }
764
765 if let Some(init_message) = self.init_message.take() {
766 let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
767 let mut headers = Flattrs::new();
768 headers.set(CAST_POINT, spawn_point);
769 let cx = Context::new(this, headers);
770 <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
771 }
772
773 Ok(())
774 }
775
776 async fn cleanup(
777 &mut self,
778 this: &Instance<Self>,
779 err: Option<&ActorError>,
780 ) -> anyhow::Result<()> {
781 let cx = Context::new(this, Flattrs::new());
785 let err_as_str = err.map(|e| e.to_string());
789 let future = monarch_with_gil(|py| {
790 let py_cx = match &self.instance {
791 Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
792 None => {
793 let py_instance: crate::context::PyInstance = this.into();
794 crate::context::PyContext::new(
795 &cx,
796 py_instance
797 .into_py_any(py)?
798 .downcast_bound(py)
799 .map_err(PyErr::from)?
800 .clone()
801 .unbind(),
802 )
803 }
804 }
805 .into_bound_py_any(py)?;
806 let actor = self.actor.bind(py);
807 match actor.hasattr("__cleanup__") {
810 Ok(false) | Err(_) => {
811 return Ok(None);
813 }
814 _ => {}
815 }
816 let awaitable = actor
817 .call_method("__cleanup__", (&py_cx, err_as_str), None)
818 .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
819 if awaitable.is_none() {
820 Ok(None)
821 } else {
822 pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
823 .map(Some)
824 .map_err(anyhow::Error::from)
825 }
826 })
827 .await?;
828 if let Some(future) = future {
829 future.await.map_err(anyhow::Error::from)?;
830 }
831 Ok(())
832 }
833
834 fn display_name(&self) -> Option<String> {
835 self.instance.as_ref().and_then(|instance| {
836 monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
837 })
838 }
839
840 async fn handle_undeliverable_message(
841 &mut self,
842 ins: &Instance<Self>,
843 mut envelope: Undeliverable<MessageEnvelope>,
844 ) -> Result<(), anyhow::Error> {
845 if envelope.0.sender() != ins.self_id() {
846 envelope = update_undeliverable_envelope_for_casting(envelope);
848 }
849 assert_eq!(
850 envelope.0.sender(),
851 ins.self_id(),
852 "undeliverable message was returned to the wrong actor. \
853 Return address = {}, src actor = {}, dest actor port = {}, envelope headers = {}",
854 envelope.0.sender(),
855 ins.self_id(),
856 envelope.0.dest(),
857 envelope.0.headers()
858 );
859
860 let cx = Context::new(ins, envelope.0.headers().clone());
861
862 let (envelope, handled) = monarch_with_gil(|py| {
863 let py_cx = match &self.instance {
864 Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
865 None => {
866 let py_instance: crate::context::PyInstance = ins.into();
867 crate::context::PyContext::new(
868 &cx,
869 py_instance
870 .into_py_any(py)?
871 .downcast_bound(py)
872 .map_err(PyErr::from)?
873 .clone()
874 .unbind(),
875 )
876 }
877 }
878 .into_bound_py_any(py)?;
879 let py_envelope = PythonUndeliverableMessageEnvelope {
880 inner: Some(envelope),
881 }
882 .into_bound_py_any(py)?;
883 let handled = self
884 .actor
885 .call_method(
886 py,
887 "_handle_undeliverable_message",
888 (&py_cx, &py_envelope),
889 None,
890 )
891 .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
892 .extract::<bool>(py)?;
893 Ok::<_, anyhow::Error>((
894 py_envelope
895 .downcast::<PythonUndeliverableMessageEnvelope>()
896 .map_err(PyErr::from)?
897 .try_borrow_mut()
898 .map_err(PyErr::from)?
899 .take()?,
900 handled,
901 ))
902 })
903 .await?;
904
905 if !handled {
906 hyperactor::actor::handle_undeliverable_message(ins, envelope)
907 } else {
908 Ok(())
909 }
910 }
911
912 async fn handle_supervision_event(
913 &mut self,
914 this: &Instance<Self>,
915 event: &ActorSupervisionEvent,
916 ) -> Result<bool, anyhow::Error> {
917 let cx = Context::new(this, Flattrs::new());
918 self.handle(
919 &cx,
920 MeshFailure {
921 actor_mesh_name: None,
922 rank: None,
923 event: event.clone(),
924 },
925 )
926 .await
927 .map(|_| true)
928 }
929}
930
/// Serializable parameters for spawning a `PythonActor` remotely.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct PythonActorParams {
    // The pickled Python class to instantiate.
    actor_type: PickledPyObject,
    // Optional message handled once during actor initialization.
    init_message: Option<PythonMessage>,
}
938
939impl PythonActorParams {
940 pub(crate) fn new(actor_type: PickledPyObject, init_message: Option<PythonMessage>) -> Self {
941 Self {
942 actor_type,
943 init_message,
944 }
945 }
946}
947
948#[async_trait]
949impl RemoteSpawn for PythonActor {
950 type Params = PythonActorParams;
951
952 async fn new(
953 PythonActorParams {
954 actor_type,
955 init_message,
956 }: PythonActorParams,
957 environment: Flattrs,
958 ) -> Result<Self, anyhow::Error> {
959 let spawn_point = environment.get(CAST_POINT);
960 Self::new(actor_type, init_message, spawn_point)
961 }
962}
963
964fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
966 monarch_with_gil_blocking(|py| {
967 let asyncio = Python::import(py, "asyncio").unwrap();
968 let event_loop = asyncio.call_method0("new_event_loop").unwrap();
969 let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
970 .copy_context(py)
971 .unwrap();
972
973 let kwargs = PyDict::new(py);
974 let target = event_loop.getattr("run_forever").unwrap();
975 kwargs.set_item("target", target).unwrap();
976 kwargs.set_item("daemon", true).unwrap();
978 let thread = py
979 .import("threading")
980 .unwrap()
981 .call_method("Thread", (), Some(&kwargs))
982 .unwrap();
983 thread.call_method0("start").unwrap();
984 task_locals
985 })
986}
987
988fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
990 static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
991 Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
992}
993
/// Python-visible handle through which a running endpoint can report a
/// failure to the Rust side.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
struct PanicFlag {
    // One-shot sender for the reported Python object; `None` once used.
    sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
}
1033
1034#[pymethods]
1035impl PanicFlag {
1036 fn signal_panic(&mut self, ex: Py<PyAny>) {
1037 self.sender.take().unwrap().send(ex).unwrap();
1038 }
1039}
1040
1041#[async_trait]
1042impl Handler<PythonMessage> for PythonActor {
1043 #[tracing::instrument(level = "debug", skip_all)]
1044 async fn handle(
1045 &mut self,
1046 cx: &Context<PythonActor>,
1047 message: PythonMessage,
1048 ) -> anyhow::Result<()> {
1049 match &self.dispatch_mode {
1050 PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1051 PythonActorDispatchMode::Queue { sender, .. } => {
1052 let sender = sender.clone();
1053 self.handle_queue(cx, sender, message).await
1054 }
1055 }
1056 }
1057}
1058
1059impl PythonActor {
1060 async fn handle_direct(
1062 &mut self,
1063 cx: &Context<'_, PythonActor>,
1064 message: PythonMessage,
1065 ) -> anyhow::Result<()> {
1066 let resolved = message.resolve_indirect_call(cx).await?;
1067 let endpoint = resolved.method.to_string();
1068
1069 let (sender, receiver) = oneshot::channel();
1072
1073 let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1074 let inst = self.instance.get_or_insert_with(|| {
1075 let inst: crate::context::PyInstance = cx.into();
1076 inst.into_pyobject(py).unwrap().into()
1077 });
1078
1079 let awaitable = self.actor.call_method(
1080 py,
1081 "handle",
1082 (
1083 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1084 resolved.method,
1085 resolved.bytes,
1086 PanicFlag {
1087 sender: Some(sender),
1088 },
1089 resolved
1090 .local_state
1091 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1092 resolved.response_port.into_py_any(py)?,
1093 ),
1094 None,
1095 )?;
1096
1097 let tl = self
1098 .task_locals
1099 .as_ref()
1100 .unwrap_or_else(|| shared_task_locals(py));
1101
1102 pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1103 .map_err(|err| err.into())
1104 })
1105 .await?;
1106
1107 tokio::spawn(handle_async_endpoint_panic(
1109 cx.port(),
1110 PythonTask::new(future)?,
1111 receiver,
1112 cx.self_id().to_string(),
1113 endpoint,
1114 ));
1115 Ok(())
1116 }
1117
1118 async fn handle_queue(
1121 &mut self,
1122 cx: &Context<'_, PythonActor>,
1123 sender: pympsc::Sender,
1124 message: PythonMessage,
1125 ) -> anyhow::Result<()> {
1126 let resolved = message.resolve_indirect_call(cx).await?;
1127
1128 let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1129 let inst = self.instance.get_or_insert_with(|| {
1130 let inst: crate::context::PyInstance = cx.into();
1131 inst.into_pyobject(py).unwrap().into()
1132 });
1133
1134 let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1135 let py_context_obj = Py::new(py, py_context)?;
1136
1137 Ok(QueuedMessage {
1138 context: py_context_obj,
1139 method: resolved.method,
1140 bytes: resolved.bytes,
1141 local_state: resolved
1142 .local_state
1143 .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1144 response_port: resolved.response_port.into_py_any(py)?,
1145 })
1146 })
1147 .await?;
1148
1149 sender
1150 .send(queued_msg)
1151 .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1152
1153 Ok(())
1154 }
1155}
1156
1157#[async_trait]
1158impl Handler<MeshFailure> for PythonActor {
1159 async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1160 if !message.event.actor_status.is_failed() {
1164 tracing::info!(
1165 "ignoring non-failure supervision event from child: {}",
1166 message
1167 );
1168 return Ok(());
1169 }
1170 monarch_with_gil(|py| {
1174 let inst = self.instance.get_or_insert_with(|| {
1175 let inst: crate::context::PyInstance = cx.into();
1176 inst.into_pyobject(py).unwrap().into()
1177 });
1178 let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1180 let actor_bound = self.actor.bind(py);
1181 if !actor_bound.hasattr("__supervise__")? {
1184 return Err(anyhow::anyhow!(
1185 "no __supervise__ method on {:?}",
1186 actor_bound
1187 ));
1188 }
1189 let result = actor_bound.call_method(
1190 "__supervise__",
1191 (
1192 crate::context::PyContext::new(cx, inst.clone_ref(py)),
1193 PyMeshFailure::from(message.clone()),
1194 ),
1195 None,
1196 );
1197 match result {
1198 Ok(s) => {
1199 if s.is_truthy()? {
1200 tracing::info!(
1205 name = "ActorMeshStatus",
1206 status = "SupervisionError::Handled",
1207 actor_name = message.actor_mesh_name,
1209 event = %message.event,
1210 "__supervise__ on {} handled a supervision event, not reporting any further",
1211 cx.self_id(),
1212 );
1213 Ok(())
1214 } else {
1215 for (actor_name, status) in [
1224 (
1225 message
1226 .actor_mesh_name
1227 .as_deref()
1228 .unwrap_or_else(|| message.event.actor_id.name()),
1229 "SupervisionError::Unhandled",
1230 ),
1231 (cx.self_id().name(), "UnhandledSupervisionEvent"),
1232 ] {
1233 tracing::info!(
1234 name = "ActorMeshStatus",
1235 status,
1236 actor_name,
1237 event = %message.event,
1238 "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1239 cx.self_id(),
1240 );
1241 }
1242 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1243 ActorSupervisionEvent::new(
1244 cx.self_id().clone(),
1245 display_name.clone(),
1246 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1247 Box::new(message.event.clone()),
1248 )),
1249 None,
1250 ),
1251 ));
1252 Err(anyhow::Error::new(err))
1253 }
1254 }
1255 Err(err) => {
1256 for (actor_name, status) in [
1262 (
1263 message
1264 .actor_mesh_name
1265 .as_deref()
1266 .unwrap_or_else(|| message.event.actor_id.name()),
1267 "SupervisionError::__supervise__::exception",
1268 ),
1269 (cx.self_id().name(), "UnhandledSupervisionEvent"),
1270 ] {
1271 tracing::info!(
1272 name = "ActorMeshStatus",
1273 status,
1274 actor_name,
1275 event = %message.event,
1276 "__supervise__ on {} threw an exception",
1277 cx.self_id(),
1278 );
1279 }
1280 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1281 ActorSupervisionEvent::new(
1282 cx.self_id().clone(),
1283 display_name,
1284 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1285 err.to_string(),
1286 Box::new(message.event.clone()),
1287 )),
1288 None,
1289 ),
1290 ));
1291 Err(anyhow::Error::new(err))
1292 }
1293 }
1294 })
1295 .await
1296 }
1297}
1298
1299async fn handle_async_endpoint_panic(
1300 panic_sender: PortHandle<Signal>,
1301 task: PythonTask,
1302 side_channel: oneshot::Receiver<Py<PyAny>>,
1303 actor_id: String,
1304 endpoint: String,
1305) {
1306 let attributes =
1308 hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);
1309
1310 let start_time = std::time::Instant::now();
1312
1313 ENDPOINT_ACTOR_COUNT.add(1, attributes);
1315
1316 let err_or_never = async {
1317 match side_channel.await {
1320 Ok(value) => {
1321 monarch_with_gil(|py| -> Option<SerializablePyErr> {
1322 let err: PyErr = value
1323 .downcast_bound::<PyBaseException>(py)
1324 .unwrap()
1325 .clone()
1326 .into();
1327 ENDPOINT_ACTOR_PANIC.add(1, attributes);
1328 Some(err.into())
1329 })
1330 .await
1331 }
1332 Err(_) => pending().await,
1337 }
1338 };
1339 let future = task.take();
1340 if let Some(panic) = tokio::select! {
1341 result = future => {
1342 match result {
1343 Ok(_) => None,
1344 Err(e) => Some(e.into()),
1345 }
1346 },
1347 result = err_or_never => {
1348 result
1349 }
1350 } {
1351 ENDPOINT_ACTOR_ERROR.add(1, attributes);
1353 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1354 let client = &CLIENT
1355 .get_or_init(|| {
1356 get_proc_runtime()
1357 .instance("async_endpoint_handler")
1358 .unwrap()
1359 })
1360 .0;
1361 panic_sender
1362 .send(&client, Signal::Abort(panic.to_string()))
1363 .expect("Unable to send panic message");
1364 }
1365
1366 let elapsed_micros = start_time.elapsed().as_micros() as f64;
1368 ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
1369}
1370
1371#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1372struct LocalPort {
1373 instance: PyInstance,
1374 inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
1375}
1376
1377impl Debug for LocalPort {
1378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1379 f.debug_struct("LocalPort")
1380 .field("inner", &self.inner)
1381 .finish()
1382 }
1383}
1384
1385pub(crate) fn to_py_error<T>(e: T) -> PyErr
1386where
1387 T: Error,
1388{
1389 PyErr::new::<PyValueError, _>(e.to_string())
1390}
1391
1392#[pymethods]
1393impl LocalPort {
1394 fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1395 let port = self.inner.take().expect("use local port once");
1396 port.send(self.instance.deref(), Ok(obj))
1397 .map_err(to_py_error)
1398 }
1399 fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1400 let port = self.inner.take().expect("use local port once");
1401 port.send(self.instance.deref(), Err(e))
1402 .map_err(to_py_error)
1403 }
1404}
1405
1406#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1410#[derive(Debug)]
1411pub struct DroppingPort;
1412
1413#[pymethods]
1414impl DroppingPort {
1415 #[new]
1416 fn new() -> Self {
1417 DroppingPort
1418 }
1419
1420 fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1421 Ok(())
1422 }
1423
1424 fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1425 let exc = if let Ok(inner) = e.getattr("exception") {
1427 inner
1428 } else {
1429 e
1430 };
1431 Err(PyErr::from_value(exc))
1432 }
1433
1434 #[getter]
1435 fn get_return_undeliverable(&self) -> bool {
1436 true
1437 }
1438
1439 #[setter]
1440 fn set_return_undeliverable(&self, _value: bool) {}
1441}
1442
1443#[pyclass(module = "monarch._src.actor.actor_mesh")]
1446pub struct Port {
1447 port_ref: EitherPortRef,
1448 instance: Instance<PythonActor>,
1449 rank: Option<usize>,
1450}
1451
1452#[pymethods]
1453impl Port {
1454 #[new]
1455 fn new(
1456 port_ref: EitherPortRef,
1457 instance: &crate::context::PyInstance,
1458 rank: Option<usize>,
1459 ) -> Self {
1460 Self {
1461 port_ref,
1462 instance: instance.clone().into_instance(),
1463 rank,
1464 }
1465 }
1466
1467 #[getter("_port_ref")]
1468 fn port_ref_py(&self) -> EitherPortRef {
1469 self.port_ref.clone()
1470 }
1471
1472 #[getter("_rank")]
1473 fn rank_py(&self) -> Option<usize> {
1474 self.rank
1475 }
1476
1477 #[getter]
1478 fn get_return_undeliverable(&self) -> bool {
1479 self.port_ref.get_return_undeliverable()
1480 }
1481
1482 #[setter]
1483 fn set_return_undeliverable(&mut self, value: bool) {
1484 self.port_ref.set_return_undeliverable(value);
1485 }
1486
1487 fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1488 let message = PythonMessage::new_from_buf(
1489 PythonMessageKind::Result { rank: self.rank },
1490 pickle_to_part(py, &obj)?,
1491 );
1492
1493 self.port_ref
1494 .send(&self.instance, message)
1495 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1496 }
1497
1498 fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1499 let message = PythonMessage::new_from_buf(
1500 PythonMessageKind::Exception { rank: self.rank },
1501 pickle_to_part(py, &e)?,
1502 );
1503
1504 self.port_ref
1505 .send(&self.instance, message)
1506 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1507 }
1508}
1509
1510pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
1511 hyperactor_mod.add_class::<PythonActorHandle>()?;
1512 hyperactor_mod.add_class::<PythonMessage>()?;
1513 hyperactor_mod.add_class::<PythonMessageKind>()?;
1514 hyperactor_mod.add_class::<MethodSpecifier>()?;
1515 hyperactor_mod.add_class::<UnflattenArg>()?;
1516 hyperactor_mod.add_class::<PanicFlag>()?;
1517 hyperactor_mod.add_class::<QueuedMessage>()?;
1518 hyperactor_mod.add_class::<DroppingPort>()?;
1519 hyperactor_mod.add_class::<Port>()?;
1520 Ok(())
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525 use hyperactor::accum::ReducerSpec;
1526 use hyperactor::accum::StreamingReducerOpts;
1527 use hyperactor::message::ErasedUnbound;
1528 use hyperactor::message::Unbound;
1529 use hyperactor::reference;
1530 use hyperactor::testing::ids::test_port_id;
1531 use hyperactor_mesh::Error as MeshError;
1532 use hyperactor_mesh::Name;
1533 use hyperactor_mesh::host_mesh::host_agent::ProcState;
1534 use hyperactor_mesh::resource::Status;
1535 use hyperactor_mesh::resource::{self};
1536 use pyo3::PyTypeInfo;
1537
1538 use super::*;
1539 use crate::actor::to_py_error;
1540
1541 #[test]
1542 fn test_python_message_bind_unbind() {
1543 let reducer_spec = ReducerSpec {
1544 typehash: 123,
1545 builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
1546 };
1547 let port_ref = reference::PortRef::<PythonMessage>::attest_reducible(
1548 test_port_id("world_0", "client", 123),
1549 Some(reducer_spec),
1550 StreamingReducerOpts::default(),
1551 );
1552 let message = PythonMessage {
1553 kind: PythonMessageKind::CallMethod {
1554 name: MethodSpecifier::ReturnsResponse {
1555 name: "test".to_string(),
1556 },
1557 response_port: Some(EitherPortRef::Unbounded(port_ref.clone().into())),
1558 },
1559 message: Part::from(vec![1, 2, 3]),
1560 };
1561 {
1562 let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
1563 let mut bindings = vec![];
1564 erased
1565 .visit_mut::<reference::UnboundPort>(|b| {
1566 bindings.push(b.clone());
1567 Ok(())
1568 })
1569 .unwrap();
1570 assert_eq!(bindings, vec![reference::UnboundPort::from(&port_ref)]);
1571 let unbound = Unbound::try_from_message(message.clone()).unwrap();
1572 assert_eq!(message, unbound.bind().unwrap());
1573 }
1574
1575 let no_port_message = PythonMessage {
1576 kind: PythonMessageKind::CallMethod {
1577 name: MethodSpecifier::ReturnsResponse {
1578 name: "test".to_string(),
1579 },
1580 response_port: None,
1581 },
1582 ..message
1583 };
1584 {
1585 let mut erased = ErasedUnbound::try_from_message(no_port_message.clone()).unwrap();
1586 let mut bindings = vec![];
1587 erased
1588 .visit_mut::<reference::UnboundPort>(|b| {
1589 bindings.push(b.clone());
1590 Ok(())
1591 })
1592 .unwrap();
1593 assert_eq!(bindings.len(), 0);
1594 let unbound = Unbound::try_from_message(no_port_message.clone()).unwrap();
1595 assert_eq!(no_port_message, unbound.bind().unwrap());
1596 }
1597 }
1598
1599 #[test]
1600 fn to_py_error_preserves_proc_creation_message() {
1601 let state: resource::State<ProcState> = resource::State {
1603 name: Name::new("my_proc").unwrap(),
1604 status: Status::Failed("boom".into()),
1605 state: None,
1606 };
1607
1608 let mesh_agent: hyperactor::reference::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
1610 hyperactor::reference::ActorRef::attest(
1611 test_port_id("hello_0", "actor", 0).actor_id().clone(),
1612 );
1613 let expected_prefix = format!(
1614 "error creating proc (host rank 0) on host mesh agent {}",
1615 mesh_agent
1616 );
1617 let err = MeshError::ProcCreationError {
1618 host_rank: 0,
1619 mesh_agent,
1620 state: Box::new(state),
1621 };
1622
1623 let rust_msg = err.to_string();
1624 let pyerr = to_py_error(err);
1625
1626 pyo3::Python::initialize();
1627 monarch_with_gil_blocking(|py| {
1628 assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
1629 let py_msg = pyerr.value(py).to_string();
1630
1631 assert_eq!(py_msg, rust_msg);
1633 assert!(py_msg.contains(", state: "));
1635 assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
1636 assert!(py_msg.starts_with(&expected_prefix));
1638 });
1639 }
1640}