hyperactor/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Messages used in supervision.
10
11use std::fmt;
12use std::fmt::Debug;
13use std::fmt::Write;
14use std::time::SystemTime;
15
16use derivative::Derivative;
17use hyperactor_config::Flattrs;
18use indenter::indented;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::actor::ActorErrorKind;
23use crate::actor::ActorStatus;
24use crate::reference;
25
/// This is the local actor supervision event. Child actor will propagate this event to its parent.
///
/// Equality (`PartialEq`/`Eq`, generated through `derivative`) only compares
/// `actor_id`, `display_name` and `actor_status`: `occurred_at` and
/// `message_headers` are both marked `PartialEq = "ignore"`, so two events
/// that differ only in timestamp or headers compare equal.
#[derive(Clone, Debug, Derivative, Serialize, Deserialize, typeuri::Named)]
#[derivative(PartialEq, Eq)]
pub struct ActorSupervisionEvent {
    /// The actor id of the child actor where the event is triggered.
    pub actor_id: reference::ActorId,
    /// Friendly display name, if the actor class customized it.
    /// When this is `None`, `actor_name` falls back to the string form of
    /// `actor_id`.
    pub display_name: Option<String>,
    /// The time when the event is triggered.
    /// Excluded from equality comparisons.
    #[derivative(PartialEq = "ignore")]
    pub occurred_at: SystemTime,
    /// Status of the child actor.
    pub actor_status: ActorStatus,
    /// If this event is associated with a message, the message headers.
    /// Excluded from equality comparisons.
    #[derivative(PartialEq = "ignore")]
    pub message_headers: Option<Flattrs>,
}
// NOTE(review): presumably registers the type with `wirevalue` so it can be
// identified when sent over the wire -- confirm against the macro's docs.
wirevalue::register_type!(ActorSupervisionEvent);
44
45impl ActorSupervisionEvent {
46    /// Create a new supervision event. Timestamp is set to the current time.
47    pub fn new(
48        actor_id: reference::ActorId,
49        display_name: Option<String>,
50        actor_status: ActorStatus,
51        message_headers: Option<Flattrs>,
52    ) -> Self {
53        Self {
54            actor_id,
55            display_name,
56            occurred_at: std::time::SystemTime::now(),
57            actor_status,
58            message_headers,
59        }
60    }
61
62    fn actor_name(&self) -> String {
63        self.display_name
64            .clone()
65            .unwrap_or_else(|| self.actor_id.to_string())
66    }
67
68    /// Walk the `UnhandledSupervisionEvent` chain to find the root-cause
69    /// actor that originally failed.
70    pub fn actually_failing_actor(&self) -> &ActorSupervisionEvent {
71        let mut event = self;
72        while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(e)) =
73            &event.actor_status
74        {
75            event = e;
76        }
77        event
78    }
79
80    /// This event is for a a supervision error.
81    pub fn is_error(&self) -> bool {
82        self.actor_status.is_failed()
83    }
84}
85
// Lets a supervision event be used wherever a `std::error::Error` is expected.
// No trait methods are overridden here, so the trait's default implementations
// apply; the required `Debug` and `Display` impls are provided in this file.
impl std::error::Error for ActorSupervisionEvent {}
87
88fn fmt_status<'a>(
89    actor_id: &reference::ActorId,
90    status: &'a ActorStatus,
91    f: &mut fmt::Formatter<'_>,
92) -> Result<Option<&'a ActorSupervisionEvent>, fmt::Error> {
93    let mut f = indented(f).with_str(" ");
94
95    match status {
96        ActorStatus::Stopped(_)
97            if actor_id.name() == "host_agent" || actor_id.name() == "proc_agent" =>
98        {
99            // Host agent stopped - use simplified message from D86984496
100            let name = actor_id.proc_id().addr().to_string();
101            write!(
102                f,
103                "The process {} owned by this actor became unresponsive and is assumed dead, check the log on the host for details",
104                name
105            )?;
106            Ok(None)
107        }
108        ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
109            msg,
110            during_handling_of,
111        )) => {
112            write!(f, "{}", msg)?;
113            Ok(Some(during_handling_of))
114        }
115        ActorStatus::Failed(ActorErrorKind::Generic(msg)) => {
116            write!(f, "{}", msg)?;
117            Ok(None)
118        }
119        status => {
120            write!(f, "{}", status)?;
121            Ok(None)
122        }
123    }
124}
125
126impl fmt::Display for ActorSupervisionEvent {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        let actor_name = self.actor_name();
129        writeln!(
130            f,
131            "The actor {} and all its descendants have failed.",
132            actor_name
133        )?;
134        let failing_event = self.actually_failing_actor();
135        let failing_actor = failing_event.actor_name();
136        let its_name = if failing_actor == actor_name {
137            "itself"
138        } else {
139            &failing_actor
140        };
141        writeln!(f, "This occurred because the actor {} failed.", its_name)?;
142        writeln!(f, "The error was:")?;
143        let during_handling_of =
144            fmt_status(&failing_event.actor_id, &failing_event.actor_status, f)?;
145        if let Some(event) = during_handling_of {
146            writeln!(
147                f,
148                "This error occurred during the handling of another failure:"
149            )?;
150            fmt::Display::fmt(event, f)?;
151        }
152        Ok(())
153    }
154}