hyperactor/test_utils/
pingpong.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use serde::Deserialize;
13use serde::Serialize;
14
15use crate as hyperactor; use crate::Actor;
17use crate::ActorRef;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::Named;
22use crate::OncePortRef;
23use crate::PortRef;
24use crate::clock::Clock;
25use crate::clock::RealClock;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30#[derive(Serialize, Deserialize, Debug, Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36
37#[derive(Debug, Named, Serialize, Deserialize, Clone)]
39pub struct PingPongActorParams {
40 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
42 error_ttl: Option<u64>,
44 delay: Option<Duration>,
46}
47
48impl PingPongActorParams {
49 pub fn new(
51 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
52 error_ttl: Option<u64>,
53 ) -> Self {
54 Self {
55 undeliverable_port_ref,
56 error_ttl,
57 delay: None,
58 }
59 }
60
61 pub fn set_delay(&mut self, delay: Duration) {
63 self.delay = Some(delay);
64 }
65}
66
67#[derive(Debug)]
69#[hyperactor::export(handlers = [PingPongMessage])]
70pub struct PingPongActor {
71 params: PingPongActorParams,
72}
73
74#[async_trait]
75impl Actor for PingPongActor {
76 type Params = PingPongActorParams;
77
78 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
79 Ok(Self { params })
80 }
81
82 async fn handle_undeliverable_message(
86 &mut self,
87 cx: &Instance<Self>,
88 undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
89 ) -> Result<(), anyhow::Error> {
90 match &self.params.undeliverable_port_ref {
91 Some(port) => port.send(cx, undelivered).unwrap(),
92 None => {
93 let Undeliverable(envelope) = &undelivered;
94 anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope));
95 }
96 }
97
98 Ok(())
99 }
100}
101
102#[async_trait]
103impl Handler<PingPongMessage> for PingPongActor {
104 async fn handle(
108 &mut self,
109 cx: &Context<Self>,
110 PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
111 ) -> anyhow::Result<()> {
112 if Some(ttl) == self.params.error_ttl {
115 anyhow::bail!("PingPong handler encountered an Error");
116 }
117 if ttl == 0 {
118 done_port.send(cx, true)?;
119 } else {
120 if let Some(delay) = self.params.delay {
121 RealClock.sleep(delay).await;
122 }
123 let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
124 pong_actor.send(cx, next_message)?;
125 }
126 Ok(())
127 }
128}
129
130hyperactor::remote!(PingPongActor);