monarch_hyperactor/
context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use hyperactor::Instance;
10use hyperactor::context;
11use hyperactor_mesh::comm::multicast::CastInfo;
12use ndslice::Extent;
13use ndslice::Point;
14use pyo3::exceptions::PyRuntimeError;
15use pyo3::prelude::*;
16
17use crate::actor::PythonActor;
18use crate::actor::root_client_actor;
19use crate::mailbox::PyMailbox;
20use crate::proc::PyActorId;
21use crate::runtime;
22use crate::shape::PyPoint;
23
24#[pyclass(name = "Instance", module = "monarch._src.actor.actor_mesh")]
25pub struct PyInstance {
26    inner: Instance<PythonActor>,
27    #[pyo3(get, set)]
28    proc_mesh: Option<Py<PyAny>>,
29    #[pyo3(get, set, name = "_controller_controller")]
30    controller_controller: Option<Py<PyAny>>,
31    #[pyo3(get, set)]
32    pub(crate) rank: PyPoint,
33    #[pyo3(get, set, name = "_children")]
34    children: Option<Py<PyAny>>,
35
36    #[pyo3(get, set, name = "name")]
37    name: String,
38    #[pyo3(get, set, name = "class_name")]
39    class_name: Option<String>,
40    #[pyo3(get, set, name = "creator")]
41    creator: Option<Py<PyAny>>,
42
43    #[pyo3(get, set, name = "_mock_tensor_engine_factory")]
44    mock_tensor_engine_factory: Option<Py<PyAny>>,
45}
46
47impl Clone for PyInstance {
48    fn clone(&self) -> Self {
49        PyInstance {
50            inner: self.inner.clone_for_py(),
51            proc_mesh: self.proc_mesh.clone(),
52            controller_controller: self.controller_controller.clone(),
53            rank: self.rank.clone(),
54            children: self.children.clone(),
55            name: self.name.clone(),
56            class_name: self.class_name.clone(),
57            creator: self.creator.clone(),
58            mock_tensor_engine_factory: self.mock_tensor_engine_factory.clone(),
59        }
60    }
61}
62
63impl std::ops::Deref for PyInstance {
64    type Target = Instance<PythonActor>;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71#[pymethods]
72impl PyInstance {
73    #[getter]
74    pub(crate) fn _mailbox(&self) -> PyMailbox {
75        PyMailbox {
76            inner: self.inner.mailbox_for_py().clone(),
77        }
78    }
79
80    #[getter]
81    pub fn actor_id(&self) -> PyActorId {
82        self.inner.self_id().clone().into()
83    }
84
85    #[pyo3(signature = (reason = None))]
86    fn abort(&self, reason: Option<&str>) -> PyResult<()> {
87        let reason = reason.unwrap_or("(no reason provided)");
88        Ok(self.inner.abort(reason).map_err(anyhow::Error::from)?)
89    }
90
91    #[pyo3(signature = (reason = None))]
92    fn stop(&self, reason: Option<&str>) -> PyResult<()> {
93        tracing::info!(actor_id = %self.inner.self_id(), "stopping PyInstance");
94        let reason = reason.unwrap_or("(no reason provided)");
95        self.inner
96            .stop(reason)
97            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
98    }
99
100    /// Stop the actor and return a future that resolves when it reaches
101    /// a terminal status (stopped or failed). This ensures all pending
102    /// messages are drained and connections are flushed before returning.
103    #[pyo3(signature = (reason = None))]
104    fn stop_and_wait(&self, reason: Option<&str>) -> PyResult<crate::pytokio::PyPythonTask> {
105        let reason = reason.unwrap_or("shutdown").to_string();
106        let actor_id = self.inner.self_id().clone();
107        let proc = self.inner.proc().clone();
108        crate::pytokio::PyPythonTask::new(async move {
109            let status_rx = proc.stop_actor(&actor_id, reason);
110            if let Some(mut rx) = status_rx {
111                let _ = rx.wait_for(|s| s.is_terminal()).await;
112            }
113            if let Err(e) = proc.flush().await {
114                tracing::warn!(%actor_id, "stop_and_wait: flush failed: {}", e);
115            }
116            Ok(())
117        })
118    }
119
120    /// Mark this actor as system/infrastructure.
121    ///
122    /// **PY-SYS-2:** Python actors use the `_is_system_actor = True`
123    /// class attribute so that this is called during actor init,
124    /// before ProcAgent publishes its first introspection snapshot.
125    fn set_system(&self) {
126        self.inner.set_system();
127    }
128}
129
130impl PyInstance {
131    pub fn into_instance(self) -> Instance<PythonActor> {
132        self.inner
133    }
134}
135
136impl<I: context::Actor<A = PythonActor>> From<I> for PyInstance {
137    fn from(ins: I) -> Self {
138        PyInstance {
139            inner: ins.instance().clone_for_py(),
140            proc_mesh: None,
141            controller_controller: None,
142            rank: PyPoint::new(0, Extent::unity().into()),
143            children: None,
144            name: "root".to_string(),
145            class_name: None,
146            creator: None,
147            mock_tensor_engine_factory: None,
148        }
149    }
150}
151
152#[pyclass(name = "Context", module = "monarch._src.actor.actor_mesh")]
153pub struct PyContext {
154    instance: Py<PyInstance>,
155    rank: Point,
156}
157
158#[pymethods]
159impl PyContext {
160    #[getter]
161    fn actor_instance(&self) -> &Py<PyInstance> {
162        &self.instance
163    }
164
165    #[getter]
166    fn message_rank(&self) -> PyPoint {
167        self.rank.clone().into()
168    }
169
170    #[staticmethod]
171    fn _root_client_context(py: Python<'_>) -> PyResult<PyContext> {
172        let _guard = runtime::get_tokio_runtime().enter();
173        let instance: PyInstance = root_client_actor(py).into();
174        Ok(PyContext {
175            instance: instance.into_pyobject(py)?.into(),
176            rank: Extent::unity().point_of_rank(0).unwrap(),
177        })
178    }
179
180    /// Create a context from an existing instance.
181    /// This is used when the root client was bootstrapped via bootstrap_host()
182    /// instead of the default bootstrap_client().
183    #[staticmethod]
184    fn _from_instance(py: Python<'_>, instance: PyInstance) -> PyResult<PyContext> {
185        Ok(PyContext {
186            instance: instance.into_pyobject(py)?.into(),
187            rank: Extent::unity().point_of_rank(0).unwrap(),
188        })
189    }
190}
191
192impl PyContext {
193    pub(crate) fn new<T: hyperactor::actor::Actor>(
194        cx: &hyperactor::Context<T>,
195        instance: Py<PyInstance>,
196    ) -> PyContext {
197        PyContext {
198            instance,
199            rank: cx.cast_point(),
200        }
201    }
202}
203
204pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
205    hyperactor_mod.add_class::<PyInstance>()?;
206    hyperactor_mod.add_class::<PyContext>()?;
207    Ok(())
208}