hyperactor/testing/
proc_supervison.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::ActorHandle;
16use crate::Context;
17use crate::Handler;
18use crate::proc::Proc;
19use crate::supervision::ActorSupervisionEvent;
20
/// Used to create a proc supervision coordinator for testing purposes. Normally
/// you should not use this struct. It is only required when both of the
/// following hold:
///   1. The test's logic involves actor failures;
///   2. A supervision coordinator is not already set for the proc (e.g. the
///      ProcActor scenario, which will be explained later).
///
///   This is because hyperactor's supervision logic requires actor failures in
///   a proc to be bubbled up through the supervision chain:
///
///   grandchild actor -> child actor -> root actor -> proc supervision coordinator
///
///   If the proc supervision coordinator is not set, supervision will crash the
///   process because it cannot find the coordinator during the "bubbling up".
///
/// The single field is the shared [`ReportedEvent`] slot into which this
/// actor records the supervision events it handles.
#[derive(Debug)]
pub struct ProcSupervisionCoordinator(ReportedEvent);
36
37impl ProcSupervisionCoordinator {
38    /// Spawn a coordinator actor and set it as the coordinator for the given
39    /// proc. Returns the reported event state and the coordinator actor handle.
40    /// Callers should drain and stop the coordinator handle when the test is
41    /// done to avoid ASAN thread-leak errors.
42    pub async fn set(
43        proc: &Proc,
44    ) -> Result<(ReportedEvent, ActorHandle<ProcSupervisionCoordinator>), anyhow::Error> {
45        let state = ReportedEvent::new();
46        let actor = ProcSupervisionCoordinator(state.clone());
47        let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
48        proc.set_supervision_coordinator(coordinator.port())?;
49        Ok((state, coordinator))
50    }
51}
52
53/// Used to store the last event reported to [ProcSupervisionCoordinator].
54#[derive(Clone, Debug)]
55pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
56impl ReportedEvent {
57    fn new() -> Self {
58        Self(Arc::new(Mutex::new(None)))
59    }
60
61    /// The last event reported to the coordinator.
62    pub fn event(&self) -> Option<ActorSupervisionEvent> {
63        self.0.lock().unwrap().clone()
64    }
65
66    fn set(&self, event: ActorSupervisionEvent) {
67        *self.0.lock().unwrap() = Some(event);
68    }
69}
70
// The coordinator overrides nothing from `Actor`: the impl body is empty, so
// every trait item falls back to its default.
#[async_trait]
impl Actor for ProcSupervisionCoordinator {}
73
74#[async_trait]
75impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
76    async fn handle(
77        &mut self,
78        _cx: &Context<Self>,
79        msg: ActorSupervisionEvent,
80    ) -> anyhow::Result<()> {
81        tracing::debug!("in handler, handling supervision event");
82        self.0.set(msg);
83        Ok(())
84    }
85}