hyperactor/
supervision.rs1use std::fmt;
12use std::fmt::Debug;
13use std::fmt::Write;
14use std::time::SystemTime;
15
16use derivative::Derivative;
17use hyperactor_config::Flattrs;
18use indenter::indented;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::actor::ActorErrorKind;
23use crate::actor::ActorStatus;
24use crate::reference;
25
26#[derive(Clone, Debug, Derivative, Serialize, Deserialize, typeuri::Named)]
28#[derivative(PartialEq, Eq)]
29pub struct ActorSupervisionEvent {
30 pub actor_id: reference::ActorId,
32 pub display_name: Option<String>,
34 #[derivative(PartialEq = "ignore")]
36 pub occurred_at: SystemTime,
37 pub actor_status: ActorStatus,
39 #[derivative(PartialEq = "ignore")]
41 pub message_headers: Option<Flattrs>,
42}
43wirevalue::register_type!(ActorSupervisionEvent);
44
45impl ActorSupervisionEvent {
46 pub fn new(
48 actor_id: reference::ActorId,
49 display_name: Option<String>,
50 actor_status: ActorStatus,
51 message_headers: Option<Flattrs>,
52 ) -> Self {
53 Self {
54 actor_id,
55 display_name,
56 occurred_at: std::time::SystemTime::now(),
57 actor_status,
58 message_headers,
59 }
60 }
61
62 fn actor_name(&self) -> String {
63 self.display_name
64 .clone()
65 .unwrap_or_else(|| self.actor_id.to_string())
66 }
67
68 pub fn actually_failing_actor(&self) -> &ActorSupervisionEvent {
71 let mut event = self;
72 while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(e)) =
73 &event.actor_status
74 {
75 event = e;
76 }
77 event
78 }
79
80 pub fn is_error(&self) -> bool {
82 self.actor_status.is_failed()
83 }
84}
85
86impl std::error::Error for ActorSupervisionEvent {}
87
88fn fmt_status<'a>(
89 actor_id: &reference::ActorId,
90 status: &'a ActorStatus,
91 f: &mut fmt::Formatter<'_>,
92) -> Result<Option<&'a ActorSupervisionEvent>, fmt::Error> {
93 let mut f = indented(f).with_str(" ");
94
95 match status {
96 ActorStatus::Stopped(_)
97 if actor_id.name() == "host_agent" || actor_id.name() == "proc_agent" =>
98 {
99 let name = actor_id.proc_id().addr().to_string();
101 write!(
102 f,
103 "The process {} owned by this actor became unresponsive and is assumed dead, check the log on the host for details",
104 name
105 )?;
106 Ok(None)
107 }
108 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(
109 msg,
110 during_handling_of,
111 )) => {
112 write!(f, "{}", msg)?;
113 Ok(Some(during_handling_of))
114 }
115 ActorStatus::Failed(ActorErrorKind::Generic(msg)) => {
116 write!(f, "{}", msg)?;
117 Ok(None)
118 }
119 status => {
120 write!(f, "{}", status)?;
121 Ok(None)
122 }
123 }
124}
125
126impl fmt::Display for ActorSupervisionEvent {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 let actor_name = self.actor_name();
129 writeln!(
130 f,
131 "The actor {} and all its descendants have failed.",
132 actor_name
133 )?;
134 let failing_event = self.actually_failing_actor();
135 let failing_actor = failing_event.actor_name();
136 let its_name = if failing_actor == actor_name {
137 "itself"
138 } else {
139 &failing_actor
140 };
141 writeln!(f, "This occurred because the actor {} failed.", its_name)?;
142 writeln!(f, "The error was:")?;
143 let during_handling_of =
144 fmt_status(&failing_event.actor_id, &failing_event.actor_status, f)?;
145 if let Some(event) = during_handling_of {
146 writeln!(
147 f,
148 "This error occurred during the handling of another failure:"
149 )?;
150 fmt::Display::fmt(event, f)?;
151 }
152 Ok(())
153 }
154}