hyperactor/test_utils/
proc_supervison.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::Context;
16use crate::Handler;
17use crate::proc::Proc;
18use crate::supervision::ActorSupervisionEvent;
19
20/// Used to create a proc supervison coordinator for testing purposes. Normally you
21/// should not use this struct. It is only required in the following cases:
22///   1. The tests' logic involves actor failures;
23///   2. A supervison coordinator is not already set for the proc (e.g. the
24///      ProcActor scenario which will be explained later.)
25///
26///   This is because hyperactor's supervision logic requires actor failures in
27///   a proc to be bubbled up to through the supervision chain:
28///      
29///   grandchild actor -> child actor -> root actor -> proc supervison coordinator
30///
31///   If the the proc supervison coordinator is not set, supervision will crash the
32///   process because it cannot find the coordinator during the "bubbling up".
33///
34///   Note that if you are using hyperactor_multiprocess' ProcActor bootstrap,
35///   the `ProcActor` will be set as the coordinator by the bootstrap. As a
36///   result, you do not need to set the supervior again with this struct.
37#[derive(Debug)]
38pub struct ProcSupervisionCoordinator(ReportedEvent);
39
40impl ProcSupervisionCoordinator {
41    /// Spawn a coordinator actor and set it as the coordinator for the given
42    /// proc.
43    pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
44        let state = ReportedEvent::new();
45        let coordinator = proc
46            .spawn::<ProcSupervisionCoordinator>("coordinator", state.clone())
47            .await?;
48        proc.set_supervision_coordinator(coordinator.port())?;
49        Ok(state)
50    }
51}
52
53/// Used to store the last event reported to [ProcSupervisionCoordinator].
54#[derive(Clone, Debug)]
55pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
56impl ReportedEvent {
57    fn new() -> Self {
58        Self(Arc::new(Mutex::new(None)))
59    }
60
61    /// The last event reported to the coordinator.
62    pub fn event(&self) -> Option<ActorSupervisionEvent> {
63        self.0.lock().unwrap().clone()
64    }
65
66    fn set(&self, event: ActorSupervisionEvent) {
67        *self.0.lock().unwrap() = Some(event);
68    }
69}
70
71#[async_trait]
72impl Actor for ProcSupervisionCoordinator {
73    type Params = ReportedEvent;
74
75    async fn new(param: ReportedEvent) -> Result<Self, anyhow::Error> {
76        Ok(Self(param))
77    }
78}
79
80#[async_trait]
81impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
82    async fn handle(
83        &mut self,
84        _cx: &Context<Self>,
85        msg: ActorSupervisionEvent,
86    ) -> anyhow::Result<()> {
87        tracing::debug!("in handler, handling supervision event");
88        self.0.set(msg);
89        Ok(())
90    }
91}