monarch_hyperactor/
supervision.rs1use async_trait::async_trait;
10use hyperactor::Instance;
11use hyperactor_mesh::supervision::MeshFailure;
12use pyo3::exceptions::PyRuntimeError;
13use pyo3::prelude::*;
14
15use crate::actor::PythonActor;
16
17#[async_trait]
23pub trait Supervisable: Send + Sync {
24 async fn supervision_event(&self, instance: &Instance<PythonActor>) -> Option<PyErr>;
29}
30
31#[pyclass(
32 name = "SupervisionError",
33 module = "monarch._rust_bindings.monarch_hyperactor.supervision",
34 extends = PyRuntimeError
35)]
36#[derive(Clone, Debug)]
37pub struct SupervisionError {
38 #[pyo3(set)]
39 pub endpoint: Option<String>,
40 pub message: String,
41}
42
43#[pymethods]
44impl SupervisionError {
45 #[new]
46 #[pyo3(signature = (message, endpoint=None))]
47 fn new(message: String, endpoint: Option<String>) -> Self {
48 SupervisionError { endpoint, message }
49 }
50
51 #[staticmethod]
52 pub fn new_err(message: String) -> PyErr {
53 PyErr::new::<Self, _>(message)
54 }
55
56 #[staticmethod]
57 pub fn new_err_from_endpoint(message: String, endpoint: String) -> PyErr {
58 PyErr::new::<Self, _>((message, Some(endpoint)))
59 }
60
61 fn __str__(&self) -> String {
62 if let Some(ep) = &self.endpoint {
63 format!("Endpoint call {} failed, {}", ep, self.message)
64 } else {
65 self.message.clone()
66 }
67 }
68
69 fn __repr__(&self) -> String {
70 if let Some(ep) = &self.endpoint {
71 format!("SupervisionError(endpoint='{}', '{}')", ep, self.message)
72 } else {
73 format!("SupervisionError('{}')", self.message)
74 }
75 }
76}
77
78impl SupervisionError {
79 #[allow(dead_code)]
81 pub(crate) fn new_err_from(failure: MeshFailure) -> PyErr {
82 let event = failure.event;
83 Self::new_err(format!(
84 "Actor {} exited because of the following reason: {}",
85 event.actor_id, event,
86 ))
87 }
88 pub fn set_endpoint_on_err(py: Python<'_>, err: PyErr, endpoint: String) -> PyErr {
93 if let Ok(supervision_err) = err.value(py).extract::<SupervisionError>() {
94 Self::new_err_from_endpoint(supervision_err.message, endpoint)
95 } else {
96 err
97 }
98 }
99}
100
101#[derive(Clone, Debug)]
103#[pyclass(
104 name = "MeshFailure",
105 module = "monarch._rust_bindings.monarch_hyperactor.supervision"
106)]
107pub struct PyMeshFailure {
108 pub inner: MeshFailure,
109}
110
111impl PyMeshFailure {
112 pub fn new(failure: MeshFailure) -> Self {
113 Self { inner: failure }
114 }
115}
116
117impl From<MeshFailure> for PyMeshFailure {
118 fn from(failure: MeshFailure) -> Self {
119 Self { inner: failure }
120 }
121}
122
123impl std::fmt::Display for PyMeshFailure {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 write!(
126 f,
127 "MeshFailure(mesh_name={}, rank={}, event={})",
128 self.inner
129 .actor_mesh_name
130 .clone()
131 .unwrap_or("<none>".into()),
132 self.inner.rank.map_or("<none>".into(), |r| r.to_string()),
133 self.inner.event
134 )
135 }
136}
137
138#[pymethods]
139impl PyMeshFailure {
140 #[getter]
142 fn mesh(&self) {}
143
144 #[getter]
145 fn mesh_name(&self) -> String {
146 self.inner
147 .actor_mesh_name
148 .clone()
149 .unwrap_or("<none>".into())
150 }
151
152 fn __repr__(&self) -> String {
153 format!("{}", self)
154 }
155
156 fn report(&self) -> String {
157 format!("{}", self.inner.event)
158 }
159}
160
161pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
162 let py = module.py();
164 module.add("SupervisionError", py.get_type::<SupervisionError>())?;
166 module.add("MeshFailure", py.get_type::<PyMeshFailure>())?;
167 Ok(())
168}