/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! Python-facing supervision boundary.
//!
//! The `SupervisionError` pyclass carries only a message and an optional
//! endpoint. Any richer detail users see in the rendered Python exception
//! (such as the mesh name or a Python-class `display_name`) comes from the
//! upstream `Display` chain, once those values are populated by their
//! producer sites.

use std::fmt;

use async_trait::async_trait;
use hyperactor::Instance;
use hyperactor_mesh::supervision::MeshFailure;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;

use crate::actor::PythonActor;

24/// Trait for types that can provide supervision events.
25///
26/// This trait abstracts the supervision functionality, allowing endpoint
27/// operations to work with any type that can monitor actor health without
28/// depending on the full ActorMesh interface.
29#[async_trait]
30pub trait Supervisable: Send + Sync {
31    /// Wait for the next supervision event indicating an actor failure.
32    ///
33    /// Returns `Some(PyErr)` if a supervision failure is detected,
34    /// or `None` if supervision is not available or the mesh is healthy.
35    async fn supervision_event(&self, instance: &Instance<PythonActor>) -> Option<PyErr>;
36}
37
38#[pyclass(
39    name = "SupervisionError",
40    module = "monarch._rust_bindings.monarch_hyperactor.supervision",
41    extends = PyRuntimeError
42)]
43#[derive(Clone, Debug)]
44pub struct SupervisionError {
45    #[pyo3(set)]
46    pub endpoint: Option<String>,
47    pub message: String,
48}
49
50#[pymethods]
51impl SupervisionError {
52    #[new]
53    #[pyo3(signature = (message, endpoint=None))]
54    fn new(message: String, endpoint: Option<String>) -> Self {
55        SupervisionError { endpoint, message }
56    }
57
58    #[staticmethod]
59    pub fn new_err(message: String) -> PyErr {
60        PyErr::new::<Self, _>(message)
61    }
62
63    #[staticmethod]
64    pub fn new_err_from_endpoint(message: String, endpoint: String) -> PyErr {
65        PyErr::new::<Self, _>((message, Some(endpoint)))
66    }
67
68    fn __str__(&self) -> String {
69        if let Some(ep) = &self.endpoint {
70            format!("Endpoint call {} failed, {}", ep, self.message)
71        } else {
72            self.message.clone()
73        }
74    }
75
76    fn __repr__(&self) -> String {
77        if let Some(ep) = &self.endpoint {
78            format!("SupervisionError(endpoint='{}', '{}')", ep, self.message)
79        } else {
80            format!("SupervisionError('{}')", self.message)
81        }
82    }
83}
84
85impl SupervisionError {
86    // Not From<MeshFailure> because the return type needs to be PyErr.
87    #[allow(dead_code)]
88    pub(crate) fn new_err_from(failure: MeshFailure) -> PyErr {
89        let event = failure.event;
90        let message = event
91            .failure_report()
92            .unwrap_or_else(|| format!("{}", event));
93        Self::new_err(message)
94    }
95    /// Set the endpoint on a PyErr containing a SupervisionError.
96    ///
97    /// If the error is a SupervisionError, sets its endpoint field and returns a new
98    /// error with the endpoint prefix. If not a SupervisionError, returns the original error.
99    pub fn set_endpoint_on_err(py: Python<'_>, err: PyErr, endpoint: String) -> PyErr {
100        if let Ok(supervision_err) = err.value(py).extract::<SupervisionError>() {
101            Self::new_err_from_endpoint(supervision_err.message, endpoint)
102        } else {
103            err
104        }
105    }
106}
107
108// TODO: find out how to extend a Python exception and have internal data.
109#[derive(Clone, Debug)]
110#[pyclass(
111    name = "MeshFailure",
112    module = "monarch._rust_bindings.monarch_hyperactor.supervision"
113)]
114pub struct PyMeshFailure {
115    pub inner: MeshFailure,
116}
117
118impl PyMeshFailure {
119    pub fn new(failure: MeshFailure) -> Self {
120        Self { inner: failure }
121    }
122}
123
124impl From<MeshFailure> for PyMeshFailure {
125    fn from(failure: MeshFailure) -> Self {
126        Self { inner: failure }
127    }
128}
129
130impl std::fmt::Display for PyMeshFailure {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        write!(
133            f,
134            "MeshFailure(mesh_name={}, crashed_ranks={:?}, event={})",
135            self.inner
136                .actor_mesh_name
137                .clone()
138                .unwrap_or("<none>".into()),
139            self.inner.crashed_ranks,
140            self.inner.event
141        )
142    }
143}
144
145#[pymethods]
146impl PyMeshFailure {
147    // TODO: store and return the mesh object.
148    #[getter]
149    fn mesh(&self) {}
150
151    #[getter]
152    fn mesh_name(&self) -> String {
153        self.inner
154            .actor_mesh_name
155            .clone()
156            .unwrap_or("<none>".into())
157    }
158
159    fn __repr__(&self) -> String {
160        format!("{}", self)
161    }
162
163    fn report(&self) -> String {
164        self.inner
165            .event
166            .failure_report()
167            .unwrap_or_else(|| format!("{}", self.inner.event))
168    }
169}
170
171pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
172    // Get the Python interpreter instance from the module
173    let py = module.py();
174    // Add the exception to the module using its type object
175    module.add("SupervisionError", py.get_type::<SupervisionError>())?;
176    module.add("MeshFailure", py.get_type::<PyMeshFailure>())?;
177    Ok(())
178}