1use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorHandle;
33use hyperactor::actor::RemoteActor;
34use hyperactor::actor::remote::Remote;
35use hyperactor::cap;
36use hyperactor::channel;
37use hyperactor::channel::ChannelAddr;
38use hyperactor::clock::Clock;
39use hyperactor::clock::ClockKind;
40use hyperactor::mailbox::BoxedMailboxSender;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxAdminMessage;
43use hyperactor::mailbox::MailboxAdminMessageHandler;
44use hyperactor::mailbox::MailboxClient;
45use hyperactor::mailbox::MailboxServer;
46use hyperactor::mailbox::MailboxServerHandle;
47use hyperactor::mailbox::open_port;
48use hyperactor::proc::ActorLedgerSnapshot;
49use hyperactor::proc::Proc;
50use hyperactor::reference::ActorId;
51use hyperactor::reference::ActorRef;
52use hyperactor::reference::Index;
53use hyperactor::reference::ProcId;
54use hyperactor::supervision::ActorSupervisionEvent;
55use hyperactor_mesh::comm::CommActor;
56use serde::Deserialize;
57use serde::Serialize;
58use tokio::process::Command;
59use tokio::sync::watch;
60use tokio_retry::strategy::jitter;
61
62use crate::pyspy::PySpyTrace;
63use crate::pyspy::py_spy;
64use crate::supervision::ProcStatus;
65use crate::supervision::ProcSupervisionMessageClient;
66use crate::supervision::ProcSupervisionState;
67use crate::supervision::ProcSupervisor;
68use crate::system_actor::ProcLifecycleMode;
69use crate::system_actor::SYSTEM_ACTOR_REF;
70use crate::system_actor::SystemActor;
71use crate::system_actor::SystemMessageClient;
72
/// Environment variable carrying the world id of a proc launched via
/// [`Environment::Exec`].
const HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
/// Environment variable carrying the proc id of a proc launched via
/// [`Environment::Exec`].
const HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
/// Environment variable carrying the bootstrap (system) channel address
/// handed to an exec-launched proc.
const HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
/// Environment variable carrying the `world_size` of a `SpawnProc` request.
const HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
/// Environment variable carrying the rank of the exec-launched proc.
const HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
/// Environment variable carrying the proc's index within its `SpawnProc`
/// request.
const HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
79
/// How each proc requested through [`ProcMessage::SpawnProc`] is launched.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// Bootstrap the proc inside the current process via
    /// [`ProcActor::bootstrap`].
    Local,

    /// Launch the proc by executing an external program. The child receives
    /// its world id, proc id, bootstrap address, world size, rank and local
    /// rank through the `HYPERACTOR_*` environment variables.
    Exec {
        /// The program passed to `tokio::process::Command::new`.
        program: String,
    },
}
100
/// Join state of a [`ProcActor`] with respect to its world.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub enum ProcState {
    /// Initial state: the proc actor has not yet handled `ProcMessage::Joined`.
    AwaitingJoin,

    /// Set when the proc actor handles `ProcMessage::Joined` (presumably sent
    /// by the system actor in reply to the join request — confirm).
    Joined,
}
110
111impl fmt::Display for ProcState {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 match self {
114 Self::AwaitingJoin => write!(f, "AwaitingJoin"),
115 Self::Joined => write!(f, "Joined"),
116 }
117 }
118}
119
/// Outcome of a [`ProcMessage::Stop`] request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcStopResult {
    /// The proc that was stopped.
    pub proc_id: ProcId,
    /// Number of actors `Proc::destroy_and_wait` reported as stopped.
    pub actors_stopped: usize,
    /// Number of actors `Proc::destroy_and_wait` reported as aborted
    /// (presumably those that did not stop within the timeout — confirm).
    pub actors_aborted: usize,
}
130
/// Point-in-time view of a proc, returned by [`ProcMessage::Snapshot`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcSnapshot {
    /// The proc actor's join state at snapshot time.
    pub state: ProcState,
    /// The proc's actor ledger, as returned by `Proc::ledger_snapshot`.
    pub actors: ActorLedgerSnapshot,
}
139
/// Options for a [`ProcMessage::PySpyDump`] request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PySpyConfig {
    /// Dump with both native options off and the last `py_spy` flag set to
    /// `false` (presumably non-blocking sampling — confirm in `crate::pyspy`).
    NonBlocking,
    /// Dump with the last `py_spy` flag set to `true` (presumably pausing the
    /// target while sampling — confirm in `crate::pyspy`).
    Blocking {
        /// Forwarded as `py_spy`'s `native` argument; `None` means `false`.
        native: Option<bool>,
        /// Forwarded as `py_spy`'s `native_all` argument; `None` means `false`.
        native_all: Option<bool>,
    },
}
154
/// Reply to a [`ProcMessage::PySpyDump`] request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct StackTrace {
    /// The py-spy trace captured for the OS process hosting the proc.
    pub trace: PySpyTrace,
}
162
/// Messages handled by [`ProcActor`].
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Named
)]
pub enum ProcMessage {
    /// Tells the proc actor it has joined its world. The handler sets the
    /// state to `ProcState::Joined` and publishes it on `state_watch`.
    #[log_level(debug)]
    Joined(),

    /// Query the proc actor's current [`ProcState`].
    #[log_level(debug)]
    State {
        /// Receives the current state.
        #[reply]
        ret: OncePortRef<ProcState>,
    },

    /// Spawn an actor in this proc by registered type name.
    Spawn {
        /// Registered remote name of the actor type.
        actor_type: String,
        /// Name to give the new actor.
        actor_name: String,
        /// Serialized actor parameters.
        params_data: Data,
        /// Receives this proc's rank once the actor has been spawned.
        status_port: PortRef<Index>,
    },

    /// Spawn procs into a world other than this proc's own.
    SpawnProc {
        /// How each proc is launched.
        env: Environment,
        /// World the new procs join; must differ from this proc's world.
        world_id: WorldId,
        /// Ids of the procs to spawn, one proc per entry.
        proc_ids: Vec<ProcId>,
        /// Exported as `HYPERACTOR_WORLD_SIZE` to exec-launched procs.
        world_size: usize,
    },

    /// Self-scheduled tick that reports liveness to the supervisor.
    #[log_level(debug)]
    UpdateSupervision(),

    /// Stop the proc.
    Stop {
        /// Time allowed for actors to stop, forwarded to
        /// `Proc::destroy_and_wait`.
        timeout: Duration,
        /// Receives the stop outcome.
        #[reply]
        reply_to: OncePortRef<ProcStopResult>,
    },

    /// Take a [`ProcSnapshot`] of the proc.
    #[log_level(debug)]
    Snapshot {
        /// Receives the snapshot.
        #[reply]
        reply_to: OncePortRef<ProcSnapshot>,
    },

    /// Query the channel address the proc is served on.
    LocalAddr {
        /// Receives the address.
        #[reply]
        reply_to: OncePortRef<ChannelAddr>,
    },

    /// Capture a py-spy stack dump of the process hosting the proc.
    #[log_level(debug)]
    PySpyDump {
        /// Dump options.
        config: PySpyConfig,
        /// Receives the captured trace.
        #[reply]
        reply_to: OncePortRef<StackTrace>,
    },
}
258
/// Construction parameters for a [`ProcActor`].
#[derive(Debug, Clone)]
pub struct ProcActorParams {
    /// The proc managed by the actor.
    pub proc: Proc,

    /// World the proc joins.
    pub world_id: WorldId,

    /// System actor the proc joins during `init`.
    pub system_actor_ref: ActorRef<SystemActor>,

    /// Address of the system (bootstrap) channel. It is also reused as the
    /// bootstrap address for procs launched through `ProcMessage::SpawnProc`.
    pub bootstrap_channel_addr: ChannelAddr,

    /// Address the proc's mailbox server is listening on (as returned by
    /// `channel::serve`). It is reported in join and supervision messages
    /// and returned by `ProcMessage::LocalAddr`.
    pub local_addr: ChannelAddr,

    /// Publishes the actor's `ProcState`. `bootstrap_for_proc` waits on the
    /// receiving side until `Joined` is observed.
    pub state_watch: watch::Sender<ProcState>,

    /// Actor that receives `ProcSupervisionState` updates.
    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,

    /// Period between supervision updates; zero disables periodic updates.
    pub supervision_update_interval: Duration,

    /// Labels sent to the system actor with the join request.
    pub labels: HashMap<String, String>,

    /// Lifecycle mode sent with the join request. Periodic supervision updates
    /// are scheduled only when this is `ProcLifecycleMode::ManagedBySystem`.
    pub lifecycle_mode: ProcLifecycleMode,
}
303
/// Handles returned by [`ProcActor::bootstrap_for_proc`] (and the bootstrap
/// functions built on it) once the proc has joined its world.
#[derive(Debug)]
pub struct BootstrappedProc {
    /// Handle to the proc's `ProcActor`, spawned under the name `"proc"`.
    pub proc_actor: ActorHandle<ProcActor>,
    /// Handle to the proc's `CommActor`, spawned under the name `"comm"`.
    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the mailbox server serving the proc's listen channel.
    pub mailbox: MailboxServerHandle,
}
314
/// Actor that manages a single proc. It joins the proc's world through the
/// system actor, spawns actors and procs on request, reports supervision
/// state to its supervisor, and stops the proc on request.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        ProcMessage,
        MailboxAdminMessage,
    ],
)]
pub struct ProcActor {
    /// Construction parameters.
    params: ProcActorParams,
    /// Current join state; it changes to `Joined` when `ProcMessage::Joined`
    /// is handled.
    state: ProcState,
    /// Registry used to spawn actors by type name (built with
    /// `Remote::collect()`).
    remote: Remote,
    /// Time, per the proc's clock, of the last supervision update that
    /// completed within its timeout. Initialized when the actor is constructed.
    last_successful_supervision_update: SystemTime,
}
331
332impl ProcActor {
333 #[hyperactor::instrument]
337 pub async fn bootstrap(
338 proc_id: ProcId,
339 world_id: WorldId,
340 listen_addr: ChannelAddr,
341 bootstrap_addr: ChannelAddr,
342 supervision_update_interval: Duration,
343 labels: HashMap<String, String>,
344 lifecycle_mode: ProcLifecycleMode,
345 ) -> Result<BootstrappedProc, anyhow::Error> {
346 let system_supervision_ref: ActorRef<ProcSupervisor> =
347 ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
348
349 Self::try_bootstrap(
350 proc_id.clone(),
351 world_id.clone(),
352 listen_addr.clone(),
353 bootstrap_addr.clone(),
354 system_supervision_ref.clone(),
355 supervision_update_interval,
356 labels,
357 lifecycle_mode,
358 )
359 .await
360 .inspect_err(|err| {
361 tracing::error!(
362 "bootstrap {} {} {} {}: {}",
363 proc_id,
364 world_id,
365 listen_addr,
366 bootstrap_addr,
367 err
368 );
369 })
370 }
371
372 #[hyperactor::instrument]
376 pub async fn try_bootstrap(
377 proc_id: ProcId,
378 world_id: WorldId,
379 listen_addr: ChannelAddr,
380 bootstrap_addr: ChannelAddr,
381 supervisor_actor_ref: ActorRef<ProcSupervisor>,
382 supervision_update_interval: Duration,
383 labels: HashMap<String, String>,
384 lifecycle_mode: ProcLifecycleMode,
385 ) -> Result<BootstrappedProc, anyhow::Error> {
386 let system_sender =
387 BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
388 let clock = ClockKind::for_channel_addr(&listen_addr);
389
390 let proc_forwarder =
391 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
392 let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
393 Self::bootstrap_for_proc(
394 proc,
395 world_id,
396 listen_addr,
397 bootstrap_addr,
398 supervisor_actor_ref,
399 supervision_update_interval,
400 labels,
401 lifecycle_mode,
402 )
403 .await
404 }
405
406 #[hyperactor::instrument]
410 pub async fn bootstrap_for_proc(
411 proc: Proc,
412 world_id: WorldId,
413 listen_addr: ChannelAddr,
414 bootstrap_addr: ChannelAddr,
415 supervisor_actor_ref: ActorRef<ProcSupervisor>,
416 supervision_update_interval: Duration,
417 labels: HashMap<String, String>,
418 lifecycle_mode: ProcLifecycleMode,
419 ) -> Result<BootstrappedProc, anyhow::Error> {
420 let (local_addr, rx) = channel::serve(listen_addr).await?;
421 let mailbox_handle = proc.clone().serve(rx);
422 let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
423
424 let handle = match proc
425 .clone()
426 .spawn::<Self>(
427 "proc",
428 ProcActorParams {
429 proc: proc.clone(),
430 world_id: world_id.clone(),
431 system_actor_ref: SYSTEM_ACTOR_REF.clone(),
432 bootstrap_channel_addr: bootstrap_addr,
433 local_addr,
434 state_watch: state_tx,
435 supervisor_actor_ref,
436 supervision_update_interval,
437 labels,
438 lifecycle_mode,
439 },
440 )
441 .await
442 {
443 Ok(handle) => handle,
444 Err(e) => {
445 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
446 return Err(e);
447 }
448 };
449
450 let comm_actor = match proc
451 .clone()
452 .spawn::<CommActor>("comm", Default::default())
453 .await
454 {
455 Ok(handle) => handle,
456 Err(e) => {
457 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
458 return Err(e);
459 }
460 };
461 comm_actor.bind::<CommActor>();
462
463 loop {
464 let proc_state = state_rx.borrow_and_update().clone();
465 tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
466 if matches!(proc_state, ProcState::Joined) {
467 break;
468 }
469 match state_rx.changed().await {
470 Ok(_) => {}
471 Err(e) => {
472 Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
473 return Err(e.into());
474 }
475 }
476 }
477
478 proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
479
480 Ok(BootstrappedProc {
481 proc_actor: handle,
482 mailbox: mailbox_handle,
483 comm_actor,
484 })
485 }
486
487 async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
490 mailbox_handle.stop("failed proc bootstrap cleanup");
491 if let Err(shutdown_err) = mailbox_handle.await {
492 tracing::error!(
494 "error shutting down during a failed bootstrap attempt: {}",
495 shutdown_err
496 );
497 }
498 }
499}
500
501#[async_trait]
502impl Actor for ProcActor {
503 type Params = ProcActorParams;
504
505 async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
506 let last_successful_supervision_update = params.proc.clock().system_time_now();
507 Ok(Self {
508 params,
509 state: ProcState::AwaitingJoin,
510 remote: Remote::collect(),
511 last_successful_supervision_update,
512 })
513 }
514
515 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
516 this.bind::<Self>();
518
519 self.params
521 .system_actor_ref
522 .join(
523 this,
524 self.params.world_id.clone(),
525 self.params.proc.proc_id().clone(),
526 this.port().bind(),
527 self.params.local_addr.clone(),
528 self.params.labels.clone(),
529 self.params.lifecycle_mode.clone(),
530 )
531 .await?;
532
533 if self.params.supervision_update_interval > Duration::from_secs(0)
538 && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
539 {
540 this.self_message_with_delay(
541 ProcMessage::UpdateSupervision(),
542 self.params.supervision_update_interval,
543 )?;
544 }
545
546 Ok(())
547 }
548}
549
550impl ProcActor {
551 fn rank(&self) -> Index {
553 self.params
554 .proc
555 .proc_id()
556 .rank()
557 .expect("proc must be ranked")
558 }
559}
560
561#[hyperactor::forward(MailboxAdminMessage)]
562#[async_trait]
563impl MailboxAdminMessageHandler for ProcActor {
564 async fn update_address(
565 &mut self,
566 cx: &Context<Self>,
567 proc_id: ProcId,
568 addr: ChannelAddr,
569 ) -> Result<(), anyhow::Error> {
570 tracing::trace!(
571 "received address update:\n{:#?}",
572 MailboxAdminMessage::UpdateAddress {
573 proc_id: proc_id.clone(),
574 addr: addr.clone()
575 }
576 );
577 let forwarder = cx.proc().forwarder();
578 if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
579 router.bind(proc_id.into(), addr);
580 } else {
581 tracing::warn!(
582 "proc {} received update_address but does not use a DialMailboxRouter",
583 cx.proc().proc_id()
584 );
585 }
586
587 Ok(())
588 }
589}
590
591#[async_trait]
592#[hyperactor::forward(ProcMessage)]
593impl ProcMessageHandler for ProcActor {
594 async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
595 self.state = ProcState::Joined;
596 let _ = self.params.state_watch.send(self.state.clone());
597 Ok(())
598 }
599
600 async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
601 Ok(self.state.clone())
602 }
603
604 async fn spawn(
605 &mut self,
606 cx: &Context<Self>,
607 actor_type: String,
608 actor_name: String,
609 params_data: Data,
610 status_port: PortRef<Index>,
611 ) -> Result<(), anyhow::Error> {
612 let _actor_id = self
613 .remote
614 .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
615 .await?;
616
617 status_port.send(cx, self.rank())?;
619 Ok(())
620 }
621
622 async fn spawn_proc(
623 &mut self,
624 _cx: &Context<Self>,
625 env: Environment,
626 world_id: WorldId,
627 proc_ids: Vec<ProcId>,
628 world_size: usize,
629 ) -> Result<(), anyhow::Error> {
630 for (index, proc_id) in proc_ids.into_iter().enumerate() {
631 let proc_world_id = proc_id
632 .world_id()
633 .expect("proc must be ranked for world_id access")
634 .clone();
635 if &proc_world_id
637 == self
638 .params
639 .proc
640 .proc_id()
641 .world_id()
642 .expect("proc must be ranked for world_id access")
643 || &world_id
644 == self
645 .params
646 .proc
647 .proc_id()
648 .world_id()
649 .expect("proc must be ranked for world_id access")
650 {
651 return Err(anyhow::anyhow!(
652 "cannot spawn proc in same world {}",
653 proc_world_id
654 ));
655 }
656 match env {
657 Environment::Local => {
658 ProcActor::bootstrap(
659 proc_id,
660 world_id.clone(),
661 ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
662 self.params.bootstrap_channel_addr.clone(),
663 self.params.supervision_update_interval,
664 HashMap::new(),
665 ProcLifecycleMode::ManagedBySystem,
666 )
667 .await?;
668 }
669 Environment::Exec { ref program } => {
670 tracing::info!("spawning proc {} with program {}", proc_id, program);
671 let mut child = Command::new(program);
672 let _ = child
673 .env(HYPERACTOR_WORLD_ID, world_id.to_string())
674 .env(HYPERACTOR_PROC_ID, proc_id.to_string())
675 .env(
676 HYPERACTOR_BOOTSTRAP_ADDR,
677 self.params.bootstrap_channel_addr.to_string(),
678 )
679 .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
680 .env(
681 HYPERACTOR_RANK,
682 proc_id
683 .rank()
684 .expect("proc must be ranked for rank env var")
685 .to_string(),
686 )
687 .env(HYPERACTOR_LOCAL_RANK, index.to_string())
688 .stdin(Stdio::null())
689 .stdout(Stdio::inherit())
690 .stderr(Stdio::inherit())
691 .spawn()?;
692 }
693 }
694 }
695 Ok(())
696 }
697
698 async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
699 let delay = jitter(self.params.supervision_update_interval);
701
702 if self.state != ProcState::Joined {
704 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
705 return Ok(());
706 }
707
708 let msg = ProcSupervisionState {
709 world_id: self.params.world_id.clone(),
710 proc_id: self.params.proc.proc_id().clone(),
711 proc_addr: self.params.local_addr.clone(),
712 proc_health: ProcStatus::Alive,
713 failed_actors: Vec::new(),
714 };
715
716 match cx
717 .clock()
718 .timeout(
719 Duration::from_secs(10),
721 self.params.supervisor_actor_ref.update(cx, msg),
722 )
723 .await
724 {
725 Ok(_) => {
726 self.last_successful_supervision_update = cx.clock().system_time_now();
727 }
728 Err(_) => {}
729 }
730
731 let supervision_staleness = self
732 .last_successful_supervision_update
733 .elapsed()
734 .unwrap_or_default();
735 if supervision_staleness > 5 * self.params.supervision_update_interval {
738 tracing::error!(
739 "system actor isn't responsive to supervision update, stopping the proc"
740 );
741 self.stop(cx, Duration::from_secs(5)).await?;
744 } else {
745 let delay = jitter(self.params.supervision_update_interval);
747 cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
748 }
749
750 Ok(())
751 }
752
753 async fn stop(
754 &mut self,
755 cx: &Context<Self>,
756 timeout: Duration,
757 ) -> Result<ProcStopResult, anyhow::Error> {
758 tracing::info!("stopping proc {}", self.params.proc.proc_id());
759 self.params
760 .proc
761 .destroy_and_wait(timeout, Some(cx.self_id()))
762 .await
763 .map(|(stopped, aborted)| {
764 tracing::info!("stopped proc {}", self.params.proc.proc_id());
765 ProcStopResult {
766 proc_id: self.params.proc.proc_id().clone(),
767 actors_stopped: stopped.len(),
768 actors_aborted: aborted.len(),
769 }
770 })
771 }
772
773 async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
774 let state = self.state.clone();
775 let actors = self.params.proc.ledger_snapshot();
776 Ok(ProcSnapshot { state, actors })
777 }
778
779 async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
780 Ok(self.params.local_addr.clone())
781 }
782
783 async fn py_spy_dump(
784 &mut self,
785 _cx: &Context<Self>,
786 config: PySpyConfig,
787 ) -> Result<StackTrace, anyhow::Error> {
788 let pid = std::process::id() as i32;
789 tracing::info!(
790 "running py-spy on proc {}, process id: {}",
791 self.params.proc.proc_id(),
792 pid
793 );
794 let trace = match config {
795 PySpyConfig::Blocking { native, native_all } => {
796 py_spy(
797 pid,
798 native.unwrap_or_default(),
799 native_all.unwrap_or_default(),
800 true,
801 )
802 .await?
803 }
804 PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
805 };
806 Ok(StackTrace { trace })
807 }
808}
809
810#[async_trait]
811impl Handler<ActorSupervisionEvent> for ProcActor {
812 async fn handle(
813 &mut self,
814 cx: &Context<Self>,
815 event: ActorSupervisionEvent,
816 ) -> anyhow::Result<()> {
817 let status = event.status();
818 let message = ProcSupervisionState {
819 world_id: self.params.world_id.clone(),
820 proc_id: self.params.proc.proc_id().clone(),
821 proc_addr: self.params.local_addr.clone(),
822 proc_health: ProcStatus::Alive,
823 failed_actors: Vec::from([(event.actor_id, status)]),
824 };
825 self.params.supervisor_actor_ref.update(cx, message).await?;
826 Ok(())
827 }
828}
829
830pub async fn spawn<A: Actor + RemoteActor>(
833 caps: &(impl cap::CanSend + cap::CanOpenPort),
834 proc_actor: &ActorRef<ProcActor>,
835 actor_name: &str,
836 params: &A::Params,
837) -> Result<ActorRef<A>, anyhow::Error>
838where
839 A::Params: RemoteMessage,
840{
841 let remote = Remote::collect();
842 let (spawned_port, mut spawned_receiver) = open_port(caps);
843 let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
844
845 proc_actor
846 .spawn(
847 caps,
848 remote
849 .name_of::<A>()
850 .ok_or(anyhow::anyhow!("actor not registered"))?
851 .into(),
852 actor_name.into(),
853 bincode::serialize(params)?,
854 spawned_port.bind(),
855 )
856 .await?;
857
858 while spawned_receiver.recv().await?
860 != proc_id
861 .rank()
862 .expect("proc must be ranked for rank comparison")
863 {}
864
865 Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
867}
868
869#[cfg(test)]
870mod tests {
871 use std::assert_matches::assert_matches;
872 use std::collections::HashSet;
873 use std::time::Duration;
874
875 use hyperactor::actor::ActorStatus;
876 use hyperactor::channel;
877 use hyperactor::channel::ChannelAddr;
878 use hyperactor::channel::ChannelTransport;
879 use hyperactor::clock::Clock;
880 use hyperactor::clock::RealClock;
881 use hyperactor::data::Named;
882 use hyperactor::forward;
883 use hyperactor::id;
884 use hyperactor::mailbox::Mailbox;
885 use hyperactor::reference::ActorRef;
886 use hyperactor::test_utils::pingpong::PingPongActor;
887 use hyperactor::test_utils::pingpong::PingPongActorParams;
888 use hyperactor::test_utils::pingpong::PingPongMessage;
889 use maplit::hashset;
890 use rand::Rng;
891 use rand::distributions::Alphanumeric;
892 use regex::Regex;
893
894 use super::*;
895 use crate::supervision::ProcSupervisionMessage;
896 use crate::system::ServerHandle;
897 use crate::system::System;
898
899 const MAX_WAIT_TIME: Duration = Duration::new(10, 0);
900
901 struct Bootstrapped {
902 server_handle: ServerHandle,
903 proc_actor_ref: ActorRef<ProcActor>,
904 comm_actor_ref: ActorRef<CommActor>,
905 client: Mailbox,
906 }
907
908 async fn bootstrap() -> Bootstrapped {
909 let server_handle = System::serve(
910 ChannelAddr::any(ChannelTransport::Local),
911 Duration::from_secs(10),
912 Duration::from_secs(10),
913 )
914 .await
915 .unwrap();
916
917 let world_id = id!(world);
918 let proc_id = world_id.proc_id(0);
919 let bootstrap = ProcActor::bootstrap(
920 proc_id,
921 world_id,
922 ChannelAddr::any(ChannelTransport::Local),
923 server_handle.local_addr().clone(),
924 Duration::from_secs(1),
925 HashMap::new(),
926 ProcLifecycleMode::ManagedBySystem,
927 )
928 .await
929 .unwrap();
930
931 let mut system = System::new(server_handle.local_addr().clone());
934 let client = system.attach().await.unwrap();
935
936 let start = RealClock.now();
938 let mut proc_state;
939 loop {
940 proc_state = bootstrap.proc_actor.state(&client).await.unwrap();
941
942 if matches!(proc_state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
943 break;
944 }
945 }
946 assert_matches!(proc_state, ProcState::Joined);
947
948 Bootstrapped {
949 server_handle,
950 proc_actor_ref: bootstrap.proc_actor.bind(),
951 comm_actor_ref: bootstrap.comm_actor.bind(),
952 client,
953 }
954 }
955
956 #[tokio::test]
957 async fn test_bootstrap() {
958 let Bootstrapped { server_handle, .. } = bootstrap().await;
959
960 println!("bootrapped, now waiting");
961
962 server_handle.stop().await.unwrap();
963 server_handle.await;
964 }
965
966 #[derive(Debug, Default, Actor)]
967 #[hyperactor::export(
968 spawn = true,
969 handlers = [
970 TestActorMessage,
971 ],
972 )]
973 struct TestActor;
974
975 #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
976 enum TestActorMessage {
977 Increment(u64, #[reply] OncePortRef<u64>),
978 Fail(String),
979 }
980
981 #[async_trait]
982 #[forward(TestActorMessage)]
983 impl TestActorMessageHandler for TestActor {
984 async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
985 Ok(num + 1)
986 }
987
988 async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
989 Err(anyhow::anyhow!(err))
990 }
991 }
992
993 #[tokio::test]
994 async fn test_stop() {
995 let Bootstrapped {
998 server_handle,
999 proc_actor_ref,
1000 client,
1001 ..
1002 } = bootstrap().await;
1003
1004 const NUM_ACTORS: usize = 4usize;
1005 for i in 0..NUM_ACTORS {
1006 spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
1007 .await
1008 .unwrap();
1009 }
1010
1011 let ProcStopResult {
1012 proc_id: _,
1013 actors_stopped,
1014 actors_aborted,
1015 } = proc_actor_ref
1016 .stop(&client, Duration::from_secs(1))
1017 .await
1018 .unwrap();
1019 assert_eq!(NUM_ACTORS + 1, actors_stopped);
1020 assert_eq!(1, actors_aborted);
1021
1022 server_handle.stop().await.unwrap();
1023 server_handle.await;
1024 }
1025
1026 #[derive(Debug, Default, Actor)]
1028 #[hyperactor::export(
1029 spawn = true,
1030 handlers = [
1031 u64,
1032 ],
1033 )]
1034 struct SleepActor {}
1035
1036 #[async_trait]
1037 impl Handler<u64> for SleepActor {
1038 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
1039 let duration = message;
1040 RealClock.sleep(Duration::from_secs(duration)).await;
1041 Ok(())
1042 }
1043 }
1044
1045 #[tracing_test::traced_test]
1046 #[tokio::test]
1047 async fn test_stop_timeout() {
1048 let Bootstrapped {
1049 server_handle,
1050 proc_actor_ref,
1051 client,
1052 ..
1053 } = bootstrap().await;
1054
1055 const NUM_ACTORS: usize = 4usize;
1056 for i in 0..NUM_ACTORS {
1057 let sleep_secs = 5u64;
1058 let sleeper = spawn::<SleepActor>(
1059 &client,
1060 &proc_actor_ref,
1061 format!("sleeper{i}").as_str(),
1062 &(),
1063 )
1064 .await
1065 .unwrap();
1066 if i > 0 {
1067 sleeper.send(&client, sleep_secs).unwrap();
1068 }
1069 }
1070
1071 let ProcStopResult {
1072 proc_id: _,
1073 actors_stopped,
1074 actors_aborted,
1075 } = proc_actor_ref
1076 .stop(&client, Duration::from_secs(1))
1077 .await
1078 .unwrap();
1079 assert_eq!(2, actors_stopped);
1080 assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);
1081
1082 assert!(tracing_test::internal::logs_with_scope_contain(
1083 "hyperactor::proc",
1084 "world[0].proc[0]: aborting JoinHandle"
1085 ));
1086 for i in 1..3 {
1087 assert!(tracing_test::internal::logs_with_scope_contain(
1088 "hyperactor::proc",
1089 format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
1090 ));
1091 }
1092 logs_assert(|logs| {
1093 let count = logs
1094 .iter()
1095 .filter(|log| log.contains("aborting JoinHandle"))
1096 .count();
1097 if count == actors_aborted {
1098 Ok(())
1099 } else {
1100 Err("task abort counting error".to_string())
1101 }
1102 });
1103
1104 server_handle.stop().await.unwrap();
1105 server_handle.await;
1106 }
1107
1108 #[tokio::test]
1109 async fn test_spawn() {
1110 let Bootstrapped {
1111 server_handle,
1112 proc_actor_ref,
1113 client,
1114 ..
1115 } = bootstrap().await;
1116
1117 let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
1118 .await
1119 .unwrap();
1120
1121 let result = test_actor_ref.increment(&client, 1).await.unwrap();
1122 assert_eq!(result, 2);
1123
1124 server_handle.stop().await.unwrap();
1125 server_handle.await;
1126 }
1127
1128 #[cfg(target_os = "linux")]
1129 fn random_abstract_addr() -> ChannelAddr {
1130 let random_string = rand::thread_rng()
1131 .sample_iter(&Alphanumeric)
1132 .take(24)
1133 .map(char::from)
1134 .collect::<String>();
1135 format!("unix!@{random_string}").parse().unwrap()
1136 }
1137
1138 #[cfg(target_os = "linux")] #[tokio::test]
1140 async fn test_bootstrap_retry() {
1141 if std::env::var("CARGO_TEST").is_ok() {
1142 eprintln!("test skipped under cargo as it causes other tests to fail when run");
1143 return;
1144 }
1145
1146 let bootstrap_addr = random_abstract_addr();
1150
1151 let bootstrap_addr_clone = bootstrap_addr.clone();
1152 let handle = tokio::spawn(async move {
1153 let world_id = id!(world);
1154 let proc_id = world_id.proc_id(0);
1155 let bootstrap = ProcActor::bootstrap(
1156 proc_id,
1157 world_id,
1158 random_abstract_addr(),
1159 bootstrap_addr_clone,
1160 Duration::from_secs(1),
1161 HashMap::new(),
1162 ProcLifecycleMode::ManagedBySystem,
1163 )
1164 .await
1165 .unwrap();
1166
1167 let mut status = bootstrap.proc_actor.status();
1169 assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
1170 });
1171
1172 RealClock.sleep(Duration::from_secs(5)).await;
1175
1176 let _server_handle = System::serve(
1177 bootstrap_addr,
1178 Duration::from_secs(10),
1179 Duration::from_secs(10),
1180 )
1181 .await
1182 .unwrap();
1183
1184 handle.await.unwrap();
1186 }
1187
1188 #[tokio::test]
1189 async fn test_supervision_message_handling() {
1190 if std::env::var("CARGO_TEST").is_ok() {
1191 eprintln!("test skipped under cargo as it fails when run with others");
1192 return;
1193 }
1194
1195 let server_handle = System::serve(
1196 ChannelAddr::any(ChannelTransport::Local),
1197 Duration::from_secs(3600),
1198 Duration::from_secs(3600),
1199 )
1200 .await
1201 .unwrap();
1202
1203 let mut system = System::new(server_handle.local_addr().clone());
1205 let supervisor_mailbox = system.attach().await.unwrap();
1206 let (supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1207 supervisor_mailbox.open_port::<ProcSupervisionMessage>();
1208 supervisor_supervision_tx.bind_to(ProcSupervisionMessage::port());
1209 let supervisor_actor_ref: ActorRef<ProcSupervisor> =
1210 ActorRef::attest(supervisor_mailbox.actor_id().clone());
1211
1212 let local_world_id = hyperactor::id!(test_proc);
1214 let local_proc_id = local_world_id.proc_id(0);
1215 let bootstrap = ProcActor::try_bootstrap(
1216 local_proc_id.clone(),
1217 local_world_id.clone(),
1218 ChannelAddr::any(ChannelTransport::Local),
1219 server_handle.local_addr().clone(),
1220 supervisor_actor_ref.clone(),
1221 Duration::from_secs(1),
1222 HashMap::new(),
1223 ProcLifecycleMode::ManagedBySystem,
1224 )
1225 .await
1226 .unwrap();
1227
1228 let msg = supervisor_supervision_receiver.recv().await;
1231 match msg.unwrap() {
1232 ProcSupervisionMessage::Update(state, port) => {
1233 assert_eq!(
1234 state,
1235 ProcSupervisionState {
1236 world_id: local_world_id.clone(),
1237 proc_addr: ChannelAddr::Local(3),
1238 proc_id: local_proc_id.clone(),
1239 proc_health: ProcStatus::Alive,
1240 failed_actors: Vec::new(),
1241 }
1242 );
1243 let _ = port.send(&supervisor_mailbox, ());
1244 }
1245 }
1246
1247 let proc_actor_ref = bootstrap.proc_actor.bind();
1249 let test_actor_ref = spawn::<TestActor>(&supervisor_mailbox, &proc_actor_ref, "test", &())
1250 .await
1251 .unwrap();
1252
1253 test_actor_ref
1254 .fail(
1255 &supervisor_mailbox,
1256 "test actor is erroring out".to_string(),
1257 )
1258 .await
1259 .unwrap();
1260 let result = RealClock
1264 .timeout(Duration::from_secs(5), async {
1265 loop {
1266 match supervisor_supervision_receiver.recv().await {
1267 Ok(ProcSupervisionMessage::Update(state, _port)) => {
1268 match state.failed_actors.iter().find(|(failed_id, _)| {
1269 failed_id == test_actor_ref.clone().actor_id()
1270 }) {
1271 Some((_, actor_status)) => return Ok(actor_status.clone()),
1272 None => {}
1273 }
1274 }
1275 _ => anyhow::bail!("unexpected message type"),
1276 }
1277 }
1278 })
1279 .await;
1280 assert_matches!(
1281 result.unwrap().unwrap(),
1282 ActorStatus::Failed(msg) if msg.contains("test actor is erroring out")
1283 );
1284
1285 server_handle.stop().await.unwrap();
1286 server_handle.await;
1287 }
1288
1289 #[tokio::test]
1292 async fn test_bind_proc_actor_in_bootstrap() {
1293 let server_handle = System::serve(
1294 ChannelAddr::any(ChannelTransport::Local),
1295 Duration::from_secs(10),
1296 Duration::from_secs(10),
1297 )
1298 .await
1299 .unwrap();
1300 let mut system = System::new(server_handle.local_addr().clone());
1301 let client = system.attach().await.unwrap();
1302
1303 let world_id = id!(world);
1304 let proc_id = world_id.proc_id(0);
1305 let bootstrap = ProcActor::bootstrap(
1306 proc_id,
1307 world_id,
1308 ChannelAddr::any(ChannelTransport::Local),
1309 server_handle.local_addr().clone(),
1310 Duration::from_secs(1),
1311 HashMap::new(),
1312 ProcLifecycleMode::ManagedBySystem,
1313 )
1314 .await
1315 .unwrap();
1316 let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
1317 let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);
1318
1319 let res = RealClock
1320 .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
1321 .await;
1322 assert!(res.is_ok());
1325 assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
1326 server_handle.stop().await.unwrap();
1327 server_handle.await;
1328 }
1329
1330 #[tokio::test]
1331 async fn test_proc_snapshot() {
1332 let Bootstrapped {
1333 server_handle,
1334 proc_actor_ref,
1335 comm_actor_ref,
1336 client,
1337 ..
1338 } = bootstrap().await;
1339
1340 let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
1342 .await
1343 .unwrap();
1344 let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
1345 .await
1346 .unwrap();
1347 {
1348 let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
1349 assert_eq!(snapshot.state, ProcState::Joined);
1350 assert_eq!(
1351 snapshot.actors.roots.keys().collect::<HashSet<_>>(),
1352 hashset! {
1353 proc_actor_ref.actor_id(),
1354 comm_actor_ref.actor_id(),
1355 root.actor_id(),
1356 another_root.actor_id(),
1357 }
1358 );
1359 }
1360
1361 server_handle.stop().await.unwrap();
1362 server_handle.await;
1363 }
1364
1365 #[tokio::test]
1366 async fn test_undeliverable_message_return() {
1367 use hyperactor::mailbox::Undeliverable;
1370 use hyperactor::test_utils::pingpong::PingPongActor;
1371 use hyperactor::test_utils::pingpong::PingPongMessage;
1372
1373 let config = hyperactor::config::global::lock();
1375 let _guard = config.override_key(
1376 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1377 Duration::from_secs(1),
1378 );
1379
1380 let server_handle = System::serve(
1382 ChannelAddr::any(ChannelTransport::Tcp),
1383 Duration::from_secs(120),
1384 Duration::from_secs(120),
1385 )
1386 .await
1387 .unwrap();
1388 let mut system = System::new(server_handle.local_addr().clone());
1389
1390 let sup_mail = system.attach().await.unwrap();
1392 let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
1393 sup_tx.bind_to(ProcSupervisionMessage::port());
1394 let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.actor_id().clone());
1395
1396 let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1398 channel::dial(server_handle.local_addr().clone()).unwrap(),
1399 ));
1400
1401 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1403 let proc_forwarder =
1404 BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1405
1406 let world_id = id!(world);
1408 let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1409 let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1410 proc_0.clone(),
1411 world_id.clone(),
1412 listen_addr,
1413 server_handle.local_addr().clone(),
1414 sup_ref.clone(),
1415 Duration::from_secs(120),
1416 HashMap::new(),
1417 ProcLifecycleMode::ManagedBySystem,
1418 )
1419 .await
1420 .unwrap();
1421 let proc_0_client = proc_0.attach("client").unwrap();
1422 let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1423
1424 let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1426 let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1427 proc_1.clone(),
1428 world_id.clone(),
1429 ChannelAddr::any(ChannelTransport::Tcp),
1430 server_handle.local_addr().clone(),
1431 sup_ref.clone(),
1432 Duration::from_secs(120),
1433 HashMap::new(),
1434 ProcLifecycleMode::ManagedBySystem,
1435 )
1436 .await
1437 .unwrap();
1438 let proc_1_client = proc_1.attach("client").unwrap();
1439 let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1440
1441 let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1442 let ping_handle = proc_0
1446 .spawn::<PingPongActor>("ping", ping_params)
1447 .await
1448 .unwrap();
1449 let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1450 let pong_handle = proc_1
1451 .spawn::<PingPongActor>("pong", pong_params)
1452 .await
1453 .unwrap();
1454
1455 server_handle.stop().await.unwrap();
1458 server_handle.await;
1459
1460 let n = 100usize;
1461 for i in 1..(n + 1) {
1462 let ttl = 66 + i as u64; let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1465 ping_handle
1466 .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1467 .unwrap();
1468 }
1469
1470 assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1475
1476 let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1478 unreachable!()
1479 };
1480 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1481 let mut count = 1;
1482 while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1483 count += 1;
1487 let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1488 }
1489 assert!(count == n);
1490 }
1491
/// Checks the `UpdateAddress` mailbox admin messages that show up in the
/// traced logs when two procs and the system exchange messages.
///
/// Bootstraps procs `world[0]` and `world[1]` (each with a `ProcActor`) whose
/// router falls back to the system channel, then runs one ping-pong exchange
/// (ping on proc 0, pong on proc 1). It asserts that the captured logs contain
/// exactly three `UpdateAddress { ... }` blocks: one for `world[1]`, one for
/// `world[0]`, and one for a proc in world `"user"`. Finally it stops the
/// system actor.
#[tracing_test::traced_test]
#[tokio::test]
async fn test_proc_actor_mailbox_admin_message() {
    use hyperactor::test_utils::pingpong::PingPongActor;
    use hyperactor::test_utils::pingpong::PingPongMessage;

    let server_handle = System::serve(
        ChannelAddr::any(ChannelTransport::Tcp),
        Duration::from_secs(120),
        Duration::from_secs(120),
    )
    .await
    .unwrap();
    let mut system = System::new(server_handle.local_addr().clone());
    let system_actor = server_handle.system_actor_handle();
    // Client used to drive the exchange and to stop the system at the end.
    let system_client = system.attach().await.unwrap();
    // A second attached mailbox stands in for the proc supervisor: it binds
    // `ProcSupervisionMessage::port()` and its receiver is never read.
    let sup_mail = system.attach().await.unwrap();
    let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
    sup_tx.bind_to(ProcSupervisionMessage::port());
    let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.actor_id().clone());

    let system_sender = BoxedMailboxSender::new(MailboxClient::new(
        channel::dial(server_handle.local_addr().clone()).unwrap(),
    ));

    let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
    // Shared by both procs; `system_sender` is the router's default sender
    // (presumably used for destinations it has no direct address for).
    let proc_forwarder =
        BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));

    let world_id = id!(world);
    let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
    let _proc_actor_0 = ProcActor::bootstrap_for_proc(
        proc_0.clone(),
        world_id.clone(),
        listen_addr,
        server_handle.local_addr().clone(),
        sup_ref.clone(),
        Duration::from_secs(120),
        HashMap::new(),
        ProcLifecycleMode::ManagedBySystem,
    )
    .await
    .unwrap();
    let proc_0_client = proc_0.attach("client").unwrap();
    let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();

    let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
    let _proc_actor_1 = ProcActor::bootstrap_for_proc(
        proc_1.clone(),
        world_id.clone(),
        ChannelAddr::any(ChannelTransport::Tcp),
        server_handle.local_addr().clone(),
        sup_ref.clone(),
        Duration::from_secs(120),
        HashMap::new(),
        ProcLifecycleMode::ManagedBySystem,
    )
    .await
    .unwrap();
    let proc_1_client = proc_1.attach("client").unwrap();
    let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();

    let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
    let ping_handle = proc_0
        .spawn::<PingPongActor>("ping", ping_params)
        .await
        .unwrap();
    let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
    let pong_handle = proc_1
        .spawn::<PingPongActor>("pong", pong_params)
        .await
        .unwrap();

    // One ping-pong exchange between the two procs; completion is signalled
    // with `true` on `once_rx`.
    let ttl = 10u64;
    let (once_tx, once_rx) = system_client.open_once_port::<bool>();
    ping_handle
        .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
        .unwrap();

    assert!(once_rx.recv().await.unwrap());

    // Expected prefixes of the logged `UpdateAddress` messages. They look like
    // the multi-line (pretty `{:#?}`) Debug rendering, and are matched with a
    // plain substring search, so their bytes (including whitespace) must match
    // the log output exactly.
    let expected_1 = r#"UpdateAddress {
proc_id: Ranked(
WorldId(
"world",
),
1,
),
addr: Tcp("#;

    let expected_2 = r#"UpdateAddress {
proc_id: Ranked(
WorldId(
"world",
),
0,
),
addr: Tcp("#;

    let expected_3 = r#"UpdateAddress {
proc_id: Ranked(
WorldId(
"user",
),"#;

    logs_assert(|logs| {
        let log_body = logs.join("\n");

        // Count blocks that start with a line beginning `UpdateAddress {` and
        // extend (lazily) to the next line beginning with `}`.
        let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
        let count = pattern.find_iter(&log_body).count();

        if count != 3 {
            return Err(format!(
                "expected 3 UpdateAddress messages, found {}",
                count
            ));
        }

        if !log_body.contains(expected_1) {
            return Err("missing expected update for proc_id 1".into());
        }
        if !log_body.contains(expected_2) {
            return Err("missing expected update for proc_id 0".into());
        }
        // NOTE(review): this checks for world "user" (not a rank); the error
        // text below says "proc_id user".
        if !log_body.contains(expected_3) {
            return Err("missing expected update for proc_id user".into());
        }

        Ok(())
    });

    // Stop the system actor and wait for its acknowledgement on `once_rx`.
    let (once_tx, once_rx) = system_client.open_once_port::<()>();
    assert_matches!(
        system_actor
            .stop(
                &system_client,
                None,
                Duration::from_secs(1),
                once_tx.bind()
            )
            .await,
        Ok(())
    );
    assert_matches!(once_rx.recv().await.unwrap(), ());
}
1655
1656 #[tokio::test]
1657 async fn test_update_address_book_cache() {
1658 let server_handle = System::serve(
1659 ChannelAddr::any(ChannelTransport::Tcp),
1660 Duration::from_secs(2), Duration::from_secs(2), )
1663 .await
1664 .unwrap();
1665 let system_addr = server_handle.local_addr().clone();
1666 let mut system = System::new(system_addr.clone());
1667
1668 let system_client = system.attach().await.unwrap();
1669
1670 let ping_actor_id = id!(world[0].ping[0]);
1672 let (ping_actor_ref, _ping_proc_ref) =
1673 spawn_actor(&ping_actor_id, &system_addr, system_client.clone()).await;
1674
1675 let pong_actor_id = id!(world[1].pong[0]);
1676 let (pong_actor_ref, pong_proc_ref) =
1677 spawn_actor(&pong_actor_id, &system_addr, system_client.clone()).await;
1678
1679 let (done_tx, done_rx) = system_client.open_once_port();
1682 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1683 ping_actor_ref
1684 .send(&system_client, ping_pong_message)
1685 .unwrap();
1686 assert!(done_rx.recv().await.unwrap());
1687
1688 let ProcStopResult { actors_aborted, .. } = pong_proc_ref
1690 .stop(&system_client, Duration::from_secs(1))
1691 .await
1692 .unwrap();
1693 assert_eq!(1, actors_aborted);
1694 let (pong_actor_ref, _pong_proc_ref) =
1695 spawn_actor(&pong_actor_id, &system_addr, system_client.clone()).await;
1696
1697 let (done_tx, done_rx) = system_client.open_once_port();
1701 let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1702 ping_actor_ref
1703 .send(&system_client, ping_pong_message)
1704 .unwrap();
1705 assert!(done_rx.recv().await.unwrap());
1706 }
1707
1708 async fn spawn_actor(
1709 actor_id: &ActorId,
1710 system_addr: &ChannelAddr,
1711 system_client: Mailbox,
1712 ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
1713 let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1714 let bootstrap = ProcActor::bootstrap(
1715 actor_id.proc_id().clone(),
1716 actor_id
1717 .proc_id()
1718 .world_id()
1719 .expect("proc must be ranked for bootstrap world_id")
1720 .clone(),
1721 listen_addr.clone(),
1722 system_addr.clone(),
1723 Duration::from_secs(3),
1724 HashMap::new(),
1725 ProcLifecycleMode::ManagedBySystem,
1726 )
1727 .await
1728 .unwrap();
1729 let (undeliverable_msg_tx, _) = system_client.open_port();
1730 let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
1731 let actor_ref = spawn::<PingPongActor>(
1732 &system_client,
1733 &bootstrap.proc_actor.bind(),
1734 &actor_id.to_string(),
1735 ¶ms,
1736 )
1737 .await
1738 .unwrap();
1739 let proc_actor_ref = bootstrap.proc_actor.bind();
1740 (actor_ref, proc_actor_ref)
1741 }
1742}