1use std::fmt;
38use std::fmt::Debug;
39use std::fmt::Write;
40use std::time::SystemTime;
41
42use derivative::Derivative;
43use hyperactor_config::Flattrs;
44use indenter::indented;
45use serde::Deserialize;
46use serde::Serialize;
47
48use crate::ActorAddr;
49use crate::actor::ActorErrorKind;
50use crate::actor::ActorStatus;
51
/// A supervision event: a snapshot of one actor's status, optionally
/// wrapping the events of other actors that led to it.
///
/// Equality is derived through `derivative` and deliberately skips
/// `occurred_at` and `message_headers` (both are marked
/// `PartialEq = "ignore"`), so two events compare equal when they agree on
/// `actor_id`, `display_name` and `actor_status`.
#[derive(Clone, Debug, Derivative, Serialize, Deserialize, typeuri::Named)]
#[derivative(PartialEq, Eq)]
pub struct ActorSupervisionEvent {
    /// Address of the actor this event is about.
    pub actor_id: ActorAddr,
    /// Optional human-readable name. When present it is used instead of the
    /// address wherever the actor is named in rendered messages.
    pub display_name: Option<String>,
    /// Wall-clock time of the event; `new` sets it to the time of
    /// construction. Ignored by equality.
    #[derivative(PartialEq = "ignore")]
    pub occurred_at: SystemTime,
    /// Status reported for the actor. Failure statuses may carry a nested
    /// `ActorSupervisionEvent` describing the event that caused them.
    pub actor_status: ActorStatus,
    /// Optional headers attached to the event. Ignored by equality.
    // NOTE(review): presumably the headers of the message being handled when
    // the event was raised -- confirm against the code that constructs events.
    #[derivative(PartialEq = "ignore")]
    pub message_headers: Option<Flattrs>,
}
// Registers the event type with `wirevalue`.
wirevalue::register_type!(ActorSupervisionEvent);
70
71impl ActorSupervisionEvent {
72 pub fn new(
74 actor_id: impl Into<ActorAddr>,
75 display_name: Option<String>,
76 actor_status: ActorStatus,
77 message_headers: Option<Flattrs>,
78 ) -> Self {
79 Self {
80 actor_id: actor_id.into(),
81 display_name,
82 occurred_at: std::time::SystemTime::now(),
83 actor_status,
84 message_headers,
85 }
86 }
87
88 fn actor_name(&self) -> String {
89 self.display_name.clone().unwrap_or_else(|| {
90 if self.actor_id.is_root() {
91 format!("{},{}", self.actor_id.proc_addr(), self.actor_id.log_name())
92 } else {
93 self.actor_id.to_string()
94 }
95 })
96 }
97
98 pub fn caused_by(&self) -> &ActorSupervisionEvent {
102 let mut event = self;
103 while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(inner)) =
104 &event.actor_status
105 {
106 event = inner;
107 }
108 event
109 }
110
111 pub fn actually_failing_actor(&self) -> Option<&ActorSupervisionEvent> {
118 if !self.is_error() {
119 return None;
120 }
121 Some(self.caused_by())
122 }
123
124 pub fn is_error(&self) -> bool {
126 self.actor_status.is_failed()
127 }
128
129 pub fn failure_report(&self) -> Option<String> {
132 if !self.is_error() {
133 return None;
134 }
135 let mut output = String::new();
136 self.write_failure_report(&mut output)
137 .expect("writing to String cannot fail");
138 Some(output)
139 }
140
141 fn write_failure_report(&self, f: &mut String) -> fmt::Result {
142 let mut current = self;
143 let mut last_unhandled: Option<&ActorSupervisionEvent> = None;
144 while let ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(inner)) =
145 ¤t.actor_status
146 {
147 last_unhandled = Some(current);
148 current = inner;
149 }
150
151 if !current.actor_status.is_failed() {
152 let parent = last_unhandled.expect(
153 "top-level event is a failure but leaf is not; \
154 chain must contain an UnhandledSupervisionEvent",
155 );
156 writeln!(
157 f,
158 "The actor {} failed because it did not handle a supervision event \
159 from its child. The event was:",
160 parent.actor_name()
161 )?;
162 return write!(indented(f).with_str(" "), "{}", current);
163 }
164
165 writeln!(
166 f,
167 "The actor {} and all its descendants have failed:",
168 current.actor_name()
169 )?;
170 match ¤t.actor_status {
171 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(msg, child)) => {
172 writeln!(indented(f).with_str(" "), "{}", msg.trim_end())?;
173 writeln!(f, "This error occurred while handling another failure:")?;
174 let child_report = child
175 .failure_report()
176 .expect("child of ErrorDuringHandlingSupervision is always a failure");
177 write!(indented(f).with_str(" "), "{}", child_report)
178 }
179 ActorStatus::Failed(err) => write!(indented(f).with_str(" "), "{}", err),
180 _ => unreachable!("current.is_failed() was true"),
181 }
182 }
183}
184
// Lets a supervision event be used as a `std::error::Error`. No methods are
// overridden, so the trait's defaults apply; the required `Debug` and
// `Display` come from the derive on the struct and the impl below.
impl std::error::Error for ActorSupervisionEvent {}
186
187impl fmt::Display for ActorSupervisionEvent {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 let name = self.actor_name();
190 match &self.actor_status {
191 ActorStatus::Failed(
192 err @ (ActorErrorKind::Generic(_) | ActorErrorKind::Aborted(_)),
193 ) => {
194 writeln!(f, "Supervision event: actor {} failed:", name)?;
195 write!(indented(f).with_str(" "), "{}", err)
196 }
197 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(child)) => {
198 writeln!(
199 f,
200 "Supervision event: actor {} failed because it did not handle \
201 a supervision event from its child. The child's event was:",
202 name
203 )?;
204 write!(indented(f).with_str(" "), "{}", child)
205 }
206 ActorStatus::Failed(ActorErrorKind::ErrorDuringHandlingSupervision(msg, child)) => {
207 writeln!(f, "Supervision event: actor {} failed:", name)?;
208 writeln!(indented(f).with_str(" "), "{}", msg.trim_end())?;
209 writeln!(
210 f,
211 "This error occurred while handling a supervision event from \
212 its child. The event was:"
213 )?;
214 write!(indented(f).with_str(" "), "{}", child)
215 }
216 ActorStatus::Stopped(_)
217 if self
218 .actor_id
219 .label()
220 .is_some_and(|l| l.as_str() == "host_agent" || l.as_str() == "proc_agent") =>
221 {
222 let addr = self.actor_id.proc_addr().addr().to_string();
223 write!(
224 f,
225 "Supervision event: the process {} owned by actor {} became unresponsive \
226 and is assumed dead, check the log on the host for details",
227 addr,
228 self.actor_name()
229 )
230 }
231 status => {
232 writeln!(f, "Supervision event: actor {} has status:", name)?;
233 write!(indented(f).with_str(" "), "{}", status)
234 }
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ProcAddr;
    use crate::actor::ActorErrorKind;
    use crate::actor::ActorStatus;
    use crate::channel::ChannelAddr;

    /// Builds an expected rendering from its individual lines. Lines are
    /// joined with `\n` and there is no trailing newline, so the indentation
    /// of every line is spelled out explicitly at the call site.
    fn joined(lines: &[&str]) -> String {
        lines.join("\n")
    }

    /// Event for actor `name` on a fixed local proc, with `name` also used
    /// as the display name.
    fn test_event(name: &str, status: ActorStatus) -> ActorSupervisionEvent {
        let proc_id = ProcAddr::singleton(ChannelAddr::Local(0), "test_proc");
        ActorSupervisionEvent::new(
            proc_id.actor_addr(name),
            Some(name.to_string()),
            status,
            None,
        )
    }

    /// Event for actor `name` on a proc reachable at `addr`, without a
    /// display name, so rendering falls back to the actor address.
    fn test_event_with_addr(
        name: &str,
        addr: ChannelAddr,
        status: ActorStatus,
    ) -> ActorSupervisionEvent {
        let proc_id = ProcAddr::singleton(addr, "test_proc");
        ActorSupervisionEvent::new(proc_id.actor_addr(name), None, status, None)
    }

    /// `Failed(Generic(msg))` event.
    fn generic(name: &str, msg: &str) -> ActorSupervisionEvent {
        let kind = ActorErrorKind::Generic(msg.to_string());
        test_event(name, ActorStatus::Failed(kind))
    }

    /// `Failed(Aborted(msg))` event.
    fn aborted(name: &str, msg: &str) -> ActorSupervisionEvent {
        let kind = ActorErrorKind::Aborted(msg.to_string());
        test_event(name, ActorStatus::Failed(kind))
    }

    /// `Failed(UnhandledSupervisionEvent(child))` event.
    fn unhandled(name: &str, child: ActorSupervisionEvent) -> ActorSupervisionEvent {
        let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(child));
        test_event(name, ActorStatus::Failed(kind))
    }

    /// `Failed(ErrorDuringHandlingSupervision(msg, child))` event.
    fn error_during(name: &str, msg: &str, child: ActorSupervisionEvent) -> ActorSupervisionEvent {
        let kind = ActorErrorKind::ErrorDuringHandlingSupervision(msg.to_string(), Box::new(child));
        test_event(name, ActorStatus::Failed(kind))
    }

    /// `Stopped(reason)` event.
    fn stopped(name: &str, reason: &str) -> ActorSupervisionEvent {
        test_event(name, ActorStatus::Stopped(reason.to_string()))
    }

    // ---- Display ----

    #[test]
    fn test_display_generic() {
        let event = generic("actor_a", "something went wrong");
        let expected = joined(&[
            "Supervision event: actor actor_a failed:",
            "  something went wrong",
        ]);
        assert_eq!(event.to_string(), expected);
    }

    #[test]
    fn test_display_aborted() {
        let event = aborted("actor_a", "user requested");
        let expected = joined(&[
            "Supervision event: actor actor_a failed:",
            "  actor explicitly aborted due to: user requested",
        ]);
        assert_eq!(event.to_string(), expected);
    }

    #[test]
    fn test_display_unhandled_with_generic_child() {
        let parent = unhandled("parent", generic("child", "child error"));
        let expected = joined(&[
            "Supervision event: actor parent failed because it did not handle \
             a supervision event from its child. The child's event was:",
            "  Supervision event: actor child failed:",
            "    child error",
        ]);
        assert_eq!(parent.to_string(), expected);
    }

    #[test]
    fn test_display_error_during_handling() {
        let parent = error_during("parent", "handler crashed", generic("child", "child error"));
        let expected = joined(&[
            "Supervision event: actor parent failed:",
            "  handler crashed",
            "This error occurred while handling a supervision event from \
             its child. The event was:",
            "  Supervision event: actor child failed:",
            "    child error",
        ]);
        assert_eq!(parent.to_string(), expected);
    }

    #[test]
    fn test_display_stopped() {
        let event = stopped("actor_a", "done");
        let expected = joined(&[
            "Supervision event: actor actor_a has status:",
            "  stopped: done",
        ]);
        assert_eq!(event.to_string(), expected);
    }

    #[test]
    fn test_display_deep_nesting() {
        let top = unhandled("top", unhandled("mid", generic("leaf", "root cause")));
        let rendered = top.to_string();
        // Each nesting level is indented one step further than its parent.
        let fragments = [
            "actor top failed because",
            "  Supervision event: actor mid failed because",
            "    Supervision event: actor leaf failed:",
            "      root cause",
        ];
        for fragment in fragments {
            assert!(
                rendered.contains(fragment),
                "missing {:?} in: {}",
                fragment,
                rendered
            );
        }
    }

    #[test]
    fn test_display_unhandled_stopped_child() {
        let parent = unhandled("parent", stopped("child", "process exited"));
        let expected = joined(&[
            "Supervision event: actor parent failed because it did not handle \
             a supervision event from its child. The child's event was:",
            "  Supervision event: actor child has status:",
            "    stopped: process exited",
        ]);
        assert_eq!(parent.to_string(), expected);
    }

    // ---- failure_report ----

    #[test]
    fn test_failure_report_generic() {
        let event = generic("actor_a", "boom");
        let expected = joined(&[
            "The actor actor_a and all its descendants have failed:",
            "  boom",
        ]);
        assert_eq!(event.failure_report().unwrap(), expected);
    }

    #[test]
    fn test_failure_report_aborted() {
        let event = aborted("actor_a", "user requested");
        let expected = joined(&[
            "The actor actor_a and all its descendants have failed:",
            "  actor explicitly aborted due to: user requested",
        ]);
        assert_eq!(event.failure_report().unwrap(), expected);
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_generic() {
        // The unhandled chain is collapsed: only the leaf is reported.
        let top = unhandled("top", unhandled("mid", generic("leaf", "root cause")));
        let expected = joined(&[
            "The actor leaf and all its descendants have failed:",
            "  root cause",
        ]);
        assert_eq!(top.failure_report().unwrap(), expected);
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_stopped() {
        // The leaf is not a failure, so the report names the last actor that
        // failed to handle it ("mid") and then shows the leaf event.
        let leaf = stopped("some_actor", "process exited");
        let top = unhandled("top", unhandled("mid", leaf));
        let expected = joined(&[
            "The actor mid failed because it did not handle a supervision event \
             from its child. The event was:",
            "  Supervision event: actor some_actor has status:",
            "    stopped: process exited",
        ]);
        assert_eq!(top.failure_report().unwrap(), expected);
    }

    #[test]
    fn test_failure_report_unhandled_chain_to_stopped_proc_agent() {
        let leaf = test_event_with_addr(
            "proc_agent",
            ChannelAddr::Local(99),
            ActorStatus::Stopped("process exited".to_string()),
        );
        let top = unhandled("top", unhandled("mid", leaf));
        let report = top.failure_report().unwrap();
        assert!(
            report.contains("did not handle a supervision event"),
            "got: {}",
            report
        );
        let mentions_dead_process =
            report.contains("process local:99 owned by actor") && report.contains("unresponsive");
        assert!(mentions_dead_process, "got: {}", report);
    }

    #[test]
    fn test_failure_report_error_during_handling() {
        let parent = error_during("parent", "handler failed", generic("child", "original error"));
        let expected = joined(&[
            "The actor parent and all its descendants have failed:",
            "  handler failed",
            "This error occurred while handling another failure:",
            "  The actor child and all its descendants have failed:",
            "    original error",
        ]);
        assert_eq!(parent.failure_report().unwrap(), expected);
    }

    #[test]
    fn test_failure_report_error_during_handling_nested() {
        let mid = error_during("mid", "mid failed", generic("leaf", "root cause"));
        let top = error_during("top", "top failed", mid);
        let report = top.failure_report().unwrap();
        let expected_prefix = joined(&[
            "The actor top and all its descendants have failed:",
            "  top failed",
            "This error occurred while handling another failure:",
            "  The actor mid and all its descendants have failed:",
            "    mid failed",
            "  This error occurred while handling another failure:",
            "    The actor leaf and all its descendants have failed:",
            "      root cause",
        ]);
        assert!(
            report.starts_with(expected_prefix.as_str()),
            "got: {}",
            report
        );
    }

    #[test]
    fn test_failure_report_unhandled_to_error_during_handling() {
        let handler_err = error_during("handler", "while handling", generic("leaf", "root cause"));
        let top = unhandled("top", handler_err);
        let report = top.failure_report().unwrap();
        assert!(report.contains("The actor handler and all its descendants have failed:"));
        assert!(report.contains("while handling"));
        assert!(report.contains("root cause"));
    }

    #[test]
    fn test_failure_report_none_on_non_failure() {
        let event = stopped("actor_a", "done");
        assert!(event.failure_report().is_none());
    }

    #[test]
    fn test_failure_report_direct_generic_no_chain() {
        let event = generic("solo", "direct error");
        let expected = joined(&[
            "The actor solo and all its descendants have failed:",
            "  direct error",
        ]);
        assert_eq!(event.failure_report().unwrap(), expected);
    }

    // ---- stopped host/proc agents ----

    #[test]
    fn test_display_host_agent_stopped() {
        let event = test_event_with_addr(
            "host_agent",
            ChannelAddr::Local(42),
            ActorStatus::Stopped("gone".to_string()),
        );
        let output = event.to_string();
        assert!(
            output.contains("process local:42 owned by actor") && output.contains("unresponsive"),
            "got: {}",
            output
        );
    }

    #[test]
    fn test_display_proc_agent_stopped() {
        let event = test_event_with_addr(
            "proc_agent",
            ChannelAddr::Local(7),
            ActorStatus::Stopped("dead".to_string()),
        );
        let output = event.to_string();
        assert!(
            output.contains("process local:7 owned by actor") && output.contains("unresponsive"),
            "got: {}",
            output
        );
    }

    // ---- trailing newline in the handler message ----

    #[test]
    fn test_display_error_during_handling_trim_end() {
        let child = generic("child", "child error");
        let parent = error_during("parent", "msg with trailing newline\n", child);
        let output = parent.to_string();
        assert!(
            output.contains("  msg with trailing newline\nThis error occurred"),
            "writeln! should trim trailing newline from msg: {}",
            output
        );
    }

    #[test]
    fn test_failure_report_error_during_handling_trim_end() {
        let child = generic("child", "child error");
        let parent = error_during("parent", "msg with trailing newline\n", child);
        let report = parent.failure_report().unwrap();
        assert!(
            report.contains("  msg with trailing newline\nThis error occurred"),
            "writeln! should trim trailing newline from msg: {}",
            report
        );
    }

    // ---- actually_failing_actor ----

    // When a parent fails only because it did not handle a stopped child's
    // event, `actually_failing_actor` must point at the child even though
    // the child's own status is `Stopped`, not `Failed`.
    // NOTE(review): "sv1" in the test name presumably refers to an invariant
    // documented elsewhere -- confirm.
    #[test]
    fn test_sv1_actually_failing_actor_returns_stopped_child() {
        let proc_id = ProcAddr::singleton(ChannelAddr::Local(0), "test_proc");
        let child_id = proc_id.actor_addr("proc_agent");
        let parent_id = proc_id.actor_addr("controller");

        let child_event = ActorSupervisionEvent::new(
            child_id.clone(),
            Some("proc_agent".into()),
            ActorStatus::Stopped("host died".into()),
            None,
        );
        let parent_status = ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(
            Box::new(child_event),
        ));
        let parent_event =
            ActorSupervisionEvent::new(parent_id, Some("controller".into()), parent_status, None);

        let root = parent_event
            .actually_failing_actor()
            .expect("parent_event is a failure");
        assert_eq!(root.actor_id, child_id);
        assert!(
            matches!(root.actor_status, ActorStatus::Stopped(_)),
            "root cause should be the stopped child, got: {:?}",
            root.actor_status,
        );
    }
}