hyperactor/testing/
proc_supervison.rs1use std::sync::Arc;
10use std::sync::Mutex;
11
12use async_trait::async_trait;
13use tokio::sync::mpsc;
14
15use crate::Actor;
16use crate::ActorHandle;
17use crate::Context;
18use crate::Handler;
19use crate::proc::Proc;
20use crate::supervision::ActorSupervisionEvent;
21
22#[derive(Debug)]
36pub struct ProcSupervisionCoordinator {
37 tx: mpsc::UnboundedSender<ActorSupervisionEvent>,
38 last: Arc<Mutex<Option<ActorSupervisionEvent>>>,
39}
40
41impl ProcSupervisionCoordinator {
42 pub async fn set(
47 proc: &Proc,
48 ) -> Result<(ReportedEvent, ActorHandle<ProcSupervisionCoordinator>), anyhow::Error> {
49 let (tx, rx) = mpsc::unbounded_channel();
50 let last = Arc::new(Mutex::new(None));
51 let actor = ProcSupervisionCoordinator {
52 tx,
53 last: last.clone(),
54 };
55 let coordinator = proc.spawn::<ProcSupervisionCoordinator>("coordinator", actor)?;
56 proc.set_supervision_coordinator(coordinator.port())?;
57 Ok((ReportedEvent { rx, last }, coordinator))
58 }
59}
60
61#[derive(Debug)]
63pub struct ReportedEvent {
64 rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
65 last: Arc<Mutex<Option<ActorSupervisionEvent>>>,
66}
67
68impl ReportedEvent {
69 pub fn event(&self) -> Option<ActorSupervisionEvent> {
71 self.last.lock().unwrap().clone()
72 }
73
74 pub async fn recv(&mut self) -> ActorSupervisionEvent {
76 self.rx
77 .recv()
78 .await
79 .expect("coordinator sender dropped without sending an event")
80 }
81}
82
83#[async_trait]
84impl Actor for ProcSupervisionCoordinator {}
85
86#[async_trait]
87impl Handler<ActorSupervisionEvent> for ProcSupervisionCoordinator {
88 async fn handle(
89 &mut self,
90 _cx: &Context<Self>,
91 msg: ActorSupervisionEvent,
92 ) -> anyhow::Result<()> {
93 tracing::debug!("in handler, handling supervision event");
94 *self.last.lock().unwrap() = Some(msg.clone());
95 let _ = self.tx.send(msg);
96 Ok(())
97 }
98}