hyperactor/
supervision.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Messages used in supervision.
10
11use std::fmt;
12use std::fmt::Debug;
13use std::time::SystemTime;
14
15use chrono::DateTime;
16use chrono::offset::Local;
17use derivative::Derivative;
18use hyperactor::clock::Clock;
19use hyperactor::clock::RealClock;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate as hyperactor; // for macros
24use crate::Named;
25use crate::actor::ActorStatus;
26use crate::attrs::Attrs;
27use crate::reference::ActorId;
28
/// This is the local actor supervision event. Child actor will propagate this event to its parent.
///
/// Equality (`PartialEq`/`Eq`, generated through `derivative`) compares only
/// `actor_id`, `actor_status` and `caused_by`: the `occurred_at` and
/// `message_headers` fields are marked `PartialEq = "ignore"`, so two events
/// that differ only in timestamp or headers compare equal.
#[derive(Clone, Debug, Derivative, Serialize, Deserialize, Named)]
#[derivative(PartialEq, Eq)]
pub struct ActorSupervisionEvent {
    /// The actor id of the child actor where the event is triggered.
    pub actor_id: ActorId,
    /// The time when the event is triggered. Ignored by equality comparisons.
    #[derivative(PartialEq = "ignore")]
    pub occurred_at: SystemTime,
    /// Status of the child actor.
    pub actor_status: ActorStatus,
    /// If this event is associated with a message, the message headers.
    /// Ignored by equality comparisons.
    #[derivative(PartialEq = "ignore")]
    pub message_headers: Option<Attrs>,
    /// Optional supervision event that caused this event, for recursive propagation.
    /// Boxed because the type refers to itself.
    pub caused_by: Option<Box<ActorSupervisionEvent>>,
}
46
47impl ActorSupervisionEvent {
48    /// Create a new supervision event. Timestamp is set to the current time.
49    pub fn new(
50        actor_id: ActorId,
51        actor_status: ActorStatus,
52        message_headers: Option<Attrs>,
53        caused_by: Option<Box<ActorSupervisionEvent>>,
54    ) -> Self {
55        Self {
56            actor_id,
57            occurred_at: RealClock.system_time_now(),
58            actor_status,
59            message_headers,
60            caused_by,
61        }
62    }
63    /// Compute an actor status from this event, ensuring that "caused-by"
64    /// events are included in failure states. This should be used as the
65    /// actor status when reporting events to users.
66    pub fn status(&self) -> ActorStatus {
67        match &self.actor_status {
68            ActorStatus::Failed(msg) => ActorStatus::Failed(format!("{}: {}", self, msg)),
69            status => status.clone(),
70        }
71    }
72}
73
// Lets a supervision event be used wherever a standard error is expected.
// No trait methods are overridden here, so `source()` keeps its default
// behavior; the `caused_by` chain is rendered by the `Display` impl instead.
impl std::error::Error for ActorSupervisionEvent {}
75
76impl fmt::Display for ActorSupervisionEvent {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        write!(
79            f,
80            "{}: {} at {}",
81            self.actor_id,
82            self.actor_status,
83            DateTime::<Local>::from(self.occurred_at),
84        )?;
85        if let Some(message_headers) = &self.message_headers {
86            let headers = serde_json::to_string(&message_headers)
87                .expect("could not serialize message headers");
88            write!(f, " (headers: {})", headers)?;
89        }
90        if let Some(caused_by) = &self.caused_by {
91            write!(f, ": (caused by: {})", caused_by)?;
92        }
93        Ok(())
94    }
95}