hyperactor/test_utils/
pingpong.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use serde::Deserialize;
13use serde::Serialize;
14
15use crate as hyperactor; use crate::Actor;
17use crate::ActorRef;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::OncePortRef;
22use crate::PortRef;
23use crate::RemoteSpawn;
24use crate::clock::Clock;
25use crate::clock::RealClock;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36wirevalue::register_type!(PingPongMessage);
37
38#[derive(Debug)]
40#[hyperactor::export(spawn = true, handlers = [PingPongMessage])]
41pub struct PingPongActor {
42 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
44 error_ttl: Option<u64>,
46 delay: Option<Duration>,
48}
49
50impl PingPongActor {
51 pub fn new(
57 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
58 error_ttl: Option<u64>,
59 delay: Option<Duration>,
60 ) -> Self {
61 Self {
62 undeliverable_port_ref,
63 error_ttl,
64 delay,
65 }
66 }
67}
68
69#[async_trait]
70impl RemoteSpawn for PingPongActor {
71 type Params = (
72 Option<PortRef<Undeliverable<MessageEnvelope>>>,
73 Option<u64>,
74 Option<Duration>,
75 );
76
77 async fn new((undeliverable_port_ref, error_ttl, delay): Self::Params) -> anyhow::Result<Self> {
78 Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
79 }
80}
81
82#[async_trait]
83impl Actor for PingPongActor {
84 async fn handle_undeliverable_message(
88 &mut self,
89 cx: &Instance<Self>,
90 undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
91 ) -> Result<(), anyhow::Error> {
92 match &self.undeliverable_port_ref {
93 Some(port) => port.send(cx, undelivered).unwrap(),
94 None => {
95 let Undeliverable(envelope) = undelivered;
96 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
97 }
98 }
99
100 Ok(())
101 }
102}
103
104#[async_trait]
105impl Handler<PingPongMessage> for PingPongActor {
106 async fn handle(
110 &mut self,
111 cx: &Context<Self>,
112 PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
113 ) -> anyhow::Result<()> {
114 if Some(ttl) == self.error_ttl {
117 anyhow::bail!("PingPong handler encountered an Error");
118 }
119 if ttl == 0 {
120 done_port.send(cx, true)?;
121 } else {
122 if let Some(delay) = self.delay {
123 RealClock.sleep(delay).await;
124 }
125 let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
126 pong_actor.send(cx, next_message)?;
127 }
128 Ok(())
129 }
130}