hyperactor_multiprocess/
proc_actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc actor manages a proc. It works in conjunction with a
10//! [`super::system_actor::SystemActor`]. Proc actors are usually spawned
11//! as the "agent" to manage a proc directly.
12
13use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorHandle;
33use hyperactor::actor::RemoteActor;
34use hyperactor::actor::remote::Remote;
35use hyperactor::cap;
36use hyperactor::channel;
37use hyperactor::channel::ChannelAddr;
38use hyperactor::clock::Clock;
39use hyperactor::clock::ClockKind;
40use hyperactor::mailbox::BoxedMailboxSender;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxAdminMessage;
43use hyperactor::mailbox::MailboxAdminMessageHandler;
44use hyperactor::mailbox::MailboxClient;
45use hyperactor::mailbox::MailboxServer;
46use hyperactor::mailbox::MailboxServerHandle;
47use hyperactor::mailbox::open_port;
48use hyperactor::proc::ActorLedgerSnapshot;
49use hyperactor::proc::Proc;
50use hyperactor::reference::ActorId;
51use hyperactor::reference::ActorRef;
52use hyperactor::reference::Index;
53use hyperactor::reference::ProcId;
54use hyperactor::supervision::ActorSupervisionEvent;
55use hyperactor_mesh::comm::CommActor;
56use serde::Deserialize;
57use serde::Serialize;
58use tokio::process::Command;
59use tokio::sync::watch;
60use tokio_retry::strategy::jitter;
61
62use crate::pyspy::PySpyTrace;
63use crate::pyspy::py_spy;
64use crate::supervision::ProcStatus;
65use crate::supervision::ProcSupervisionMessageClient;
66use crate::supervision::ProcSupervisionState;
67use crate::supervision::ProcSupervisor;
68use crate::system_actor::ProcLifecycleMode;
69use crate::system_actor::SYSTEM_ACTOR_REF;
70use crate::system_actor::SystemActor;
71use crate::system_actor::SystemMessageClient;
72
// Names of the environment variables that `spawn_proc` sets on child
// processes launched through `Environment::Exec`.

/// Set to the string form of the world id passed to `spawn_proc`.
static HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
/// Set to the string form of the proc id of the proc being spawned.
static HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
/// Set to the string form of the spawning proc actor's bootstrap channel address.
static HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
/// Set to the `world_size` passed to `spawn_proc`.
static HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
/// Set to the rank of the proc id being spawned.
static HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
/// Set to the position of the proc id within the `proc_ids` list of the
/// `spawn_proc` request.
static HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
79
/// All setup parameters for an actor within a proc actor.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// The actor is spawned within the proc actor: the new proc is
    /// bootstrapped in the current process via `ProcActor::bootstrap`.
    Local,

    /// Spawn the actor in a separate proc.
    /// The program is the executable that will be spawned
    /// by the host actor. The program runs with the following
    /// environment variables set:
    /// ${HYPERACTOR_WORLD_ID}: the world the proc is spawned into,
    /// ${HYPERACTOR_PROC_ID}: the proc id of the proc actor being spawned,
    /// ${HYPERACTOR_BOOTSTRAP_ADDR}: the address of the system actor,
    /// ${HYPERACTOR_WORLD_SIZE}: the world size the proc is a part of,
    /// ${HYPERACTOR_RANK}: the rank of the proc being spawned, and
    /// ${HYPERACTOR_LOCAL_RANK}: the index of the proc within the
    /// spawn request.
    /// This is often useful where an actor needs to know the size of the
    /// world, like a worker using nccl comms.
    Exec {
        /// The program to run in order to spawn the actor.
        program: String,
    },
}
100
/// The state of the proc.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub enum ProcState {
    /// The proc is waiting to join the system. This is the state a
    /// `ProcActor` starts in.
    AwaitingJoin,

    /// The proc has joined the system. Entered when the proc actor
    /// handles `ProcMessage::Joined`.
    Joined,
}
110
111impl fmt::Display for ProcState {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        match self {
114            Self::AwaitingJoin => write!(f, "AwaitingJoin"),
115            Self::Joined => write!(f, "Joined"),
116        }
117    }
118}
119
/// The result after stopping the proc. Sent as the reply to
/// `ProcMessage::Stop`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcStopResult {
    /// The proc being stopped.
    pub proc_id: ProcId,
    /// The number of actors observed to stop.
    pub actors_stopped: usize,
    /// The number of proc actors that were aborted.
    pub actors_aborted: usize,
}
130
/// A snapshot of the proc. Sent as the reply to `ProcMessage::Snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcSnapshot {
    /// The state of the proc at the time the snapshot was taken.
    pub state: ProcState,
    /// The snapshot of the actors in the proc, as reported by the
    /// proc's actor ledger.
    pub actors: ActorLedgerSnapshot,
}
139
/// Remote py-spy dump configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PySpyConfig {
    /// Nonblocking dump for only python frames.
    NonBlocking,
    /// Blocking dump. Specifies whether native threads are included,
    /// and if so, whether those threads should also include native stack frames.
    /// An option left as `None` is treated as `false`.
    Blocking {
        /// Dump native stack frames.
        native: Option<bool>,
        /// Dump stack frames for native threads. Implies native.
        native_all: Option<bool>,
    },
}
154
/// A stack trace of the proc.
/// Wrapper to derive Named.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct StackTrace {
    /// The stack trace.
    pub trace: PySpyTrace,
}
162
/// Proc management messages.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Named
)]
pub enum ProcMessage {
    /// Indicate that the proc has joined the system. This is sent by
    /// the system actor when the proc has been registered and is ready
    /// to receive instructions.
    #[log_level(debug)]
    Joined(),

    /// Retrieve the state of the proc.
    #[log_level(debug)]
    State {
        /// Used to return the result to the caller.
        #[reply]
        ret: OncePortRef<ProcState>,
    },

    /// Spawn an actor on the proc under the provided name.
    Spawn {
        /// registered actor type
        actor_type: String,
        /// spawned actor name
        actor_name: String,
        /// serialized parameters
        params_data: Data,
        /// reply port; the proc should send its rank to indicate a spawned actor
        status_port: PortRef<Index>,
    },

    /// Spawn a set of proc actors in the specified world (never its own).
    /// The request is rejected with an error if the target world is
    /// the world of the proc handling the message.
    SpawnProc {
        /// Spawn the proc locally or in a separate program.
        env: Environment,
        /// The world into which to spawn the procs.
        world_id: WorldId,
        /// The proc ids of the procs to spawn.
        proc_ids: Vec<ProcId>,
        /// The total number of procs in the specified world.
        world_size: usize,
    },

    /// Self message to trigger a supervision update to the system actor.
    #[log_level(debug)]
    UpdateSupervision(),

    /// Stop the proc. Replies with a [`ProcStopResult`] carrying two counts:
    /// - the number of actors observed to stop;
    /// - the number of proc actors not observed to stop.
    ///
    /// There will be at least one actor (the proc-actor itself) that
    /// will not be observed to stop. If there is more than one,
    /// timeouts are indicated.
    Stop {
        /// The duration to wait for an actor to report status `Stopped`.
        timeout: Duration,
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ProcStopResult>,
    },

    /// Return a snapshot view of this proc. Used for debugging.
    #[log_level(debug)]
    Snapshot {
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ProcSnapshot>,
    },

    /// Get the proc local addr.
    LocalAddr {
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ChannelAddr>,
    },

    /// Run pyspy on the current proc and return the stack trace.
    #[log_level(debug)]
    PySpyDump {
        /// Dump config.
        config: PySpyConfig,
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<StackTrace>,
    },
}
258
/// Parameters for managing the proc.
#[derive(Debug, Clone)]
pub struct ProcActorParams {
    /// The proc that is managed by this actor.
    pub proc: Proc,

    /// The system world to which this proc belongs.
    pub world_id: WorldId,

    /// Reference to the system actor that is managing this proc.
    pub system_actor_ref: ActorRef<SystemActor>,

    /// The channel address used to communicate with the system actor
    /// that manages this proc. This is passed through so that the proc
    /// can spawn sibling procs.
    pub bootstrap_channel_addr: ChannelAddr,

    /// The local address of this proc actor. This is used by
    /// the proc actor to register the proc with the system actor.
    pub local_addr: ChannelAddr,

    /// Watch into which the proc's state is published.
    pub state_watch: watch::Sender<ProcState>,

    /// Reference to the supervisor that receives this proc's
    /// supervision updates.
    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,

    /// Interval of reporting supervision status to the system actor.
    /// A zero interval disables the periodic updates.
    pub supervision_update_interval: Duration,

    /// Arbitrary labels for the proc. They can be used later to query
    /// proc(s) using the system snapshot api.
    pub labels: HashMap<String, String>,

    /// Proc lifecycle management mode.
    ///
    /// If a proc is not managed
    ///   * it will not be stopped when the system shuts down.
    ///   * it will not be captured by the system snapshot.
    ///
    /// Not being managed is useful for procs that run on the client side,
    /// which might need to stay around for a while after the system is gone.
    pub lifecycle_mode: ProcLifecycleMode,
}
303
/// Outputs from bootstrapping a proc.
#[derive(Debug)]
pub struct BootstrappedProc {
    /// Handle to the proc actor spawned on the proc.
    pub proc_actor: ActorHandle<ProcActor>,
    /// Handle to the comm actor spawned alongside the proc actor.
    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the mailbox server serving the proc's listen address.
    pub mailbox: MailboxServerHandle,
}
314
/// ProcActor manages a single proc. It is responsible for managing
/// the lifecycle of all of the proc's actors, and to route messages
/// accordingly.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        ProcMessage,
        MailboxAdminMessage,
    ],
)]
pub struct ProcActor {
    /// The parameters this actor was constructed with.
    params: ProcActorParams,
    /// Current join state; starts as `AwaitingJoin` and becomes
    /// `Joined` when `ProcMessage::Joined` is handled.
    state: ProcState,
    /// Registry used to spawn actors by their registered type name.
    remote: Remote,
    /// Time of the last supervision update that completed; initialized
    /// to the construction time as read from the proc's clock.
    last_successful_supervision_update: SystemTime,
}
331
332impl ProcActor {
333    /// Bootstrap a proc actor with the provided proc id. The bootstrapped proc
334    /// actor will use the provided listen address to serve its mailbox, while
335    /// the bootstrap address is used to register with the system actor.
336    #[hyperactor::instrument]
337    pub async fn bootstrap(
338        proc_id: ProcId,
339        world_id: WorldId,
340        listen_addr: ChannelAddr,
341        bootstrap_addr: ChannelAddr,
342        supervision_update_interval: Duration,
343        labels: HashMap<String, String>,
344        lifecycle_mode: ProcLifecycleMode,
345    ) -> Result<BootstrappedProc, anyhow::Error> {
346        let system_supervision_ref: ActorRef<ProcSupervisor> =
347            ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
348
349        Self::try_bootstrap(
350            proc_id.clone(),
351            world_id.clone(),
352            listen_addr.clone(),
353            bootstrap_addr.clone(),
354            system_supervision_ref.clone(),
355            supervision_update_interval,
356            labels,
357            lifecycle_mode,
358        )
359        .await
360        .inspect_err(|err| {
361            tracing::error!(
362                "bootstrap {} {} {} {}: {}",
363                proc_id,
364                world_id,
365                listen_addr,
366                bootstrap_addr,
367                err
368            );
369        })
370    }
371
372    /// Attempt to bootstrap a proc actor with the provided proc id. The bootstrapped proc
373    /// actor will use the provided listen address to serve its mailbox, while
374    /// the bootstrap address is used to register with the system actor.
375    #[hyperactor::instrument]
376    pub async fn try_bootstrap(
377        proc_id: ProcId,
378        world_id: WorldId,
379        listen_addr: ChannelAddr,
380        bootstrap_addr: ChannelAddr,
381        supervisor_actor_ref: ActorRef<ProcSupervisor>,
382        supervision_update_interval: Duration,
383        labels: HashMap<String, String>,
384        lifecycle_mode: ProcLifecycleMode,
385    ) -> Result<BootstrappedProc, anyhow::Error> {
386        let system_sender =
387            BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
388        let clock = ClockKind::for_channel_addr(&listen_addr);
389
390        let proc_forwarder =
391            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
392        let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
393        Self::bootstrap_for_proc(
394            proc,
395            world_id,
396            listen_addr,
397            bootstrap_addr,
398            supervisor_actor_ref,
399            supervision_update_interval,
400            labels,
401            lifecycle_mode,
402        )
403        .await
404    }
405
406    /// Bootstrap a proc actor with the provided proc. The bootstrapped proc actor
407    /// will use the provided listen address to serve its mailbox, while the bootstrap
408    /// address is used to register with the system actor.
409    #[hyperactor::instrument]
410    pub async fn bootstrap_for_proc(
411        proc: Proc,
412        world_id: WorldId,
413        listen_addr: ChannelAddr,
414        bootstrap_addr: ChannelAddr,
415        supervisor_actor_ref: ActorRef<ProcSupervisor>,
416        supervision_update_interval: Duration,
417        labels: HashMap<String, String>,
418        lifecycle_mode: ProcLifecycleMode,
419    ) -> Result<BootstrappedProc, anyhow::Error> {
420        let (local_addr, rx) = channel::serve(listen_addr).await?;
421        let mailbox_handle = proc.clone().serve(rx);
422        let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
423
424        let handle = match proc
425            .clone()
426            .spawn::<Self>(
427                "proc",
428                ProcActorParams {
429                    proc: proc.clone(),
430                    world_id: world_id.clone(),
431                    system_actor_ref: SYSTEM_ACTOR_REF.clone(),
432                    bootstrap_channel_addr: bootstrap_addr,
433                    local_addr,
434                    state_watch: state_tx,
435                    supervisor_actor_ref,
436                    supervision_update_interval,
437                    labels,
438                    lifecycle_mode,
439                },
440            )
441            .await
442        {
443            Ok(handle) => handle,
444            Err(e) => {
445                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
446                return Err(e);
447            }
448        };
449
450        let comm_actor = match proc
451            .clone()
452            .spawn::<CommActor>("comm", Default::default())
453            .await
454        {
455            Ok(handle) => handle,
456            Err(e) => {
457                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
458                return Err(e);
459            }
460        };
461        comm_actor.bind::<CommActor>();
462
463        loop {
464            let proc_state = state_rx.borrow_and_update().clone();
465            tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
466            if matches!(proc_state, ProcState::Joined) {
467                break;
468            }
469            match state_rx.changed().await {
470                Ok(_) => {}
471                Err(e) => {
472                    Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
473                    return Err(e.into());
474                }
475            }
476        }
477
478        proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
479
480        Ok(BootstrappedProc {
481            proc_actor: handle,
482            mailbox: mailbox_handle,
483            comm_actor,
484        })
485    }
486
487    /// Shutdown the mailbox server to free up rx and its cooresponding listen address.
488    /// Because in the next bootstrap attempt, the same listen address will be used.
489    async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
490        mailbox_handle.stop("failed proc bootstrap cleanup");
491        if let Err(shutdown_err) = mailbox_handle.await {
492            // Ignore the shutdown error and populate the original error.
493            tracing::error!(
494                "error shutting down during a failed bootstrap attempt: {}",
495                shutdown_err
496            );
497        }
498    }
499}
500
501#[async_trait]
502impl Actor for ProcActor {
503    type Params = ProcActorParams;
504
505    async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
506        let last_successful_supervision_update = params.proc.clock().system_time_now();
507        Ok(Self {
508            params,
509            state: ProcState::AwaitingJoin,
510            remote: Remote::collect(),
511            last_successful_supervision_update,
512        })
513    }
514
515    async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
516        // Bind ports early so that when the proc actor joins, it can serve.
517        this.bind::<Self>();
518
519        // Join the system.
520        self.params
521            .system_actor_ref
522            .join(
523                this,
524                /*world_id=*/ self.params.world_id.clone(),
525                /*proc_id=*/ self.params.proc.proc_id().clone(),
526                /*proc_message_port=*/ this.port().bind(),
527                /*proc_addr=*/ self.params.local_addr.clone(),
528                self.params.labels.clone(),
529                self.params.lifecycle_mode.clone(),
530            )
531            .await?;
532
533        // Trigger supervision status update
534        // TODO: let the system actor determine/update the supervision interval.
535        // Maybe by returning it from the join call, or defining some other proc
536        // message to adjust it.
537        if self.params.supervision_update_interval > Duration::from_secs(0)
538            && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
539        {
540            this.self_message_with_delay(
541                ProcMessage::UpdateSupervision(),
542                self.params.supervision_update_interval,
543            )?;
544        }
545
546        Ok(())
547    }
548}
549
550impl ProcActor {
551    /// This proc's rank in the world.
552    fn rank(&self) -> Index {
553        self.params
554            .proc
555            .proc_id()
556            .rank()
557            .expect("proc must be ranked")
558    }
559}
560
561#[hyperactor::forward(MailboxAdminMessage)]
562#[async_trait]
563impl MailboxAdminMessageHandler for ProcActor {
564    async fn update_address(
565        &mut self,
566        cx: &Context<Self>,
567        proc_id: ProcId,
568        addr: ChannelAddr,
569    ) -> Result<(), anyhow::Error> {
570        tracing::trace!(
571            "received address update:\n{:#?}",
572            MailboxAdminMessage::UpdateAddress {
573                proc_id: proc_id.clone(),
574                addr: addr.clone()
575            }
576        );
577        let forwarder = cx.proc().forwarder();
578        if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
579            router.bind(proc_id.into(), addr);
580        } else {
581            tracing::warn!(
582                "proc {} received update_address but does not use a DialMailboxRouter",
583                cx.proc().proc_id()
584            );
585        }
586
587        Ok(())
588    }
589}
590
591#[async_trait]
592#[hyperactor::forward(ProcMessage)]
593impl ProcMessageHandler for ProcActor {
594    async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
595        self.state = ProcState::Joined;
596        let _ = self.params.state_watch.send(self.state.clone());
597        Ok(())
598    }
599
600    async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
601        Ok(self.state.clone())
602    }
603
604    async fn spawn(
605        &mut self,
606        cx: &Context<Self>,
607        actor_type: String,
608        actor_name: String,
609        params_data: Data,
610        status_port: PortRef<Index>,
611    ) -> Result<(), anyhow::Error> {
612        let _actor_id = self
613            .remote
614            .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
615            .await?;
616
617        // Signal that the actor has joined:
618        status_port.send(cx, self.rank())?;
619        Ok(())
620    }
621
622    async fn spawn_proc(
623        &mut self,
624        _cx: &Context<Self>,
625        env: Environment,
626        world_id: WorldId,
627        proc_ids: Vec<ProcId>,
628        world_size: usize,
629    ) -> Result<(), anyhow::Error> {
630        for (index, proc_id) in proc_ids.into_iter().enumerate() {
631            let proc_world_id = proc_id
632                .world_id()
633                .expect("proc must be ranked for world_id access")
634                .clone();
635            // Check world id isn't the same as this proc's world id.
636            if &proc_world_id
637                == self
638                    .params
639                    .proc
640                    .proc_id()
641                    .world_id()
642                    .expect("proc must be ranked for world_id access")
643                || &world_id
644                    == self
645                        .params
646                        .proc
647                        .proc_id()
648                        .world_id()
649                        .expect("proc must be ranked for world_id access")
650            {
651                return Err(anyhow::anyhow!(
652                    "cannot spawn proc in same world {}",
653                    proc_world_id
654                ));
655            }
656            match env {
657                Environment::Local => {
658                    ProcActor::bootstrap(
659                        proc_id,
660                        world_id.clone(),
661                        ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
662                        self.params.bootstrap_channel_addr.clone(),
663                        self.params.supervision_update_interval,
664                        HashMap::new(),
665                        ProcLifecycleMode::ManagedBySystem,
666                    )
667                    .await?;
668                }
669                Environment::Exec { ref program } => {
670                    tracing::info!("spawning proc {} with program {}", proc_id, program);
671                    let mut child = Command::new(program);
672                    let _ = child
673                        .env(HYPERACTOR_WORLD_ID, world_id.to_string())
674                        .env(HYPERACTOR_PROC_ID, proc_id.to_string())
675                        .env(
676                            HYPERACTOR_BOOTSTRAP_ADDR,
677                            self.params.bootstrap_channel_addr.to_string(),
678                        )
679                        .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
680                        .env(
681                            HYPERACTOR_RANK,
682                            proc_id
683                                .rank()
684                                .expect("proc must be ranked for rank env var")
685                                .to_string(),
686                        )
687                        .env(HYPERACTOR_LOCAL_RANK, index.to_string())
688                        .stdin(Stdio::null())
689                        .stdout(Stdio::inherit())
690                        .stderr(Stdio::inherit())
691                        .spawn()?;
692                }
693            }
694        }
695        Ok(())
696    }
697
698    async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
699        // Delay for next supervision update with some jitter.
700        let delay = jitter(self.params.supervision_update_interval);
701
702        // Only start updating supervision after the proc is joined.
703        if self.state != ProcState::Joined {
704            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
705            return Ok(());
706        }
707
708        let msg = ProcSupervisionState {
709            world_id: self.params.world_id.clone(),
710            proc_id: self.params.proc.proc_id().clone(),
711            proc_addr: self.params.local_addr.clone(),
712            proc_health: ProcStatus::Alive,
713            failed_actors: Vec::new(),
714        };
715
716        match cx
717            .clock()
718            .timeout(
719                // TODO: make the timeout configurable
720                Duration::from_secs(10),
721                self.params.supervisor_actor_ref.update(cx, msg),
722            )
723            .await
724        {
725            Ok(_) => {
726                self.last_successful_supervision_update = cx.clock().system_time_now();
727            }
728            Err(_) => {}
729        }
730
731        let supervision_staleness = self
732            .last_successful_supervision_update
733            .elapsed()
734            .unwrap_or_default();
735        // Timeout when there are 3 consecutive supervision updates that fail.
736        // TODO: make number of failed updates configurable.
737        if supervision_staleness > 5 * self.params.supervision_update_interval {
738            tracing::error!(
739                "system actor isn't responsive to supervision update, stopping the proc"
740            );
741            // System actor is not responsive to supervision update, it is likely dead. Stop this proc.
742            // TODO: make the timeout configurable
743            self.stop(cx, Duration::from_secs(5)).await?;
744        } else {
745            // Schedule the next supervision update with some jitter.
746            let delay = jitter(self.params.supervision_update_interval);
747            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
748        }
749
750        Ok(())
751    }
752
753    async fn stop(
754        &mut self,
755        cx: &Context<Self>,
756        timeout: Duration,
757    ) -> Result<ProcStopResult, anyhow::Error> {
758        tracing::info!("stopping proc {}", self.params.proc.proc_id());
759        self.params
760            .proc
761            .destroy_and_wait(timeout, Some(cx.self_id()))
762            .await
763            .map(|(stopped, aborted)| {
764                tracing::info!("stopped proc {}", self.params.proc.proc_id());
765                ProcStopResult {
766                    proc_id: self.params.proc.proc_id().clone(),
767                    actors_stopped: stopped.len(),
768                    actors_aborted: aborted.len(),
769                }
770            })
771    }
772
773    async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
774        let state = self.state.clone();
775        let actors = self.params.proc.ledger_snapshot();
776        Ok(ProcSnapshot { state, actors })
777    }
778
779    async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
780        Ok(self.params.local_addr.clone())
781    }
782
783    async fn py_spy_dump(
784        &mut self,
785        _cx: &Context<Self>,
786        config: PySpyConfig,
787    ) -> Result<StackTrace, anyhow::Error> {
788        let pid = std::process::id() as i32;
789        tracing::info!(
790            "running py-spy on proc {}, process id: {}",
791            self.params.proc.proc_id(),
792            pid
793        );
794        let trace = match config {
795            PySpyConfig::Blocking { native, native_all } => {
796                py_spy(
797                    pid,
798                    native.unwrap_or_default(),
799                    native_all.unwrap_or_default(),
800                    true,
801                )
802                .await?
803            }
804            PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
805        };
806        Ok(StackTrace { trace })
807    }
808}
809
810#[async_trait]
811impl Handler<ActorSupervisionEvent> for ProcActor {
812    async fn handle(
813        &mut self,
814        cx: &Context<Self>,
815        event: ActorSupervisionEvent,
816    ) -> anyhow::Result<()> {
817        let status = event.status();
818        let message = ProcSupervisionState {
819            world_id: self.params.world_id.clone(),
820            proc_id: self.params.proc.proc_id().clone(),
821            proc_addr: self.params.local_addr.clone(),
822            proc_health: ProcStatus::Alive,
823            failed_actors: Vec::from([(event.actor_id, status)]),
824        };
825        self.params.supervisor_actor_ref.update(cx, message).await?;
826        Ok(())
827    }
828}
829
830/// Convenience utility to spawn an actor on a proc. Spawn returns
831/// with the new ActorRef on success.
832pub async fn spawn<A: Actor + RemoteActor>(
833    caps: &(impl cap::CanSend + cap::CanOpenPort),
834    proc_actor: &ActorRef<ProcActor>,
835    actor_name: &str,
836    params: &A::Params,
837) -> Result<ActorRef<A>, anyhow::Error>
838where
839    A::Params: RemoteMessage,
840{
841    let remote = Remote::collect();
842    let (spawned_port, mut spawned_receiver) = open_port(caps);
843    let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
844
845    proc_actor
846        .spawn(
847            caps,
848            remote
849                .name_of::<A>()
850                .ok_or(anyhow::anyhow!("actor not registered"))?
851                .into(),
852            actor_name.into(),
853            bincode::serialize(params)?,
854            spawned_port.bind(),
855        )
856        .await?;
857
858    // Wait for the spawned actor to join.
859    while spawned_receiver.recv().await?
860        != proc_id
861            .rank()
862            .expect("proc must be ranked for rank comparison")
863    {}
864
865    // Gspawned actors are always exported.
866    Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
867}
868
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;
    use std::time::Duration;

    use hyperactor::actor::ActorStatus;
    use hyperactor::channel;
    use hyperactor::channel::ChannelAddr;
    use hyperactor::channel::ChannelTransport;
    use hyperactor::clock::Clock;
    use hyperactor::clock::RealClock;
    use hyperactor::data::Named;
    use hyperactor::forward;
    use hyperactor::id;
    use hyperactor::mailbox::Mailbox;
    use hyperactor::reference::ActorRef;
    use hyperactor::test_utils::pingpong::PingPongActor;
    use hyperactor::test_utils::pingpong::PingPongActorParams;
    use hyperactor::test_utils::pingpong::PingPongMessage;
    use maplit::hashset;
    use rand::Rng;
    use rand::distributions::Alphanumeric;
    use regex::Regex;

    use super::*;
    use crate::supervision::ProcSupervisionMessage;
    use crate::system::ServerHandle;
    use crate::system::System;

    /// Upper bound on how long [`bootstrap`] polls for the proc actor to
    /// reach [`ProcState::Joined`].
    const MAX_WAIT_TIME: Duration = Duration::from_secs(10);

    /// Everything a test needs after [`bootstrap`] has brought up a
    /// system and a single joined proc.
    struct Bootstrapped {
        /// Handle to the served system; tests stop and await it on exit.
        server_handle: ServerHandle,
        /// Reference to the bootstrapped proc actor.
        proc_actor_ref: ActorRef<ProcActor>,
        /// Reference to the comm actor created by the bootstrap.
        comm_actor_ref: ActorRef<CommActor>,
        /// Mailbox attached to the system, used to send messages.
        client: Mailbox,
    }

    /// Serves a system on a local channel, bootstraps proc `world[0]`
    /// against it, attaches a client, and waits (up to
    /// [`MAX_WAIT_TIME`]) for the proc actor to report
    /// [`ProcState::Joined`].
    ///
    /// Panics if any step fails or the proc has not joined in time.
    async fn bootstrap() -> Bootstrapped {
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Local),
            Duration::from_secs(10),
            Duration::from_secs(10),
        )
        .await
        .unwrap();

        let world_id = id!(world);
        let proc_id = world_id.proc_id(0);
        let bootstrap = ProcActor::bootstrap(
            proc_id,
            world_id,
            ChannelAddr::any(ChannelTransport::Local),
            server_handle.local_addr().clone(),
            Duration::from_secs(1),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();

        // Now join the system and talk to the proc actor.

        let mut system = System::new(server_handle.local_addr().clone());
        let client = system.attach().await.unwrap();

        // This is really not cool. We should manage state subscriptions instead.
        // Poll the proc actor's state until it has joined or we run out
        // of time; the last observed state is what gets asserted on.
        let start = RealClock.now();
        let proc_state = loop {
            let state = bootstrap.proc_actor.state(&client).await.unwrap();

            if matches!(state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
                break state;
            }
        };
        assert_matches!(proc_state, ProcState::Joined);

        Bootstrapped {
            server_handle,
            proc_actor_ref: bootstrap.proc_actor.bind(),
            comm_actor_ref: bootstrap.comm_actor.bind(),
            client,
        }
    }

    /// A system and a proc can be bootstrapped and the system can then
    /// be stopped cleanly.
    #[tokio::test]
    async fn test_bootstrap() {
        let Bootstrapped { server_handle, .. } = bootstrap().await;

        println!("bootstrapped, now waiting");

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    /// Minimal spawnable actor used by the tests below; it handles
    /// [`TestActorMessage`].
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            TestActorMessage,
        ],
    )]
    struct TestActor;

    /// Messages understood by [`TestActor`].
    #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
    enum TestActorMessage {
        // Replies with the given number plus one.
        Increment(u64, #[reply] OncePortRef<u64>),
        // Makes the handler return an error carrying the given text.
        Fail(String),
    }

    #[async_trait]
    #[forward(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        /// Returns `num + 1`.
        async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
            Ok(num + 1)
        }

        /// Always fails, using `err` as the error message.
        async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
            Err(anyhow::anyhow!(err))
        }
    }

    #[tokio::test]
    async fn test_stop() {
        // Show here that the proc actors are stopped when the proc
        // actor receives a `Stop()` message.
        let Bootstrapped {
            server_handle,
            proc_actor_ref,
            client,
            ..
        } = bootstrap().await;

        const NUM_ACTORS: usize = 4;
        for i in 0..NUM_ACTORS {
            spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
                .await
                .unwrap();
        }

        let ProcStopResult {
            proc_id: _,
            actors_stopped,
            actors_aborted,
        } = proc_actor_ref
            .stop(&client, Duration::from_secs(1))
            .await
            .unwrap();
        // NOTE(review): the extra stopped actor is presumably the comm
        // actor, and the single aborted one the proc actor itself
        // (compare the log assertions in `test_stop_timeout`) — confirm.
        assert_eq!(NUM_ACTORS + 1, actors_stopped);
        assert_eq!(1, actors_aborted);

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    /// Test actor that, for every `u64` message, sleeps for that many
    /// seconds before finishing the handler.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            u64,
        ],
    )]
    struct SleepActor {}

    #[async_trait]
    impl Handler<u64> for SleepActor {
        /// Sleeps for `message` seconds on the real clock.
        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
            let duration = message;
            RealClock.sleep(Duration::from_secs(duration)).await;
            Ok(())
        }
    }

    /// Actors that are still busy when the stop timeout expires are
    /// aborted rather than stopped, and each abort is logged.
    #[tracing_test::traced_test]
    #[tokio::test]
    async fn test_stop_timeout() {
        let Bootstrapped {
            server_handle,
            proc_actor_ref,
            client,
            ..
        } = bootstrap().await;

        const NUM_ACTORS: usize = 4;
        for i in 0..NUM_ACTORS {
            let sleep_secs = 5u64;
            let sleeper = spawn::<SleepActor>(
                &client,
                &proc_actor_ref,
                format!("sleeper{i}").as_str(),
                &(),
            )
            .await
            .unwrap();
            // Leave sleeper0 idle; every other sleeper is put to sleep
            // for longer than the stop timeout used below.
            if i > 0 {
                sleeper.send(&client, sleep_secs).unwrap();
            }
        }

        let ProcStopResult {
            proc_id: _,
            actors_stopped,
            actors_aborted,
        } = proc_actor_ref
            .stop(&client, Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(2, actors_stopped);
        assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);

        assert!(tracing_test::internal::logs_with_scope_contain(
            "hyperactor::proc",
            "world[0].proc[0]: aborting JoinHandle"
        ));
        // Every sleeper that was sent a sleep message (1..NUM_ACTORS)
        // must have been aborted, not just the first two.
        for i in 1..NUM_ACTORS {
            assert!(tracing_test::internal::logs_with_scope_contain(
                "hyperactor::proc",
                format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
            ));
        }
        // The number of abort log lines must match the reported count.
        logs_assert(|logs| {
            let count = logs
                .iter()
                .filter(|log| log.contains("aborting JoinHandle"))
                .count();
            if count == actors_aborted {
                Ok(())
            } else {
                Err("task abort counting error".to_string())
            }
        });

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    /// An actor spawned through [`spawn`] is reachable via the returned
    /// reference.
    #[tokio::test]
    async fn test_spawn() {
        let Bootstrapped {
            server_handle,
            proc_actor_ref,
            client,
            ..
        } = bootstrap().await;

        let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
            .await
            .unwrap();

        let result = test_actor_ref.increment(&client, 1).await.unwrap();
        assert_eq!(result, 2);

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    /// Builds a `unix!@<name>` channel address with a random
    /// 24-character alphanumeric name.
    #[cfg(target_os = "linux")]
    fn random_abstract_addr() -> ChannelAddr {
        let random_string = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(24)
            .map(char::from)
            .collect::<String>();
        format!("unix!@{random_string}").parse().unwrap()
    }

    /// A proc bootstrapped before its system is being served keeps
    /// retrying and joins once the system comes up.
    #[cfg(target_os = "linux")] // remove after making abstract unix sockets store-and-forward
    #[tokio::test]
    async fn test_bootstrap_retry() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped under cargo as it causes other tests to fail when run");
            return;
        }

        // Spawn the proc before the server is up. This is imperfect
        // as we rely on sleeping. Ideally we'd make sure the proc performs
        // at least one try before we start the server.
        let bootstrap_addr = random_abstract_addr();

        let bootstrap_addr_clone = bootstrap_addr.clone();
        let handle = tokio::spawn(async move {
            let world_id = id!(world);
            let proc_id = world_id.proc_id(0);
            let bootstrap = ProcActor::bootstrap(
                proc_id,
                world_id,
                random_abstract_addr(),
                bootstrap_addr_clone,
                Duration::from_secs(1),
                HashMap::new(),
                ProcLifecycleMode::ManagedBySystem,
            )
            .await
            .unwrap();

            // Proc actor should still be running.
            let mut status = bootstrap.proc_actor.status();
            assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
        });

        // Sleep long enough that a supervision timeout would have fired;
        // it must not cause the ProcActor to stop. When the system actor
        // is brought up later, the bootstrap should finish properly.
        RealClock.sleep(Duration::from_secs(5)).await;

        let _server_handle = System::serve(
            bootstrap_addr,
            Duration::from_secs(10),
            Duration::from_secs(10),
        )
        .await
        .unwrap();

        // Task completed successfully, so it connected correctly.
        handle.await.unwrap();
    }

    /// The proc actor reports periodic "alive" updates to its
    /// supervisor, and reports a failed actor once one of its actors
    /// errors out.
    #[tokio::test]
    async fn test_supervision_message_handling() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped under cargo as it fails when run with others");
            return;
        }

        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Local),
            Duration::from_secs(3600),
            Duration::from_secs(3600),
        )
        .await
        .unwrap();

        // A test supervisor.
        let mut system = System::new(server_handle.local_addr().clone());
        let supervisor_mailbox = system.attach().await.unwrap();
        let (supervisor_supervision_tx, mut supervisor_supervision_receiver) =
            supervisor_mailbox.open_port::<ProcSupervisionMessage>();
        supervisor_supervision_tx.bind_to(ProcSupervisionMessage::port());
        let supervisor_actor_ref: ActorRef<ProcSupervisor> =
            ActorRef::attest(supervisor_mailbox.actor_id().clone());

        // Start the proc actor
        let local_world_id = hyperactor::id!(test_proc);
        let local_proc_id = local_world_id.proc_id(0);
        let bootstrap = ProcActor::try_bootstrap(
            local_proc_id.clone(),
            local_world_id.clone(),
            ChannelAddr::any(ChannelTransport::Local),
            server_handle.local_addr().clone(),
            supervisor_actor_ref.clone(),
            Duration::from_secs(1),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();

        // Should receive supervision message sent from the periodic task
        // indicating the proc is alive.
        let msg = supervisor_supervision_receiver.recv().await;
        match msg.unwrap() {
            ProcSupervisionMessage::Update(state, port) => {
                assert_eq!(
                    state,
                    ProcSupervisionState {
                        world_id: local_world_id.clone(),
                        // NOTE(review): this hard-coded local address
                        // presumably depends on how many local channels
                        // were allocated before the proc's; that may be
                        // why the test is skipped under cargo — confirm.
                        proc_addr: ChannelAddr::Local(3),
                        proc_id: local_proc_id.clone(),
                        proc_health: ProcStatus::Alive,
                        failed_actors: Vec::new(),
                    }
                );
                let _ = port.send(&supervisor_mailbox, ());
            }
        }

        // Spawn a root actor on the proc.
        let proc_actor_ref = bootstrap.proc_actor.bind();
        let test_actor_ref = spawn::<TestActor>(&supervisor_mailbox, &proc_actor_ref, "test", &())
            .await
            .unwrap();

        test_actor_ref
            .fail(
                &supervisor_mailbox,
                "test actor is erroring out".to_string(),
            )
            .await
            .unwrap();
        // Since we could get messages from both the periodic task and the
        // report from the failed actor, we need to poll for a while to make
        // sure we get the right message.
        let result = RealClock
            .timeout(Duration::from_secs(5), async {
                loop {
                    match supervisor_supervision_receiver.recv().await {
                        Ok(ProcSupervisionMessage::Update(state, _port)) => {
                            // Skip updates that do not mention our actor.
                            if let Some((_, actor_status)) = state
                                .failed_actors
                                .iter()
                                .find(|(failed_id, _)| failed_id == test_actor_ref.actor_id())
                            {
                                return Ok(actor_status.clone());
                            }
                        }
                        _ => anyhow::bail!("unexpected message type"),
                    }
                }
            })
            .await;
        assert_matches!(
            result.unwrap().unwrap(),
            ActorStatus::Failed(msg) if msg.contains("test actor is erroring out")
        );

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    // Verify that the proc actor's ProcMessage port is bound properly so
    // that we can send messages to it through the system actor.
    #[tokio::test]
    async fn test_bind_proc_actor_in_bootstrap() {
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Local),
            Duration::from_secs(10),
            Duration::from_secs(10),
        )
        .await
        .unwrap();
        let mut system = System::new(server_handle.local_addr().clone());
        let client = system.attach().await.unwrap();

        let world_id = id!(world);
        let proc_id = world_id.proc_id(0);
        let bootstrap = ProcActor::bootstrap(
            proc_id,
            world_id,
            ChannelAddr::any(ChannelTransport::Local),
            server_handle.local_addr().clone(),
            Duration::from_secs(1),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        // Build the reference from the bare actor id (rather than via
        // `bind()` on the handle) so that only what bootstrap itself
        // bound is exercised.
        let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
        let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);

        let res = RealClock
            .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
            .await;
        // If ProcMessage's static Named port is not bound, this test will fail
        // due to timeout.
        assert!(res.is_ok());
        assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    /// A proc snapshot reports the joined state and lists the proc
    /// actor, the comm actor and every spawned root actor as roots.
    #[tokio::test]
    async fn test_proc_snapshot() {
        let Bootstrapped {
            server_handle,
            proc_actor_ref,
            comm_actor_ref,
            client,
            ..
        } = bootstrap().await;

        // Spawn some actors on this proc.
        let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
            .await
            .unwrap();
        let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
            .await
            .unwrap();
        {
            let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
            assert_eq!(snapshot.state, ProcState::Joined);
            assert_eq!(
                snapshot.actors.roots.keys().collect::<HashSet<_>>(),
                hashset! {
                    proc_actor_ref.actor_id(),
                    comm_actor_ref.actor_id(),
                    root.actor_id(),
                    another_root.actor_id(),
                }
            );
        }

        server_handle.stop().await.unwrap();
        server_handle.await;
    }

    #[tokio::test]
    async fn test_undeliverable_message_return() {
        // Proc can't send a message to a remote actor because the
        // system connection is lost.
        use hyperactor::mailbox::Undeliverable;

        // Use temporary config for this test
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_secs(1),
        );

        // Serve a system.
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Tcp),
            Duration::from_secs(120),
            Duration::from_secs(120),
        )
        .await
        .unwrap();
        let mut system = System::new(server_handle.local_addr().clone());

        // Build a supervisor.
        let sup_mail = system.attach().await.unwrap();
        let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
        sup_tx.bind_to(ProcSupervisionMessage::port());
        let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.actor_id().clone());

        // Construct a system sender.
        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
            channel::dial(server_handle.local_addr().clone()).unwrap(),
        ));

        // Construct a proc forwarder in terms of the system sender.
        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
        let proc_forwarder =
            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));

        // Bootstrap proc 'world[0]', join the system.
        let world_id = id!(world);
        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
            proc_0.clone(),
            world_id.clone(),
            listen_addr,
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_0_client = proc_0.attach("client").unwrap();
        let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();

        // Bootstrap a second proc 'world[1]', join the system.
        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
            proc_1.clone(),
            world_id.clone(),
            ChannelAddr::any(ChannelTransport::Tcp),
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_1_client = proc_1.attach("client").unwrap();
        let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();

        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
        // same proc).
        let ping_handle = proc_0
            .spawn::<PingPongActor>("ping", ping_params)
            .await
            .unwrap();
        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
        let pong_handle = proc_1
            .spawn::<PingPongActor>("pong", pong_params)
            .await
            .unwrap();

        // Now kill the system server making message delivery between
        // procs impossible.
        server_handle.stop().await.unwrap();
        server_handle.await;

        let n = 100usize;
        for i in 1..=n {
            // Have 'ping' send 'pong' a message.
            let ttl = 66 + i as u64; // Avoid ttl = 66!
            let (once_handle, _) = proc_0_client.open_once_port::<bool>();
            ping_handle
                .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
                .unwrap();
        }

        // `PingPongActor`s do not exit their message loop (a
        // non-default actor behavior) when they have an undelivered
        // message sent back to them (the reason being this very
        // test).
        assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));

        // We expect n undelivered messages.
        let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
            unreachable!()
        };
        let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
        let mut count = 1;
        while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
            // We care that every undeliverable message was accounted
            // for. We can't assume anything about their arrival
            // order.
            count += 1;
            let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
        }
        // `assert_eq!` so that a mismatch reports both counts.
        assert_eq!(count, n);
    }

    #[tracing_test::traced_test]
    #[tokio::test]
    async fn test_proc_actor_mailbox_admin_message() {
        // Verify that proc actors update their address books on first
        // contact, and that no additional updates are triggered for
        // known procs.

        // Serve a system.
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Tcp),
            Duration::from_secs(120),
            Duration::from_secs(120),
        )
        .await
        .unwrap();
        let mut system = System::new(server_handle.local_addr().clone());
        let system_actor = server_handle.system_actor_handle();
        let system_client = system.attach().await.unwrap(); // world id: user

        // Build a supervisor.
        let sup_mail = system.attach().await.unwrap();
        let (sup_tx, _sup_rx) = sup_mail.open_port::<ProcSupervisionMessage>();
        sup_tx.bind_to(ProcSupervisionMessage::port());
        let sup_ref = ActorRef::<ProcSupervisor>::attest(sup_mail.actor_id().clone());

        // Construct a system sender.
        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
            channel::dial(server_handle.local_addr().clone()).unwrap(),
        ));

        // Construct a proc forwarder in terms of the system sender.
        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
        let proc_forwarder =
            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));

        // Bootstrap proc 'world[0]', join the system.
        let world_id = id!(world);
        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
            proc_0.clone(),
            world_id.clone(),
            listen_addr,
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_0_client = proc_0.attach("client").unwrap();
        let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();

        // Bootstrap a second proc 'world[1]', join the system.
        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
            proc_1.clone(),
            world_id.clone(),
            ChannelAddr::any(ChannelTransport::Tcp),
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_1_client = proc_1.attach("client").unwrap();
        let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();

        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
        // same proc).
        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
        let ping_handle = proc_0
            .spawn::<PingPongActor>("ping", ping_params)
            .await
            .unwrap();
        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
        let pong_handle = proc_1
            .spawn::<PingPongActor>("pong", pong_params)
            .await
            .unwrap();

        // Have 'ping' send 'pong' a message.
        let ttl = 10u64; // Avoid ttl = 66!
        let (once_tx, once_rx) = system_client.open_once_port::<bool>();
        ping_handle
            .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
            .unwrap();

        assert!(once_rx.recv().await.unwrap());

        // The expectations below are prefixes of pretty-printed log
        // output, so their whitespace is significant.

        // Ping gets Pong's address
        let expected_1 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "world",
        ),
        1,
    ),
    addr: Tcp("#;

        // Pong gets Ping's address
        let expected_2 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "world",
        ),
        0,
    ),
    addr: Tcp("#;

        // Ping gets "user"'s address
        let expected_3 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "user",
        ),"#;

        logs_assert(|logs| {
            let log_body = logs.join("\n");

            // Count complete multi-line `UpdateAddress { ... }` records.
            let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
            let count = pattern.find_iter(&log_body).count();

            if count != 3 {
                return Err(format!(
                    "expected 3 UpdateAddress messages, found {}",
                    count
                ));
            }

            if !log_body.contains(expected_1) {
                return Err("missing expected update for proc_id 1".into());
            }
            if !log_body.contains(expected_2) {
                return Err("missing expected update for proc_id 0".into());
            }
            if !log_body.contains(expected_3) {
                return Err("missing expected update for proc_id user".into());
            }

            Ok(())
        });

        let (once_tx, once_rx) = system_client.open_once_port::<()>();
        assert_matches!(
            system_actor
                .stop(
                    /*unused*/ &system_client,
                    None,
                    Duration::from_secs(1),
                    once_tx.bind()
                )
                .await,
            Ok(())
        );
        assert_matches!(once_rx.recv().await.unwrap(), ());
    }

    /// A ping-pong game keeps working after the pong proc is stopped
    /// and re-bootstrapped under the same proc id with a new address.
    #[tokio::test]
    async fn test_update_address_book_cache() {
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Tcp),
            Duration::from_secs(2), // supervision update timeout
            Duration::from_secs(2), // duration to evict an unhealthy world
        )
        .await
        .unwrap();
        let system_addr = server_handle.local_addr().clone();
        let mut system = System::new(system_addr.clone());

        let system_client = system.attach().await.unwrap();

        // Spawn ping and pong actors to play a ping pong game.
        let ping_actor_id = id!(world[0].ping[0]);
        let (ping_actor_ref, _ping_proc_ref) =
            spawn_actor(&ping_actor_id, &system_addr, system_client.clone()).await;

        let pong_actor_id = id!(world[1].pong[0]);
        let (pong_actor_ref, pong_proc_ref) =
            spawn_actor(&pong_actor_id, &system_addr, system_client.clone()).await;

        // After playing the first round of the game, the ping and pong
        // actors have each other's ChannelAddr cached in their procs'
        // mailboxes, respectively.
        let (done_tx, done_rx) = system_client.open_once_port();
        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
        ping_actor_ref
            .send(&system_client, ping_pong_message)
            .unwrap();
        assert!(done_rx.recv().await.unwrap());

        // Now we kill and respawn the pong actor so its ChannelAddr is changed.
        let ProcStopResult { actors_aborted, .. } = pong_proc_ref
            .stop(&system_client, Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(1, actors_aborted);
        let (pong_actor_ref, _pong_proc_ref) =
            spawn_actor(&pong_actor_id, &system_addr, system_client.clone()).await;

        // Now we expect to play the game between ping and new pong. The new pong has the same
        // proc ID as the old pong but different ChannelAddr. The game should still be playable
        // with system actor updating the cached address of Pong inside Ping's mailbox.
        let (done_tx, done_rx) = system_client.open_once_port();
        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
        ping_actor_ref
            .send(&system_client, ping_pong_message)
            .unwrap();
        assert!(done_rx.recv().await.unwrap());
    }

    /// Bootstraps the proc named by `actor_id` against the system at
    /// `system_addr`, spawns a [`PingPongActor`] on it, and returns
    /// references to the new actor and to its proc actor.
    ///
    /// Panics if `actor_id`'s proc id has no world id, or if
    /// bootstrapping or spawning fails.
    async fn spawn_actor(
        actor_id: &ActorId,
        system_addr: &ChannelAddr,
        system_client: Mailbox,
    ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
        let bootstrap = ProcActor::bootstrap(
            actor_id.proc_id().clone(),
            actor_id
                .proc_id()
                .world_id()
                .expect("proc must be ranked for bootstrap world_id")
                .clone(),
            listen_addr.clone(),
            system_addr.clone(),
            Duration::from_secs(3),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        // Bind the proc actor once and reuse the reference for both the
        // spawn request and the return value.
        let proc_actor_ref: ActorRef<ProcActor> = bootstrap.proc_actor.bind();
        let (undeliverable_msg_tx, _) = system_client.open_port();
        let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
        let actor_ref = spawn::<PingPongActor>(
            &system_client,
            &proc_actor_ref,
            &actor_id.to_string(),
            &params,
        )
        .await
        .unwrap();
        (actor_ref, proc_actor_ref)
    }
}