hyperactor_multiprocess/proc_actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
//! A proc actor manages a proc. It works in conjunction with a
//! [`super::system_actor::SystemActor`], and is usually spawned as the
//! "agent" that manages its proc directly.
12
13use core::fmt;
14use std::collections::HashMap;
15use std::process::Stdio;
16use std::time::Duration;
17use std::time::SystemTime;
18
19use async_trait::async_trait;
20use hyperactor::Actor;
21use hyperactor::Context;
22use hyperactor::Data;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::Named;
27use hyperactor::OncePortRef;
28use hyperactor::PortRef;
29use hyperactor::RefClient;
30use hyperactor::RemoteMessage;
31use hyperactor::WorldId;
32use hyperactor::actor::ActorErrorKind;
33use hyperactor::actor::ActorHandle;
34use hyperactor::actor::ActorStatus;
35use hyperactor::actor::Referable;
36use hyperactor::actor::remote::Remote;
37use hyperactor::channel;
38use hyperactor::channel::ChannelAddr;
39use hyperactor::clock::Clock;
40use hyperactor::clock::ClockKind;
41use hyperactor::context;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::MailboxAdminMessage;
45use hyperactor::mailbox::MailboxAdminMessageHandler;
46use hyperactor::mailbox::MailboxClient;
47use hyperactor::mailbox::MailboxServer;
48use hyperactor::mailbox::MailboxServerHandle;
49use hyperactor::mailbox::open_port;
50use hyperactor::proc::ActorLedgerSnapshot;
51use hyperactor::proc::Proc;
52use hyperactor::reference::ActorId;
53use hyperactor::reference::ActorRef;
54use hyperactor::reference::Index;
55use hyperactor::reference::ProcId;
56use hyperactor::supervision::ActorSupervisionEvent;
57use hyperactor_mesh::comm::CommActor;
58use serde::Deserialize;
59use serde::Serialize;
60use tokio::process::Command;
61use tokio::sync::watch;
62use tokio_retry::strategy::jitter;
63
64use crate::pyspy::PySpyTrace;
65use crate::pyspy::py_spy;
66use crate::supervision::ProcStatus;
67use crate::supervision::ProcSupervisionMessageClient;
68use crate::supervision::ProcSupervisionState;
69use crate::supervision::ProcSupervisor;
70use crate::system_actor::ProcLifecycleMode;
71use crate::system_actor::SYSTEM_ACTOR_REF;
72use crate::system_actor::SystemActor;
73use crate::system_actor::SystemMessageClient;
74
75static HYPERACTOR_WORLD_ID: &str = "HYPERACTOR_WORLD_ID";
76static HYPERACTOR_PROC_ID: &str = "HYPERACTOR_PROC_ID";
77static HYPERACTOR_BOOTSTRAP_ADDR: &str = "HYPERACTOR_BOOTSTRAP_ADDR";
78static HYPERACTOR_WORLD_SIZE: &str = "HYPERACTOR_WORLD_SIZE";
79static HYPERACTOR_RANK: &str = "HYPERACTOR_RANK";
80static HYPERACTOR_LOCAL_RANK: &str = "HYPERACTOR_LOCAL_RANK";
81
/// The environment in which a proc actor spawns new procs: either
/// locally, within the current process, or by executing a separate
/// program.
83#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
84pub enum Environment {
85    /// The actor is spawned within the proc actor.
86    Local,
87
    /// Spawn the actor in a separate proc. The program is the
    /// executable that will be spawned by the host actor. The program
    /// is launched with environment variables set by the spawning proc
    /// actor: ${HYPERACTOR_WORLD_ID}, the world being joined;
    /// ${HYPERACTOR_PROC_ID}, the proc id of the proc being spawned;
    /// ${HYPERACTOR_BOOTSTRAP_ADDR}, the address of the system actor;
    /// ${HYPERACTOR_WORLD_SIZE}, the size of the world the proc is a
    /// part of; and ${HYPERACTOR_RANK} / ${HYPERACTOR_LOCAL_RANK}, the
    /// proc's global and local ranks. This is often useful when an
    /// actor needs to know the size of the world, such as a worker
    /// using NCCL communication.
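    ///
    /// A minimal sketch (an assumption, not prescribed by this crate) of
    /// how a spawned program might read these variables at startup,
    /// assuming `ProcId` and `ChannelAddr` parse from their display form:
    ///
    /// ```ignore
    /// let proc_id: ProcId = std::env::var("HYPERACTOR_PROC_ID")?.parse()?;
    /// let bootstrap_addr: ChannelAddr =
    ///     std::env::var("HYPERACTOR_BOOTSTRAP_ADDR")?.parse()?;
    /// let world_size: usize = std::env::var("HYPERACTOR_WORLD_SIZE")?.parse()?;
    /// ```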
97    Exec {
98        /// The program to run in order to spawn the actor.
99        program: String,
100    },
101}
102
103/// The state of the proc.
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
105pub enum ProcState {
    /// The proc is waiting to join the system.
107    AwaitingJoin,
108
109    /// The proc has joined the system.
110    Joined,
111}
112
113impl fmt::Display for ProcState {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        match self {
116            Self::AwaitingJoin => write!(f, "AwaitingJoin"),
117            Self::Joined => write!(f, "Joined"),
118        }
119    }
120}
121
122/// The result after stopping the proc.
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
124pub struct ProcStopResult {
125    /// The proc being stopped.
126    pub proc_id: ProcId,
127    /// The number of actors observed to stop.
128    pub actors_stopped: usize,
    /// The number of actors that were aborted.
130    pub actors_aborted: usize,
131}
132
133/// A snapshot of the proc.
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
135pub struct ProcSnapshot {
136    /// The state of the proc.
137    pub state: ProcState,
138    /// The snapshot of the actors in the proc.
139    pub actors: ActorLedgerSnapshot,
140}
141
142/// Remote py-spy dump configuration.
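///
/// # Example (sketch)
///
/// A dump is typically requested via [`ProcMessage::PySpyDump`], e.g.
/// through the generated client method on an `ActorRef<ProcActor>`
/// (the method name here is assumed from the handler below):
///
/// ```ignore
/// let trace = proc_actor_ref
///     .py_spy_dump(&client, PySpyConfig::NonBlocking)
///     .await?;
/// ```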
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144pub enum PySpyConfig {
    /// Nonblocking dump of only Python frames.
    NonBlocking,
    /// Blocking dump. Specifies whether native stack frames are
    /// included, and whether threads not created by Python should also
    /// be dumped.
149    Blocking {
150        /// Dump native stack frames.
151        native: Option<bool>,
152        /// Dump stack frames for native threads. Implies native.
153        native_all: Option<bool>,
154    },
155}
156
157/// A stack trace of the proc.
/// Wrapper to derive Named.
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Named)]
160pub struct StackTrace {
161    /// The stack trace.
162    pub trace: PySpyTrace,
163}
164
165/// Proc management messages.
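///
/// # Example (sketch)
///
/// Given a client attached to the system and a bound
/// `ActorRef<ProcActor>`, these messages are typically sent through the
/// generated client methods, as the tests below do:
///
/// ```ignore
/// let state = proc_actor_ref.state(&client).await?;
/// let snapshot = proc_actor_ref.snapshot(&client).await?;
/// let ProcStopResult { actors_stopped, actors_aborted, .. } =
///     proc_actor_ref.stop(&client, Duration::from_secs(1)).await?;
/// ```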
166#[derive(
167    Handler,
168    HandleClient,
169    RefClient,
170    Debug,
171    Serialize,
172    Deserialize,
173    Clone,
174    PartialEq,
175    Named
176)]
177pub enum ProcMessage {
178    /// Indicate that the proc has joined the system. This is sent by
179    /// the system actor when the proc has been registered and is ready
180    /// to receive instructions.
181    #[log_level(debug)]
182    Joined(),
183
184    /// Retrieve the state of the proc.
185    #[log_level(debug)]
186    State {
        /// Used to return the result to the caller.
188        #[reply]
189        ret: OncePortRef<ProcState>,
190    },
191
    /// Spawn an actor on the proc with the provided name.
193    Spawn {
194        /// registered actor type
195        actor_type: String,
196        /// spawned actor name
197        actor_name: String,
198        /// serialized parameters
199        params_data: Data,
        /// reply port; the proc sends its rank to indicate that the actor was spawned
201        status_port: PortRef<Index>,
202    },
203
    /// Spawn a set of proc actors in the specified world (never this proc's own world).
205    SpawnProc {
206        /// Spawn the proc locally or in a separate program.
207        env: Environment,
208        /// The world into which to spawn the procs.
209        world_id: WorldId,
210        /// The proc ids of the procs to spawn.
211        proc_ids: Vec<ProcId>,
212        /// The total number of procs in the specified world.
213        world_size: usize,
214    },
215
216    /// Self message to trigger a supervision update to the system actor.
217    #[log_level(debug)]
218    UpdateSupervision(),
219
    /// Stop the proc. Returns a pair of counts:
    /// - the number of actors observed to stop;
    /// - the number of actors not observed to stop (and thus aborted).
    ///
    /// At least one actor (the proc actor itself) will not be observed
    /// to stop. If more than one actor is not observed to stop, some
    /// actors timed out.
227    Stop {
228        /// The duration to wait for an actor to report status `Stopped`.
229        timeout: Duration,
230        /// Used to return the result to the caller.
231        #[reply]
232        reply_to: OncePortRef<ProcStopResult>,
233    },
234
235    /// Return a snapshot view of this proc. Used for debugging.
236    #[log_level(debug)]
237    Snapshot {
        /// Used to return the result to the caller.
239        #[reply]
240        reply_to: OncePortRef<ProcSnapshot>,
241    },
242
    /// Get the proc's local address.
    LocalAddr {
        /// Used to return the result to the caller.
246        #[reply]
247        reply_to: OncePortRef<ChannelAddr>,
248    },
249
    /// Run py-spy on the current proc and return the stack trace.
251    #[log_level(debug)]
252    PySpyDump {
253        /// Dump config.
254        config: PySpyConfig,
        /// Used to return the result to the caller.
256        #[reply]
257        reply_to: OncePortRef<StackTrace>,
258    },
259}
260
261/// Parameters for managing the proc.
262#[derive(Debug, Clone)]
263pub struct ProcActorParams {
264    /// The proc that is managed by this actor.
265    pub proc: Proc,
266
267    /// The system world to which this proc belongs.
268    pub world_id: WorldId,
269
270    /// Reference to the system actor that is managing this proc.
271    pub system_actor_ref: ActorRef<SystemActor>,
272
273    /// The channel address used to communicate with the system actor
274    /// that manages this proc. This is passed through so that the proc
275    /// can spawn sibling procs.
276    pub bootstrap_channel_addr: ChannelAddr,
277
278    /// The local address of this proc actor. This is used by
279    /// the proc actor to register the proc with the system actor.
280    pub local_addr: ChannelAddr,
281
    /// Watch channel into which the proc's state is published.
283    pub state_watch: watch::Sender<ProcState>,
284
285    /// Reference to supervisor.
286    pub supervisor_actor_ref: ActorRef<ProcSupervisor>,
287
288    /// Interval of reporting supervision status to the system actor.
289    pub supervision_update_interval: Duration,
290
291    /// Arbitrary labels for the proc. They can be used later to query
292    /// proc(s) using system snapshot api.
293    pub labels: HashMap<String, String>,
294
295    /// Proc lifecycle management mode.
296    ///
    /// If a proc is not managed:
    ///   * it will not be stopped when the system shuts down;
    ///   * it will not be captured by system snapshots.
    ///
    /// Not being managed is useful for procs that run on the client side,
    /// which might need to stay around for a while after the system is gone.
303    pub lifecycle_mode: ProcLifecycleMode,
304}
305
/// Outputs from bootstrapping a proc.
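///
/// # Example (sketch)
///
/// The returned handles can be bound into refs for remote use, as the
/// tests below do:
///
/// ```ignore
/// let proc_actor_ref: ActorRef<ProcActor> = bootstrapped.proc_actor.bind();
/// let comm_actor_ref: ActorRef<CommActor> = bootstrapped.comm_actor.bind();
/// ```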
307#[derive(Debug)]
308pub struct BootstrappedProc {
309    /// Handle to proc actor.
310    pub proc_actor: ActorHandle<ProcActor>,
311    /// Handle to comm actor.
312    pub comm_actor: ActorHandle<CommActor>,
    /// Handle to the mailbox server that serves the proc's listen address.
314    pub mailbox: MailboxServerHandle,
315}
316
317/// ProcActor manages a single proc. It is responsible for managing
/// the lifecycle of all of the proc's actors, and for routing messages
/// accordingly.
320#[derive(Debug)]
321#[hyperactor::export(
322    handlers = [
323        ProcMessage,
324        MailboxAdminMessage,
325    ],
326)]
327pub struct ProcActor {
328    params: ProcActorParams,
329    state: ProcState,
330    remote: Remote,
331    last_successful_supervision_update: SystemTime,
332}
333
334impl ProcActor {
335    /// Bootstrap a proc actor with the provided proc id. The bootstrapped proc
336    /// actor will use the provided listen address to serve its mailbox, while
337    /// the bootstrap address is used to register with the system actor.
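    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, mirroring the tests below. `system_addr` is a
    /// hypothetical binding holding the address a system actor is
    /// already serving on:
    ///
    /// ```ignore
    /// let world_id = id!(world);
    /// let proc_id = world_id.proc_id(0);
    /// let bootstrapped = ProcActor::bootstrap(
    ///     proc_id,
    ///     world_id,
    ///     ChannelAddr::any(ChannelTransport::Local), // listen address
    ///     system_addr,                               // bootstrap address
    ///     Duration::from_secs(1),                    // supervision update interval
    ///     HashMap::new(),                            // labels
    ///     ProcLifecycleMode::ManagedBySystem,
    /// )
    /// .await?;
    /// ```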
338    #[hyperactor::instrument]
339    pub async fn bootstrap(
340        proc_id: ProcId,
341        world_id: WorldId,
342        listen_addr: ChannelAddr,
343        bootstrap_addr: ChannelAddr,
344        supervision_update_interval: Duration,
345        labels: HashMap<String, String>,
346        lifecycle_mode: ProcLifecycleMode,
347    ) -> Result<BootstrappedProc, anyhow::Error> {
348        let system_supervision_ref: ActorRef<ProcSupervisor> =
349            ActorRef::attest(SYSTEM_ACTOR_REF.actor_id().clone());
350
351        Self::try_bootstrap(
352            proc_id.clone(),
353            world_id.clone(),
354            listen_addr.clone(),
355            bootstrap_addr.clone(),
356            system_supervision_ref.clone(),
357            supervision_update_interval,
358            labels,
359            lifecycle_mode,
360        )
361        .await
362        .inspect_err(|err| {
363            tracing::error!(
364                "bootstrap {} {} {} {}: {}",
365                proc_id,
366                world_id,
367                listen_addr,
368                bootstrap_addr,
369                err
370            );
371        })
372    }
373
374    /// Attempt to bootstrap a proc actor with the provided proc id. The bootstrapped proc
375    /// actor will use the provided listen address to serve its mailbox, while
376    /// the bootstrap address is used to register with the system actor.
377    #[hyperactor::instrument]
378    pub async fn try_bootstrap(
379        proc_id: ProcId,
380        world_id: WorldId,
381        listen_addr: ChannelAddr,
382        bootstrap_addr: ChannelAddr,
383        supervisor_actor_ref: ActorRef<ProcSupervisor>,
384        supervision_update_interval: Duration,
385        labels: HashMap<String, String>,
386        lifecycle_mode: ProcLifecycleMode,
387    ) -> Result<BootstrappedProc, anyhow::Error> {
388        let system_sender =
389            BoxedMailboxSender::new(MailboxClient::new(channel::dial(bootstrap_addr.clone())?));
390        let clock = ClockKind::for_channel_addr(&listen_addr);
391
392        let proc_forwarder =
393            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
394        let proc = Proc::new_with_clock(proc_id.clone(), proc_forwarder, clock);
395        Self::bootstrap_for_proc(
396            proc,
397            world_id,
398            listen_addr,
399            bootstrap_addr,
400            supervisor_actor_ref,
401            supervision_update_interval,
402            labels,
403            lifecycle_mode,
404        )
405        .await
406    }
407
408    /// Bootstrap a proc actor with the provided proc. The bootstrapped proc actor
409    /// will use the provided listen address to serve its mailbox, while the bootstrap
410    /// address is used to register with the system actor.
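    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch mirroring the tests below; `world_id`,
    /// `bootstrap_addr` (the address of a running system), and
    /// `supervisor_actor_ref` are assumed bindings:
    ///
    /// ```ignore
    /// let system_sender = BoxedMailboxSender::new(MailboxClient::new(
    ///     channel::dial(bootstrap_addr.clone())?,
    /// ));
    /// let forwarder =
    ///     BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
    /// let proc = Proc::new(world_id.proc_id(0), forwarder);
    /// let bootstrapped = ProcActor::bootstrap_for_proc(
    ///     proc.clone(),
    ///     world_id.clone(),
    ///     ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
    ///     bootstrap_addr,
    ///     supervisor_actor_ref,
    ///     Duration::from_secs(120),
    ///     HashMap::new(),
    ///     ProcLifecycleMode::ManagedBySystem,
    /// )
    /// .await?;
    /// ```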
411    #[hyperactor::instrument]
412    pub async fn bootstrap_for_proc(
413        proc: Proc,
414        world_id: WorldId,
415        listen_addr: ChannelAddr,
416        bootstrap_addr: ChannelAddr,
417        supervisor_actor_ref: ActorRef<ProcSupervisor>,
418        supervision_update_interval: Duration,
419        labels: HashMap<String, String>,
420        lifecycle_mode: ProcLifecycleMode,
421    ) -> Result<BootstrappedProc, anyhow::Error> {
422        let (local_addr, rx) = channel::serve(listen_addr)?;
423        let mailbox_handle = proc.clone().serve(rx);
424        let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin);
425
426        let handle = match proc
427            .clone()
428            .spawn::<Self>(
429                "proc",
430                ProcActorParams {
431                    proc: proc.clone(),
432                    world_id: world_id.clone(),
433                    system_actor_ref: SYSTEM_ACTOR_REF.clone(),
434                    bootstrap_channel_addr: bootstrap_addr,
435                    local_addr,
436                    state_watch: state_tx,
437                    supervisor_actor_ref,
438                    supervision_update_interval,
439                    labels,
440                    lifecycle_mode,
441                },
442            )
443            .await
444        {
445            Ok(handle) => handle,
446            Err(e) => {
447                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
448                return Err(e);
449            }
450        };
451
452        let comm_actor = match proc
453            .clone()
454            .spawn::<CommActor>("comm", Default::default())
455            .await
456        {
457            Ok(handle) => handle,
458            Err(e) => {
459                Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
460                return Err(e);
461            }
462        };
463        comm_actor.bind::<CommActor>();
464
465        loop {
466            let proc_state = state_rx.borrow_and_update().clone();
467            tracing::info!("{}: state: {:?}", &proc.proc_id(), proc_state);
468            if matches!(proc_state, ProcState::Joined) {
469                break;
470            }
471            match state_rx.changed().await {
472                Ok(_) => {}
473                Err(e) => {
474                    Self::failed_proc_bootstrap_cleanup(mailbox_handle).await;
475                    return Err(e.into());
476                }
477            }
478        }
479
480        proc.set_supervision_coordinator(handle.port::<ActorSupervisionEvent>())?;
481
482        Ok(BootstrappedProc {
483            proc_actor: handle,
484            mailbox: mailbox_handle,
485            comm_actor,
486        })
487    }
488
    /// Shut down the mailbox server to free up the receiver and its corresponding
    /// listen address, because the next bootstrap attempt will reuse the same
    /// listen address.
491    async fn failed_proc_bootstrap_cleanup(mailbox_handle: MailboxServerHandle) {
492        mailbox_handle.stop("failed proc bootstrap cleanup");
493        if let Err(shutdown_err) = mailbox_handle.await {
            // Ignore the shutdown error; the caller returns the original bootstrap error.
495            tracing::error!(
496                "error shutting down during a failed bootstrap attempt: {}",
497                shutdown_err
498            );
499        }
500    }
501}
502
503#[async_trait]
504impl Actor for ProcActor {
505    type Params = ProcActorParams;
506
507    async fn new(params: ProcActorParams) -> Result<Self, anyhow::Error> {
508        let last_successful_supervision_update = params.proc.clock().system_time_now();
509        Ok(Self {
510            params,
511            state: ProcState::AwaitingJoin,
512            remote: Remote::collect(),
513            last_successful_supervision_update,
514        })
515    }
516
517    async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
518        // Bind ports early so that when the proc actor joins, it can serve.
519        this.bind::<Self>();
520
521        // Join the system.
522        self.params
523            .system_actor_ref
524            .join(
525                this,
526                /*world_id=*/ self.params.world_id.clone(),
527                /*proc_id=*/ self.params.proc.proc_id().clone(),
528                /*proc_message_port=*/ this.port().bind(),
529                /*proc_addr=*/ self.params.local_addr.clone(),
530                self.params.labels.clone(),
531                self.params.lifecycle_mode.clone(),
532            )
533            .await?;
534
535        // Trigger supervision status update
536        // TODO: let the system actor determine/update the supervision interval.
537        // Maybe by returning it from the join call, or defining some other proc
538        // message to adjust it.
539        if self.params.supervision_update_interval > Duration::from_secs(0)
540            && self.params.lifecycle_mode == ProcLifecycleMode::ManagedBySystem
541        {
542            this.self_message_with_delay(
543                ProcMessage::UpdateSupervision(),
544                self.params.supervision_update_interval,
545            )?;
546        }
547
548        Ok(())
549    }
550}
551
552impl ProcActor {
553    /// This proc's rank in the world.
554    fn rank(&self) -> Index {
555        self.params
556            .proc
557            .proc_id()
558            .rank()
559            .expect("proc must be ranked")
560    }
561}
562
563#[hyperactor::forward(MailboxAdminMessage)]
564#[async_trait]
565impl MailboxAdminMessageHandler for ProcActor {
566    async fn update_address(
567        &mut self,
568        cx: &Context<Self>,
569        proc_id: ProcId,
570        addr: ChannelAddr,
571    ) -> Result<(), anyhow::Error> {
572        tracing::trace!(
573            "received address update:\n{:#?}",
574            MailboxAdminMessage::UpdateAddress {
575                proc_id: proc_id.clone(),
576                addr: addr.clone()
577            }
578        );
579        let forwarder = cx.proc().forwarder();
580        if let Some(router) = forwarder.downcast_ref::<DialMailboxRouter>() {
581            router.bind(proc_id.into(), addr);
582        } else {
583            tracing::warn!(
584                "proc {} received update_address but does not use a DialMailboxRouter",
585                cx.proc().proc_id()
586            );
587        }
588
589        Ok(())
590    }
591}
592
593#[async_trait]
594#[hyperactor::forward(ProcMessage)]
595impl ProcMessageHandler for ProcActor {
596    async fn joined(&mut self, _cx: &Context<Self>) -> Result<(), anyhow::Error> {
597        self.state = ProcState::Joined;
598        let _ = self.params.state_watch.send(self.state.clone());
599        Ok(())
600    }
601
602    async fn state(&mut self, _cx: &Context<Self>) -> Result<ProcState, anyhow::Error> {
603        Ok(self.state.clone())
604    }
605
606    async fn spawn(
607        &mut self,
608        cx: &Context<Self>,
609        actor_type: String,
610        actor_name: String,
611        params_data: Data,
612        status_port: PortRef<Index>,
613    ) -> Result<(), anyhow::Error> {
614        let _actor_id = self
615            .remote
616            .gspawn(&self.params.proc, &actor_type, &actor_name, params_data)
617            .await?;
618
619        // Signal that the actor has joined:
620        status_port.send(cx, self.rank())?;
621        Ok(())
622    }
623
624    async fn spawn_proc(
625        &mut self,
626        _cx: &Context<Self>,
627        env: Environment,
628        world_id: WorldId,
629        proc_ids: Vec<ProcId>,
630        world_size: usize,
631    ) -> Result<(), anyhow::Error> {
632        for (index, proc_id) in proc_ids.into_iter().enumerate() {
633            let proc_world_id = proc_id
634                .world_id()
635                .expect("proc must be ranked for world_id access")
636                .clone();
637            // Check world id isn't the same as this proc's world id.
638            if &proc_world_id
639                == self
640                    .params
641                    .proc
642                    .proc_id()
643                    .world_id()
644                    .expect("proc must be ranked for world_id access")
645                || &world_id
646                    == self
647                        .params
648                        .proc
649                        .proc_id()
650                        .world_id()
651                        .expect("proc must be ranked for world_id access")
652            {
653                return Err(anyhow::anyhow!(
654                    "cannot spawn proc in same world {}",
655                    proc_world_id
656                ));
657            }
658            match env {
659                Environment::Local => {
660                    ProcActor::bootstrap(
661                        proc_id,
662                        world_id.clone(),
663                        ChannelAddr::any(self.params.bootstrap_channel_addr.transport()),
664                        self.params.bootstrap_channel_addr.clone(),
665                        self.params.supervision_update_interval,
666                        HashMap::new(),
667                        ProcLifecycleMode::ManagedBySystem,
668                    )
669                    .await?;
670                }
671                Environment::Exec { ref program } => {
672                    tracing::info!("spawning proc {} with program {}", proc_id, program);
673                    let mut child = Command::new(program);
674                    let _ = child
675                        .env(HYPERACTOR_WORLD_ID, world_id.to_string())
676                        .env(HYPERACTOR_PROC_ID, proc_id.to_string())
677                        .env(
678                            HYPERACTOR_BOOTSTRAP_ADDR,
679                            self.params.bootstrap_channel_addr.to_string(),
680                        )
681                        .env(HYPERACTOR_WORLD_SIZE, world_size.to_string())
682                        .env(
683                            HYPERACTOR_RANK,
684                            proc_id
685                                .rank()
686                                .expect("proc must be ranked for rank env var")
687                                .to_string(),
688                        )
689                        .env(HYPERACTOR_LOCAL_RANK, index.to_string())
690                        .stdin(Stdio::null())
691                        .stdout(Stdio::inherit())
692                        .stderr(Stdio::inherit())
693                        .spawn()?;
694                }
695            }
696        }
697        Ok(())
698    }
699
700    async fn update_supervision(&mut self, cx: &Context<Self>) -> Result<(), anyhow::Error> {
701        // Delay for next supervision update with some jitter.
702        let delay = jitter(self.params.supervision_update_interval);
703
704        // Only start updating supervision after the proc is joined.
705        if self.state != ProcState::Joined {
706            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
707            return Ok(());
708        }
709
710        let msg = ProcSupervisionState {
711            world_id: self.params.world_id.clone(),
712            proc_id: self.params.proc.proc_id().clone(),
713            proc_addr: self.params.local_addr.clone(),
714            proc_health: ProcStatus::Alive,
715            failed_actors: Vec::new(),
716        };
717
718        match cx
719            .clock()
720            .timeout(
721                // TODO: make the timeout configurable
722                Duration::from_secs(10),
723                self.params.supervisor_actor_ref.update(cx, msg),
724            )
725            .await
726        {
727            Ok(_) => {
728                self.last_successful_supervision_update = cx.clock().system_time_now();
729            }
730            Err(_) => {}
731        }
732
733        let supervision_staleness = self
734            .last_successful_supervision_update
735            .elapsed()
736            .unwrap_or_default();
        // Consider the system actor unresponsive if no supervision update has
        // succeeded within 5 update intervals (e.g., with a 1s interval, the proc
        // stops itself after ~5s without a successful update).
        // TODO: make the number of missed updates configurable.
739        if supervision_staleness > 5 * self.params.supervision_update_interval {
740            tracing::error!(
741                "system actor isn't responsive to supervision update, stopping the proc"
742            );
            // The system actor is not responsive to supervision updates; it is likely dead. Stop this proc.
744            // TODO: make the timeout configurable
745            self.stop(cx, Duration::from_secs(5)).await?;
746        } else {
747            // Schedule the next supervision update with some jitter.
748            let delay = jitter(self.params.supervision_update_interval);
749            cx.self_message_with_delay(ProcMessage::UpdateSupervision(), delay)?;
750        }
751
752        Ok(())
753    }
754
755    async fn stop(
756        &mut self,
757        cx: &Context<Self>,
758        timeout: Duration,
759    ) -> Result<ProcStopResult, anyhow::Error> {
760        tracing::info!("stopping proc {}", self.params.proc.proc_id());
761        self.params
762            .proc
763            .destroy_and_wait(timeout, Some(cx))
764            .await
765            .map(|(stopped, aborted)| {
766                tracing::info!("stopped proc {}", self.params.proc.proc_id());
767                ProcStopResult {
768                    proc_id: self.params.proc.proc_id().clone(),
769                    actors_stopped: stopped.len(),
770                    actors_aborted: aborted.len(),
771                }
772            })
773    }
774
775    async fn snapshot(&mut self, _cx: &Context<Self>) -> Result<ProcSnapshot, anyhow::Error> {
776        let state = self.state.clone();
777        let actors = self.params.proc.ledger_snapshot();
778        Ok(ProcSnapshot { state, actors })
779    }
780
781    async fn local_addr(&mut self, _cx: &Context<Self>) -> Result<ChannelAddr, anyhow::Error> {
782        Ok(self.params.local_addr.clone())
783    }
784
785    async fn py_spy_dump(
786        &mut self,
787        _cx: &Context<Self>,
788        config: PySpyConfig,
789    ) -> Result<StackTrace, anyhow::Error> {
790        let pid = std::process::id() as i32;
791        tracing::info!(
792            "running py-spy on proc {}, process id: {}",
793            self.params.proc.proc_id(),
794            pid
795        );
796        let trace = match config {
797            PySpyConfig::Blocking { native, native_all } => {
798                py_spy(
799                    pid,
800                    native.unwrap_or_default(),
801                    native_all.unwrap_or_default(),
802                    true,
803                )
804                .await?
805            }
806            PySpyConfig::NonBlocking => py_spy(pid, false, false, false).await?,
807        };
808        Ok(StackTrace { trace })
809    }
810}
811
812#[async_trait]
813impl Handler<ActorSupervisionEvent> for ProcActor {
814    async fn handle(
815        &mut self,
816        cx: &Context<Self>,
817        event: ActorSupervisionEvent,
818    ) -> anyhow::Result<()> {
819        let actor_id = event.actor_id.clone();
820        let status = match event.actor_status {
821            ActorStatus::Failed(_) => {
822                ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(event)))
823            }
824            status => status,
825        };
826        let message = ProcSupervisionState {
827            world_id: self.params.world_id.clone(),
828            proc_id: self.params.proc.proc_id().clone(),
829            proc_addr: self.params.local_addr.clone(),
830            proc_health: ProcStatus::Alive,
831            failed_actors: Vec::from([(actor_id, status)]),
832        };
833        self.params.supervisor_actor_ref.update(cx, message).await?;
834        Ok(())
835    }
836}
837
/// Convenience utility to spawn an actor on a proc. On success, returns
/// an ActorRef to the newly spawned actor.
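///
/// # Example (sketch)
///
/// As used in the tests below; `MyActor` is a hypothetical actor type
/// that has been registered as remotely spawnable, and `params` its
/// spawn parameters:
///
/// ```ignore
/// let actor_ref: ActorRef<MyActor> =
///     spawn::<MyActor>(&client, &proc_actor_ref, "my_actor", &params).await?;
/// ```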
840pub async fn spawn<A: Actor + Referable>(
841    cx: &impl context::Actor,
842    proc_actor: &ActorRef<ProcActor>,
843    actor_name: &str,
844    params: &A::Params,
845) -> Result<ActorRef<A>, anyhow::Error>
846where
847    A::Params: RemoteMessage,
848{
849    let remote = Remote::collect();
850    let (spawned_port, mut spawned_receiver) = open_port(cx);
851    let ActorId(proc_id, _, _) = (*proc_actor).clone().into();
852
853    proc_actor
854        .spawn(
855            cx,
856            remote
857                .name_of::<A>()
858                .ok_or(anyhow::anyhow!("actor not registered"))?
859                .into(),
860            actor_name.into(),
861            bincode::serialize(params)?,
862            spawned_port.bind(),
863        )
864        .await?;
865
866    // Wait for the spawned actor to join.
867    while spawned_receiver.recv().await?
868        != proc_id
869            .rank()
870            .expect("proc must be ranked for rank comparison")
871    {}
872
    // Actors spawned via gspawn are always exported.
874    Ok(ActorRef::attest(proc_id.actor_id(actor_name, 0)))
875}
876
877#[cfg(test)]
878mod tests {
879    use std::assert_matches::assert_matches;
880    use std::collections::HashSet;
881    use std::time::Duration;
882
883    use hyperactor::actor::ActorStatus;
884    use hyperactor::channel;
885    use hyperactor::channel::ChannelAddr;
886    use hyperactor::channel::ChannelTransport;
887    use hyperactor::channel::TcpMode;
888    use hyperactor::clock::Clock;
889    use hyperactor::clock::RealClock;
890    use hyperactor::forward;
891    use hyperactor::id;
892    use hyperactor::reference::ActorRef;
893    use hyperactor::test_utils::pingpong::PingPongActor;
894    use hyperactor::test_utils::pingpong::PingPongActorParams;
895    use hyperactor::test_utils::pingpong::PingPongMessage;
896    use maplit::hashset;
897    use rand::Rng;
898    use rand::distributions::Alphanumeric;
899    use regex::Regex;
900
901    use super::*;
902    use crate::supervision::ProcSupervisionMessage;
903    use crate::system::ServerHandle;
904    use crate::system::System;
905
906    const MAX_WAIT_TIME: Duration = Duration::new(10, 0);
907
908    struct Bootstrapped {
909        server_handle: ServerHandle,
910        proc_actor_ref: ActorRef<ProcActor>,
911        comm_actor_ref: ActorRef<CommActor>,
912        client: Instance<()>,
913    }
914
915    async fn bootstrap() -> Bootstrapped {
916        let server_handle = System::serve(
917            ChannelAddr::any(ChannelTransport::Local),
918            Duration::from_secs(10),
919            Duration::from_secs(10),
920        )
921        .await
922        .unwrap();
923
924        let world_id = id!(world);
925        let proc_id = world_id.proc_id(0);
926        let bootstrap = ProcActor::bootstrap(
927            proc_id,
928            world_id,
929            ChannelAddr::any(ChannelTransport::Local),
930            server_handle.local_addr().clone(),
931            Duration::from_secs(1),
932            HashMap::new(),
933            ProcLifecycleMode::ManagedBySystem,
934        )
935        .await
936        .unwrap();
937
938        // Now join the system and talk to the proc actor.
939
940        let mut system = System::new(server_handle.local_addr().clone());
941        let client = system.attach().await.unwrap();
942
943        // This is really not cool. We should manage state subscriptions instead.
944        let start = RealClock.now();
945        let mut proc_state;
946        loop {
947            proc_state = bootstrap.proc_actor.state(&client).await.unwrap();
948
949            if matches!(proc_state, ProcState::Joined) || start.elapsed() >= MAX_WAIT_TIME {
950                break;
951            }
952        }
953        assert_matches!(proc_state, ProcState::Joined);
954
955        Bootstrapped {
956            server_handle,
957            proc_actor_ref: bootstrap.proc_actor.bind(),
958            comm_actor_ref: bootstrap.comm_actor.bind(),
959            client,
960        }
961    }
962
963    #[tokio::test]
964    async fn test_bootstrap() {
965        let Bootstrapped { server_handle, .. } = bootstrap().await;
966
967        println!("bootrapped, now waiting");
968
969        server_handle.stop().await.unwrap();
970        server_handle.await;
971    }
972
973    #[derive(Debug, Default, Actor)]
974    #[hyperactor::export(
975        spawn = true,
976        handlers = [
977            TestActorMessage,
978        ],
979    )]
980    struct TestActor;
981
982    #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)]
983    enum TestActorMessage {
984        Increment(u64, #[reply] OncePortRef<u64>),
985        Fail(String),
986    }
987
988    #[async_trait]
989    #[forward(TestActorMessage)]
990    impl TestActorMessageHandler for TestActor {
991        async fn increment(&mut self, _cx: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
992            Ok(num + 1)
993        }
994
995        async fn fail(&mut self, _cx: &Context<Self>, err: String) -> Result<(), anyhow::Error> {
996            Err(anyhow::anyhow!(err))
997        }
998    }
999
1000    #[tokio::test]
1001    async fn test_stop() {
1002        // Show here that the proc actors are stopped when the proc
1003        // actor receives a `Stop()` message.
1004        let Bootstrapped {
1005            server_handle,
1006            proc_actor_ref,
1007            client,
1008            ..
1009        } = bootstrap().await;
1010
1011        const NUM_ACTORS: usize = 4usize;
1012        for i in 0..NUM_ACTORS {
1013            spawn::<TestActor>(&client, &proc_actor_ref, format!("test{i}").as_str(), &())
1014                .await
1015                .unwrap();
1016        }
1017
1018        let ProcStopResult {
1019            proc_id: _,
1020            actors_stopped,
1021            actors_aborted,
1022        } = proc_actor_ref
1023            .stop(&client, Duration::from_secs(1))
1024            .await
1025            .unwrap();
1026        assert_eq!(NUM_ACTORS + 1, actors_stopped);
1027        assert_eq!(1, actors_aborted);
1028
1029        server_handle.stop().await.unwrap();
1030        server_handle.await;
1031    }
1032
    // An actor that sleeps for the number of seconds given in the message.
1034    #[derive(Debug, Default, Actor)]
1035    #[hyperactor::export(
1036        spawn = true,
1037        handlers = [
1038            u64,
1039        ],
1040    )]
1041    struct SleepActor {}
1042
1043    #[async_trait]
1044    impl Handler<u64> for SleepActor {
1045        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
1046            let duration = message;
1047            RealClock.sleep(Duration::from_secs(duration)).await;
1048            Ok(())
1049        }
1050    }
1051
1052    #[tracing_test::traced_test]
1053    #[tokio::test]
1054    #[cfg_attr(not(fbcode_build), ignore)]
1055    async fn test_stop_timeout() {
1056        let Bootstrapped {
1057            server_handle,
1058            proc_actor_ref,
1059            client,
1060            ..
1061        } = bootstrap().await;
1062
1063        const NUM_ACTORS: usize = 4usize;
1064        for i in 0..NUM_ACTORS {
1065            let sleep_secs = 5u64;
1066            let sleeper = spawn::<SleepActor>(
1067                &client,
1068                &proc_actor_ref,
1069                format!("sleeper{i}").as_str(),
1070                &(),
1071            )
1072            .await
1073            .unwrap();
1074            if i > 0 {
1075                sleeper.send(&client, sleep_secs).unwrap();
1076            }
1077        }
1078
1079        let ProcStopResult {
1080            proc_id: _,
1081            actors_stopped,
1082            actors_aborted,
1083        } = proc_actor_ref
1084            .stop(&client, Duration::from_secs(1))
1085            .await
1086            .unwrap();
1087        assert_eq!(2, actors_stopped);
1088        assert_eq!((NUM_ACTORS - 1) + 1, actors_aborted);
1089
1090        assert!(tracing_test::internal::logs_with_scope_contain(
1091            "hyperactor::proc",
1092            "world[0].proc[0]: aborting (delayed) JoinHandle"
1093        ));
1094        for i in 1..3 {
1095            assert!(tracing_test::internal::logs_with_scope_contain(
1096                "hyperactor::proc",
1097                format!("world[0].sleeper{}[0]: aborting JoinHandle", i).as_str()
1098            ));
1099        }
1100        logs_assert(|logs| {
1101            let count = logs
1102                .iter()
1103                .filter(|log| {
1104                    log.contains("aborting JoinHandle")
1105                        || log.contains("aborting (delayed) JoinHandle")
1106                })
1107                .count();
1108            if count == actors_aborted {
1109                Ok(())
1110            } else {
1111                Err("task abort counting error".to_string())
1112            }
1113        });
1114
1115        server_handle.stop().await.unwrap();
1116        server_handle.await;
1117    }
1118
1119    #[tokio::test]
1120    async fn test_spawn() {
1121        let Bootstrapped {
1122            server_handle,
1123            proc_actor_ref,
1124            client,
1125            ..
1126        } = bootstrap().await;
1127
1128        let test_actor_ref = spawn::<TestActor>(&client, &proc_actor_ref, "test", &())
1129            .await
1130            .unwrap();
1131
1132        let result = test_actor_ref.increment(&client, 1).await.unwrap();
1133        assert_eq!(result, 2);
1134
1135        server_handle.stop().await.unwrap();
1136        server_handle.await;
1137    }
1138
1139    #[cfg(target_os = "linux")]
1140    fn random_abstract_addr() -> ChannelAddr {
1141        let random_string = rand::thread_rng()
1142            .sample_iter(&Alphanumeric)
1143            .take(24)
1144            .map(char::from)
1145            .collect::<String>();
1146        format!("unix!@{random_string}").parse().unwrap()
1147    }
1148
1149    #[cfg(target_os = "linux")] // remove after making abstract unix sockets store-and-forward
1150    #[tokio::test]
1151    async fn test_bootstrap_retry() {
1152        if std::env::var("CARGO_TEST").is_ok() {
1153            eprintln!("test skipped under cargo as it causes other tests to fail when run");
1154            return;
1155        }
1156
1157        // Spawn the proc before the server is up. This is imperfect
1158        // as we rely on sleeping. Ideally we'd make sure the proc performs
1159        // at least one try before we start the server.
1160        let bootstrap_addr = random_abstract_addr();
1161
1162        let bootstrap_addr_clone = bootstrap_addr.clone();
1163        let handle = tokio::spawn(async move {
1164            let world_id = id!(world);
1165            let proc_id = world_id.proc_id(0);
1166            let bootstrap = ProcActor::bootstrap(
1167                proc_id,
1168                world_id,
1169                random_abstract_addr(),
1170                bootstrap_addr_clone,
1171                Duration::from_secs(1),
1172                HashMap::new(),
1173                ProcLifecycleMode::ManagedBySystem,
1174            )
1175            .await
1176            .unwrap();
1177
1178            // Proc actor should still be running.
1179            let mut status = bootstrap.proc_actor.status();
1180            assert_eq!(*status.borrow_and_update(), ActorStatus::Idle);
1181        });
1182
        // Sleep long enough that the ProcActor's supervision should not time out and cause it to stop.
1184        // When System actor is brought up later, it should finish properly.
1185        RealClock.sleep(Duration::from_secs(5)).await;
1186
1187        let _server_handle = System::serve(
1188            bootstrap_addr,
1189            Duration::from_secs(10),
1190            Duration::from_secs(10),
1191        )
1192        .await
1193        .unwrap();
1194
1195        // Task completed successfully, so it connected correctly.
1196        handle.await.unwrap();
1197    }
1198
1199    #[tokio::test]
1200    async fn test_supervision_message_handling() {
1201        if std::env::var("CARGO_TEST").is_ok() {
1202            eprintln!("test skipped under cargo as it fails when run with others");
1203            return;
1204        }
1205
1206        let server_handle = System::serve(
1207            ChannelAddr::any(ChannelTransport::Local),
1208            Duration::from_secs(3600),
1209            Duration::from_secs(3600),
1210        )
1211        .await
1212        .unwrap();
1213
1214        // A test supervisor.
1215        let mut system = System::new(server_handle.local_addr().clone());
1216        let supervisor = system.attach().await.unwrap();
1217        let (_supervisor_supervision_tx, mut supervisor_supervision_receiver) =
1218            supervisor.bind_actor_port::<ProcSupervisionMessage>();
1219        let supervisor_actor_ref: ActorRef<ProcSupervisor> =
1220            ActorRef::attest(supervisor.self_id().clone());
1221
1222        // Start the proc actor
1223        let local_world_id = hyperactor::id!(test_proc);
1224        let local_proc_id = local_world_id.proc_id(0);
1225        let bootstrap = ProcActor::try_bootstrap(
1226            local_proc_id.clone(),
1227            local_world_id.clone(),
1228            ChannelAddr::any(ChannelTransport::Local),
1229            server_handle.local_addr().clone(),
1230            supervisor_actor_ref.clone(),
1231            Duration::from_secs(1),
1232            HashMap::new(),
1233            ProcLifecycleMode::ManagedBySystem,
1234        )
1235        .await
1236        .unwrap();
1237
1238        // Should receive supervision message sent from the periodic task
1239        // indicating the proc is alive.
1240        let msg = supervisor_supervision_receiver.recv().await;
1241        match msg.unwrap() {
1242            ProcSupervisionMessage::Update(state, port) => {
1243                assert_eq!(
1244                    state,
1245                    ProcSupervisionState {
1246                        world_id: local_world_id.clone(),
1247                        proc_addr: ChannelAddr::Local(3),
1248                        proc_id: local_proc_id.clone(),
1249                        proc_health: ProcStatus::Alive,
1250                        failed_actors: Vec::new(),
1251                    }
1252                );
1253                let _ = port.send(&supervisor, ());
1254            }
1255        }
1256
1257        // Spawn a root actor on the proc.
1258        let proc_actor_ref = bootstrap.proc_actor.bind();
1259        let test_actor_ref = spawn::<TestActor>(&supervisor, &proc_actor_ref, "test", &())
1260            .await
1261            .unwrap();
1262
1263        test_actor_ref
1264            .fail(&supervisor, "test actor is erroring out".to_string())
1265            .await
1266            .unwrap();
1267        // Since we could get messages from both the periodic task and the
1268        // report from the failed actor, we need to poll for a while to make
1269        // sure we get the right message.
1270        let result = RealClock
1271            .timeout(Duration::from_secs(5), async {
1272                loop {
1273                    match supervisor_supervision_receiver.recv().await {
1274                        Ok(ProcSupervisionMessage::Update(state, _port)) => {
1275                            match state.failed_actors.iter().find(|(failed_id, _)| {
1276                                failed_id == test_actor_ref.clone().actor_id()
1277                            }) {
1278                                Some((_, actor_status)) => return Ok(actor_status.clone()),
1279                                None => {}
1280                            }
1281                        }
1282                        _ => anyhow::bail!("unexpected message type"),
1283                    }
1284                }
1285            })
1286            .await;
1287        assert_matches!(
1288            result.unwrap().unwrap(),
1289            ActorStatus::Failed(msg) if msg.to_string().contains("test actor is erroring out")
1290        );
1291
1292        server_handle.stop().await.unwrap();
1293        server_handle.await;
1294    }
1295
1296    // Verify that the proc actor's ProcMessage port is bound properly so
1297    // that we can send messages to it through the system actor.
1298    #[tokio::test]
1299    async fn test_bind_proc_actor_in_bootstrap() {
1300        let server_handle = System::serve(
1301            ChannelAddr::any(ChannelTransport::Local),
1302            Duration::from_secs(10),
1303            Duration::from_secs(10),
1304        )
1305        .await
1306        .unwrap();
1307        let mut system = System::new(server_handle.local_addr().clone());
1308        let client = system.attach().await.unwrap();
1309
1310        let world_id = id!(world);
1311        let proc_id = world_id.proc_id(0);
1312        let bootstrap = ProcActor::bootstrap(
1313            proc_id,
1314            world_id,
1315            ChannelAddr::any(ChannelTransport::Local),
1316            server_handle.local_addr().clone(),
1317            Duration::from_secs(1),
1318            HashMap::new(),
1319            ProcLifecycleMode::ManagedBySystem,
1320        )
1321        .await
1322        .unwrap();
1323        let proc_actor_id = bootstrap.proc_actor.actor_id().clone();
1324        let proc_actor_ref = ActorRef::<ProcActor>::attest(proc_actor_id);
1325
1326        let res = RealClock
1327            .timeout(Duration::from_secs(5), proc_actor_ref.state(&client))
1328            .await;
1329        // If ProcMessage's static Named port is not bound, this test will fail
1330        // due to timeout.
1331        assert!(res.is_ok());
1332        assert_matches!(res.unwrap().unwrap(), ProcState::Joined);
1333        server_handle.stop().await.unwrap();
1334        server_handle.await;
1335    }
1336
1337    #[tokio::test]
1338    async fn test_proc_snapshot() {
1339        let Bootstrapped {
1340            server_handle,
1341            proc_actor_ref,
1342            comm_actor_ref,
1343            client,
1344            ..
1345        } = bootstrap().await;
1346
1347        // Spawn some actors on this proc.
1348        let root: ActorRef<TestActor> = spawn::<TestActor>(&client, &proc_actor_ref, "root", &())
1349            .await
1350            .unwrap();
1351        let another_root = spawn::<TestActor>(&client, &proc_actor_ref, "another_root", &())
1352            .await
1353            .unwrap();
1354        {
1355            let snapshot = proc_actor_ref.snapshot(&client).await.unwrap();
1356            assert_eq!(snapshot.state, ProcState::Joined);
1357            assert_eq!(
1358                snapshot.actors.roots.keys().collect::<HashSet<_>>(),
1359                hashset! {
1360                    proc_actor_ref.actor_id(),
1361                    comm_actor_ref.actor_id(),
1362                    root.actor_id(),
1363                    another_root.actor_id(),
1364                }
1365            );
1366        }
1367
1368        server_handle.stop().await.unwrap();
1369        server_handle.await;
1370    }
1371
1372    #[tokio::test]
1373    async fn test_undeliverable_message_return() {
1374        // Proc can't send a message to a remote actor because the
1375        // system connection is lost.
1376        use hyperactor::mailbox::Undeliverable;
1377        use hyperactor::test_utils::pingpong::PingPongActor;
1378        use hyperactor::test_utils::pingpong::PingPongMessage;
1379
1380        // Use temporary config for this test
1381        let config = hyperactor::config::global::lock();
1382        let _guard = config.override_key(
1383            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1384            Duration::from_secs(1),
1385        );
1386
1387        // Serve a system.
1388        let server_handle = System::serve(
1389            ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1390            Duration::from_secs(120),
1391            Duration::from_secs(120),
1392        )
1393        .await
1394        .unwrap();
1395        let mut system = System::new(server_handle.local_addr().clone());
1396
1397        // Build a supervisor.
1398        let supervisor = system.attach().await.unwrap();
1399        let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1400        let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1401
1402        // Construct a system sender.
1403        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1404            channel::dial(server_handle.local_addr().clone()).unwrap(),
1405        ));
1406
1407        // Construct a proc forwarder in terms of the system sender.
1408        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1409        let proc_forwarder =
1410            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1411
1412        // Bootstrap proc 'world[0]', join the system.
1413        let world_id = id!(world);
1414        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1415        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1416            proc_0.clone(),
1417            world_id.clone(),
1418            listen_addr,
1419            server_handle.local_addr().clone(),
1420            sup_ref.clone(),
1421            Duration::from_secs(120),
1422            HashMap::new(),
1423            ProcLifecycleMode::ManagedBySystem,
1424        )
1425        .await
1426        .unwrap();
1427        let proc_0_client = proc_0.attach("client").unwrap();
1428        let (proc_0_undeliverable_tx, mut proc_0_undeliverable_rx) = proc_0_client.open_port();
1429
1430        // Bootstrap a second proc 'world[1]', join the system.
1431        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1432        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1433            proc_1.clone(),
1434            world_id.clone(),
1435            ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1436            server_handle.local_addr().clone(),
1437            sup_ref.clone(),
1438            Duration::from_secs(120),
1439            HashMap::new(),
1440            ProcLifecycleMode::ManagedBySystem,
1441        )
1442        .await
1443        .unwrap();
1444        let proc_1_client = proc_1.attach("client").unwrap();
1445        let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port();
1446
1447        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1448        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
1449        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
1450        // same proc).
1451        let ping_handle = proc_0
1452            .spawn::<PingPongActor>("ping", ping_params)
1453            .await
1454            .unwrap();
1455        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1456        let pong_handle = proc_1
1457            .spawn::<PingPongActor>("pong", pong_params)
1458            .await
1459            .unwrap();
1460
        // Now kill the system server, making message delivery between
1462        // procs impossible.
1463        server_handle.stop().await.unwrap();
1464        server_handle.await;
1465
1466        let n = 100usize;
1467        for i in 1..(n + 1) {
1468            // Have 'ping' send 'pong' a message.
1469            let ttl = 66 + i as u64; // Avoid ttl = 66!
1470            let (once_handle, _) = proc_0_client.open_once_port::<bool>();
1471            ping_handle
1472                .send(PingPongMessage(ttl, pong_handle.bind(), once_handle.bind()))
1473                .unwrap();
1474        }
1475
1476        // `PingPongActor`s do not exit their message loop (a
1477        // non-default actor behavior) when they have an undelivered
1478        // message sent back to them (the reason being this very
1479        // test).
1480        assert!(matches!(*ping_handle.status().borrow(), ActorStatus::Idle));
1481
1482        // We expect n undelivered messages.
1483        let Ok(Undeliverable(envelope)) = proc_0_undeliverable_rx.recv().await else {
1484            unreachable!()
1485        };
1486        let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1487        let mut count = 1;
1488        while let Ok(Some(Undeliverable(envelope))) = proc_0_undeliverable_rx.try_recv() {
1489            // We care that every undeliverable message was accounted
1490            // for. We can't assume anything about their arrival
1491            // order.
1492            count += 1;
1493            let PingPongMessage(_, _, _) = envelope.deserialized().unwrap();
1494        }
1495        assert!(count == n);
1496    }
1497
1498    #[tracing_test::traced_test]
1499    #[tokio::test]
1500    #[cfg_attr(not(fbcode_build), ignore)]
1501    async fn test_proc_actor_mailbox_admin_message() {
1502        // Verify that proc actors update their address books on first
1503        // contact, and that no additional updates are triggered for
1504        // known procs.
1505
1506        use hyperactor::test_utils::pingpong::PingPongActor;
1507        use hyperactor::test_utils::pingpong::PingPongMessage;
1508
1509        // Serve a system.
1510        let server_handle = System::serve(
1511            ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1512            Duration::from_secs(120),
1513            Duration::from_secs(120),
1514        )
1515        .await
1516        .unwrap();
1517        let mut system = System::new(server_handle.local_addr().clone());
1518        let system_actor = server_handle.system_actor_handle();
1519        let system_client = system.attach().await.unwrap(); // world id: user
1520
1521        // Build a supervisor.
1522        let supervisor = system.attach().await.unwrap();
1523        let (_sup_tx, _sup_rx) = supervisor.bind_actor_port::<ProcSupervisionMessage>();
1524        let sup_ref = ActorRef::<ProcSupervisor>::attest(supervisor.self_id().clone());
1525
1526        // Construct a system sender.
1527        let system_sender = BoxedMailboxSender::new(MailboxClient::new(
1528            channel::dial(server_handle.local_addr().clone()).unwrap(),
1529        ));
1530
1531        // Construct a proc forwarder in terms of the system sender.
1532        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1533        let proc_forwarder =
1534            BoxedMailboxSender::new(DialMailboxRouter::new_with_default(system_sender));
1535
1536        // Bootstrap proc 'world[0]', join the system.
1537        let world_id = id!(world);
1538        let proc_0 = Proc::new(world_id.proc_id(0), proc_forwarder.clone());
1539        let _proc_actor_0 = ProcActor::bootstrap_for_proc(
1540            proc_0.clone(),
1541            world_id.clone(),
1542            listen_addr,
1543            server_handle.local_addr().clone(),
1544            sup_ref.clone(),
1545            Duration::from_secs(120),
1546            HashMap::new(),
1547            ProcLifecycleMode::ManagedBySystem,
1548        )
1549        .await
1550        .unwrap();
1551        let proc_0_client = proc_0.attach("client").unwrap();
1552        let (proc_0_undeliverable_tx, _proc_0_undeliverable_rx) = proc_0_client.open_port();
1553
1554        // Bootstrap a second proc 'world[1]', join the system.
1555        let proc_1 = Proc::new(world_id.proc_id(1), proc_forwarder.clone());
1556        let _proc_actor_1 = ProcActor::bootstrap_for_proc(
1557            proc_1.clone(),
1558            world_id.clone(),
1559            ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1560            server_handle.local_addr().clone(),
1561            sup_ref.clone(),
1562            Duration::from_secs(120),
1563            HashMap::new(),
1564            ProcLifecycleMode::ManagedBySystem,
1565        )
1566        .await
1567        .unwrap();
1568        let proc_1_client = proc_1.attach("client").unwrap();
1569        let (proc_1_undeliverable_tx, _proc_1_undeliverable_rx) = proc_1_client.open_port();
1570
1571        // Spawn two actors 'ping' and 'pong' where 'ping' runs on
1572        // 'world[0]' and 'pong' on 'world[1]' (that is, not on the
1573        // same proc).
1574        let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None);
1575        let ping_handle = proc_0
1576            .spawn::<PingPongActor>("ping", ping_params)
1577            .await
1578            .unwrap();
1579        let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None);
1580        let pong_handle = proc_1
1581            .spawn::<PingPongActor>("pong", pong_params)
1582            .await
1583            .unwrap();
1584
1585        // Have 'ping' send 'pong' a message.
1586        let ttl = 10u64; // Avoid ttl = 66!
1587        let (once_tx, once_rx) = system_client.open_once_port::<bool>();
1588        ping_handle
1589            .send(PingPongMessage(ttl, pong_handle.bind(), once_tx.bind()))
1590            .unwrap();
1591
1592        assert!(once_rx.recv().await.unwrap());
1593
1594        // Ping gets Pong's address
1595        let expected_1 = r#"UpdateAddress {
1596    proc_id: Ranked(
1597        WorldId(
1598            "world",
1599        ),
1600        1,
1601    ),
1602    addr: Tcp("#;
1603
1604        // Pong gets Ping's address
1605        let expected_2 = r#"UpdateAddress {
1606    proc_id: Ranked(
1607        WorldId(
1608            "world",
1609        ),
1610        0,
1611    ),
1612    addr: Tcp("#;
1613
1614        // Ping gets "user"'s address
1615        let expected_3 = r#"UpdateAddress {
1616    proc_id: Ranked(
1617        WorldId(
1618            "user",
1619        ),"#;
1620
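        // `logs_assert` (from `tracing_test`) inspects the captured log output:
        // exactly three UpdateAddress admin messages should have been logged,
        // one per first contact, and each expected peer must appear among them.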
1621        logs_assert(|logs| {
1622            let log_body = logs.join("\n");
1623
1624            let pattern = Regex::new(r"(?m)^UpdateAddress \{\n(?:.*\n)*?^\}").unwrap();
1625            let count = pattern.find_iter(&log_body).count();
1626
1627            if count != 3 {
1628                return Err(format!(
1629                    "expected 3 UpdateAddress messages, found {}",
1630                    count
1631                ));
1632            }
1633
1634            if !log_body.contains(expected_1) {
1635                return Err("missing expected update for proc_id 1".into());
1636            }
1637            if !log_body.contains(expected_2) {
1638                return Err("missing expected update for proc_id 0".into());
1639            }
1640            if !log_body.contains(expected_3) {
1641                return Err("missing expected update for proc_id user".into());
1642            }
1643
1644            Ok(())
1645        });
1646
1647        let (once_tx, once_rx) = system_client.open_once_port::<()>();
1648        assert_matches!(
1649            system_actor
1650                .stop(
1651                    /*unused*/ &system_client,
1652                    None,
1653                    Duration::from_secs(1),
1654                    once_tx.bind()
1655                )
1656                .await,
1657            Ok(())
1658        );
1659        assert_matches!(once_rx.recv().await.unwrap(), ());
1660    }
1661
1662    #[tokio::test]
1663    async fn test_update_address_book_cache() {
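        // Verify that when a proc is stopped and respawned at a new address,
        // the system actor updates the stale cached address in a peer proc's
        // mailbox so the actors can keep exchanging messages.
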
1664        let server_handle = System::serve(
1665            ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
1666            Duration::from_secs(2), // supervision update timeout
1667            Duration::from_secs(2), // duration to evict an unhealthy world
1668        )
1669        .await
1670        .unwrap();
1671        let system_addr = server_handle.local_addr().clone();
1672        let mut system = System::new(system_addr.clone());
1673
1674        let system_client = system.attach().await.unwrap();
1675
1676        // Spawn ping and pong actors to play a ping pong game.
1677        let ping_actor_id = id!(world[0].ping[0]);
1678        let (ping_actor_ref, _ping_proc_ref) =
1679            spawn_actor(&system_client, &ping_actor_id, &system_addr).await;
1680
1681        let pong_actor_id = id!(world[1].pong[0]);
1682        let (pong_actor_ref, pong_proc_ref) =
1683            spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1684
1685        // After the first round of the game, the ping and pong actors each have the
1686        // other's ChannelAddr cached in their respective procs' mailboxes.
1687        let (done_tx, done_rx) = system_client.open_once_port();
1688        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1689        ping_actor_ref
1690            .send(&system_client, ping_pong_message)
1691            .unwrap();
1692        assert!(done_rx.recv().await.unwrap());
1693
1694        // Now kill and respawn the pong actor so that its ChannelAddr changes.
1695        let ProcStopResult { actors_aborted, .. } = pong_proc_ref
1696            .stop(&system_client, Duration::from_secs(1))
1697            .await
1698            .unwrap();
1699        assert_eq!(1, actors_aborted);
1700        let (pong_actor_ref, _pong_proc_ref) =
1701            spawn_actor(&system_client, &pong_actor_id, &system_addr).await;
1702
1703        // Now play the game between ping and the new pong. The new pong has the same
1704        // proc ID as the old pong but a different ChannelAddr; the game should still be
1705        // playable because the system actor updates pong's cached address in ping's mailbox.
1706        let (done_tx, done_rx) = system_client.open_once_port();
1707        let ping_pong_message = PingPongMessage(4, pong_actor_ref.clone(), done_tx.bind());
1708        ping_actor_ref
1709            .send(&system_client, ping_pong_message)
1710            .unwrap();
1711        assert!(done_rx.recv().await.unwrap());
1712    }
1713
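    /// Test helper: bootstrap a proc actor for `actor_id`'s proc, join the
    /// system at `system_addr`, and spawn a `PingPongActor` on that proc.
    /// Returns refs to the spawned actor and its managing `ProcActor`.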
1714    async fn spawn_actor(
1715        cx: &impl context::Actor,
1716        actor_id: &ActorId,
1717        system_addr: &ChannelAddr,
1718    ) -> (ActorRef<PingPongActor>, ActorRef<ProcActor>) {
1719        let listen_addr = ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname));
1720        let bootstrap = ProcActor::bootstrap(
1721            actor_id.proc_id().clone(),
1722            actor_id
1723                .proc_id()
1724                .world_id()
1725                .expect("proc must be ranked for bootstrap world_id")
1726                .clone(),
1727            listen_addr.clone(),
1728            system_addr.clone(),
1729            Duration::from_secs(3),
1730            HashMap::new(),
1731            ProcLifecycleMode::ManagedBySystem,
1732        )
1733        .await
1734        .unwrap();
1735        let (undeliverable_msg_tx, _) = cx.mailbox().open_port();
1736        let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
1737        let actor_ref = spawn::<PingPongActor>(
1738            cx,
1739            &bootstrap.proc_actor.bind(),
1740            &actor_id.to_string(),
1741            &params,
1742        )
1743        .await
1744        .unwrap();
1745        let proc_actor_ref = bootstrap.proc_actor.bind();
1746        (actor_ref, proc_actor_ref)
1747    }
1748}