1use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorHandle;
33use hyperactor::actor::Referable;
34use hyperactor::actor::remote::Remote;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::clock::Clock;
38use hyperactor::clock::ClockKind;
39use hyperactor::context;
40use hyperactor::mailbox::BoxedMailboxSender;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxAdminMessage;
43use hyperactor::mailbox::MailboxAdminMessageHandler;
44use hyperactor::mailbox::MailboxClient;
45use hyperactor::mailbox::MailboxServer;
46use hyperactor::mailbox::MailboxServerHandle;
47use hyperactor::mailbox::open_port;
48use hyperactor::proc::ActorLedgerSnapshot;
49use hyperactor::proc::Proc;
50use hyperactor::reference::ActorId;
51use hyperactor::reference::ActorRef;
52use hyperactor::reference::Index;
53use hyperactor::reference::ProcId;
54use hyperactor::supervision::ActorSupervisionEvent;
55use hyperactor_mesh::comm::CommActor;
56use serde::Deserialize;
57use serde::Serialize;
58use tokio::process::Command;
59use tokio::sync::watch;
60use tokio_retry::strategy::jitter;
61
62use crate::pyspy::PySpyTrace;
63use crate::pyspy::py_spy;
64use crate::supervision::ProcStatus;
65use crate::supervision::ProcSupervisionMessageClient;
66use crate::supervision::ProcSupervisionState;
67use crate::supervision::ProcSupervisor;
68use crate::system_actor::ProcLifecycleMode;
69use crate::system_actor::SYSTEM_ACTOR_REF;
70use crate::system_actor::SystemActor;
71use crate::system_actor::SystemMessageClient;
72
// Names of the environment variables through which a proc launched with
// `Environment::Exec` receives its identity and the system's bootstrap address.
const HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
const HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
const HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
const HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
const HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
// Index of the proc within the `proc_ids` list of the spawning request.
const HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
79
/// Where and how [`ProcMessage::SpawnProc`] launches new procs.
///
/// Variant order is part of the serialized form; do not reorder.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// Bootstrap each new proc inside the current process via
    /// [`ProcActor::bootstrap`].
    Local,

    /// Launch each new proc as a separate OS process.
    Exec {
        /// Program to execute. The child is given its world id, proc id,
        /// bootstrap address, world size, rank and local rank through the
        /// `HYPERACTOR_*` environment variables.
        program: String,
    },
}
100
/// Join state of a proc, as tracked by its [`ProcActor`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub enum ProcState {
    /// Initial state, held until [`ProcMessage::Joined`] is handled.
    AwaitingJoin,

    /// The proc actor has handled [`ProcMessage::Joined`].
    Joined,
}
110
111impl fmt::Display for ProcState {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 match self {
114 Self::AwaitingJoin => write!(f, "AwaitingJoin"),
115 Self::Joined => write!(f, "Joined"),
116 }
117 }
118}
119
/// Reply to [`ProcMessage::Stop`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcStopResult {
    /// The proc that was stopped.
    pub proc_id: ProcId,
    /// Number of actors reported as stopped by `Proc::destroy_and_wait`.
    pub actors_stopped: usize,
    /// Number of actors reported as aborted by `Proc::destroy_and_wait`
    /// (those that did not stop within the requested timeout).
    pub actors_aborted: usize,
}
130
/// Point-in-time view of a proc, returned by [`ProcMessage::Snapshot`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcSnapshot {
    /// The proc actor's join state at the time of the snapshot.
    pub state: ProcState,
    /// Snapshot of the proc's actor ledger.
    pub actors: ActorLedgerSnapshot,
}
139
/// How [`ProcMessage::PySpyDump`] invokes py-spy on the current process.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PySpyConfig {
    /// Calls `py_spy` with the native, native-all and blocking flags all off.
    NonBlocking,
    /// Calls `py_spy` with the blocking flag on.
    Blocking {
        /// Forwarded as `py_spy`'s native flag (presumably py-spy's
        /// `--native`); `None` is treated as `false`.
        native: Option<bool>,
        /// Forwarded as `py_spy`'s native-all flag; `None` is treated as
        /// `false`.
        native_all: Option<bool>,
    },
}
154
/// Reply to [`ProcMessage::PySpyDump`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct StackTrace {
    /// Trace produced by `py_spy`.
    pub trace: PySpyTrace,
}
162
/// Messages handled by [`ProcActor`].
///
/// Variant and field order are part of the serialized form; do not reorder.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Named
)]
pub enum ProcMessage {
    #[log_level(debug)]
    /// Marks the proc as joined: the handler switches the state to
    /// [`ProcState::Joined`] and publishes it on `state_watch`. Presumably
    /// sent by the system actor in reply to the join request issued in
    /// `init`, which hands over this actor's port.
    Joined(),

    #[log_level(debug)]
    /// Queries the proc's current [`ProcState`].
    State {
        #[reply]
        /// Receives the current state.
        ret: OncePortRef<ProcState>,
    },

    /// Spawns an actor on this proc.
    Spawn {
        /// Registered type name of the actor to spawn.
        actor_type: String,
        /// Name given to the new actor.
        actor_name: String,
        /// Serialized params for the new actor.
        params_data: Data,
        /// Receives this proc's rank once the actor has been spawned.
        status_port: PortRef<Index>,
    },

    /// Spawns new procs outside this proc's world.
    SpawnProc {
        /// Environment in which the procs are launched.
        env: Environment,
        /// World of the new procs; must differ from this proc's world.
        world_id: WorldId,
        /// Ids of the procs to spawn; each must lie outside this proc's world.
        proc_ids: Vec<ProcId>,
        /// World size advertised to exec'ed procs via `HYPERACTOR_WORLD_SIZE`.
        world_size: usize,
    },

    #[log_level(debug)]
    /// Self-scheduled tick that reports this proc's supervision state to the
    /// supervisor and reschedules itself.
    UpdateSupervision(),

    /// Destroys the proc and its actors.
    Stop {
        /// How long actors get to stop before they are aborted.
        timeout: Duration,
        #[reply]
        /// Receives the stopped/aborted counts.
        reply_to: OncePortRef<ProcStopResult>,
    },

    #[log_level(debug)]
    /// Requests a [`ProcSnapshot`] of the proc.
    Snapshot {
        #[reply]
        /// Receives the snapshot.
        reply_to: OncePortRef<ProcSnapshot>,
    },

    /// Requests the address the proc's mailbox is served on.
    LocalAddr {
        #[reply]
        /// Receives the address.
        reply_to: OncePortRef<ChannelAddr>,
    },

    #[log_level(debug)]
    /// Captures a py-spy dump of the current OS process.
    PySpyDump {
        /// How to invoke py-spy.
        config: PySpyConfig,
        #[reply]
        /// Receives the trace.
        reply_to: OncePortRef<StackTrace>,
    },
}
258
/// Parameters used to construct a [`ProcActor`].
#[derive(Debug, Clone)]
pub struct ProcActorParams {
    /// The proc managed by the actor.
    pub proc: Proc,

    /// World of the proc; sent to the system on join and in supervision
    /// reports.
    pub world_id: WorldId,

    /// System actor that the proc joins during `init`.
    pub system_actor_ref: ActorRef<SystemActor>,

    /// Address of the system's bootstrap channel. Procs spawned through
    /// [`ProcMessage::SpawnProc`] are pointed at this same address.
    pub bootstrap_channel_addr: ChannelAddr,

    /// Address the proc's mailbox is served on; advertised to the system on
    /// join and returned by [`ProcMessage::LocalAddr`].
    pub local_addr: ChannelAddr,

    /// Publishes state transitions (currently only the switch to
    /// [`ProcState::Joined`]) to whoever bootstrapped the proc.
    pub state_watch: watch::Sender<ProcState>,

    /// Recipient of periodic and failure-driven supervision reports.
    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,

    /// Base (jittered) period between supervision reports. A zero interval,
    /// or a lifecycle mode other than `ManagedBySystem`, disables periodic
    /// reports. When reports go stale for more than five intervals the proc
    /// stops itself.
    pub supervision_update_interval: Duration,

    /// Labels sent to the system actor when joining.
    pub labels: HashMap<String, String>,

    /// Lifecycle mode sent to the system actor when joining; also gates
    /// periodic supervision reports.
    pub lifecycle_mode: ProcLifecycleMode,
}
303
/// Handles produced by a successful [`ProcActor::bootstrap`] (or one of its
/// variants).
#[derive(Debug)]
pub struct BootstrappedProc {
    /// The proc's [`ProcActor`], spawned under the name `"proc"`.
    pub proc_actor: ActorHandle<ProcActor>,
    /// The proc's [`CommActor`], spawned under the name `"comm"`.
    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the server that feeds the proc's mailbox from its listen
    /// channel.
    pub mailbox: MailboxServerHandle,
}
314
/// Actor that manages a single proc: it joins the proc to the system, spawns
/// actors and procs on request, reports supervision state, and stops the proc.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        ProcMessage,
        MailboxAdminMessage,
    ],
)]
pub struct ProcActor {
    /// Construction parameters.
    params: ProcActorParams,
    /// Current join state; starts as `AwaitingJoin`.
    state: ProcState,
    /// Registry of remotely spawnable actor types, used to handle `Spawn`.
    remote: Remote,
    /// Time, read from the actor's clock, of the last supervision report that
    /// `update_supervision` considered successful (initially: creation time).
    last_successful_supervision_update: SystemTime,
}
331
332impl ProcActor {
333 #[hyperactor::instrument]
337 pub async fn bootstrap(
338 proc_id: ProcId,
339 world_id: WorldId,
340 listen_addr: ChannelAddr,
341 bootstrap_addr: ChannelAddr,
342 supervision_update_interval: Duration,
343 labels: HashMap<String, String>,
344 lifecycle_mode: ProcLifecycleMode,
345 ) -> Result<BootstrappedProc, anyhow::Error> {
346 let system_supervision_ref: ActorRef<ProcSupervisor> =
347 ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
348
349 Self::try_bootstrap(
350 proc_id.clone(),
351 world_id.clone(),
352 listen_addr.clone(),
353 bootstrap_addr.clone(),
354 system_supervision_ref.clone(),
355 supervision_update_interval,
356 labels,
357 lifecycle_mode,
358 )
359 .await
360 .inspect_err(|err| {
361 tracing::error!(
362 "bootstrap {} {} {} {}: {}",
363 proc_id,
364 world_id,
365 listen_addr,
366 bootstrap_addr,
367 err
368 );
369 })
370 }
371
372 #[hyperactor::instrument]
376 pub async fn try_bootstrap(
377 proc_id: ProcId,
378 world_id: WorldId,
379 listen_addr: ChannelAddr,
380 bootstrap_addr: ChannelAddr,
381 supervisor_actor_ref: ActorRef<ProcSupervisor>,
382 supervision_update_interval: Duration,
383 labels: HashMap<String, String>,
384 lifecycle_mode: ProcLifecycleMode,
385 ) -> Result<BootstrappedProc, anyhow::Error> {
386 let system_sender =
387 BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
388 let clock = ClockKind::for_channel_addr(&listen_addr);
389
390 let proc_forwarder =
391 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
392 let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
393 Self::bootstrap_for_proc(
394 proc,
395 world_id,
396 listen_addr,
397 bootstrap_addr,
398 supervisor_actor_ref,
399 supervision_update_interval,
400 labels,
401 lifecycle_mode,
402 )
403 .await
404 }
405
406 #[hyperactor::instrument]
410 pub async fn bootstrap_for_proc(
411 proc: Proc,
412 world_id: WorldId,
413 listen_addr: ChannelAddr,
414 bootstrap_addr: ChannelAddr,
415 supervisor_actor_ref: ActorRef<ProcSupervisor>,
416 supervision_update_interval: Duration,
417 labels: HashMap<String, String>,
418 lifecycle_mode: ProcLifecycleMode,
419 ) -> Result<BootstrappedProc, anyhow::Error> {
420 let (local_addr, rx) = channel::serve(listen_addr)?;
421 let mailbox_handle = proc.clone().serve(rx);
422 let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
423
424 let handle = match proc
425 .clone()
426 .spawn::<Self>(
427 "proc",
428 ProcActorParams {
429 proc: proc.clone(),
430 world_id: world_id.clone(),
431 system_actor_ref: SYSTEM_ACTOR_REF.clone(),
432 bootstrap_channel_addr: bootstrap_addr,
433 local_addr,
434 state_watch: state_tx,
435 supervisor_actor_ref,
436 supervision_update_interval,
437 labels,
438 lifecycle_mode,
439 },
440 )
441 .await
442 {
443 Ok(handle) => handle,
444 Err(e) => {
445 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
446 return Err(e);
447 }
448 };
449
450 let comm_actor = match proc
451 .clone()
452 .spawn::<CommActor>("comm", Default::default())
453 .await
454 {
455 Ok(handle) => handle,
456 Err(e) => {
457 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
458 return Err(e);
459 }
460 };
461 comm_actor.bind::<CommActor>();
462
463 loop {
464 let proc_state = state_rx.borrow_and_update().clone();
465 tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
466 if matches!(proc_state, ProcState::Joined) {
467 break;
468 }
469 match state_rx.changed().await {
470 Ok(_) => {}
471 Err(e) => {
472 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
473 return Err(e.into());
474 }
475 }
476 }
477
478 proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
479
480 Ok(BootstrappedProc {
481 proc_actor: handle,
482 mailbox: mailbox_handle,
483 comm_actor,
484 })
485 }
486
487 async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
490 mailbox_handle.stop("failed proc bootstrap cleanup");
491 if let Err(shutdown_err) = mailbox_handle.await {
492 tracing::error!(
494 "error shutting down during a failed bootstrap attempt: {}",
495 shutdown_err
496 );
497 }
498 }
499}
500
501#[async_trait]
502impl Actor for ProcActor {
503 type Params = ProcActorParams;
504
505 async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
506 let last_successful_supervision_update = params.proc.clock().system_time_now();
507 Ok(Self {
508 params,
509 state: ProcState::AwaitingJoin,
510 remote: Remote::collect(),
511 last_successful_supervision_update,
512 })
513 }
514
515 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
516 this.bind::<Self>();
518
519 self.params
521 .system_actor_ref
522 .join(
523 this,
524 self.params.world_id.clone(),
525 self.params.proc.proc_id().clone(),
526 this.port().bind(),
527 self.params.local_addr.clone(),
528 self.params.labels.clone(),
529 self.params.lifecycle_mode.clone(),
530 )
531 .await?;
532
533 if self.params.supervision_update_interval > Duration::from_secs(0)
538 && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
539 {
540 this.self_message_with_delay(
541 ProcMessage::UpdateSupervision(),
542 self.params.supervision_update_interval,
543 )?;
544 }
545
546 Ok(())
547 }
548}
549
550impl ProcActor {
551 fn rank(&self) -> Index {
553 self.params
554 .proc
555 .proc_id()
556 .rank()
557 .expect("proc must be ranked")
558 }
559}
560
561#[hyperactor::forward(MailboxAdminMessage)]
562#[async_trait]
563impl MailboxAdminMessageHandler for ProcActor {
564 async fn update_address(
565 &mut self,
566 cx: &Context<Self>,
567 proc_id: ProcId,
568 addr: ChannelAddr,
569 ) -> Result<(), anyhow::Error> {
570 tracing::trace!(
571 "received address update:\n{:#?}",
572 MailboxAdminMessage::UpdateAddress {
573 proc_id: proc_id.clone(),
574 addr: addr.clone()
575 }
576 );
577 let forwarder = cx.proc().forwarder();
578 if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
579 router.bind(proc_id.into(), addr);
580 } else {
581 tracing::warn!(
582 "proc {} received update_address but does not use a DialMailboxRouter",
583 cx.proc().proc_id()
584 );
585 }
586
587 Ok(())
588 }
589}
590
591#[async_trait]
592#[hyperactor::forward(ProcMessage)]
593impl ProcMessageHandler for ProcActor {
594 async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
595 self.state = ProcState::Joined;
596 let _ = self.params.state_watch.send(self.state.clone());
597 Ok(())
598 }
599
600 async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
601 Ok(self.state.clone())
602 }
603
604 async fn spawn(
605 &mut self,
606 cx: &Context<Self>,
607 actor_type: String,
608 actor_name: String,
609 params_data: Data,
610 status_port: PortRef<Index>,
611 ) -> Result<(), anyhow::Error> {
612 let _actor_id = self
613 .remote
614 .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
615 .await?;
616
617 status_port.send(cx, self.rank())?;
619 Ok(())
620 }
621
622 async fn spawn_proc(
623 &mut self,
624 _cx: &Context<Self>,
625 env: Environment,
626 world_id: WorldId,
627 proc_ids: Vec<ProcId>,
628 world_size: usize,
629 ) -> Result<(), anyhow::Error> {
630 for (index, proc_id) in proc_ids.into_iter().enumerate() {
631 let proc_world_id = proc_id
632 .world_id()
633 .expect("proc must be ranked for world_id access")
634 .clone();
635 if &proc_world_id
637 == self
638 .params
639 .proc
640 .proc_id()
641 .world_id()
642 .expect("proc must be ranked for world_id access")
643 || &world_id
644 == self
645 .params
646 .proc
647 .proc_id()
648 .world_id()
649 .expect("proc must be ranked for world_id access")
650 {
651 return Err(anyhow::anyhow!(
652 "cannot spawn proc in same world {}",
653 proc_world_id
654 ));
655 }
656 match env {
657 Environment::Local => {
658 ProcActor::bootstrap(
659 proc_id,
660 world_id.clone(),
661 ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
662 self.params.bootstrap_channel_addr.clone(),
663 self.params.supervision_update_interval,
664 HashMap::new(),
665 ProcLifecycleMode::ManagedBySystem,
666 )
667 .await?;
668 }
669 Environment::Exec { ref program } => {
670 tracing::info!("spawning proc {} with program {}", proc_id, program);
671 let mut child = Command::new(program);
672 let _ = child
673 .env(HYPERACTOR_WORLD_ID, world_id.to_string())
674 .env(HYPERACTOR_PROC_ID, proc_id.to_string())
675 .env(
676 HYPERACTOR_BOOTSTRAP_ADDR,
677 self.params.bootstrap_channel_addr.to_string(),
678 )
679 .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
680 .env(
681 HYPERACTOR_RANK,
682 proc_id
683 .rank()
684 .expect("proc must be ranked for rank env var")
685 .to_string(),
686 )
687 .env(HYPERACTOR_LOCAL_RANK, index.to_string())
688 .stdin(Stdio::null())
689 .stdout(Stdio::inherit())
690 .stderr(Stdio::inherit())
691 .spawn()?;
692 }
693 }
694 }
695 Ok(())
696 }
697
698 async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
699 let delay = jitter(self.params.supervision_update_interval);
701
702 if self.state != ProcState::Joined {
704 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
705 return Ok(());
706 }
707
708 let msg = ProcSupervisionState {
709 world_id: self.params.world_id.clone(),
710 proc_id: self.params.proc.proc_id().clone(),
711 proc_addr: self.params.local_addr.clone(),
712 proc_health: ProcStatus::Alive,
713 failed_actors: Vec::new(),
714 };
715
716 match cx
717 .clock()
718 .timeout(
719 Duration::from_secs(10),
721 self.params.supervisor_actor_ref.update(cx, msg),
722 )
723 .await
724 {
725 Ok(_) => {
726 self.last_successful_supervision_update = cx.clock().system_time_now();
727 }
728 Err(_) => {}
729 }
730
731 let supervision_staleness = self
732 .last_successful_supervision_update
733 .elapsed()
734 .unwrap_or_default();
735 if supervision_staleness > 5 * self.params.supervision_update_interval {
738 tracing::error!(
739 "system actor isn't responsive to supervision update, stopping the proc"
740 );
741 self.stop(cx, Duration::from_secs(5)).await?;
744 } else {
745 let delay = jitter(self.params.supervision_update_interval);
747 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
748 }
749
750 Ok(())
751 }
752
753 async fn stop(
754 &mut self,
755 cx: &Context<Self>,
756 timeout: Duration,
757 ) -> Result<ProcStopResult, anyhow::Error> {
758 tracing::info!("stopping proc {}", self.params.proc.proc_id());
759 self.params
760 .proc
761 .destroy_and_wait(timeout, Some(cx))
762 .await
763 .map(|(stopped, aborted)| {
764 tracing::info!("stopped proc {}", self.params.proc.proc_id());
765 ProcStopResult {
766 proc_id: self.params.proc.proc_id().clone(),
767 actors_stopped: stopped.len(),
768 actors_aborted: aborted.len(),
769 }
770 })
771 }
772
773 async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
774 let state = self.state.clone();
775 let actors = self.params.proc.ledger_snapshot();
776 Ok(ProcSnapshot { state, actors })
777 }
778
779 async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
780 Ok(self.params.local_addr.clone())
781 }
782
783 async fn py_spy_dump(
784 &mut self,
785 _cx: &Context<Self>,
786 config: PySpyConfig,
787 ) -> Result<StackTrace, anyhow::Error> {
788 let pid = std::process::id() as i32;
789 tracing::info!(
790 "running py-spy on proc {}, process id: {}",
791 self.params.proc.proc_id(),
792 pid
793 );
794 let trace = match config {
795 PySpyConfig::Blocking { native, native_all } => {
796 py_spy(
797 pid,
798 native.unwrap_or_default(),
799 native_all.unwrap_or_default(),
800 true,
801 )
802 .await?
803 }
804 PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
805 };
806 Ok(StackTrace { trace })
807 }
808}
809
810#[async_trait]
811impl Handler<ActorSupervisionEvent> for ProcActor {
812 async fn handle(
813 &mut self,
814 cx: &Context<Self>,
815 event: ActorSupervisionEvent,
816 ) -> anyhow::Result<()> {
817 let status = event.status();
818 let message = ProcSupervisionState {
819 world_id: self.params.world_id.clone(),
820 proc_id: self.params.proc.proc_id().clone(),
821 proc_addr: self.params.local_addr.clone(),
822 proc_health: ProcStatus::Alive,
823 failed_actors: Vec::from([(event.actor_id, status)]),
824 };
825 self.params.supervisor_actor_ref.update(cx, message).await?;
826 Ok(())
827 }
828}
829
830pub async fn spawn<A: Actor + Referable>(
833 cx: &impl context::Actor,
834 proc_actor: &ActorRef<ProcActor>,
835 actor_name: &str,
836 params: &A::Params,
837) -> Result<ActorRef<A>, anyhow::Error>
838where
839 A::Params: RemoteMessage,
840{
841 let remote = Remote::collect();
842 let (spawned_port, mut spawned_receiver) = open_port(cx);
843 let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
844
845 proc_actor
846 .spawn(
847 cx,
848 remote
849 .name_of::<A>()
850 .ok_or(anyhow::anyhow!("actor not registered"))?
851 .into(),
852 actor_name.into(),
853 bincode::serialize(params)?,
854 spawned_port.bind(),
855 )
856 .await?;
857
858 while spawned_receiver.recv().await?
860 != proc_id
861 .rank()
862 .expect("proc must be ranked for rank comparison")
863 {}
864
865 Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
867}
868
869#[cfg(test)]
870mod tests {
871 use std::assert_matches::assert_matches;
872 use std::collections::HashSet;
873 use std::time::Duration;
874
875 use hyperactor::actor::ActorStatus;
876 use hyperactor::channel;
877 use hyperactor::channel::ChannelAddr;
878 use hyperactor::channel::ChannelTransport;
879 use hyperactor::clock::Clock;
880 use hyperactor::clock::RealClock;
881 use hyperactor::data::Named;
882 use hyperactor::forward;
883 use hyperactor::id;
884 use hyperactor::reference::ActorRef;
885 use hyperactor::test_utils::pingpong::PingPongActor;
886 use hyperactor::test_utils::pingpong::PingPongActorParams;
887 use hyperactor::test_utils::pingpong::PingPongMessage;
888 use maplit::hashset;
889 use rand::Rng;
890 use rand::distributions::Alphanumeric;
891 use regex::Regex;
892
893 use super::*;
894 use crate::supervision::ProcSupervisionMessage;
895 use crate::system::ServerHandle;
896 use crate::system::System;
897
898 const MAX_WAIT_TIME: Duration = Duration::new(10, 0);
899
900 struct Bootstrapped {
901 server_handle: ServerHandle,
902 proc_actor_ref: ActorRef<ProcActor>,
903 comm_actor_ref: ActorRef<CommActor>,
904 client: Instance<()>,
905 }
906
907 async fn bootstrap() -> Bootstrapped {
908 let server_handle = System::serve(
909 ChannelAddr::any(ChannelTransport::Local),
910 Duration::from_secs(10),
911 Duration::from_secs(10),
912 )
913 .await
914 .unwrap();
915
916 let world_id = id!(world);
917 let proc_id = world_id.proc_id(0);
918 let bootstrap = ProcActor::bootstrap(
919 proc_id,
920 world_id,
921 ChannelAddr::any(ChannelTransport::Local),
922 server_handle.local_addr().clone(),
923 Duration::from_secs(1),
924 HashMap::new(),
925 ProcLifecycleMode::ManagedBySystem,
926 )
927 .await
928 .unwrap();
929
930 let mut system = System::new(server_handle.local_addr().clone());
933 let client = system.attach().await.unwrap();
934
935 let start = RealClock.now();
937 let mut proc_state;
938 loop {
939 proc_state = bootstrap.proc_actor.state(&client).await.unwrap();
940
941 if matches!(proc_state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
942 break;
943 }
944 }
945 assert_matches!(proc_state, ProcState::Joined);
946
947 Bootstrapped {
948 server_handle,
949 proc_actor_ref: bootstrap.proc_actor.bind(),
950 comm_actor_ref: bootstrap.comm_actor.bind(),
951 client,
952 }
953 }
954
955 #[tokio::test]
956 async fn test_bootstrap() {
957 let Bootstrapped { server_handle, .. } = bootstrap().await;
958
959 println!("bootrapped, now waiting");
960
961 server_handle.stop().await.unwrap();
962 server_handle.await;
963 }
964
965 #[derive(Debug, Default, Actor)]
966 #[hyperactor::export(
967 spawn = true,
968 handlers = [
969 TestActorMessage,
970 ],
971 )]
972 struct TestActor;
973
974 #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
975 enum TestActorMessage {
976 Increment(u64, #[reply] OncePortRef<u64>),
977 Fail(String),
978 }
979
980 #[async_trait]
981 #[forward(TestActorMessage)]
982 impl TestActorMessageHandler for TestActor {
983 async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
984 Ok(num + 1)
985 }
986
987 async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
988 Err(anyhow::anyhow!(err))
989 }
990 }
991
992 #[tokio::test]
993 async fn test_stop() {
994 let Bootstrapped {
997 server_handle,
998 proc_actor_ref,
999 client,
1000 ..
1001 } = bootstrap().await;
1002
1003 const NUM_ACTORS: usize = 4usize;
1004 for i in 0..NUM_ACTORS {
1005 spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
1006 .await
1007 .unwrap();
1008 }
1009
1010 let ProcStopResult {
1011 proc_id: _,
1012 actors_stopped,
1013 actors_aborted,
1014 } = proc_actor_ref
1015 .stop(&client, Duration::from_secs(1))
1016 .await
1017 .unwrap();
1018 assert_eq!(NUM_ACTORS + 1, actors_stopped);
1019 assert_eq!(1, actors_aborted);
1020
1021 server_handle.stop().await.unwrap();
1022 server_handle.await;
1023 }
1024
1025 #[derive(Debug, Default, Actor)]
1027 #[hyperactor::export(
1028 spawn = true,
1029 handlers = [
1030 u64,
1031 ],
1032 )]
1033 struct SleepActor {}
1034
1035 #[async_trait]
1036 impl Handler<u64> for SleepActor {
1037 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
1038 let duration = message;
1039 RealClock.sleep(Duration::from_secs(duration)).await;
1040 Ok(())
1041 }
1042 }
1043
1044 #[tracing_test::traced_test]
1045 #[tokio::test]
1046 async fn test_stop_timeout() {
1047 let Bootstrapped {
1048 server_handle,
1049 proc_actor_ref,
1050 client,
1051 ..
1052 } = bootstrap().await;
1053
1054 const NUM_ACTORS: usize = 4usize;
1055 for i in 0..NUM_ACTORS {
1056 let sleep_secs = 5u64;
1057 let sleeper = spawn::<SleepActor>(
1058 &client,
1059 &proc_actor_ref,
1060 format!("sleeper{i}").as_str(),
1061 &(),
1062 )
1063 .await
1064 .unwrap();
1065 if i > 0 {
1066 sleeper.send(&client, sleep_secs).unwrap();
1067 }
1068 }
1069
1070 let ProcStopResult {
1071 proc_id: _,
1072 actors_stopped,
1073 actors_aborted,
1074 } = proc_actor_ref
1075 .stop(&client, Duration::from_secs(1))
1076 .await
1077 .unwrap();
1078 assert_eq!(2, actors_stopped);
1079 assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);
1080
1081 assert!(tracing_test::internal::logs_with_scope_contain(
1082 "hyperactor::proc",
1083 "world[0].proc[0]: aborting (delayed) JoinHandle"
1084 ));
1085 for i in 1..3 {
1086 assert!(tracing_test::internal::logs_with_scope_contain(
1087 "hyperactor::proc",
1088 format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
1089 ));
1090 }
1091 logs_assert(|logs| {
1092 let count = logs
1093 .iter()
1094 .filter(|log| {
1095 log.contains("aborting JoinHandle")
1096 || log.contains("aborting (delayed) JoinHandle")
1097 })
1098 .count();
1099 if count == actors_aborted {
1100 Ok(())
1101 } else {
1102 Err("task abort counting error".to_string())
1103 }
1104 });
1105
1106 server_handle.stop().await.unwrap();
1107 server_handle.await;
1108 }
1109
1110 #[tokio::test]
1111 async fn test_spawn() {
1112 let Bootstrapped {
1113 server_handle,
1114 proc_actor_ref,
1115 client,
1116 ..
1117 } = bootstrap().await;
1118
1119 let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
1120 .await
1121 .unwrap();
1122
1123 let result = test_actor_ref.increment(&client, 1).await.unwrap();
1124 assert_eq!(result, 2);
1125
1126 server_handle.stop().await.unwrap();
1127 server_handle.await;
1128 }
1129
1130 #[cfg(target_os = "linux")]
1131 fn random_abstract_addr() -> ChannelAddr {
1132 let random_string = rand::thread_rng()
1133 .sample_iter(&Alphanumeric)
1134 .take(24)
1135 .map(char::from)
1136 .collect::<String>();
1137 format!("unix!@{random_string}").parse().unwrap()
1138 }
1139
1140 #[cfg(target_os = "linux")] #[tokio::test]
1142 async fn test_bootstrap_retry() {
1143 if std::env::var("CARGO_TEST").is_ok() {
1144 eprintln!("test skipped under cargo as it causes other tests to fail when run");
1145 return;
1146 }
1147
1148 let bootstrap_addr = random_abstract_addr();
1152
1153 let bootstrap_addr_clone = bootstrap_addr.clone();
1154 let handle = tokio::spawn(async move {
1155 let world_id = id!(world);
1156 let proc_id = world_id.proc_id(0);
1157 let bootstrap = ProcActor::bootstrap(
1158 proc_id,
1159 world_id,
1160 random_abstract_addr(),
1161 bootstrap_addr_clone,
1162 Duration::from_secs(1),
1163 HashMap::new(),
1164 ProcLifecycleMode::ManagedBySystem,
1165 )
1166 .await
1167 .unwrap();
1168
1169 let mut status = bootstrap.proc_actor.status();
1171 assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
1172 });
1173
1174 RealClock.sleep(Duration::from_secs(5)).await;
1177
1178 let _server_handle = System::serve(
1179 bootstrap_addr,
1180 Duration::from_secs(10),
1181 Duration::from_secs(10),
1182 )
1183 .await
1184 .unwrap();
1185
1186 handle.await.unwrap();
1188 }
1189
1190 #[tokio::test]
1191 async fn test_supervision_message_handling() {
1192 if std::env::var("CARGO_TEST").is_ok() {
1193 eprintln!("test skipped under cargo as it fails when run with others");
1194 return;
1195 }
1196
1197 let server_handle = System::serve(
1198 ChannelAddr::any(ChannelTransport::Local),
1199 Duration::from_secs(3600),
1200 Duration::from_secs(3600),
1201 )
1202 .await
1203 .unwrap();
1204
1205 let mut system = System::new(server_handle.local_addr().clone());
1207 let supervisor = system.attach().await.unwrap();
1208 let (supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1209 supervisor.open_port::<ProcSupervisionMessage>();
1210 supervisor_supervision_tx.bind_to(ProcSupervisionMessage::port());
1211 let supervisor_actor_ref: ActorRef<ProcSupervisor> =
1212 ActorRef::attest(supervisor.self_id().clone());
1213
1214 let local_world_id = hyperactor::id!(test_proc);
1216 let local_proc_id = local_world_id.proc_id(0);
1217 let bootstrap = ProcActor::try_bootstrap(
1218 local_proc_id.clone(),
1219 local_world_id.clone(),
1220 ChannelAddr::any(ChannelTransport::Local),
1221 server_handle.local_addr().clone(),
1222 supervisor_actor_ref.clone(),
1223 Duration::from_secs(1),
1224 HashMap::new(),
1225 ProcLifecycleMode::ManagedBySystem,
1226 )
1227 .await
1228 .unwrap();
1229
1230 let msg = supervisor_supervision_receiver.recv().await;
1233 match msg.unwrap() {
1234 ProcSupervisionMessage::Update(state, port) => {
1235 assert_eq!(
1236 state,
1237 ProcSupervisionState {
1238 world_id: local_world_id.clone(),
1239 proc_addr: ChannelAddr::Local(3),
1240 proc_id: local_proc_id.clone(),
1241 proc_health: ProcStatus::Alive,
1242 failed_actors: Vec::new(),
1243 }
1244 );
1245 let _ = port.send(&supervisor, ());
1246 }
1247 }
1248
1249 let proc_actor_ref = bootstrap.proc_actor.bind();
1251 let test_actor_ref = spawn::<TestActor>(&supervisor, &proc_actor_ref, "test", &())
1252 .await
1253 .unwrap();
1254
1255 test_actor_ref
1256 .fail(&supervisor, "test actor is erroring out".to_string())
1257 .await
1258 .unwrap();
1259 let result = RealClock
1263 .timeout(Duration::from_secs(5), async {
1264 loop {
1265 match supervisor_supervision_receiver.recv().await {
1266 Ok(ProcSupervisionMessage::Update(state, _port)) => {
1267 match state.failed_actors.iter().find(|(failed_id, _)| {
1268 failed_id == test_actor_ref.clone().actor_id()
1269 }) {
1270 Some((_, actor_status)) => return Ok(actor_status.clone()),
1271 None => {}
1272 }
1273 }
1274 _ => anyhow::bail!("unexpected message type"),
1275 }
1276 }
1277 })
1278 .await;
1279 assert_matches!(
1280 result.unwrap().unwrap(),
1281 ActorStatus::Failed(msg) if msg.contains("test actor is erroring out")
1282 );
1283
1284 server_handle.stop().await.unwrap();
1285 server_handle.await;
1286 }
1287
1288 #[tokio::test]
1291 async fn test_bind_proc_actor_in_bootstrap() {
1292 let server_handle = System::serve(
1293 ChannelAddr::any(ChannelTransport::Local),
1294 Duration::from_secs(10),
1295 Duration::from_secs(10),
1296 )
1297 .await
1298 .unwrap();
1299 let mut system = System::new(server_handle.local_addr().clone());
1300 let client = system.attach().await.unwrap();
1301
1302 let world_id = id!(world);
1303 let proc_id = world_id.proc_id(0);
1304 let bootstrap = ProcActor::bootstrap(
1305 proc_id,
1306 world_id,
1307 ChannelAddr::any(ChannelTransport::Local),
1308 server_handle.local_addr().clone(),
1309 Duration::from_secs(1),
1310 HashMap::new(),
1311 ProcLifecycleMode::ManagedBySystem,
1312 )
1313 .await
1314 .unwrap();
1315 let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
1316 let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);
1317
1318 let res = RealClock
1319 .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
1320 .await;
1321 assert!(res.is_ok());
1324 assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
1325 server_handle.stop().await.unwrap();
1326 server_handle.await;
1327 }
1328
1329 #[tokio::test]
1330 async fn test_proc_snapshot() {
1331 let Bootstrapped {
1332 server_handle,
1333 proc_actor_ref,
1334 comm_actor_ref,
1335 client,
1336 ..
1337 } = bootstrap().await;
1338
1339 let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
1341 .await
1342 .unwrap();
1343 let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
1344 .await
1345 .unwrap();
1346 {
1347 let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
1348 assert_eq!(snapshot.state, ProcState::Joined);
1349 assert_eq!(
1350 snapshot.actors.roots.keys().collect::<HashSet<_>>(),
1351 hashset! {
1352 proc_actor_ref.actor_id(),
1353 comm_actor_ref.actor_id(),
1354 root.actor_id(),
1355 another_root.actor_id(),
1356 }
1357 );
1358 }
1359
1360 server_handle.stop().await.unwrap();
1361 server_handle.await;
1362 }
1363
1364 #[tokio::test]
1365 async fn test_undeliverable_message_return() {
1366 use hyperactor::mailbox::Undeliverable;
1369 use hyperactor::test_utils::pingpong::PingPongActor;
1370 use hyperactor::test_utils::pingpong::PingPongMessage;
1371
1372 let config = hyperactor::config::global::lock();
1374 let _guard = config.override_key(
1375 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1376 Duration::from_secs(1),
1377 );
1378
1379 let server_handle = System::serve(
1381 ChannelAddr::any(ChannelTransport::Tcp),
1382 Duration::from_secs(120),
1383 Duration::from_secs(120),
1384 )
1385 .await
1386 .unwrap();
1387 let mut system = System::new(server_handle.local_addr().clone());
1388
1389 let supervisor = system.attach().await.unwrap();
1391 let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
1392 sup_tx.bind_to(ProcSupervisionMessage::port());
1393 let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1394
1395 let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1397 channel::dial(server_handle.local_addr().clone()).unwrap(),
1398 ));
1399
1400 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1402 let proc_forwarder =
1403 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1404
1405 let world_id = id!(world);
1407 let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1408 let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1409 proc_0.clone(),
1410 world_id.clone(),
1411 listen_addr,
1412 server_handle.local_addr().clone(),
1413 sup_ref.clone(),
1414 Duration::from_secs(120),
1415 HashMap::new(),
1416 ProcLifecycleMode::ManagedBySystem,
1417 )
1418 .await
1419 .unwrap();
1420 let proc_0_client = proc_0.attach("client").unwrap();
1421 let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1422
1423 let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1425 let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1426 proc_1.clone(),
1427 world_id.clone(),
1428 ChannelAddr::any(ChannelTransport::Tcp),
1429 server_handle.local_addr().clone(),
1430 sup_ref.clone(),
1431 Duration::from_secs(120),
1432 HashMap::new(),
1433 ProcLifecycleMode::ManagedBySystem,
1434 )
1435 .await
1436 .unwrap();
1437 let proc_1_client = proc_1.attach("client").unwrap();
1438 let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1439
1440 let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1441 let ping_handle = proc_0
1445 .spawn::<PingPongActor>("ping", ping_params)
1446 .await
1447 .unwrap();
1448 let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1449 let pong_handle = proc_1
1450 .spawn::<PingPongActor>("pong", pong_params)
1451 .await
1452 .unwrap();
1453
1454 server_handle.stop().await.unwrap();
1457 server_handle.await;
1458
1459 let n = 100usize;
1460 for i in 1..(n + 1) {
1461 let ttl = 66 + i as u64; let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1464 ping_handle
1465 .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1466 .unwrap();
1467 }
1468
1469 assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1474
1475 let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1477 unreachable!()
1478 };
1479 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1480 let mut count = 1;
1481 while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1482 count += 1;
1486 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487 }
1488 assert!(count == n);
1489 }
1490
1491 #[tracing_test::traced_test]
1492 #[tokio::test]
1493 async fn test_proc_actor_mailbox_admin_message() {
1494 use hyperactor::test_utils::pingpong::PingPongActor;
1499 use hyperactor::test_utils::pingpong::PingPongMessage;
1500
1501 let server_handle = System::serve(
1503 ChannelAddr::any(ChannelTransport::Tcp),
1504 Duration::from_secs(120),
1505 Duration::from_secs(120),
1506 )
1507 .await
1508 .unwrap();
1509 let mut system = System::new(server_handle.local_addr().clone());
1510 let system_actor = server_handle.system_actor_handle();
1511 let system_client = system.attach().await.unwrap(); let supervisor = system.attach().await.unwrap();
1515 let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
1516 sup_tx.bind_to(ProcSupervisionMessage::port());
1517 let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1518
1519 let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1521 channel::dial(server_handle.local_addr().clone()).unwrap(),
1522 ));
1523
1524 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1526 let proc_forwarder =
1527 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1528
1529 let world_id = id!(world);
1531 let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1532 let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1533 proc_0.clone(),
1534 world_id.clone(),
1535 listen_addr,
1536 server_handle.local_addr().clone(),
1537 sup_ref.clone(),
1538 Duration::from_secs(120),
1539 HashMap::new(),
1540 ProcLifecycleMode::ManagedBySystem,
1541 )
1542 .await
1543 .unwrap();
1544 let proc_0_client = proc_0.attach("client").unwrap();
1545 let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();
1546
1547 let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1549 let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1550 proc_1.clone(),
1551 world_id.clone(),
1552 ChannelAddr::any(ChannelTransport::Tcp),
1553 server_handle.local_addr().clone(),
1554 sup_ref.clone(),
1555 Duration::from_secs(120),
1556 HashMap::new(),
1557 ProcLifecycleMode::ManagedBySystem,
1558 )
1559 .await
1560 .unwrap();
1561 let proc_1_client = proc_1.attach("client").unwrap();
1562 let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();
1563
1564 let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1568 let ping_handle = proc_0
1569 .spawn::<PingPongActor>("ping", ping_params)
1570 .await
1571 .unwrap();
1572 let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1573 let pong_handle = proc_1
1574 .spawn::<PingPongActor>("pong", pong_params)
1575 .await
1576 .unwrap();
1577
1578 let ttl = 10u64; let (once_tx, once_rx) = system_client.open_once_port::<bool>();
1581 ping_handle
1582 .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
1583 .unwrap();
1584
1585 assert!(once_rx.recv().await.unwrap());
1586
1587 let expected_1 = r#"UpdateAddress {
1589 proc_id: Ranked(
1590 WorldId(
1591 "world",
1592 ),
1593 1,
1594 ),
1595 addr: Tcp("#;
1596
1597 let expected_2 = r#"UpdateAddress {
1599 proc_id: Ranked(
1600 WorldId(
1601 "world",
1602 ),
1603 0,
1604 ),
1605 addr: Tcp("#;
1606
1607 let expected_3 = r#"UpdateAddress {
1609 proc_id: Ranked(
1610 WorldId(
1611 "user",
1612 ),"#;
1613
1614 logs_assert(|logs| {
1615 let log_body = logs.join("\n");
1616
1617 let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
1618 let count = pattern.find_iter(&log_body).count();
1619
1620 if count != 3 {
1621 return Err(format!(
1622 "expected 3 UpdateAddress messages, found {}",
1623 count
1624 ));
1625 }
1626
1627 if !log_body.contains(expected_1) {
1628 return Err("missing expected update for proc_id 1".into());
1629 }
1630 if !log_body.contains(expected_2) {
1631 return Err("missing expected update for proc_id 0".into());
1632 }
1633 if !log_body.contains(expected_3) {
1634 return Err("missing expected update for proc_id user".into());
1635 }
1636
1637 Ok(())
1638 });
1639
1640 let (once_tx, once_rx) = system_client.open_once_port::<()>();
1641 assert_matches!(
1642 system_actor
1643 .stop(
1644 &system_client,
1645 None,
1646 Duration::from_secs(1),
1647 once_tx.bind()
1648 )
1649 .await,
1650 Ok(())
1651 );
1652 assert_matches!(once_rx.recv().await.unwrap(), ());
1653 }
1654
1655 #[tokio::test]
1656 async fn test_update_address_book_cache() {
1657 let server_handle = System::serve(
1658 ChannelAddr::any(ChannelTransport::Tcp),
1659 Duration::from_secs(2), Duration::from_secs(2), )
1662 .await
1663 .unwrap();
1664 let system_addr = server_handle.local_addr().clone();
1665 let mut system = System::new(system_addr.clone());
1666
1667 let system_client = system.attach().await.unwrap();
1668
1669 let ping_actor_id = id!(world[0].ping[0]);
1671 let (ping_actor_ref, _ping_proc_ref) =
1672 spawn_actor(&system_client, &ping_actor_id, &system_addr).await;
1673
1674 let pong_actor_id = id!(world[1].pong[0]);
1675 let (pong_actor_ref, pong_proc_ref) =
1676 spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1677
1678 let (done_tx, done_rx) = system_client.open_once_port();
1681 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1682 ping_actor_ref
1683 .send(&system_client, ping_pong_message)
1684 .unwrap();
1685 assert!(done_rx.recv().await.unwrap());
1686
1687 let ProcStopResult { actors_aborted, .. } = pong_proc_ref
1689 .stop(&system_client, Duration::from_secs(1))
1690 .await
1691 .unwrap();
1692 assert_eq!(1, actors_aborted);
1693 let (pong_actor_ref, _pong_proc_ref) =
1694 spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1695
1696 let (done_tx, done_rx) = system_client.open_once_port();
1700 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1701 ping_actor_ref
1702 .send(&system_client, ping_pong_message)
1703 .unwrap();
1704 assert!(done_rx.recv().await.unwrap());
1705 }
1706
1707 async fn spawn_actor(
1708 cx: &impl context::Actor,
1709 actor_id: &ActorId,
1710 system_addr: &ChannelAddr,
1711 ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
1712 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1713 let bootstrap = ProcActor::bootstrap(
1714 actor_id.proc_id().clone(),
1715 actor_id
1716 .proc_id()
1717 .world_id()
1718 .expect("proc must be ranked for bootstrap world_id")
1719 .clone(),
1720 listen_addr.clone(),
1721 system_addr.clone(),
1722 Duration::from_secs(3),
1723 HashMap::new(),
1724 ProcLifecycleMode::ManagedBySystem,
1725 )
1726 .await
1727 .unwrap();
1728 let (undeliverable_msg_tx, _) = cx.mailbox().open_port();
1729 let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
1730 let actor_ref = spawn::<PingPongActor>(
1731 cx,
1732 &bootstrap.proc_actor.bind(),
1733 &actor_id.to_string(),
1734 ¶ms,
1735 )
1736 .await
1737 .unwrap();
1738 let proc_actor_ref = bootstrap.proc_actor.bind();
1739 (actor_ref, proc_actor_ref)
1740 }
1741}