monarch_hyperactor/
context.rs1use hyperactor::Instance;
10use hyperactor::context;
11use hyperactor_mesh::comm::multicast::CastInfo;
12use ndslice::Extent;
13use ndslice::Point;
14use pyo3::exceptions::PyRuntimeError;
15use pyo3::prelude::*;
16
17use crate::actor::PythonActor;
18use crate::actor::root_client_actor;
19use crate::mailbox::PyMailbox;
20use crate::proc::PyActorAddr;
21use crate::runtime;
22use crate::shape::PyPoint;
23
24#[pyclass(name = "Instance", module = "monarch._src.actor.actor_mesh")]
25pub struct PyInstance {
26 inner: Instance<PythonActor>,
27 #[pyo3(get, set)]
28 proc_mesh: Option<Py<PyAny>>,
29 #[pyo3(get, set, name = "_controller_controller")]
30 controller_controller: Option<Py<PyAny>>,
31 #[pyo3(get, set)]
32 pub(crate) rank: PyPoint,
33 #[pyo3(get, set, name = "_children")]
34 children: Option<Py<PyAny>>,
35
36 #[pyo3(get, set, name = "name")]
37 name: String,
38 #[pyo3(get, set, name = "class_name")]
39 class_name: Option<String>,
40 #[pyo3(get, set, name = "creator")]
41 creator: Option<Py<PyAny>>,
42
43 #[pyo3(get, set, name = "_mock_tensor_engine_factory")]
44 mock_tensor_engine_factory: Option<Py<PyAny>>,
45}
46
47impl Clone for PyInstance {
48 fn clone(&self) -> Self {
49 PyInstance {
50 inner: self.inner.clone_for_py(),
51 proc_mesh: self.proc_mesh.clone(),
52 controller_controller: self.controller_controller.clone(),
53 rank: self.rank.clone(),
54 children: self.children.clone(),
55 name: self.name.clone(),
56 class_name: self.class_name.clone(),
57 creator: self.creator.clone(),
58 mock_tensor_engine_factory: self.mock_tensor_engine_factory.clone(),
59 }
60 }
61}
62
63impl std::ops::Deref for PyInstance {
64 type Target = Instance<PythonActor>;
65
66 fn deref(&self) -> &Self::Target {
67 &self.inner
68 }
69}
70
71#[pymethods]
72impl PyInstance {
73 #[getter]
74 pub(crate) fn _mailbox(&self) -> PyMailbox {
75 PyMailbox {
76 inner: self.inner.mailbox_for_py().clone(),
77 }
78 }
79
80 #[getter]
81 pub fn actor_id(&self) -> PyActorAddr {
82 let actor_id: hyperactor::ActorAddr = self.inner.self_addr().clone();
83 actor_id.into()
84 }
85
86 #[pyo3(signature = (reason = None))]
87 fn abort(&self, reason: Option<&str>) -> PyResult<()> {
88 let reason = reason.unwrap_or("(no reason provided)");
89 Ok(self.inner.abort(reason).map_err(anyhow::Error::from)?)
90 }
91
92 #[pyo3(signature = (reason = None))]
93 fn stop(&self, reason: Option<&str>) -> PyResult<()> {
94 tracing::info!(actor_id = %self.inner.self_addr(), "stopping PyInstance");
95 let reason = reason.unwrap_or("(no reason provided)");
96 self.inner
97 .stop(reason)
98 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
99 }
100
101 #[pyo3(signature = (reason = None))]
105 fn stop_and_wait(&self, reason: Option<&str>) -> PyResult<crate::pytokio::PyPythonTask> {
106 let reason = reason.unwrap_or("shutdown").to_string();
107 let actor_id = self.inner.self_addr().clone();
108 let proc = self.inner.proc().clone();
109 crate::pytokio::PyPythonTask::new(async move {
110 let status_rx = proc.stop_actor(actor_id.id(), reason);
111 if let Some(mut rx) = status_rx {
112 let _ = rx.wait_for(|s| s.is_terminal()).await;
113 }
114 if let Err(e) = proc.flush().await {
115 tracing::warn!(%actor_id, "stop_and_wait: flush failed: {}", e);
116 }
117 Ok(())
118 })
119 }
120
121 fn set_system(&self) {
127 self.inner.set_system();
128 }
129}
130
131impl PyInstance {
132 pub fn into_instance(self) -> Instance<PythonActor> {
133 self.inner
134 }
135}
136
137impl<I: context::Actor<A = PythonActor>> From<I> for PyInstance {
138 fn from(ins: I) -> Self {
139 PyInstance {
140 inner: ins.instance().clone_for_py(),
141 proc_mesh: None,
142 controller_controller: None,
143 rank: PyPoint::new(0, Extent::unity().into()),
144 children: None,
145 name: "root".to_string(),
146 class_name: None,
147 creator: None,
148 mock_tensor_engine_factory: None,
149 }
150 }
151}
152
153#[pyclass(name = "Context", module = "monarch._src.actor.actor_mesh")]
154pub struct PyContext {
155 instance: Py<PyInstance>,
156 rank: Point,
157 recording_span: Option<tracing::Span>,
162}
163
164#[pymethods]
165impl PyContext {
166 #[getter]
167 fn actor_instance(&self) -> &Py<PyInstance> {
168 &self.instance
169 }
170
171 #[getter]
172 fn message_rank(&self) -> PyPoint {
173 self.rank.clone().into()
174 }
175
176 #[staticmethod]
177 fn _root_client_context(py: Python<'_>) -> PyResult<PyContext> {
178 let _guard = runtime::get_tokio_runtime().enter();
179 let instance: PyInstance = root_client_actor(py).into();
180 Ok(PyContext {
181 instance: instance.into_pyobject(py)?.into(),
182 rank: Extent::unity().point_of_rank(0).unwrap(),
183 recording_span: None,
184 })
185 }
186
187 #[staticmethod]
191 fn _from_instance(py: Python<'_>, instance: PyInstance) -> PyResult<PyContext> {
192 Ok(PyContext {
193 instance: instance.into_pyobject(py)?.into(),
194 rank: Extent::unity().point_of_rank(0).unwrap(),
195 recording_span: None,
196 })
197 }
198}
199
200impl PyContext {
201 pub(crate) fn new<T: hyperactor::actor::Actor>(
202 cx: &hyperactor::Context<T>,
203 instance: Py<PyInstance>,
204 ) -> PyContext {
205 PyContext {
206 instance,
207 rank: cx.cast_point(),
208 recording_span: Some(cx.recording_span()),
209 }
210 }
211
212 pub(crate) fn recording_span(&self) -> Option<&tracing::Span> {
217 self.recording_span.as_ref()
218 }
219
220 #[doc(hidden)]
224 pub fn for_test(py: Python<'_>, recording_span: Option<tracing::Span>) -> PyResult<PyContext> {
225 let mut ctx = Self::_root_client_context(py)?;
226 ctx.recording_span = recording_span;
227 Ok(ctx)
228 }
229}
230
231pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
232 hyperactor_mod.add_class::<PyInstance>()?;
233 hyperactor_mod.add_class::<PyContext>()?;
234 Ok(())
235}