hyperactor/test_utils/
pingpong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use serde::Deserialize;
13use serde::Serialize;
14
15use crate as hyperactor; // for macros
16use crate::Actor;
17use crate::ActorRef;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::OncePortRef;
22use crate::PortRef;
23use crate::RemoteSpawn;
24use crate::clock::Clock;
25use crate::clock::RealClock;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30/// A message that can be passed around. It contains
31/// 0. the TTL of this PingPong game
32/// 1. the next actor to send the message to
33/// 2. a port to send a true value to when TTL = 0.
34#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36wirevalue::register_type!(PingPongMessage);
37
38/// A PingPong actor that can play the PingPong game by sending messages around.
39#[derive(Debug)]
40#[hyperactor::export(spawn = true, handlers = [PingPongMessage])]
41pub struct PingPongActor {
42    /// A port to send undeliverable messages to.
43    undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
44    /// The TTL at which the actor will exit with error.
45    error_ttl: Option<u64>,
46    /// Manual delay before sending handling the message.
47    delay: Option<Duration>,
48}
49
50impl PingPongActor {
51    /// Create a new ping pong actor with the following parameters:
52    ///
53    /// - `undeliverable_port_ref`: A port to send undeliverable messages to.
54    /// - `error_ttl`: The TTL at which the actor will exit with error.
55    /// - `delay`: Manual delay before sending handling the message.
56    pub fn new(
57        undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
58        error_ttl: Option<u64>,
59        delay: Option<Duration>,
60    ) -> Self {
61        Self {
62            undeliverable_port_ref,
63            error_ttl,
64            delay,
65        }
66    }
67}
68
69#[async_trait]
70impl RemoteSpawn for PingPongActor {
71    type Params = (
72        Option<PortRef<Undeliverable<MessageEnvelope>>>,
73        Option<u64>,
74        Option<Duration>,
75    );
76
77    async fn new((undeliverable_port_ref, error_ttl, delay): Self::Params) -> anyhow::Result<Self> {
78        Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
79    }
80}
81
82#[async_trait]
83impl Actor for PingPongActor {
84    // This is an override of the default actor behavior. It is used
85    // for testing the mechanism for returning undeliverable messages to
86    // their senders.
87    async fn handle_undeliverable_message(
88        &mut self,
89        cx: &Instance<Self>,
90        undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
91    ) -> Result<(), anyhow::Error> {
92        match &self.undeliverable_port_ref {
93            Some(port) => port.send(cx, undelivered).unwrap(),
94            None => {
95                let Undeliverable(envelope) = undelivered;
96                anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
97            }
98        }
99
100        Ok(())
101    }
102}
103
104#[async_trait]
105impl Handler<PingPongMessage> for PingPongActor {
106    /// Handles the PingPong Message. It will send the message to th actor specified in the
107    /// PingPongMessage if TTL > 0. And deliver a true to the done port if TTL = 0.
108    /// It also panics if TTL == 66 for testing purpose.
109    async fn handle(
110        &mut self,
111        cx: &Context<Self>,
112        PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
113    ) -> anyhow::Result<()> {
114        // PingPongActor sends the messages back and forth. When it's ttl = 0, it will stop.
115        // User can set a preconfigured TTL that can cause mocked problem: such as an error.
116        if Some(ttl) == self.error_ttl {
117            anyhow::bail!("PingPong handler encountered an Error");
118        }
119        if ttl == 0 {
120            done_port.send(cx, true)?;
121        } else {
122            if let Some(delay) = self.delay {
123                RealClock.sleep(delay).await;
124            }
125            let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
126            pong_actor.send(cx, next_message)?;
127        }
128        Ok(())
129    }
130}