hyperactor_multiprocess/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10
11use enum_as_inner::EnumAsInner;
12use hyperactor::HandleClient;
13use hyperactor::Handler;
14use hyperactor::Named;
15use hyperactor::OncePortRef;
16use hyperactor::RefClient;
17use hyperactor::channel::ChannelAddr;
18use hyperactor::reference::ActorId;
19use hyperactor::reference::Index;
20use hyperactor::reference::ProcId;
21use hyperactor::reference::WorldId;
22use serde::Deserialize;
23use serde::Serialize;
24
/// Supervision message used to collect the supervision state of a world.
///
/// The enum has a single request variant; the answer travels back through
/// the `OncePortRef` marked `#[reply]`.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Serialize,
    Deserialize,
    Debug,
    Clone,
    PartialEq,
    EnumAsInner,
    Named
)]
pub enum WorldSupervisionMessage {
    /// Request the supervision state of the world identified by the given
    /// `WorldId`. The reply is sent back via the once port ref in the
    /// message. A `None` reply indicates the world isn't managed by the
    /// system.
    State(WorldId, #[reply] OncePortRef<Option<WorldSupervisionState>>),
}
44
/// The supervision state of a world. It contains the supervision state of
/// all procs in the world.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub struct WorldSupervisionState {
    /// A map from proc `Index` to that proc's supervision state.
    // NOTE(review): the key is an `Index`, not a full `ProcId`; presumably
    // this is the proc's rank within the world — confirm against the code
    // that populates this map.
    pub procs: HashMap<Index, ProcSupervisionState>,
}
52
53impl WorldSupervisionState {
54    /// Return whether this world is healthy, world is healthy if all its procs are healthy.
55    pub fn is_healthy(&self) -> bool {
56        self.procs.values().all(ProcSupervisionState::is_healthy)
57    }
58}
59
/// Message to communicate proc supervision state.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Serialize,
    Deserialize,
    Debug,
    Clone,
    PartialEq,
    EnumAsInner,
    Named
)]
pub enum ProcSupervisionMessage {
    /// Update proc supervision state with the enclosed
    /// `ProcSupervisionState`. The reply will be sent back via the once
    /// port ref in the message to indicate whether the message receiver is
    /// healthy or not.
    // NOTE(review): the reply payload is `()`, so it cannot itself encode a
    // healthy/unhealthy verdict; presumably the mere arrival of the reply is
    // the health signal — confirm with the handler implementation.
    Update(ProcSupervisionState, #[reply] OncePortRef<()>),
}
79
/// The health of a proc, as recorded in `ProcSupervisionState::proc_health`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub enum ProcStatus {
    /// No known issues.
    Alive,

    /// The proc hasn't provided any supervision updates in a
    /// reasonable time.
    Expired,

    /// A failure to obtain a TCP/IP connection to the proc was
    /// encountered.
    ConnectionFailure,
}
94
95impl std::fmt::Display for ProcStatus {
96    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
97        let description = match self {
98            ProcStatus::Alive => "Alive",
99            ProcStatus::Expired => "Expired",
100            ProcStatus::ConnectionFailure => "Connection failure",
101        };
102        write!(f, "{}", description)
103    }
104}
105
/// The supervision state of a proc. It contains the supervision state of
/// actors in the proc. This message is used for both supervision update and
/// supervision state query.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Named)]
pub struct ProcSupervisionState {
    /// The world to which this proc belongs.
    pub world_id: WorldId,
    /// The proc id.
    pub proc_id: ProcId,
    /// Address of the proc.
    pub proc_addr: ChannelAddr,
    /// The proc health.
    pub proc_health: ProcStatus,
    /// Contains the supervision state of (failed) actors in the proc, as
    /// pairs of actor id and actor status. Actors can appear more than once
    /// here if they have multiple failures.
    pub failed_actors: Vec<(ActorId, hyperactor::actor::ActorStatus)>,
}
123
124impl ProcSupervisionState {
125    /// Returns whether this proc has any failed actors.
126    pub fn has_failed_actor(&self) -> bool {
127        !self.failed_actors.is_empty()
128    }
129
130    /// Return whether this proc is healthy, proc is alive and there is not failed actor.
131    pub fn is_healthy(&self) -> bool {
132        matches!(self.proc_health, ProcStatus::Alive) && !self.has_failed_actor()
133    }
134}
135
// For the proc supervisor to implement (e.g. the system actor).
hyperactor::alias!(ProcSupervisor, ProcSupervisionMessage);
// For the world supervisor to implement (e.g. the system actor).
hyperactor::alias!(WorldSupervisor, WorldSupervisionMessage);
// For the end receiver of supervision events to implement (e.g. a client).
// Unlike the two aliases above, this one is paired with the state struct
// `WorldSupervisionState` rather than a message enum.
hyperactor::alias!(SupervisionClient, WorldSupervisionState);