hyperactor/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Messages used in supervision.
10//!
11//! ## Supervision invariants (SV-*)
12//!
13//! - **SV-1 (root-cause attribution):** For an
14//!   `UnhandledSupervisionEvent` chain, `actually_failing_actor()`
15//!   returns the event that should be treated as the root cause
16//!   for structured failure attribution. In particular, if a
17//!   failed parent wraps a stopped child event, the stopped child
18//!   remains the root cause.
19
20use std::fmt;
21use std::fmt::Debug;
22use std::fmt::Write;
23use std::time::SystemTime;
24
25use derivative::Derivative;
26use hyperactor_config::Flattrs;
27use indenter::indented;
28use serde::Deserialize;
29use serde::Serialize;
30
31use crate::actor::ActorErrorKind;
32use crate::actor::ActorStatus;
33use crate::reference;
34
35/// This is the local actor supervision event. Child actor will propagate this event to its parent.
36#[derive(Clone, Debug, Derivative, Serialize, Deserialize, typeuri::Named)]
37#[derivative(PartialEq, Eq)]
38pub struct ActorSupervisionEvent {
39    /// The actor id of the child actor where the event is triggered.
40    pub actor_id: reference::ActorId,
41    /// Friendly display name, if the actor class customized it.
42    pub display_name: Option<String>,
43    /// The time when the event is triggered.
44    #[derivative(PartialEq = "ignore")]
45    pub occurred_at: SystemTime,
46    /// Status of the child actor.
47    pub actor_status: ActorStatus,
48    /// If this event is associated with a message, the message headers.
49    #[derivative(PartialEq = "ignore")]
50    pub message_headers: Option<Flattrs>,
51}
52wirevalue::register_type!(ActorSupervisionEvent);
53
54impl ActorSupervisionEvent {
55    /// Create a new supervision event. Timestamp is set to the current time.
56    pub fn new(
57        actor_id: reference::ActorId,
58        display_name: Option<String>,
59        actor_status: ActorStatus,
60        message_headers: Option<Flattrs>,
61    ) -> Self {
62        Self {
63            actor_id,
64            display_name,
65            occurred_at: std::time::SystemTime::now(),
66            actor_status,
67            message_headers,
68        }
69    }
70
71    fn actor_name(&self) -> String {
72        self.display_name
73            .clone()
74            .unwrap_or_else(|| self.actor_id.to_string())
75    }
76
77    /// Walk the `UnhandledSupervisionEvent` chain to the root-cause
78    /// event — the first event whose status is not
79    /// `UnhandledSupervisionEvent`.
80    pub fn caused_by(&self) -> &ActorSupervisionEvent {
81        let mut event = self;
82        while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(inner)) =
83            &event.actor_status
84        {
85            event = inner;
86        }
87        event
88    }
89
90    /// Walk the `UnhandledSupervisionEvent` chain to find the root-cause
91    /// actor that originally failed.
92    ///
93    /// Returns `None` if the event is not a failure. Always returns the
94    /// leaf of the chain — the actor whose status is the root cause,
95    /// even if that leaf is a non-failure (e.g. a stopped process).
96    pub fn actually_failing_actor(&self) -> Option<&ActorSupervisionEvent> {
97        if !self.is_error() {
98            return None;
99        }
100        Some(self.caused_by())
101    }
102
103    /// This event is for a supervision error.
104    pub fn is_error(&self) -> bool {
105        self.actor_status.is_failed()
106    }
107
108    /// Produce a concise failure report. Returns `None` for non-failure
109    /// events.
110    pub fn failure_report(&self) -> Option<String> {
111        if !self.is_error() {
112            return None;
113        }
114        let mut output = String::new();
115        self.write_failure_report(&mut output)
116            .expect("writing to String cannot fail");
117        Some(output)
118    }
119
120    fn write_failure_report(&self, f: &mut String) -> fmt::Result {
121        let mut current = self;
122        let mut last_unhandled: Option<&ActorSupervisionEvent> = None;
123        while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(inner)) =
124            &current.actor_status
125        {
126            last_unhandled = Some(current);
127            current = inner;
128        }
129
130        if !current.actor_status.is_failed() {
131            let parent = last_unhandled.expect(
132                "top-level event is a failure but leaf is not; \
133                 chain must contain an UnhandledSupervisionEvent",
134            );
135            writeln!(
136                f,
137                "The actor {} failed because it did not handle a supervision event \
138                 from its child. The event was:",
139                parent.actor_name()
140            )?;
141            return write!(indented(f).with_str("  "), "{}", current);
142        }
143
144        writeln!(
145            f,
146            "The actor {} and all its descendants have failed:",
147            current.actor_name()
148        )?;
149        match &current.actor_status {
150            ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(msg, child)) => {
151                writeln!(indented(f).with_str("  "), "{}", msg.trim_end())?;
152                writeln!(f, "This error occurred while handling another failure:")?;
153                let child_report = child
154                    .failure_report()
155                    .expect("child of ErrorDuringHandlingSupervision is always a failure");
156                write!(indented(f).with_str("  "), "{}", child_report)
157            }
158            ActorStatus::Failed(err) => write!(indented(f).with_str("  "), "{}", err),
159            _ => unreachable!("current.is_failed() was true"),
160        }
161    }
162}
163
164impl std::error::Error for ActorSupervisionEvent {}
165
166impl fmt::Display for ActorSupervisionEvent {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        let name = self.actor_name();
169        match &self.actor_status {
170            ActorStatus::Failed(
171                err @ (ActorErrorKind::Generic(_) | ActorErrorKind::Aborted(_)),
172            ) => {
173                writeln!(f, "Supervision event: actor {} failed:", name)?;
174                write!(indented(f).with_str("  "), "{}", err)
175            }
176            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(child)) => {
177                writeln!(
178                    f,
179                    "Supervision event: actor {} failed because it did not handle \
180                     a supervision event from its child. The child's event was:",
181                    name
182                )?;
183                write!(indented(f).with_str("  "), "{}", child)
184            }
185            ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(msg, child)) => {
186                writeln!(f, "Supervision event: actor {} failed:", name)?;
187                writeln!(indented(f).with_str("  "), "{}", msg.trim_end())?;
188                writeln!(
189                    f,
190                    "This error occurred while handling a supervision event from \
191                     its child. The event was:"
192                )?;
193                write!(indented(f).with_str("  "), "{}", child)
194            }
195            ActorStatus::Stopped(_)
196                if self.actor_id.name() == "host_agent" || self.actor_id.name() == "proc_agent" =>
197            {
198                let addr = self.actor_id.proc_id().addr().to_string();
199                write!(
200                    f,
201                    "Supervision event: the process {} owned by actor {} became unresponsive \
202                     and is assumed dead, check the log on the host for details",
203                    addr,
204                    self.actor_name()
205                )
206            }
207            status => {
208                writeln!(f, "Supervision event: actor {} has status:", name)?;
209                write!(indented(f).with_str("  "), "{}", status)
210            }
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::ActorErrorKind;
    use crate::actor::ActorStatus;
    use crate::channel::ChannelAddr;

    /// Event for actor `name` on a local test proc, with `name` also used
    /// as the display name.
    fn test_event(name: &str, status: ActorStatus) -> ActorSupervisionEvent {
        let actor_id =
            reference::ProcId::with_name(ChannelAddr::Local(0), "test_proc").actor_id(name, 0);
        ActorSupervisionEvent::new(actor_id, Some(name.to_string()), status, None)
    }

    /// Event for actor `name` on a proc at `addr`, without a display name.
    fn test_event_with_addr(
        name: &str,
        addr: ChannelAddr,
        status: ActorStatus,
    ) -> ActorSupervisionEvent {
        let actor_id = reference::ProcId::with_name(addr, "test_proc").actor_id(name, 0);
        ActorSupervisionEvent::new(actor_id, None, status, None)
    }

    /// Failed event carrying a `Generic` error.
    fn generic(name: &str, msg: &str) -> ActorSupervisionEvent {
        let status = ActorStatus::Failed(ActorErrorKind::Generic(msg.to_string()));
        test_event(name, status)
    }

    /// Failed event carrying an `Aborted` error.
    fn aborted(name: &str, msg: &str) -> ActorSupervisionEvent {
        let status = ActorStatus::Failed(ActorErrorKind::Aborted(msg.to_string()));
        test_event(name, status)
    }

    /// Failed event wrapping `child` as an unhandled supervision event.
    fn unhandled(name: &str, child: ActorSupervisionEvent) -> ActorSupervisionEvent {
        let status =
            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(child)));
        test_event(name, status)
    }

    /// Failed event for an error raised while handling `child`.
    fn error_during(name: &str, msg: &str, child: ActorSupervisionEvent) -> ActorSupervisionEvent {
        let kind = ActorErrorKind::ErrorDuringHandlingSupervision(msg.to_string(), Box::new(child));
        test_event(name, ActorStatus::Failed(kind))
    }

    /// Non-failure event with a `Stopped` status.
    fn stopped(name: &str, reason: &str) -> ActorSupervisionEvent {
        let status = ActorStatus::Stopped(reason.to_string());
        test_event(name, status)
    }

    // Display tests

    #[test]
    fn test_display_generic() {
        let event = generic("actor_a", "something went wrong");
        assert_eq!(
            event.to_string(),
            "Supervision event: actor actor_a failed:\n\
             \x20 something went wrong"
        );
    }

    #[test]
    fn test_display_aborted() {
        let event = aborted("actor_a", "user requested");
        assert_eq!(
            event.to_string(),
            "Supervision event: actor actor_a failed:\n\
             \x20 actor explicitly aborted due to: user requested"
        );
    }

    #[test]
    fn test_display_unhandled_with_generic_child() {
        let event = unhandled("parent", generic("child", "child error"));
        assert_eq!(
            event.to_string(),
            "Supervision event: actor parent failed because it did not handle \
             a supervision event from its child. The child's event was:\n\
             \x20 Supervision event: actor child failed:\n\
             \x20   child error"
        );
    }

    #[test]
    fn test_display_error_during_handling() {
        let event = error_during("parent", "handler crashed", generic("child", "child error"));
        assert_eq!(
            event.to_string(),
            "Supervision event: actor parent failed:\n\
             \x20 handler crashed\n\
             This error occurred while handling a supervision event from \
             its child. The event was:\n\
             \x20 Supervision event: actor child failed:\n\
             \x20   child error"
        );
    }

    #[test]
    fn test_display_stopped() {
        let event = stopped("actor_a", "done");
        assert_eq!(
            event.to_string(),
            "Supervision event: actor actor_a has status:\n\
             \x20 stopped: done"
        );
    }

    #[test]
    fn test_display_deep_nesting() {
        let top = unhandled("top", unhandled("mid", generic("leaf", "root cause")));
        let rendered = top.to_string();
        // Each nesting level adds two spaces of indentation.
        assert!(rendered.contains("actor top failed because"));
        assert!(rendered.contains("  Supervision event: actor mid failed because"));
        assert!(rendered.contains("    Supervision event: actor leaf failed:"));
        assert!(rendered.contains("      root cause"));
    }

    #[test]
    fn test_display_unhandled_stopped_child() {
        let event = unhandled("parent", stopped("child", "process exited"));
        assert_eq!(
            event.to_string(),
            "Supervision event: actor parent failed because it did not handle \
             a supervision event from its child. The child's event was:\n\
             \x20 Supervision event: actor child has status:\n\
             \x20   stopped: process exited"
        );
    }

    // failure_report tests

    #[test]
    fn test_failure_report_generic() {
        let event = generic("actor_a", "boom");
        assert_eq!(
            event.failure_report().unwrap(),
            "The actor actor_a and all its descendants have failed:\n\
             \x20 boom"
        );
    }

    #[test]
    fn test_failure_report_aborted() {
        let event = aborted("actor_a", "user requested");
        assert_eq!(
            event.failure_report().unwrap(),
            "The actor actor_a and all its descendants have failed:\n\
             \x20 actor explicitly aborted due to: user requested"
        );
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_generic() {
        let top = unhandled("top", unhandled("mid", generic("leaf", "root cause")));
        assert_eq!(
            top.failure_report().unwrap(),
            "The actor leaf and all its descendants have failed:\n\
             \x20 root cause"
        );
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_stopped() {
        let top = unhandled(
            "top",
            unhandled("mid", stopped("some_actor", "process exited")),
        );
        let text = top.failure_report().unwrap();
        assert_eq!(
            text,
            "The actor mid failed because it did not handle a supervision event \
             from its child. The event was:\n\
             \x20 Supervision event: actor some_actor has status:\n\
             \x20   stopped: process exited"
        );
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_stopped_proc_agent() {
        let agent = test_event_with_addr(
            "proc_agent",
            ChannelAddr::Local(99),
            ActorStatus::Stopped("process exited".to_string()),
        );
        let top = unhandled("top", unhandled("mid", agent));
        let text = top.failure_report().unwrap();
        assert!(
            text.contains("did not handle a supervision event"),
            "got: {}",
            text
        );
        assert!(
            text.contains("process local:99 owned by actor") && text.contains("unresponsive"),
            "got: {}",
            text
        );
    }

    #[test]
    fn test_failure_report_error_during_handling() {
        let event = error_during("parent", "handler failed", generic("child", "original error"));
        assert_eq!(
            event.failure_report().unwrap(),
            "The actor parent and all its descendants have failed:\n\
             \x20 handler failed\n\
             This error occurred while handling another failure:\n\
             \x20 The actor child and all its descendants have failed:\n\
             \x20   original error"
        );
    }

    #[test]
    fn test_failure_report_error_during_handling_nested() {
        let mid = error_during("mid", "mid failed", generic("leaf", "root cause"));
        let top = error_during("top", "top failed", mid);
        let text = top.failure_report().unwrap();
        assert!(text.starts_with(
            "The actor top and all its descendants have failed:\n\
             \x20 top failed\n\
             This error occurred while handling another failure:\n\
             \x20 The actor mid and all its descendants have failed:\n\
             \x20   mid failed\n\
             \x20 This error occurred while handling another failure:\n\
             \x20   The actor leaf and all its descendants have failed:\n\
             \x20     root cause"
        ));
    }

    #[test]
    fn test_failure_report_unhandled_to_error_during_handling() {
        let inner = error_during("handler", "while handling", generic("leaf", "root cause"));
        let top = unhandled("top", inner);
        let text = top.failure_report().unwrap();
        assert!(text.contains("The actor handler and all its descendants have failed:"));
        assert!(text.contains("while handling"));
        assert!(text.contains("root cause"));
    }

    #[test]
    fn test_failure_report_none_on_non_failure() {
        let event = stopped("actor_a", "done");
        assert!(event.failure_report().is_none());
    }

    #[test]
    fn test_failure_report_direct_generic_no_chain() {
        let event = generic("solo", "direct error");
        assert_eq!(
            event.failure_report().unwrap(),
            "The actor solo and all its descendants have failed:\n\
             \x20 direct error"
        );
    }

    #[test]
    fn test_display_host_agent_stopped() {
        let event = test_event_with_addr(
            "host_agent",
            ChannelAddr::Local(42),
            ActorStatus::Stopped("gone".to_string()),
        );
        let rendered = event.to_string();
        assert!(
            rendered.contains("process local:42 owned by actor")
                && rendered.contains("unresponsive"),
            "got: {}",
            rendered
        );
    }

    #[test]
    fn test_display_proc_agent_stopped() {
        let event = test_event_with_addr(
            "proc_agent",
            ChannelAddr::Local(7),
            ActorStatus::Stopped("dead".to_string()),
        );
        let rendered = event.to_string();
        assert!(
            rendered.contains("process local:7 owned by actor")
                && rendered.contains("unresponsive"),
            "got: {}",
            rendered
        );
    }

    #[test]
    fn test_display_error_during_handling_trim_end() {
        let event = error_during(
            "parent",
            "msg with trailing newline\n",
            generic("child", "child error"),
        );
        let rendered = event.to_string();
        // The message's own trailing newline must not produce a blank line.
        assert!(
            rendered.contains("  msg with trailing newline\nThis error occurred"),
            "writeln! should trim trailing newline from msg: {}",
            rendered
        );
    }

    #[test]
    fn test_failure_report_error_during_handling_trim_end() {
        let event = error_during(
            "parent",
            "msg with trailing newline\n",
            generic("child", "child error"),
        );
        let text = event.failure_report().unwrap();
        // The message's own trailing newline must not produce a blank line.
        assert!(
            text.contains("  msg with trailing newline\nThis error occurred"),
            "writeln! should trim trailing newline from msg: {}",
            text
        );
    }

    /// Exercises SV-1 (see module doc): for a parent wrapping a
    /// stopped child in `UnhandledSupervisionEvent`,
    /// `actually_failing_actor()` returns the stopped child as
    /// root cause for structured failure attribution.
    #[test]
    fn test_sv1_actually_failing_actor_returns_stopped_child() {
        let proc_id = reference::ProcId::with_name(ChannelAddr::Local(0), "test_proc");
        let child_id = proc_id.actor_id("proc_agent", 0);
        let parent_id = proc_id.actor_id("controller", 0);

        let stopped_child = ActorSupervisionEvent::new(
            child_id.clone(),
            Some("proc_agent".into()),
            ActorStatus::Stopped("host died".into()),
            None,
        );
        let wrapped = ActorErrorKind::UnhandledSupervisionEvent(Box::new(stopped_child));
        let parent_event = ActorSupervisionEvent::new(
            parent_id,
            Some("controller".into()),
            ActorStatus::Failed(wrapped),
            None,
        );

        // SV-1: root cause is the stopped child, not the parent.
        let root = parent_event
            .actually_failing_actor()
            .expect("parent_event is a failure");
        assert_eq!(root.actor_id, child_id);
        assert!(
            matches!(root.actor_status, ActorStatus::Stopped(_)),
            "root cause should be the stopped child, got: {:?}",
            root.actor_status,
        );
    }
}
581}