hyperactor/test_utils/
proc_supervison.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::Context;
16use crate::Handler;
17use crate::proc::Proc;
18use crate::supervision::ActorSupervisionEvent;
19
20/// Used to create a proc supervison coordinator for testing purposes. Normally you
21/// should not use this struct. It is only required in the following cases:
22///   1. The tests' logic involves actor failures;
23///   2. A supervison coordinator is not already set for the proc (e.g. the
24///      ProcActor scenario which will be explained later.)
25///
26///   This is because hyperactor's supervision logic requires actor failures in
27///   a proc to be bubbled up to through the supervision chain:
28///      
29///   grandchild actor -> child actor -> root actor -> proc supervison coordinator
30///
31///   If the the proc supervison coordinator is not set, supervision will crash the
32///   process because it cannot find the coordinator during the "bubbling up".
33#[derive(Debug)]
34pub struct ProcSupervisionCoordinator(ReportedEvent);
35
36impl ProcSupervisionCoordinator {
37    /// Spawn a coordinator actor and set it as the coordinator for the given
38    /// proc.
39    pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
40        let state = ReportedEvent::new();
41        let actor = ProcSupervisionCoordinator(state.clone());
42        let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
43        proc.set_supervision_coordinator(coordinator.port())?;
44        Ok(state)
45    }
46}
47
48/// Used to store the last event reported to [ProcSupervisionCoordinator].
49#[derive(Clone, Debug)]
50pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
51impl ReportedEvent {
52    fn new() -> Self {
53        Self(Arc::new(Mutex::new(None)))
54    }
55
56    /// The last event reported to the coordinator.
57    pub fn event(&self) -> Option<ActorSupervisionEvent> {
58        self.0.lock().unwrap().clone()
59    }
60
61    fn set(&self, event: ActorSupervisionEvent) {
62        *self.0.lock().unwrap() = Some(event);
63    }
64}
65
66#[async_trait]
67impl Actor for ProcSupervisionCoordinator {}
68
69#[async_trait]
70impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
71    async fn handle(
72        &mut self,
73        _cx: &Context<Self>,
74        msg: ActorSupervisionEvent,
75    ) -> anyhow::Result<()> {
76        tracing::debug!("in handler, handling supervision event");
77        self.0.set(msg);
78        Ok(())
79    }
80}