hyperactor/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Messages used in supervision.
10
11use std::fmt;
12use std::fmt::Debug;
13use std::fmt::Write;
14use std::time::SystemTime;
15
16use derivative::Derivative;
17use hyperactor::clock::Clock;
18use hyperactor::clock::RealClock;
19use indenter::indented;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate as hyperactor; // for macros
24use crate::Named;
25use crate::actor::ActorErrorKind;
26use crate::actor::ActorStatus;
27use crate::attrs::Attrs;
28use crate::reference::ActorId;
29
30/// This is the local actor supervision event. Child actor will propagate this event to its parent.
31#[derive(Clone, Debug, Derivative, Serialize, Deserialize, Named)]
32#[derivative(PartialEq, Eq)]
33pub struct ActorSupervisionEvent {
34    /// The actor id of the child actor where the event is triggered.
35    pub actor_id: ActorId,
36    /// Friendly display name, if the actor class customized it.
37    pub display_name: Option<String>,
38    /// The time when the event is triggered.
39    #[derivative(PartialEq = "ignore")]
40    pub occurred_at: SystemTime,
41    /// Status of the child actor.
42    pub actor_status: ActorStatus,
43    /// If this event is associated with a message, the message headers.
44    #[derivative(PartialEq = "ignore")]
45    pub message_headers: Option<Attrs>,
46}
47
48impl ActorSupervisionEvent {
49    /// Create a new supervision event. Timestamp is set to the current time.
50    pub fn new(
51        actor_id: ActorId,
52        display_name: Option<String>,
53        actor_status: ActorStatus,
54        message_headers: Option<Attrs>,
55    ) -> Self {
56        Self {
57            actor_id,
58            display_name,
59            occurred_at: RealClock.system_time_now(),
60            actor_status,
61            message_headers,
62        }
63    }
64
65    fn actor_name(&self) -> String {
66        self.display_name
67            .clone()
68            .unwrap_or_else(|| self.actor_id.to_string())
69    }
70
71    fn actually_failing_actor(&self) -> &ActorSupervisionEvent {
72        let mut event = self;
73        while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(e)) =
74            &event.actor_status
75        {
76            event = e;
77        }
78        event
79    }
80
81    /// This event is for a a supervision error.
82    pub fn is_error(&self) -> bool {
83        self.actor_status.is_failed()
84    }
85}
86
87impl std::error::Error for ActorSupervisionEvent {}
88
89fn fmt_status<'a>(
90    actor_id: &ActorId,
91    status: &'a ActorStatus,
92    f: &mut fmt::Formatter<'_>,
93) -> Result<Option<&'a ActorSupervisionEvent>, fmt::Error> {
94    let mut f = indented(f).with_str(" ");
95
96    match status {
97        ActorStatus::Stopped if actor_id.name() == "agent" => {
98            // Host agent stopped - use simplified message from D86984496
99            let name = match actor_id.proc_id() {
100                crate::reference::ProcId::Direct(addr, _) => addr.to_string(),
101                crate::reference::ProcId::Ranked(_, _) => actor_id.proc_id().to_string(),
102            };
103            write!(
104                f,
105                "The process {} owned by this actor became unresponsive and is assumed dead, check the log on the host for details",
106                name
107            )?;
108            Ok(None)
109        }
110        ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
111            msg,
112            during_handling_of,
113        )) => {
114            write!(f, "{}", msg)?;
115            Ok(Some(during_handling_of))
116        }
117        ActorStatus::Failed(ActorErrorKind::Generic(msg)) => {
118            write!(f, "{}", msg)?;
119            Ok(None)
120        }
121        status => {
122            write!(f, "{}", status)?;
123            Ok(None)
124        }
125    }
126}
127
128impl fmt::Display for ActorSupervisionEvent {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        let actor_name = self.actor_name();
131        writeln!(
132            f,
133            "The actor {} and all its descendants have failed.",
134            actor_name
135        )?;
136        let failing_event = self.actually_failing_actor();
137        let failing_actor = failing_event.actor_name();
138        let its_name = if failing_actor == actor_name {
139            "itself"
140        } else {
141            &failing_actor
142        };
143        writeln!(f, "This occurred because the actor {} failed.", its_name)?;
144        writeln!(f, "The error was:")?;
145        let during_handling_of =
146            fmt_status(&failing_event.actor_id, &failing_event.actor_status, f)?;
147        if let Some(event) = during_handling_of {
148            writeln!(
149                f,
150                "This error occurred during the handling of another failure:"
151            )?;
152            fmt::Display::fmt(event, f)?;
153        }
154        Ok(())
155    }
156}