hyperactor/
supervision.rs1use std::fmt;
12use std::fmt::Debug;
13use std::fmt::Write;
14use std::time::SystemTime;
15
16use derivative::Derivative;
17use hyperactor::clock::Clock;
18use hyperactor::clock::RealClock;
19use indenter::indented;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate as hyperactor; use crate::Named;
25use crate::actor::ActorErrorKind;
26use crate::actor::ActorStatus;
27use crate::attrs::Attrs;
28use crate::reference::ActorId;
29
/// A supervision event describing the status of a single actor.
///
/// Equality (`PartialEq`/`Eq`, generated by `derivative`) compares only
/// `actor_id`, `display_name` and `actor_status`; `occurred_at` and
/// `message_headers` are explicitly ignored.
#[derive(Clone, Debug, Derivative, Serialize, Deserialize, Named)]
#[derivative(PartialEq, Eq)]
pub struct ActorSupervisionEvent {
    /// The id of the actor this event refers to.
    pub actor_id: ActorId,
    /// Optional display name; when present it is used instead of the actor id
    /// in the rendered (`Display`) message.
    pub display_name: Option<String>,
    /// Time recorded for the event. `ActorSupervisionEvent::new` fills this in
    /// from `RealClock`. Ignored when comparing events for equality.
    #[derivative(PartialEq = "ignore")]
    pub occurred_at: SystemTime,
    /// The actor status carried by this event.
    pub actor_status: ActorStatus,
    /// Optional attributes attached to the event. Ignored when comparing
    /// events for equality.
    // NOTE(review): presumably the headers of the message being handled when
    // the event was produced — confirm against the callers of `new`.
    #[derivative(PartialEq = "ignore")]
    pub message_headers: Option<Attrs>,
}
47
48impl ActorSupervisionEvent {
49 pub fn new(
51 actor_id: ActorId,
52 display_name: Option<String>,
53 actor_status: ActorStatus,
54 message_headers: Option<Attrs>,
55 ) -> Self {
56 Self {
57 actor_id,
58 display_name,
59 occurred_at: RealClock.system_time_now(),
60 actor_status,
61 message_headers,
62 }
63 }
64
65 fn actor_name(&self) -> String {
66 self.display_name
67 .clone()
68 .unwrap_or_else(|| self.actor_id.to_string())
69 }
70
71 fn actually_failing_actor(&self) -> &ActorSupervisionEvent {
72 let mut event = self;
73 while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(e)) =
74 &event.actor_status
75 {
76 event = e;
77 }
78 event
79 }
80
81 pub fn is_error(&self) -> bool {
83 self.actor_status.is_failed()
84 }
85}
86
// Marker impl so a supervision event can be used wherever a
// `std::error::Error` is expected; it relies on the type's `Debug` and
// `Display` implementations and overrides none of the trait's methods.
impl std::error::Error for ActorSupervisionEvent {}
88
89fn fmt_status<'a>(
90 actor_id: &ActorId,
91 status: &'a ActorStatus,
92 f: &mut fmt::Formatter<'_>,
93) -> Result<Option<&'a ActorSupervisionEvent>, fmt::Error> {
94 let mut f = indented(f).with_str(" ");
95
96 match status {
97 ActorStatus::Stopped if actor_id.name() == "agent" => {
98 let name = match actor_id.proc_id() {
100 crate::reference::ProcId::Direct(addr, _) => addr.to_string(),
101 crate::reference::ProcId::Ranked(_, _) => actor_id.proc_id().to_string(),
102 };
103 write!(
104 f,
105 "The process {} owned by this actor became unresponsive and is assumed dead, check the log on the host for details",
106 name
107 )?;
108 Ok(None)
109 }
110 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
111 msg,
112 during_handling_of,
113 )) => {
114 write!(f, "{}", msg)?;
115 Ok(Some(during_handling_of))
116 }
117 ActorStatus::Failed(ActorErrorKind::Generic(msg)) => {
118 write!(f, "{}", msg)?;
119 Ok(None)
120 }
121 status => {
122 write!(f, "{}", status)?;
123 Ok(None)
124 }
125 }
126}
127
128impl fmt::Display for ActorSupervisionEvent {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 let actor_name = self.actor_name();
131 writeln!(
132 f,
133 "The actor {} and all its descendants have failed.",
134 actor_name
135 )?;
136 let failing_event = self.actually_failing_actor();
137 let failing_actor = failing_event.actor_name();
138 let its_name = if failing_actor == actor_name {
139 "itself"
140 } else {
141 &failing_actor
142 };
143 writeln!(f, "This occurred because the actor {} failed.", its_name)?;
144 writeln!(f, "The error was:")?;
145 let during_handling_of =
146 fmt_status(&failing_event.actor_id, &failing_event.actor_status, f)?;
147 if let Some(event) = during_handling_of {
148 writeln!(
149 f,
150 "This error occurred during the handling of another failure:"
151 )?;
152 fmt::Display::fmt(event, f)?;
153 }
154 Ok(())
155 }
156}