Skip to main content

monarch_hyperactor/
context.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use hyperactor::Instance;
10use hyperactor::context;
11use hyperactor_mesh::comm::multicast::CastInfo;
12use ndslice::Extent;
13use ndslice::Point;
14use pyo3::exceptions::PyRuntimeError;
15use pyo3::prelude::*;
16
17use crate::actor::PythonActor;
18use crate::actor::root_client_actor;
19use crate::mailbox::PyMailbox;
20use crate::proc::PyActorAddr;
21use crate::runtime;
22use crate::shape::PyPoint;
23
24#[pyclass(name = "Instance", module = "monarch._src.actor.actor_mesh")]
25pub struct PyInstance {
26    inner: Instance<PythonActor>,
27    #[pyo3(get, set)]
28    proc_mesh: Option<Py<PyAny>>,
29    #[pyo3(get, set, name = "_controller_controller")]
30    controller_controller: Option<Py<PyAny>>,
31    #[pyo3(get, set)]
32    pub(crate) rank: PyPoint,
33    #[pyo3(get, set, name = "_children")]
34    children: Option<Py<PyAny>>,
35
36    #[pyo3(get, set, name = "name")]
37    name: String,
38    #[pyo3(get, set, name = "class_name")]
39    class_name: Option<String>,
40    #[pyo3(get, set, name = "creator")]
41    creator: Option<Py<PyAny>>,
42
43    #[pyo3(get, set, name = "_mock_tensor_engine_factory")]
44    mock_tensor_engine_factory: Option<Py<PyAny>>,
45}
46
47impl Clone for PyInstance {
48    fn clone(&self) -> Self {
49        PyInstance {
50            inner: self.inner.clone_for_py(),
51            proc_mesh: self.proc_mesh.clone(),
52            controller_controller: self.controller_controller.clone(),
53            rank: self.rank.clone(),
54            children: self.children.clone(),
55            name: self.name.clone(),
56            class_name: self.class_name.clone(),
57            creator: self.creator.clone(),
58            mock_tensor_engine_factory: self.mock_tensor_engine_factory.clone(),
59        }
60    }
61}
62
63impl std::ops::Deref for PyInstance {
64    type Target = Instance<PythonActor>;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71#[pymethods]
72impl PyInstance {
73    #[getter]
74    pub(crate) fn _mailbox(&self) -> PyMailbox {
75        PyMailbox {
76            inner: self.inner.mailbox_for_py().clone(),
77        }
78    }
79
80    #[getter]
81    pub fn actor_id(&self) -> PyActorAddr {
82        let actor_id: hyperactor::ActorAddr = self.inner.self_addr().clone();
83        actor_id.into()
84    }
85
86    #[pyo3(signature = (reason = None))]
87    fn abort(&self, reason: Option<&str>) -> PyResult<()> {
88        let reason = reason.unwrap_or("(no reason provided)");
89        Ok(self.inner.abort(reason).map_err(anyhow::Error::from)?)
90    }
91
92    #[pyo3(signature = (reason = None))]
93    fn stop(&self, reason: Option<&str>) -> PyResult<()> {
94        tracing::info!(actor_id = %self.inner.self_addr(), "stopping PyInstance");
95        let reason = reason.unwrap_or("(no reason provided)");
96        self.inner
97            .stop(reason)
98            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
99    }
100
101    /// Stop the actor and return a future that resolves when it reaches
102    /// a terminal status (stopped or failed). This ensures all pending
103    /// messages are drained and connections are flushed before returning.
104    #[pyo3(signature = (reason = None))]
105    fn stop_and_wait(&self, reason: Option<&str>) -> PyResult<crate::pytokio::PyPythonTask> {
106        let reason = reason.unwrap_or("shutdown").to_string();
107        let actor_id = self.inner.self_addr().clone();
108        let proc = self.inner.proc().clone();
109        crate::pytokio::PyPythonTask::new(async move {
110            let status_rx = proc.stop_actor(actor_id.id(), reason);
111            if let Some(mut rx) = status_rx {
112                let _ = rx.wait_for(|s| s.is_terminal()).await;
113            }
114            if let Err(e) = proc.flush().await {
115                tracing::warn!(%actor_id, "stop_and_wait: flush failed: {}", e);
116            }
117            Ok(())
118        })
119    }
120
121    /// Mark this actor as system/infrastructure.
122    ///
123    /// **PY-SYS-2:** Python actors use the `_is_system_actor = True`
124    /// class attribute so that this is called during actor init,
125    /// before ProcAgent publishes its first introspection snapshot.
126    fn set_system(&self) {
127        self.inner.set_system();
128    }
129}
130
131impl PyInstance {
132    pub fn into_instance(self) -> Instance<PythonActor> {
133        self.inner
134    }
135}
136
137impl<I: context::Actor<A = PythonActor>> From<I> for PyInstance {
138    fn from(ins: I) -> Self {
139        PyInstance {
140            inner: ins.instance().clone_for_py(),
141            proc_mesh: None,
142            controller_controller: None,
143            rank: PyPoint::new(0, Extent::unity().into()),
144            children: None,
145            name: "root".to_string(),
146            class_name: None,
147            creator: None,
148            mock_tensor_engine_factory: None,
149        }
150    }
151}
152
153#[pyclass(name = "Context", module = "monarch._src.actor.actor_mesh")]
154pub struct PyContext {
155    instance: Py<PyInstance>,
156    rank: Point,
157    /// Cloneable handle to a span carrying the actor's recording key.
158    /// When entered, events emitted under this span are captured by
159    /// the per-actor flight recorder. `None` for bootstrap/client
160    /// contexts that are not actor handler execution paths.
161    recording_span: Option<tracing::Span>,
162}
163
164#[pymethods]
165impl PyContext {
166    #[getter]
167    fn actor_instance(&self) -> &Py<PyInstance> {
168        &self.instance
169    }
170
171    #[getter]
172    fn message_rank(&self) -> PyPoint {
173        self.rank.clone().into()
174    }
175
176    #[staticmethod]
177    fn _root_client_context(py: Python<'_>) -> PyResult<PyContext> {
178        let _guard = runtime::get_tokio_runtime().enter();
179        let instance: PyInstance = root_client_actor(py).into();
180        Ok(PyContext {
181            instance: instance.into_pyobject(py)?.into(),
182            rank: Extent::unity().point_of_rank(0).unwrap(),
183            recording_span: None,
184        })
185    }
186
187    /// Create a context from an existing instance.
188    /// This is used when the root client was bootstrapped via bootstrap_host()
189    /// instead of the default bootstrap_client().
190    #[staticmethod]
191    fn _from_instance(py: Python<'_>, instance: PyInstance) -> PyResult<PyContext> {
192        Ok(PyContext {
193            instance: instance.into_pyobject(py)?.into(),
194            rank: Extent::unity().point_of_rank(0).unwrap(),
195            recording_span: None,
196        })
197    }
198}
199
200impl PyContext {
201    pub(crate) fn new<T: hyperactor::actor::Actor>(
202        cx: &hyperactor::Context<T>,
203        instance: Py<PyInstance>,
204    ) -> PyContext {
205        PyContext {
206            instance,
207            rank: cx.cast_point(),
208            recording_span: Some(cx.recording_span()),
209        }
210    }
211
212    /// The actor's recording span, if this context is an actor handler
213    /// execution path. Used by `forward_to_tracing` to enter the
214    /// recording scope on the asyncio thread so that log events are
215    /// captured in the flight recorder.
216    pub(crate) fn recording_span(&self) -> Option<&tracing::Span> {
217        self.recording_span.as_ref()
218    }
219
220    /// Test-only: build a PyContext with a chosen recording span.
221    /// Uses the root client actor for the instance field (the test
222    /// only exercises the recording_span extraction path).
223    #[doc(hidden)]
224    pub fn for_test(py: Python<'_>, recording_span: Option<tracing::Span>) -> PyResult<PyContext> {
225        let mut ctx = Self::_root_client_context(py)?;
226        ctx.recording_span = recording_span;
227        Ok(ctx)
228    }
229}
230
231pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
232    hyperactor_mod.add_class::<PyInstance>()?;
233    hyperactor_mod.add_class::<PyContext>()?;
234    Ok(())
235}