hyperactor/testing/
pingpong.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use hyperactor_config::Flattrs;
13use serde::Deserialize;
14use serde::Serialize;
15
16use crate as hyperactor; use crate::Actor;
18use crate::ActorRef;
19use crate::Context;
20use crate::Handler;
21use crate::Instance;
22use crate::OncePortRef;
23use crate::PortRef;
24use crate::RemoteSpawn;
25use crate::endpoint::Endpoint as _;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36wirevalue::register_type!(PingPongMessage);
37
38#[derive(Debug)]
40#[hyperactor::export(PingPongMessage)]
41#[hyperactor::spawnable]
42pub struct PingPongActor {
43 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
45 error_ttl: Option<u64>,
47 delay: Option<Duration>,
49}
50
51impl PingPongActor {
52 pub fn new(
58 undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
59 error_ttl: Option<u64>,
60 delay: Option<Duration>,
61 ) -> Self {
62 Self {
63 undeliverable_port_ref,
64 error_ttl,
65 delay,
66 }
67 }
68}
69
70#[async_trait]
71impl RemoteSpawn for PingPongActor {
72 type Params = (
73 Option<PortRef<Undeliverable<MessageEnvelope>>>,
74 Option<u64>,
75 Option<Duration>,
76 );
77
78 async fn new(
79 (undeliverable_port_ref, error_ttl, delay): Self::Params,
80 _environment: Flattrs,
81 ) -> anyhow::Result<Self> {
82 Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
83 }
84}
85
86#[async_trait]
87impl Actor for PingPongActor {
88 async fn handle_undeliverable_message(
92 &mut self,
93 cx: &Instance<Self>,
94 undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
95 ) -> Result<(), anyhow::Error> {
96 match &self.undeliverable_port_ref {
97 Some(port) => port.post(cx, undelivered),
98 None => match undelivered {
99 Undeliverable::Message(envelope) => {
100 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
101 }
102 Undeliverable::Lost(lost) => {
103 anyhow::bail!(UndeliverableMessageError::Lost { lost });
104 }
105 },
106 }
107
108 Ok(())
109 }
110}
111
112#[async_trait]
113impl Handler<PingPongMessage> for PingPongActor {
114 async fn handle(
118 &mut self,
119 cx: &Context<Self>,
120 PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
121 ) -> anyhow::Result<()> {
122 if Some(ttl) == self.error_ttl {
125 anyhow::bail!("PingPong handler encountered an Error");
126 }
127 if ttl == 0 {
128 done_port.post(cx, true);
129 } else {
130 if let Some(delay) = self.delay {
131 tokio::time::sleep(delay).await;
132 }
133 let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
134 pong_actor.post(cx, next_message);
135 }
136 Ok(())
137 }
138}