monarch_hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::error::Error;
10use std::fmt::Debug;
11use std::future::pending;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorHandle;
18use hyperactor::Context;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::OncePortHandle;
22use hyperactor::PortHandle;
23use hyperactor::Proc;
24use hyperactor::RemoteSpawn;
25use hyperactor::actor::ActorError;
26use hyperactor::actor::ActorErrorKind;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Signal;
29use hyperactor::context::Actor as ContextActor;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::Undeliverable;
32use hyperactor::message::Bind;
33use hyperactor::message::Bindings;
34use hyperactor::message::Unbind;
35use hyperactor::supervision::ActorSupervisionEvent;
36use hyperactor_config::Flattrs;
37use hyperactor_mesh::casting::update_undeliverable_envelope_for_casting;
38use hyperactor_mesh::comm::multicast::CAST_POINT;
39use hyperactor_mesh::comm::multicast::CastInfo;
40use hyperactor_mesh::supervision::MeshFailure;
41use hyperactor_mesh::transport::default_bind_spec;
42use monarch_types::PickledPyObject;
43use monarch_types::SerializablePyErr;
44use ndslice::Point;
45use ndslice::extent;
46use pyo3::IntoPyObjectExt;
47use pyo3::exceptions::PyBaseException;
48use pyo3::exceptions::PyRuntimeError;
49use pyo3::exceptions::PyValueError;
50use pyo3::prelude::*;
51use pyo3::types::PyDict;
52use pyo3::types::PyList;
53use pyo3::types::PyType;
54use serde::Deserialize;
55use serde::Serialize;
56use serde_multipart::Part;
57use tokio::sync::oneshot;
58use typeuri::Named;
59
60use crate::buffers::FrozenBuffer;
61use crate::config::ACTOR_QUEUE_DISPATCH;
62use crate::config::SHARED_ASYNCIO_RUNTIME;
63use crate::context::PyInstance;
64use crate::local_state_broker::BrokerId;
65use crate::local_state_broker::LocalStateBrokerMessage;
66use crate::mailbox::EitherPortRef;
67use crate::mailbox::PyMailbox;
68use crate::mailbox::PythonPortHandle;
69use crate::mailbox::PythonUndeliverableMessageEnvelope;
70use crate::metrics::ENDPOINT_ACTOR_COUNT;
71use crate::metrics::ENDPOINT_ACTOR_ERROR;
72use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
73use crate::metrics::ENDPOINT_ACTOR_PANIC;
74use crate::pickle::pickle_to_part;
75use crate::proc::PyActorId;
76use crate::pympsc;
77use crate::pytokio::PythonTask;
78use crate::runtime::get_proc_runtime;
79use crate::runtime::get_tokio_runtime;
80use crate::runtime::monarch_with_gil;
81use crate::runtime::monarch_with_gil_blocking;
82use crate::supervision::PyMeshFailure;
83
/// Describes how to fill one slot of the local-state argument list built for a
/// `PythonMessageKind::CallMethodIndirect` message.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum UnflattenArg {
    /// Substitute the receiving actor's mailbox (a `PyMailbox`).
    Mailbox,
    /// Substitute the next Python object returned by the local-state broker.
    PyObject,
}
90
/// Identifies which Python method a call message targets and how its
/// response is delivered.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum MethodSpecifier {
    /// Call method 'name', send its return value to the response port.
    ReturnsResponse { name: String },
    /// Call method 'name', send the response port as the first argument.
    ExplicitPort { name: String },
    /// Construct the object (reported by `name()` as `"__init__"`).
    Init {},
}
101
102impl std::fmt::Display for MethodSpecifier {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        write!(f, "{}", self.name())
105    }
106}
107
108#[pymethods]
109impl MethodSpecifier {
110    #[getter(name)]
111    fn py_name(&self) -> &str {
112        self.name()
113    }
114}
115
116impl MethodSpecifier {
117    pub(crate) fn name(&self) -> &str {
118        match self {
119            MethodSpecifier::ReturnsResponse { name } => name,
120            MethodSpecifier::ExplicitPort { name } => name,
121            MethodSpecifier::Init {} => "__init__",
122        }
123    }
124}
125
/// The kind of a `PythonMessage`: how the receiving actor should interpret the
/// accompanying serialized payload.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq)]
pub enum PythonMessageKind {
    /// Invoke the method `name`; replies, if any, go to `response_port`
    /// (`None` means replies are routed to a dropping port).
    CallMethod {
        name: MethodSpecifier,
        response_port: Option<EitherPortRef>,
    },
    /// A successful response; `rank` is filled in by `PythonMessage::into_rank`.
    Result {
        rank: Option<usize>,
    },
    /// An error response; `rank` is filled in by `PythonMessage::into_rank`.
    /// In queue-dispatch mode the payload is unpickled with `cloudpickle.loads`
    /// by the actor's error port.
    Exception {
        rank: Option<usize>,
    },
    /// Placeholder kind; this is the `Default` value.
    Uninit {},
    /// Invoke the method `name` with local state fetched from a local-state broker.
    CallMethodIndirect {
        name: MethodSpecifier,
        /// Identifies the broker; resolved via `BrokerId::new`.
        local_state_broker: (String, usize),
        /// Key of the state entry requested from the broker.
        id: usize,
        // For each argument position, whether to substitute the local mailbox
        // or the next object from the local state fetched from the broker.
        unflatten_args: Vec<UnflattenArg>,
    },
}
wirevalue::register_type!(PythonMessageKind);
150
151impl Default for PythonMessageKind {
152    fn default() -> Self {
153        PythonMessageKind::Uninit {}
154    }
155}
156
157fn mailbox<'py, T: Actor>(py: Python<'py>, cx: &Context<'_, T>) -> Bound<'py, PyAny> {
158    let mailbox: PyMailbox = cx.mailbox_for_py().clone().into();
159    mailbox.into_bound_py_any(py).unwrap()
160}
161
/// A message handled by `PythonActor`: a `kind` describing how to interpret it
/// plus an opaque serialized payload.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Serialize, Deserialize, Named, Default, PartialEq)]
pub struct PythonMessage {
    /// How the receiver should interpret `message`.
    pub kind: PythonMessageKind,
    /// Serialized payload; exposed to Python as a `FrozenBuffer`.
    pub message: Part,
}

wirevalue::register_type!(PythonMessage);
170
/// A call message with its indirections resolved, ready to hand to Python.
struct ResolvedCallMethod {
    /// The method to invoke.
    method: MethodSpecifier,
    /// The serialized call payload.
    bytes: FrozenBuffer,
    /// Local-state argument list; `Some` only for `CallMethodIndirect`.
    local_state: Option<Py<PyAny>>,
    /// Implements PortProtocol
    /// Concretely either a Port, DroppingPort, or LocalPort
    response_port: ResponsePort,
}
179
/// Where the response to a resolved call should be sent.
enum ResponsePort {
    /// No response port was supplied; converted to a `DroppingPort` for Python.
    Dropping,
    /// A (possibly remote) port reference plus the responding rank.
    Port(Port),
    /// A local response port obtained from the local-state broker.
    Local(LocalPort),
}
185
186impl ResponsePort {
187    fn into_py_any(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
188        match self {
189            ResponsePort::Dropping => DroppingPort.into_py_any(py),
190            ResponsePort::Port(port) => port.into_py_any(py),
191            ResponsePort::Local(port) => port.into_py_any(py),
192        }
193    }
194}
195
/// Message sent through the queue in queue-dispatch mode.
/// Contains pre-resolved components ready for Python consumption.
#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub struct QueuedMessage {
    /// Python context (`PyContext`) the handler should run with.
    #[pyo3(get)]
    pub context: Py<crate::context::PyContext>,
    /// The method to invoke.
    #[pyo3(get)]
    pub method: MethodSpecifier,
    /// The serialized call payload.
    #[pyo3(get)]
    pub bytes: FrozenBuffer,
    /// Local-state argument list.
    // NOTE(review): presumably Python `None` when the call carried no local state — confirm at the construction site.
    #[pyo3(get)]
    pub local_state: Py<PyAny>,
    /// Python port object that receives the handler's response.
    #[pyo3(get)]
    pub response_port: Py<PyAny>,
}
211
212impl PythonMessage {
213    pub fn new_from_buf(kind: PythonMessageKind, message: impl Into<Part>) -> Self {
214        Self {
215            kind,
216            message: message.into(),
217        }
218    }
219
220    pub fn into_rank(self, rank: usize) -> Self {
221        let rank = Some(rank);
222        match self.kind {
223            PythonMessageKind::Result { .. } => PythonMessage {
224                kind: PythonMessageKind::Result { rank },
225                message: self.message,
226            },
227            PythonMessageKind::Exception { .. } => PythonMessage {
228                kind: PythonMessageKind::Exception { rank },
229                message: self.message,
230            },
231            _ => panic!("PythonMessage is not a response but {:?}", self),
232        }
233    }
234    async fn resolve_indirect_call(
235        self,
236        cx: &Context<'_, PythonActor>,
237    ) -> anyhow::Result<ResolvedCallMethod> {
238        match self.kind {
239            PythonMessageKind::CallMethodIndirect {
240                name,
241                local_state_broker,
242                id,
243                unflatten_args,
244            } => {
245                let broker = BrokerId::new(local_state_broker).resolve(cx).await;
246                let (send, recv) = cx.open_once_port();
247                broker.send(cx, LocalStateBrokerMessage::Get(id, send))?;
248                let state = recv.recv().await?;
249                let mut state_it = state.state.into_iter();
250                monarch_with_gil(|py| {
251                    let mailbox = mailbox(py, cx);
252                    let local_state = Some(
253                        PyList::new(
254                            py,
255                            unflatten_args.into_iter().map(|x| -> Bound<'_, PyAny> {
256                                match x {
257                                    UnflattenArg::Mailbox => mailbox.clone(),
258                                    UnflattenArg::PyObject => {
259                                        state_it.next().unwrap().into_bound(py)
260                                    }
261                                }
262                            }),
263                        )
264                        .unwrap()
265                        .into(),
266                    );
267                    let response_port = ResponsePort::Local(LocalPort {
268                        instance: cx.into(),
269                        inner: Some(state.response_port),
270                    });
271                    Ok(ResolvedCallMethod {
272                        method: name,
273                        bytes: FrozenBuffer {
274                            inner: self.message.into_bytes(),
275                        },
276                        local_state,
277                        response_port,
278                    })
279                })
280                .await
281            }
282            PythonMessageKind::CallMethod {
283                name,
284                response_port,
285            } => {
286                let response_port = response_port.map_or(ResponsePort::Dropping, |port_ref| {
287                    let point = cx.cast_point();
288                    ResponsePort::Port(Port {
289                        port_ref,
290                        instance: cx.instance().clone_for_py(),
291                        rank: Some(point.rank()),
292                    })
293                });
294                Ok(ResolvedCallMethod {
295                    method: name,
296                    bytes: FrozenBuffer {
297                        inner: self.message.into_bytes(),
298                    },
299                    local_state: None,
300                    response_port,
301                })
302            }
303            _ => {
304                panic!("unexpected message kind {:?}", self.kind)
305            }
306        }
307    }
308}
309
310impl std::fmt::Debug for PythonMessage {
311    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312        f.debug_struct("PythonMessage")
313            .field("kind", &self.kind)
314            .field(
315                "message",
316                &wirevalue::HexFmt(&(*self.message.to_bytes())[..]).to_string(),
317            )
318            .finish()
319    }
320}
321
322impl Unbind for PythonMessage {
323    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
324        match &self.kind {
325            PythonMessageKind::CallMethod { response_port, .. } => response_port.unbind(bindings),
326            _ => Ok(()),
327        }
328    }
329}
330
331impl Bind for PythonMessage {
332    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
333        match &mut self.kind {
334            PythonMessageKind::CallMethod { response_port, .. } => response_port.bind(bindings),
335            _ => Ok(()),
336        }
337    }
338}
339
340#[pymethods]
341impl PythonMessage {
342    #[new]
343    #[pyo3(signature = (kind, message))]
344    pub fn new<'py>(kind: PythonMessageKind, message: PyRef<'py, FrozenBuffer>) -> PyResult<Self> {
345        Ok(PythonMessage::new_from_buf(kind, message.inner.clone()))
346    }
347
348    #[getter]
349    fn kind(&self) -> PythonMessageKind {
350        self.kind.clone()
351    }
352
353    #[getter]
354    fn message(&self) -> FrozenBuffer {
355        FrozenBuffer {
356            inner: self.message.to_bytes(),
357        }
358    }
359}
360
/// Python-visible wrapper around an `ActorHandle<PythonActor>`.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub(super) struct PythonActorHandle {
    /// The wrapped Rust actor handle.
    pub(super) inner: ActorHandle<PythonActor>,
}
365
366#[pymethods]
367impl PythonActorHandle {
368    // TODO: do the pickling in rust
369    fn send(&self, instance: &PyInstance, message: &PythonMessage) -> PyResult<()> {
370        self.inner
371            .send(instance.deref(), message.clone())
372            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
373        Ok(())
374    }
375
376    fn bind(&self) -> PyActorId {
377        self.inner.bind::<PythonActor>().into_actor_id().into()
378    }
379}
380
/// Dispatch mode for Python actors.
///
/// Chosen in `PythonActor::new` from the `ACTOR_QUEUE_DISPATCH` config value.
#[derive(Debug)]
pub enum PythonActorDispatchMode {
    /// Direct dispatch: Rust acquires the GIL and calls Python handlers directly.
    Direct,
    /// Queue dispatch: Rust enqueues messages to a channel; Python dequeues and dispatches.
    Queue {
        /// Channel sender for enqueuing messages to Python.
        sender: pympsc::Sender,
        /// Channel receiver, taken during Actor::init to start the message loop.
        receiver: Option<pympsc::PyReceiver>,
    },
}
394
/// An actor for which message handlers are implemented in Python.
///
/// Exported with cast-capable handlers for `PythonMessage` and `MeshFailure`.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [
        PythonMessage { cast = true },
        MeshFailure { cast = true },
    ],
)]
pub struct PythonActor {
    /// The Python object that we delegate message handling to.
    actor: Py<PyAny>,
    /// Stores a reference to the Python event loop to run Python coroutines on.
    /// This is None when using single runtime mode, Some when using per-actor mode.
    task_locals: Option<pyo3_async_runtimes::TaskLocals>,
    /// Instance object that we keep across handle calls so that we can store
    /// information from the Init (spawn rank, controller) and provide it to other calls.
    instance: Option<Py<crate::context::PyInstance>>,
    /// Dispatch mode for this actor.
    dispatch_mode: PythonActorDispatchMode,
    /// The location in the actor mesh at which this actor was spawned.
    /// `PythonActor::new` always initializes this lock (possibly to `None`).
    spawn_point: OnceLock<Option<Point>>,
    /// Initial message to process during PythonActor::init.
    init_message: Option<PythonMessage>,
}
420
impl PythonActor {
    /// Construct a `PythonActor` by unpickling `actor_type` and instantiating it
    /// with no arguments.
    ///
    /// `ACTOR_QUEUE_DISPATCH` selects direct or queue dispatch.
    /// `SHARED_ASYNCIO_RUNTIME` decides whether this actor creates its own
    /// `TaskLocals` (with the GIL released) or later falls back to the shared ones.
    /// `init_message`, if any, is handled in `Actor::init`, which then requires
    /// `spawn_point` to be `Some`.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// * unpickling fails;
    /// * the unpickled object is not a type;
    /// * instantiating it raises;
    /// * the queue-dispatch channel cannot be created.
    pub(crate) fn new(
        actor_type: PickledPyObject,
        init_message: Option<PythonMessage>,
        spawn_point: Option<Point>,
    ) -> Result<Self, anyhow::Error> {
        let use_queue_dispatch = hyperactor_config::global::get(ACTOR_QUEUE_DISPATCH);

        Ok(monarch_with_gil_blocking(
            |py| -> Result<Self, SerializablePyErr> {
                let unpickled = actor_type.unpickle(py)?;
                let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
                let actor: Py<PyAny> = class_type.call0()?.into_py_any(py)?;

                // Only create per-actor TaskLocals if not using shared runtime
                let task_locals = (!hyperactor_config::global::get(SHARED_ASYNCIO_RUNTIME))
                    .then(|| Python::detach(py, create_task_locals));

                let dispatch_mode = if use_queue_dispatch {
                    let (sender, receiver) = pympsc::channel().map_err(|e| {
                        let py_err = PyRuntimeError::new_err(e.to_string());
                        SerializablePyErr::from(py, &py_err)
                    })?;
                    PythonActorDispatchMode::Queue {
                        sender,
                        receiver: Some(receiver),
                    }
                } else {
                    PythonActorDispatchMode::Direct
                };

                Ok(Self {
                    actor,
                    task_locals,
                    instance: None,
                    dispatch_mode,
                    // Always initialized here, so `spawn_point.get()` is never None.
                    spawn_point: OnceLock::from(spawn_point),
                    init_message,
                })
            },
        )?)
    }

    /// Get the TaskLocals to use for this actor.
    /// Returns either the shared TaskLocals or this actor's own TaskLocals based on configuration.
    fn get_task_locals(&self, py: Python) -> &pyo3_async_runtimes::TaskLocals {
        self.task_locals
            .as_ref()
            .unwrap_or_else(|| shared_task_locals(py))
    }

    /// Bootstrap the root client actor, creating a new proc for it.
    /// This is the legacy entry point that creates its own proc.
    ///
    /// # Panics
    /// Panics if the proc cannot be created, and in every case where
    /// `bootstrap_client_inner` panics.
    pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
        static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();

        let client_proc = Proc::direct(
            default_bind_spec().binding_addr(),
            "mesh_root_client_proc".into(),
        )
        .unwrap();

        Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
    }

    /// Bootstrap the client proc, storing the root client instance in given static.
    /// This is passed in because we require storage, as the instance is shared.
    /// This can be simplified when we remove v0.
    ///
    /// Builds a `PythonActor` from `monarch._src.actor.actor_mesh._Actor`, with an
    /// init message taken from `RootClientActor._pickled_init_args`. The actor's
    /// message loop is spawned on the tokio runtime from `get_tokio_runtime()`.
    /// If that loop ends with an unhandled error, the Python main thread is
    /// interrupted via `_thread.interrupt_main`. If that call itself fails, the
    /// process exits with status 1.
    ///
    /// Returns the stored static instance together with the actor's handle.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// * a Python import, attribute lookup, call, extraction, or pickling step fails;
    /// * the instance cannot be created;
    /// * `root_client_instance` was already set.
    pub(crate) fn bootstrap_client_inner(
        py: Python<'_>,
        client_proc: Proc,
        root_client_instance: &'static OnceLock<Instance<PythonActor>>,
    ) -> (&'static Instance<Self>, ActorHandle<Self>) {
        let actor_mesh_mod = py
            .import("monarch._src.actor.actor_mesh")
            .expect("import actor_mesh");
        let root_client_class = actor_mesh_mod
            .getattr("RootClientActor")
            .expect("get RootClientActor");

        let actor_type =
            PickledPyObject::pickle(&actor_mesh_mod.getattr("_Actor").expect("get _Actor"))
                .expect("pickle _Actor");

        let init_frozen_buffer: FrozenBuffer = root_client_class
            .call_method0("_pickled_init_args")
            .expect("call RootClientActor._pickled_init_args")
            .extract()
            .expect("extract FrozenBuffer from _pickled_init_args");
        let init_message = PythonMessage::new_from_buf(
            PythonMessageKind::CallMethod {
                name: MethodSpecifier::Init {},
                response_port: None,
            },
            init_frozen_buffer,
        );

        // Rank 0 of an empty extent: the client is a singleton.
        let mut actor = PythonActor::new(
            actor_type,
            Some(init_message),
            Some(extent!().point_of_rank(0).unwrap()),
        )
        .expect("create client PythonActor");

        let ai = client_proc
            .actor_instance(
                root_client_class
                    .getattr("name")
                    .expect("get RootClientActor.name")
                    .extract()
                    .expect("extract RootClientActor.name"),
            )
            .expect("root instance create");

        let handle = ai.handle;
        let signal_rx = ai.signal;
        let supervision_rx = ai.supervision;
        let work_rx = ai.work;

        root_client_instance
            .set(ai.instance)
            .map_err(|_| "already initialized root client instance")
            .unwrap();
        let instance = root_client_instance.get().unwrap();

        // The root client PythonActor uses a custom run loop that
        // bypasses Actor::init, so mark it as system explicitly
        // (matching GlobalClientActor::fresh_instance).
        instance.set_system();

        // Bind to ensure the Signal and Undeliverable<MessageEnvelope> ports
        // are bound.
        let _client_ref = handle.bind::<PythonActor>();

        get_tokio_runtime().spawn(async move {
            // This is gross. Sorry.
            // The actor is driven by hand here: call `init`, then run a select
            // loop over work items, signals, and supervision events.
            actor.init(instance).await.unwrap();

            let mut signal_rx = signal_rx;
            let mut supervision_rx = supervision_rx;
            let mut work_rx = work_rx;
            let mut need_drain = false;
            // `err` is None for a clean stop, Some(error) for an unhandled failure.
            let mut err = 'messages: loop {
                tokio::select! {
                    work = work_rx.recv() => {
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut actor, instance).await {
                            let kind = ActorErrorKind::processing(err);
                            let err = ActorError {
                                actor_id: Box::new(instance.self_id().clone()),
                                kind: Box::new(kind),
                            };
                            // Give the actor a chance to handle the error produced
                            // in its own message handler. This is important because
                            // we want Undeliverable<MessageEnvelope>, which returns
                            // an Err typically, to create a supervision event and
                            // call __supervise__.
                            let supervision_event = actor_error_to_event(instance, &actor, err);
                            // If the immediate supervision event isn't handled, continue with
                            // exiting the loop.
                            // Else, continue handling messages.
                            if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                // Before exiting, offer each already-queued supervision event
                                // to the actor; the first one it fails to handle takes
                                // precedence over `err`.
                                for supervision_event in supervision_rx.drain() {
                                    if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                                        break 'messages Some(err);
                                    }
                                }
                                break Some(err);
                            }
                        }
                    }
                    signal = signal_rx.recv() => {
                        let signal = signal.map_err(ActorError::from);
                        tracing::info!(actor_id = %instance.self_id(), "client received signal {signal:?}");
                        match signal {
                            // Both stop the loop cleanly; DrainAndStop additionally
                            // processes already-queued work below.
                            Ok(signal@(Signal::Stop(_) | Signal::DrainAndStop(_))) => {
                                need_drain = matches!(signal, Signal::DrainAndStop(_));
                                break None;
                            },
                            Ok(Signal::ChildStopped(_)) => {},
                            Ok(Signal::Abort(reason)) => {
                                break Some(ActorError { actor_id: Box::new(instance.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) })
                            },
                            Err(err) => break Some(err),
                        }
                    }
                    Ok(supervision_event) = supervision_rx.recv() => {
                        if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
                            break Some(err);
                        }
                    }
                };
            };
            if need_drain {
                // Process work that was already queued (non-blocking `try_recv`),
                // stopping at the first handler error.
                let mut n = 0;
                while let Ok(work) = work_rx.try_recv() {
                    if let Err(e) = work.handle(&mut actor, instance).await {
                        err = Some(ActorError {
                            actor_id: Box::new(instance.self_id().clone()),
                            kind: Box::new(ActorErrorKind::processing(e)),
                        });
                        break;
                    }
                    n += 1;
                }
                tracing::debug!(actor_id = %instance.self_id(), "client drained {} messages before stopping", n);
            }
            if let Some(err) = err {
                let event = actor_error_to_event(instance, &actor, err);
                // The proc supervision handler will send to ProcAgent, which
                // just records it in v1. We want to crash instead, as nothing will
                // monitor the client ProcAgent for now.
                tracing::error!(
                    actor_id = %instance.self_id(),
                    "could not propagate supervision event {} because it reached the global client: signaling KeyboardInterrupt to main thread",
                    event,
                );

                // This is running in a background thread, and thus cannot run
                // Py_FinalizeEx when it exits the process to properly shut down
                // all python objects.
                // We use _thread.interrupt_main to raise a KeyboardInterrupt
                // to the main thread at some point in the future.
                // There is no way to propagate the exception message, but it
                // will at least run proper shutdown code as long as BaseException
                // isn't caught.
                monarch_with_gil_blocking(|py| {
                    // Use _thread.interrupt_main to force the client to exit if it has an
                    // unhandled supervision event.
                    let thread_mod = py.import("_thread").expect("import _thread");
                    let interrupt_main = thread_mod
                        .getattr("interrupt_main")
                        .expect("get interrupt_main");

                    // If interrupt_main itself raises, log the failure and exit the
                    // process with status 1 so the unhandled error still terminates
                    // the client.
                    if let Err(e) = interrupt_main.call0() {
                        tracing::error!("unable to interrupt main, exiting the process instead: {:?}", e);
                        eprintln!("unable to interrupt main, exiting the process with code 1 instead: {:?}", e);
                        std::process::exit(1);
                    }
                });
            } else {
                tracing::info!(actor_id = %instance.self_id(), "client stopped");
            }
        });

        (root_client_instance.get().unwrap(), handle)
    }
}
670
671fn actor_error_to_event(
672    instance: &Instance<PythonActor>,
673    actor: &PythonActor,
674    err: ActorError,
675) -> ActorSupervisionEvent {
676    match *err.kind {
677        ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
678        _ => {
679            let status = ActorStatus::generic_failure(err.kind.to_string());
680            ActorSupervisionEvent::new(
681                instance.self_id().clone(),
682                actor.display_name(),
683                status,
684                None,
685            )
686        }
687    }
688}
689
690pub(crate) fn root_client_actor(py: Python<'_>) -> &'static Instance<PythonActor> {
691    static ROOT_CLIENT_ACTOR: OnceLock<&'static Instance<PythonActor>> = OnceLock::new();
692
693    // Release the GIL before waiting on ROOT_CLIENT_ACTOR, because PythonActor::bootstrap_client
694    // may release/reacquire the GIL; if thread 0 holds the GIL blocking on ROOT_CLIENT_ACTOR.get_or_init
695    // while thread 1 blocks on acquiring the GIL inside PythonActor::bootstrap_client, we get
696    // a deadlock.
697    py.detach(|| {
698        ROOT_CLIENT_ACTOR.get_or_init(|| {
699            monarch_with_gil_blocking(|py| {
700                let (client, _handle) = PythonActor::bootstrap_client(py);
701                client
702            })
703        })
704    })
705}
706
707#[async_trait]
708impl Actor for PythonActor {
709    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
710        if let PythonActorDispatchMode::Queue { receiver, .. } = &mut self.dispatch_mode {
711            let receiver = receiver.take().unwrap();
712
713            // Create an error port that converts PythonMessage to an abort signal.
714            // This allows Python to send errors that trigger actor supervision.
715            let error_port: hyperactor::PortHandle<PythonMessage> =
716                this.port::<Signal>().contramap(|msg: PythonMessage| {
717                    monarch_with_gil_blocking(|py| {
718                        let err = match msg.kind {
719                            PythonMessageKind::Exception { .. } => {
720                                // Deserialize the error from the message
721                                let cloudpickle = py.import("cloudpickle").unwrap();
722                                let err_obj = cloudpickle
723                                    .call_method1("loads", (msg.message.to_bytes().as_ref(),))
724                                    .unwrap();
725                                let py_err = pyo3::PyErr::from_value(err_obj);
726                                SerializablePyErr::from(py, &py_err)
727                            }
728                            _ => {
729                                let py_err = PyRuntimeError::new_err(format!(
730                                    "expected Exception, got {:?}",
731                                    msg.kind
732                                ));
733                                SerializablePyErr::from(py, &py_err)
734                            }
735                        };
736                        Signal::Abort(err.to_string())
737                    })
738                });
739
740            let error_port_handle = PythonPortHandle::new(error_port);
741
742            monarch_with_gil(|py| {
743                let tl = self
744                    .task_locals
745                    .as_ref()
746                    .unwrap_or_else(|| shared_task_locals(py));
747                let awaitable = self.actor.call_method(
748                    py,
749                    "_dispatch_loop",
750                    (receiver, error_port_handle),
751                    None,
752                )?;
753                let future =
754                    pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))?;
755                tokio::spawn(async move {
756                    if let Err(e) = future.await {
757                        tracing::error!("message loop error: {}", e);
758                    }
759                });
760                Ok::<_, anyhow::Error>(())
761            })
762            .await?;
763        }
764
765        if let Some(init_message) = self.init_message.take() {
766            let spawn_point = self.spawn_point.get().unwrap().as_ref().expect("PythonActor should never be spawned with init_message unless spawn_point also specified").clone();
767            let mut headers = Flattrs::new();
768            headers.set(CAST_POINT, spawn_point);
769            let cx = Context::new(this, headers);
770            <Self as Handler<PythonMessage>>::handle(self, &cx, init_message).await?;
771        }
772
773        Ok(())
774    }
775
776    async fn cleanup(
777        &mut self,
778        this: &Instance<Self>,
779        err: Option<&ActorError>,
780    ) -> anyhow::Result<()> {
781        // Calls the "__cleanup__" method on the python instance to allow the actor
782        // to control its own cleanup.
783        // No headers because this isn't in the context of a message.
784        let cx = Context::new(this, Flattrs::new());
785        // Turn the ActorError into a representation of the error. We may not
786        // have an original exception object or traceback, so we just pass in
787        // the message.
788        let err_as_str = err.map(|e| e.to_string());
789        let future = monarch_with_gil(|py| {
790            let py_cx = match &self.instance {
791                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
792                None => {
793                    let py_instance: crate::context::PyInstance = this.into();
794                    crate::context::PyContext::new(
795                        &cx,
796                        py_instance
797                            .into_py_any(py)?
798                            .downcast_bound(py)
799                            .map_err(PyErr::from)?
800                            .clone()
801                            .unbind(),
802                    )
803                }
804            }
805            .into_bound_py_any(py)?;
806            let actor = self.actor.bind(py);
807            // Some tests don't use the Actor base class, so add this check
808            // to be defensive.
809            match actor.hasattr("__cleanup__") {
810                Ok(false) | Err(_) => {
811                    // No cleanup found, default to returning None
812                    return Ok(None);
813                }
814                _ => {}
815            }
816            let awaitable = actor
817                .call_method("__cleanup__", (&py_cx, err_as_str), None)
818                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?;
819            if awaitable.is_none() {
820                Ok(None)
821            } else {
822                pyo3_async_runtimes::into_future_with_locals(self.get_task_locals(py), awaitable)
823                    .map(Some)
824                    .map_err(anyhow::Error::from)
825            }
826        })
827        .await?;
828        if let Some(future) = future {
829            future.await.map_err(anyhow::Error::from)?;
830        }
831        Ok(())
832    }
833
834    fn display_name(&self) -> Option<String> {
835        self.instance.as_ref().and_then(|instance| {
836            monarch_with_gil_blocking(|py| instance.bind(py).str().ok().map(|s| s.to_string()))
837        })
838    }
839
840    async fn handle_undeliverable_message(
841        &mut self,
842        ins: &Instance<Self>,
843        mut envelope: Undeliverable<MessageEnvelope>,
844    ) -> Result<(), anyhow::Error> {
845        if envelope.0.sender() != ins.self_id() {
846            // This can happen if the sender is comm. Update the envelope.
847            envelope = update_undeliverable_envelope_for_casting(envelope);
848        }
849        assert_eq!(
850            envelope.0.sender(),
851            ins.self_id(),
852            "undeliverable message was returned to the wrong actor. \
853            Return address = {}, src actor = {}, dest actor port = {}, envelope headers = {}",
854            envelope.0.sender(),
855            ins.self_id(),
856            envelope.0.dest(),
857            envelope.0.headers()
858        );
859
860        let cx = Context::new(ins, envelope.0.headers().clone());
861
862        let (envelope, handled) = monarch_with_gil(|py| {
863            let py_cx = match &self.instance {
864                Some(instance) => crate::context::PyContext::new(&cx, instance.clone_ref(py)),
865                None => {
866                    let py_instance: crate::context::PyInstance = ins.into();
867                    crate::context::PyContext::new(
868                        &cx,
869                        py_instance
870                            .into_py_any(py)?
871                            .downcast_bound(py)
872                            .map_err(PyErr::from)?
873                            .clone()
874                            .unbind(),
875                    )
876                }
877            }
878            .into_bound_py_any(py)?;
879            let py_envelope = PythonUndeliverableMessageEnvelope {
880                inner: Some(envelope),
881            }
882            .into_bound_py_any(py)?;
883            let handled = self
884                .actor
885                .call_method(
886                    py,
887                    "_handle_undeliverable_message",
888                    (&py_cx, &py_envelope),
889                    None,
890                )
891                .map_err(|err| anyhow::Error::from(SerializablePyErr::from(py, &err)))?
892                .extract::<bool>(py)?;
893            Ok::<_, anyhow::Error>((
894                py_envelope
895                    .downcast::<PythonUndeliverableMessageEnvelope>()
896                    .map_err(PyErr::from)?
897                    .try_borrow_mut()
898                    .map_err(PyErr::from)?
899                    .take()?,
900                handled,
901            ))
902        })
903        .await?;
904
905        if !handled {
906            hyperactor::actor::handle_undeliverable_message(ins, envelope)
907        } else {
908            Ok(())
909        }
910    }
911
912    async fn handle_supervision_event(
913        &mut self,
914        this: &Instance<Self>,
915        event: &ActorSupervisionEvent,
916    ) -> Result<bool, anyhow::Error> {
917        let cx = Context::new(this, Flattrs::new());
918        self.handle(
919            &cx,
920            MeshFailure {
921                actor_mesh_name: None,
922                rank: None,
923                event: event.clone(),
924            },
925        )
926        .await
927        .map(|_| true)
928    }
929}
930
931#[derive(Debug, Clone, Serialize, Deserialize, Named)]
932pub struct PythonActorParams {
933    // The pickled actor class to instantiate.
934    actor_type: PickledPyObject,
935    // Python message to process as part of the actor initialization.
936    init_message: Option<PythonMessage>,
937}
938
939impl PythonActorParams {
940    pub(crate) fn new(actor_type: PickledPyObject, init_message: Option<PythonMessage>) -> Self {
941        Self {
942            actor_type,
943            init_message,
944        }
945    }
946}
947
948#[async_trait]
949impl RemoteSpawn for PythonActor {
950    type Params = PythonActorParams;
951
952    async fn new(
953        PythonActorParams {
954            actor_type,
955            init_message,
956        }: PythonActorParams,
957        environment: Flattrs,
958    ) -> Result<Self, anyhow::Error> {
959        let spawn_point = environment.get(CAST_POINT);
960        Self::new(actor_type, init_message, spawn_point)
961    }
962}
963
964/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
965fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
966    monarch_with_gil_blocking(|py| {
967        let asyncio = Python::import(py, "asyncio").unwrap();
968        let event_loop = asyncio.call_method0("new_event_loop").unwrap();
969        let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
970            .copy_context(py)
971            .unwrap();
972
973        let kwargs = PyDict::new(py);
974        let target = event_loop.getattr("run_forever").unwrap();
975        kwargs.set_item("target", target).unwrap();
976        // Need to make this a daemon thread, otherwise shutdown will hang.
977        kwargs.set_item("daemon", true).unwrap();
978        let thread = py
979            .import("threading")
980            .unwrap()
981            .call_method("Thread", (), Some(&kwargs))
982            .unwrap();
983        thread.call_method0("start").unwrap();
984        task_locals
985    })
986}
987
988/// Get the shared TaskLocals, creating it if necessary.
989fn shared_task_locals(py: Python) -> &'static pyo3_async_runtimes::TaskLocals {
990    static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
991    Python::detach(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
992}
993
// [Panics in async endpoints]
// This class exists to solve a deadlock when an async endpoint calls into some
// Rust code that panics.
//
// When an async endpoint is invoked and calls into Rust, the following sequence happens:
//
// hyperactor message -> PythonActor::handle() -> call _Actor.handle() in Python
//   -> convert the resulting coroutine into a Rust future, but scheduled on
//      the Python asyncio event loop (`into_future_with_locals`)
//   -> set a callback on Python asyncio loop to ping a channel that fulfills
//      the Rust future when the Python coroutine has finished. (`PyTaskCompleter`)
//
// This works fine for normal results and Python exceptions: we will take the
// result of the callback and send it through the channel, where it will be
// returned to the `await`er of the Rust future.
//
// This DOESN'T work for panics. The behavior of a panic in pyo3-bound code is
// that it will get caught by pyo3 and re-thrown to Python as a PanicException.
// And if that PanicException ever makes it back to Rust, it will get unwound
// instead of passed around as a normal PyErr type.
//
// So:
//   - Endpoint panics.
//   - This panic is captured as a PanicException in Python and
//     stored as the result of the Python asyncio task.
//   - When the callback in `PyTaskCompleter` queries the status of the task to
//     pass it back to the Rust awaiter, instead of getting a Result type, it
//     just resumes unwinding the PanicException.
//   - This triggers a deadlock, because the whole task dies without ever
//     pinging the response channel, and the Rust awaiter will never complete.
//
// We work around this by passing a side-channel to our Python task so that it,
// in Python, can catch the PanicException and notify the Rust awaiter manually.
// In this way we can guarantee that the awaiter will complete even if the
// `PyTaskCompleter` callback explodes.
#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
struct PanicFlag {
    /// One-shot sender that hands the caught panic back to the Rust awaiter;
    /// `signal_panic` takes it out, leaving `None`.
    sender: Option<tokio::sync::oneshot::Sender<Py<PyAny>>>,
}
1033
1034#[pymethods]
1035impl PanicFlag {
1036    fn signal_panic(&mut self, ex: Py<PyAny>) {
1037        self.sender.take().unwrap().send(ex).unwrap();
1038    }
1039}
1040
1041#[async_trait]
1042impl Handler<PythonMessage> for PythonActor {
1043    #[tracing::instrument(level = "debug", skip_all)]
1044    async fn handle(
1045        &mut self,
1046        cx: &Context<PythonActor>,
1047        message: PythonMessage,
1048    ) -> anyhow::Result<()> {
1049        match &self.dispatch_mode {
1050            PythonActorDispatchMode::Direct => self.handle_direct(cx, message).await,
1051            PythonActorDispatchMode::Queue { sender, .. } => {
1052                let sender = sender.clone();
1053                self.handle_queue(cx, sender, message).await
1054            }
1055        }
1056    }
1057}
1058
1059impl PythonActor {
1060    /// Handle a message using direct dispatch (current behavior).
1061    async fn handle_direct(
1062        &mut self,
1063        cx: &Context<'_, PythonActor>,
1064        message: PythonMessage,
1065    ) -> anyhow::Result<()> {
1066        let resolved = message.resolve_indirect_call(cx).await?;
1067        let endpoint = resolved.method.to_string();
1068
1069        // Create a channel for signaling panics in async endpoints.
1070        // See [Panics in async endpoints].
1071        let (sender, receiver) = oneshot::channel();
1072
1073        let future = monarch_with_gil(|py| -> Result<_, SerializablePyErr> {
1074            let inst = self.instance.get_or_insert_with(|| {
1075                let inst: crate::context::PyInstance = cx.into();
1076                inst.into_pyobject(py).unwrap().into()
1077            });
1078
1079            let awaitable = self.actor.call_method(
1080                py,
1081                "handle",
1082                (
1083                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1084                    resolved.method,
1085                    resolved.bytes,
1086                    PanicFlag {
1087                        sender: Some(sender),
1088                    },
1089                    resolved
1090                        .local_state
1091                        .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1092                    resolved.response_port.into_py_any(py)?,
1093                ),
1094                None,
1095            )?;
1096
1097            let tl = self
1098                .task_locals
1099                .as_ref()
1100                .unwrap_or_else(|| shared_task_locals(py));
1101
1102            pyo3_async_runtimes::into_future_with_locals(tl, awaitable.into_bound(py))
1103                .map_err(|err| err.into())
1104        })
1105        .await?;
1106
1107        // Spawn a child actor to await the Python handler method.
1108        tokio::spawn(handle_async_endpoint_panic(
1109            cx.port(),
1110            PythonTask::new(future)?,
1111            receiver,
1112            cx.self_id().to_string(),
1113            endpoint,
1114        ));
1115        Ok(())
1116    }
1117
1118    /// Handle a message using queue dispatch.
1119    /// Resolves the message on the Rust side and enqueues it for Python to process.
1120    async fn handle_queue(
1121        &mut self,
1122        cx: &Context<'_, PythonActor>,
1123        sender: pympsc::Sender,
1124        message: PythonMessage,
1125    ) -> anyhow::Result<()> {
1126        let resolved = message.resolve_indirect_call(cx).await?;
1127
1128        let queued_msg = monarch_with_gil(|py| -> anyhow::Result<QueuedMessage> {
1129            let inst = self.instance.get_or_insert_with(|| {
1130                let inst: crate::context::PyInstance = cx.into();
1131                inst.into_pyobject(py).unwrap().into()
1132            });
1133
1134            let py_context = crate::context::PyContext::new(cx, inst.clone_ref(py));
1135            let py_context_obj = Py::new(py, py_context)?;
1136
1137            Ok(QueuedMessage {
1138                context: py_context_obj,
1139                method: resolved.method,
1140                bytes: resolved.bytes,
1141                local_state: resolved
1142                    .local_state
1143                    .unwrap_or_else(|| PyList::empty(py).unbind().into()),
1144                response_port: resolved.response_port.into_py_any(py)?,
1145            })
1146        })
1147        .await?;
1148
1149        sender
1150            .send(queued_msg)
1151            .map_err(|_| anyhow::anyhow!("failed to send message to queue"))?;
1152
1153        Ok(())
1154    }
1155}
1156
1157#[async_trait]
1158impl Handler<MeshFailure> for PythonActor {
1159    async fn handle(&mut self, cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
1160        // If the message is not about a failure, don't call __supervise__.
1161        // This includes messages like "stop", because those are not errors that
1162        // need to be propagated.
1163        if !message.event.actor_status.is_failed() {
1164            tracing::info!(
1165                "ignoring non-failure supervision event from child: {}",
1166                message
1167            );
1168            return Ok(());
1169        }
1170        // TODO: Consider routing supervision messages through the queue for Queue mode.
1171        // For now, supervision is always handled directly since it requires immediate response.
1172
1173        monarch_with_gil(|py| {
1174            let inst = self.instance.get_or_insert_with(|| {
1175                let inst: crate::context::PyInstance = cx.into();
1176                inst.into_pyobject(py).unwrap().into()
1177            });
1178            // Compute display_name here since we can't call self.display_name() due to borrow.
1179            let display_name: Option<String> = inst.bind(py).str().ok().map(|s| s.to_string());
1180            let actor_bound = self.actor.bind(py);
1181            // The _Actor class always has a __supervise__ method, so this should
1182            // never happen.
1183            if !actor_bound.hasattr("__supervise__")? {
1184                return Err(anyhow::anyhow!(
1185                    "no __supervise__ method on {:?}",
1186                    actor_bound
1187                ));
1188            }
1189            let result = actor_bound.call_method(
1190                "__supervise__",
1191                (
1192                    crate::context::PyContext::new(cx, inst.clone_ref(py)),
1193                    PyMeshFailure::from(message.clone()),
1194                ),
1195                None,
1196            );
1197            match result {
1198                Ok(s) => {
1199                    if s.is_truthy()? {
1200                        // If the return value is truthy, then the exception was handled
1201                        // and doesn't need to be propagated.
1202                        // TODO: We also don't want to deliver multiple supervision
1203                        // events from the same mesh if an earlier one is handled.
1204                        tracing::info!(
1205                            name = "ActorMeshStatus",
1206                            status = "SupervisionError::Handled",
1207                            // only care about the event sender when the message is handled
1208                            actor_name = message.actor_mesh_name,
1209                            event = %message.event,
1210                            "__supervise__ on {} handled a supervision event, not reporting any further",
1211                            cx.self_id(),
1212                        );
1213                        Ok(())
1214                    } else {
1215                        // For a falsey return value, we propagate the supervision event
1216                        // to the next owning actor. We do this by returning a new
1217                        // error. This will not set the causal chain for ActorSupervisionEvent,
1218                        // so make sure to include the original event in the error message
1219                        // to provide context.
1220
1221                        // False -- we propagate the event onward, but update it with the fact that
1222                        // this actor is now the event creator.
1223                        for (actor_name, status) in [
1224                            (
1225                                message
1226                                    .actor_mesh_name
1227                                    .as_deref()
1228                                    .unwrap_or_else(|| message.event.actor_id.name()),
1229                                "SupervisionError::Unhandled",
1230                            ),
1231                            (cx.self_id().name(), "UnhandledSupervisionEvent"),
1232                        ] {
1233                            tracing::info!(
1234                                name = "ActorMeshStatus",
1235                                status,
1236                                actor_name,
1237                                event = %message.event,
1238                                "__supervise__ on {} did not handle a supervision event, reporting to the next next owner",
1239                                cx.self_id(),
1240                            );
1241                        }
1242                        let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1243                            ActorSupervisionEvent::new(
1244                                cx.self_id().clone(),
1245                                display_name.clone(),
1246                                ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
1247                                    Box::new(message.event.clone()),
1248                                )),
1249                                None,
1250                            ),
1251                        ));
1252                        Err(anyhow::Error::new(err))
1253                    }
1254                }
1255                Err(err) => {
1256                    // Any other exception will supersede in the propagation chain,
1257                    // and will become its own supervision failure.
1258                    // Include the event it was handling in the error message.
1259
1260                    // Add to caused_by chain.
1261                    for (actor_name, status) in [
1262                        (
1263                            message
1264                                .actor_mesh_name
1265                                .as_deref()
1266                                .unwrap_or_else(|| message.event.actor_id.name()),
1267                            "SupervisionError::__supervise__::exception",
1268                        ),
1269                        (cx.self_id().name(), "UnhandledSupervisionEvent"),
1270                    ] {
1271                        tracing::info!(
1272                            name = "ActorMeshStatus",
1273                            status,
1274                            actor_name,
1275                            event = %message.event,
1276                            "__supervise__ on {} threw an exception",
1277                            cx.self_id(),
1278                        );
1279                    }
1280                    let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(
1281                        ActorSupervisionEvent::new(
1282                            cx.self_id().clone(),
1283                            display_name,
1284                            ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
1285                                err.to_string(),
1286                                Box::new(message.event.clone()),
1287                            )),
1288                            None,
1289                        ),
1290                    ));
1291                    Err(anyhow::Error::new(err))
1292                }
1293            }
1294        })
1295        .await
1296    }
1297}
1298
1299async fn handle_async_endpoint_panic(
1300    panic_sender: PortHandle<Signal>,
1301    task: PythonTask,
1302    side_channel: oneshot::Receiver<Py<PyAny>>,
1303    actor_id: String,
1304    endpoint: String,
1305) {
1306    // Create attributes for metrics with actor_id and endpoint
1307    let attributes =
1308        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id, "endpoint" => endpoint);
1309
1310    // Record the start time for latency measurement
1311    let start_time = std::time::Instant::now();
1312
1313    // Increment throughput counter
1314    ENDPOINT_ACTOR_COUNT.add(1, attributes);
1315
1316    let err_or_never = async {
1317        // The side channel will resolve with a value if a panic occured during
1318        // processing of the async endpoint, see [Panics in async endpoints].
1319        match side_channel.await {
1320            Ok(value) => {
1321                monarch_with_gil(|py| -> Option<SerializablePyErr> {
1322                    let err: PyErr = value
1323                        .downcast_bound::<PyBaseException>(py)
1324                        .unwrap()
1325                        .clone()
1326                        .into();
1327                    ENDPOINT_ACTOR_PANIC.add(1, attributes);
1328                    Some(err.into())
1329                })
1330                .await
1331            }
1332            // An Err means that the sender has been dropped without sending.
1333            // That's okay, it just means that the Python task has completed.
1334            // In that case, just never resolve this future. We expect the other
1335            // branch of the select to finish eventually.
1336            Err(_) => pending().await,
1337        }
1338    };
1339    let future = task.take();
1340    if let Some(panic) = tokio::select! {
1341        result = future => {
1342            match result {
1343                Ok(_) => None,
1344                Err(e) => Some(e.into()),
1345            }
1346        },
1347        result = err_or_never => {
1348            result
1349        }
1350    } {
1351        // Record error and panic metrics
1352        ENDPOINT_ACTOR_ERROR.add(1, attributes);
1353        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1354        let client = &CLIENT
1355            .get_or_init(|| {
1356                get_proc_runtime()
1357                    .instance("async_endpoint_handler")
1358                    .unwrap()
1359            })
1360            .0;
1361        panic_sender
1362            .send(&client, Signal::Abort(panic.to_string()))
1363            .expect("Unable to send panic message");
1364    }
1365
1366    // Record latency in microseconds
1367    let elapsed_micros = start_time.elapsed().as_micros() as f64;
1368    ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM.record(elapsed_micros, attributes);
1369}
1370
1371#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1372struct LocalPort {
1373    instance: PyInstance,
1374    inner: Option<OncePortHandle<Result<Py<PyAny>, Py<PyAny>>>>,
1375}
1376
1377impl Debug for LocalPort {
1378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1379        f.debug_struct("LocalPort")
1380            .field("inner", &self.inner)
1381            .finish()
1382    }
1383}
1384
1385pub(crate) fn to_py_error<T>(e: T) -> PyErr
1386where
1387    T: Error,
1388{
1389    PyErr::new::<PyValueError, _>(e.to_string())
1390}
1391
1392#[pymethods]
1393impl LocalPort {
1394    fn send(&mut self, obj: Py<PyAny>) -> PyResult<()> {
1395        let port = self.inner.take().expect("use local port once");
1396        port.send(self.instance.deref(), Ok(obj))
1397            .map_err(to_py_error)
1398    }
1399    fn exception(&mut self, e: Py<PyAny>) -> PyResult<()> {
1400        let port = self.inner.take().expect("use local port once");
1401        port.send(self.instance.deref(), Err(e))
1402            .map_err(to_py_error)
1403    }
1404}
1405
1406/// A port that drops all messages sent to it.
1407/// Used when there is no response port for a message.
1408/// Any exceptions sent to it are re-raised in the current actor.
1409#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
1410#[derive(Debug)]
1411pub struct DroppingPort;
1412
1413#[pymethods]
1414impl DroppingPort {
1415    #[new]
1416    fn new() -> Self {
1417        DroppingPort
1418    }
1419
1420    fn send(&self, _obj: Py<PyAny>) -> PyResult<()> {
1421        Ok(())
1422    }
1423
1424    fn exception(&self, e: Bound<'_, PyAny>) -> PyResult<()> {
1425        // Unwrap ActorError to get the inner exception, matching Python behavior.
1426        let exc = if let Ok(inner) = e.getattr("exception") {
1427            inner
1428        } else {
1429            e
1430        };
1431        Err(PyErr::from_value(exc))
1432    }
1433
1434    #[getter]
1435    fn get_return_undeliverable(&self) -> bool {
1436        true
1437    }
1438
1439    #[setter]
1440    fn set_return_undeliverable(&self, _value: bool) {}
1441}
1442
1443/// A port that sends messages to a remote receiver.
1444/// Wraps an EitherPortRef with the actor instance needed for sending.
1445#[pyclass(module = "monarch._src.actor.actor_mesh")]
1446pub struct Port {
1447    port_ref: EitherPortRef,
1448    instance: Instance<PythonActor>,
1449    rank: Option<usize>,
1450}
1451
1452#[pymethods]
1453impl Port {
1454    #[new]
1455    fn new(
1456        port_ref: EitherPortRef,
1457        instance: &crate::context::PyInstance,
1458        rank: Option<usize>,
1459    ) -> Self {
1460        Self {
1461            port_ref,
1462            instance: instance.clone().into_instance(),
1463            rank,
1464        }
1465    }
1466
1467    #[getter("_port_ref")]
1468    fn port_ref_py(&self) -> EitherPortRef {
1469        self.port_ref.clone()
1470    }
1471
1472    #[getter("_rank")]
1473    fn rank_py(&self) -> Option<usize> {
1474        self.rank
1475    }
1476
1477    #[getter]
1478    fn get_return_undeliverable(&self) -> bool {
1479        self.port_ref.get_return_undeliverable()
1480    }
1481
1482    #[setter]
1483    fn set_return_undeliverable(&mut self, value: bool) {
1484        self.port_ref.set_return_undeliverable(value);
1485    }
1486
1487    fn send(&mut self, py: Python<'_>, obj: Py<PyAny>) -> PyResult<()> {
1488        let message = PythonMessage::new_from_buf(
1489            PythonMessageKind::Result { rank: self.rank },
1490            pickle_to_part(py, &obj)?,
1491        );
1492
1493        self.port_ref
1494            .send(&self.instance, message)
1495            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1496    }
1497
1498    fn exception(&mut self, py: Python<'_>, e: Py<PyAny>) -> PyResult<()> {
1499        let message = PythonMessage::new_from_buf(
1500            PythonMessageKind::Exception { rank: self.rank },
1501            pickle_to_part(py, &e)?,
1502        );
1503
1504        self.port_ref
1505            .send(&self.instance, message)
1506            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
1507    }
1508}
1509
/// Register this module's Python-visible classes on `hyperactor_mod`.
///
/// # Errors
/// Returns the first error raised while adding a class to the module.
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
    hyperactor_mod.add_class::<PythonActorHandle>()?;
    hyperactor_mod.add_class::<PythonMessage>()?;
    hyperactor_mod.add_class::<PythonMessageKind>()?;
    hyperactor_mod.add_class::<MethodSpecifier>()?;
    hyperactor_mod.add_class::<UnflattenArg>()?;
    hyperactor_mod.add_class::<PanicFlag>()?;
    hyperactor_mod.add_class::<QueuedMessage>()?;
    hyperactor_mod.add_class::<DroppingPort>()?;
    hyperactor_mod.add_class::<Port>()?;
    Ok(())
}
1522
#[cfg(test)]
mod tests {
    use hyperactor::accum::ReducerSpec;
    use hyperactor::accum::StreamingReducerOpts;
    use hyperactor::message::ErasedUnbound;
    use hyperactor::message::Unbound;
    use hyperactor::reference;
    use hyperactor::testing::ids::test_port_id;
    use hyperactor_mesh::Error as MeshError;
    use hyperactor_mesh::Name;
    use hyperactor_mesh::host_mesh::host_agent::ProcState;
    use hyperactor_mesh::resource::Status;
    use hyperactor_mesh::resource::{self};
    use pyo3::PyTypeInfo;

    use super::*;
    use crate::actor::to_py_error;

    /// Erases a clone of `message` and returns every `UnboundPort` that the
    /// `visit_mut` walk reports, in visitation order.
    fn unbound_ports_of(message: &PythonMessage) -> Vec<reference::UnboundPort> {
        let mut erased = ErasedUnbound::try_from_message(message.clone()).unwrap();
        let mut found = Vec::new();
        erased
            .visit_mut::<reference::UnboundPort>(|port| {
                found.push(port.clone());
                Ok(())
            })
            .unwrap();
        found
    }

    /// Asserts that unbinding a clone of `message` and binding it again
    /// produces a message equal to the original.
    fn assert_unbind_bind_roundtrip(message: &PythonMessage) {
        let unbound = Unbound::try_from_message(message.clone()).unwrap();
        let rebound = unbound.bind().unwrap();
        assert_eq!(*message, rebound);
    }

    #[test]
    fn test_python_message_bind_unbind() {
        let spec = ReducerSpec {
            typehash: 123,
            builder_params: Some(wirevalue::Any::serialize(&"abcdefg12345".to_string()).unwrap()),
        };
        let reply_port = reference::PortRef::<PythonMessage>::attest_reducible(
            test_port_id("world_0", "client", 123),
            Some(spec),
            StreamingReducerOpts::default(),
        );

        // A method call that carries a response port: exactly that port should
        // be visited, and the message must survive an unbind/bind round-trip.
        let with_port = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: Some(EitherPortRef::Unbounded(reply_port.clone().into())),
            },
            message: Part::from(vec![1, 2, 3]),
        };
        assert_eq!(
            unbound_ports_of(&with_port),
            vec![reference::UnboundPort::from(&reply_port)]
        );
        assert_unbind_bind_roundtrip(&with_port);

        // The same payload without a response port: nothing should be visited,
        // and the round-trip must still be lossless.
        let without_port = PythonMessage {
            kind: PythonMessageKind::CallMethod {
                name: MethodSpecifier::ReturnsResponse {
                    name: "test".to_string(),
                },
                response_port: None,
            },
            ..with_port
        };
        assert!(unbound_ports_of(&without_port).is_empty());
        assert_unbind_bind_roundtrip(&without_port);
    }

    #[test]
    fn to_py_error_preserves_proc_creation_message() {
        // A failed proc resource whose `state` payload is absent.
        let failed_state: resource::State<ProcState> = resource::State {
            name: Name::new("my_proc").unwrap(),
            status: Status::Failed("boom".into()),
            state: None,
        };

        // Build the ProcCreationError around a test host-agent reference.
        let agent_id = test_port_id("hello_0", "actor", 0).actor_id().clone();
        let agent: reference::ActorRef<hyperactor_mesh::host_mesh::HostAgent> =
            reference::ActorRef::attest(agent_id);
        let expected_prefix = format!(
            "error creating proc (host rank 0) on host mesh agent {}",
            agent
        );
        let err = MeshError::ProcCreationError {
            host_rank: 0,
            mesh_agent: agent,
            state: Box::new(failed_state),
        };

        // Capture the Rust rendering before `to_py_error` consumes the error.
        let rust_msg = err.to_string();
        let pyerr = to_py_error(err);

        pyo3::Python::initialize();
        monarch_with_gil_blocking(|py| {
            // The bridge must surface this error as a Python ValueError.
            assert!(pyerr.get_type(py).is(PyValueError::type_object(py)));
            let py_msg = pyerr.value(py).to_string();

            // The message text crosses the bridge unchanged...
            assert_eq!(py_msg, rust_msg);
            // ...still includes the structured state and its failure status...
            assert!(py_msg.contains(", state: "));
            assert!(py_msg.contains("\"status\":{\"Failed\":\"boom\"}"));
            // ...and begins with the host-rank / agent prefix.
            assert!(py_msg.starts_with(&expected_prefix));
        });
    }
}