monarch_hyperactor/
context.rs1use hyperactor::Instance;
10use hyperactor::context;
11use hyperactor_mesh::comm::multicast::CastInfo;
12use ndslice::Extent;
13use ndslice::Point;
14use pyo3::exceptions::PyRuntimeError;
15use pyo3::prelude::*;
16
17use crate::actor::PythonActor;
18use crate::actor::root_client_actor;
19use crate::mailbox::PyMailbox;
20use crate::proc::PyActorId;
21use crate::runtime;
22use crate::shape::PyPoint;
23
24#[pyclass(name = "Instance", module = "monarch._src.actor.actor_mesh")]
25pub struct PyInstance {
26 inner: Instance<PythonActor>,
27 #[pyo3(get, set)]
28 proc_mesh: Option<Py<PyAny>>,
29 #[pyo3(get, set, name = "_controller_controller")]
30 controller_controller: Option<Py<PyAny>>,
31 #[pyo3(get, set)]
32 pub(crate) rank: PyPoint,
33 #[pyo3(get, set, name = "_children")]
34 children: Option<Py<PyAny>>,
35
36 #[pyo3(get, set, name = "name")]
37 name: String,
38 #[pyo3(get, set, name = "class_name")]
39 class_name: Option<String>,
40 #[pyo3(get, set, name = "creator")]
41 creator: Option<Py<PyAny>>,
42
43 #[pyo3(get, set, name = "_mock_tensor_engine_factory")]
44 mock_tensor_engine_factory: Option<Py<PyAny>>,
45}
46
47impl Clone for PyInstance {
48 fn clone(&self) -> Self {
49 PyInstance {
50 inner: self.inner.clone_for_py(),
51 proc_mesh: self.proc_mesh.clone(),
52 controller_controller: self.controller_controller.clone(),
53 rank: self.rank.clone(),
54 children: self.children.clone(),
55 name: self.name.clone(),
56 class_name: self.class_name.clone(),
57 creator: self.creator.clone(),
58 mock_tensor_engine_factory: self.mock_tensor_engine_factory.clone(),
59 }
60 }
61}
62
63impl std::ops::Deref for PyInstance {
64 type Target = Instance<PythonActor>;
65
66 fn deref(&self) -> &Self::Target {
67 &self.inner
68 }
69}
70
71#[pymethods]
72impl PyInstance {
73 #[getter]
74 pub(crate) fn _mailbox(&self) -> PyMailbox {
75 PyMailbox {
76 inner: self.inner.mailbox_for_py().clone(),
77 }
78 }
79
80 #[getter]
81 pub fn actor_id(&self) -> PyActorId {
82 self.inner.self_id().clone().into()
83 }
84
85 #[pyo3(signature = (reason = None))]
86 fn abort(&self, reason: Option<&str>) -> PyResult<()> {
87 let reason = reason.unwrap_or("(no reason provided)");
88 Ok(self.inner.abort(reason).map_err(anyhow::Error::from)?)
89 }
90
91 #[pyo3(signature = (reason = None))]
92 fn stop(&self, reason: Option<&str>) -> PyResult<()> {
93 tracing::info!(actor_id = %self.inner.self_id(), "stopping PyInstance");
94 let reason = reason.unwrap_or("(no reason provided)");
95 self.inner
96 .stop(reason)
97 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
98 }
99
100 #[pyo3(signature = (reason = None))]
104 fn stop_and_wait(&self, reason: Option<&str>) -> PyResult<crate::pytokio::PyPythonTask> {
105 let reason = reason.unwrap_or("shutdown").to_string();
106 let actor_id = self.inner.self_id().clone();
107 let proc = self.inner.proc().clone();
108 crate::pytokio::PyPythonTask::new(async move {
109 let status_rx = proc.stop_actor(&actor_id, reason);
110 if let Some(mut rx) = status_rx {
111 let _ = rx.wait_for(|s| s.is_terminal()).await;
112 }
113 if let Err(e) = proc.flush().await {
114 tracing::warn!(%actor_id, "stop_and_wait: flush failed: {}", e);
115 }
116 Ok(())
117 })
118 }
119
120 fn set_system(&self) {
126 self.inner.set_system();
127 }
128}
129
130impl PyInstance {
131 pub fn into_instance(self) -> Instance<PythonActor> {
132 self.inner
133 }
134}
135
136impl<I: context::Actor<A = PythonActor>> From<I> for PyInstance {
137 fn from(ins: I) -> Self {
138 PyInstance {
139 inner: ins.instance().clone_for_py(),
140 proc_mesh: None,
141 controller_controller: None,
142 rank: PyPoint::new(0, Extent::unity().into()),
143 children: None,
144 name: "root".to_string(),
145 class_name: None,
146 creator: None,
147 mock_tensor_engine_factory: None,
148 }
149 }
150}
151
152#[pyclass(name = "Context", module = "monarch._src.actor.actor_mesh")]
153pub struct PyContext {
154 instance: Py<PyInstance>,
155 rank: Point,
156}
157
158#[pymethods]
159impl PyContext {
160 #[getter]
161 fn actor_instance(&self) -> &Py<PyInstance> {
162 &self.instance
163 }
164
165 #[getter]
166 fn message_rank(&self) -> PyPoint {
167 self.rank.clone().into()
168 }
169
170 #[staticmethod]
171 fn _root_client_context(py: Python<'_>) -> PyResult<PyContext> {
172 let _guard = runtime::get_tokio_runtime().enter();
173 let instance: PyInstance = root_client_actor(py).into();
174 Ok(PyContext {
175 instance: instance.into_pyobject(py)?.into(),
176 rank: Extent::unity().point_of_rank(0).unwrap(),
177 })
178 }
179
180 #[staticmethod]
184 fn _from_instance(py: Python<'_>, instance: PyInstance) -> PyResult<PyContext> {
185 Ok(PyContext {
186 instance: instance.into_pyobject(py)?.into(),
187 rank: Extent::unity().point_of_rank(0).unwrap(),
188 })
189 }
190}
191
192impl PyContext {
193 pub(crate) fn new<T: hyperactor::actor::Actor>(
194 cx: &hyperactor::Context<T>,
195 instance: Py<PyInstance>,
196 ) -> PyContext {
197 PyContext {
198 instance,
199 rank: cx.cast_point(),
200 }
201 }
202}
203
204pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
205 hyperactor_mod.add_class::<PyInstance>()?;
206 hyperactor_mod.add_class::<PyContext>()?;
207 Ok(())
208}