hyperactor_mesh/v1/
proc_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::any::type_name;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fmt;
13use std::ops::Deref;
14use std::panic::Location;
15use std::sync::Arc;
16use std::sync::OnceLock;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use hyperactor::Actor;
22use hyperactor::ActorHandle;
23use hyperactor::ActorId;
24use hyperactor::ActorRef;
25use hyperactor::Named;
26use hyperactor::ProcId;
27use hyperactor::RemoteMessage;
28use hyperactor::accum::ReducerOpts;
29use hyperactor::actor::ActorStatus;
30use hyperactor::actor::Referable;
31use hyperactor::actor::remote::Remote;
32use hyperactor::channel;
33use hyperactor::channel::ChannelAddr;
34use hyperactor::clock::Clock;
35use hyperactor::clock::RealClock;
36use hyperactor::config;
37use hyperactor::config::CONFIG;
38use hyperactor::config::ConfigAttr;
39use hyperactor::context;
40use hyperactor::declare_attrs;
41use hyperactor::mailbox::DialMailboxRouter;
42use hyperactor::mailbox::MailboxServer;
43use hyperactor::supervision::ActorSupervisionEvent;
44use ndslice::Extent;
45use ndslice::ViewExt as _;
46use ndslice::view;
47use ndslice::view::CollectMeshExt;
48use ndslice::view::MapIntoExt;
49use ndslice::view::Ranked;
50use ndslice::view::Region;
51use serde::Deserialize;
52use serde::Serialize;
53use tokio::sync::Notify;
54use tracing::Instrument;
55
56use crate::CommActor;
57use crate::alloc::Alloc;
58use crate::alloc::AllocExt;
59use crate::alloc::AllocatedProc;
60use crate::assign::Ranks;
61use crate::comm::CommActorMode;
62use crate::proc_mesh::mesh_agent;
63use crate::proc_mesh::mesh_agent::ActorState;
64use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
65use crate::proc_mesh::mesh_agent::ProcMeshAgent;
66use crate::proc_mesh::mesh_agent::ReconfigurableMailboxSender;
67use crate::resource;
68use crate::resource::GetRankStatus;
69use crate::resource::Status;
70use crate::v1;
71use crate::v1::ActorMesh;
72use crate::v1::ActorMeshRef;
73use crate::v1::Error;
74use crate::v1::HostMeshRef;
75use crate::v1::Name;
76use crate::v1::ValueMesh;
77use crate::v1::host_mesh::mesh_agent::ProcState;
78use crate::v1::host_mesh::mesh_to_rankedvalues_with_default;
79use crate::v1::mesh_controller::ActorMeshController;
80
declare_attrs! {
    /// The maximum idle time between updates while spawning actor
    /// meshes.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()),
        py_name: None,
    })
    pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum time to wait for each individual reply while
    /// collecting actor states from the proc mesh agents. It is applied
    /// as the per-receive timeout in `ProcMeshRef::actor_states`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()),
        py_name: None,
    })
    pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(60);
}
96
/// A reference to a single [`hyperactor::Proc`].
///
/// Bundles the proc's id, the rank it was given when its mesh was
/// created, and a reference to the agent actor that manages it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ProcRef {
    /// The id of the referenced proc.
    proc_id: ProcId,
    /// The rank of this proc at creation.
    create_rank: usize,
    /// The agent managing this proc.
    agent: ActorRef<ProcMeshAgent>,
}
106
107impl ProcRef {
108    pub(crate) fn new(proc_id: ProcId, create_rank: usize, agent: ActorRef<ProcMeshAgent>) -> Self {
109        Self {
110            proc_id,
111            create_rank,
112            agent,
113        }
114    }
115
116    /// Pings the proc, returning whether it is alive. This will be replaced by a
117    /// finer-grained lifecycle status in the near future.
118    pub(crate) async fn status(&self, cx: &impl context::Actor) -> v1::Result<bool> {
119        let (port, mut rx) = cx.mailbox().open_port();
120        self.agent
121            .status(cx, port.bind())
122            .await
123            .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e))?;
124        loop {
125            let (rank, status) = rx
126                .recv()
127                .await
128                .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
129            if rank == self.create_rank {
130                break Ok(status);
131            }
132        }
133    }
134
135    /// Get the supervision events for one actor with the given name.
136    #[allow(dead_code)]
137    async fn actor_state(
138        &self,
139        cx: &impl context::Actor,
140        name: Name,
141    ) -> v1::Result<resource::State<ActorState>> {
142        let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
143        self.agent
144            .send(
145                cx,
146                resource::GetState::<ActorState> {
147                    name: name.clone(),
148                    reply: port.bind(),
149                },
150            )
151            .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
152        let state = rx
153            .recv()
154            .await
155            .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
156        if let Some(ref inner) = state.state {
157            let rank = inner.create_rank;
158            if rank == self.create_rank {
159                Ok(state)
160            } else {
161                Err(Error::CallError(
162                    self.agent.actor_id().clone(),
163                    anyhow::anyhow!(
164                        "Rank on mesh agent not matching for Actor {}: returned {}, expected {}",
165                        name,
166                        rank,
167                        self.create_rank
168                    ),
169                ))
170            }
171        } else {
172            Err(Error::CallError(
173                self.agent.actor_id().clone(),
174                anyhow::anyhow!("Actor {} does not exist", name),
175            ))
176        }
177    }
178
179    pub fn proc_id(&self) -> &ProcId {
180        &self.proc_id
181    }
182
183    pub(crate) fn actor_id(&self, name: &Name) -> ActorId {
184        self.proc_id.actor_id(name.to_string(), 0)
185    }
186
187    /// Generic bound: `A: Referable` - required because we return
188    /// an `ActorRef<A>`.
189    pub(crate) fn attest<A: Referable>(&self, name: &Name) -> ActorRef<A> {
190        ActorRef::attest(self.actor_id(name))
191    }
192}
193
/// A mesh of processes.
///
/// Dereferences to its current [`ProcMeshRef`], so all reference-level
/// operations are available directly on a `ProcMesh`.
#[derive(Debug)]
pub struct ProcMesh {
    /// The name of this mesh.
    #[allow(dead_code)]
    name: Name,
    /// How the procs backing this mesh were obtained (from an alloc or
    /// from a host mesh).
    allocation: ProcMeshAllocation,
    /// The name under which comm actors were spawned, if they were
    /// requested at creation.
    #[allow(dead_code)]
    comm_actor_name: Option<Name>,
    /// The reference to this mesh; returned by `Deref`.
    current_ref: ProcMeshRef,
}
204
205impl ProcMesh {
206    async fn create(
207        cx: &impl context::Actor,
208        name: Name,
209        allocation: ProcMeshAllocation,
210        spawn_comm_actor: bool,
211    ) -> v1::Result<Self> {
212        let comm_actor_name = if spawn_comm_actor {
213            Some(Name::new("comm"))
214        } else {
215            None
216        };
217
218        let region = allocation.extent().clone().into();
219        let ranks = allocation.ranks();
220        let root_comm_actor = comm_actor_name.as_ref().map(|name| {
221            ActorRef::attest(
222                ranks
223                    .first()
224                    .expect("root mesh cannot be empty")
225                    .actor_id(name),
226            )
227        });
228        let host_mesh = allocation.hosts();
229        let current_ref = ProcMeshRef::new(
230            name.clone(),
231            region,
232            ranks,
233            host_mesh.cloned(),
234            None, // this is the root mesh
235            None, // comm actor is not alive yet
236        )
237        .unwrap();
238
239        let mut proc_mesh = Self {
240            name,
241            allocation,
242            comm_actor_name: comm_actor_name.clone(),
243            current_ref,
244        };
245
246        if let Some(comm_actor_name) = comm_actor_name {
247            // CommActor satisfies `Actor + Referable`, so it can be
248            // spawned and safely referenced via ActorRef<CommActor>.
249            let comm_actor_mesh = proc_mesh
250                .spawn_with_name::<CommActor>(cx, comm_actor_name, &Default::default())
251                .await?;
252            let address_book: HashMap<_, _> = comm_actor_mesh
253                .iter()
254                .map(|(point, actor_ref)| (point.rank(), actor_ref))
255                .collect();
256            // Now that we have all of the spawned comm actors, kick them all into
257            // mesh mode.
258            for (rank, comm_actor) in &address_book {
259                comm_actor
260                    .send(cx, CommActorMode::Mesh(*rank, address_book.clone()))
261                    .map_err(|e| Error::SendingError(comm_actor.actor_id().clone(), Box::new(e)))?
262            }
263
264            // The comm actor is now set up and ready to go.
265            proc_mesh.current_ref.root_comm_actor = root_comm_actor;
266        }
267
268        Ok(proc_mesh)
269    }
270
271    pub(crate) async fn create_owned_unchecked(
272        cx: &impl context::Actor,
273        name: Name,
274        extent: Extent,
275        hosts: HostMeshRef,
276        ranks: Vec<ProcRef>,
277    ) -> v1::Result<Self> {
278        Self::create(
279            cx,
280            name,
281            ProcMeshAllocation::Owned {
282                hosts,
283                extent,
284                ranks: Arc::new(ranks),
285            },
286            true,
287        )
288        .await
289    }
290
291    fn alloc_counter() -> &'static AtomicUsize {
292        static C: OnceLock<AtomicUsize> = OnceLock::new();
293        C.get_or_init(|| AtomicUsize::new(0))
294    }
295
296    /// Allocate a new ProcMesh from the provided alloc.
297    /// Allocate does not require an owning actor because references are not owned.
298    #[track_caller]
299    pub async fn allocate(
300        cx: &impl context::Actor,
301        alloc: Box<dyn Alloc + Send + Sync + 'static>,
302        name: &str,
303    ) -> v1::Result<Self> {
304        let caller = Location::caller();
305        Self::allocate_inner(cx, alloc, Name::new(name), caller).await
306    }
307
    /// The implementation behind [`ProcMesh::allocate`].
    ///
    /// It is a separate function so that the `proc_mesh` field can be set
    /// on the instrumentation span from the already-constructed `name`.
    ///
    /// Steps, in order:
    /// 1. number the attempt and initialize the alloc;
    /// 2. serve the calling proc on a channel of the alloc's transport;
    /// 3. bind every non-direct allocated proc into the proc's router
    ///    (failing with `Error::UnroutableMesh` if no router is found);
    /// 4. configure every mesh agent and wait until each rank has
    ///    reported completion;
    /// 5. spawn a background task that drives the alloc's events and
    ///    stops it once the mesh is stopped;
    /// 6. create the mesh (with comm actors) and log the outcome.
    #[hyperactor::instrument(fields(proc_mesh=name.to_string()))]
    async fn allocate_inner(
        cx: &impl context::Actor,
        mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
        name: Name,
        caller: &'static Location<'static>,
    ) -> v1::Result<Self> {
        // Allocation ids start at 1.
        let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1;
        tracing::info!(
            name = "ProcMeshStatus",
            status = "Allocate::Attempt",
            %caller,
            alloc_id,
            shape = ?alloc.shape(),
            "allocating proc mesh"
        );

        let running = alloc
            .initialize()
            .instrument(tracing::info_span!(
                "ProcMeshStatus::Allocate::Initialize",
                alloc_id,
                proc_mesh = %name
            ))
            .await?;

        // Wire the newly created mesh into the proc, so that it is routable.
        // We route all of the relevant prefixes into the proc's forwarder,
        // and serve it on the alloc's transport.
        //
        // This will be removed with direct addressing.
        let proc = cx.instance().proc();

        // First make sure we can serve the proc:
        let proc_channel_addr = {
            let _guard =
                tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered();
            let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?;
            proc.clone().serve(rx);
            tracing::info!(
                name = "ProcMeshStatus",
                status = "Allocate::ChannelServe",
                proc_mesh = %name,
                %addr,
                "proc started listening on addr: {addr}"
            );
            addr
        };

        let bind_allocated_procs = |router: &DialMailboxRouter| {
            // Route all of the allocated procs:
            for AllocatedProc { proc_id, addr, .. } in running.iter() {
                // Procs with direct ids are skipped; they are not bound here.
                if proc_id.is_direct() {
                    continue;
                }
                router.bind(proc_id.clone().into(), addr.clone());
            }
        };

        // Temporary for backward compatibility with ranked procs and v0 API.
        // Proc meshes can be allocated either using the root client proc (which
        // has a DialMailboxRouter forwarder) or a mesh agent proc (which has a
        // ReconfigurableMailboxSender forwarder with an inner DialMailboxRouter).
        if let Some(router) = proc.forwarder().downcast_ref() {
            bind_allocated_procs(router);
        } else if let Some(router) = proc
            .forwarder()
            .downcast_ref::<ReconfigurableMailboxSender>()
        {
            bind_allocated_procs(
                router
                    .as_inner()
                    .map_err(|_| Error::UnroutableMesh())?
                    .as_configured()
                    .ok_or(Error::UnroutableMesh())?
                    .downcast_ref()
                    .ok_or(Error::UnroutableMesh())?,
            );
        } else {
            return Err(Error::UnroutableMesh());
        }

        // Set up the mesh agents. Since references are not owned, we don't supervise it.
        // Instead, we just let procs die when they have unhandled supervision events.
        let address_book: HashMap<_, _> = running
            .iter()
            .map(
                |AllocatedProc {
                     addr, mesh_agent, ..
                 }| { (mesh_agent.actor_id().proc_id().clone(), addr.clone()) },
            )
            .collect();

        // Every agent reports its rank on this port once it is configured.
        let (config_handle, mut config_receiver) = cx.mailbox().open_port();
        for (rank, AllocatedProc { mesh_agent, .. }) in running.iter().enumerate() {
            mesh_agent
                .configure(
                    cx,
                    rank,
                    proc_channel_addr.clone(),
                    None, // no supervisor; we just crash
                    address_book.clone(),
                    config_handle.bind(),
                    true,
                )
                .await
                .map_err(Error::ConfigurationError)?;
        }
        // Wait until every rank has reported; duplicates are only logged.
        let mut completed = Ranks::new(running.len());
        while !completed.is_full() {
            let rank = config_receiver
                .recv()
                .await
                .map_err(|err| Error::ConfigurationError(err.into()))?;
            if completed.insert(rank, rank).is_some() {
                tracing::warn!("multiple completions received for rank {}", rank);
            }
        }

        // The creation rank of each proc is its position in the alloc's output.
        let ranks: Vec<_> = running
            .into_iter()
            .enumerate()
            .map(|(create_rank, allocated)| ProcRef {
                proc_id: allocated.proc_id,
                create_rank,
                agent: allocated.mesh_agent,
            })
            .collect();

        let stop = Arc::new(Notify::new());
        let extent = alloc.extent().clone();
        let alloc_name = alloc.world_id().to_string();

        {
            let stop = Arc::clone(&stop);

            // The alloc is moved into this task, which owns it from here on.
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = stop.notified() => {
                                // If we are explicitly stopped, the alloc is torn down.
                                if let Err(error) = alloc.stop_and_wait().await {
                                    tracing::error!(
                                        name = "ProcMeshStatus",
                                        alloc_name = %alloc.world_id(),
                                        status = "FailedToStopAlloc",
                                        %error,
                                    );
                                }
                                break;
                            }
                            // We are mostly just using this to drive allocation events.
                            proc_state = alloc.next() => {
                                match proc_state {
                                    // The alloc was stopped.
                                    None => break,
                                    Some(proc_state) => {
                                        tracing::debug!(
                                            alloc_name = %alloc.world_id(),
                                            "unmonitored allocation event: {}", proc_state);
                                    }
                                }

                            }
                        }
                    }
                }
                .instrument(tracing::info_span!("alloc_monitor")),
            );
        }

        let mesh = Self::create(
            cx,
            name,
            ProcMeshAllocation::Allocated {
                alloc_name,
                stop,
                extent,
                ranks: Arc::new(ranks),
            },
            true, // alloc-based meshes support comm actors
        )
        .await;
        // Log the outcome either way, then hand the result back unchanged.
        match &mesh {
            Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"),
            Err(error) => {
                tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error)
            }
        }
        mesh
    }
501
    /// Detach the proc mesh from the lifetime of `self`, and return its reference.
    ///
    /// `self` is consumed and dropped when this function returns; only a
    /// clone of the mesh reference survives. Test-only.
    #[cfg(test)]
    pub(crate) fn detach(self) -> ProcMeshRef {
        // This also keeps the ProcMeshAllocation::Allocated alloc task alive.
        // The reference is cloned rather than moved out because `ProcMesh`
        // implements `Drop`, which forbids moving fields out of `self`.
        self.current_ref.clone()
    }
508
509    /// Stop this mesh gracefully.
510    pub async fn stop(&mut self, cx: &impl context::Actor) -> anyhow::Result<()> {
511        let region = self.region.clone();
512        match &mut self.allocation {
513            ProcMeshAllocation::Allocated {
514                stop, alloc_name, ..
515            } => {
516                stop.notify_one();
517                tracing::info!(
518                    name = "ProcMeshStatus",
519                    proc_mesh = %self.name,
520                    alloc_name,
521                    status = "StoppingAlloc",
522                    "sending stop to alloc {alloc_name}; check its log for stop status",
523                );
524                Ok(())
525            }
526            ProcMeshAllocation::Owned { hosts, .. } => {
527                let procs = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
528                // We use the proc mesh region rather than the host mesh region
529                // because the host agent stores one entry per proc, not per host.
530                hosts.stop_proc_mesh(cx, &self.name, procs, region).await
531            }
532        }
533    }
534}
535
/// A `ProcMesh` is displayed exactly like its current [`ProcMeshRef`].
impl fmt::Display for ProcMesh {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.current_ref)
    }
}
541
/// Lets a `ProcMesh` be used wherever a `&ProcMeshRef` is expected, by
/// exposing its current reference.
impl Deref for ProcMesh {
    type Target = ProcMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
549
/// Dropping a `ProcMesh` only emits a status log entry; this impl does
/// not itself stop anything.
impl Drop for ProcMesh {
    fn drop(&mut self) {
        tracing::info!(
            name = "ProcMeshStatus",
            proc_mesh = %self.name,
            status = "Dropped",
        );
    }
}
559
/// Represents different ways ProcMeshes can be allocated.
enum ProcMeshAllocation {
    /// A mesh that has been allocated from an `Alloc`.
    Allocated {
        // The name of the alloc from which this mesh was allocated.
        alloc_name: String,

        // Notified to stop the task that keeps the alloc alive; that task
        // then tears the alloc down.
        stop: Arc<Notify>,

        // The extent of the mesh, as reported by the alloc.
        extent: Extent,

        // The allocated ranks.
        ranks: Arc<Vec<ProcRef>>,
    },

    /// An owned allocation: this ProcMesh fully owns the set of ranks.
    Owned {
        /// The host mesh from which the proc mesh was spawned.
        hosts: HostMeshRef,
        // This is purely for storage: `hosts.extent()` returns a computed (by value)
        // extent.
        extent: Extent,
        /// A proc reference for each rank in the mesh.
        ranks: Arc<Vec<ProcRef>>,
    },
}
587
588impl ProcMeshAllocation {
589    fn extent(&self) -> &Extent {
590        match self {
591            ProcMeshAllocation::Allocated { extent, .. } => extent,
592            ProcMeshAllocation::Owned { extent, .. } => extent,
593        }
594    }
595
596    fn ranks(&self) -> Arc<Vec<ProcRef>> {
597        Arc::clone(match self {
598            ProcMeshAllocation::Allocated { ranks, .. } => ranks,
599            ProcMeshAllocation::Owned { ranks, .. } => ranks,
600        })
601    }
602
603    fn hosts(&self) -> Option<&HostMeshRef> {
604        match self {
605            ProcMeshAllocation::Allocated { .. } => None,
606            ProcMeshAllocation::Owned { hosts, .. } => Some(hosts),
607        }
608    }
609}
610
611impl fmt::Debug for ProcMeshAllocation {
612    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
613        match self {
614            ProcMeshAllocation::Allocated { ranks, .. } => f
615                .debug_struct("ProcMeshAllocation::Allocated")
616                .field("alloc", &"<dyn Alloc>")
617                .field("ranks", ranks)
618                .finish(),
619            ProcMeshAllocation::Owned {
620                hosts,
621                ranks,
622                extent: _,
623            } => f
624                .debug_struct("ProcMeshAllocation::Owned")
625                .field("hosts", hosts)
626                .field("ranks", ranks)
627                .finish(),
628        }
629    }
630}
631
/// A reference to a ProcMesh, consisting of a set of ranked [`ProcRef`]s,
/// arranged into a region. ProcMeshes are named, uniquely identifying the
/// ProcMesh from which the reference was derived.
///
/// ProcMeshes can be sliced to create new ProcMeshes with a subset of the
/// original ranks.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct ProcMeshRef {
    // The name of the ProcMesh from which this reference was derived.
    name: Name,
    // The region the ranks are arranged into; its rank count always
    // equals `ranks.len()` (checked in `new`).
    region: Region,
    // One proc reference per rank of `region`.
    ranks: Arc<Vec<ProcRef>>,
    // Some if this was spawned from a host mesh, else none.
    host_mesh: Option<HostMeshRef>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // The root region of this mesh. None means this mesh itself is the root.
    pub(crate) root_region: Option<Region>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // v0 casting requires root mesh rank 0 as the 1st hop, so we need to provide
    // it here. For v1, this can be removed since v1 can use any rank.
    pub(crate) root_comm_actor: Option<ActorRef<CommActor>>,
}
655
656impl ProcMeshRef {
657    /// Create a new ProcMeshRef from the given name, region, ranks, and so on.
658    #[allow(clippy::result_large_err)]
659    fn new(
660        name: Name,
661        region: Region,
662        ranks: Arc<Vec<ProcRef>>,
663        host_mesh: Option<HostMeshRef>,
664        root_region: Option<Region>,
665        root_comm_actor: Option<ActorRef<CommActor>>,
666    ) -> v1::Result<Self> {
667        if region.num_ranks() != ranks.len() {
668            return Err(v1::Error::InvalidRankCardinality {
669                expected: region.num_ranks(),
670                actual: ranks.len(),
671            });
672        }
673        Ok(Self {
674            name,
675            region,
676            ranks,
677            host_mesh,
678            root_region,
679            root_comm_actor,
680        })
681    }
682
    /// The root comm actor recorded on this reference, if any.
    pub(crate) fn root_comm_actor(&self) -> Option<&ActorRef<CommActor>> {
        self.root_comm_actor.as_ref()
    }
686
    /// The name of the ProcMesh this reference was derived from.
    pub fn name(&self) -> &Name {
        &self.name
    }
690
    /// The name of the backing host mesh, or `None` if this mesh has no
    /// host mesh.
    pub fn host_mesh_name(&self) -> Option<&Name> {
        self.host_mesh.as_ref().map(|h| h.name())
    }
694
    /// Returns the HostMeshRef backing this ProcMeshRef, if there is one.
    /// Returns None if this ProcMeshRef is backed by an Alloc instead of a host mesh.
    pub fn hosts(&self) -> Option<&HostMeshRef> {
        self.host_mesh.as_ref()
    }
700
701    /// The current statuses of procs in this mesh.
702    pub async fn status(&self, cx: &impl context::Actor) -> v1::Result<ValueMesh<bool>> {
703        let vm: ValueMesh<_> = self.map_into(|proc_ref| {
704            let proc_ref = proc_ref.clone();
705            async move { proc_ref.status(cx).await }
706        });
707        vm.join().await.transpose()
708    }
709
710    pub(crate) fn agent_mesh(&self) -> ActorMeshRef<ProcMeshAgent> {
711        let agent_name = self.ranks.first().unwrap().agent.actor_id().name();
712        // This name must match the ProcMeshAgent name, which can change depending on the allocator.
713        ActorMeshRef::new(Name::new_reserved(agent_name), self.clone())
714    }
715
    /// Fetch the state (including supervision events) of the actor named
    /// `name` from the agent of every proc in this mesh.
    ///
    /// A `GetState` request is cast to all agents and one reply per rank
    /// is awaited, each with a timeout of `GET_ACTOR_STATE_MAX_IDLE`. If a
    /// receive times out, collection stops and every rank that has not
    /// replied yet is filled in with a synthetic `Timeout` state that
    /// carries the agent's id and a `Stopped` supervision event.
    ///
    /// # Errors
    ///
    /// Returns `Error::NotExist` if any agent reports that it has no actor
    /// with this name, and propagates cast, receive and mesh-collection
    /// errors.
    pub async fn actor_states(
        &self,
        cx: &impl context::Actor,
        name: Name,
    ) -> v1::Result<ValueMesh<resource::State<ActorState>>> {
        let agent_mesh = self.agent_mesh();
        let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
        // TODO: Use accumulation to get back a single value (representing whether
        // *any* of the actors failed) instead of a mesh.
        agent_mesh.cast(
            cx,
            resource::GetState::<ActorState> {
                name: name.clone(),
                reply: port.bind(),
            },
        )?;
        let expected = self.ranks.len();
        let mut states = Vec::with_capacity(expected);
        let timeout = config::global::get(GET_ACTOR_STATE_MAX_IDLE);
        for _ in 0..expected {
            // The agent runs on the same process as the running actor, so if some
            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
            // the agent will be unresponsive.
            // We handle this by setting a timeout on the recv, and if we don't get a
            // message we assume the agent is dead and return a failed state.
            let state = RealClock.timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // Handle non-timeout receiver error.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        // Replies are keyed by the rank the agent reports.
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(Error::NotExist(state.name));
                    }
                }
            } else {
                tracing::error!(
                    "timeout waiting for a message after {:?} from proc mesh agent in mesh {}",
                    timeout,
                    agent_mesh
                );
                // Timeout error, stop reading from the receiver and send back what we have so far,
                // padding with failed states.
                // NOTE(review): `all_ranks` holds positions in this mesh
                // (`0..len`), while `completed_ranks` holds the `create_rank`s
                // reported by the agents. If these can differ (e.g. for a
                // sliced mesh) or an agent replies more than once, the
                // `assert_eq!` below would panic. Confirm that neither can
                // happen.
                let all_ranks = (0..self.ranks.len()).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), expected - states.len());
                while states.len() < expected {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    let agent = agent_mesh.get(rank).expect("agent should exist");
                    let agent_id = agent.actor_id().clone();
                    states.push((
                        // We populate with any ranks leftover at the time of the timeout.
                        rank,
                        resource::State {
                            name: name.clone(),
                            status: resource::Status::Timeout(timeout),
                            // We don't know the ActorId that used to live on this rank.
                            // But we do know the mesh agent id, so we'll use that.
                            state: Some(ActorState {
                                actor_id: agent_id.clone(),
                                create_rank: rank,
                                supervision_events: vec![ActorSupervisionEvent::new(
                                    agent_id,
                                    None,
                                    ActorStatus::Stopped,
                                    None,
                                )],
                            }),
                        },
                    ));
                }
                break;
            }
        }
        // Ensure that all ranks have replied. Note that if the mesh is sliced,
        // not all create_ranks may be in the mesh.
        // Sort by rank, so that the resulting mesh is ordered.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(self.region.clone())?;
        Ok(vm)
    }
806
807    pub async fn proc_states(
808        &self,
809        cx: &impl context::Actor,
810    ) -> v1::Result<Option<ValueMesh<resource::State<ProcState>>>> {
811        let names = self.proc_ids().collect::<Vec<ProcId>>();
812        if let Some(host_mesh) = &self.host_mesh {
813            Ok(Some(
814                host_mesh
815                    .proc_states(cx, names, self.region.clone())
816                    .await?,
817            ))
818        } else {
819            Ok(None)
820        }
821    }
822
823    /// Returns an iterator over the proc ids in this mesh.
824    pub(crate) fn proc_ids(&self) -> impl Iterator<Item = ProcId> {
825        self.ranks.iter().map(|proc_ref| proc_ref.proc_id.clone())
826    }
827
828    /// Spawn an actor on all of the procs in this mesh, returning a
829    /// new ActorMesh.
830    ///
831    /// Bounds:
832    /// - `A: Actor` - the actor actually runs inside each proc.
833    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
834    ///   inside the `ActorMesh`.
835    /// - `A::Params: RemoteMessage` - spawn parameters must be
836    ///   serializable and routable.
837    pub async fn spawn<A: Actor + Referable>(
838        &self,
839        cx: &impl context::Actor,
840        name: &str,
841        params: &A::Params,
842    ) -> v1::Result<ActorMesh<A>>
843    where
844        A::Params: RemoteMessage,
845    {
846        self.spawn_with_name(cx, Name::new(name), params).await
847    }
848
849    /// Spawn a 'service' actor. Service actors are *singletons*, using
850    /// reserved names. The provided name is used verbatim as the actor's
851    /// name, and thus it may be persistently looked up by constructing
852    /// the appropriate name.
853    ///
854    /// Note: avoid using service actors if possible; the mechanism will
855    /// be replaced by an actor registry.
856    pub async fn spawn_service<A: Actor + Referable>(
857        &self,
858        cx: &impl context::Actor,
859        name: &str,
860        params: &A::Params,
861    ) -> v1::Result<ActorMesh<A>>
862    where
863        A::Params: RemoteMessage,
864    {
865        self.spawn_with_name(cx, Name::new_reserved(name), params)
866            .await
867    }
868
869    /// Spawn an actor on all procs in this mesh under the given
870    /// [`Name`], returning a new `ActorMesh`.
871    ///
872    /// This is the underlying implementation used by [`spawn`]; it
873    /// differs only in that the actor name is passed explicitly
874    /// rather than as a `&str`.
875    ///
876    /// Bounds:
877    /// - `A: Actor` - the actor actually runs inside each proc.
878    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
879    ///   inside the `ActorMesh`.
880    /// - `A::Params: RemoteMessage` - spawn parameters must be
881    ///   serializable and routable.
882    #[hyperactor::instrument(fields(
883        host_mesh=self.host_mesh_name().map(|n| n.to_string()),
884        proc_mesh=self.name.to_string(),
885        actor_name=name.to_string(),
886    ))]
887    pub(crate) async fn spawn_with_name<A: Actor + Referable>(
888        &self,
889        cx: &impl context::Actor,
890        name: Name,
891        params: &A::Params,
892    ) -> v1::Result<ActorMesh<A>>
893    where
894        A::Params: RemoteMessage,
895    {
896        tracing::info!(
897            name = "ProcMeshStatus",
898            status = "ActorMesh::Spawn::Attempt",
899        );
900        tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
901        let result = self.spawn_with_name_inner(cx, name, params).await;
902        match &result {
903            Ok(_) => {
904                tracing::info!(
905                    name = "ProcMeshStatus",
906                    status = "ActorMesh::Spawn::Success",
907                );
908                tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
909            }
910            Err(error) => {
911                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
912                tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
913            }
914        }
915        result
916    }
917
918    async fn spawn_with_name_inner<A: Actor + Referable>(
919        &self,
920        cx: &impl context::Actor,
921        name: Name,
922        params: &A::Params,
923    ) -> v1::Result<ActorMesh<A>>
924    where
925        A::Params: RemoteMessage,
926    {
927        let remote = Remote::collect();
928        // `Referable` ensures the type `A` is registered with
929        // `Remote`.
930        let actor_type = remote
931            .name_of::<A>()
932            .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))?
933            .to_string();
934
935        let serialized_params = bincode::serialize(params)?;
936        let agent_mesh = self.agent_mesh();
937
938        agent_mesh.cast(
939            cx,
940            resource::CreateOrUpdate::<mesh_agent::ActorSpec> {
941                name: name.clone(),
942                rank: Default::default(),
943                spec: mesh_agent::ActorSpec {
944                    actor_type: actor_type.clone(),
945                    params_data: serialized_params.clone(),
946                },
947            },
948        )?;
949
950        let region = self.region().clone();
951        // Open an accum port that *receives overlays* and *emits full
952        // meshes*.
953        //
954        // NOTE: Mailbox initializes the accumulator state via
955        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
956        // Accumulator<ValueMesh<T>> implementation detects this on
957        // the first update and replaces it with the caller-supplied
958        // template (the `self` passed into open_accum_port), which we
959        // seed here as "full NotExist over the target region".
960        let (port, rx) = cx.mailbox().open_accum_port_opts(
961            // Initial state for the accumulator: full mesh seeded to
962            // NotExist.
963            crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
964            Some(ReducerOpts {
965                max_update_interval: Some(Duration::from_millis(50)),
966            }),
967        );
968
969        let mut reply = port.bind();
970        // If this proc dies or some other issue renders the reply undeliverable,
971        // the reply does not need to be returned to the sender.
972        reply.return_undeliverable(false);
973        // Send a message to all ranks. They reply with overlays to
974        // `port`.
975        agent_mesh.cast(
976            cx,
977            resource::GetRankStatus {
978                name: name.clone(),
979                reply,
980            },
981        )?;
982
983        let start_time = RealClock.now();
984
985        // Wait for all ranks to report a terminal or running status.
986        // If any proc reports a failure (via supervision) or the mesh
987        // times out, `wait()` returns Err with the final snapshot.
988        //
989        // `rx` is the accumulator output stream: each time reduced
990        // overlays are applied, it emits a new StatusMesh snapshot.
991        // `wait()` loops on it, deciding when the stream is
992        // "complete" (no more NotExist) or times out.
993        let mesh = match GetRankStatus::wait(
994            rx,
995            self.ranks.len(),
996            config::global::get(ACTOR_SPAWN_MAX_IDLE),
997            region.clone(), // fallback
998        )
999        .await
1000        {
1001            Ok(statuses) => {
1002                // Spawn succeeds only if no rank has reported a
1003                // supervision/terminal state. This preserves the old
1004                // `first_terminating().is_none()` semantics.
1005                let has_terminating = statuses.values().any(|s| s.is_terminating());
1006                if !has_terminating {
1007                    Ok(ActorMesh::new(self.clone(), name))
1008                } else {
1009                    let legacy = mesh_to_rankedvalues_with_default(
1010                        &statuses,
1011                        Status::NotExist,
1012                        Status::is_not_exist,
1013                        self.ranks.len(),
1014                    );
1015                    Err(Error::ActorSpawnError { statuses: legacy })
1016                }
1017            }
1018            Err(complete) => {
1019                // Fill remaining ranks with a timeout status, now
1020                // handled via the legacy shim.
1021                let elapsed = start_time.elapsed();
1022                let legacy = mesh_to_rankedvalues_with_default(
1023                    &complete,
1024                    Status::Timeout(elapsed),
1025                    Status::is_not_exist,
1026                    self.ranks.len(),
1027                );
1028                Err(Error::ActorSpawnError { statuses: legacy })
1029            }
1030        }?;
1031        // Spawn a unique mesh manager for each actor mesh, so the type of the
1032        // mesh can be preserved.
1033        let _controller: ActorHandle<ActorMeshController<A>> =
1034            ActorMeshController::<A>::spawn(cx, mesh.deref().clone())
1035                .await
1036                .map_err(|e| Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
1037        Ok(mesh)
1038    }
1039
1040    /// Send stop actors message to all mesh agents for a specific mesh name
1041    #[hyperactor::instrument(fields(
1042        host_mesh = self.host_mesh_name().map(|n| n.to_string()),
1043        proc_mesh = self.name.to_string(),
1044        actor_mesh = mesh_name.to_string(),
1045    ))]
1046    pub(crate) async fn stop_actor_by_name(
1047        &self,
1048        cx: &impl context::Actor,
1049        mesh_name: Name,
1050    ) -> v1::Result<()> {
1051        tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
1052        tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
1053        let result = self.stop_actor_by_name_inner(cx, mesh_name).await;
1054        match &result {
1055            Ok(_) => {
1056                tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
1057                tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
1058            }
1059            Err(error) => {
1060                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
1061                tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
1062            }
1063        }
1064        result
1065    }
1066
1067    async fn stop_actor_by_name_inner(
1068        &self,
1069        cx: &impl context::Actor,
1070        mesh_name: Name,
1071    ) -> v1::Result<()> {
1072        let region = self.region().clone();
1073        let agent_mesh = self.agent_mesh();
1074        agent_mesh.cast(
1075            cx,
1076            resource::Stop {
1077                name: mesh_name.clone(),
1078            },
1079        )?;
1080
1081        // Open an accum port that *receives overlays* and *emits full
1082        // meshes*.
1083        //
1084        // NOTE: Mailbox initializes the accumulator state via
1085        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
1086        // Accumulator<ValueMesh<T>> implementation detects this on
1087        // the first update and replaces it with the caller-supplied
1088        // template (the `self` passed into open_accum_port), which we
1089        // seed here as "full NotExist over the target region".
1090        let (port, rx) = cx.mailbox().open_accum_port_opts(
1091            // Initial state for the accumulator: full mesh seeded to
1092            // NotExist.
1093            crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
1094            Some(ReducerOpts {
1095                max_update_interval: Some(Duration::from_millis(50)),
1096            }),
1097        );
1098        agent_mesh.cast(
1099            cx,
1100            resource::GetRankStatus {
1101                name: mesh_name,
1102                reply: port.bind(),
1103            },
1104        )?;
1105        let start_time = RealClock.now();
1106
1107        // Reuse actor spawn idle time.
1108        let max_idle_time = config::global::get(ACTOR_SPAWN_MAX_IDLE);
1109        match GetRankStatus::wait(
1110            rx,
1111            self.ranks.len(),
1112            max_idle_time,
1113            region.clone(), // fallback mesh if nothing arrives
1114        )
1115        .await
1116        {
1117            Ok(statuses) => {
1118                // Check that all actors are in some terminal state.
1119                // Failed is ok, because one of these actors may have failed earlier
1120                // and we're trying to stop the others.
1121                let all_stopped = statuses.values().all(|s| s.is_terminating());
1122                if all_stopped {
1123                    Ok(())
1124                } else {
1125                    let legacy = mesh_to_rankedvalues_with_default(
1126                        &statuses,
1127                        Status::NotExist,
1128                        Status::is_not_exist,
1129                        self.ranks.len(),
1130                    );
1131                    Err(Error::ActorStopError { statuses: legacy })
1132                }
1133            }
1134            Err(complete) => {
1135                // Fill remaining ranks with a timeout status via the
1136                // legacy shim.
1137                let legacy = mesh_to_rankedvalues_with_default(
1138                    &complete,
1139                    Status::Timeout(start_time.elapsed()),
1140                    Status::is_not_exist,
1141                    self.ranks.len(),
1142                );
1143                Err(Error::ActorStopError { statuses: legacy })
1144            }
1145        }
1146    }
1147}
1148
1149impl fmt::Display for ProcMeshRef {
1150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1151        write!(f, "{}{{{}}}", self.name, self.region)
1152    }
1153}
1154
1155impl view::Ranked for ProcMeshRef {
1156    type Item = ProcRef;
1157
1158    fn region(&self) -> &Region {
1159        &self.region
1160    }
1161
1162    fn get(&self, rank: usize) -> Option<&Self::Item> {
1163        self.ranks.get(rank)
1164    }
1165}
1166
1167impl view::RankedSliceable for ProcMeshRef {
1168    fn sliced(&self, region: Region) -> Self {
1169        debug_assert!(region.is_subset(view::Ranked::region(self)));
1170        let ranks = self
1171            .region()
1172            .remap(&region)
1173            .unwrap()
1174            .map(|index| self.get(index).unwrap().clone())
1175            .collect();
1176        Self::new(
1177            self.name.clone(),
1178            region,
1179            Arc::new(ranks),
1180            self.host_mesh.clone(),
1181            Some(self.root_region.as_ref().unwrap_or(&self.region).clone()),
1182            self.root_comm_actor.clone(),
1183        )
1184        .unwrap()
1185    }
1186}
1187
#[cfg(test)]
mod tests {
    use ndslice::ViewExt;
    use ndslice::extent;
    use timed_test::async_timed_test;

    use crate::resource::RankedValues;
    use crate::resource::Status;
    use crate::v1::testactor;
    use crate::v1::testing;

    /// A locally allocated 4-replica mesh has the expected shape, a
    /// populated router, and reports every proc as alive.
    #[tokio::test]
    async fn test_proc_mesh_allocate() {
        let (mesh, client, router) = testing::local_proc_mesh(extent!(replica = 4)).await;

        assert_eq!(mesh.extent(), extent!(replica = 4));
        assert_eq!(mesh.ranks.len(), 4);
        assert!(!router.prefixes().is_empty());

        // Every agent is alive and reachable (both ways) when queried
        // one proc at a time...
        for proc_ref in mesh.values() {
            let alive = proc_ref.status(&client).await.unwrap();
            assert!(alive);
        }

        // ...and also when the whole proc mesh is queried at once.
        let mesh_status = mesh.status(&client).await.unwrap();
        assert!(mesh_status.values().all(|alive| alive));
    }

    /// Spawning the test actor on each test proc mesh yields an actor
    /// mesh that passes `testactor::assert_mesh_shape`.
    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());

        let instance = testing::instance().await;
        let proc_meshes = testing::proc_meshes(&instance, extent!(replicas = 4, hosts = 2)).await;

        for proc_mesh in proc_meshes {
            testactor::assert_mesh_shape(proc_mesh.spawn(instance, "test", &()).await.unwrap())
                .await;
        }
    }

    /// Spawning an actor whose creation fails surfaces an actor spawn
    /// error carrying the failure status for all 8 ranks.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());

        let instance = testing::instance().await;
        let proc_meshes = testing::proc_meshes(&instance, extent!(replicas = 4, hosts = 2)).await;

        for proc_mesh in proc_meshes {
            let statuses = proc_mesh
                .spawn::<testactor::FailingCreateTestActor>(instance, "testfail", &())
                .await
                .unwrap_err()
                .into_actor_spawn_error()
                .unwrap();

            let expected = RankedValues::from((0..8, Status::Failed("test failure".to_string())));
            assert_eq!(statuses, expected);
        }
    }
}