monarch_hyperactor/
supervision.rs1use async_trait::async_trait;
17use hyperactor::Instance;
18use hyperactor_mesh::supervision::MeshFailure;
19use pyo3::exceptions::PyRuntimeError;
20use pyo3::prelude::*;
21
22use crate::actor::PythonActor;
23
24#[async_trait]
30pub trait Supervisable: Send + Sync {
31 async fn supervision_event(&self, instance: &Instance<PythonActor>) -> Option<PyErr>;
36}
37
38#[pyclass(
39 name = "SupervisionError",
40 module = "monarch._rust_bindings.monarch_hyperactor.supervision",
41 extends = PyRuntimeError
42)]
43#[derive(Clone, Debug)]
44pub struct SupervisionError {
45 #[pyo3(set)]
46 pub endpoint: Option<String>,
47 pub message: String,
48}
49
50#[pymethods]
51impl SupervisionError {
52 #[new]
53 #[pyo3(signature = (message, endpoint=None))]
54 fn new(message: String, endpoint: Option<String>) -> Self {
55 SupervisionError { endpoint, message }
56 }
57
58 #[staticmethod]
59 pub fn new_err(message: String) -> PyErr {
60 PyErr::new::<Self, _>(message)
61 }
62
63 #[staticmethod]
64 pub fn new_err_from_endpoint(message: String, endpoint: String) -> PyErr {
65 PyErr::new::<Self, _>((message, Some(endpoint)))
66 }
67
68 fn __str__(&self) -> String {
69 if let Some(ep) = &self.endpoint {
70 format!("Endpoint call {} failed, {}", ep, self.message)
71 } else {
72 self.message.clone()
73 }
74 }
75
76 fn __repr__(&self) -> String {
77 if let Some(ep) = &self.endpoint {
78 format!("SupervisionError(endpoint='{}', '{}')", ep, self.message)
79 } else {
80 format!("SupervisionError('{}')", self.message)
81 }
82 }
83}
84
85impl SupervisionError {
86 #[allow(dead_code)]
88 pub(crate) fn new_err_from(failure: MeshFailure) -> PyErr {
89 let event = failure.event;
90 let message = event
91 .failure_report()
92 .unwrap_or_else(|| format!("{}", event));
93 Self::new_err(message)
94 }
95 pub fn set_endpoint_on_err(py: Python<'_>, err: PyErr, endpoint: String) -> PyErr {
100 if let Ok(supervision_err) = err.value(py).extract::<SupervisionError>() {
101 Self::new_err_from_endpoint(supervision_err.message, endpoint)
102 } else {
103 err
104 }
105 }
106}
107
108#[derive(Clone, Debug)]
110#[pyclass(
111 name = "MeshFailure",
112 module = "monarch._rust_bindings.monarch_hyperactor.supervision"
113)]
114pub struct PyMeshFailure {
115 pub inner: MeshFailure,
116}
117
118impl PyMeshFailure {
119 pub fn new(failure: MeshFailure) -> Self {
120 Self { inner: failure }
121 }
122}
123
124impl From<MeshFailure> for PyMeshFailure {
125 fn from(failure: MeshFailure) -> Self {
126 Self { inner: failure }
127 }
128}
129
130impl std::fmt::Display for PyMeshFailure {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 write!(
133 f,
134 "MeshFailure(mesh_name={}, crashed_ranks={:?}, event={})",
135 self.inner
136 .actor_mesh_name
137 .clone()
138 .unwrap_or("<none>".into()),
139 self.inner.crashed_ranks,
140 self.inner.event
141 )
142 }
143}
144
145#[pymethods]
146impl PyMeshFailure {
147 #[getter]
149 fn mesh(&self) {}
150
151 #[getter]
152 fn mesh_name(&self) -> String {
153 self.inner
154 .actor_mesh_name
155 .clone()
156 .unwrap_or("<none>".into())
157 }
158
159 fn __repr__(&self) -> String {
160 format!("{}", self)
161 }
162
163 fn report(&self) -> String {
164 self.inner
165 .event
166 .failure_report()
167 .unwrap_or_else(|| format!("{}", self.inner.event))
168 }
169}
170
171pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
172 let py = module.py();
174 module.add("SupervisionError", py.get_type::<SupervisionError>())?;
176 module.add("MeshFailure", py.get_type::<PyMeshFailure>())?;
177 Ok(())
178}