hyperactor/testing/
pingpong.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use hyperactor_config::Flattrs;
13use serde::Deserialize;
14use serde::Serialize;
15
16use crate as hyperactor; use crate::Actor;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::RemoteSpawn;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::Undeliverable;
24use crate::mailbox::UndeliverableMessageError;
25use crate::reference;
26
27#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
32pub struct PingPongMessage(
33 pub u64,
34 pub reference::ActorRef<PingPongActor>,
35 pub reference::OncePortRef<bool>,
36);
37wirevalue::register_type!(PingPongMessage);
38
39#[derive(Debug)]
41#[hyperactor::export(spawn = true, handlers = [PingPongMessage])]
42pub struct PingPongActor {
43 undeliverable_port_ref: Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
45 error_ttl: Option<u64>,
47 delay: Option<Duration>,
49}
50
51impl PingPongActor {
52 pub fn new(
58 undeliverable_port_ref: Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
59 error_ttl: Option<u64>,
60 delay: Option<Duration>,
61 ) -> Self {
62 Self {
63 undeliverable_port_ref,
64 error_ttl,
65 delay,
66 }
67 }
68}
69
70#[async_trait]
71impl RemoteSpawn for PingPongActor {
72 type Params = (
73 Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
74 Option<u64>,
75 Option<Duration>,
76 );
77
78 async fn new(
79 (undeliverable_port_ref, error_ttl, delay): Self::Params,
80 _environment: Flattrs,
81 ) -> anyhow::Result<Self> {
82 Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
83 }
84}
85
86#[async_trait]
87impl Actor for PingPongActor {
88 async fn handle_undeliverable_message(
92 &mut self,
93 cx: &Instance<Self>,
94 undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
95 ) -> Result<(), anyhow::Error> {
96 match &self.undeliverable_port_ref {
97 Some(port) => port.send(cx, undelivered).unwrap(),
98 None => {
99 let Undeliverable(envelope) = undelivered;
100 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
101 }
102 }
103
104 Ok(())
105 }
106}
107
108#[async_trait]
109impl Handler<PingPongMessage> for PingPongActor {
110 async fn handle(
114 &mut self,
115 cx: &Context<Self>,
116 PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
117 ) -> anyhow::Result<()> {
118 if Some(ttl) == self.error_ttl {
121 anyhow::bail!("PingPong handler encountered an Error");
122 }
123 if ttl == 0 {
124 done_port.send(cx, true)?;
125 } else {
126 if let Some(delay) = self.delay {
127 tokio::time::sleep(delay).await;
128 }
129 let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
130 pong_actor.send(cx, next_message)?;
131 }
132 Ok(())
133 }
134}