Skip to main content

monarch_hyperactor/
actor.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Endpoint as _;
20use hyperactor::Handler;
21use hyperactor::Instance;
22use hyperactor::OncePortHandle;
23use hyperactor::PortHandle;
24use hyperactor::Proc;
25use hyperactor::RemoteSpawn;
26use hyperactor::actor::ActorError;
27use hyperactor::actor::ActorErrorKind;
28use hyperactor::actor::ActorStatus;
29use hyperactor::actor::Signal;
30use hyperactor::context::Actor as ContextActor;
31use hyperactor::mailbox::MessageEnvelope;
32use hyperactor::mailbox::Undeliverable;
33use hyperactor::mailbox::UndeliverableMessageError;
34use hyperactor::message::Bind;
35use hyperactor::message::Bindings;
36use hyperactor::message::IndexedErasedUnbound;
37use hyperactor::message::Unbind;
38use hyperactor::supervision::ActorSupervisionEvent;
39use hyperactor_config::Flattrs;
40use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
41use hyperactor_mesh::comm::multicast::CAST_POINT;
42use hyperactor_mesh::comm::multicast::CastInfo;
43use hyperactor_mesh::supervision::MeshFailure;
44use hyperactor_mesh::transport::default_bind_spec;
45use hyperactor_mesh::value_mesh::ValueOverlay;
46use monarch_types::PickledPyObject;
47use monarch_types::SerializablePyErr;
48use monarch_types::py_global;
49use ndslice::Point;
50use ndslice::extent;
51use pyo3::IntoPyObjectExt;
52use pyo3::exceptions::PyBaseException;
53use pyo3::exceptions::PyRuntimeError;
54use pyo3::exceptions::PyValueError;
55use pyo3::prelude::*;
56use pyo3::types::PyDict;
57use pyo3::types::PyList;
58use pyo3::types::PyType;
59use serde::Deserialize;
60use serde::Serialize;
61use serde_multipart::Part;
62use tokio::sync::oneshot;
63use typeuri::Named;
64
65use crate::buffers::FrozenBuffer;
66use crate::config::ACTOR_QUEUE_DISPATCH;
67use crate::config::SHARED_ASYNCIO_RUNTIME;
68use crate::context::PyInstance;
69use crate::local_state_broker::BrokerId;
70use crate::local_state_broker::LocalStateBrokerMessage;
71use crate::mailbox::EitherPortRef;
72use crate::mailbox::PyMailbox;
73use crate::mailbox::PythonPortHandle;
74use crate::mailbox::PythonUndeliverableMessageEnvelope;
75use crate::metrics::ENDPOINT_ACTOR_COUNT;
76use crate::metrics::ENDPOINT_ACTOR_ERROR;
77use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
78use crate::metrics::ENDPOINT_ACTOR_PANIC;
79use crate::pickle::pickle_to_part;
80use crate::proc::PyActorAddr;
81use crate::pympsc;
82use crate::pytokio::PythonTask;
83use crate::runtime::get_proc_runtime;
84use crate::runtime::get_tokio_runtime;
85use crate::runtime::monarch_with_gil;
86use crate::runtime::monarch_with_gil_blocking;
87use crate::supervision::PyMeshFailure;
88
89py_global!(
90    unhandled_fault_hook_exception,
91    "monarch._src.actor.supervision",
92    "UnhandledFaultHookException"
93);
94
95#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
96#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
97pub enum UnflattenArg {
98    Mailbox,
99    PyObject,
100}
101
102#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
103#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
104pub enum MethodSpecifier {
105    /// Call method 'name', send its return value to the response port.
106    ReturnsResponse { name: String },
107    /// Call method 'name', send the response port as the first argument.
108    ExplicitPort { name: String },
109    /// Construct the object
110    Init {},
111}
112
113impl std::fmt::Display for MethodSpecifier {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        write!(f, "{}", self.name())
116    }
117}
118
119#[pymethods]
120impl MethodSpecifier {
121    #[getter(name)]
122    fn py_name(&self) -> &str {
123        self.name()
124    }
125}
126
127impl MethodSpecifier {
128    pub(crate) fn name(&self) -> &str {
129        match self {
130            MethodSpecifier::ReturnsResponse { name } => name,
131            MethodSpecifier::ExplicitPort { name } => name,
132            MethodSpecifier::Init {} => "__init__",
133        }
134    }
135}
136
137/// The payload of a single actor response, without rank information.
138///
139/// The rank is captured by the overlay's range key, so it is stripped
140/// from the value to enable RLE dedup: two ranks returning the same
141/// payload will have byte-identical values and can be coalesced into
142/// a single run.
143#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
144pub enum PythonResponseMessage {
145    Result(serde_multipart::Part),
146    Exception(serde_multipart::Part),
147}
148
149wirevalue::register_type!(PythonResponseMessage);
150wirevalue::register_type!(ValueOverlay<PythonResponseMessage>);
151
152/// Newtype wrapper around [`ValueOverlay<PythonResponseMessage>`] needed
153/// because `PythonMessageKind` is a `#[pyclass]` enum, requiring all variant
154/// fields to implement PyO3 traits. `ValueOverlay` is defined in another crate
155/// and does not implement `PyClass`.
156#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
157#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
158pub struct AccumulatedResponses(ValueOverlay<PythonResponseMessage>);
159
160#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
161#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
162pub enum PythonMessageKind {
163    CallMethod {
164        name: MethodSpecifier,
165        response_port: Option<EitherPortRef>,
166    },
167    Result {
168        rank: Option<usize>,
169    },
170    Exception {
171        rank: Option<usize>,
172    },
173    Uninit {},
174    CallMethodIndirect {
175        name: MethodSpecifier,
176        local_state_broker: (String, usize),
177        id: usize,
178        // specify whether the argument to unflatten the local mailbox,
179        // or the next argument of the local state.
180        unflatten_args: Vec<UnflattenArg>,
181    },
182    AccumulatedResponses(AccumulatedResponses),
183}
184wirevalue::register_type!(PythonMessageKind);
185
186impl Default for PythonMessageKind {
187    fn default() -> Self {
188        PythonMessageKind::Uninit {}
189    }
190}
191
192fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
193    let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
194    mailbox.into_bound_py_any(py).unwrap()
195}
196
197#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
198#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
199pub struct PythonMessage {
200    pub kind: PythonMessageKind,
201    pub message: Part,
202}
203
204/// Extract the endpoint method name from a [`PythonMessage`].
205fn python_message_endpoint_name(msg: &PythonMessage) -> Option<String> {
206    match &msg.kind {
207        PythonMessageKind::CallMethod { name, .. }
208        | PythonMessageKind::CallMethodIndirect { name, .. } => Some(name.name().to_string()),
209        _ => None,
210    }
211}
212
213// We use manual `submit!` instead of `register_type!` because PythonMessage is a
214// struct, so the default `endpoint_name` (which delegates to `arm_unchecked`)
215// always returns None. The custom implementation inspects `PythonMessageKind` to
216// extract the method name. This registration handles direct (non-cast) dispatch.
217wirevalue::submit! {
218    wirevalue::TypeInfo {
219        typename: <PythonMessage as wirevalue::Named>::typename,
220        typehash: <PythonMessage as wirevalue::Named>::typehash,
221        typeid: <PythonMessage as wirevalue::Named>::typeid,
222        port: <PythonMessage as wirevalue::Named>::port,
223        dump: Some(<PythonMessage as wirevalue::NamedDumpable>::dump),
224        arm_unchecked: <PythonMessage as wirevalue::Named>::arm_unchecked,
225        endpoint_name: |ptr| {
226            // SAFETY: ptr points to a PythonMessage.
227            let msg = unsafe { &*(ptr as *const PythonMessage) };
228            python_message_endpoint_name(msg)
229        },
230    }
231}
232
233// Cast messages arrive as IndexedErasedUnbound<PythonMessage>, which wraps a
234// serialized PythonMessage. This type has no `register_type!` by default (it
235// shares ErasedUnbound's wire format), so we register it explicitly. The
236// endpoint_name deserializes the inner payload to read the method name. This
237// costs one extra deserialization per message, but the Part payload uses
238// zero-copy Bytes refcounting, and Python actor throughput is GIL-bounded,
239// so the serde overhead is negligible relative to Python-side processing.
240wirevalue::submit! {
241    wirevalue::TypeInfo {
242        typename: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typename,
243        typehash: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typehash,
244        typeid: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typeid,
245        port: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::port,
246        dump: None,
247        arm_unchecked: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::arm_unchecked,
248        endpoint_name: |ptr| {
249            // SAFETY: ptr points to an IndexedErasedUnbound<PythonMessage>.
250            let erased = unsafe { &*(ptr as *const IndexedErasedUnbound<PythonMessage>) };
251            erased
252                .inner_any()
253                .deserialized_unchecked::<PythonMessage>()
254                .ok()
255                .and_then(|msg| python_message_endpoint_name(&msg))
256        },
257    }
258}
259
260impl From<ValueOverlay<PythonResponseMessage>> for PythonMessage {
261    fn from(overlay: ValueOverlay<PythonResponseMessage>) -> Self {
262        PythonMessage {
263            kind: PythonMessageKind::AccumulatedResponses(AccumulatedResponses(overlay)),
264            message: Default::default(),
265        }
266    }
267}
268
269impl PythonMessage {
270    /// Consume this message and extract a `ValueOverlay<PythonResponseMessage>`.
271    ///
272    /// Handles both already-collected responses and leaf `Result`/`Exception`
273    /// messages by wrapping them in a single-run overlay.
274    pub fn into_overlay(self) -> anyhow::Result<ValueOverlay<PythonResponseMessage>> {
275        match self.kind {
276            PythonMessageKind::AccumulatedResponses(overlay) => Ok(overlay.0),
277            PythonMessageKind::Result { rank, .. } => {
278                let rank = rank.expect("accumulated response should have a rank");
279                let mut overlay = ValueOverlay::new();
280                overlay.push_run(rank..rank + 1, PythonResponseMessage::Result(self.message))?;
281                Ok(overlay)
282            }
283            PythonMessageKind::Exception { rank, .. } => {
284                let rank = rank.expect("accumulated exception should have a rank");
285                let mut overlay = ValueOverlay::new();
286                overlay.push_run(
287                    rank..rank + 1,
288                    PythonResponseMessage::Exception(self.message),
289                )?;
290                Ok(overlay)
291            }
292            other => {
293                anyhow::bail!(
294                    "unexpected message kind {:?} in collected responses reducer",
295                    other
296                );
297            }
298        }
299    }
300}
301
302struct ResolvedCallMethod {
303    method: MethodSpecifier,
304    bytes: FrozenBuffer,
305    local_state: Option<Py<PyAny>>,
306    /// Implements PortProtocol
307    /// Concretely either a Port, DroppingPort, or LocalPort
308    response_port: ResponsePort,
309}
310
311enum ResponsePort {
312    Dropping,
313    Port(Port),
314    Local(LocalPort),
315}
316
317impl ResponsePort {
318    fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
319        match self {
320            ResponsePort::Dropping => DroppingPort.into_py_any(py),
321            ResponsePort::Port(port) => port.into_py_any(py),
322            ResponsePort::Local(port) => port.into_py_any(py),
323        }
324    }
325}
326
327/// Message sent through the queue in queue-dispatch mode.
328/// Contains pre-resolved components ready for Python consumption.
329#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
330pub struct QueuedMessage {
331    #[pyo3(get)]
332    pub context: Py<crate::context::PyContext>,
333    #[pyo3(get)]
334    pub method: MethodSpecifier,
335    #[pyo3(get)]
336    pub bytes: FrozenBuffer,
337    #[pyo3(get)]
338    pub local_state: Py<PyAny>,
339    #[pyo3(get)]
340    pub response_port: Py<PyAny>,
341}
342
343impl PythonMessage {
344    pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
345        Self {
346            kind,
347            message: message.into(),
348        }
349    }
350
351    pub fn into_rank(self, rank: usize) -> Self {
352        let rank = Some(rank);
353        match self.kind {
354            PythonMessageKind::Result { .. } => PythonMessage {
355                kind: PythonMessageKind::Result { rank },
356                message: self.message,
357            },
358            PythonMessageKind::Exception { .. } => PythonMessage {
359                kind: PythonMessageKind::Exception { rank },
360                message: self.message,
361            },
362            _ => panic!("PythonMessage is not a response but {:?}", self),
363        }
364    }
365    async fn resolve_indirect_call(
366        self,
367        cx: &Context<'_, PythonActor>,
368    ) -> anyhow::Result<ResolvedCallMethod> {
369        match self.kind {
370            PythonMessageKind::CallMethodIndirect {
371                name,
372                local_state_broker,
373                id,
374                unflatten_args,
375            } => {
376                let broker = BrokerId::new(local_state_broker).resolve(cx).await;
377                let (send, recv) = cx.open_once_port();
378                broker.post(cx, LocalStateBrokerMessage::Get(id, send));
379                let state = recv.recv().await?;
380                let mut state_it = state.state.into_iter();
381                monarch_with_gil(|py| {
382                    let mailbox = mailbox(py, cx);
383                    let local_state = Some(
384                        PyList::new(
385                            py,
386                            unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
387                                match x {
388                                    UnflattenArg::Mailbox => mailbox.clone(),
389                                    UnflattenArg::PyObject => {
390                                        state_it.next().unwrap().into_bound(py)
391                                    }
392                                }
393                            }),
394                        )
395                        .unwrap()
396                        .into(),
397                    );
398                    let response_port = ResponsePort::Local(LocalPort {
399                        instance: cx.into(),
400                        inner: Some(state.response_port),
401                    });
402                    Ok(ResolvedCallMethod {
403                        method: name,
404                        bytes: FrozenBuffer {
405                            inner: self.message.into_bytes(),
406                        },
407                        local_state,
408                        response_port,
409                    })
410                })
411                .await
412            }
413            PythonMessageKind::CallMethod {
414                name,
415                response_port,
416            } => {
417                let method_name = name.name().to_string();
418                let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
419                    let point = cx.cast_point();
420                    // Carry operation context onto the reply: copy
421                    // OPERATION_*-marked keys from the inbound
422                    // envelope, falling back to the method name when
423                    // the caller didn't stamp.
424                    let mut reply_headers = hyperactor_config::Flattrs::new();
425                    hyperactor_config::attrs::copy_marked_flattrs(
426                        &mut reply_headers,
427                        cx.headers(),
428                        hyperactor_config::attrs::OPERATION_CONTEXT_HEADER,
429                    );
430                    if reply_headers
431                        .get(hyperactor::mailbox::headers::OPERATION_ENDPOINT)
432                        .is_none()
433                    {
434                        reply_headers.set(
435                            hyperactor::mailbox::headers::OPERATION_ENDPOINT,
436                            format!("{}()", method_name),
437                        );
438                    }
439                    ResponsePort::Port(Port::with_reply_headers(
440                        port_ref,
441                        cx.instance().clone_for_py(),
442                        Some(point.rank()),
443                        reply_headers,
444                    ))
445                });
446                Ok(ResolvedCallMethod {
447                    method: name,
448                    bytes: FrozenBuffer {
449                        inner: self.message.into_bytes(),
450                    },
451                    local_state: None,
452                    response_port,
453                })
454            }
455            _ => {
456                panic!("unexpected message kind {:?}", self.kind)
457            }
458        }
459    }
460}
461
462impl std::fmt::Debug for PythonMessage {
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        f.debug_struct("PythonMessage")
465            .field("kind", &self.kind)
466            .field(
467                "message",
468                &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
469            )
470            .finish()
471    }
472}
473
474impl Unbind for PythonMessage {
475    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
476        match &self.kind {
477            PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
478            _ => Ok(()),
479        }
480    }
481}
482
483impl Bind for PythonMessage {
484    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
485        match &mut self.kind {
486            PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
487            _ => Ok(()),
488        }
489    }
490}
491
492#[pymethods]
493impl PythonMessage {
494    #[new]
495    #[pyo3(signature = (kind, message))]
496    pub fn new(kind: PythonMessageKind, message: PyRef<'_, FrozenBuffer>) -> PyResult<Self> {
497        Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
498    }
499
500    #[getter]
501    fn kind(&self) -> PythonMessageKind {
502        self.kind.clone()
503    }
504
505    #[getter]
506    fn message(&self) -> FrozenBuffer {
507        FrozenBuffer {
508            inner: self.message.to_bytes(),
509        }
510    }
511}
512
513#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
514pub(super) struct PythonActorHandle {
515    pub(super) inner: ActorHandle<PythonActor>,
516}
517
518#[pymethods]
519impl PythonActorHandle {
520    // TODO: do the pickling in rust
521    fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
522        self.inner.post(instance.deref(), message.clone());
523        Ok(())
524    }
525
526    fn bind(&self) -> PyActorAddr {
527        self.inner.bind::<PythonActor>().into_actor_addr().into()
528    }
529}
530
531/// Dispatch mode for Python actors.
532#[derive(Debug)]
533pub enum PythonActorDispatchMode {
534    /// Direct dispatch: Rust acquires the GIL and calls Python handlers directly.
535    Direct,
536    /// Queue dispatch: Rust enqueues messages to a channel; Python dequeues and dispatches.
537    Queue {
538        /// Channel sender for enqueuing messages to Python.
539        sender: pympsc::Sender,
540        /// Channel receiver, taken during Actor::init to start the message loop.
541        receiver: Option<pympsc::PyReceiver>,
542    },
543}
544
545/// An actor for which message handlers are implemented in Python.
546#[derive(Debug)]
547#[hyperactor::export(
548    handlers = [
549        PythonMessage { cast = true },
550        MeshFailure { cast = true },
551    ],
552)]
553#[hyperactor::spawnable]
554pub struct PythonActor {
555    /// The Python object that we delegate message handling to.
556    actor: Py<PyAny>,
557    /// Stores a reference to the Python event loop to run Python coroutines on.
558    /// This is None when using single runtime mode, Some when using per-actor mode.
559    task_locals: Option<pyo3_async_runtimes::TaskLocals>,
560    /// Instance object that we keep across handle calls so that we can store
561    /// information from the Init (spawn rank, controller) and provide it to other calls.
562    instance: Option<Py<crate::context::PyInstance>>,
563    /// Dispatch mode for this actor.
564    dispatch_mode: PythonActorDispatchMode,
565    /// The location in the actor mesh at which this actor was spawned.
566    spawn_point: OnceLock<Option<Point>>,
567    /// Initial message to process during PythonActor::init.
568    init_message: Option<PythonMessage>,
569    /// User-provided mesh base-name string plumbed from
570    /// `PythonActorParams`. This is the base name the caller
571    /// supplied when the mesh was spawned, narrowly used to populate
572    /// `MeshFailure.actor_mesh_name` on the direct actor-handled
573    /// supervision path without a lookup. It is not actor display
574    /// text (`display_name` handles that) and it is not a general
575    /// side channel; downstream code must not consume this field for
576    /// any other purpose.
577    mesh_base_name: Option<String>,
578}
579
580impl PythonActor {
581    pub(crate) fn new(
582        actor_type: PickledPyObject,
583        init_message: Option<PythonMessage>,
584        spawn_point: Option<Point>,
585        mesh_base_name: Option<String>,
586    ) -> Result<Self, anyhow::Error> {
587        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);
588
589        Ok(monarch_with_gil_blocking(
590            |py| -> Result<Self, SerializablePyErr> {
591                let unpickled = actor_type.unpickle(py)?;
592                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
593                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;
594
595                // Only create per-actor TaskLocals if not using shared runtime
596                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
597                    .then(|| Python::detach(py, create_task_locals));
598
599                let dispatch_mode = if use_queue_dispatch {
600                    let (sender, receiver) = pympsc::channel().map_err(|e| {
601                        let py_err = PyRuntimeError::new_err(e.to_string());
602                        SerializablePyErr::from(py, &py_err)
603                    })?;
604                    PythonActorDispatchMode::Queue {
605                        sender,
606                        receiver: Some(receiver),
607                    }
608                } else {
609                    PythonActorDispatchMode::Direct
610                };
611
612                Ok(Self {
613                    actor,
614                    task_locals,
615                    instance: None,
616                    dispatch_mode,
617                    spawn_point: OnceLock::from(spawn_point),
618                    init_message,
619                    mesh_base_name,
620                })
621            },
622        )?)
623    }
624
625    /// Get the TaskLocals to use for this actor.
626    /// Returns either the shared TaskLocals or this actor's own TaskLocals based on configuration.
627    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
628        self.task_locals
629            .as_ref()
630            .unwrap_or_else(|| shared_task_locals(py))
631    }
632
633    /// Bootstrap the root client actor, creating a new proc for it.
634    /// This is the legacy entry point that creates its own proc.
635    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
636        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();
637
638        let client_proc = Proc::direct(
639            default_bind_spec().binding_addr(),
640            "mesh_root_client_proc".into(),
641        )
642        .unwrap();
643
644        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
645    }
646
647    /// Bootstrap the client proc, storing the root client instance in given static.
648    /// This is passed in because we require storage, as the instance is shared.
649    /// This can be simplified when we remove v0.
650    pub(crate) fn bootstrap_client_inner(
651        py: Python<'_>,
652        client_proc: Proc,
653        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
654    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
655        let actor_mesh_mod = py
656            .import("monarch._src.actor.actor_mesh")
657            .expect("import actor_mesh");
658        let root_client_class = actor_mesh_mod
659            .getattr("RootClientActor")
660            .expect("get RootClientActor");
661
662        let actor_type =
663            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
664                .expect("pickle _Actor");
665
666        let init_frozen_buffer: FrozenBuffer = root_client_class
667            .call_method0("_pickled_init_args")
668            .expect("call RootClientActor._pickled_init_args")
669            .extract()
670            .expect("extract FrozenBuffer from _pickled_init_args");
671        let init_message = PythonMessage::new_from_buf(
672            PythonMessageKind::CallMethod {
673                name: MethodSpecifier::Init {},
674                response_port: None,
675            },
676            init_frozen_buffer,
677        );
678
679        let mut actor = PythonActor::new(
680            actor_type,
681            Some(init_message),
682            Some(extent!().point_of_rank(0).unwrap()),
683            None, // root client actor has no user-facing mesh name
684        )
685        .expect("create client PythonActor");
686
687        let ai = client_proc
688            .actor_instance(
689                root_client_class
690                    .getattr("name")
691                    .expect("get RootClientActor.name")
692                    .extract()
693                    .expect("extract RootClientActor.name"),
694            )
695            .expect("root instance create");
696
697        let handle = ai.handle;
698        let signal_rx = ai.signal;
699        let supervision_rx = ai.supervision;
700        let work_rx = ai.work;
701
702        root_client_instance
703            .set(ai.instance)
704            .map_err(|_| "already initialized root client instance")
705            .unwrap();
706        let instance = root_client_instance.get().unwrap();
707
708        // The root client PythonActor uses a custom run loop that
709        // bypasses Actor::init, so mark it as system explicitly
710        // (matching GlobalClientActor::fresh_instance).
711        instance.set_system();
712
713        // Bind to ensure the Undeliverable<MessageEnvelope> port is bound.
714        let _client_ref = handle.bind::<PythonActor>();
715
716        get_tokio_runtime().spawn(async move {
717            // This is gross. Sorry.
718            actor.init(instance).await.unwrap();
719
720            let mut signal_rx = signal_rx;
721            let mut supervision_rx = supervision_rx;
722            let mut work_rx = work_rx;
723            let mut need_drain = false;
724            let mut err = 'messages: loop {
725                tokio::select! {
726                    work = work_rx.recv() => {
727                        let work = work.expect("inconsistent work queue state");
728                        if let Err(err) = work.handle(&mut actor, instance).await {
729                            // Check for UnhandledFaultHookException on the raw
730                            // anyhow::Error before wrapping in ActorErrorKind.
731                            // If __supervise__ already processed the supervision
732                            // event and the hook raised, don't re-handle it via
733                            // handle_supervision_event — that would call
734                            // __supervise__ a second time.
735                            let is_hook_exception = monarch_with_gil(|py| {
736                                err.downcast_ref::<pyo3::PyErr>()
737                                    .is_some_and(|pyerr| {
738                                        pyerr.is_instance(
739                                            py,
740                                            &unhandled_fault_hook_exception(py),
741                                        )
742                                    })
743                            }).await;
744
745                            let kind = ActorErrorKind::processing(err);
746                            let err = ActorError {
747                                actor_id: Box::new(instance.self_addr().clone()),
748                                kind: Box::new(kind),
749                            };
750
751                            if is_hook_exception {
752                                break Some(err);
753                            }
754
755                            // Give the actor a chance to handle the error produced
756                            // in its own message handler. This is important because
757                            // we want Undeliverable<MessageEnvelope>, which returns
758                            // an Err typically, to create a supervision event and
759                            // call __supervise__.
760                            let supervision_event = actor_error_to_event(instance, &actor, err);
761                            // If the immediate supervision event isn't handled, continue with
762                            // exiting the loop.
763                            // Else, continue handling messages.
764                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
765                                while let Ok(supervision_event) = supervision_rx.try_recv() {
766                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
767                                        break 'messages Some(err);
768                                    }
769                                }
770                                break Some(err);
771                            }
772                        }
773                    }
774                    signal = signal_rx.recv() => {
775                        let signal = signal.map_err(ActorError::from);
776                        tracing::info!(actor_id = %instance.self_addr(), "client received signal {signal:?}");
777                        match signal {
778                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
779                                need_drain = matches!(signal, Signal::DrainAndStop(_));
780                                break None;
781                            },
782                            Ok(Signal::ExitRequested(_)) => break None,
783                            Ok(Signal::ChildStopped(_)) => {},
784                            Ok(Signal::Kill(reason)) => {
785                                break Some(ActorError { actor_id: Box::new(instance.self_addr().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
786                            },
787                            Err(err) => break Some(err),
788                        }
789                    }
790                    Some(supervision_event) = supervision_rx.recv() => {
791                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
792                            break Some(err);
793                        }
794                    }
795                };
796            };
797            if need_drain {
798                let mut n = 0;
799                while let Ok(work) = work_rx.try_recv() {
800                    if let Err(e) = work.handle(&mut actor, instance).await {
801                        err = Some(ActorError {
802                            actor_id: Box::new(instance.self_addr().clone()),
803                            kind: Box::new(ActorErrorKind::processing(e)),
804                        });
805                        break;
806                    }
807                    n += 1;
808                }
809                tracing::debug!(actor_id = %instance.self_addr(), "client drained {} messages before stopping", n);
810            }
811            if let Some(err) = err {
812                let event = actor_error_to_event(instance, &actor, err);
813                // The proc supervision handler will send to ProcAgent, which
814                // just records it in v1. We want to crash instead, as nothing will
815                // monitor the client ProcAgent for now.
816                tracing::error!(
817                    actor_id = %instance.self_addr(),
818                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
819                    event,
820                );
821
822                // This is running in a background thread, and thus cannot run
823                // Py_FinalizeEx when it exits the process to properly shut down
824                // all python objects.
825                // We use _thread.interrupt_main to raise a KeyboardInterrupt
826                // to the main thread at some point in the future.
827                // There is no way to propagate the exception message, but it
828                // will at least run proper shutdown code as long as BaseException
829                // isn't caught.
830                monarch_with_gil_blocking(|py| {
831                    // Use _thread.interrupt_main to force the client to exit if it has an
832                    // unhandled supervision event.
833                    let thread_mod = py.import("_thread").expect("import _thread");
834                    let interrupt_main = thread_mod
835                        .getattr("interrupt_main")
836                        .expect("get interrupt_main");
837
838                    // Ignore any exception from calling interrupt_main
839                    if let Err(e) = interrupt_main.call0() {
840                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
841                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
842                        std::process::exit(1);
843                    }
844                });
845            } else {
846                tracing::info!(actor_id = %instance.self_addr(), "client stopped");
847                instance.change_status(hyperactor::actor::ActorStatus::Stopped("client stopped".into()));
848            }
849        });
850
851        (root_client_instance.get().unwrap(), handle)
852    }
853}
854
855fn actor_error_to_event(
856    instance: &Instance<PythonActor>,
857    actor: &PythonActor,
858    err: ActorError,
859) -> ActorSupervisionEvent {
860    match *err.kind {
861        ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
862        _ => {
863            let status = ActorStatus::generic_failure(err.kind.to_string());
864            ActorSupervisionEvent::new(
865                instance.self_addr().clone(),
866                actor.display_name(),
867                status,
868                None,
869            )
870        }
871    }
872}
873
874pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
875    static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
876
877    // Release the GIL before waiting on ROOT_CLIENT_ACTOR, because PythonActor::bootstrap_client
878    // may release/reacquire the GIL; if thread 0 holds the GIL blocking on ROOT_CLIENT_ACTOR.get_or_init
879    // while thread 1 blocks on acquiring the GIL inside PythonActor::bootstrap_client, we get
880    // a deadlock.
881    py.detach(|| {
882        ROOT_CLIENT_ACTOR.get_or_init(|| {
883            monarch_with_gil_blocking(|py| {
884                let (client, _handle) = PythonActor::bootstrap_client(py);
885                client
886            })
887        })
888    })
889}
890
891#[async_trait]
892impl Actor for PythonActor {
893    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
894        if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
895            let receiver = receiver.take().unwrap();
896
897            // Create an error port that converts PythonMessage to an abort signal.
898            // This allows Python to send errors that trigger actor supervision.
899            let error_port: hyperactor::PortHandle<PythonMessage> =
900                this.signal_port().contramap(|msg: PythonMessage| {
901                    monarch_with_gil_blocking(|py| {
902                        let err = match msg.kind {
903                            PythonMessageKind::Exception { .. } => {
904                                // Deserialize the error from the message
905                                let cloudpickle = py.import("cloudpickle").unwrap();
906                                let err_obj = cloudpickle
907                                    .call_method1("loads", (msg.message.to_bytes().as_ref(),))
908                                    .unwrap();
909                                let py_err = pyo3::PyErr::from_value(err_obj);
910                                SerializablePyErr::from(py, &py_err)
911                            }
912                            _ => {
913                                let py_err = PyRuntimeError::new_err(format!(
914                                    "expected Exception, got {:?}",
915                                    msg.kind
916                                ));
917                                SerializablePyErr::from(py, &py_err)
918                            }
919                        };
920                        Signal::Kill(err.to_string())
921                    })
922                });
923
924            let error_port_handle = PythonPortHandle::new(error_port);
925
926            monarch_with_gil(|py| {
927                let tl = self
928                    .task_locals
929                    .as_ref()
930                    .unwrap_or_else(|| shared_task_locals(py));
931                let awaitable = self.actor.call_method(
932                    py,
933                    "_dispatch_loop",
934                    (receiver, error_port_handle),
935                    None,
936                )?;
937                let future =
938                    pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
939                tokio::spawn(async move {
940                    if let Err(e) = future.await {
941                        tracing::error!("message loop error: {}", e);
942                    }
943                });
944                Ok::<_, anyhow::Error>(())
945            })
946            .await?;
947        }
948
949        if let Some(init_message) = self.init_message.take() {
950            let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
951            let mut headers = Flattrs::new();
952            headers.set(CAST_POINT, spawn_point);
953            let cx = Context::new(this, headers);
954            <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
955        }
956
957        Ok(())
958    }
959
960    async fn cleanup(
961        &mut self,
962        this: &Instance<Self>,
963        err: Option<&ActorError>,
964    ) -> anyhow::Result<()> {
965        // Calls the "__cleanup__" method on the python instance to allow the actor
966        // to control its own cleanup.
967        // No headers because this isn't in the context of a message.
968        let cx = Context::new(this, Flattrs::new());
969        // Turn the ActorError into a representation of the error. We may not
970        // have an original exception object or traceback, so we just pass in
971        // the message.
972        let err_as_str = err.map(|e| e.to_string());
973        let future = monarch_with_gil(|py| {
974            let py_cx = match &self.instance {
975                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
976                None => {
977                    let py_instance: crate::context::PyInstance = this.into();
978                    crate::context::PyContext::new(
979                        &cx,
980                        py_instance
981                            .into_py_any(py)?
982                            .downcast_bound(py)
983                            .map_err(PyErr::from)?
984                            .clone()
985                            .unbind(),
986                    )
987                }
988            }
989            .into_bound_py_any(py)?;
990            let actor = self.actor.bind(py);
991            // Some tests don't use the Actor base class, so add this check
992            // to be defensive.
993            match actor.hasattr("__cleanup__") {
994                Ok(false) | Err(_) => {
995                    // No cleanup found, default to returning None
996                    return Ok(None);
997                }
998                _ => {}
999            }
1000            let awaitable = actor
1001                .call_method("__cleanup__", (&py_cx, err_as_str), None)
1002                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
1003            if awaitable.is_none() {
1004                Ok(None)
1005            } else {
1006                pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
1007                    .map(Some)
1008                    .map_err(anyhow::Error::from)
1009            }
1010        })
1011        .await?;
1012        if let Some(future) = future {
1013            future.await.map_err(anyhow::Error::from)?;
1014        }
1015        Ok(())
1016    }
1017
1018    fn display_name(&self) -> Option<String> {
1019        self.instance.as_ref().and_then(|instance| {
1020            monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
1021        })
1022    }
1023
1024    async fn handle_undeliverable_message(
1025        &mut self,
1026        ins: &Instance<Self>,
1027        mut envelope: Undeliverable<MessageEnvelope>,
1028    ) -> Result<(), anyhow::Error> {
1029        if envelope
1030            .as_message()
1031            .is_some_and(|envelope| envelope.sender() != ins.self_addr())
1032        {
1033            // This can happen if the sender is comm. Update the envelope.
1034            envelope = update_undeliverable_envelope_for_casting(envelope);
1035        }
1036        let envelope = match envelope {
1037            Undeliverable::Message(envelope) => envelope,
1038            Undeliverable::Lost(lost) => {
1039                return Err(UndeliverableMessageError::Lost { lost }.into());
1040            }
1041        };
1042        assert_eq!(
1043            envelope.sender(),
1044            ins.self_addr(),
1045            "undeliverable message was returned to the wrong actor. \
1046            Return address = {}, src actor = {}, dest handler port = {}, message type = {}, envelope headers = {}",
1047            envelope.sender(),
1048            ins.self_addr(),
1049            envelope.dest(),
1050            envelope.data().typename().unwrap_or("unknown"),
1051            envelope.headers()
1052        );
1053
1054        let cx = Context::new(ins, envelope.headers().clone());
1055
1056        let (envelope, handled) = monarch_with_gil(|py| {
1057            let py_cx = match &self.instance {
1058                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
1059                None => {
1060                    let py_instance: crate::context::PyInstance = ins.into();
1061                    crate::context::PyContext::new(
1062                        &cx,
1063                        py_instance
1064                            .into_py_any(py)?
1065                            .downcast_bound(py)
1066                            .map_err(PyErr::from)?
1067                            .clone()
1068                            .unbind(),
1069                    )
1070                }
1071            }
1072            .into_bound_py_any(py)?;
1073            let py_envelope = PythonUndeliverableMessageEnvelope {
1074                inner: Some(Undeliverable::Message(envelope)),
1075            }
1076            .into_bound_py_any(py)?;
1077            let handled = self
1078                .actor
1079                .call_method(
1080                    py,
1081                    "_handle_undeliverable_message",
1082                    (&py_cx, &py_envelope),
1083                    None,
1084                )
1085                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
1086                .extract::<bool>(py)?;
1087            Ok::<_, anyhow::Error>((
1088                py_envelope
1089                    .downcast::<PythonUndeliverableMessageEnvelope>()
1090                    .map_err(PyErr::from)?
1091                    .try_borrow_mut()
1092                    .map_err(PyErr::from)?
1093                    .take()?,
1094                handled,
1095            ))
1096        })
1097        .await?;
1098
1099        if !handled {
1100            hyperactor::actor::handle_undeliverable_message(ins, envelope)
1101        } else {
1102            Ok(())
1103        }
1104    }
1105
1106    async fn handle_supervision_event(
1107        &mut self,
1108        this: &Instance<Self>,
1109        event: &ActorSupervisionEvent,
1110    ) -> Result<bool, anyhow::Error> {
1111        let cx = Context::new(this, Flattrs::new());
1112        self.handle(
1113            &cx,
1114            MeshFailure {
1115                // Populate the mesh name from the base-name string
1116                // plumbed through PythonActorParams at spawn time —
1117                // no lookup.
1118                actor_mesh_name: self.mesh_base_name.clone(),
1119                event: event.clone(),
1120                crashed_ranks: vec![],
1121            },
1122        )
1123        .await
1124        .map(|_| true)
1125    }
1126}
1127
1128#[derive(Debug, Clone, Serialize, Deserialize, Named)]
1129pub struct PythonActorParams {
1130    // The pickled actor class to instantiate.
1131    actor_type: PickledPyObject,
1132    // Python message to process as part of the actor initialization.
1133    init_message: Option<PythonMessage>,
1134    // User-provided mesh base-name string under which this actor
1135    // was spawned. The base name the caller passed when the mesh
1136    // was spawned, plumbed through `PythonActor` narrowly to
1137    // populate `MeshFailure.actor_mesh_name` on the direct
1138    // actor-handled supervision path without a lookup. It is not
1139    // actor display text (`display_name` handles that) and it is
1140    // not a general side channel; downstream code must not consume
1141    // this field for any other purpose. Kept separate from
1142    // `supervision_display_name`, which is a rendered supervision
1143    // display string passed through `spawn_with_name(...)`.
1144    mesh_base_name: Option<String>,
1145}
1146
1147impl PythonActorParams {
1148    pub(crate) fn new(
1149        actor_type: PickledPyObject,
1150        init_message: Option<PythonMessage>,
1151        mesh_base_name: Option<String>,
1152    ) -> Self {
1153        Self {
1154            actor_type,
1155            init_message,
1156            mesh_base_name,
1157        }
1158    }
1159}
1160
1161#[async_trait]
1162impl RemoteSpawn for PythonActor {
1163    type Params = PythonActorParams;
1164
1165    async fn new(
1166        PythonActorParams {
1167            actor_type,
1168            init_message,
1169            mesh_base_name,
1170        }: PythonActorParams,
1171        environment: Flattrs,
1172    ) -> Result<Self, anyhow::Error> {
1173        let spawn_point = environment.get(CAST_POINT);
1174        Self::new(actor_type, init_message, spawn_point, mesh_base_name)
1175    }
1176}
1177
1178/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
1179fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
1180    monarch_with_gil_blocking(|py| {
1181        let asyncio = Python::import(py, "asyncio").unwrap();
1182        let event_loop = asyncio.call_method0("new_event_loop").unwrap();
1183        let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
1184            .copy_context(py)
1185            .unwrap();
1186
1187        let kwargs = PyDict::new(py);
1188        let target = event_loop.getattr("run_forever").unwrap();
1189        kwargs.set_item("target", target).unwrap();
1190        // Need to make this a daemon thread, otherwise shutdown will hang.
1191        kwargs.set_item("daemon", true).unwrap();
1192        let thread = py
1193            .import("threading")
1194            .unwrap()
1195            .call_method("Thread", (), Some(&kwargs))
1196            .unwrap();
1197        thread.call_method0("start").unwrap();
1198        task_locals
1199    })
1200}
1201
1202/// Get the shared TaskLocals, creating it if necessary.
1203fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
1204    static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
1205    Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
1206}
1207
1208// [Panics in async endpoints]
1209// This class exists to solve a deadlock when an async endpoint calls into some
1210// Rust code that panics.
1211//
1212// When an async endpoint is invoked and calls into Rust, the following sequence happens:
1213//
1214// hyperactor message -> PythonActor::handle() -> call _Actor.handle() in Python
1215//   -> convert the resulting coroutine into a Rust future, but scheduled on
1216//      the Python asyncio event loop (`into_future_with_locals`)
1217//   -> set a callback on Python asyncio loop to ping a channel that fulfills
//      the Rust future when the Python coroutine has finished. (`PyTaskCompleter`)
1219//
1220// This works fine for normal results and Python exceptions: we will take the
1221// result of the callback and send it through the channel, where it will be
1222// returned to the `await`er of the Rust future.
1223//
1224// This DOESN'T work for panics. The behavior of a panic in pyo3-bound code is
1225// that it will get caught by pyo3 and re-thrown to Python as a PanicException.
1226// And if that PanicException ever makes it back to Rust, it will get unwound
1227// instead of passed around as a normal PyErr type.
1228//
1229// So:
1230//   - Endpoint panics.
1231//   - This panic is captured as a PanicException in Python and
1232//     stored as the result of the Python asyncio task.
1233//   - When the callback in `PyTaskCompleter` queries the status of the task to
//     pass it back to the Rust awaiter, instead of getting a Result type, it
//     just resumes unwinding the PanicException.
1236//   - This triggers a deadlock, because the whole task dies without ever
1237//     pinging the response channel, and the Rust awaiter will never complete.
1238//
1239// We work around this by passing a side-channel to our Python task so that it,
1240// in Python, can catch the PanicException and notify the Rust awaiter manually.
1241// In this way we can guarantee that the awaiter will complete even if the
1242// `PyTaskCompleter` callback explodes.
1243#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1244struct PanicFlag {
1245    sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
1246}
1247
1248#[pymethods]
1249impl PanicFlag {
1250    fn signal_panic(&mut self, ex: Py<PyAny>) {
1251        self.sender.take().unwrap().send(ex).unwrap();
1252    }
1253}
1254
1255#[async_trait]
1256impl Handler<PythonMessage> for PythonActor {
1257    #[tracing::instrument(level = "debug", skip_all)]
1258    async fn handle(
1259        &mut self,
1260        cx: &Context<PythonActor>,
1261        message: PythonMessage,
1262    ) -> anyhow::Result<()> {
1263        match &self.dispatch_mode {
1264            PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1265            PythonActorDispatchMode::Queue { sender, .. } => {
1266                let sender = sender.clone();
1267                self.handle_queue(cx, sender, message).await
1268            }
1269        }
1270    }
1271}
1272
1273impl PythonActor {
1274    /// Handle a message using direct dispatch (current behavior).
1275    async fn handle_direct(
1276        &mut self,
1277        cx: &Context<'_, PythonActor>,
1278        message: PythonMessage,
1279    ) -> anyhow::Result<()> {
1280        let resolved = message.resolve_indirect_call(cx).await?;
1281        let endpoint = resolved.method.to_string();
1282
1283        // Create a channel for signaling panics in async endpoints.
1284        // See [Panics in async endpoints].
1285        let (sender, receiver) = oneshot::channel();
1286
1287        let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1288            let inst = self.instance.get_or_insert_with(|| {
1289                let inst: crate::context::PyInstance = cx.into();
1290                inst.into_pyobject(py).unwrap().into()
1291            });
1292
1293            let awaitable = self.actor.call_method(
1294                py,
1295                "handle",
1296                (
1297                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1298                    resolved.method,
1299                    resolved.bytes,
1300                    PanicFlag {
1301                        sender: Some(sender),
1302                    },
1303                    resolved
1304                        .local_state
1305                        .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1306                    resolved.response_port.into_py_any(py)?,
1307                ),
1308                None,
1309            )?;
1310
1311            let tl = self
1312                .task_locals
1313                .as_ref()
1314                .unwrap_or_else(|| shared_task_locals(py));
1315
1316            pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1317                .map_err(|err| err.into())
1318        })
1319        .await?;
1320
1321        // Spawn a child actor to await the Python handler method.
1322        tokio::spawn(handle_async_endpoint_panic(
1323            cx.signal_port(),
1324            PythonTask::new(future)?,
1325            receiver,
1326            cx.self_addr().to_string(),
1327            endpoint,
1328        ));
1329        Ok(())
1330    }
1331
1332    /// Handle a message using queue dispatch.
1333    /// Resolves the message on the Rust side and enqueues it for Python to process.
1334    async fn handle_queue(
1335        &mut self,
1336        cx: &Context<'_, PythonActor>,
1337        sender: pympsc::Sender,
1338        message: PythonMessage,
1339    ) -> anyhow::Result<()> {
1340        let resolved = message.resolve_indirect_call(cx).await?;
1341
1342        let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1343            let inst = self.instance.get_or_insert_with(|| {
1344                let inst: crate::context::PyInstance = cx.into();
1345                inst.into_pyobject(py).unwrap().into()
1346            });
1347
1348            let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1349            let py_context_obj = Py::new(py, py_context)?;
1350
1351            Ok(QueuedMessage {
1352                context: py_context_obj,
1353                method: resolved.method,
1354                bytes: resolved.bytes,
1355                local_state: resolved
1356                    .local_state
1357                    .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1358                response_port: resolved.response_port.into_py_any(py)?,
1359            })
1360        })
1361        .await?;
1362
1363        sender
1364            .send(queued_msg)
1365            .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1366
1367        Ok(())
1368    }
1369}
1370
1371#[async_trait]
1372impl Handler<MeshFailure> for PythonActor {
1373    async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1374        // If the message is not about a failure, don't call __supervise__.
1375        // This includes messages like "stop", because those are not errors that
1376        // need to be propagated.
1377        if !message.event.actor_status.is_failed() {
1378            tracing::info!(
1379                "ignoring non-failure supervision event from child: {}",
1380                message
1381            );
1382            return Ok(());
1383        }
1384        // TODO: Consider routing supervision messages through the queue for Queue mode.
1385        // For now, supervision is always handled directly since it requires immediate response.
1386
1387        // `_Actor.__supervise__` is `async def`, so calling it returns a
1388        // coroutine, which we schedule on the actor's asyncio event loop --
1389        // the same loop that runs endpoint coroutines. A sync user
1390        // `__supervise__` is dispatched under `fake_sync_state` inside
1391        // `_Actor.__supervise__`, mirroring `__cleanup__`.
1392        let (display_name, fut) = monarch_with_gil(|py| {
1393            let inst = self.instance.get_or_insert_with(|| {
1394                let inst: crate::context::PyInstance = cx.into();
1395                inst.into_pyobject(py).unwrap().into()
1396            });
1397            // Compute display_name here since we can't call self.display_name() due to borrow.
1398            let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1399            let actor_bound = self.actor.bind(py);
1400            // The _Actor class always has a __supervise__ method, so this should
1401            // never happen.
1402            if !actor_bound.hasattr("__supervise__")? {
1403                return Err(anyhow::anyhow!(
1404                    "no __supervise__ method on {:?}",
1405                    actor_bound
1406                ));
1407            }
1408            let awaitable = actor_bound.call_method(
1409                "__supervise__",
1410                (
1411                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1412                    PyMeshFailure::from(message.clone()),
1413                ),
1414                None,
1415            )?;
1416            let fut =
1417                pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)?;
1418            anyhow::Ok((display_name, fut))
1419        })
1420        .await?;
1421
1422        let awaited = fut.await;
1423
1424        monarch_with_gil(|py| match awaited {
1425            Ok(s) => {
1426                if s.bind(py).is_truthy()? {
1427                    // If the return value is truthy, then the exception was handled
1428                    // and doesn't need to be propagated.
1429                    // TODO: We also don't want to deliver multiple supervision
1430                    // events from the same mesh if an earlier one is handled.
1431                    tracing::info!(
1432                        name = "ActorMeshStatus",
1433                        status = "SupervisionError::Handled",
1434                        // only care about the event sender when the message is handled
1435                        actor_name = message.actor_mesh_name,
1436                        event = %message.event,
1437                        "__supervise__ on {} handled a supervision event, not reporting any further",
1438                        cx.self_addr(),
1439                    );
1440                    Ok(())
1441                } else {
1442                    // For a falsey return value, we propagate the supervision event
1443                    // to the next owning actor. We do this by returning a new
1444                    // error. This will not set the causal chain for ActorSupervisionEvent,
1445                    // so make sure to include the original event in the error message
1446                    // to provide context.
1447
1448                    // False -- we propagate the event onward, but update it with the fact that
1449                    // this actor is now the event creator.
1450                    for (actor_name, status) in [
1451                        (
1452                            message
1453                                .actor_mesh_name
1454                                .as_deref()
1455                                .unwrap_or_else(|| message.event.actor_id.log_name()),
1456                            "SupervisionError::Unhandled",
1457                        ),
1458                        (cx.self_addr().log_name(), "UnhandledSupervisionEvent"),
1459                    ] {
1460                        tracing::info!(
1461                            name = "ActorMeshStatus",
1462                            status,
1463                            actor_name,
1464                            event = %message.event,
1465                            "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1466                            cx.self_addr(),
1467                        );
1468                    }
1469                    let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1470                        ActorSupervisionEvent::new(
1471                            cx.self_addr().clone(),
1472                            display_name.clone(),
1473                            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1474                                Box::new(message.event.clone()),
1475                            )),
1476                            None,
1477                        ),
1478                    ));
1479                    Err(anyhow::Error::new(err))
1480                }
1481            }
1482            Err(err) => {
1483                // If __supervise__ raised UnhandledFaultHookException,
1484                // return the PyErr directly without wrapping in
1485                // ActorErrorKind. The custom run loop detects this by
1486                // downcasting the anyhow::Error to PyErr.
1487                if err.is_instance(py, &unhandled_fault_hook_exception(py)) {
1488                    return Err(err.into());
1489                }
1490
1491                // Any other exception will supersede in the propagation chain,
1492                // and will become its own supervision failure.
1493                // Include the event it was handling in the error message.
1494
1495                // Add to caused_by chain.
1496                for (actor_name, status) in [
1497                    (
1498                        message
1499                            .actor_mesh_name
1500                            .as_deref()
1501                            .unwrap_or_else(|| message.event.actor_id.log_name()),
1502                        "SupervisionError::__supervise__::exception",
1503                    ),
1504                    (cx.self_addr().log_name(), "UnhandledSupervisionEvent"),
1505                ] {
1506                    tracing::info!(
1507                        name = "ActorMeshStatus",
1508                        status,
1509                        actor_name,
1510                        event = %message.event,
1511                        "__supervise__ on {} threw an exception",
1512                        cx.self_addr(),
1513                    );
1514                }
1515                let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1516                    ActorSupervisionEvent::new(
1517                        cx.self_addr().clone(),
1518                        display_name,
1519                        ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1520                            err.to_string(),
1521                            Box::new(message.event.clone()),
1522                        )),
1523                        None,
1524                    ),
1525                ));
1526                Err(anyhow::Error::new(err))
1527            }
1528        })
1529        .await
1530    }
1531}
1532
1533async fn handle_async_endpoint_panic(
1534    panic_sender: PortHandle<Signal>,
1535    task: PythonTask,
1536    side_channel: oneshot::Receiver<Py<PyAny>>,
1537    actor_id: String,
1538    endpoint: String,
1539) {
1540    // Create attributes for metrics with actor_id and endpoint
1541    let attributes =
1542        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);
1543
1544    // Record the start time for latency measurement
1545    let start_time = std::time::Instant::now();
1546
1547    // Increment throughput counter
1548    ENDPOINT_ACTOR_COUNT.add(1, attributes);
1549
1550    let err_or_never = async {
1551        // The side channel will resolve with a value if a panic occured during
1552        // processing of the async endpoint, see [Panics in async endpoints].
1553        match side_channel.await {
1554            Ok(value) => {
1555                monarch_with_gil(|py| -> Option<SerializablePyErr> {
1556                    let err: PyErr = value
1557                        .downcast_bound::<PyBaseException>(py)
1558                        .unwrap()
1559                        .clone()
1560                        .into();
1561                    ENDPOINT_ACTOR_PANIC.add(1, attributes);
1562                    Some(err.into())
1563                })
1564                .await
1565            }
1566            // An Err means that the sender has been dropped without sending.
1567            // That's okay, it just means that the Python task has completed.
1568            // In that case, just never resolve this future. We expect the other
1569            // branch of the select to finish eventually.
1570            Err(_) => pending().await,
1571        }
1572    };
1573    let future = task.take();
1574    if let Some(panic) = tokio::select! {
1575        result = future => {
1576            match result {
1577                Ok(_) => None,
1578                Err(e) => Some(e.into()),
1579            }
1580        },
1581        result = err_or_never => {
1582            result
1583        }
1584    } {
1585        // Record error and panic metrics
1586        ENDPOINT_ACTOR_ERROR.add(1, attributes);
1587        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1588        let client = &CLIENT
1589            .get_or_init(|| get_proc_runtime().client("async_endpoint_handler").unwrap())
1590            .0;
1591        panic_sender.post(&client, Signal::Kill(panic.to_string()));
1592    }
1593
1594    // Record latency in microseconds
1595    let elapsed_micros = start_time.elapsed().as_micros() as f64;
1596    ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
1597}
1598
1599#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1600struct LocalPort {
1601    instance: PyInstance,
1602    inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
1603}
1604
1605impl Debug for LocalPort {
1606    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1607        f.debug_struct("LocalPort")
1608            .field("inner", &self.inner)
1609            .finish()
1610    }
1611}
1612
1613pub(crate) fn to_py_error<T>(e: T) -> PyErr
1614where
1615    T: Error,
1616{
1617    PyErr::new::<PyValueError, _>(e.to_string())
1618}
1619
1620#[pymethods]
1621impl LocalPort {
1622    fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1623        let port = self.inner.take().expect("use local port once");
1624        port.post(self.instance.deref(), Ok(obj));
1625        Ok(())
1626    }
1627    fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1628        let port = self.inner.take().expect("use local port once");
1629        port.post(self.instance.deref(), Err(e));
1630        Ok(())
1631    }
1632}
1633
1634/// A port that drops all messages sent to it.
1635/// Used when there is no response port for a message.
1636/// Any exceptions sent to it are re-raised in the current actor.
1637#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1638#[derive(Debug)]
1639pub struct DroppingPort;
1640
1641#[pymethods]
1642impl DroppingPort {
1643    #[new]
1644    fn new() -> Self {
1645        DroppingPort
1646    }
1647
1648    fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1649        Ok(())
1650    }
1651
1652    fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1653        // Unwrap ActorError to get the inner exception, matching Python behavior.
1654        let exc = if let Ok(inner) = e.getattr("exception") {
1655            inner
1656        } else {
1657            e
1658        };
1659        Err(PyErr::from_value(exc))
1660    }
1661
1662    #[getter]
1663    fn get_return_undeliverable(&self) -> bool {
1664        true
1665    }
1666
1667    #[setter]
1668    fn set_return_undeliverable(&self, _value: bool) {}
1669}
1670
1671/// A port that sends messages to a remote receiver.
1672/// Wraps an EitherPortRef with the actor instance needed for sending.
1673#[pyclass(module = "monarch._src.actor.actor_mesh")]
1674pub struct Port {
1675    port_ref: EitherPortRef,
1676    instance: Instance<PythonActor>,
1677    rank: Option<usize>,
1678    /// Operation-context headers captured from the inbound request,
1679    /// re-emitted on every reply so failure surfaces can name the
1680    /// operation.
1681    reply_headers: hyperactor_config::Flattrs,
1682}
1683
1684#[pymethods]
1685impl Port {
1686    #[new]
1687    fn new(
1688        port_ref: EitherPortRef,
1689        instance: &crate::context::PyInstance,
1690        rank: Option<usize>,
1691    ) -> Self {
1692        Self {
1693            port_ref,
1694            instance: instance.clone().into_instance(),
1695            rank,
1696            reply_headers: hyperactor_config::Flattrs::new(),
1697        }
1698    }
1699
1700    #[getter("_port_ref")]
1701    fn port_ref_py(&self) -> EitherPortRef {
1702        self.port_ref.clone()
1703    }
1704
1705    #[getter("_rank")]
1706    fn rank_py(&self) -> Option<usize> {
1707        self.rank
1708    }
1709
1710    #[getter]
1711    fn get_return_undeliverable(&self) -> bool {
1712        self.port_ref.get_return_undeliverable()
1713    }
1714
1715    #[setter]
1716    fn set_return_undeliverable(&mut self, value: bool) {
1717        self.port_ref.set_return_undeliverable(value);
1718    }
1719
1720    fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1721        let message = PythonMessage::new_from_buf(
1722            PythonMessageKind::Result { rank: self.rank },
1723            pickle_to_part(py, &obj)?,
1724        );
1725
1726        self.port_ref
1727            .post_with_headers(&self.instance, self.reply_headers.clone(), message)
1728            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1729    }
1730
1731    fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1732        let message = PythonMessage::new_from_buf(
1733            PythonMessageKind::Exception { rank: self.rank },
1734            pickle_to_part(py, &e)?,
1735        );
1736
1737        self.port_ref
1738            .post_with_headers(&self.instance, self.reply_headers.clone(), message)
1739            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1740    }
1741}
1742
1743impl Port {
1744    /// Constructor that attaches operation-context headers captured
1745    /// from the inbound request. The Python `#[new]` constructor
1746    /// defaults to empty headers.
1747    pub(crate) fn with_reply_headers(
1748        port_ref: EitherPortRef,
1749        instance: Instance<PythonActor>,
1750        rank: Option<usize>,
1751        reply_headers: hyperactor_config::Flattrs,
1752    ) -> Self {
1753        Self {
1754            port_ref,
1755            instance,
1756            rank,
1757            reply_headers,
1758        }
1759    }
1760}
1761
1762pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
1763    hyperactor_mod.add_class::<PythonActorHandle>()?;
1764    hyperactor_mod.add_class::<PythonMessage>()?;
1765    hyperactor_mod.add_class::<PythonMessageKind>()?;
1766    hyperactor_mod.add_class::<MethodSpecifier>()?;
1767    hyperactor_mod.add_class::<UnflattenArg>()?;
1768    hyperactor_mod.add_class::<PanicFlag>()?;
1769    hyperactor_mod.add_class::<QueuedMessage>()?;
1770    hyperactor_mod.add_class::<DroppingPort>()?;
1771    hyperactor_mod.add_class::<Port>()?;
1772    Ok(())
1773}
1774
// Unit tests for `PythonMessage` bind/unbind round-tripping and for the
// `to_py_error` bridge.
#[cfg(test)]
mod tests {
    use hyperactor as reference;
    use hyperactor::accum::ReducerSpec;
    use hyperactor::accum::StreamingReducerOpts;
    use hyperactor::id::Label;
    use hyperactor::message::ErasedUnbound;
    use hyperactor::message::Unbound;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_mesh::Error as MeshError;
    use hyperactor_mesh::host_mesh::host_agent::ProcState;
    use hyperactor_mesh::mesh_id::ResourceId;
    use hyperactor_mesh::resource::Status;
    use hyperactor_mesh::resource::{self};
    use pyo3::PyTypeInfo;

    use super::*;
    use crate::actor::to_py_error;

    // A `CallMethod` message with a response port must expose exactly that
    // port when visited for `UnboundPort`s and must survive an
    // unbind-then-bind round trip unchanged. A message without a response
    // port must expose no ports and round-trip as well.
    #[test]
    fn test_python_message_bind_unbind() {
        // Reducible port reference used as the message's response port.
        let reducer_spec = ReducerSpec {
            typehash: 123,
            builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
        };
        let port_ref = hyperactor::PortRef::<PythonMessage>::attest_reducible(
            test_port_id("world_0", "client", 123),
            Some(reducer_spec),
            StreamingReducerOpts::default(),
        );
        let message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: Some(EitherPortRef::Unbounded(port_ref.clone().into())),
            },
            message: Part::from(vec![1, 2, 3]),
        };
        {
            // Collect every `UnboundPort` the erased message exposes.
            let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
            let mut bindings = vec![];
            erased
                .visit_mut::<reference::UnboundPort>(|b| {
                    bindings.push(b.clone());
                    Ok(())
                })
                .unwrap();
            assert_eq!(bindings, vec![reference::UnboundPort::from(&port_ref)]);
            // Unbinding and re-binding must reproduce the original message.
            let unbound = Unbound::try_from_message(message.clone()).unwrap();
            assert_eq!(message, unbound.bind().unwrap());
        }

        // Same message body, but without a response port.
        let no_port_message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: None,
            },
            ..message
        };
        {
            let mut erased = ErasedUnbound::try_from_message(no_port_message.clone()).unwrap();
            let mut bindings = vec![];
            erased
                .visit_mut::<reference::UnboundPort>(|b| {
                    bindings.push(b.clone());
                    Ok(())
                })
                .unwrap();
            // No response port means nothing to visit.
            assert_eq!(bindings.len(), 0);
            let unbound = Unbound::try_from_message(no_port_message.clone()).unwrap();
            assert_eq!(no_port_message, unbound.bind().unwrap());
        }
    }

    // `to_py_error` must turn a `ProcCreationError` into a Python
    // `ValueError` whose message is exactly the Rust `Display` output.
    #[test]
    fn to_py_error_preserves_proc_creation_message() {
        // State<ProcState> w/ `state.is_none()`
        let state: resource::State<ProcState> = resource::State {
            id: ResourceId::instance(Label::new("my-proc").unwrap()),
            status: Status::Failed("boom".into()),
            state: None,
            generation: 0,
            timestamp: std::time::SystemTime::now(),
        };

        // A ProcCreationError
        let mesh_agent: hyperactor::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
            hyperactor::ActorRef::attest(test_port_id("hello_0", "actor", 0).actor_addr());
        // Computed before `mesh_agent` is moved into the error below.
        let expected_prefix = format!(
            "error creating proc (host rank 0) on host mesh agent {}",
            mesh_agent
        );
        let err = MeshError::ProcCreationError {
            host_rank: 0,
            mesh_agent,
            state: Box::new(state),
        };

        // Capture the Rust-side message before `err` is consumed.
        let rust_msg = err.to_string();
        let pyerr = to_py_error(err);

        // The interpreter is initialized before the error is inspected under
        // the GIL.
        pyo3::Python::initialize();
        monarch_with_gil_blocking(|py| {
            assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
            let py_msg = pyerr.value(py).to_string();

            // 1) Bridge preserves the exact message
            assert_eq!(py_msg, rust_msg);
            // 2) Contains the structured state and failure status
            assert!(py_msg.contains(", state: "));
            assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
            // 3) Starts with the expected prefix
            assert!(py_msg.starts_with(&expected_prefix));
        });
    }
}