1use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorErrorKind;
33use hyperactor::actor::ActorHandle;
34use hyperactor::actor::ActorStatus;
35use hyperactor::actor::Referable;
36use hyperactor::actor::remote::Remote;
37use hyperactor::channel;
38use hyperactor::channel::ChannelAddr;
39use hyperactor::clock::Clock;
40use hyperactor::clock::ClockKind;
41use hyperactor::context;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::MailboxAdminMessage;
45use hyperactor::mailbox::MailboxAdminMessageHandler;
46use hyperactor::mailbox::MailboxClient;
47use hyperactor::mailbox::MailboxServer;
48use hyperactor::mailbox::MailboxServerHandle;
49use hyperactor::mailbox::open_port;
50use hyperactor::proc::ActorLedgerSnapshot;
51use hyperactor::proc::Proc;
52use hyperactor::reference::ActorId;
53use hyperactor::reference::ActorRef;
54use hyperactor::reference::Index;
55use hyperactor::reference::ProcId;
56use hyperactor::supervision::ActorSupervisionEvent;
57use hyperactor_mesh::comm::CommActor;
58use serde::Deserialize;
59use serde::Serialize;
60use tokio::process::Command;
61use tokio::sync::watch;
62use tokio_retry::strategy::jitter;
63
64use crate::pyspy::PySpyTrace;
65use crate::pyspy::py_spy;
66use crate::supervision::ProcStatus;
67use crate::supervision::ProcSupervisionMessageClient;
68use crate::supervision::ProcSupervisionState;
69use crate::supervision::ProcSupervisor;
70use crate::system_actor::ProcLifecycleMode;
71use crate::system_actor::SYSTEM_ACTOR_REF;
72use crate::system_actor::SystemActor;
73use crate::system_actor::SystemMessageClient;
74
// Names of the environment variables that `ProcMessage::SpawnProc` sets on a
// child process launched through `Environment::Exec`.

/// Carries the world id handed to the spawned proc.
static HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
/// Carries the proc id assigned to the spawned proc.
static HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
/// Carries the bootstrap channel address of the spawning proc actor.
static HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
/// Carries the `world_size` value from the spawn request.
static HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
/// Carries the rank taken from the spawned proc's id.
static HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
/// Carries the position of the proc within the spawn request's `proc_ids` list.
static HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
81
/// How a proc requested through `ProcMessage::SpawnProc` is brought up.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// Bootstrap the proc inside the current process via `ProcActor::bootstrap`.
    Local,

    /// Launch the proc as a child process.
    Exec {
        /// Program to run; it is started with the `HYPERACTOR_*` environment
        /// variables describing the proc it should bootstrap.
        program: String,
    },
}
102
/// Join state of a proc as tracked by its `ProcActor`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub enum ProcState {
    /// Initial state: the actor has been created but has not yet received
    /// `ProcMessage::Joined`.
    AwaitingJoin,

    /// The actor has received `ProcMessage::Joined`.
    Joined,
}
112
113impl fmt::Display for ProcState {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 Self::AwaitingJoin => write!(f, "AwaitingJoin"),
117 Self::Joined => write!(f, "Joined"),
118 }
119 }
120}
121
/// Reply to `ProcMessage::Stop`, summarizing what happened to the proc's actors.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcStopResult {
    /// Id of the proc that was stopped.
    pub proc_id: ProcId,
    /// Number of entries in the "stopped" set reported by the proc teardown.
    pub actors_stopped: usize,
    /// Number of entries in the "aborted" set reported by the proc teardown.
    pub actors_aborted: usize,
}
132
/// Reply to `ProcMessage::Snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcSnapshot {
    /// Join state of the proc actor at the time of the snapshot.
    pub state: ProcState,
    /// Ledger snapshot obtained from the proc (`Proc::ledger_snapshot`).
    pub actors: ActorLedgerSnapshot,
}
141
/// Options for `ProcMessage::PySpyDump`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PySpyConfig {
    /// Runs `py_spy` with all three flags set to `false`.
    NonBlocking,
    /// Runs `py_spy` with its last flag set to `true`.
    /// NOTE(review): presumably this pauses the process while sampling — confirm
    /// against `crate::pyspy::py_spy`.
    Blocking {
        /// Forwarded as the second `py_spy` argument; `None` is treated as `false`.
        native: Option<bool>,
        /// Forwarded as the third `py_spy` argument; `None` is treated as `false`.
        native_all: Option<bool>,
    },
}
156
/// Reply to `ProcMessage::PySpyDump`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct StackTrace {
    /// Trace returned by `py_spy` for the current process.
    pub trace: PySpyTrace,
}
164
/// Messages handled by `ProcActor`.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Named
)]
pub enum ProcMessage {
    /// Marks the proc as joined: the actor switches to `ProcState::Joined`
    /// and publishes the new state on its state watch.
    #[log_level(debug)]
    Joined(),

    /// Queries the actor's current `ProcState`.
    #[log_level(debug)]
    State {
        /// Receives the current state.
        #[reply]
        ret: OncePortRef<ProcState>,
    },

    /// Spawns an actor on this proc.
    Spawn {
        /// Registered type name of the actor to spawn.
        actor_type: String,
        /// Name given to the spawned actor.
        actor_name: String,
        /// Serialized actor parameters.
        params_data: Data,
        /// Receives this proc's rank once the actor has been spawned.
        status_port: PortRef<Index>,
    },

    /// Spawns one proc per entry of `proc_ids`, in the given environment.
    SpawnProc {
        /// How the procs are brought up.
        env: Environment,
        /// World the new procs are bootstrapped into; must differ from this
        /// proc's own world.
        world_id: WorldId,
        /// Ids of the procs to spawn.
        proc_ids: Vec<ProcId>,
        /// Exported to exec'd children as `HYPERACTOR_WORLD_SIZE`.
        world_size: usize,
    },

    /// Self-scheduled tick that reports this proc's status to its supervisor.
    #[log_level(debug)]
    UpdateSupervision(),

    /// Stops the proc.
    Stop {
        /// Timeout forwarded to the proc teardown (`Proc::destroy_and_wait`).
        timeout: Duration,
        /// Receives the teardown summary.
        #[reply]
        reply_to: OncePortRef<ProcStopResult>,
    },

    /// Requests the actor's state together with the proc's actor ledger.
    #[log_level(debug)]
    Snapshot {
        /// Receives the snapshot.
        #[reply]
        reply_to: OncePortRef<ProcSnapshot>,
    },

    /// Requests the address the proc actor was configured with as `local_addr`.
    LocalAddr {
        /// Receives the address.
        #[reply]
        reply_to: OncePortRef<ChannelAddr>,
    },

    /// Runs py-spy against the current process.
    #[log_level(debug)]
    PySpyDump {
        /// py-spy options.
        config: PySpyConfig,
        /// Receives the resulting trace.
        #[reply]
        reply_to: OncePortRef<StackTrace>,
    },
}
260
/// Construction parameters for `ProcActor`.
#[derive(Debug, Clone)]
pub struct ProcActorParams {
    /// The proc this actor manages.
    pub proc: Proc,

    /// World id reported when joining the system and in supervision updates.
    pub world_id: WorldId,

    /// System actor that the proc actor joins during `init`.
    pub system_actor_ref: ActorRef<SystemActor>,

    /// Bootstrap address handed on to procs spawned via `ProcMessage::SpawnProc`;
    /// its transport is also used to pick a listen address for locally spawned procs.
    pub bootstrap_channel_addr: ChannelAddr,

    /// Address reported for this proc when joining and in supervision updates.
    /// `bootstrap_for_proc` sets it to the address returned by `channel::serve`.
    pub local_addr: ChannelAddr,

    /// Watch channel on which the actor publishes `ProcState::Joined`.
    pub state_watch: watch::Sender<ProcState>,

    /// Supervisor that receives `ProcSupervisionState` updates.
    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,

    /// Base interval between supervision updates. A zero interval disables the
    /// periodic updates scheduled from `init`.
    pub supervision_update_interval: Duration,

    /// Labels forwarded to the system actor when joining.
    pub labels: HashMap<String, String>,

    /// Lifecycle mode forwarded to the system actor when joining. Periodic
    /// supervision updates are only scheduled for
    /// `ProcLifecycleMode::ManagedBySystem`.
    pub lifecycle_mode: ProcLifecycleMode,
}
305
/// Handles produced by a successful proc bootstrap.
#[derive(Debug)]
pub struct BootstrappedProc {
    /// Handle to the proc's `ProcActor` (spawned under the name "proc").
    pub proc_actor: ActorHandle<ProcActor>,
    /// Handle to the proc's `CommActor` (spawned under the name "comm").
    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the mailbox server serving the proc on its listen address.
    pub mailbox: MailboxServerHandle,
}
316
/// Actor that manages a single proc: it joins the proc to the system, spawns
/// actors and further procs on request, reports supervision state, and stops
/// the proc.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        ProcMessage,
        MailboxAdminMessage,
    ],
)]
pub struct ProcActor {
    /// Parameters the actor was created with.
    params: ProcActorParams,
    /// Current join state; starts as `ProcState::AwaitingJoin`.
    state: ProcState,
    /// Actor registry obtained from `Remote::collect()`, used to spawn actors
    /// by type name.
    remote: Remote,
    /// Time (per the proc's clock) at which a supervision update last counted
    /// as successful; initialized to the actor's creation time.
    last_successful_supervision_update: SystemTime,
}
333
334impl ProcActor {
335 #[hyperactor::instrument]
339 pub async fn bootstrap(
340 proc_id: ProcId,
341 world_id: WorldId,
342 listen_addr: ChannelAddr,
343 bootstrap_addr: ChannelAddr,
344 supervision_update_interval: Duration,
345 labels: HashMap<String, String>,
346 lifecycle_mode: ProcLifecycleMode,
347 ) -> Result<BootstrappedProc, anyhow::Error> {
348 let system_supervision_ref: ActorRef<ProcSupervisor> =
349 ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
350
351 Self::try_bootstrap(
352 proc_id.clone(),
353 world_id.clone(),
354 listen_addr.clone(),
355 bootstrap_addr.clone(),
356 system_supervision_ref.clone(),
357 supervision_update_interval,
358 labels,
359 lifecycle_mode,
360 )
361 .await
362 .inspect_err(|err| {
363 tracing::error!(
364 "bootstrap {} {} {} {}: {}",
365 proc_id,
366 world_id,
367 listen_addr,
368 bootstrap_addr,
369 err
370 );
371 })
372 }
373
374 #[hyperactor::instrument]
378 pub async fn try_bootstrap(
379 proc_id: ProcId,
380 world_id: WorldId,
381 listen_addr: ChannelAddr,
382 bootstrap_addr: ChannelAddr,
383 supervisor_actor_ref: ActorRef<ProcSupervisor>,
384 supervision_update_interval: Duration,
385 labels: HashMap<String, String>,
386 lifecycle_mode: ProcLifecycleMode,
387 ) -> Result<BootstrappedProc, anyhow::Error> {
388 let system_sender =
389 BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
390 let clock = ClockKind::for_channel_addr(&listen_addr);
391
392 let proc_forwarder =
393 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
394 let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
395 Self::bootstrap_for_proc(
396 proc,
397 world_id,
398 listen_addr,
399 bootstrap_addr,
400 supervisor_actor_ref,
401 supervision_update_interval,
402 labels,
403 lifecycle_mode,
404 )
405 .await
406 }
407
408 #[hyperactor::instrument]
412 pub async fn bootstrap_for_proc(
413 proc: Proc,
414 world_id: WorldId,
415 listen_addr: ChannelAddr,
416 bootstrap_addr: ChannelAddr,
417 supervisor_actor_ref: ActorRef<ProcSupervisor>,
418 supervision_update_interval: Duration,
419 labels: HashMap<String, String>,
420 lifecycle_mode: ProcLifecycleMode,
421 ) -> Result<BootstrappedProc, anyhow::Error> {
422 let (local_addr, rx) = channel::serve(listen_addr)?;
423 let mailbox_handle = proc.clone().serve(rx);
424 let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
425
426 let handle = match proc
427 .clone()
428 .spawn::<Self>(
429 "proc",
430 ProcActorParams {
431 proc: proc.clone(),
432 world_id: world_id.clone(),
433 system_actor_ref: SYSTEM_ACTOR_REF.clone(),
434 bootstrap_channel_addr: bootstrap_addr,
435 local_addr,
436 state_watch: state_tx,
437 supervisor_actor_ref,
438 supervision_update_interval,
439 labels,
440 lifecycle_mode,
441 },
442 )
443 .await
444 {
445 Ok(handle) => handle,
446 Err(e) => {
447 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
448 return Err(e);
449 }
450 };
451
452 let comm_actor = match proc
453 .clone()
454 .spawn::<CommActor>("comm", Default::default())
455 .await
456 {
457 Ok(handle) => handle,
458 Err(e) => {
459 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
460 return Err(e);
461 }
462 };
463 comm_actor.bind::<CommActor>();
464
465 loop {
466 let proc_state = state_rx.borrow_and_update().clone();
467 tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
468 if matches!(proc_state, ProcState::Joined) {
469 break;
470 }
471 match state_rx.changed().await {
472 Ok(_) => {}
473 Err(e) => {
474 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
475 return Err(e.into());
476 }
477 }
478 }
479
480 proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
481
482 Ok(BootstrappedProc {
483 proc_actor: handle,
484 mailbox: mailbox_handle,
485 comm_actor,
486 })
487 }
488
489 async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
492 mailbox_handle.stop("failed proc bootstrap cleanup");
493 if let Err(shutdown_err) = mailbox_handle.await {
494 tracing::error!(
496 "error shutting down during a failed bootstrap attempt: {}",
497 shutdown_err
498 );
499 }
500 }
501}
502
503#[async_trait]
504impl Actor for ProcActor {
505 type Params = ProcActorParams;
506
507 async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
508 let last_successful_supervision_update = params.proc.clock().system_time_now();
509 Ok(Self {
510 params,
511 state: ProcState::AwaitingJoin,
512 remote: Remote::collect(),
513 last_successful_supervision_update,
514 })
515 }
516
517 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
518 this.bind::<Self>();
520
521 self.params
523 .system_actor_ref
524 .join(
525 this,
526 self.params.world_id.clone(),
527 self.params.proc.proc_id().clone(),
528 this.port().bind(),
529 self.params.local_addr.clone(),
530 self.params.labels.clone(),
531 self.params.lifecycle_mode.clone(),
532 )
533 .await?;
534
535 if self.params.supervision_update_interval > Duration::from_secs(0)
540 && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
541 {
542 this.self_message_with_delay(
543 ProcMessage::UpdateSupervision(),
544 self.params.supervision_update_interval,
545 )?;
546 }
547
548 Ok(())
549 }
550}
551
552impl ProcActor {
553 fn rank(&self) -> Index {
555 self.params
556 .proc
557 .proc_id()
558 .rank()
559 .expect("proc must be ranked")
560 }
561}
562
563#[hyperactor::forward(MailboxAdminMessage)]
564#[async_trait]
565impl MailboxAdminMessageHandler for ProcActor {
566 async fn update_address(
567 &mut self,
568 cx: &Context<Self>,
569 proc_id: ProcId,
570 addr: ChannelAddr,
571 ) -> Result<(), anyhow::Error> {
572 tracing::trace!(
573 "received address update:\n{:#?}",
574 MailboxAdminMessage::UpdateAddress {
575 proc_id: proc_id.clone(),
576 addr: addr.clone()
577 }
578 );
579 let forwarder = cx.proc().forwarder();
580 if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
581 router.bind(proc_id.into(), addr);
582 } else {
583 tracing::warn!(
584 "proc {} received update_address but does not use a DialMailboxRouter",
585 cx.proc().proc_id()
586 );
587 }
588
589 Ok(())
590 }
591}
592
593#[async_trait]
594#[hyperactor::forward(ProcMessage)]
595impl ProcMessageHandler for ProcActor {
596 async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
597 self.state = ProcState::Joined;
598 let _ = self.params.state_watch.send(self.state.clone());
599 Ok(())
600 }
601
602 async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
603 Ok(self.state.clone())
604 }
605
606 async fn spawn(
607 &mut self,
608 cx: &Context<Self>,
609 actor_type: String,
610 actor_name: String,
611 params_data: Data,
612 status_port: PortRef<Index>,
613 ) -> Result<(), anyhow::Error> {
614 let _actor_id = self
615 .remote
616 .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
617 .await?;
618
619 status_port.send(cx, self.rank())?;
621 Ok(())
622 }
623
624 async fn spawn_proc(
625 &mut self,
626 _cx: &Context<Self>,
627 env: Environment,
628 world_id: WorldId,
629 proc_ids: Vec<ProcId>,
630 world_size: usize,
631 ) -> Result<(), anyhow::Error> {
632 for (index, proc_id) in proc_ids.into_iter().enumerate() {
633 let proc_world_id = proc_id
634 .world_id()
635 .expect("proc must be ranked for world_id access")
636 .clone();
637 if &proc_world_id
639 == self
640 .params
641 .proc
642 .proc_id()
643 .world_id()
644 .expect("proc must be ranked for world_id access")
645 || &world_id
646 == self
647 .params
648 .proc
649 .proc_id()
650 .world_id()
651 .expect("proc must be ranked for world_id access")
652 {
653 return Err(anyhow::anyhow!(
654 "cannot spawn proc in same world {}",
655 proc_world_id
656 ));
657 }
658 match env {
659 Environment::Local => {
660 ProcActor::bootstrap(
661 proc_id,
662 world_id.clone(),
663 ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
664 self.params.bootstrap_channel_addr.clone(),
665 self.params.supervision_update_interval,
666 HashMap::new(),
667 ProcLifecycleMode::ManagedBySystem,
668 )
669 .await?;
670 }
671 Environment::Exec { ref program } => {
672 tracing::info!("spawning proc {} with program {}", proc_id, program);
673 let mut child = Command::new(program);
674 let _ = child
675 .env(HYPERACTOR_WORLD_ID, world_id.to_string())
676 .env(HYPERACTOR_PROC_ID, proc_id.to_string())
677 .env(
678 HYPERACTOR_BOOTSTRAP_ADDR,
679 self.params.bootstrap_channel_addr.to_string(),
680 )
681 .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
682 .env(
683 HYPERACTOR_RANK,
684 proc_id
685 .rank()
686 .expect("proc must be ranked for rank env var")
687 .to_string(),
688 )
689 .env(HYPERACTOR_LOCAL_RANK, index.to_string())
690 .stdin(Stdio::null())
691 .stdout(Stdio::inherit())
692 .stderr(Stdio::inherit())
693 .spawn()?;
694 }
695 }
696 }
697 Ok(())
698 }
699
700 async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
701 let delay = jitter(self.params.supervision_update_interval);
703
704 if self.state != ProcState::Joined {
706 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
707 return Ok(());
708 }
709
710 let msg = ProcSupervisionState {
711 world_id: self.params.world_id.clone(),
712 proc_id: self.params.proc.proc_id().clone(),
713 proc_addr: self.params.local_addr.clone(),
714 proc_health: ProcStatus::Alive,
715 failed_actors: Vec::new(),
716 };
717
718 match cx
719 .clock()
720 .timeout(
721 Duration::from_secs(10),
723 self.params.supervisor_actor_ref.update(cx, msg),
724 )
725 .await
726 {
727 Ok(_) => {
728 self.last_successful_supervision_update = cx.clock().system_time_now();
729 }
730 Err(_) => {}
731 }
732
733 let supervision_staleness = self
734 .last_successful_supervision_update
735 .elapsed()
736 .unwrap_or_default();
737 if supervision_staleness > 5 * self.params.supervision_update_interval {
740 tracing::error!(
741 "system actor isn't responsive to supervision update, stopping the proc"
742 );
743 self.stop(cx, Duration::from_secs(5)).await?;
746 } else {
747 let delay = jitter(self.params.supervision_update_interval);
749 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
750 }
751
752 Ok(())
753 }
754
755 async fn stop(
756 &mut self,
757 cx: &Context<Self>,
758 timeout: Duration,
759 ) -> Result<ProcStopResult, anyhow::Error> {
760 tracing::info!("stopping proc {}", self.params.proc.proc_id());
761 self.params
762 .proc
763 .destroy_and_wait(timeout, Some(cx))
764 .await
765 .map(|(stopped, aborted)| {
766 tracing::info!("stopped proc {}", self.params.proc.proc_id());
767 ProcStopResult {
768 proc_id: self.params.proc.proc_id().clone(),
769 actors_stopped: stopped.len(),
770 actors_aborted: aborted.len(),
771 }
772 })
773 }
774
775 async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
776 let state = self.state.clone();
777 let actors = self.params.proc.ledger_snapshot();
778 Ok(ProcSnapshot { state, actors })
779 }
780
781 async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
782 Ok(self.params.local_addr.clone())
783 }
784
785 async fn py_spy_dump(
786 &mut self,
787 _cx: &Context<Self>,
788 config: PySpyConfig,
789 ) -> Result<StackTrace, anyhow::Error> {
790 let pid = std::process::id() as i32;
791 tracing::info!(
792 "running py-spy on proc {}, process id: {}",
793 self.params.proc.proc_id(),
794 pid
795 );
796 let trace = match config {
797 PySpyConfig::Blocking { native, native_all } => {
798 py_spy(
799 pid,
800 native.unwrap_or_default(),
801 native_all.unwrap_or_default(),
802 true,
803 )
804 .await?
805 }
806 PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
807 };
808 Ok(StackTrace { trace })
809 }
810}
811
812#[async_trait]
813impl Handler<ActorSupervisionEvent> for ProcActor {
814 async fn handle(
815 &mut self,
816 cx: &Context<Self>,
817 event: ActorSupervisionEvent,
818 ) -> anyhow::Result<()> {
819 let actor_id = event.actor_id.clone();
820 let status = match event.actor_status {
821 ActorStatus::Failed(_) => {
822 ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(event)))
823 }
824 status => status,
825 };
826 let message = ProcSupervisionState {
827 world_id: self.params.world_id.clone(),
828 proc_id: self.params.proc.proc_id().clone(),
829 proc_addr: self.params.local_addr.clone(),
830 proc_health: ProcStatus::Alive,
831 failed_actors: Vec::from([(actor_id, status)]),
832 };
833 self.params.supervisor_actor_ref.update(cx, message).await?;
834 Ok(())
835 }
836}
837
838pub async fn spawn<A: Actor + Referable>(
841 cx: &impl context::Actor,
842 proc_actor: &ActorRef<ProcActor>,
843 actor_name: &str,
844 params: &A::Params,
845) -> Result<ActorRef<A>, anyhow::Error>
846where
847 A::Params: RemoteMessage,
848{
849 let remote = Remote::collect();
850 let (spawned_port, mut spawned_receiver) = open_port(cx);
851 let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
852
853 proc_actor
854 .spawn(
855 cx,
856 remote
857 .name_of::<A>()
858 .ok_or(anyhow::anyhow!("actor not registered"))?
859 .into(),
860 actor_name.into(),
861 bincode::serialize(params)?,
862 spawned_port.bind(),
863 )
864 .await?;
865
866 while spawned_receiver.recv().await?
868 != proc_id
869 .rank()
870 .expect("proc must be ranked for rank comparison")
871 {}
872
873 Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
875}
876
877#[cfg(test)]
878mod tests {
879 use std::assert_matches::assert_matches;
880 use std::collections::HashSet;
881 use std::time::Duration;
882
883 use hyperactor::actor::ActorStatus;
884 use hyperactor::channel;
885 use hyperactor::channel::ChannelAddr;
886 use hyperactor::channel::ChannelTransport;
887 use hyperactor::channel::TcpMode;
888 use hyperactor::clock::Clock;
889 use hyperactor::clock::RealClock;
890 use hyperactor::forward;
891 use hyperactor::id;
892 use hyperactor::reference::ActorRef;
893 use hyperactor::test_utils::pingpong::PingPongActor;
894 use hyperactor::test_utils::pingpong::PingPongActorParams;
895 use hyperactor::test_utils::pingpong::PingPongMessage;
896 use maplit::hashset;
897 use rand::Rng;
898 use rand::distributions::Alphanumeric;
899 use regex::Regex;
900
901 use super::*;
902 use crate::supervision::ProcSupervisionMessage;
903 use crate::system::ServerHandle;
904 use crate::system::System;
905
906 const MAX_WAIT_TIME: Duration = Duration::new(10, 0);
907
    /// Everything a test needs after bringing up a system and a single proc.
    struct Bootstrapped {
        /// Handle to the served system; tests stop and await it at the end.
        server_handle: ServerHandle,
        /// Bound reference to the bootstrapped proc actor.
        proc_actor_ref: ActorRef<ProcActor>,
        /// Bound reference to the bootstrapped comm actor.
        comm_actor_ref: ActorRef<CommActor>,
        /// Client instance attached to the system, used to send messages.
        client: Instance<()>,
    }
914
915 async fn bootstrap() -> Bootstrapped {
916 let server_handle = System::serve(
917 ChannelAddr::any(ChannelTransport::Local),
918 Duration::from_secs(10),
919 Duration::from_secs(10),
920 )
921 .await
922 .unwrap();
923
924 let world_id = id!(world);
925 let proc_id = world_id.proc_id(0);
926 let bootstrap = ProcActor::bootstrap(
927 proc_id,
928 world_id,
929 ChannelAddr::any(ChannelTransport::Local),
930 server_handle.local_addr().clone(),
931 Duration::from_secs(1),
932 HashMap::new(),
933 ProcLifecycleMode::ManagedBySystem,
934 )
935 .await
936 .unwrap();
937
938 let mut system = System::new(server_handle.local_addr().clone());
941 let client = system.attach().await.unwrap();
942
943 let start = RealClock.now();
945 let mut proc_state;
946 loop {
947 proc_state = bootstrap.proc_actor.state(&client).await.unwrap();
948
949 if matches!(proc_state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
950 break;
951 }
952 }
953 assert_matches!(proc_state, ProcState::Joined);
954
955 Bootstrapped {
956 server_handle,
957 proc_actor_ref: bootstrap.proc_actor.bind(),
958 comm_actor_ref: bootstrap.comm_actor.bind(),
959 client,
960 }
961 }
962
963 #[tokio::test]
964 async fn test_bootstrap() {
965 let Bootstrapped { server_handle, .. } = bootstrap().await;
966
967 println!("bootrapped, now waiting");
968
969 server_handle.stop().await.unwrap();
970 server_handle.await;
971 }
972
    /// Minimal spawnable actor used by the tests; handles `TestActorMessage`.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            TestActorMessage,
        ],
    )]
    struct TestActor;
981
    /// Messages understood by `TestActor`.
    #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
    enum TestActorMessage {
        /// Replies with the given number plus one.
        Increment(u64, #[reply] OncePortRef<u64>),
        /// Makes the handler return an error carrying the given text.
        Fail(String),
    }
987
988 #[async_trait]
989 #[forward(TestActorMessage)]
990 impl TestActorMessageHandler for TestActor {
991 async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
992 Ok(num + 1)
993 }
994
995 async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
996 Err(anyhow::anyhow!(err))
997 }
998 }
999
1000 #[tokio::test]
1001 async fn test_stop() {
1002 let Bootstrapped {
1005 server_handle,
1006 proc_actor_ref,
1007 client,
1008 ..
1009 } = bootstrap().await;
1010
1011 const NUM_ACTORS: usize = 4usize;
1012 for i in 0..NUM_ACTORS {
1013 spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
1014 .await
1015 .unwrap();
1016 }
1017
1018 let ProcStopResult {
1019 proc_id: _,
1020 actors_stopped,
1021 actors_aborted,
1022 } = proc_actor_ref
1023 .stop(&client, Duration::from_secs(1))
1024 .await
1025 .unwrap();
1026 assert_eq!(NUM_ACTORS + 1, actors_stopped);
1027 assert_eq!(1, actors_aborted);
1028
1029 server_handle.stop().await.unwrap();
1030 server_handle.await;
1031 }
1032
    /// Spawnable test actor whose `u64` handler sleeps for that many seconds.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            u64,
        ],
    )]
    struct SleepActor {}
1042
1043 #[async_trait]
1044 impl Handler<u64> for SleepActor {
1045 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
1046 let duration = message;
1047 RealClock.sleep(Duration::from_secs(duration)).await;
1048 Ok(())
1049 }
1050 }
1051
1052 #[tracing_test::traced_test]
1053 #[tokio::test]
1054 #[cfg_attr(not(fbcode_build), ignore)]
1055 async fn test_stop_timeout() {
1056 let Bootstrapped {
1057 server_handle,
1058 proc_actor_ref,
1059 client,
1060 ..
1061 } = bootstrap().await;
1062
1063 const NUM_ACTORS: usize = 4usize;
1064 for i in 0..NUM_ACTORS {
1065 let sleep_secs = 5u64;
1066 let sleeper = spawn::<SleepActor>(
1067 &client,
1068 &proc_actor_ref,
1069 format!("sleeper{i}").as_str(),
1070 &(),
1071 )
1072 .await
1073 .unwrap();
1074 if i > 0 {
1075 sleeper.send(&client, sleep_secs).unwrap();
1076 }
1077 }
1078
1079 let ProcStopResult {
1080 proc_id: _,
1081 actors_stopped,
1082 actors_aborted,
1083 } = proc_actor_ref
1084 .stop(&client, Duration::from_secs(1))
1085 .await
1086 .unwrap();
1087 assert_eq!(2, actors_stopped);
1088 assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);
1089
1090 assert!(tracing_test::internal::logs_with_scope_contain(
1091 "hyperactor::proc",
1092 "world[0].proc[0]: aborting (delayed) JoinHandle"
1093 ));
1094 for i in 1..3 {
1095 assert!(tracing_test::internal::logs_with_scope_contain(
1096 "hyperactor::proc",
1097 format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
1098 ));
1099 }
1100 logs_assert(|logs| {
1101 let count = logs
1102 .iter()
1103 .filter(|log| {
1104 log.contains("aborting JoinHandle")
1105 || log.contains("aborting (delayed) JoinHandle")
1106 })
1107 .count();
1108 if count == actors_aborted {
1109 Ok(())
1110 } else {
1111 Err("task abort counting error".to_string())
1112 }
1113 });
1114
1115 server_handle.stop().await.unwrap();
1116 server_handle.await;
1117 }
1118
1119 #[tokio::test]
1120 async fn test_spawn() {
1121 let Bootstrapped {
1122 server_handle,
1123 proc_actor_ref,
1124 client,
1125 ..
1126 } = bootstrap().await;
1127
1128 let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
1129 .await
1130 .unwrap();
1131
1132 let result = test_actor_ref.increment(&client, 1).await.unwrap();
1133 assert_eq!(result, 2);
1134
1135 server_handle.stop().await.unwrap();
1136 server_handle.await;
1137 }
1138
1139 #[cfg(target_os = "linux")]
1140 fn random_abstract_addr() -> ChannelAddr {
1141 let random_string = rand::thread_rng()
1142 .sample_iter(&Alphanumeric)
1143 .take(24)
1144 .map(char::from)
1145 .collect::<String>();
1146 format!("unix!@{random_string}").parse().unwrap()
1147 }
1148
    /// Bootstrapping a proc succeeds even when the system only starts
    /// serving on the bootstrap address several seconds later.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn test_bootstrap_retry() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped under cargo as it causes other tests to fail when run");
            return;
        }

        // Address the system will serve on later; nothing listens on it yet.
        let bootstrap_addr = random_abstract_addr();

        // Start bootstrapping in the background, before the system exists.
        let bootstrap_addr_clone = bootstrap_addr.clone();
        let handle = tokio::spawn(async move {
            let world_id = id!(world);
            let proc_id = world_id.proc_id(0);
            let bootstrap = ProcActor::bootstrap(
                proc_id,
                world_id,
                random_abstract_addr(),
                bootstrap_addr_clone,
                Duration::from_secs(1),
                HashMap::new(),
                ProcLifecycleMode::ManagedBySystem,
            )
            .await
            .unwrap();

            // Once bootstrapped, the proc actor is expected to be idle.
            let mut status = bootstrap.proc_actor.status();
            assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
        });

        // Give the bootstrap attempt a head start before the system comes up.
        RealClock.sleep(Duration::from_secs(5)).await;

        let _server_handle = System::serve(
            bootstrap_addr,
            Duration::from_secs(10),
            Duration::from_secs(10),
        )
        .await
        .unwrap();

        // The background bootstrap must complete now that the system is up.
        handle.await.unwrap();
    }
1198
    /// The proc actor sends periodic supervision updates to its supervisor
    /// and reports actor failures to it.
    #[tokio::test]
    async fn test_supervision_message_handling() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped under cargo as it fails when run with others");
            return;
        }

        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Local),
            Duration::from_secs(3600),
            Duration::from_secs(3600),
        )
        .await
        .unwrap();

        // Attach a client that plays the supervisor: it binds a port for
        // supervision messages and is attested as a `ProcSupervisor`.
        let mut system = System::new(server_handle.local_addr().clone());
        let supervisor = system.attach().await.unwrap();
        let (_supervisor_supervision_tx, mut supervisor_supervision_receiver) =
            supervisor.bind_actor_port::<ProcSupervisionMessage>();
        let supervisor_actor_ref: ActorRef<ProcSupervisor> =
            ActorRef::attest(supervisor.self_id().clone());

        // Bootstrap a proc that reports to the stand-in supervisor, with a
        // one second supervision update interval.
        let local_world_id = hyperactor::id!(test_proc);
        let local_proc_id = local_world_id.proc_id(0);
        let bootstrap = ProcActor::try_bootstrap(
            local_proc_id.clone(),
            local_world_id.clone(),
            ChannelAddr::any(ChannelTransport::Local),
            server_handle.local_addr().clone(),
            supervisor_actor_ref.clone(),
            Duration::from_secs(1),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();

        // The first message must be a periodic update without failed actors.
        let msg = supervisor_supervision_receiver.recv().await;
        match msg.unwrap() {
            ProcSupervisionMessage::Update(state, port) => {
                assert_eq!(
                    state,
                    ProcSupervisionState {
                        world_id: local_world_id.clone(),
                        // NOTE(review): hard-coded local address; presumably it
                        // depends on how many local channels were allocated
                        // before this one, which may be why the test is
                        // skipped when run with others — confirm.
                        proc_addr: ChannelAddr::Local(3),
                        proc_id: local_proc_id.clone(),
                        proc_health: ProcStatus::Alive,
                        failed_actors: Vec::new(),
                    }
                );
                // Reply so the proc actor's update call completes.
                let _ = port.send(&supervisor, ());
            }
        }

        // Spawn an actor on the proc and make its handler fail.
        let proc_actor_ref = bootstrap.proc_actor.bind();
        let test_actor_ref = spawn::<TestActor>(&supervisor, &proc_actor_ref, "test", &())
            .await
            .unwrap();

        test_actor_ref
            .fail(&supervisor, "test actor is erroring out".to_string())
            .await
            .unwrap();
        // Wait up to five seconds for an update that lists the test actor
        // among the failed actors, skipping updates that do not.
        let result = RealClock
            .timeout(Duration::from_secs(5), async {
                loop {
                    match supervisor_supervision_receiver.recv().await {
                        Ok(ProcSupervisionMessage::Update(state, _port)) => {
                            match state.failed_actors.iter().find(|(failed_id, _)| {
                                failed_id == test_actor_ref.clone().actor_id()
                            }) {
                                Some((_, actor_status)) => return Ok(actor_status.clone()),
                                None => {}
                            }
                        }
                        _ => anyhow::bail!("unexpected message type"),
                    }
                }
            })
            .await;
        assert_matches!(
            result.unwrap().unwrap(),
            ActorStatus::Failed(msg) if msg.to_string().contains("test actor is erroring out")
        );

        server_handle.stop().await.unwrap();
        server_handle.await;
    }
1295
1296 #[tokio::test]
1299 async fn test_bind_proc_actor_in_bootstrap() {
1300 let server_handle = System::serve(
1301 ChannelAddr::any(ChannelTransport::Local),
1302 Duration::from_secs(10),
1303 Duration::from_secs(10),
1304 )
1305 .await
1306 .unwrap();
1307 let mut system = System::new(server_handle.local_addr().clone());
1308 let client = system.attach().await.unwrap();
1309
1310 let world_id = id!(world);
1311 let proc_id = world_id.proc_id(0);
1312 let bootstrap = ProcActor::bootstrap(
1313 proc_id,
1314 world_id,
1315 ChannelAddr::any(ChannelTransport::Local),
1316 server_handle.local_addr().clone(),
1317 Duration::from_secs(1),
1318 HashMap::new(),
1319 ProcLifecycleMode::ManagedBySystem,
1320 )
1321 .await
1322 .unwrap();
1323 let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
1324 let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);
1325
1326 let res = RealClock
1327 .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
1328 .await;
1329 assert!(res.is_ok());
1332 assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
1333 server_handle.stop().await.unwrap();
1334 server_handle.await;
1335 }
1336
1337 #[tokio::test]
1338 async fn test_proc_snapshot() {
1339 let Bootstrapped {
1340 server_handle,
1341 proc_actor_ref,
1342 comm_actor_ref,
1343 client,
1344 ..
1345 } = bootstrap().await;
1346
1347 let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
1349 .await
1350 .unwrap();
1351 let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
1352 .await
1353 .unwrap();
1354 {
1355 let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
1356 assert_eq!(snapshot.state, ProcState::Joined);
1357 assert_eq!(
1358 snapshot.actors.roots.keys().collect::<HashSet<_>>(),
1359 hashset! {
1360 proc_actor_ref.actor_id(),
1361 comm_actor_ref.actor_id(),
1362 root.actor_id(),
1363 another_root.actor_id(),
1364 }
1365 );
1366 }
1367
1368 server_handle.stop().await.unwrap();
1369 server_handle.await;
1370 }
1371
1372 #[tokio::test]
1373 async fn test_undeliverable_message_return() {
1374 use hyperactor::mailbox::Undeliverable;
1377 use hyperactor::test_utils::pingpong::PingPongActor;
1378 use hyperactor::test_utils::pingpong::PingPongMessage;
1379
1380 let config = hyperactor::config::global::lock();
1382 let _guard = config.override_key(
1383 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1384 Duration::from_secs(1),
1385 );
1386
1387 let server_handle = System::serve(
1389 ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1390 Duration::from_secs(120),
1391 Duration::from_secs(120),
1392 )
1393 .await
1394 .unwrap();
1395 let mut system = System::new(server_handle.local_addr().clone());
1396
1397 let supervisor = system.attach().await.unwrap();
1399 let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1400 let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1401
1402 let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1404 channel::dial(server_handle.local_addr().clone()).unwrap(),
1405 ));
1406
1407 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1409 let proc_forwarder =
1410 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1411
1412 let world_id = id!(world);
1414 let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1415 let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1416 proc_0.clone(),
1417 world_id.clone(),
1418 listen_addr,
1419 server_handle.local_addr().clone(),
1420 sup_ref.clone(),
1421 Duration::from_secs(120),
1422 HashMap::new(),
1423 ProcLifecycleMode::ManagedBySystem,
1424 )
1425 .await
1426 .unwrap();
1427 let proc_0_client = proc_0.attach("client").unwrap();
1428 let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1429
1430 let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1432 let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1433 proc_1.clone(),
1434 world_id.clone(),
1435 ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1436 server_handle.local_addr().clone(),
1437 sup_ref.clone(),
1438 Duration::from_secs(120),
1439 HashMap::new(),
1440 ProcLifecycleMode::ManagedBySystem,
1441 )
1442 .await
1443 .unwrap();
1444 let proc_1_client = proc_1.attach("client").unwrap();
1445 let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1446
1447 let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1448 let ping_handle = proc_0
1452 .spawn::<PingPongActor>("ping", ping_params)
1453 .await
1454 .unwrap();
1455 let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1456 let pong_handle = proc_1
1457 .spawn::<PingPongActor>("pong", pong_params)
1458 .await
1459 .unwrap();
1460
1461 server_handle.stop().await.unwrap();
1464 server_handle.await;
1465
1466 let n = 100usize;
1467 for i in 1..(n + 1) {
1468 let ttl = 66 + i as u64; let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1471 ping_handle
1472 .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1473 .unwrap();
1474 }
1475
1476 assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1481
1482 let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1484 unreachable!()
1485 };
1486 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487 let mut count = 1;
1488 while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1489 count += 1;
1493 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1494 }
1495 assert!(count == n);
1496 }
1497
1498 #[tracing_test::traced_test]
1499 #[tokio::test]
1500 #[cfg_attr(not(fbcode_build), ignore)]
1501 async fn test_proc_actor_mailbox_admin_message() {
1502 use hyperactor::test_utils::pingpong::PingPongActor;
1507 use hyperactor::test_utils::pingpong::PingPongMessage;
1508
1509 let server_handle = System::serve(
1511 ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1512 Duration::from_secs(120),
1513 Duration::from_secs(120),
1514 )
1515 .await
1516 .unwrap();
1517 let mut system = System::new(server_handle.local_addr().clone());
1518 let system_actor = server_handle.system_actor_handle();
1519 let system_client = system.attach().await.unwrap(); let supervisor = system.attach().await.unwrap();
1523 let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1524 let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1525
1526 let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1528 channel::dial(server_handle.local_addr().clone()).unwrap(),
1529 ));
1530
1531 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1533 let proc_forwarder =
1534 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1535
1536 let world_id = id!(world);
1538 let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1539 let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1540 proc_0.clone(),
1541 world_id.clone(),
1542 listen_addr,
1543 server_handle.local_addr().clone(),
1544 sup_ref.clone(),
1545 Duration::from_secs(120),
1546 HashMap::new(),
1547 ProcLifecycleMode::ManagedBySystem,
1548 )
1549 .await
1550 .unwrap();
1551 let proc_0_client = proc_0.attach("client").unwrap();
1552 let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();
1553
1554 let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1556 let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1557 proc_1.clone(),
1558 world_id.clone(),
1559 ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1560 server_handle.local_addr().clone(),
1561 sup_ref.clone(),
1562 Duration::from_secs(120),
1563 HashMap::new(),
1564 ProcLifecycleMode::ManagedBySystem,
1565 )
1566 .await
1567 .unwrap();
1568 let proc_1_client = proc_1.attach("client").unwrap();
1569 let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();
1570
1571 let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1575 let ping_handle = proc_0
1576 .spawn::<PingPongActor>("ping", ping_params)
1577 .await
1578 .unwrap();
1579 let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1580 let pong_handle = proc_1
1581 .spawn::<PingPongActor>("pong", pong_params)
1582 .await
1583 .unwrap();
1584
1585 let ttl = 10u64; let (once_tx, once_rx) = system_client.open_once_port::<bool>();
1588 ping_handle
1589 .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
1590 .unwrap();
1591
1592 assert!(once_rx.recv().await.unwrap());
1593
1594 let expected_1 = r#"UpdateAddress {
1596 proc_id: Ranked(
1597 WorldId(
1598 "world",
1599 ),
1600 1,
1601 ),
1602 addr: Tcp("#;
1603
1604 let expected_2 = r#"UpdateAddress {
1606 proc_id: Ranked(
1607 WorldId(
1608 "world",
1609 ),
1610 0,
1611 ),
1612 addr: Tcp("#;
1613
1614 let expected_3 = r#"UpdateAddress {
1616 proc_id: Ranked(
1617 WorldId(
1618 "user",
1619 ),"#;
1620
1621 logs_assert(|logs| {
1622 let log_body = logs.join("\n");
1623
1624 let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
1625 let count = pattern.find_iter(&log_body).count();
1626
1627 if count != 3 {
1628 return Err(format!(
1629 "expected 3 UpdateAddress messages, found {}",
1630 count
1631 ));
1632 }
1633
1634 if !log_body.contains(expected_1) {
1635 return Err("missing expected update for proc_id 1".into());
1636 }
1637 if !log_body.contains(expected_2) {
1638 return Err("missing expected update for proc_id 0".into());
1639 }
1640 if !log_body.contains(expected_3) {
1641 return Err("missing expected update for proc_id user".into());
1642 }
1643
1644 Ok(())
1645 });
1646
1647 let (once_tx, once_rx) = system_client.open_once_port::<()>();
1648 assert_matches!(
1649 system_actor
1650 .stop(
1651 &system_client,
1652 None,
1653 Duration::from_secs(1),
1654 once_tx.bind()
1655 )
1656 .await,
1657 Ok(())
1658 );
1659 assert_matches!(once_rx.recv().await.unwrap(), ());
1660 }
1661
1662 #[tokio::test]
1663 async fn test_update_address_book_cache() {
1664 let server_handle = System::serve(
1665 ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1666 Duration::from_secs(2), Duration::from_secs(2), )
1669 .await
1670 .unwrap();
1671 let system_addr = server_handle.local_addr().clone();
1672 let mut system = System::new(system_addr.clone());
1673
1674 let system_client = system.attach().await.unwrap();
1675
1676 let ping_actor_id = id!(world[0].ping[0]);
1678 let (ping_actor_ref, _ping_proc_ref) =
1679 spawn_actor(&system_client, &ping_actor_id, &system_addr).await;
1680
1681 let pong_actor_id = id!(world[1].pong[0]);
1682 let (pong_actor_ref, pong_proc_ref) =
1683 spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1684
1685 let (done_tx, done_rx) = system_client.open_once_port();
1688 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1689 ping_actor_ref
1690 .send(&system_client, ping_pong_message)
1691 .unwrap();
1692 assert!(done_rx.recv().await.unwrap());
1693
1694 let ProcStopResult { actors_aborted, .. } = pong_proc_ref
1696 .stop(&system_client, Duration::from_secs(1))
1697 .await
1698 .unwrap();
1699 assert_eq!(1, actors_aborted);
1700 let (pong_actor_ref, _pong_proc_ref) =
1701 spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1702
1703 let (done_tx, done_rx) = system_client.open_once_port();
1707 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1708 ping_actor_ref
1709 .send(&system_client, ping_pong_message)
1710 .unwrap();
1711 assert!(done_rx.recv().await.unwrap());
1712 }
1713
1714 async fn spawn_actor(
1715 cx: &impl context::Actor,
1716 actor_id: &ActorId,
1717 system_addr: &ChannelAddr,
1718 ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
1719 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1720 let bootstrap = ProcActor::bootstrap(
1721 actor_id.proc_id().clone(),
1722 actor_id
1723 .proc_id()
1724 .world_id()
1725 .expect("proc must be ranked for bootstrap world_id")
1726 .clone(),
1727 listen_addr.clone(),
1728 system_addr.clone(),
1729 Duration::from_secs(3),
1730 HashMap::new(),
1731 ProcLifecycleMode::ManagedBySystem,
1732 )
1733 .await
1734 .unwrap();
1735 let (undeliverable_msg_tx, _) = cx.mailbox().open_port();
1736 let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
1737 let actor_ref = spawn::<PingPongActor>(
1738 cx,
1739 &bootstrap.proc_actor.bind(),
1740 &actor_id.to_string(),
1741 ¶ms,
1742 )
1743 .await
1744 .unwrap();
1745 let proc_actor_ref = bootstrap.proc_actor.bind();
1746 (actor_ref, proc_actor_ref)
1747 }
1748}