hyperactor_multiprocess/
ping_pong.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#[cfg(test)]
10mod tests {
11    use std::collections::HashMap;
12    use std::time::Duration;
13
14    use hyperactor::ActorRef;
15    use hyperactor::channel::ChannelAddr;
16    use hyperactor::channel::sim::SimAddr;
17    use hyperactor::context;
18    use hyperactor::context::Mailbox as _;
19    use hyperactor::id;
20    use hyperactor::reference::Index;
21    use hyperactor::reference::WorldId;
22    use hyperactor::simnet;
23    use hyperactor::test_utils::pingpong::PingPongActor;
24    use hyperactor::test_utils::pingpong::PingPongActorParams;
25    use hyperactor::test_utils::pingpong::PingPongMessage;
26
27    use crate::System;
28    use crate::proc_actor::ProcActor;
29    use crate::proc_actor::spawn;
30    use crate::system_actor::ProcLifecycleMode;
31
32    #[tracing_test::traced_test]
33    #[tokio::test]
34    async fn test_sim_ping_pong() {
35        let system_addr = "local:1".parse::<ChannelAddr>().unwrap();
36
37        simnet::start();
38
39        let system_sim_addr = SimAddr::new(system_addr.clone()).unwrap();
40        let server_handle = System::serve(
41            ChannelAddr::Sim(system_sim_addr.clone()),
42            Duration::from_secs(10),
43            Duration::from_secs(10),
44        )
45        .await
46        .unwrap();
47
48        let server_local_addr = server_handle.local_addr();
49        let mut system = System::new(server_local_addr.clone());
50
51        // Create a client on its individual proc that can talk to the system.
52        let instance = system.attach().await.unwrap();
53
54        let world_id = id!(world);
55
56        let ping_actor_ref =
57            spawn_proc_actor(&instance, 2, system_sim_addr.clone(), world_id.clone()).await;
58
59        let pong_actor_ref =
60            spawn_proc_actor(&instance, 3, system_sim_addr, world_id.clone()).await;
61
62        // Kick start the ping pong game by sending a message to the ping actor. The message will ask the
63        // ping actor to deliver a message to the pong actor with TTL - 1. The pong actor will then
64        // deliver a message to the ping actor with TTL - 2. This will continue until the TTL reaches 0.
65        // The ping actor will then send a message to the done channel to indicate that the game is over.
66        let (done_tx, done_rx) = instance.mailbox().open_once_port();
67        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
68        ping_actor_ref.send(&instance, ping_pong_message).unwrap();
69
70        assert!(done_rx.recv().await.unwrap());
71
72        let records = simnet::simnet_handle().unwrap().close().await.unwrap();
73        eprintln!(
74            "records: {}",
75            serde_json::to_string_pretty(&records).unwrap()
76        );
77    }
78
79    async fn spawn_proc_actor(
80        cx: &impl context::Actor,
81        actor_index: Index,
82        system_addr: SimAddr,
83        world_id: WorldId,
84    ) -> ActorRef<PingPongActor> {
85        let proc_addr = format!("local!{}", actor_index)
86            .parse::<ChannelAddr>()
87            .unwrap();
88
89        let proc_sim_addr = SimAddr::new(proc_addr.clone()).unwrap();
90        let proc_listen_addr = ChannelAddr::Sim(proc_sim_addr);
91        let proc_id = world_id.proc_id(actor_index);
92        let proc_to_system = ChannelAddr::Sim(
93            SimAddr::new_with_src(proc_addr.clone(), system_addr.addr().clone()).unwrap(),
94        );
95        let bootstrap = ProcActor::bootstrap(
96            proc_id,
97            world_id.clone(),
98            proc_listen_addr,
99            proc_to_system,
100            Duration::from_secs(3),
101            HashMap::new(),
102            ProcLifecycleMode::ManagedBySystem,
103        )
104        .await
105        .unwrap();
106
107        let params = PingPongActorParams::new(None, None);
108        spawn::<PingPongActor>(
109            cx,
110            &bootstrap.proc_actor.bind(),
111            actor_index.to_string().as_str(),
112            &params,
113        )
114        .await
115        .unwrap()
116    }
117}