hyperactor_mesh/
supervision.rs1use hyperactor::Bind;
12use hyperactor::Unbind;
13use hyperactor::actor::ActorErrorKind;
14use hyperactor::actor::ActorStatus;
15use hyperactor::context;
16use hyperactor::supervision::ActorSupervisionEvent;
17use serde::Deserialize;
18use serde::Serialize;
19use typeuri::Named;
20
21#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Bind, Unbind)]
24pub struct MeshFailure {
25 pub actor_mesh_name: Option<String>,
27 pub event: ActorSupervisionEvent,
29 pub crashed_ranks: Vec<usize>,
32}
33wirevalue::register_type!(MeshFailure);
34
35impl MeshFailure {
36 pub fn contains_rank(&self, rank: usize) -> bool {
39 self.crashed_ranks.is_empty() || self.crashed_ranks.contains(&rank)
40 }
41
42 pub fn default_handler(&self, cx: &impl context::Actor) -> Result<(), anyhow::Error> {
45 let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(ActorSupervisionEvent::new(
48 cx.instance().self_id().clone(),
49 None,
50 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(
51 self.event.clone(),
52 ))),
53 None,
54 )));
55 Err(anyhow::Error::new(err))
56 }
57}
58
59impl std::fmt::Display for MeshFailure {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 let actor_mesh_name = self
62 .actor_mesh_name
63 .as_ref()
64 .map(|m| format!(" on mesh \"{}\"", m))
65 .unwrap_or("".to_string());
66 let ranks = if self.crashed_ranks.is_empty() {
67 String::new()
68 } else {
69 format!(" at ranks {:?}", self.crashed_ranks)
70 };
71 write!(
72 f,
73 "failure{}{} with event: {}",
74 actor_mesh_name, ranks, self.event
75 )
76 }
77}
78
79#[derive(Debug, Clone)]
81pub(crate) enum Unhealthy {
82 StreamClosed(MeshFailure), Crashed(MeshFailure), }