hyperactor/testing/proc_supervison.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::ActorHandle;
16use crate::Context;
17use crate::Handler;
18use crate::proc::Proc;
19use crate::supervision::ActorSupervisionEvent;
20
21/// Used to create a proc supervison coordinator for testing purposes. Normally you
22/// should not use this struct. It is only required in the following cases:
23/// 1. The tests' logic involves actor failures;
24/// 2. A supervison coordinator is not already set for the proc (e.g. the
25/// ProcActor scenario which will be explained later.)
26///
27/// This is because hyperactor's supervision logic requires actor failures in
28/// a proc to be bubbled up to through the supervision chain:
29///
30/// grandchild actor -> child actor -> root actor -> proc supervison coordinator
31///
32/// If the the proc supervison coordinator is not set, supervision will crash the
33/// process because it cannot find the coordinator during the "bubbling up".
34#[derive(Debug)]
35pub struct ProcSupervisionCoordinator(ReportedEvent);
36
37impl ProcSupervisionCoordinator {
38 /// Spawn a coordinator actor and set it as the coordinator for the given
39 /// proc. Returns the reported event state and the coordinator actor handle.
40 /// Callers should drain and stop the coordinator handle when the test is
41 /// done to avoid ASAN thread-leak errors.
42 pub async fn set(
43 proc: &Proc,
44 ) -> Result<(ReportedEvent, ActorHandle<ProcSupervisionCoordinator>), anyhow::Error> {
45 let state = ReportedEvent::new();
46 let actor = ProcSupervisionCoordinator(state.clone());
47 let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
48 proc.set_supervision_coordinator(coordinator.port())?;
49 Ok((state, coordinator))
50 }
51}
52
53/// Used to store the last event reported to [ProcSupervisionCoordinator].
54#[derive(Clone, Debug)]
55pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
56impl ReportedEvent {
57 fn new() -> Self {
58 Self(Arc::new(Mutex::new(None)))
59 }
60
61 /// The last event reported to the coordinator.
62 pub fn event(&self) -> Option<ActorSupervisionEvent> {
63 self.0.lock().unwrap().clone()
64 }
65
66 fn set(&self, event: ActorSupervisionEvent) {
67 *self.0.lock().unwrap() = Some(event);
68 }
69}
70
// The coordinator overrides nothing from `Actor`: the impl body is empty, so
// it relies entirely on the trait's provided items. Its only behavior lives in
// the `Handler<ActorSupervisionEvent>` impl.
#[async_trait]
impl Actor for ProcSupervisionCoordinator {}
73
74#[async_trait]
75impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
76 async fn handle(
77 &mut self,
78 _cx: &Context<Self>,
79 msg: ActorSupervisionEvent,
80 ) -> anyhow::Result<()> {
81 tracing::debug!("in handler, handling supervision event");
82 self.0.set(msg);
83 Ok(())
84 }
85}