Skip to main content

hyperactor/testing/
pingpong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use hyperactor_config::Flattrs;
13use serde::Deserialize;
14use serde::Serialize;
15
16use crate as hyperactor; // for macros
17use crate::Actor;
18use crate::ActorRef;
19use crate::Context;
20use crate::Handler;
21use crate::Instance;
22use crate::OncePortRef;
23use crate::PortRef;
24use crate::RemoteSpawn;
25use crate::endpoint::Endpoint as _;
26use crate::mailbox::MessageEnvelope;
27use crate::mailbox::Undeliverable;
28use crate::mailbox::UndeliverableMessageError;
29
30/// A message that can be passed around. It contains
31/// 0. the TTL of this PingPong game
32/// 1. the next actor to send the message to
33/// 2. a port to send a true value to when TTL = 0.
34#[derive(Serialize, Deserialize, Debug, typeuri::Named)]
35pub struct PingPongMessage(pub u64, pub ActorRef<PingPongActor>, pub OncePortRef<bool>);
36wirevalue::register_type!(PingPongMessage);
37
38/// A PingPong actor that can play the PingPong game by sending messages around.
39#[derive(Debug)]
40#[hyperactor::export(PingPongMessage)]
41#[hyperactor::spawnable]
42pub struct PingPongActor {
43    /// A port to send undeliverable messages to.
44    undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
45    /// The TTL at which the actor will exit with error.
46    error_ttl: Option<u64>,
47    /// Manual delay before sending handling the message.
48    delay: Option<Duration>,
49}
50
51impl PingPongActor {
52    /// Create a new ping pong actor with the following parameters:
53    ///
54    /// - `undeliverable_port_ref`: A port to send undeliverable messages to.
55    /// - `error_ttl`: The TTL at which the actor will exit with error.
56    /// - `delay`: Manual delay before sending handling the message.
57    pub fn new(
58        undeliverable_port_ref: Option<PortRef<Undeliverable<MessageEnvelope>>>,
59        error_ttl: Option<u64>,
60        delay: Option<Duration>,
61    ) -> Self {
62        Self {
63            undeliverable_port_ref,
64            error_ttl,
65            delay,
66        }
67    }
68}
69
70#[async_trait]
71impl RemoteSpawn for PingPongActor {
72    type Params = (
73        Option<PortRef<Undeliverable<MessageEnvelope>>>,
74        Option<u64>,
75        Option<Duration>,
76    );
77
78    async fn new(
79        (undeliverable_port_ref, error_ttl, delay): Self::Params,
80        _environment: Flattrs,
81    ) -> anyhow::Result<Self> {
82        Ok(Self::new(undeliverable_port_ref, error_ttl, delay))
83    }
84}
85
86#[async_trait]
87impl Actor for PingPongActor {
88    // This is an override of the default actor behavior. It is used
89    // for testing the mechanism for returning undeliverable messages to
90    // their senders.
91    async fn handle_undeliverable_message(
92        &mut self,
93        cx: &Instance<Self>,
94        undelivered: crate::mailbox::Undeliverable<crate::mailbox::MessageEnvelope>,
95    ) -> Result<(), anyhow::Error> {
96        match &self.undeliverable_port_ref {
97            Some(port) => port.post(cx, undelivered),
98            None => match undelivered {
99                Undeliverable::Message(envelope) => {
100                    anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
101                }
102                Undeliverable::Lost(lost) => {
103                    anyhow::bail!(UndeliverableMessageError::Lost { lost });
104                }
105            },
106        }
107
108        Ok(())
109    }
110}
111
112#[async_trait]
113impl Handler<PingPongMessage> for PingPongActor {
114    /// Handles the PingPong Message. It will send the message to th actor specified in the
115    /// PingPongMessage if TTL > 0. And deliver a true to the done port if TTL = 0.
116    /// It also panics if TTL == 66 for testing purpose.
117    async fn handle(
118        &mut self,
119        cx: &Context<Self>,
120        PingPongMessage(ttl, pong_actor, done_port): PingPongMessage,
121    ) -> anyhow::Result<()> {
122        // PingPongActor sends the messages back and forth. When it's ttl = 0, it will stop.
123        // User can set a preconfigured TTL that can cause mocked problem: such as an error.
124        if Some(ttl) == self.error_ttl {
125            anyhow::bail!("PingPong handler encountered an Error");
126        }
127        if ttl == 0 {
128            done_port.post(cx, true);
129        } else {
130            if let Some(delay) = self.delay {
131                tokio::time::sleep(delay).await;
132            }
133            let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port);
134            pong_actor.post(cx, next_message);
135        }
136        Ok(())
137    }
138}