monarch_hyperactor/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use async_trait::async_trait;
10use hyperactor::Instance;
11use hyperactor_mesh::supervision::MeshFailure;
12use pyo3::exceptions::PyRuntimeError;
13use pyo3::prelude::*;
14
15use crate::actor::PythonActor;
16
17/// Trait for types that can provide supervision events.
18///
19/// This trait abstracts the supervision functionality, allowing endpoint
20/// operations to work with any type that can monitor actor health without
21/// depending on the full ActorMesh interface.
22#[async_trait]
23pub trait Supervisable: Send + Sync {
24    /// Wait for the next supervision event indicating an actor failure.
25    ///
26    /// Returns `Some(PyErr)` if a supervision failure is detected,
27    /// or `None` if supervision is not available or the mesh is healthy.
28    async fn supervision_event(&self, instance: &Instance<PythonActor>) -> Option<PyErr>;
29}
30
31#[pyclass(
32    name = "SupervisionError",
33    module = "monarch._rust_bindings.monarch_hyperactor.supervision",
34    extends = PyRuntimeError
35)]
36#[derive(Clone, Debug)]
37pub struct SupervisionError {
38    #[pyo3(set)]
39    pub endpoint: Option<String>,
40    pub message: String,
41}
42
43#[pymethods]
44impl SupervisionError {
45    #[new]
46    #[pyo3(signature = (message, endpoint=None))]
47    fn new(message: String, endpoint: Option<String>) -> Self {
48        SupervisionError { endpoint, message }
49    }
50
51    #[staticmethod]
52    pub fn new_err(message: String) -> PyErr {
53        PyErr::new::<Self, _>(message)
54    }
55
56    #[staticmethod]
57    pub fn new_err_from_endpoint(message: String, endpoint: String) -> PyErr {
58        PyErr::new::<Self, _>((message, Some(endpoint)))
59    }
60
61    fn __str__(&self) -> String {
62        if let Some(ep) = &self.endpoint {
63            format!("Endpoint call {} failed, {}", ep, self.message)
64        } else {
65            self.message.clone()
66        }
67    }
68
69    fn __repr__(&self) -> String {
70        if let Some(ep) = &self.endpoint {
71            format!("SupervisionError(endpoint='{}', '{}')", ep, self.message)
72        } else {
73            format!("SupervisionError('{}')", self.message)
74        }
75    }
76}
77
78impl SupervisionError {
79    // Not From<MeshFailure> because the return type needs to be PyErr.
80    #[allow(dead_code)]
81    pub(crate) fn new_err_from(failure: MeshFailure) -> PyErr {
82        let event = failure.event;
83        Self::new_err(format!(
84            "Actor {} exited because of the following reason: {}",
85            event.actor_id, event,
86        ))
87    }
88    /// Set the endpoint on a PyErr containing a SupervisionError.
89    ///
90    /// If the error is a SupervisionError, sets its endpoint field and returns a new
91    /// error with the endpoint prefix. If not a SupervisionError, returns the original error.
92    pub fn set_endpoint_on_err(py: Python<'_>, err: PyErr, endpoint: String) -> PyErr {
93        if let Ok(supervision_err) = err.value(py).extract::<SupervisionError>() {
94            Self::new_err_from_endpoint(supervision_err.message, endpoint)
95        } else {
96            err
97        }
98    }
99}
100
101// TODO: find out how to extend a Python exception and have internal data.
102#[derive(Clone, Debug)]
103#[pyclass(
104    name = "MeshFailure",
105    module = "monarch._rust_bindings.monarch_hyperactor.supervision"
106)]
107pub struct PyMeshFailure {
108    pub inner: MeshFailure,
109}
110
111impl PyMeshFailure {
112    pub fn new(failure: MeshFailure) -> Self {
113        Self { inner: failure }
114    }
115}
116
117impl From<MeshFailure> for PyMeshFailure {
118    fn from(failure: MeshFailure) -> Self {
119        Self { inner: failure }
120    }
121}
122
123impl std::fmt::Display for PyMeshFailure {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        write!(
126            f,
127            "MeshFailure(mesh_name={}, rank={}, event={})",
128            self.inner
129                .actor_mesh_name
130                .clone()
131                .unwrap_or("<none>".into()),
132            self.inner.rank.map_or("<none>".into(), |r| r.to_string()),
133            self.inner.event
134        )
135    }
136}
137
138#[pymethods]
139impl PyMeshFailure {
140    // TODO: store and return the mesh object.
141    #[getter]
142    fn mesh(&self) {}
143
144    #[getter]
145    fn mesh_name(&self) -> String {
146        self.inner
147            .actor_mesh_name
148            .clone()
149            .unwrap_or("<none>".into())
150    }
151
152    fn __repr__(&self) -> String {
153        format!("{}", self)
154    }
155
156    fn report(&self) -> String {
157        format!("{}", self.inner.event)
158    }
159}
160
161pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
162    // Get the Python interpreter instance from the module
163    let py = module.py();
164    // Add the exception to the module using its type object
165    module.add("SupervisionError", py.get_type::<SupervisionError>())?;
166    module.add("MeshFailure", py.get_type::<PyMeshFailure>())?;
167    Ok(())
168}