hyperactor/testing/
pingpong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use hyperactor_config::Flattrs;
13use serde::Deserialize;
14use serde::Serialize;
15
16use crate as hyperactor; // for macros
17use crate::Actor;
18use crate::Context;
19use crate::Handler;
20use crate::Instance;
21use crate::RemoteSpawn;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::Undeliverable;
24use crate::mailbox::UndeliverableMessageError;
25use crate::reference;
26
27/// A message that can be passed around. It contains
28/// 0. the TTL of this PingPong game
29/// 1. the next actor to send the message to
30/// 2. a port to send a true value to when TTL = 0.
31#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
32pub struct PingPongMessage(
33    pub u64,
34    pub reference::ActorRef<PingPongActor>,
35    pub reference::OncePortRef<bool>,
36);
37wirevalue::register_type!(PingPongMessage);
38
39/// A PingPong actor that can play the PingPong game by sending messages around.
40#[derive(Debug)]
41#[hyperactor::export(spawn = true, handlers = [PingPongMessage])]
42pub struct PingPongActor {
43    /// A port to send undeliverable messages to.
44    undeliverable_port_ref: Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
45    /// The TTL at which the actor will exit with error.
46    error_ttl: Option<u64>,
47    /// Manual delay before sending handling the message.
48    delay: Option<Duration>,
49}
50
51impl PingPongActor {
52    /// Create a new ping pong actor with the following parameters:
53    ///
54    /// - `undeliverable_port_ref`: A port to send undeliverable messages to.
55    /// - `error_ttl`: The TTL at which the actor will exit with error.
56    /// - `delay`: Manual delay before sending handling the message.
57    pub fn new(
58        undeliverable_port_ref: Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
59        error_ttl: Option<u64>,
60        delay: Option<Duration>,
61    ) -> Self {
62        Self {
63            undeliverable_port_ref,
64            error_ttl,
65            delay,
66        }
67    }
68}
69
70#[async_trait]
71impl RemoteSpawn for PingPongActor {
72    type Params = (
73        Option<reference::PortRef<Undeliverable<MessageEnvelope>>>,
74        Option<u64>,
75        Option<Duration>,
76    );
77
78    async fn new(
79        (undeliverable_port_ref, error_ttl, delay): Self::Params,
80        _environment: Flattrs,
81    ) -> anyhow::Result<Self> {
82        Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
83    }
84}
85
86#[async_trait]
87impl Actor for PingPongActor {
88    // This is an override of the default actor behavior. It is used
89    // for testing the mechanism for returning undeliverable messages to
90    // their senders.
91    async fn handle_undeliverable_message(
92        &mut self,
93        cx: &Instance<Self>,
94        undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
95    ) -> Result<(), anyhow::Error> {
96        match &self.undeliverable_port_ref {
97            Some(port) => port.send(cx, undelivered).unwrap(),
98            None => {
99                let Undeliverable(envelope) = undelivered;
100                anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
101            }
102        }
103
104        Ok(())
105    }
106}
107
108#[async_trait]
109impl Handler<PingPongMessage> for PingPongActor {
110    /// Handles the PingPong Message. It will send the message to th actor specified in the
111    /// PingPongMessage if TTL > 0. And deliver a true to the done port if TTL = 0.
112    /// It also panics if TTL == 66 for testing purpose.
113    async fn handle(
114        &mut self,
115        cx: &Context<Self>,
116        PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
117    ) -> anyhow::Result<()> {
118        // PingPongActor sends the messages back and forth. When it's ttl = 0, it will stop.
119        // User can set a preconfigured TTL that can cause mocked problem: such as an error.
120        if Some(ttl) == self.error_ttl {
121            anyhow::bail!("PingPong handler encountered an Error");
122        }
123        if ttl == 0 {
124            done_port.send(cx, true)?;
125        } else {
126            if let Some(delay) = self.delay {
127                tokio::time::sleep(delay).await;
128            }
129            let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
130            pong_actor.send(cx, next_message)?;
131        }
132        Ok(())
133    }
134}