hyperactor/testing/
proc_supervison.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13use tokio::sync::mpsc;
14
15use crate::Actor;
16use crate::ActorHandle;
17use crate::Context;
18use crate::Handler;
19use crate::proc::Proc;
20use crate::supervision::ActorSupervisionEvent;
21
22/// Used to create a proc supervison coordinator for testing purposes. Normally you
23/// should not use this struct. It is only required in the following cases:
24///   1. The tests' logic involves actor failures;
25///   2. A supervison coordinator is not already set for the proc (e.g. the
26///      ProcActor scenario which will be explained later.)
27///
28///   This is because hyperactor's supervision logic requires actor failures in
29///   a proc to be bubbled up to through the supervision chain:
30///
31///   grandchild actor -> child actor -> root actor -> proc supervison coordinator
32///
33///   If the the proc supervison coordinator is not set, supervision will crash the
34///   process because it cannot find the coordinator during the "bubbling up".
35#[derive(Debug)]
36pub struct ProcSupervisionCoordinator {
37    tx: mpsc::UnboundedSender<ActorSupervisionEvent>,
38    last: Arc<Mutex<Option<ActorSupervisionEvent>>>,
39}
40
41impl ProcSupervisionCoordinator {
42    /// Spawn a coordinator actor and set it as the coordinator for the given
43    /// proc. Returns the reported event state and the coordinator actor handle.
44    /// Callers should drain and stop the coordinator handle when the test is
45    /// done to avoid ASAN thread-leak errors.
46    pub async fn set(
47        proc: &Proc,
48    ) -> Result<(ReportedEvent, ActorHandle<ProcSupervisionCoordinator>), anyhow::Error> {
49        let (tx, rx) = mpsc::unbounded_channel();
50        let last = Arc::new(Mutex::new(None));
51        let actor = ProcSupervisionCoordinator {
52            tx,
53            last: last.clone(),
54        };
55        let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
56        proc.set_supervision_coordinator(coordinator.port())?;
57        Ok((ReportedEvent { rx, last }, coordinator))
58    }
59}
60
61/// Collects supervision events reported to [ProcSupervisionCoordinator].
62#[derive(Debug)]
63pub struct ReportedEvent {
64    rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
65    last: Arc<Mutex<Option<ActorSupervisionEvent>>>,
66}
67
68impl ReportedEvent {
69    /// The last event reported to the coordinator.
70    pub fn event(&self) -> Option<ActorSupervisionEvent> {
71        self.last.lock().unwrap().clone()
72    }
73
74    /// Wait until the coordinator receives an event.
75    pub async fn recv(&mut self) -> ActorSupervisionEvent {
76        self.rx
77            .recv()
78            .await
79            .expect("coordinator sender dropped without sending an event")
80    }
81}
82
// Implements `Actor` for the coordinator so that it can be passed to
// `Proc::spawn` (see `ProcSupervisionCoordinator::set`). The body is empty:
// nothing from the `Actor` trait is overridden here.
#[async_trait]
impl Actor for ProcSupervisionCoordinator {}
85
86#[async_trait]
87impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
88    async fn handle(
89        &mut self,
90        _cx: &Context<Self>,
91        msg: ActorSupervisionEvent,
92    ) -> anyhow::Result<()> {
93        tracing::debug!("in handler, handling supervision event");
94        *self.last.lock().unwrap() = Some(msg.clone());
95        let _ = self.tx.send(msg);
96        Ok(())
97    }
98}