hyperactor_multiprocess/
supervision.rs1use std::collections::HashMap;
10
11use enum_as_inner::EnumAsInner;
12use hyperactor::HandleClient;
13use hyperactor::Handler;
14use hyperactor::Named;
15use hyperactor::OncePortRef;
16use hyperactor::RefClient;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::reference::ActorId;
19use hyperactor::reference::Index;
20use hyperactor::reference::ProcId;
21use hyperactor::reference::WorldId;
22use serde::Deserialize;
23use serde::Serialize;
24
/// Messages understood by the world-level supervisor.
///
/// The `Handler`, `HandleClient` and `RefClient` derives are provided by
/// `hyperactor`; presumably they generate the handler trait and the client-side
/// helpers for this enum — confirm against the hyperactor macro documentation.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Serialize,
    Deserialize,
    Debug,
    Clone,
    PartialEq,
    EnumAsInner,
    Named
)]
pub enum WorldSupervisionMessage {
    /// Request the supervision state of the world identified by the given
    /// [`WorldId`].
    ///
    /// The answer is sent on the `#[reply]` port as an
    /// `Option<WorldSupervisionState>`. NOTE(review): `None` presumably means
    /// the supervisor has no state for that world — confirm with the handler.
    State(WorldId, #[reply] OncePortRef<Option<WorldSupervisionState>>),
}
44
/// Supervision state of a world, expressed as the supervision state of each
/// of its procs.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub struct WorldSupervisionState {
    /// Per-proc supervision state, keyed by [`Index`].
    ///
    /// NOTE(review): the key is presumably the proc's index (rank) within the
    /// world — confirm against the code that populates this map.
    pub procs: HashMap<Index, ProcSupervisionState>,
}
52
53impl WorldSupervisionState {
54 pub fn is_healthy(&self) -> bool {
56 self.procs.values().all(ProcSupervisionState::is_healthy)
57 }
58}
59
/// Messages understood by the proc-level supervisor.
///
/// The `Handler`, `HandleClient` and `RefClient` derives are provided by
/// `hyperactor`; presumably they generate the handler trait and the client-side
/// helpers for this enum — confirm against the hyperactor macro documentation.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Serialize,
    Deserialize,
    Debug,
    Clone,
    PartialEq,
    EnumAsInner,
    Named
)]
pub enum ProcSupervisionMessage {
    /// Deliver a proc's current [`ProcSupervisionState`] to the supervisor.
    ///
    /// The `#[reply]` port carries `()`, i.e. the reply conveys no data.
    /// NOTE(review): presumably it serves as an acknowledgement that the
    /// update was processed — confirm with the handler.
    Update(ProcSupervisionState, #[reply] OncePortRef<()>),
}
79
/// Health status of a proc as tracked by supervision.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub enum ProcStatus {
    /// The proc is alive. This is the only status that
    /// `ProcSupervisionState::is_healthy` accepts as healthy.
    Alive,

    /// NOTE(review): presumably the proc did not report within the expected
    /// time window and its liveness has expired — confirm with the supervisor
    /// logic.
    Expired,

    /// NOTE(review): presumably the proc could not be reached over its
    /// channel — confirm with the supervisor logic.
    ConnectionFailure,
}
94
95impl std::fmt::Display for ProcStatus {
96 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
97 let description = match self {
98 ProcStatus::Alive => "Alive",
99 ProcStatus::Expired => "Expired",
100 ProcStatus::ConnectionFailure => "Connection failure",
101 };
102 write!(f, "{}", description)
103 }
104}
105
/// Supervision state of a single proc: its identity, its address, its health
/// status and the actor failures recorded for it.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub struct ProcSupervisionState {
    /// Identifier of the world this state is associated with (presumably the
    /// world the proc belongs to — TODO confirm).
    pub world_id: WorldId,
    /// Identifier of the proc this state describes.
    pub proc_id: ProcId,
    /// Channel address recorded for the proc (presumably the address at which
    /// the proc can be reached — TODO confirm).
    pub proc_addr: ChannelAddr,
    /// Health status of the proc; anything other than `ProcStatus::Alive`
    /// makes `is_healthy` return `false`.
    pub proc_health: ProcStatus,
    /// Actors recorded as failed, each paired with an `ActorStatus`
    /// (presumably the status describing the failure — TODO confirm).
    /// Any entry here makes `is_healthy` return `false`.
    pub failed_actors: Vec<(ActorId, hyperactor::actor::ActorStatus)>,
}
123
124impl ProcSupervisionState {
125 pub fn has_failed_actor(&self) -> bool {
127 !self.failed_actors.is_empty()
128 }
129
130 pub fn is_healthy(&self) -> bool {
132 matches!(self.proc_health, ProcStatus::Alive) && !self.has_failed_actor()
133 }
134}
135
136hyperactor::alias!(ProcSupervisor, ProcSupervisionMessage); hyperactor::alias!(WorldSupervisor, WorldSupervisionMessage); hyperactor::alias!(SupervisionClient, WorldSupervisionState);