hyperactor_multiprocess/
ping_pong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#[cfg(test)]
10mod tests {
11    use std::collections::HashMap;
12    use std::time::Duration;
13
14    use hyperactor::ActorRef;
15    use hyperactor::Mailbox;
16    use hyperactor::channel::ChannelAddr;
17    use hyperactor::channel::sim::SimAddr;
18    use hyperactor::id;
19    use hyperactor::reference::Index;
20    use hyperactor::reference::WorldId;
21    use hyperactor::simnet;
22    use hyperactor::test_utils::pingpong::PingPongActor;
23    use hyperactor::test_utils::pingpong::PingPongActorParams;
24    use hyperactor::test_utils::pingpong::PingPongMessage;
25
26    use crate::System;
27    use crate::proc_actor::ProcActor;
28    use crate::proc_actor::spawn;
29    use crate::system_actor::ProcLifecycleMode;
30
31    #[tracing_test::traced_test]
32    #[tokio::test]
33    async fn test_sim_ping_pong() {
34        let system_addr = "local:1".parse::<ChannelAddr>().unwrap();
35
36        simnet::start();
37
38        let system_sim_addr = SimAddr::new(system_addr.clone()).unwrap();
39        let server_handle = System::serve(
40            ChannelAddr::Sim(system_sim_addr.clone()),
41            Duration::from_secs(10),
42            Duration::from_secs(10),
43        )
44        .await
45        .unwrap();
46
47        let server_local_addr = server_handle.local_addr();
48        let mut system = System::new(server_local_addr.clone());
49
50        // Create a client on its individual proc that can talk to the system.
51        let sys_mailbox = system.attach().await.unwrap();
52
53        let world_id = id!(world);
54
55        let ping_actor_ref = spawn_proc_actor(
56            2,
57            system_sim_addr.clone(),
58            sys_mailbox.clone(),
59            world_id.clone(),
60        )
61        .await;
62
63        let pong_actor_ref =
64            spawn_proc_actor(3, system_sim_addr, sys_mailbox.clone(), world_id.clone()).await;
65
66        // Kick start the ping pong game by sending a message to the ping actor. The message will ask the
67        // ping actor to deliver a message to the pong actor with TTL - 1. The pong actor will then
68        // deliver a message to the ping actor with TTL - 2. This will continue until the TTL reaches 0.
69        // The ping actor will then send a message to the done channel to indicate that the game is over.
70        let (done_tx, done_rx) = sys_mailbox.open_once_port();
71        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
72        ping_actor_ref
73            .send(&sys_mailbox, ping_pong_message)
74            .unwrap();
75
76        assert!(done_rx.recv().await.unwrap());
77
78        let records = simnet::simnet_handle().unwrap().close().await.unwrap();
79        eprintln!(
80            "records: {}",
81            serde_json::to_string_pretty(&records).unwrap()
82        );
83    }
84
85    async fn spawn_proc_actor(
86        actor_index: Index,
87        system_addr: SimAddr,
88        sys_mailbox: Mailbox,
89        world_id: WorldId,
90    ) -> ActorRef<PingPongActor> {
91        let proc_addr = format!("local!{}", actor_index)
92            .parse::<ChannelAddr>()
93            .unwrap();
94
95        let proc_sim_addr = SimAddr::new(proc_addr.clone()).unwrap();
96        let proc_listen_addr = ChannelAddr::Sim(proc_sim_addr);
97        let proc_id = world_id.proc_id(actor_index);
98        let proc_to_system = ChannelAddr::Sim(
99            SimAddr::new_with_src(proc_addr.clone(), system_addr.addr().clone()).unwrap(),
100        );
101        let bootstrap = ProcActor::bootstrap(
102            proc_id,
103            world_id.clone(),
104            proc_listen_addr,
105            proc_to_system,
106            Duration::from_secs(3),
107            HashMap::new(),
108            ProcLifecycleMode::ManagedBySystem,
109        )
110        .await
111        .unwrap();
112
113        let params = PingPongActorParams::new(None, None);
114        spawn::<PingPongActor>(
115            &sys_mailbox,
116            &bootstrap.proc_actor.bind(),
117            actor_index.to_string().as_str(),
118            &params,
119        )
120        .await
121        .unwrap()
122    }
123}