hyperactor_multiprocess/
ping_pong.rs1#[cfg(test)]
10mod tests {
11 use std::collections::HashMap;
12 use std::time::Duration;
13
14 use hyperactor::ActorRef;
15 use hyperactor::Mailbox;
16 use hyperactor::channel::ChannelAddr;
17 use hyperactor::channel::sim::SimAddr;
18 use hyperactor::id;
19 use hyperactor::reference::Index;
20 use hyperactor::reference::WorldId;
21 use hyperactor::simnet;
22 use hyperactor::test_utils::pingpong::PingPongActor;
23 use hyperactor::test_utils::pingpong::PingPongActorParams;
24 use hyperactor::test_utils::pingpong::PingPongMessage;
25
26 use crate::System;
27 use crate::proc_actor::ProcActor;
28 use crate::proc_actor::spawn;
29 use crate::system_actor::ProcLifecycleMode;
30
31 #[tracing_test::traced_test]
32 #[tokio::test]
33 async fn test_sim_ping_pong() {
34 let system_addr = "local:1".parse::<ChannelAddr>().unwrap();
35
36 simnet::start();
37
38 let system_sim_addr = SimAddr::new(system_addr.clone()).unwrap();
39 let server_handle = System::serve(
40 ChannelAddr::Sim(system_sim_addr.clone()),
41 Duration::from_secs(10),
42 Duration::from_secs(10),
43 )
44 .await
45 .unwrap();
46
47 let server_local_addr = server_handle.local_addr();
48 let mut system = System::new(server_local_addr.clone());
49
50 let sys_mailbox = system.attach().await.unwrap();
52
53 let world_id = id!(world);
54
55 let ping_actor_ref = spawn_proc_actor(
56 2,
57 system_sim_addr.clone(),
58 sys_mailbox.clone(),
59 world_id.clone(),
60 )
61 .await;
62
63 let pong_actor_ref =
64 spawn_proc_actor(3, system_sim_addr, sys_mailbox.clone(), world_id.clone()).await;
65
66 let (done_tx, done_rx) = sys_mailbox.open_once_port();
71 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
72 ping_actor_ref
73 .send(&sys_mailbox, ping_pong_message)
74 .unwrap();
75
76 assert!(done_rx.recv().await.unwrap());
77
78 let records = simnet::simnet_handle().unwrap().close().await.unwrap();
79 eprintln!(
80 "records: {}",
81 serde_json::to_string_pretty(&records).unwrap()
82 );
83 }
84
85 async fn spawn_proc_actor(
86 actor_index: Index,
87 system_addr: SimAddr,
88 sys_mailbox: Mailbox,
89 world_id: WorldId,
90 ) -> ActorRef<PingPongActor> {
91 let proc_addr = format!("local!{}", actor_index)
92 .parse::<ChannelAddr>()
93 .unwrap();
94
95 let proc_sim_addr = SimAddr::new(proc_addr.clone()).unwrap();
96 let proc_listen_addr = ChannelAddr::Sim(proc_sim_addr);
97 let proc_id = world_id.proc_id(actor_index);
98 let proc_to_system = ChannelAddr::Sim(
99 SimAddr::new_with_src(proc_addr.clone(), system_addr.addr().clone()).unwrap(),
100 );
101 let bootstrap = ProcActor::bootstrap(
102 proc_id,
103 world_id.clone(),
104 proc_listen_addr,
105 proc_to_system,
106 Duration::from_secs(3),
107 HashMap::new(),
108 ProcLifecycleMode::ManagedBySystem,
109 )
110 .await
111 .unwrap();
112
113 let params = PingPongActorParams::new(None, None);
114 spawn::<PingPongActor>(
115 &sys_mailbox,
116 &bootstrap.proc_actor.bind(),
117 actor_index.to_string().as_str(),
118 ¶ms,
119 )
120 .await
121 .unwrap()
122 }
123}