hyperactor/test_utils/
proc_supervison.rs1use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13
14use crate::Actor;
15use crate::Context;
16use crate::Handler;
17use crate::proc::Proc;
18use crate::supervision::ActorSupervisionEvent;
19
20#[derive(Debug)]
34pub struct ProcSupervisionCoordinator(ReportedEvent);
35
36impl ProcSupervisionCoordinator {
37 pub async fn set(proc: &Proc) -> Result<ReportedEvent, anyhow::Error> {
40 let state = ReportedEvent::new();
41 let actor = ProcSupervisionCoordinator(state.clone());
42 let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
43 proc.set_supervision_coordinator(coordinator.port())?;
44 Ok(state)
45 }
46}
47
48#[derive(Clone, Debug)]
50pub struct ReportedEvent(Arc<Mutex<Option<ActorSupervisionEvent>>>);
51impl ReportedEvent {
52 fn new() -> Self {
53 Self(Arc::new(Mutex::new(None)))
54 }
55
56 pub fn event(&self) -> Option<ActorSupervisionEvent> {
58 self.0.lock().unwrap().clone()
59 }
60
61 fn set(&self, event: ActorSupervisionEvent) {
62 *self.0.lock().unwrap() = Some(event);
63 }
64}
65
66#[async_trait]
67impl Actor for ProcSupervisionCoordinator {}
68
69#[async_trait]
70impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
71 async fn handle(
72 &mut self,
73 _cx: &Context<Self>,
74 msg: ActorSupervisionEvent,
75 ) -> anyhow::Result<()> {
76 tracing::debug!("in handler, handling supervision event");
77 self.0.set(msg);
78 Ok(())
79 }
80}