hyperactor/test_utils/
pingpong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use serde::Deserialize;
13use serde::Serialize;
14
15use crate as hyperactor; // for macros
16use crate::Actor;
17use crate::ActorRef;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::Named;
22use crate::OncePortRef;
23use crate::PortRef;
24use crate::clock::Clock;
25use crate::clock::RealClock;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30/// A message that can be passed around. It contains
31/// 0. the TTL of this PingPong game
32/// 1. the next actor to send the message to
33/// 2. a port to send a true value to when TTL = 0.
34#[derive(Serialize, Deserialize, Debug, Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36
37/// Initialization parameters for `PingPongActor`s.
38#[derive(Debug, Named, Serialize, Deserialize, Clone)]
39pub struct PingPongActorParams {
40    /// A port to send undeliverable messages to.
41    undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
42    /// The TTL at which the actor will exit with error.
43    error_ttl: Option<u64>,
44    /// Manual delay before sending handling the message.
45    delay: Option<Duration>,
46}
47
48impl PingPongActorParams {
49    /// Create a new set of initialization parameters.
50    pub fn new(
51        undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
52        error_ttl: Option<u64>,
53    ) -> Self {
54        Self {
55            undeliverable_port_ref,
56            error_ttl,
57            delay: None,
58        }
59    }
60
61    /// Set the delay
62    pub fn set_delay(&mut self, delay: Duration) {
63        self.delay = Some(delay);
64    }
65}
66
/// A PingPong actor that can play the PingPong game by sending
/// [`PingPongMessage`]s around. Its behavior is configured through the
/// [`PingPongActorParams`] it is created with.
#[derive(Debug)]
#[hyperactor::export(handlers = [PingPongMessage])]
pub struct PingPongActor {
    /// The initialization parameters this actor was constructed with.
    params: PingPongActorParams,
}
73
74#[async_trait]
75impl Actor for PingPongActor {
76    type Params = PingPongActorParams;
77
78    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
79        Ok(Self { params })
80    }
81
82    // This is an override of the default actor behavior. It is used
83    // for testing the mechanism for returning undeliverable messages to
84    // their senders.
85    async fn handle_undeliverable_message(
86        &mut self,
87        cx: &Instance<Self>,
88        undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
89    ) -> Result<(), anyhow::Error> {
90        match &self.params.undeliverable_port_ref {
91            Some(port) => port.send(cx, undelivered).unwrap(),
92            None => {
93                let Undeliverable(envelope) = &undelivered;
94                anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope));
95            }
96        }
97
98        Ok(())
99    }
100}
101
102#[async_trait]
103impl Handler<PingPongMessage> for PingPongActor {
104    /// Handles the PingPong Message. It will send the message to th actor specified in the
105    /// PingPongMessage if TTL > 0. And deliver a true to the done port if TTL = 0.
106    /// It also panics if TTL == 66 for testing purpose.
107    async fn handle(
108        &mut self,
109        cx: &Context<Self>,
110        PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
111    ) -> anyhow::Result<()> {
112        // PingPongActor sends the messages back and forth. When it's ttl = 0, it will stop.
113        // User can set a preconfigured TTL that can cause mocked problem: such as an error.
114        if Some(ttl) == self.params.error_ttl {
115            anyhow::bail!("PingPong handler encountered an Error");
116        }
117        if ttl == 0 {
118            done_port.send(cx, true)?;
119        } else {
120            if let Some(delay) = self.params.delay {
121                RealClock.sleep(delay).await;
122            }
123            let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
124            pong_actor.send(cx, next_message)?;
125        }
126        Ok(())
127    }
128}
129
// NOTE(review): presumably registers `PingPongActor` so that it can be
// spawned remotely — confirm against the `hyperactor::remote!` macro docs.
hyperactor::remote!(PingPongActor);