hyperactor/test_utils/proc_supervison.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::Context;
16use crate::Handler;
17use crate::proc::Proc;
18use crate::supervision::ActorSupervisionEvent;
19
20/// Used to create a proc supervison coordinator for testing purposes. Normally you
21/// should not use this struct. It is only required in the following cases:
22/// 1. The tests' logic involves actor failures;
23/// 2. A supervison coordinator is not already set for the proc (e.g. the
24/// ProcActor scenario which will be explained later.)
25///
26/// This is because hyperactor's supervision logic requires actor failures in
27/// a proc to be bubbled up to through the supervision chain:
28///
29/// grandchild actor -> child actor -> root actor -> proc supervison coordinator
30///
31/// If the the proc supervison coordinator is not set, supervision will crash the
32/// process because it cannot find the coordinator during the "bubbling up".
33///
34/// Note that if you are using hyperactor_multiprocess' ProcActor bootstrap,
35/// the `ProcActor` will be set as the coordinator by the bootstrap. As a
36/// result, you do not need to set the supervior again with this struct.
37#[derive(Debug)]
38pub struct ProcSupervisionCoordinator(ReportedEvent);
39
40impl ProcSupervisionCoordinator {
41 /// Spawn a coordinator actor and set it as the coordinator for the given
42 /// proc.
43 pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
44 let state = ReportedEvent::new();
45 let coordinator = proc
46 .spawn::<ProcSupervisionCoordinator>("coordinator", state.clone())
47 .await?;
48 proc.set_supervision_coordinator(coordinator.port())?;
49 Ok(state)
50 }
51}
52
53/// Used to store the last event reported to [ProcSupervisionCoordinator].
54#[derive(Clone, Debug)]
55pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
56impl ReportedEvent {
57 fn new() -> Self {
58 Self(Arc::new(Mutex::new(None)))
59 }
60
61 /// The last event reported to the coordinator.
62 pub fn event(&self) -> Option<ActorSupervisionEvent> {
63 self.0.lock().unwrap().clone()
64 }
65
66 fn set(&self, event: ActorSupervisionEvent) {
67 *self.0.lock().unwrap() = Some(event);
68 }
69}
70
71#[async_trait]
72impl Actor for ProcSupervisionCoordinator {
73 type Params = ReportedEvent;
74
75 async fn new(param: ReportedEvent) -> Result<Self, anyhow::Error> {
76 Ok(Self(param))
77 }
78}
79
80#[async_trait]
81impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
82 async fn handle(
83 &mut self,
84 _cx: &Context<Self>,
85 msg: ActorSupervisionEvent,
86 ) -> anyhow::Result<()> {
87 tracing::debug!("in handler, handling supervision event");
88 self.0.set(msg);
89 Ok(())
90 }
91}