monarch_hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::OncePortHandle;
22use hyperactor::PortHandle;
23use hyperactor::Proc;
24use hyperactor::RemoteSpawn;
25use hyperactor::actor::ActorError;
26use hyperactor::actor::ActorErrorKind;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Signal;
29use hyperactor::context::Actor as ContextActor;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::Undeliverable;
32use hyperactor::message::Bind;
33use hyperactor::message::Bindings;
34use hyperactor::message::IndexedErasedUnbound;
35use hyperactor::message::Unbind;
36use hyperactor::supervision::ActorSupervisionEvent;
37use hyperactor_config::Flattrs;
38use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
39use hyperactor_mesh::comm::multicast::CAST_POINT;
40use hyperactor_mesh::comm::multicast::CastInfo;
41use hyperactor_mesh::supervision::MeshFailure;
42use hyperactor_mesh::transport::default_bind_spec;
43use hyperactor_mesh::value_mesh::ValueOverlay;
44use monarch_types::PickledPyObject;
45use monarch_types::SerializablePyErr;
46use monarch_types::py_global;
47use ndslice::Point;
48use ndslice::extent;
49use pyo3::IntoPyObjectExt;
50use pyo3::exceptions::PyBaseException;
51use pyo3::exceptions::PyRuntimeError;
52use pyo3::exceptions::PyValueError;
53use pyo3::prelude::*;
54use pyo3::types::PyDict;
55use pyo3::types::PyList;
56use pyo3::types::PyType;
57use serde::Deserialize;
58use serde::Serialize;
59use serde_multipart::Part;
60use tokio::sync::oneshot;
61use typeuri::Named;
62
63use crate::buffers::FrozenBuffer;
64use crate::config::ACTOR_QUEUE_DISPATCH;
65use crate::config::SHARED_ASYNCIO_RUNTIME;
66use crate::context::PyInstance;
67use crate::local_state_broker::BrokerId;
68use crate::local_state_broker::LocalStateBrokerMessage;
69use crate::mailbox::EitherPortRef;
70use crate::mailbox::PyMailbox;
71use crate::mailbox::PythonPortHandle;
72use crate::mailbox::PythonUndeliverableMessageEnvelope;
73use crate::metrics::ENDPOINT_ACTOR_COUNT;
74use crate::metrics::ENDPOINT_ACTOR_ERROR;
75use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
76use crate::metrics::ENDPOINT_ACTOR_PANIC;
77use crate::pickle::pickle_to_part;
78use crate::proc::PyActorId;
79use crate::pympsc;
80use crate::pytokio::PythonTask;
81use crate::runtime::get_proc_runtime;
82use crate::runtime::get_tokio_runtime;
83use crate::runtime::monarch_with_gil;
84use crate::runtime::monarch_with_gil_blocking;
85use crate::supervision::PyMeshFailure;
86
// Declares the accessor `unhandled_fault_hook_exception`, which names the
// Python object `UnhandledFaultHookException` from the module
// `monarch._src.actor.supervision`. Elsewhere in this file the accessor is
// called with a `Python` token and its result is passed to
// `PyErr::is_instance` to recognise errors raised by the fault hook.
// NOTE(review): the import/caching behaviour of `py_global!` is defined in
// `monarch_types` and is not visible here.
py_global!(
    unhandled_fault_hook_exception,
    "monarch._src.actor.supervision",
    "UnhandledFaultHookException"
);
92
/// Selects what fills one slot of the local-state list that is rebuilt on the
/// receiving side of a `PythonMessageKind::CallMethodIndirect` call.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum UnflattenArg {
    /// The slot is filled with the receiving actor's mailbox object.
    Mailbox,
    /// The slot is filled with the next object taken from the local state
    /// returned by the local state broker.
    PyObject,
}
99
/// Identifies which Python method a call message targets and how its
/// response is expected to be delivered.
///
/// `MethodSpecifier::name` yields the carried method name, or `"__init__"`
/// for [`MethodSpecifier::Init`].
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum MethodSpecifier {
    /// Call method 'name', send its return value to the response port.
    ReturnsResponse { name: String },
    /// Call method 'name', send the response port as the first argument.
    ExplicitPort { name: String },
    /// Construct the object. This variant carries no name of its own.
    Init {},
}
110
111impl std::fmt::Display for MethodSpecifier {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{}", self.name())
114    }
115}
116
#[pymethods]
impl MethodSpecifier {
    /// Python-visible `name` property; forwards to the Rust-side
    /// `MethodSpecifier::name`.
    #[getter(name)]
    fn py_name(&self) -> &str {
        self.name()
    }
}
124
125impl MethodSpecifier {
126    pub(crate) fn name(&self) -> &str {
127        match self {
128            MethodSpecifier::ReturnsResponse { name } => name,
129            MethodSpecifier::ExplicitPort { name } => name,
130            MethodSpecifier::Init {} => "__init__",
131        }
132    }
133}
134
/// The payload of a single actor response, without rank information.
///
/// The rank is captured by the overlay's range key, so it is stripped
/// from the value to enable RLE dedup: two ranks returning the same
/// payload will have byte-identical values and can be coalesced into
/// a single run.
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
pub enum PythonResponseMessage {
    /// Payload taken from a `PythonMessageKind::Result` message.
    Result(serde_multipart::Part),
    /// Payload taken from a `PythonMessageKind::Exception` message.
    Exception(serde_multipart::Part),
}

// Register both the bare response and the overlay of responses with
// `wirevalue`'s type registry.
wirevalue::register_type!(PythonResponseMessage);
wirevalue::register_type!(ValueOverlay<PythonResponseMessage>);
149
/// Newtype wrapper around [`ValueOverlay<PythonResponseMessage>`] needed
/// because `PythonMessageKind` is a `#[pyclass]` enum, requiring all variant
/// fields to implement PyO3 traits. `ValueOverlay` is defined in another crate
/// and does not implement `PyClass`.
///
/// Within this file the wrapper is created by
/// `From<ValueOverlay<PythonResponseMessage>> for PythonMessage` and unwrapped
/// again by `PythonMessage::into_overlay`.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct AccumulatedResponses(ValueOverlay<PythonResponseMessage>);
157
/// Discriminates what a [`PythonMessage`] represents; the message's `Part`
/// payload is interpreted according to this kind.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
pub enum PythonMessageKind {
    /// A method call. When `response_port` is `None`, the call is resolved
    /// with a dropping response port.
    CallMethod {
        name: MethodSpecifier,
        response_port: Option<EitherPortRef>,
    },
    /// A response payload. `rank` is filled in by `PythonMessage::into_rank`
    /// and is required by `PythonMessage::into_overlay`.
    Result {
        rank: Option<usize>,
    },
    /// An exception payload. `rank` is filled in by
    /// `PythonMessage::into_rank` and is required by
    /// `PythonMessage::into_overlay`.
    Exception {
        rank: Option<usize>,
    },
    /// Placeholder kind; this is the `Default` value.
    Uninit {},
    /// A method call whose local state and response port are fetched from a
    /// local state broker at resolution time.
    CallMethodIndirect {
        name: MethodSpecifier,
        // Identifies the broker; passed to `BrokerId::new`.
        local_state_broker: (String, usize),
        // Key sent to the broker in `LocalStateBrokerMessage::Get`.
        id: usize,
        // For each slot of the rebuilt local-state list, specifies whether it
        // is filled with the local mailbox or with the next object from the
        // broker-provided local state.
        unflatten_args: Vec<UnflattenArg>,
    },
    /// A set of already-collected responses, keyed by rank range.
    AccumulatedResponses(AccumulatedResponses),
}
wirevalue::register_type!(PythonMessageKind);
183
184impl Default for PythonMessageKind {
185    fn default() -> Self {
186        PythonMessageKind::Uninit {}
187    }
188}
189
190fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
191    let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
192    mailbox.into_bound_py_any(py).unwrap()
193}
194
/// A message exchanged with a Python actor: a [`PythonMessageKind`] tag plus
/// an opaque payload.
///
/// `Debug` is implemented by hand further down in this file (the payload is
/// rendered as hex), which is why it is not derived here.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
pub struct PythonMessage {
    /// What this message represents (call, response, exception, ...).
    pub kind: PythonMessageKind,
    /// Payload bytes. For call messages these are handed to Python as a
    /// `FrozenBuffer`; for responses they become the overlay value.
    pub message: Part,
}
201
202/// Extract the endpoint method name from a [`PythonMessage`].
203fn python_message_endpoint_name(msg: &PythonMessage) -> Option<String> {
204    match &msg.kind {
205        PythonMessageKind::CallMethod { name, .. }
206        | PythonMessageKind::CallMethodIndirect { name, .. } => Some(name.name().to_string()),
207        _ => None,
208    }
209}
210
// We use manual `submit!` instead of `register_type!` because PythonMessage is a
// struct, so the default `endpoint_name` (which delegates to `arm_unchecked`)
// always returns None. The custom implementation inspects `PythonMessageKind` to
// extract the method name. This registration handles direct (non-cast) dispatch.
//
// Every field other than `endpoint_name` forwards to the `wirevalue::Named` /
// `wirevalue::NamedDumpable` implementations of `PythonMessage`.
wirevalue::submit! {
    wirevalue::TypeInfo {
        typename: <PythonMessage as wirevalue::Named>::typename,
        typehash: <PythonMessage as wirevalue::Named>::typehash,
        typeid: <PythonMessage as wirevalue::Named>::typeid,
        port: <PythonMessage as wirevalue::Named>::port,
        dump: Some(<PythonMessage as wirevalue::NamedDumpable>::dump),
        arm_unchecked: <PythonMessage as wirevalue::Named>::arm_unchecked,
        endpoint_name: |ptr| {
            // SAFETY: ptr points to a PythonMessage.
            // NOTE(review): this relies on the registry only invoking the hook
            // with a pointer to the type registered by this entry — confirm
            // against the `wirevalue::TypeInfo` contract.
            let msg = unsafe { &*(ptr as *const PythonMessage) };
            python_message_endpoint_name(msg)
        },
    }
}
230
// Cast messages arrive as IndexedErasedUnbound<PythonMessage>, which wraps a
// serialized PythonMessage. This type has no `register_type!` by default (it
// shares ErasedUnbound's wire format), so we register it explicitly. The
// endpoint_name deserializes the inner payload to read the method name. This
// costs one extra deserialization per message, but the Part payload uses
// zero-copy Bytes refcounting, and Python actor throughput is GIL-bounded,
// so the serde overhead is negligible relative to Python-side processing.
//
// No `dump` function is registered for this type. If the inner payload fails
// to deserialize, the endpoint name is reported as `None` rather than as an
// error.
wirevalue::submit! {
    wirevalue::TypeInfo {
        typename: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typename,
        typehash: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typehash,
        typeid: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::typeid,
        port: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::port,
        dump: None,
        arm_unchecked: <IndexedErasedUnbound<PythonMessage> as wirevalue::Named>::arm_unchecked,
        endpoint_name: |ptr| {
            // SAFETY: ptr points to an IndexedErasedUnbound<PythonMessage>.
            // NOTE(review): this relies on the registry only invoking the hook
            // with a pointer to the type registered by this entry — confirm
            // against the `wirevalue::TypeInfo` contract.
            let erased = unsafe { &*(ptr as *const IndexedErasedUnbound<PythonMessage>) };
            erased
                .inner_any()
                .deserialized_unchecked::<PythonMessage>()
                .ok()
                .and_then(|msg| python_message_endpoint_name(&msg))
        },
    }
}
257
258impl From<ValueOverlay<PythonResponseMessage>> for PythonMessage {
259    fn from(overlay: ValueOverlay<PythonResponseMessage>) -> Self {
260        PythonMessage {
261            kind: PythonMessageKind::AccumulatedResponses(AccumulatedResponses(overlay)),
262            message: Default::default(),
263        }
264    }
265}
266
267impl PythonMessage {
268    /// Consume this message and extract a `ValueOverlay<PythonResponseMessage>`.
269    ///
270    /// Handles both already-collected responses and leaf `Result`/`Exception`
271    /// messages by wrapping them in a single-run overlay.
272    pub fn into_overlay(self) -> anyhow::Result<ValueOverlay<PythonResponseMessage>> {
273        match self.kind {
274            PythonMessageKind::AccumulatedResponses(overlay) => Ok(overlay.0),
275            PythonMessageKind::Result { rank, .. } => {
276                let rank = rank.expect("accumulated response should have a rank");
277                let mut overlay = ValueOverlay::new();
278                overlay.push_run(rank..rank + 1, PythonResponseMessage::Result(self.message))?;
279                Ok(overlay)
280            }
281            PythonMessageKind::Exception { rank, .. } => {
282                let rank = rank.expect("accumulated exception should have a rank");
283                let mut overlay = ValueOverlay::new();
284                overlay.push_run(
285                    rank..rank + 1,
286                    PythonResponseMessage::Exception(self.message),
287                )?;
288                Ok(overlay)
289            }
290            other => {
291                anyhow::bail!(
292                    "unexpected message kind {:?} in collected responses reducer",
293                    other
294                );
295            }
296        }
297    }
298}
299
/// A call message after resolution by `PythonMessage::resolve_indirect_call`:
/// everything needed to invoke the Python method.
struct ResolvedCallMethod {
    /// Which method to call.
    method: MethodSpecifier,
    /// The message payload, wrapped for Python consumption.
    bytes: FrozenBuffer,
    /// `None` for a direct `CallMethod`; for `CallMethodIndirect`, a Python
    /// list with one entry per `UnflattenArg`.
    local_state: Option<Py<PyAny>>,
    /// Implements PortProtocol
    /// Concretely either a Port, DroppingPort, or LocalPort
    response_port: ResponsePort,
}
308
/// The Rust-side choice of response port for a resolved call, converted to a
/// Python object by `ResponsePort::into_py_any`.
enum ResponsePort {
    /// No response port was supplied with the call; converts to a
    /// `DroppingPort`.
    Dropping,
    /// Port built from the `EitherPortRef` carried by a `CallMethod` message.
    Port(Port),
    /// Port built from the response port returned by the local state broker
    /// for a `CallMethodIndirect` message.
    Local(LocalPort),
}
314
315impl ResponsePort {
316    fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
317        match self {
318            ResponsePort::Dropping => DroppingPort.into_py_any(py),
319            ResponsePort::Port(port) => port.into_py_any(py),
320            ResponsePort::Local(port) => port.into_py_any(py),
321        }
322    }
323}
324
/// Message sent through the queue in queue-dispatch mode.
/// Contains pre-resolved components ready for Python consumption.
///
/// Every field is exposed to Python as a read-only attribute via
/// `#[pyo3(get)]`.
// NOTE(review): the fields mirror `ResolvedCallMethod`; the code that builds a
// `QueuedMessage` is not visible in this part of the file — confirm there how
// an absent local state is represented.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub struct QueuedMessage {
    /// The Python context object accompanying the message.
    #[pyo3(get)]
    pub context: Py<crate::context::PyContext>,
    /// Which method to call.
    #[pyo3(get)]
    pub method: MethodSpecifier,
    /// The message payload bytes.
    #[pyo3(get)]
    pub bytes: FrozenBuffer,
    /// Local state for the call, as a Python object.
    #[pyo3(get)]
    pub local_state: Py<PyAny>,
    /// Response port for the call, as a Python object.
    #[pyo3(get)]
    pub response_port: Py<PyAny>,
}
340
impl PythonMessage {
    /// Builds a message from a kind and anything convertible into a `Part`.
    pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
        Self {
            kind,
            message: message.into(),
        }
    }

    /// Returns this response message with its rank set to `rank`, keeping the
    /// payload and replacing any rank it previously carried.
    ///
    /// # Panics
    ///
    /// Panics if the message is neither a `Result` nor an `Exception`.
    pub fn into_rank(self, rank: usize) -> Self {
        let rank = Some(rank);
        match self.kind {
            PythonMessageKind::Result { .. } => PythonMessage {
                kind: PythonMessageKind::Result { rank },
                message: self.message,
            },
            PythonMessageKind::Exception { .. } => PythonMessage {
                kind: PythonMessageKind::Exception { rank },
                message: self.message,
            },
            _ => panic!("PythonMessage is not a response but {:?}", self),
        }
    }
    /// Resolves a call message into a [`ResolvedCallMethod`].
    ///
    /// Despite the name, both call kinds are handled:
    ///
    /// * `CallMethodIndirect`: the local state and response port are fetched
    ///   from the local state broker, and the local-state list is rebuilt
    ///   under the GIL.
    /// * `CallMethod`: no local state; the response port is built from the
    ///   optional port reference carried by the message.
    ///
    /// # Errors
    ///
    /// Returns an error if sending to the broker or receiving its reply fails.
    ///
    /// # Panics
    ///
    /// Panics if the message is not a call, if the broker returns fewer
    /// objects than there are `UnflattenArg::PyObject` slots, or if building
    /// the Python list fails.
    async fn resolve_indirect_call(
        self,
        cx: &Context<'_, PythonActor>,
    ) -> anyhow::Result<ResolvedCallMethod> {
        match self.kind {
            PythonMessageKind::CallMethodIndirect {
                name,
                local_state_broker,
                id,
                unflatten_args,
            } => {
                // Ask the broker for the state stored under `id`; the reply
                // arrives on a freshly opened once-port.
                let broker = BrokerId::new(local_state_broker).resolve(cx).await;
                let (send, recv) = cx.open_once_port();
                broker.send(cx, LocalStateBrokerMessage::Get(id, send))?;
                let state = recv.recv().await?;
                // Objects are consumed in order, one per `PyObject` slot.
                let mut state_it = state.state.into_iter();
                monarch_with_gil(|py| {
                    let mailbox = mailbox(py, cx);
                    // Rebuild the local-state list: one entry per slot.
                    let local_state = Some(
                        PyList::new(
                            py,
                            unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
                                match x {
                                    UnflattenArg::Mailbox => mailbox.clone(),
                                    UnflattenArg::PyObject => {
                                        state_it.next().unwrap().into_bound(py)
                                    }
                                }
                            }),
                        )
                        .unwrap()
                        .into(),
                    );
                    // Responses go to the port handed back by the broker.
                    let response_port = ResponsePort::Local(LocalPort {
                        instance: cx.into(),
                        inner: Some(state.response_port),
                    });
                    Ok(ResolvedCallMethod {
                        method: name,
                        bytes: FrozenBuffer {
                            inner: self.message.into_bytes(),
                        },
                        local_state,
                        response_port,
                    })
                })
                .await
            }
            PythonMessageKind::CallMethod {
                name,
                response_port,
            } => {
                // Without a port reference, the response port is a dropping
                // port; otherwise it is tagged with this actor's cast rank.
                let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
                    let point = cx.cast_point();
                    ResponsePort::Port(Port {
                        port_ref,
                        instance: cx.instance().clone_for_py(),
                        rank: Some(point.rank()),
                    })
                });
                Ok(ResolvedCallMethod {
                    method: name,
                    bytes: FrozenBuffer {
                        inner: self.message.into_bytes(),
                    },
                    local_state: None,
                    response_port,
                })
            }
            _ => {
                panic!("unexpected message kind {:?}", self.kind)
            }
        }
    }
}
438
439impl std::fmt::Debug for PythonMessage {
440    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441        f.debug_struct("PythonMessage")
442            .field("kind", &self.kind)
443            .field(
444                "message",
445                &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
446            )
447            .finish()
448    }
449}
450
451impl Unbind for PythonMessage {
452    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
453        match &self.kind {
454            PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
455            _ => Ok(()),
456        }
457    }
458}
459
460impl Bind for PythonMessage {
461    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
462        match &mut self.kind {
463            PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
464            _ => Ok(()),
465        }
466    }
467}
468
469#[pymethods]
470impl PythonMessage {
471    #[new]
472    #[pyo3(signature = (kind, message))]
473    pub fn new<'py>(kind: PythonMessageKind, message: PyRef<'py, FrozenBuffer>) -> PyResult<Self> {
474        Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
475    }
476
477    #[getter]
478    fn kind(&self) -> PythonMessageKind {
479        self.kind.clone()
480    }
481
482    #[getter]
483    fn message(&self) -> FrozenBuffer {
484        FrozenBuffer {
485            inner: self.message.to_bytes(),
486        }
487    }
488}
489
/// Python-visible wrapper around an `ActorHandle<PythonActor>`, exposing
/// `send` and `bind` to Python.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub(super) struct PythonActorHandle {
    /// The wrapped actor handle.
    pub(super) inner: ActorHandle<PythonActor>,
}
494
495#[pymethods]
496impl PythonActorHandle {
497    // TODO: do the pickling in rust
498    fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
499        self.inner
500            .send(instance.deref(), message.clone())
501            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
502        Ok(())
503    }
504
505    fn bind(&self) -> PyActorId {
506        self.inner.bind::<PythonActor>().into_actor_id().into()
507    }
508}
509
/// Dispatch mode for Python actors.
///
/// `PythonActor::new` selects the mode from the `ACTOR_QUEUE_DISPATCH`
/// configuration value: `Queue` when it is set, `Direct` otherwise.
#[derive(Debug)]
pub enum PythonActorDispatchMode {
    /// Direct dispatch: Rust acquires the GIL and calls Python handlers directly.
    Direct,
    /// Queue dispatch: Rust enqueues messages to a channel; Python dequeues and dispatches.
    Queue {
        /// Channel sender for enqueuing messages to Python.
        sender: pympsc::Sender,
        /// Channel receiver, taken during Actor::init to start the message loop.
        /// It is `Some` when the actor is constructed.
        receiver: Option<pympsc::PyReceiver>,
    },
}
523
/// An actor for which message handlers are implemented in Python.
///
/// The `hyperactor::export` attribute below registers `PythonMessage` and
/// `MeshFailure` as handled message types, both with `cast = true`.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [
        PythonMessage { cast = true },
        MeshFailure { cast = true },
    ],
)]
pub struct PythonActor {
    /// The Python object that we delegate message handling to.
    actor: Py<PyAny>,
    /// Stores a reference to the Python event loop to run Python coroutines on.
    /// This is None when using single runtime mode, Some when using per-actor mode.
    /// `PythonActor::new` decides this from the `SHARED_ASYNCIO_RUNTIME`
    /// configuration value.
    task_locals: Option<pyo3_async_runtimes::TaskLocals>,
    /// Instance object that we keep across handle calls so that we can store
    /// information from the Init (spawn rank, controller) and provide it to other calls.
    /// It is `None` when the actor is constructed.
    instance: Option<Py<crate::context::PyInstance>>,
    /// Dispatch mode for this actor.
    dispatch_mode: PythonActorDispatchMode,
    /// The location in the actor mesh at which this actor was spawned.
    /// Initialised from the `spawn_point` argument of `PythonActor::new`.
    spawn_point: OnceLock<Option<Point>>,
    /// Initial message to process during PythonActor::init.
    init_message: Option<PythonMessage>,
}
549
550impl PythonActor {
551    pub(crate) fn new(
552        actor_type: PickledPyObject,
553        init_message: Option<PythonMessage>,
554        spawn_point: Option<Point>,
555    ) -> Result<Self, anyhow::Error> {
556        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);
557
558        Ok(monarch_with_gil_blocking(
559            |py| -> Result<Self, SerializablePyErr> {
560                let unpickled = actor_type.unpickle(py)?;
561                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
562                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;
563
564                // Only create per-actor TaskLocals if not using shared runtime
565                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
566                    .then(|| Python::detach(py, create_task_locals));
567
568                let dispatch_mode = if use_queue_dispatch {
569                    let (sender, receiver) = pympsc::channel().map_err(|e| {
570                        let py_err = PyRuntimeError::new_err(e.to_string());
571                        SerializablePyErr::from(py, &py_err)
572                    })?;
573                    PythonActorDispatchMode::Queue {
574                        sender,
575                        receiver: Some(receiver),
576                    }
577                } else {
578                    PythonActorDispatchMode::Direct
579                };
580
581                Ok(Self {
582                    actor,
583                    task_locals,
584                    instance: None,
585                    dispatch_mode,
586                    spawn_point: OnceLock::from(spawn_point),
587                    init_message,
588                })
589            },
590        )?)
591    }
592
593    /// Get the TaskLocals to use for this actor.
594    /// Returns either the shared TaskLocals or this actor's own TaskLocals based on configuration.
595    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
596        self.task_locals
597            .as_ref()
598            .unwrap_or_else(|| shared_task_locals(py))
599    }
600
    /// Bootstrap the root client actor, creating a new proc for it.
    /// This is the legacy entry point that creates its own proc.
    ///
    /// The proc is named `mesh_root_client_proc` and is created on the
    /// binding address of the default bind spec. The root instance is stored
    /// in a function-local static, which gives the returned reference its
    /// `'static` lifetime.
    ///
    /// # Panics
    ///
    /// Panics if the proc cannot be created. `bootstrap_client_inner` also
    /// panics when the static has already been initialised, so a second call
    /// that reaches that point panics.
    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
        // Storage for the root client instance, shared for the lifetime of
        // the process.
        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();

        let client_proc = Proc::direct(
            default_bind_spec().binding_addr(),
            "mesh_root_client_proc".into(),
        )
        .unwrap();

        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
    }
614
615    /// Bootstrap the client proc, storing the root client instance in given static.
616    /// This is passed in because we require storage, as the instance is shared.
617    /// This can be simplified when we remove v0.
618    pub(crate) fn bootstrap_client_inner(
619        py: Python<'_>,
620        client_proc: Proc,
621        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
622    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
623        let actor_mesh_mod = py
624            .import("monarch._src.actor.actor_mesh")
625            .expect("import actor_mesh");
626        let root_client_class = actor_mesh_mod
627            .getattr("RootClientActor")
628            .expect("get RootClientActor");
629
630        let actor_type =
631            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
632                .expect("pickle _Actor");
633
634        let init_frozen_buffer: FrozenBuffer = root_client_class
635            .call_method0("_pickled_init_args")
636            .expect("call RootClientActor._pickled_init_args")
637            .extract()
638            .expect("extract FrozenBuffer from _pickled_init_args");
639        let init_message = PythonMessage::new_from_buf(
640            PythonMessageKind::CallMethod {
641                name: MethodSpecifier::Init {},
642                response_port: None,
643            },
644            init_frozen_buffer,
645        );
646
647        let mut actor = PythonActor::new(
648            actor_type,
649            Some(init_message),
650            Some(extent!().point_of_rank(0).unwrap()),
651        )
652        .expect("create client PythonActor");
653
654        let ai = client_proc
655            .actor_instance(
656                root_client_class
657                    .getattr("name")
658                    .expect("get RootClientActor.name")
659                    .extract()
660                    .expect("extract RootClientActor.name"),
661            )
662            .expect("root instance create");
663
664        let handle = ai.handle;
665        let signal_rx = ai.signal;
666        let supervision_rx = ai.supervision;
667        let work_rx = ai.work;
668
669        root_client_instance
670            .set(ai.instance)
671            .map_err(|_| "already initialized root client instance")
672            .unwrap();
673        let instance = root_client_instance.get().unwrap();
674
675        // The root client PythonActor uses a custom run loop that
676        // bypasses Actor::init, so mark it as system explicitly
677        // (matching GlobalClientActor::fresh_instance).
678        instance.set_system();
679
680        // Bind to ensure the Signal and Undeliverable<MessageEnvelope> ports
681        // are bound.
682        let _client_ref = handle.bind::<PythonActor>();
683
684        get_tokio_runtime().spawn(async move {
685            // This is gross. Sorry.
686            actor.init(instance).await.unwrap();
687
688            let mut signal_rx = signal_rx;
689            let mut supervision_rx = supervision_rx;
690            let mut work_rx = work_rx;
691            let mut need_drain = false;
692            let mut err = 'messages: loop {
693                tokio::select! {
694                    work = work_rx.recv() => {
695                        let work = work.expect("inconsistent work queue state");
696                        if let Err(err) = work.handle(&mut actor, instance).await {
697                            // Check for UnhandledFaultHookException on the raw
698                            // anyhow::Error before wrapping in ActorErrorKind.
699                            // If __supervise__ already processed the supervision
700                            // event and the hook raised, don't re-handle it via
701                            // handle_supervision_event — that would call
702                            // __supervise__ a second time.
703                            let is_hook_exception = monarch_with_gil(|py| {
704                                err.downcast_ref::<pyo3::PyErr>()
705                                    .is_some_and(|pyerr| {
706                                        pyerr.is_instance(
707                                            py,
708                                            &unhandled_fault_hook_exception(py),
709                                        )
710                                    })
711                            }).await;
712
713                            let kind = ActorErrorKind::processing(err);
714                            let err = ActorError {
715                                actor_id: Box::new(instance.self_id().clone()),
716                                kind: Box::new(kind),
717                            };
718
719                            if is_hook_exception {
720                                break Some(err);
721                            }
722
723                            // Give the actor a chance to handle the error produced
724                            // in its own message handler. This is important because
725                            // we want Undeliverable<MessageEnvelope>, which returns
726                            // an Err typically, to create a supervision event and
727                            // call __supervise__.
728                            let supervision_event = actor_error_to_event(instance, &actor, err);
729                            // If the immediate supervision event isn't handled, continue with
730                            // exiting the loop.
731                            // Else, continue handling messages.
732                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
733                                for supervision_event in supervision_rx.drain() {
734                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
735                                        break 'messages Some(err);
736                                    }
737                                }
738                                break Some(err);
739                            }
740                        }
741                    }
742                    signal = signal_rx.recv() => {
743                        let signal = signal.map_err(ActorError::from);
744                        tracing::info!(actor_id = %instance.self_id(), "client received signal {signal:?}");
745                        match signal {
746                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
747                                need_drain = matches!(signal, Signal::DrainAndStop(_));
748                                break None;
749                            },
750                            Ok(Signal::ChildStopped(_)) => {},
751                            Ok(Signal::Abort(reason)) => {
752                                break Some(ActorError { actor_id: Box::new(instance.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
753                            },
754                            Err(err) => break Some(err),
755                        }
756                    }
757                    Ok(supervision_event) = supervision_rx.recv() => {
758                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
759                            break Some(err);
760                        }
761                    }
762                };
763            };
764            if need_drain {
765                let mut n = 0;
766                while let Ok(work) = work_rx.try_recv() {
767                    if let Err(e) = work.handle(&mut actor, instance).await {
768                        err = Some(ActorError {
769                            actor_id: Box::new(instance.self_id().clone()),
770                            kind: Box::new(ActorErrorKind::processing(e)),
771                        });
772                        break;
773                    }
774                    n += 1;
775                }
776                tracing::debug!(actor_id = %instance.self_id(), "client drained {} messages before stopping", n);
777            }
778            if let Some(err) = err {
779                let event = actor_error_to_event(instance, &actor, err);
780                // The proc supervision handler will send to ProcAgent, which
781                // just records it in v1. We want to crash instead, as nothing will
782                // monitor the client ProcAgent for now.
783                tracing::error!(
784                    actor_id = %instance.self_id(),
785                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
786                    event,
787                );
788
789                // This is running in a background thread, and thus cannot run
790                // Py_FinalizeEx when it exits the process to properly shut down
791                // all python objects.
792                // We use _thread.interrupt_main to raise a KeyboardInterrupt
793                // to the main thread at some point in the future.
794                // There is no way to propagate the exception message, but it
795                // will at least run proper shutdown code as long as BaseException
796                // isn't caught.
797                monarch_with_gil_blocking(|py| {
798                    // Use _thread.interrupt_main to force the client to exit if it has an
799                    // unhandled supervision event.
800                    let thread_mod = py.import("_thread").expect("import _thread");
801                    let interrupt_main = thread_mod
802                        .getattr("interrupt_main")
803                        .expect("get interrupt_main");
804
805                    // Ignore any exception from calling interrupt_main
806                    if let Err(e) = interrupt_main.call0() {
807                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
808                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
809                        std::process::exit(1);
810                    }
811                });
812            } else {
813                tracing::info!(actor_id = %instance.self_id(), "client stopped");
814                instance.change_status(hyperactor::actor::ActorStatus::Stopped("client stopped".into()));
815            }
816        });
817
818        (root_client_instance.get().unwrap(), handle)
819    }
820}
821
822fn actor_error_to_event(
823    instance: &Instance<PythonActor>,
824    actor: &PythonActor,
825    err: ActorError,
826) -> ActorSupervisionEvent {
827    match *err.kind {
828        ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
829        _ => {
830            let status = ActorStatus::generic_failure(err.kind.to_string());
831            ActorSupervisionEvent::new(
832                instance.self_id().clone(),
833                actor.display_name(),
834                status,
835                None,
836            )
837        }
838    }
839}
840
841pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
842    static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
843
844    // Release the GIL before waiting on ROOT_CLIENT_ACTOR, because PythonActor::bootstrap_client
845    // may release/reacquire the GIL; if thread 0 holds the GIL blocking on ROOT_CLIENT_ACTOR.get_or_init
846    // while thread 1 blocks on acquiring the GIL inside PythonActor::bootstrap_client, we get
847    // a deadlock.
848    py.detach(|| {
849        ROOT_CLIENT_ACTOR.get_or_init(|| {
850            monarch_with_gil_blocking(|py| {
851                let (client, _handle) = PythonActor::bootstrap_client(py);
852                client
853            })
854        })
855    })
856}
857
858#[async_trait]
859impl Actor for PythonActor {
860    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
861        if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
862            let receiver = receiver.take().unwrap();
863
864            // Create an error port that converts PythonMessage to an abort signal.
865            // This allows Python to send errors that trigger actor supervision.
866            let error_port: hyperactor::PortHandle<PythonMessage> =
867                this.port::<Signal>().contramap(|msg: PythonMessage| {
868                    monarch_with_gil_blocking(|py| {
869                        let err = match msg.kind {
870                            PythonMessageKind::Exception { .. } => {
871                                // Deserialize the error from the message
872                                let cloudpickle = py.import("cloudpickle").unwrap();
873                                let err_obj = cloudpickle
874                                    .call_method1("loads", (msg.message.to_bytes().as_ref(),))
875                                    .unwrap();
876                                let py_err = pyo3::PyErr::from_value(err_obj);
877                                SerializablePyErr::from(py, &py_err)
878                            }
879                            _ => {
880                                let py_err = PyRuntimeError::new_err(format!(
881                                    "expected Exception, got {:?}",
882                                    msg.kind
883                                ));
884                                SerializablePyErr::from(py, &py_err)
885                            }
886                        };
887                        Signal::Abort(err.to_string())
888                    })
889                });
890
891            let error_port_handle = PythonPortHandle::new(error_port);
892
893            monarch_with_gil(|py| {
894                let tl = self
895                    .task_locals
896                    .as_ref()
897                    .unwrap_or_else(|| shared_task_locals(py));
898                let awaitable = self.actor.call_method(
899                    py,
900                    "_dispatch_loop",
901                    (receiver, error_port_handle),
902                    None,
903                )?;
904                let future =
905                    pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
906                tokio::spawn(async move {
907                    if let Err(e) = future.await {
908                        tracing::error!("message loop error: {}", e);
909                    }
910                });
911                Ok::<_, anyhow::Error>(())
912            })
913            .await?;
914        }
915
916        if let Some(init_message) = self.init_message.take() {
917            let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
918            let mut headers = Flattrs::new();
919            headers.set(CAST_POINT, spawn_point);
920            let cx = Context::new(this, headers);
921            <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
922        }
923
924        Ok(())
925    }
926
927    async fn cleanup(
928        &mut self,
929        this: &Instance<Self>,
930        err: Option<&ActorError>,
931    ) -> anyhow::Result<()> {
932        // Calls the "__cleanup__" method on the python instance to allow the actor
933        // to control its own cleanup.
934        // No headers because this isn't in the context of a message.
935        let cx = Context::new(this, Flattrs::new());
936        // Turn the ActorError into a representation of the error. We may not
937        // have an original exception object or traceback, so we just pass in
938        // the message.
939        let err_as_str = err.map(|e| e.to_string());
940        let future = monarch_with_gil(|py| {
941            let py_cx = match &self.instance {
942                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
943                None => {
944                    let py_instance: crate::context::PyInstance = this.into();
945                    crate::context::PyContext::new(
946                        &cx,
947                        py_instance
948                            .into_py_any(py)?
949                            .downcast_bound(py)
950                            .map_err(PyErr::from)?
951                            .clone()
952                            .unbind(),
953                    )
954                }
955            }
956            .into_bound_py_any(py)?;
957            let actor = self.actor.bind(py);
958            // Some tests don't use the Actor base class, so add this check
959            // to be defensive.
960            match actor.hasattr("__cleanup__") {
961                Ok(false) | Err(_) => {
962                    // No cleanup found, default to returning None
963                    return Ok(None);
964                }
965                _ => {}
966            }
967            let awaitable = actor
968                .call_method("__cleanup__", (&py_cx, err_as_str), None)
969                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
970            if awaitable.is_none() {
971                Ok(None)
972            } else {
973                pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
974                    .map(Some)
975                    .map_err(anyhow::Error::from)
976            }
977        })
978        .await?;
979        if let Some(future) = future {
980            future.await.map_err(anyhow::Error::from)?;
981        }
982        Ok(())
983    }
984
985    fn display_name(&self) -> Option<String> {
986        self.instance.as_ref().and_then(|instance| {
987            monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
988        })
989    }
990
991    async fn handle_undeliverable_message(
992        &mut self,
993        ins: &Instance<Self>,
994        mut envelope: Undeliverable<MessageEnvelope>,
995    ) -> Result<(), anyhow::Error> {
996        if envelope.0.sender() != ins.self_id() {
997            // This can happen if the sender is comm. Update the envelope.
998            envelope = update_undeliverable_envelope_for_casting(envelope);
999        }
1000        assert_eq!(
1001            envelope.0.sender(),
1002            ins.self_id(),
1003            "undeliverable message was returned to the wrong actor. \
1004            Return address = {}, src actor = {}, dest actor port = {}, message type = {}, envelope headers = {}",
1005            envelope.0.sender(),
1006            ins.self_id(),
1007            envelope.0.dest(),
1008            envelope.0.data().typename().unwrap_or("unknown"),
1009            envelope.0.headers()
1010        );
1011
1012        let cx = Context::new(ins, envelope.0.headers().clone());
1013
1014        let (envelope, handled) = monarch_with_gil(|py| {
1015            let py_cx = match &self.instance {
1016                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
1017                None => {
1018                    let py_instance: crate::context::PyInstance = ins.into();
1019                    crate::context::PyContext::new(
1020                        &cx,
1021                        py_instance
1022                            .into_py_any(py)?
1023                            .downcast_bound(py)
1024                            .map_err(PyErr::from)?
1025                            .clone()
1026                            .unbind(),
1027                    )
1028                }
1029            }
1030            .into_bound_py_any(py)?;
1031            let py_envelope = PythonUndeliverableMessageEnvelope {
1032                inner: Some(envelope),
1033            }
1034            .into_bound_py_any(py)?;
1035            let handled = self
1036                .actor
1037                .call_method(
1038                    py,
1039                    "_handle_undeliverable_message",
1040                    (&py_cx, &py_envelope),
1041                    None,
1042                )
1043                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
1044                .extract::<bool>(py)?;
1045            Ok::<_, anyhow::Error>((
1046                py_envelope
1047                    .downcast::<PythonUndeliverableMessageEnvelope>()
1048                    .map_err(PyErr::from)?
1049                    .try_borrow_mut()
1050                    .map_err(PyErr::from)?
1051                    .take()?,
1052                handled,
1053            ))
1054        })
1055        .await?;
1056
1057        if !handled {
1058            hyperactor::actor::handle_undeliverable_message(ins, envelope)
1059        } else {
1060            Ok(())
1061        }
1062    }
1063
1064    async fn handle_supervision_event(
1065        &mut self,
1066        this: &Instance<Self>,
1067        event: &ActorSupervisionEvent,
1068    ) -> Result<bool, anyhow::Error> {
1069        let cx = Context::new(this, Flattrs::new());
1070        self.handle(
1071            &cx,
1072            MeshFailure {
1073                actor_mesh_name: None,
1074                event: event.clone(),
1075                crashed_ranks: vec![],
1076            },
1077        )
1078        .await
1079        .map(|_| true)
1080    }
1081}
1082
/// Parameters from which a [`PythonActor`] is spawned through `RemoteSpawn`.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct PythonActorParams {
    /// The pickled actor class to instantiate.
    actor_type: PickledPyObject,
    /// Python message to process as part of the actor initialization.
    init_message: Option<PythonMessage>,
}
1090
1091impl PythonActorParams {
1092    pub(crate) fn new(actor_type: PickledPyObject, init_message: Option<PythonMessage>) -> Self {
1093        Self {
1094            actor_type,
1095            init_message,
1096        }
1097    }
1098}
1099
1100#[async_trait]
1101impl RemoteSpawn for PythonActor {
1102    type Params = PythonActorParams;
1103
1104    async fn new(
1105        PythonActorParams {
1106            actor_type,
1107            init_message,
1108        }: PythonActorParams,
1109        environment: Flattrs,
1110    ) -> Result<Self, anyhow::Error> {
1111        let spawn_point = environment.get(CAST_POINT);
1112        Self::new(actor_type, init_message, spawn_point)
1113    }
1114}
1115
1116/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
1117fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
1118    monarch_with_gil_blocking(|py| {
1119        let asyncio = Python::import(py, "asyncio").unwrap();
1120        let event_loop = asyncio.call_method0("new_event_loop").unwrap();
1121        let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
1122            .copy_context(py)
1123            .unwrap();
1124
1125        let kwargs = PyDict::new(py);
1126        let target = event_loop.getattr("run_forever").unwrap();
1127        kwargs.set_item("target", target).unwrap();
1128        // Need to make this a daemon thread, otherwise shutdown will hang.
1129        kwargs.set_item("daemon", true).unwrap();
1130        let thread = py
1131            .import("threading")
1132            .unwrap()
1133            .call_method("Thread", (), Some(&kwargs))
1134            .unwrap();
1135        thread.call_method0("start").unwrap();
1136        task_locals
1137    })
1138}
1139
1140/// Get the shared TaskLocals, creating it if necessary.
1141fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
1142    static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
1143    Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
1144}
1145
1146// [Panics in async endpoints]
1147// This class exists to solve a deadlock when an async endpoint calls into some
1148// Rust code that panics.
1149//
1150// When an async endpoint is invoked and calls into Rust, the following sequence happens:
1151//
1152// hyperactor message -> PythonActor::handle() -> call _Actor.handle() in Python
1153//   -> convert the resulting coroutine into a Rust future, but scheduled on
1154//      the Python asyncio event loop (`into_future_with_locals`)
1155//   -> set a callback on Python asyncio loop to ping a channel that fulfills
//      the Rust future when the Python coroutine has finished. (`PyTaskCompleter`)
1157//
1158// This works fine for normal results and Python exceptions: we will take the
1159// result of the callback and send it through the channel, where it will be
1160// returned to the `await`er of the Rust future.
1161//
1162// This DOESN'T work for panics. The behavior of a panic in pyo3-bound code is
1163// that it will get caught by pyo3 and re-thrown to Python as a PanicException.
1164// And if that PanicException ever makes it back to Rust, it will get unwound
1165// instead of passed around as a normal PyErr type.
1166//
1167// So:
1168//   - Endpoint panics.
1169//   - This panic is captured as a PanicException in Python and
1170//     stored as the result of the Python asyncio task.
1171//   - When the callback in `PyTaskCompleter` queries the status of the task to
1172//     pass it back to the Rust awaiter, instead of getting a Result type, it
//     just resumes unwinding the PanicException.
1174//   - This triggers a deadlock, because the whole task dies without ever
1175//     pinging the response channel, and the Rust awaiter will never complete.
1176//
1177// We work around this by passing a side-channel to our Python task so that it,
1178// in Python, can catch the PanicException and notify the Rust awaiter manually.
1179// In this way we can guarantee that the awaiter will complete even if the
1180// `PyTaskCompleter` callback explodes.
// `PanicFlag` is the side channel described in [Panics in async endpoints]:
// it is passed to the Python `handle` call so that Python can hand a caught
// exception object back to the Rust awaiter.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
struct PanicFlag {
    // One-shot sender for the exception object; taken out of the `Option` by
    // `signal_panic`.
    sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
}
1185
1186#[pymethods]
1187impl PanicFlag {
1188    fn signal_panic(&mut self, ex: Py<PyAny>) {
1189        self.sender.take().unwrap().send(ex).unwrap();
1190    }
1191}
1192
1193#[async_trait]
1194impl Handler<PythonMessage> for PythonActor {
1195    #[tracing::instrument(level = "debug", skip_all)]
1196    async fn handle(
1197        &mut self,
1198        cx: &Context<PythonActor>,
1199        message: PythonMessage,
1200    ) -> anyhow::Result<()> {
1201        match &self.dispatch_mode {
1202            PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1203            PythonActorDispatchMode::Queue { sender, .. } => {
1204                let sender = sender.clone();
1205                self.handle_queue(cx, sender, message).await
1206            }
1207        }
1208    }
1209}
1210
1211impl PythonActor {
1212    /// Handle a message using direct dispatch (current behavior).
1213    async fn handle_direct(
1214        &mut self,
1215        cx: &Context<'_, PythonActor>,
1216        message: PythonMessage,
1217    ) -> anyhow::Result<()> {
1218        let resolved = message.resolve_indirect_call(cx).await?;
1219        let endpoint = resolved.method.to_string();
1220
1221        // Create a channel for signaling panics in async endpoints.
1222        // See [Panics in async endpoints].
1223        let (sender, receiver) = oneshot::channel();
1224
1225        let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1226            let inst = self.instance.get_or_insert_with(|| {
1227                let inst: crate::context::PyInstance = cx.into();
1228                inst.into_pyobject(py).unwrap().into()
1229            });
1230
1231            let awaitable = self.actor.call_method(
1232                py,
1233                "handle",
1234                (
1235                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1236                    resolved.method,
1237                    resolved.bytes,
1238                    PanicFlag {
1239                        sender: Some(sender),
1240                    },
1241                    resolved
1242                        .local_state
1243                        .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1244                    resolved.response_port.into_py_any(py)?,
1245                ),
1246                None,
1247            )?;
1248
1249            let tl = self
1250                .task_locals
1251                .as_ref()
1252                .unwrap_or_else(|| shared_task_locals(py));
1253
1254            pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1255                .map_err(|err| err.into())
1256        })
1257        .await?;
1258
1259        // Spawn a child actor to await the Python handler method.
1260        tokio::spawn(handle_async_endpoint_panic(
1261            cx.port(),
1262            PythonTask::new(future)?,
1263            receiver,
1264            cx.self_id().to_string(),
1265            endpoint,
1266        ));
1267        Ok(())
1268    }
1269
1270    /// Handle a message using queue dispatch.
1271    /// Resolves the message on the Rust side and enqueues it for Python to process.
1272    async fn handle_queue(
1273        &mut self,
1274        cx: &Context<'_, PythonActor>,
1275        sender: pympsc::Sender,
1276        message: PythonMessage,
1277    ) -> anyhow::Result<()> {
1278        let resolved = message.resolve_indirect_call(cx).await?;
1279
1280        let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1281            let inst = self.instance.get_or_insert_with(|| {
1282                let inst: crate::context::PyInstance = cx.into();
1283                inst.into_pyobject(py).unwrap().into()
1284            });
1285
1286            let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1287            let py_context_obj = Py::new(py, py_context)?;
1288
1289            Ok(QueuedMessage {
1290                context: py_context_obj,
1291                method: resolved.method,
1292                bytes: resolved.bytes,
1293                local_state: resolved
1294                    .local_state
1295                    .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1296                response_port: resolved.response_port.into_py_any(py)?,
1297            })
1298        })
1299        .await?;
1300
1301        sender
1302            .send(queued_msg)
1303            .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1304
1305        Ok(())
1306    }
1307}
1308
1309#[async_trait]
1310impl Handler<MeshFailure> for PythonActor {
1311    async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1312        // If the message is not about a failure, don't call __supervise__.
1313        // This includes messages like "stop", because those are not errors that
1314        // need to be propagated.
1315        if !message.event.actor_status.is_failed() {
1316            tracing::info!(
1317                "ignoring non-failure supervision event from child: {}",
1318                message
1319            );
1320            return Ok(());
1321        }
1322        // TODO: Consider routing supervision messages through the queue for Queue mode.
1323        // For now, supervision is always handled directly since it requires immediate response.
1324
1325        monarch_with_gil(|py| {
1326            let inst = self.instance.get_or_insert_with(|| {
1327                let inst: crate::context::PyInstance = cx.into();
1328                inst.into_pyobject(py).unwrap().into()
1329            });
1330            // Compute display_name here since we can't call self.display_name() due to borrow.
1331            let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1332            let actor_bound = self.actor.bind(py);
1333            // The _Actor class always has a __supervise__ method, so this should
1334            // never happen.
1335            if !actor_bound.hasattr("__supervise__")? {
1336                return Err(anyhow::anyhow!(
1337                    "no __supervise__ method on {:?}",
1338                    actor_bound
1339                ));
1340            }
1341            let result = actor_bound.call_method(
1342                "__supervise__",
1343                (
1344                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1345                    PyMeshFailure::from(message.clone()),
1346                ),
1347                None,
1348            );
1349            match result {
1350                Ok(s) => {
1351                    if s.is_truthy()? {
1352                        // If the return value is truthy, then the exception was handled
1353                        // and doesn't need to be propagated.
1354                        // TODO: We also don't want to deliver multiple supervision
1355                        // events from the same mesh if an earlier one is handled.
1356                        tracing::info!(
1357                            name = "ActorMeshStatus",
1358                            status = "SupervisionError::Handled",
1359                            // only care about the event sender when the message is handled
1360                            actor_name = message.actor_mesh_name,
1361                            event = %message.event,
1362                            "__supervise__ on {} handled a supervision event, not reporting any further",
1363                            cx.self_id(),
1364                        );
1365                        Ok(())
1366                    } else {
1367                        // For a falsey return value, we propagate the supervision event
1368                        // to the next owning actor. We do this by returning a new
1369                        // error. This will not set the causal chain for ActorSupervisionEvent,
1370                        // so make sure to include the original event in the error message
1371                        // to provide context.
1372
1373                        // False -- we propagate the event onward, but update it with the fact that
1374                        // this actor is now the event creator.
1375                        for (actor_name, status) in [
1376                            (
1377                                message
1378                                    .actor_mesh_name
1379                                    .as_deref()
1380                                    .unwrap_or_else(|| message.event.actor_id.name()),
1381                                "SupervisionError::Unhandled",
1382                            ),
1383                            (cx.self_id().name(), "UnhandledSupervisionEvent"),
1384                        ] {
1385                            tracing::info!(
1386                                name = "ActorMeshStatus",
1387                                status,
1388                                actor_name,
1389                                event = %message.event,
1390                                "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1391                                cx.self_id(),
1392                            );
1393                        }
1394                        let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1395                            ActorSupervisionEvent::new(
1396                                cx.self_id().clone(),
1397                                display_name.clone(),
1398                                ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1399                                    Box::new(message.event.clone()),
1400                                )),
1401                                None,
1402                            ),
1403                        ));
1404                        Err(anyhow::Error::new(err))
1405                    }
1406                }
1407                Err(err) => {
1408                    // If __supervise__ raised UnhandledFaultHookException,
1409                    // return the PyErr directly without wrapping in
1410                    // ActorErrorKind. The custom run loop detects this by
1411                    // downcasting the anyhow::Error to PyErr.
1412                    if err.is_instance(
1413                        py,
1414                        &unhandled_fault_hook_exception(py),
1415                    ) {
1416                        return Err(err.into());
1417                    }
1418
1419                    // Any other exception will supersede in the propagation chain,
1420                    // and will become its own supervision failure.
1421                    // Include the event it was handling in the error message.
1422
1423                    // Add to caused_by chain.
1424                    for (actor_name, status) in [
1425                        (
1426                            message
1427                                .actor_mesh_name
1428                                .as_deref()
1429                                .unwrap_or_else(|| message.event.actor_id.name()),
1430                            "SupervisionError::__supervise__::exception",
1431                        ),
1432                        (cx.self_id().name(), "UnhandledSupervisionEvent"),
1433                    ] {
1434                        tracing::info!(
1435                            name = "ActorMeshStatus",
1436                            status,
1437                            actor_name,
1438                            event = %message.event,
1439                            "__supervise__ on {} threw an exception",
1440                            cx.self_id(),
1441                        );
1442                    }
1443                    let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1444                        ActorSupervisionEvent::new(
1445                            cx.self_id().clone(),
1446                            display_name,
1447                            ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1448                                err.to_string(),
1449                                Box::new(message.event.clone()),
1450                            )),
1451                            None,
1452                        ),
1453                    ));
1454                    Err(anyhow::Error::new(err))
1455                }
1456            }
1457        })
1458        .await
1459    }
1460}
1461
/// Drives an async endpoint's Python task to completion and reports failures.
///
/// Races `task` against `side_channel`. If the task finishes with an error, or
/// the side channel delivers a value, the failure is stringified and sent to
/// `panic_sender` as `Signal::Abort`.
///
/// `actor_id` and `endpoint` are only used to build the attributes attached to
/// the count, panic, error and latency metrics recorded here.
///
/// # Panics
///
/// Panics if the value received on the side channel is not a
/// `PyBaseException`, if the "async_endpoint_handler" client instance cannot
/// be created, or if the abort signal cannot be sent.
async fn handle_async_endpoint_panic(
    panic_sender: PortHandle<Signal>,
    task: PythonTask,
    side_channel: oneshot::Receiver<Py<PyAny>>,
    actor_id: String,
    endpoint: String,
) {
    // Create attributes for metrics with actor_id and endpoint
    let attributes =
        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);

    // Record the start time for latency measurement
    let start_time = std::time::Instant::now();

    // Increment throughput counter
    ENDPOINT_ACTOR_COUNT.add(1, attributes);

    let err_or_never = async {
        // The side channel will resolve with a value if a panic occurred during
        // processing of the async endpoint, see [Panics in async endpoints].
        match side_channel.await {
            Ok(value) => {
                monarch_with_gil(|py| -> Option<SerializablePyErr> {
                    // The received object is expected to be an exception
                    // instance; anything else makes the `unwrap` below panic.
                    let err: PyErr = value
                        .downcast_bound::<PyBaseException>(py)
                        .unwrap()
                        .clone()
                        .into();
                    ENDPOINT_ACTOR_PANIC.add(1, attributes);
                    Some(err.into())
                })
                .await
            }
            // An Err means that the sender has been dropped without sending.
            // That's okay, it just means that the Python task has completed.
            // In that case, just never resolve this future. We expect the other
            // branch of the select to finish eventually.
            Err(_) => pending().await,
        }
    };
    let future = task.take();
    // Whichever future resolves first decides the outcome: `Some(..)` carries
    // the failure to report, `None` means the task completed successfully.
    if let Some(panic) = tokio::select! {
        result = future => {
            match result {
                Ok(_) => None,
                Err(e) => Some(e.into()),
            }
        },
        result = err_or_never => {
            result
        }
    } {
        // Record the error metric (the panic counter, if applicable, was
        // already incremented in the side-channel branch above).
        ENDPOINT_ACTOR_ERROR.add(1, attributes);
        // A single client instance is created on first use and then reused
        // for every abort signal sent from this function.
        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
        let client = &CLIENT
            .get_or_init(|| {
                get_proc_runtime()
                    .instance("async_endpoint_handler")
                    .unwrap()
            })
            .0;
        panic_sender
            .send(&client, Signal::Abort(panic.to_string()))
            .expect("Unable to send panic message");
    }

    // Record latency in microseconds
    let elapsed_micros = start_time.elapsed().as_micros() as f64;
    ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
}
1533
/// A single-use port that delivers either a value or an exception object to
/// a `OncePortHandle`.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
struct LocalPort {
    /// Instance used as the sending context when the port is used.
    instance: PyInstance,
    /// The one-shot handle; it is taken (leaving `None`) by the first call to
    /// `send` or `exception`.
    inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
}
1539
1540impl Debug for LocalPort {
1541    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1542        f.debug_struct("LocalPort")
1543            .field("inner", &self.inner)
1544            .finish()
1545    }
1546}
1547
1548pub(crate) fn to_py_error<T>(e: T) -> PyErr
1549where
1550    T: Error,
1551{
1552    PyErr::new::<PyValueError, _>(e.to_string())
1553}
1554
1555#[pymethods]
1556impl LocalPort {
1557    fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1558        let port = self.inner.take().expect("use local port once");
1559        port.send(self.instance.deref(), Ok(obj))
1560            .map_err(to_py_error)
1561    }
1562    fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1563        let port = self.inner.take().expect("use local port once");
1564        port.send(self.instance.deref(), Err(e))
1565            .map_err(to_py_error)
1566    }
1567}
1568
/// A port that drops all messages sent to it.
/// Used when there is no response port for a message.
/// Any exceptions sent to it are re-raised in the current actor.
///
/// `send` discards its argument, while `exception` returns the exception as
/// an error instead of swallowing it.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Debug)]
pub struct DroppingPort;
1575
1576#[pymethods]
1577impl DroppingPort {
1578    #[new]
1579    fn new() -> Self {
1580        DroppingPort
1581    }
1582
1583    fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1584        Ok(())
1585    }
1586
1587    fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1588        // Unwrap ActorError to get the inner exception, matching Python behavior.
1589        let exc = if let Ok(inner) = e.getattr("exception") {
1590            inner
1591        } else {
1592            e
1593        };
1594        Err(PyErr::from_value(exc))
1595    }
1596
1597    #[getter]
1598    fn get_return_undeliverable(&self) -> bool {
1599        true
1600    }
1601
1602    #[setter]
1603    fn set_return_undeliverable(&self, _value: bool) {}
1604}
1605
/// A port that sends messages to a remote receiver.
/// Wraps an EitherPortRef with the actor instance needed for sending.
#[pyclass(module = "monarch._src.actor.actor_mesh")]
pub struct Port {
    /// Reference to the destination port that messages are sent through.
    port_ref: EitherPortRef,
    /// Actor instance passed to `port_ref.send` as the sending context.
    instance: Instance<PythonActor>,
    /// Optional rank attached to every `Result`/`Exception` message sent.
    rank: Option<usize>,
}
1614
1615#[pymethods]
1616impl Port {
1617    #[new]
1618    fn new(
1619        port_ref: EitherPortRef,
1620        instance: &crate::context::PyInstance,
1621        rank: Option<usize>,
1622    ) -> Self {
1623        Self {
1624            port_ref,
1625            instance: instance.clone().into_instance(),
1626            rank,
1627        }
1628    }
1629
1630    #[getter("_port_ref")]
1631    fn port_ref_py(&self) -> EitherPortRef {
1632        self.port_ref.clone()
1633    }
1634
1635    #[getter("_rank")]
1636    fn rank_py(&self) -> Option<usize> {
1637        self.rank
1638    }
1639
1640    #[getter]
1641    fn get_return_undeliverable(&self) -> bool {
1642        self.port_ref.get_return_undeliverable()
1643    }
1644
1645    #[setter]
1646    fn set_return_undeliverable(&mut self, value: bool) {
1647        self.port_ref.set_return_undeliverable(value);
1648    }
1649
1650    fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1651        let message = PythonMessage::new_from_buf(
1652            PythonMessageKind::Result { rank: self.rank },
1653            pickle_to_part(py, &obj)?,
1654        );
1655
1656        self.port_ref
1657            .send(&self.instance, message)
1658            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1659    }
1660
1661    fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1662        let message = PythonMessage::new_from_buf(
1663            PythonMessageKind::Exception { rank: self.rank },
1664            pickle_to_part(py, &e)?,
1665        );
1666
1667        self.port_ref
1668            .send(&self.instance, message)
1669            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1670    }
1671}
1672
/// Registers the Python-visible classes listed below on `hyperactor_mod`.
///
/// # Errors
///
/// Returns the first error reported by `add_class`; classes after the
/// failing one are not registered.
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
    hyperactor_mod.add_class::<PythonActorHandle>()?;
    hyperactor_mod.add_class::<PythonMessage>()?;
    hyperactor_mod.add_class::<PythonMessageKind>()?;
    hyperactor_mod.add_class::<MethodSpecifier>()?;
    hyperactor_mod.add_class::<UnflattenArg>()?;
    hyperactor_mod.add_class::<PanicFlag>()?;
    hyperactor_mod.add_class::<QueuedMessage>()?;
    hyperactor_mod.add_class::<DroppingPort>()?;
    hyperactor_mod.add_class::<Port>()?;
    Ok(())
}
1685
// Unit tests for `PythonMessage` bind/unbind round-trips and for the
// `to_py_error` Rust-to-Python error bridge.
#[cfg(test)]
mod tests {
    use hyperactor::accum::ReducerSpec;
    use hyperactor::accum::StreamingReducerOpts;
    use hyperactor::message::ErasedUnbound;
    use hyperactor::message::Unbound;
    use hyperactor::reference;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_mesh::Error as MeshError;
    use hyperactor_mesh::Name;
    use hyperactor_mesh::host_mesh::host_agent::ProcState;
    use hyperactor_mesh::resource::Status;
    use hyperactor_mesh::resource::{self};
    use pyo3::PyTypeInfo;

    use super::*;
    use crate::actor::to_py_error;

    /// Checks that unbinding a `PythonMessage` exposes exactly its response
    /// port (when present) and that binding it again yields the original
    /// message.
    #[test]
    fn test_python_message_bind_unbind() {
        let reducer_spec = ReducerSpec {
            typehash: 123,
            builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
        };
        let port_ref = reference::PortRef::<PythonMessage>::attest_reducible(
            test_port_id("world_0", "client", 123),
            Some(reducer_spec),
            StreamingReducerOpts::default(),
        );
        let message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: Some(EitherPortRef::Unbounded(port_ref.clone().into())),
            },
            message: Part::from(vec![1, 2, 3]),
        };
        // Case 1: the message carries a response port, so exactly one
        // unbound port must be visited.
        {
            let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
            let mut bindings = vec![];
            erased
                .visit_mut::<reference::UnboundPort>(|b| {
                    bindings.push(b.clone());
                    Ok(())
                })
                .unwrap();
            assert_eq!(bindings, vec![reference::UnboundPort::from(&port_ref)]);
            let unbound = Unbound::try_from_message(message.clone()).unwrap();
            assert_eq!(message, unbound.bind().unwrap());
        }

        // Case 2: same payload but without a response port, so no unbound
        // port may be visited.
        let no_port_message = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: None,
            },
            ..message
        };
        {
            let mut erased = ErasedUnbound::try_from_message(no_port_message.clone()).unwrap();
            let mut bindings = vec![];
            erased
                .visit_mut::<reference::UnboundPort>(|b| {
                    bindings.push(b.clone());
                    Ok(())
                })
                .unwrap();
            assert_eq!(bindings.len(), 0);
            let unbound = Unbound::try_from_message(no_port_message.clone()).unwrap();
            assert_eq!(no_port_message, unbound.bind().unwrap());
        }
    }

    /// Checks that `to_py_error` turns a `ProcCreationError` into a Python
    /// `ValueError` carrying the unchanged Rust error message.
    #[test]
    fn to_py_error_preserves_proc_creation_message() {
        // State<ProcState> w/ `state.is_none()`
        let state: resource::State<ProcState> = resource::State {
            name: Name::new("my_proc").unwrap(),
            status: Status::Failed("boom".into()),
            state: None,
            generation: 0,
            timestamp: std::time::SystemTime::now(),
        };

        // A ProcCreationError
        let mesh_agent: hyperactor::reference::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
            hyperactor::reference::ActorRef::attest(
                test_port_id("hello_0", "actor", 0).actor_id().clone(),
            );
        let expected_prefix = format!(
            "error creating proc (host rank 0) on host mesh agent {}",
            mesh_agent
        );
        let err = MeshError::ProcCreationError {
            host_rank: 0,
            mesh_agent,
            state: Box::new(state),
        };

        // Capture the Rust-side message before `err` is consumed by the bridge.
        let rust_msg = err.to_string();
        let pyerr = to_py_error(err);

        // The interpreter is initialized only here, after the `PyErr` was
        // built; inspecting the error below is what requires the GIL.
        pyo3::Python::initialize();
        monarch_with_gil_blocking(|py| {
            assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
            let py_msg = pyerr.value(py).to_string();

            // 1) Bridge preserves the exact message
            assert_eq!(py_msg, rust_msg);
            // 2) Contains the structured state and failure status
            assert!(py_msg.contains(", state: "));
            assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
            // 3) Starts with the expected prefix
            assert!(py_msg.starts_with(&expected_prefix));
        });
    }
}