hyperactor_multiprocess/
ping_pong.rs1#[cfg(test)]
10mod tests {
11 use std::collections::HashMap;
12 use std::time::Duration;
13
14 use hyperactor::ActorRef;
15 use hyperactor::channel::ChannelAddr;
16 use hyperactor::channel::sim::SimAddr;
17 use hyperactor::context;
18 use hyperactor::context::Mailbox as _;
19 use hyperactor::id;
20 use hyperactor::reference::Index;
21 use hyperactor::reference::WorldId;
22 use hyperactor::simnet;
23 use hyperactor::test_utils::pingpong::PingPongActor;
24 use hyperactor::test_utils::pingpong::PingPongActorParams;
25 use hyperactor::test_utils::pingpong::PingPongMessage;
26
27 use crate::System;
28 use crate::proc_actor::ProcActor;
29 use crate::proc_actor::spawn;
30 use crate::system_actor::ProcLifecycleMode;
31
32 #[tracing_test::traced_test]
33 #[tokio::test]
34 async fn test_sim_ping_pong() {
35 let system_addr = "local:1".parse::<ChannelAddr>().unwrap();
36
37 simnet::start();
38
39 let system_sim_addr = SimAddr::new(system_addr.clone()).unwrap();
40 let server_handle = System::serve(
41 ChannelAddr::Sim(system_sim_addr.clone()),
42 Duration::from_secs(10),
43 Duration::from_secs(10),
44 )
45 .await
46 .unwrap();
47
48 let server_local_addr = server_handle.local_addr();
49 let mut system = System::new(server_local_addr.clone());
50
51 let instance = system.attach().await.unwrap();
53
54 let world_id = id!(world);
55
56 let ping_actor_ref =
57 spawn_proc_actor(&instance, 2, system_sim_addr.clone(), world_id.clone()).await;
58
59 let pong_actor_ref =
60 spawn_proc_actor(&instance, 3, system_sim_addr, world_id.clone()).await;
61
62 let (done_tx, done_rx) = instance.mailbox().open_once_port();
67 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
68 ping_actor_ref.send(&instance, ping_pong_message).unwrap();
69
70 assert!(done_rx.recv().await.unwrap());
71
72 let records = simnet::simnet_handle().unwrap().close().await.unwrap();
73 eprintln!(
74 "records: {}",
75 serde_json::to_string_pretty(&records).unwrap()
76 );
77 }
78
79 async fn spawn_proc_actor(
80 cx: &impl context::Actor,
81 actor_index: Index,
82 system_addr: SimAddr,
83 world_id: WorldId,
84 ) -> ActorRef<PingPongActor> {
85 let proc_addr = format!("local!{}", actor_index)
86 .parse::<ChannelAddr>()
87 .unwrap();
88
89 let proc_sim_addr = SimAddr::new(proc_addr.clone()).unwrap();
90 let proc_listen_addr = ChannelAddr::Sim(proc_sim_addr);
91 let proc_id = world_id.proc_id(actor_index);
92 let proc_to_system = ChannelAddr::Sim(
93 SimAddr::new_with_src(proc_addr.clone(), system_addr.addr().clone()).unwrap(),
94 );
95 let bootstrap = ProcActor::bootstrap(
96 proc_id,
97 world_id.clone(),
98 proc_listen_addr,
99 proc_to_system,
100 Duration::from_secs(3),
101 HashMap::new(),
102 ProcLifecycleMode::ManagedBySystem,
103 )
104 .await
105 .unwrap();
106
107 let params = PingPongActorParams::new(None, None);
108 spawn::<PingPongActor>(
109 cx,
110 &bootstrap.proc_actor.bind(),
111 actor_index.to_string().as_str(),
112 ¶ms,
113 )
114 .await
115 .unwrap()
116 }
117}