hyperactor_multiprocess/
proc_actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc actor manages a proc. It works in conjunction with a
10//! [`super::system_actor::SystemActor`]. Proc actors are usually spawned
11//! as the "agent" to manage a proc directly.
12
13use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorHandle;
33use hyperactor::actor::Referable;
34use hyperactor::actor::remote::Remote;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::clock::Clock;
38use hyperactor::clock::ClockKind;
39use hyperactor::context;
40use hyperactor::mailbox::BoxedMailboxSender;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxAdminMessage;
43use hyperactor::mailbox::MailboxAdminMessageHandler;
44use hyperactor::mailbox::MailboxClient;
45use hyperactor::mailbox::MailboxServer;
46use hyperactor::mailbox::MailboxServerHandle;
47use hyperactor::mailbox::open_port;
48use hyperactor::proc::ActorLedgerSnapshot;
49use hyperactor::proc::Proc;
50use hyperactor::reference::ActorId;
51use hyperactor::reference::ActorRef;
52use hyperactor::reference::Index;
53use hyperactor::reference::ProcId;
54use hyperactor::supervision::ActorSupervisionEvent;
55use hyperactor_mesh::comm::CommActor;
56use serde::Deserialize;
57use serde::Serialize;
58use tokio::process::Command;
59use tokio::sync::watch;
60use tokio_retry::strategy::jitter;
61
62use crate::pyspy::PySpyTrace;
63use crate::pyspy::py_spy;
64use crate::supervision::ProcStatus;
65use crate::supervision::ProcSupervisionMessageClient;
66use crate::supervision::ProcSupervisionState;
67use crate::supervision::ProcSupervisor;
68use crate::system_actor::ProcLifecycleMode;
69use crate::system_actor::SYSTEM_ACTOR_REF;
70use crate::system_actor::SystemActor;
71use crate::system_actor::SystemMessageClient;
72
// Names of the environment variables that `spawn_proc` sets on child
// processes launched through `Environment::Exec`.

/// Set to the world id passed in the `SpawnProc` request.
static HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
/// Set to the proc id assigned to the spawned proc.
static HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
/// Set to this proc actor's bootstrap channel address.
static HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
/// Set to the world size passed in the `SpawnProc` request.
static HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
/// Set to the rank component of the spawned proc's id.
static HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
/// Set to the position of the proc within the request's `proc_ids` list.
static HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
79
/// All setup parameters for an actor within a proc actor.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// The proc is bootstrapped in-process by the proc actor that handles
    /// the spawn request.
    Local,

    /// Spawn the actor in a separate proc.
    /// The program is the executable that will be spawned
    /// by the host actor. The program is launched with the following
    /// environment variables set:
    /// - `${HYPERACTOR_WORLD_ID}`: the world into which the proc is spawned,
    /// - `${HYPERACTOR_PROC_ID}`: the proc id of the proc actor being spawned,
    /// - `${HYPERACTOR_BOOTSTRAP_ADDR}`: the bootstrap address (the address
    ///   of the system actor),
    /// - `${HYPERACTOR_WORLD_SIZE}`: the world size the proc is a part of,
    /// - `${HYPERACTOR_RANK}`: the rank of the proc being spawned, and
    /// - `${HYPERACTOR_LOCAL_RANK}`: the index of the proc within the list
    ///   of procs spawned by the request.
    ///
    /// This is often useful where an actor needs to know the size of the
    /// world, like a worker using nccl comms.
    Exec {
        /// The program to run in order to spawn the actor.
        program: String,
    },
}
100
/// The state of the proc.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub enum ProcState {
    /// The proc is waiting to join the system. This is the state a
    /// newly created proc actor starts in.
    AwaitingJoin,

    /// The proc has joined the system. Entered when the proc actor
    /// handles `ProcMessage::Joined`.
    Joined,
}
110
111impl fmt::Display for ProcState {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        match self {
114            Self::AwaitingJoin => write!(f, "AwaitingJoin"),
115            Self::Joined => write!(f, "Joined"),
116        }
117    }
118}
119
/// The result after stopping the proc. This is the reply to
/// `ProcMessage::Stop`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcStopResult {
    /// The proc being stopped.
    pub proc_id: ProcId,
    /// The number of actors observed to stop.
    pub actors_stopped: usize,
    /// The number of actors that were aborted.
    pub actors_aborted: usize,
}
130
/// A snapshot of the proc. This is the reply to `ProcMessage::Snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct ProcSnapshot {
    /// The state of the proc at the time the snapshot was taken.
    pub state: ProcState,
    /// The snapshot of the actors in the proc, as reported by the proc's
    /// actor ledger.
    pub actors: ActorLedgerSnapshot,
}
139
/// Remote py-spy dump configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum PySpyConfig {
    /// Nonblocking dump for only python frames.
    NonBlocking,
    /// Blocking dump. Specifies whether native threads are included,
    /// and if so, whether those threads should also include native stack frames.
    /// An option left as `None` is treated as `false`.
    Blocking {
        /// Dump native stack frames.
        native: Option<bool>,
        /// Dump stack frames for native threads. Implies native.
        native_all: Option<bool>,
    },
}
154
/// A stack trace of the proc.
/// Wrapper around [`PySpyTrace`] to derive `Named`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
pub struct StackTrace {
    /// The stack trace.
    pub trace: PySpyTrace,
}
162
/// Proc management messages.
#[derive(
    Handler,
    HandleClient,
    RefClient,
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Named
)]
pub enum ProcMessage {
    /// Indicate that the proc has joined the system. This is sent by
    /// the system actor when the proc has been registered and is ready
    /// to receive instructions.
    #[log_level(debug)]
    Joined(),

    /// Retrieve the state of the proc.
    #[log_level(debug)]
    State {
        /// Used to return the result to the caller.
        #[reply]
        ret: OncePortRef<ProcState>,
    },

    /// Spawn an actor on the proc with the provided name.
    Spawn {
        /// registered actor type
        actor_type: String,
        /// spawned actor name
        actor_name: String,
        /// serialized parameters
        params_data: Data,
        /// reply port; the proc should send its rank to indicate a spawned actor
        status_port: PortRef<Index>,
    },

    /// Spawn a set of proc actors in the specified world (never its own).
    SpawnProc {
        /// Spawn the proc locally or in a separate program.
        env: Environment,
        /// The world into which to spawn the procs.
        world_id: WorldId,
        /// The proc ids of the procs to spawn.
        proc_ids: Vec<ProcId>,
        /// The total number of procs in the specified world.
        world_size: usize,
    },

    /// Self message to trigger a supervision update to the system actor.
    #[log_level(debug)]
    UpdateSupervision(),

    /// Stop the proc. Replies with a [`ProcStopResult`] carrying a pair
    /// of counts:
    /// - the number of actors observed to stop;
    /// - the number of proc actors not observed to stop.
    ///
    /// There will be at least one actor (the proc-actor itself) that
    /// will not be observed to stop. If there is more than one,
    /// timeouts are indicated.
    Stop {
        /// The duration to wait for an actor to report status `Stopped`.
        timeout: Duration,
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ProcStopResult>,
    },

    /// Return a snapshot view of this proc. Used for debugging.
    #[log_level(debug)]
    Snapshot {
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ProcSnapshot>,
    },

    /// Get the proc local addr.
    LocalAddr {
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<ChannelAddr>,
    },

    /// Run pyspy on the current proc and return the stack trace.
    #[log_level(debug)]
    PySpyDump {
        /// Dump config.
        config: PySpyConfig,
        /// Used to return the result to the caller.
        #[reply]
        reply_to: OncePortRef<StackTrace>,
    },
}
258
/// Parameters for managing the proc.
#[derive(Debug, Clone)]
pub struct ProcActorParams {
    /// The proc that is managed by this actor.
    pub proc: Proc,

    /// The system world to which this proc belongs.
    pub world_id: WorldId,

    /// Reference to the system actor that is managing this proc.
    pub system_actor_ref: ActorRef<SystemActor>,

    /// The channel address used to communicate with the system actor
    /// that manages this proc. This is passed through so that the proc
    /// can spawn sibling procs.
    pub bootstrap_channel_addr: ChannelAddr,

    /// The local address of this proc actor. This is used by
    /// the proc actor to register the proc with the system actor.
    pub local_addr: ChannelAddr,

    /// Watch into which the proc's state is published.
    pub state_watch: watch::Sender<ProcState>,

    /// Reference to the supervisor that receives this proc's supervision
    /// updates.
    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,

    /// Interval at which supervision status is reported to the system
    /// actor. A zero interval disables the periodic updates.
    pub supervision_update_interval: Duration,

    /// Arbitrary labels for the proc. They can be used later to query
    /// proc(s) using the system snapshot api.
    pub labels: HashMap<String, String>,

    /// Proc lifecycle management mode.
    ///
    /// If a proc is not managed
    ///   * it will not be stopped when the system shuts down.
    ///   * it will not be captured by the system snapshot.
    ///
    /// Not being managed is useful for procs that run on the client side,
    /// which might need to stay around for a while after the system is gone.
    pub lifecycle_mode: ProcLifecycleMode,
}
303
/// Outputs from bootstrapping a proc, as returned by the
/// `ProcActor::bootstrap*` functions.
#[derive(Debug)]
pub struct BootstrappedProc {
    /// Handle to proc actor.
    pub proc_actor: ActorHandle<ProcActor>,
    /// Handle to comm actor.
    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the mailbox server serving the proc's listen address.
    pub mailbox: MailboxServerHandle,
}
314
/// ProcActor manages a single proc. It is responsible for managing
/// the lifecycle of all of the proc's actors, and to route messages
/// accordingly.
#[derive(Debug)]
#[hyperactor::export(
    handlers = [
        ProcMessage,
        MailboxAdminMessage,
    ],
)]
pub struct ProcActor {
    /// The parameters this actor was created with.
    params: ProcActorParams,
    /// Current join state; starts as `AwaitingJoin` and becomes `Joined`
    /// when `ProcMessage::Joined` is handled.
    state: ProcState,
    /// Registry of remotely spawnable actor types, used to serve
    /// `ProcMessage::Spawn`.
    remote: Remote,
    /// Time of the most recent supervision update that completed within
    /// its timeout (initialized to the actor's creation time).
    last_successful_supervision_update: SystemTime,
}
331
332impl ProcActor {
333    /// Bootstrap a proc actor with the provided proc id. The bootstrapped proc
334    /// actor will use the provided listen address to serve its mailbox, while
335    /// the bootstrap address is used to register with the system actor.
336    #[hyperactor::instrument]
337    pub async fn bootstrap(
338        proc_id: ProcId,
339        world_id: WorldId,
340        listen_addr: ChannelAddr,
341        bootstrap_addr: ChannelAddr,
342        supervision_update_interval: Duration,
343        labels: HashMap<String, String>,
344        lifecycle_mode: ProcLifecycleMode,
345    ) -> Result<BootstrappedProc, anyhow::Error> {
346        let system_supervision_ref: ActorRef<ProcSupervisor> =
347            ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
348
349        Self::try_bootstrap(
350            proc_id.clone(),
351            world_id.clone(),
352            listen_addr.clone(),
353            bootstrap_addr.clone(),
354            system_supervision_ref.clone(),
355            supervision_update_interval,
356            labels,
357            lifecycle_mode,
358        )
359        .await
360        .inspect_err(|err| {
361            tracing::error!(
362                "bootstrap {} {} {} {}: {}",
363                proc_id,
364                world_id,
365                listen_addr,
366                bootstrap_addr,
367                err
368            );
369        })
370    }
371
372    /// Attempt to bootstrap a proc actor with the provided proc id. The bootstrapped proc
373    /// actor will use the provided listen address to serve its mailbox, while
374    /// the bootstrap address is used to register with the system actor.
375    #[hyperactor::instrument]
376    pub async fn try_bootstrap(
377        proc_id: ProcId,
378        world_id: WorldId,
379        listen_addr: ChannelAddr,
380        bootstrap_addr: ChannelAddr,
381        supervisor_actor_ref: ActorRef<ProcSupervisor>,
382        supervision_update_interval: Duration,
383        labels: HashMap<String, String>,
384        lifecycle_mode: ProcLifecycleMode,
385    ) -> Result<BootstrappedProc, anyhow::Error> {
386        let system_sender =
387            BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
388        let clock = ClockKind::for_channel_addr(&listen_addr);
389
390        let proc_forwarder =
391            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
392        let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
393        Self::bootstrap_for_proc(
394            proc,
395            world_id,
396            listen_addr,
397            bootstrap_addr,
398            supervisor_actor_ref,
399            supervision_update_interval,
400            labels,
401            lifecycle_mode,
402        )
403        .await
404    }
405
406    /// Bootstrap a proc actor with the provided proc. The bootstrapped proc actor
407    /// will use the provided listen address to serve its mailbox, while the bootstrap
408    /// address is used to register with the system actor.
409    #[hyperactor::instrument]
410    pub async fn bootstrap_for_proc(
411        proc: Proc,
412        world_id: WorldId,
413        listen_addr: ChannelAddr,
414        bootstrap_addr: ChannelAddr,
415        supervisor_actor_ref: ActorRef<ProcSupervisor>,
416        supervision_update_interval: Duration,
417        labels: HashMap<String, String>,
418        lifecycle_mode: ProcLifecycleMode,
419    ) -> Result<BootstrappedProc, anyhow::Error> {
420        let (local_addr, rx) = channel::serve(listen_addr)?;
421        let mailbox_handle = proc.clone().serve(rx);
422        let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
423
424        let handle = match proc
425            .clone()
426            .spawn::<Self>(
427                "proc",
428                ProcActorParams {
429                    proc: proc.clone(),
430                    world_id: world_id.clone(),
431                    system_actor_ref: SYSTEM_ACTOR_REF.clone(),
432                    bootstrap_channel_addr: bootstrap_addr,
433                    local_addr,
434                    state_watch: state_tx,
435                    supervisor_actor_ref,
436                    supervision_update_interval,
437                    labels,
438                    lifecycle_mode,
439                },
440            )
441            .await
442        {
443            Ok(handle) => handle,
444            Err(e) => {
445                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
446                return Err(e);
447            }
448        };
449
450        let comm_actor = match proc
451            .clone()
452            .spawn::<CommActor>("comm", Default::default())
453            .await
454        {
455            Ok(handle) => handle,
456            Err(e) => {
457                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
458                return Err(e);
459            }
460        };
461        comm_actor.bind::<CommActor>();
462
463        loop {
464            let proc_state = state_rx.borrow_and_update().clone();
465            tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
466            if matches!(proc_state, ProcState::Joined) {
467                break;
468            }
469            match state_rx.changed().await {
470                Ok(_) => {}
471                Err(e) => {
472                    Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
473                    return Err(e.into());
474                }
475            }
476        }
477
478        proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
479
480        Ok(BootstrappedProc {
481            proc_actor: handle,
482            mailbox: mailbox_handle,
483            comm_actor,
484        })
485    }
486
487    /// Shutdown the mailbox server to free up rx and its cooresponding listen address.
488    /// Because in the next bootstrap attempt, the same listen address will be used.
489    async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
490        mailbox_handle.stop("failed proc bootstrap cleanup");
491        if let Err(shutdown_err) = mailbox_handle.await {
492            // Ignore the shutdown error and populate the original error.
493            tracing::error!(
494                "error shutting down during a failed bootstrap attempt: {}",
495                shutdown_err
496            );
497        }
498    }
499}
500
501#[async_trait]
502impl Actor for ProcActor {
503    type Params = ProcActorParams;
504
505    async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
506        let last_successful_supervision_update = params.proc.clock().system_time_now();
507        Ok(Self {
508            params,
509            state: ProcState::AwaitingJoin,
510            remote: Remote::collect(),
511            last_successful_supervision_update,
512        })
513    }
514
515    async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
516        // Bind ports early so that when the proc actor joins, it can serve.
517        this.bind::<Self>();
518
519        // Join the system.
520        self.params
521            .system_actor_ref
522            .join(
523                this,
524                /*world_id=*/ self.params.world_id.clone(),
525                /*proc_id=*/ self.params.proc.proc_id().clone(),
526                /*proc_message_port=*/ this.port().bind(),
527                /*proc_addr=*/ self.params.local_addr.clone(),
528                self.params.labels.clone(),
529                self.params.lifecycle_mode.clone(),
530            )
531            .await?;
532
533        // Trigger supervision status update
534        // TODO: let the system actor determine/update the supervision interval.
535        // Maybe by returning it from the join call, or defining some other proc
536        // message to adjust it.
537        if self.params.supervision_update_interval > Duration::from_secs(0)
538            && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
539        {
540            this.self_message_with_delay(
541                ProcMessage::UpdateSupervision(),
542                self.params.supervision_update_interval,
543            )?;
544        }
545
546        Ok(())
547    }
548}
549
550impl ProcActor {
551    /// This proc's rank in the world.
552    fn rank(&self) -> Index {
553        self.params
554            .proc
555            .proc_id()
556            .rank()
557            .expect("proc must be ranked")
558    }
559}
560
561#[hyperactor::forward(MailboxAdminMessage)]
562#[async_trait]
563impl MailboxAdminMessageHandler for ProcActor {
564    async fn update_address(
565        &mut self,
566        cx: &Context<Self>,
567        proc_id: ProcId,
568        addr: ChannelAddr,
569    ) -> Result<(), anyhow::Error> {
570        tracing::trace!(
571            "received address update:\n{:#?}",
572            MailboxAdminMessage::UpdateAddress {
573                proc_id: proc_id.clone(),
574                addr: addr.clone()
575            }
576        );
577        let forwarder = cx.proc().forwarder();
578        if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
579            router.bind(proc_id.into(), addr);
580        } else {
581            tracing::warn!(
582                "proc {} received update_address but does not use a DialMailboxRouter",
583                cx.proc().proc_id()
584            );
585        }
586
587        Ok(())
588    }
589}
590
591#[async_trait]
592#[hyperactor::forward(ProcMessage)]
593impl ProcMessageHandler for ProcActor {
594    async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
595        self.state = ProcState::Joined;
596        let _ = self.params.state_watch.send(self.state.clone());
597        Ok(())
598    }
599
600    async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
601        Ok(self.state.clone())
602    }
603
604    async fn spawn(
605        &mut self,
606        cx: &Context<Self>,
607        actor_type: String,
608        actor_name: String,
609        params_data: Data,
610        status_port: PortRef<Index>,
611    ) -> Result<(), anyhow::Error> {
612        let _actor_id = self
613            .remote
614            .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
615            .await?;
616
617        // Signal that the actor has joined:
618        status_port.send(cx, self.rank())?;
619        Ok(())
620    }
621
622    async fn spawn_proc(
623        &mut self,
624        _cx: &Context<Self>,
625        env: Environment,
626        world_id: WorldId,
627        proc_ids: Vec<ProcId>,
628        world_size: usize,
629    ) -> Result<(), anyhow::Error> {
630        for (index, proc_id) in proc_ids.into_iter().enumerate() {
631            let proc_world_id = proc_id
632                .world_id()
633                .expect("proc must be ranked for world_id access")
634                .clone();
635            // Check world id isn't the same as this proc's world id.
636            if &proc_world_id
637                == self
638                    .params
639                    .proc
640                    .proc_id()
641                    .world_id()
642                    .expect("proc must be ranked for world_id access")
643                || &world_id
644                    == self
645                        .params
646                        .proc
647                        .proc_id()
648                        .world_id()
649                        .expect("proc must be ranked for world_id access")
650            {
651                return Err(anyhow::anyhow!(
652                    "cannot spawn proc in same world {}",
653                    proc_world_id
654                ));
655            }
656            match env {
657                Environment::Local => {
658                    ProcActor::bootstrap(
659                        proc_id,
660                        world_id.clone(),
661                        ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
662                        self.params.bootstrap_channel_addr.clone(),
663                        self.params.supervision_update_interval,
664                        HashMap::new(),
665                        ProcLifecycleMode::ManagedBySystem,
666                    )
667                    .await?;
668                }
669                Environment::Exec { ref program } => {
670                    tracing::info!("spawning proc {} with program {}", proc_id, program);
671                    let mut child = Command::new(program);
672                    let _ = child
673                        .env(HYPERACTOR_WORLD_ID, world_id.to_string())
674                        .env(HYPERACTOR_PROC_ID, proc_id.to_string())
675                        .env(
676                            HYPERACTOR_BOOTSTRAP_ADDR,
677                            self.params.bootstrap_channel_addr.to_string(),
678                        )
679                        .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
680                        .env(
681                            HYPERACTOR_RANK,
682                            proc_id
683                                .rank()
684                                .expect("proc must be ranked for rank env var")
685                                .to_string(),
686                        )
687                        .env(HYPERACTOR_LOCAL_RANK, index.to_string())
688                        .stdin(Stdio::null())
689                        .stdout(Stdio::inherit())
690                        .stderr(Stdio::inherit())
691                        .spawn()?;
692                }
693            }
694        }
695        Ok(())
696    }
697
698    async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
699        // Delay for next supervision update with some jitter.
700        let delay = jitter(self.params.supervision_update_interval);
701
702        // Only start updating supervision after the proc is joined.
703        if self.state != ProcState::Joined {
704            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
705            return Ok(());
706        }
707
708        let msg = ProcSupervisionState {
709            world_id: self.params.world_id.clone(),
710            proc_id: self.params.proc.proc_id().clone(),
711            proc_addr: self.params.local_addr.clone(),
712            proc_health: ProcStatus::Alive,
713            failed_actors: Vec::new(),
714        };
715
716        match cx
717            .clock()
718            .timeout(
719                // TODO: make the timeout configurable
720                Duration::from_secs(10),
721                self.params.supervisor_actor_ref.update(cx, msg),
722            )
723            .await
724        {
725            Ok(_) => {
726                self.last_successful_supervision_update = cx.clock().system_time_now();
727            }
728            Err(_) => {}
729        }
730
731        let supervision_staleness = self
732            .last_successful_supervision_update
733            .elapsed()
734            .unwrap_or_default();
735        // Timeout when there are 3 consecutive supervision updates that fail.
736        // TODO: make number of failed updates configurable.
737        if supervision_staleness > 5 * self.params.supervision_update_interval {
738            tracing::error!(
739                "system actor isn't responsive to supervision update, stopping the proc"
740            );
741            // System actor is not responsive to supervision update, it is likely dead. Stop this proc.
742            // TODO: make the timeout configurable
743            self.stop(cx, Duration::from_secs(5)).await?;
744        } else {
745            // Schedule the next supervision update with some jitter.
746            let delay = jitter(self.params.supervision_update_interval);
747            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
748        }
749
750        Ok(())
751    }
752
753    async fn stop(
754        &mut self,
755        cx: &Context<Self>,
756        timeout: Duration,
757    ) -> Result<ProcStopResult, anyhow::Error> {
758        tracing::info!("stopping proc {}", self.params.proc.proc_id());
759        self.params
760            .proc
761            .destroy_and_wait(timeout, Some(cx))
762            .await
763            .map(|(stopped, aborted)| {
764                tracing::info!("stopped proc {}", self.params.proc.proc_id());
765                ProcStopResult {
766                    proc_id: self.params.proc.proc_id().clone(),
767                    actors_stopped: stopped.len(),
768                    actors_aborted: aborted.len(),
769                }
770            })
771    }
772
773    async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
774        let state = self.state.clone();
775        let actors = self.params.proc.ledger_snapshot();
776        Ok(ProcSnapshot { state, actors })
777    }
778
779    async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
780        Ok(self.params.local_addr.clone())
781    }
782
783    async fn py_spy_dump(
784        &mut self,
785        _cx: &Context<Self>,
786        config: PySpyConfig,
787    ) -> Result<StackTrace, anyhow::Error> {
788        let pid = std::process::id() as i32;
789        tracing::info!(
790            "running py-spy on proc {}, process id: {}",
791            self.params.proc.proc_id(),
792            pid
793        );
794        let trace = match config {
795            PySpyConfig::Blocking { native, native_all } => {
796                py_spy(
797                    pid,
798                    native.unwrap_or_default(),
799                    native_all.unwrap_or_default(),
800                    true,
801                )
802                .await?
803            }
804            PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
805        };
806        Ok(StackTrace { trace })
807    }
808}
809
810#[async_trait]
811impl Handler<ActorSupervisionEvent> for ProcActor {
812    async fn handle(
813        &mut self,
814        cx: &Context<Self>,
815        event: ActorSupervisionEvent,
816    ) -> anyhow::Result<()> {
817        let status = event.status();
818        let message = ProcSupervisionState {
819            world_id: self.params.world_id.clone(),
820            proc_id: self.params.proc.proc_id().clone(),
821            proc_addr: self.params.local_addr.clone(),
822            proc_health: ProcStatus::Alive,
823            failed_actors: Vec::from([(event.actor_id, status)]),
824        };
825        self.params.supervisor_actor_ref.update(cx, message).await?;
826        Ok(())
827    }
828}
829
830/// Convenience utility to spawn an actor on a proc. Spawn returns
831/// with the new ActorRef on success.
832pub async fn spawn<A: Actor + Referable>(
833    cx: &impl context::Actor,
834    proc_actor: &ActorRef<ProcActor>,
835    actor_name: &str,
836    params: &A::Params,
837) -> Result<ActorRef<A>, anyhow::Error>
838where
839    A::Params: RemoteMessage,
840{
841    let remote = Remote::collect();
842    let (spawned_port, mut spawned_receiver) = open_port(cx);
843    let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
844
845    proc_actor
846        .spawn(
847            cx,
848            remote
849                .name_of::<A>()
850                .ok_or(anyhow::anyhow!("actor not registered"))?
851                .into(),
852            actor_name.into(),
853            bincode::serialize(params)?,
854            spawned_port.bind(),
855        )
856        .await?;
857
858    // Wait for the spawned actor to join.
859    while spawned_receiver.recv().await?
860        != proc_id
861            .rank()
862            .expect("proc must be ranked for rank comparison")
863    {}
864
865    // Gspawned actors are always exported.
866    Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
867}
868
869#[cfg(test)]
870mod tests {
871    use std::assert_matches::assert_matches;
872    use std::collections::HashSet;
873    use std::time::Duration;
874
875    use hyperactor::actor::ActorStatus;
876    use hyperactor::channel;
877    use hyperactor::channel::ChannelAddr;
878    use hyperactor::channel::ChannelTransport;
879    use hyperactor::clock::Clock;
880    use hyperactor::clock::RealClock;
881    use hyperactor::data::Named;
882    use hyperactor::forward;
883    use hyperactor::id;
884    use hyperactor::reference::ActorRef;
885    use hyperactor::test_utils::pingpong::PingPongActor;
886    use hyperactor::test_utils::pingpong::PingPongActorParams;
887    use hyperactor::test_utils::pingpong::PingPongMessage;
888    use maplit::hashset;
889    use rand::Rng;
890    use rand::distributions::Alphanumeric;
891    use regex::Regex;
892
893    use super::*;
894    use crate::supervision::ProcSupervisionMessage;
895    use crate::system::ServerHandle;
896    use crate::system::System;
897
898    const MAX_WAIT_TIME: Duration = Duration::new(10, 0);
899
    /// Everything produced by `bootstrap`: a running system server and
    /// references to the single proc that joined it.
    struct Bootstrapped {
        /// Handle to the system server started by `bootstrap`.
        server_handle: ServerHandle,
        /// Reference to the bootstrapped proc's `ProcActor`.
        proc_actor_ref: ActorRef<ProcActor>,
        /// Reference to the bootstrapped proc's `CommActor`.
        comm_actor_ref: ActorRef<CommActor>,
        /// Client instance attached to the system; tests send through it.
        client: Instance<()>,
    }
906
907    async fn bootstrap() -> Bootstrapped {
908        let server_handle = System::serve(
909            ChannelAddr::any(ChannelTransport::Local),
910            Duration::from_secs(10),
911            Duration::from_secs(10),
912        )
913        .await
914        .unwrap();
915
916        let world_id = id!(world);
917        let proc_id = world_id.proc_id(0);
918        let bootstrap = ProcActor::bootstrap(
919            proc_id,
920            world_id,
921            ChannelAddr::any(ChannelTransport::Local),
922            server_handle.local_addr().clone(),
923            Duration::from_secs(1),
924            HashMap::new(),
925            ProcLifecycleMode::ManagedBySystem,
926        )
927        .await
928        .unwrap();
929
930        // Now join the system and talk to the proc actor.
931
932        let mut system = System::new(server_handle.local_addr().clone());
933        let client = system.attach().await.unwrap();
934
935        // This is really not cool. We should manage state subscriptions instead.
936        let start = RealClock.now();
937        let mut proc_state;
938        loop {
939            proc_state = bootstrap.proc_actor.state(&client).await.unwrap();
940
941            if matches!(proc_state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
942                break;
943            }
944        }
945        assert_matches!(proc_state, ProcState::Joined);
946
947        Bootstrapped {
948            server_handle,
949            proc_actor_ref: bootstrap.proc_actor.bind(),
950            comm_actor_ref: bootstrap.comm_actor.bind(),
951            client,
952        }
953    }
954
955    #[tokio::test]
956    async fn test_bootstrap() {
957        let Bootstrapped { server_handle, .. } = bootstrap().await;
958
959        println!("bootrapped, now waiting");
960
961        server_handle.stop().await.unwrap();
962        server_handle.await;
963    }
964
    /// Stateless actor used by the tests in this module. It is exported
    /// as spawnable and handles `TestActorMessage`.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            TestActorMessage,
        ],
    )]
    struct TestActor;
973
    /// Messages handled by `TestActor`.
    #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
    enum TestActorMessage {
        /// Replies on the given port with the supplied number plus one.
        Increment(u64, #[reply] OncePortRef<u64>),
        /// Makes the handler return an error carrying the given text.
        Fail(String),
    }
979
980    #[async_trait]
981    #[forward(TestActorMessage)]
982    impl TestActorMessageHandler for TestActor {
983        async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
984            Ok(num + 1)
985        }
986
987        async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
988            Err(anyhow::anyhow!(err))
989        }
990    }
991
992    #[tokio::test]
993    async fn test_stop() {
994        // Show here that the proc actors are stopped when the proc
995        // actor receives a `Stop()` message.
996        let Bootstrapped {
997            server_handle,
998            proc_actor_ref,
999            client,
1000            ..
1001        } = bootstrap().await;
1002
1003        const NUM_ACTORS: usize = 4usize;
1004        for i in 0..NUM_ACTORS {
1005            spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
1006                .await
1007                .unwrap();
1008        }
1009
1010        let ProcStopResult {
1011            proc_id: _,
1012            actors_stopped,
1013            actors_aborted,
1014        } = proc_actor_ref
1015            .stop(&client, Duration::from_secs(1))
1016            .await
1017            .unwrap();
1018        assert_eq!(NUM_ACTORS + 1, actors_stopped);
1019        assert_eq!(1, actors_aborted);
1020
1021        server_handle.stop().await.unwrap();
1022        server_handle.await;
1023    }
1024
    /// Actor that sleeps for the number of seconds carried by each `u64`
    /// message it receives. Used to keep actors busy while the proc is
    /// being stopped.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            u64,
        ],
    )]
    struct SleepActor {}
1034
1035    #[async_trait]
1036    impl Handler<u64> for SleepActor {
1037        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
1038            let duration = message;
1039            RealClock.sleep(Duration::from_secs(duration)).await;
1040            Ok(())
1041        }
1042    }
1043
1044    #[tracing_test::traced_test]
1045    #[tokio::test]
1046    async fn test_stop_timeout() {
1047        let Bootstrapped {
1048            server_handle,
1049            proc_actor_ref,
1050            client,
1051            ..
1052        } = bootstrap().await;
1053
1054        const NUM_ACTORS: usize = 4usize;
1055        for i in 0..NUM_ACTORS {
1056            let sleep_secs = 5u64;
1057            let sleeper = spawn::<SleepActor>(
1058                &client,
1059                &proc_actor_ref,
1060                format!("sleeper{i}").as_str(),
1061                &(),
1062            )
1063            .await
1064            .unwrap();
1065            if i > 0 {
1066                sleeper.send(&client, sleep_secs).unwrap();
1067            }
1068        }
1069
1070        let ProcStopResult {
1071            proc_id: _,
1072            actors_stopped,
1073            actors_aborted,
1074        } = proc_actor_ref
1075            .stop(&client, Duration::from_secs(1))
1076            .await
1077            .unwrap();
1078        assert_eq!(2, actors_stopped);
1079        assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);
1080
1081        assert!(tracing_test::internal::logs_with_scope_contain(
1082            "hyperactor::proc",
1083            "world[0].proc[0]: aborting (delayed) JoinHandle"
1084        ));
1085        for i in 1..3 {
1086            assert!(tracing_test::internal::logs_with_scope_contain(
1087                "hyperactor::proc",
1088                format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
1089            ));
1090        }
1091        logs_assert(|logs| {
1092            let count = logs
1093                .iter()
1094                .filter(|log| {
1095                    log.contains("aborting JoinHandle")
1096                        || log.contains("aborting (delayed) JoinHandle")
1097                })
1098                .count();
1099            if count == actors_aborted {
1100                Ok(())
1101            } else {
1102                Err("task abort counting error".to_string())
1103            }
1104        });
1105
1106        server_handle.stop().await.unwrap();
1107        server_handle.await;
1108    }
1109
1110    #[tokio::test]
1111    async fn test_spawn() {
1112        let Bootstrapped {
1113            server_handle,
1114            proc_actor_ref,
1115            client,
1116            ..
1117        } = bootstrap().await;
1118
1119        let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
1120            .await
1121            .unwrap();
1122
1123        let result = test_actor_ref.increment(&client, 1).await.unwrap();
1124        assert_eq!(result, 2);
1125
1126        server_handle.stop().await.unwrap();
1127        server_handle.await;
1128    }
1129
1130    #[cfg(target_os = "linux")]
1131    fn random_abstract_addr() -> ChannelAddr {
1132        let random_string = rand::thread_rng()
1133            .sample_iter(&Alphanumeric)
1134            .take(24)
1135            .map(char::from)
1136            .collect::<String>();
1137        format!("unix!@{random_string}").parse().unwrap()
1138    }
1139
1140    #[cfg(target_os = "linux")] // remove after making abstract unix sockets store-and-forward
1141    #[tokio::test]
1142    async fn test_bootstrap_retry() {
1143        if std::env::var("CARGO_TEST").is_ok() {
1144            eprintln!("test skipped under cargo as it causes other tests to fail when run");
1145            return;
1146        }
1147
1148        // Spawn the proc before the server is up. This is imperfect
1149        // as we rely on sleeping. Ideally we'd make sure the proc performs
1150        // at least one try before we start the server.
1151        let bootstrap_addr = random_abstract_addr();
1152
1153        let bootstrap_addr_clone = bootstrap_addr.clone();
1154        let handle = tokio::spawn(async move {
1155            let world_id = id!(world);
1156            let proc_id = world_id.proc_id(0);
1157            let bootstrap = ProcActor::bootstrap(
1158                proc_id,
1159                world_id,
1160                random_abstract_addr(),
1161                bootstrap_addr_clone,
1162                Duration::from_secs(1),
1163                HashMap::new(),
1164                ProcLifecycleMode::ManagedBySystem,
1165            )
1166            .await
1167            .unwrap();
1168
1169            // Proc actor should still be running.
1170            let mut status = bootstrap.proc_actor.status();
1171            assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
1172        });
1173
1174        // Sleep for enough time, the ProcActor supervision shouldn't timed out causing ProcActor to stop.
1175        // When System actor is brought up later, it should finish properly.
1176        RealClock.sleep(Duration::from_secs(5)).await;
1177
1178        let _server_handle = System::serve(
1179            bootstrap_addr,
1180            Duration::from_secs(10),
1181            Duration::from_secs(10),
1182        )
1183        .await
1184        .unwrap();
1185
1186        // Task completed successfully, so it connected correctly.
1187        handle.await.unwrap();
1188    }
1189
1190    #[tokio::test]
1191    async fn test_supervision_message_handling() {
1192        if std::env::var("CARGO_TEST").is_ok() {
1193            eprintln!("test skipped under cargo as it fails when run with others");
1194            return;
1195        }
1196
1197        let server_handle = System::serve(
1198            ChannelAddr::any(ChannelTransport::Local),
1199            Duration::from_secs(3600),
1200            Duration::from_secs(3600),
1201        )
1202        .await
1203        .unwrap();
1204
1205        // A test supervisor.
1206        let mut system = System::new(server_handle.local_addr().clone());
1207        let supervisor = system.attach().await.unwrap();
1208        let (supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1209            supervisor.open_port::<ProcSupervisionMessage>();
1210        supervisor_supervision_tx.bind_to(ProcSupervisionMessage::port());
1211        let supervisor_actor_ref: ActorRef<ProcSupervisor> =
1212            ActorRef::attest(supervisor.self_id().clone());
1213
1214        // Start the proc actor
1215        let local_world_id = hyperactor::id!(test_proc);
1216        let local_proc_id = local_world_id.proc_id(0);
1217        let bootstrap = ProcActor::try_bootstrap(
1218            local_proc_id.clone(),
1219            local_world_id.clone(),
1220            ChannelAddr::any(ChannelTransport::Local),
1221            server_handle.local_addr().clone(),
1222            supervisor_actor_ref.clone(),
1223            Duration::from_secs(1),
1224            HashMap::new(),
1225            ProcLifecycleMode::ManagedBySystem,
1226        )
1227        .await
1228        .unwrap();
1229
1230        // Should receive supervision message sent from the periodic task
1231        // indicating the proc is alive.
1232        let msg = supervisor_supervision_receiver.recv().await;
1233        match msg.unwrap() {
1234            ProcSupervisionMessage::Update(state, port) => {
1235                assert_eq!(
1236                    state,
1237                    ProcSupervisionState {
1238                        world_id: local_world_id.clone(),
1239                        proc_addr: ChannelAddr::Local(3),
1240                        proc_id: local_proc_id.clone(),
1241                        proc_health: ProcStatus::Alive,
1242                        failed_actors: Vec::new(),
1243                    }
1244                );
1245                let _ = port.send(&supervisor, ());
1246            }
1247        }
1248
1249        // Spawn a root actor on the proc.
1250        let proc_actor_ref = bootstrap.proc_actor.bind();
1251        let test_actor_ref = spawn::<TestActor>(&supervisor, &proc_actor_ref, "test", &())
1252            .await
1253            .unwrap();
1254
1255        test_actor_ref
1256            .fail(&supervisor, "test actor is erroring out".to_string())
1257            .await
1258            .unwrap();
1259        // Since we could get messages from both the periodic task and the
1260        // report from the failed actor, we need to poll for a while to make
1261        // sure we get the right message.
1262        let result = RealClock
1263            .timeout(Duration::from_secs(5), async {
1264                loop {
1265                    match supervisor_supervision_receiver.recv().await {
1266                        Ok(ProcSupervisionMessage::Update(state, _port)) => {
1267                            match state.failed_actors.iter().find(|(failed_id, _)| {
1268                                failed_id == test_actor_ref.clone().actor_id()
1269                            }) {
1270                                Some((_, actor_status)) => return Ok(actor_status.clone()),
1271                                None => {}
1272                            }
1273                        }
1274                        _ => anyhow::bail!("unexpected message type"),
1275                    }
1276                }
1277            })
1278            .await;
1279        assert_matches!(
1280            result.unwrap().unwrap(),
1281            ActorStatus::Failed(msg) if msg.contains("test actor is erroring out")
1282        );
1283
1284        server_handle.stop().await.unwrap();
1285        server_handle.await;
1286    }
1287
1288    // Verify that the proc actor's ProcMessage port is bound properly so
1289    // that we can send messages to it through the system actor.
1290    #[tokio::test]
1291    async fn test_bind_proc_actor_in_bootstrap() {
1292        let server_handle = System::serve(
1293            ChannelAddr::any(ChannelTransport::Local),
1294            Duration::from_secs(10),
1295            Duration::from_secs(10),
1296        )
1297        .await
1298        .unwrap();
1299        let mut system = System::new(server_handle.local_addr().clone());
1300        let client = system.attach().await.unwrap();
1301
1302        let world_id = id!(world);
1303        let proc_id = world_id.proc_id(0);
1304        let bootstrap = ProcActor::bootstrap(
1305            proc_id,
1306            world_id,
1307            ChannelAddr::any(ChannelTransport::Local),
1308            server_handle.local_addr().clone(),
1309            Duration::from_secs(1),
1310            HashMap::new(),
1311            ProcLifecycleMode::ManagedBySystem,
1312        )
1313        .await
1314        .unwrap();
1315        let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
1316        let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);
1317
1318        let res = RealClock
1319            .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
1320            .await;
1321        // If ProcMessage's static Named port is not bound, this test will fail
1322        // due to timeout.
1323        assert!(res.is_ok());
1324        assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
1325        server_handle.stop().await.unwrap();
1326        server_handle.await;
1327    }
1328
1329    #[tokio::test]
1330    async fn test_proc_snapshot() {
1331        let Bootstrapped {
1332            server_handle,
1333            proc_actor_ref,
1334            comm_actor_ref,
1335            client,
1336            ..
1337        } = bootstrap().await;
1338
1339        // Spawn some actors on this proc.
1340        let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
1341            .await
1342            .unwrap();
1343        let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
1344            .await
1345            .unwrap();
1346        {
1347            let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
1348            assert_eq!(snapshot.state, ProcState::Joined);
1349            assert_eq!(
1350                snapshot.actors.roots.keys().collect::<HashSet<_>>(),
1351                hashset! {
1352                    proc_actor_ref.actor_id(),
1353                    comm_actor_ref.actor_id(),
1354                    root.actor_id(),
1355                    another_root.actor_id(),
1356                }
1357            );
1358        }
1359
1360        server_handle.stop().await.unwrap();
1361        server_handle.await;
1362    }
1363
1364    #[tokio::test]
1365    async fn test_undeliverable_message_return() {
1366        // Proc can't send a message to a remote actor because the
1367        // system connection is lost.
1368        use hyperactor::mailbox::Undeliverable;
1369        use hyperactor::test_utils::pingpong::PingPongActor;
1370        use hyperactor::test_utils::pingpong::PingPongMessage;
1371
1372        // Use temporary config for this test
1373        let config = hyperactor::config::global::lock();
1374        let _guard = config.override_key(
1375            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1376            Duration::from_secs(1),
1377        );
1378
1379        // Serve a system.
1380        let server_handle = System::serve(
1381            ChannelAddr::any(ChannelTransport::Tcp),
1382            Duration::from_secs(120),
1383            Duration::from_secs(120),
1384        )
1385        .await
1386        .unwrap();
1387        let mut system = System::new(server_handle.local_addr().clone());
1388
1389        // Build a supervisor.
1390        let supervisor = system.attach().await.unwrap();
1391        let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
1392        sup_tx.bind_to(ProcSupervisionMessage::port());
1393        let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1394
1395        // Construct a system sender.
1396        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1397            channel::dial(server_handle.local_addr().clone()).unwrap(),
1398        ));
1399
1400        // Construct a proc forwarder in terms of the system sender.
1401        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1402        let proc_forwarder =
1403            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1404
1405        // Bootstrap proc 'world[0]', join the system.
1406        let world_id = id!(world);
1407        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1408        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1409            proc_0.clone(),
1410            world_id.clone(),
1411            listen_addr,
1412            server_handle.local_addr().clone(),
1413            sup_ref.clone(),
1414            Duration::from_secs(120),
1415            HashMap::new(),
1416            ProcLifecycleMode::ManagedBySystem,
1417        )
1418        .await
1419        .unwrap();
1420        let proc_0_client = proc_0.attach("client").unwrap();
1421        let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1422
1423        // Bootstrap a second proc 'world[1]', join the system.
1424        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1425        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1426            proc_1.clone(),
1427            world_id.clone(),
1428            ChannelAddr::any(ChannelTransport::Tcp),
1429            server_handle.local_addr().clone(),
1430            sup_ref.clone(),
1431            Duration::from_secs(120),
1432            HashMap::new(),
1433            ProcLifecycleMode::ManagedBySystem,
1434        )
1435        .await
1436        .unwrap();
1437        let proc_1_client = proc_1.attach("client").unwrap();
1438        let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1439
1440        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1441        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
1442        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
1443        // same proc).
1444        let ping_handle = proc_0
1445            .spawn::<PingPongActor>("ping", ping_params)
1446            .await
1447            .unwrap();
1448        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1449        let pong_handle = proc_1
1450            .spawn::<PingPongActor>("pong", pong_params)
1451            .await
1452            .unwrap();
1453
1454        // Now kill the system server making message delivery between
1455        // procs impossible.
1456        server_handle.stop().await.unwrap();
1457        server_handle.await;
1458
1459        let n = 100usize;
1460        for i in 1..(n + 1) {
1461            // Have 'ping' send 'pong' a message.
1462            let ttl = 66 + i as u64; // Avoid ttl = 66!
1463            let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1464            ping_handle
1465                .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1466                .unwrap();
1467        }
1468
1469        // `PingPongActor`s do not exit their message loop (a
1470        // non-default actor behavior) when they have an undelivered
1471        // message sent back to them (the reason being this very
1472        // test).
1473        assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1474
1475        // We expect n undelivered messages.
1476        let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1477            unreachable!()
1478        };
1479        let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1480        let mut count = 1;
1481        while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1482            // We care that every undeliverable message was accounted
1483            // for. We can't assume anything about their arrival
1484            // order.
1485            count += 1;
1486            let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487        }
1488        assert!(count == n);
1489    }
1490
    #[tracing_test::traced_test]
    #[tokio::test]
    async fn test_proc_actor_mailbox_admin_message() {
        // Verify that proc actors update their address books on first
        // contact, and that no additional updates are triggered for
        // known procs. The check is done on the captured logs: exactly
        // three `UpdateAddress` records must have been emitted.

        use hyperactor::test_utils::pingpong::PingPongActor;
        use hyperactor::test_utils::pingpong::PingPongMessage;

        // Serve a system.
        let server_handle = System::serve(
            ChannelAddr::any(ChannelTransport::Tcp),
            Duration::from_secs(120),
            Duration::from_secs(120),
        )
        .await
        .unwrap();
        let mut system = System::new(server_handle.local_addr().clone());
        let system_actor = server_handle.system_actor_handle();
        let system_client = system.attach().await.unwrap(); // world id: user

        // Build a supervisor.
        let supervisor = system.attach().await.unwrap();
        let (sup_tx, _sup_rx) = supervisor.open_port::<ProcSupervisionMessage>();
        sup_tx.bind_to(ProcSupervisionMessage::port());
        let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());

        // Construct a system sender.
        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
            channel::dial(server_handle.local_addr().clone()).unwrap(),
        ));

        // Construct a proc forwarder in terms of the system sender.
        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
        let proc_forwarder =
            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));

        // Bootstrap proc 'world[0]', join the system.
        let world_id = id!(world);
        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
            proc_0.clone(),
            world_id.clone(),
            listen_addr,
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_0_client = proc_0.attach("client").unwrap();
        let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();

        // Bootstrap a second proc 'world[1]', join the system.
        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
            proc_1.clone(),
            world_id.clone(),
            ChannelAddr::any(ChannelTransport::Tcp),
            server_handle.local_addr().clone(),
            sup_ref.clone(),
            Duration::from_secs(120),
            HashMap::new(),
            ProcLifecycleMode::ManagedBySystem,
        )
        .await
        .unwrap();
        let proc_1_client = proc_1.attach("client").unwrap();
        let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();

        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
        // same proc).
        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
        let ping_handle = proc_0
            .spawn::<PingPongActor>("ping", ping_params)
            .await
            .unwrap();
        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
        let pong_handle = proc_1
            .spawn::<PingPongActor>("pong", pong_params)
            .await
            .unwrap();

        // Have 'ping' send 'pong' a message.
        let ttl = 10u64; // Avoid ttl = 66!
        let (once_tx, once_rx) = system_client.open_once_port::<bool>();
        ping_handle
            .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
            .unwrap();

        // Wait for the game to finish before inspecting the logs.
        assert!(once_rx.recv().await.unwrap());

        // The expected_* strings below are prefixes of the pretty-printed
        // `UpdateAddress` records; they stop before the address value,
        // which is not known in advance.

        // Ping gets Pong's address
        let expected_1 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "world",
        ),
        1,
    ),
    addr: Tcp("#;

        // Pong gets Ping's address
        let expected_2 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "world",
        ),
        0,
    ),
    addr: Tcp("#;

        // Ping gets "user"'s address
        let expected_3 = r#"UpdateAddress {
    proc_id: Ranked(
        WorldId(
            "user",
        ),"#;

        logs_assert(|logs| {
            let log_body = logs.join("\n");

            // Matches one whole record: a line starting with
            // `UpdateAddress {`, then as few lines as possible up to the
            // first line that starts with `}`.
            let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
            let count = pattern.find_iter(&log_body).count();

            // More than three records would mean an update was sent for
            // an already known proc.
            if count != 3 {
                return Err(format!(
                    "expected 3 UpdateAddress messages, found {}",
                    count
                ));
            }

            if !log_body.contains(expected_1) {
                return Err("missing expected update for proc_id 1".into());
            }
            if !log_body.contains(expected_2) {
                return Err("missing expected update for proc_id 0".into());
            }
            if !log_body.contains(expected_3) {
                return Err("missing expected update for proc_id user".into());
            }

            Ok(())
        });

        // Stop the system actor and wait for its acknowledgement on the
        // once port.
        let (once_tx, once_rx) = system_client.open_once_port::<()>();
        assert_matches!(
            system_actor
                .stop(
                    /*unused*/ &system_client,
                    None,
                    Duration::from_secs(1),
                    once_tx.bind()
                )
                .await,
            Ok(())
        );
        assert_matches!(once_rx.recv().await.unwrap(), ());
    }
1654
1655    #[tokio::test]
1656    async fn test_update_address_book_cache() {
1657        let server_handle = System::serve(
1658            ChannelAddr::any(ChannelTransport::Tcp),
1659            Duration::from_secs(2), // supervision update timeout
1660            Duration::from_secs(2), // duration to evict an unhealthy world
1661        )
1662        .await
1663        .unwrap();
1664        let system_addr = server_handle.local_addr().clone();
1665        let mut system = System::new(system_addr.clone());
1666
1667        let system_client = system.attach().await.unwrap();
1668
1669        // Spawn ping and pong actors to play a ping pong game.
1670        let ping_actor_id = id!(world[0].ping[0]);
1671        let (ping_actor_ref, _ping_proc_ref) =
1672            spawn_actor(&system_client, &ping_actor_id, &system_addr).await;
1673
1674        let pong_actor_id = id!(world[1].pong[0]);
1675        let (pong_actor_ref, pong_proc_ref) =
1676            spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1677
1678        // After playing the first round game, ping and pong actors has each other's
1679        // ChannelAddr cached in their procs' mailboxes, respectively.
1680        let (done_tx, done_rx) = system_client.open_once_port();
1681        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1682        ping_actor_ref
1683            .send(&system_client, ping_pong_message)
1684            .unwrap();
1685        assert!(done_rx.recv().await.unwrap());
1686
1687        // Now we kill and respawn the pong actor so it's ChannelAddr is changed.
1688        let ProcStopResult { actors_aborted, .. } = pong_proc_ref
1689            .stop(&system_client, Duration::from_secs(1))
1690            .await
1691            .unwrap();
1692        assert_eq!(1, actors_aborted);
1693        let (pong_actor_ref, _pong_proc_ref) =
1694            spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1695
1696        // Now we expect to play the game between ping and new pong. The new pong has the same
1697        // proc ID as the old pong but different ChannelAddr. The game should still be playable
1698        // with system actor updating the cached address of Pong inside Ping's mailbox.
1699        let (done_tx, done_rx) = system_client.open_once_port();
1700        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1701        ping_actor_ref
1702            .send(&system_client, ping_pong_message)
1703            .unwrap();
1704        assert!(done_rx.recv().await.unwrap());
1705    }
1706
1707    async fn spawn_actor(
1708        cx: &impl context::Actor,
1709        actor_id: &ActorId,
1710        system_addr: &ChannelAddr,
1711    ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
1712        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp);
1713        let bootstrap = ProcActor::bootstrap(
1714            actor_id.proc_id().clone(),
1715            actor_id
1716                .proc_id()
1717                .world_id()
1718                .expect("proc must be ranked for bootstrap world_id")
1719                .clone(),
1720            listen_addr.clone(),
1721            system_addr.clone(),
1722            Duration::from_secs(3),
1723            HashMap::new(),
1724            ProcLifecycleMode::ManagedBySystem,
1725        )
1726        .await
1727        .unwrap();
1728        let (undeliverable_msg_tx, _) = cx.mailbox().open_port();
1729        let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
1730        let actor_ref = spawn::<PingPongActor>(
1731            cx,
1732            &bootstrap.proc_actor.bind(),
1733            &actor_id.to_string(),
1734            &params,
1735        )
1736        .await
1737        .unwrap();
1738        let proc_actor_ref = bootstrap.proc_actor.bind();
1739        (actor_ref, proc_actor_ref)
1740    }
1741}