// hyperactor_mesh/proc_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::any::type_name;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fmt;
13use std::hash::Hash;
14use std::ops::Deref;
15use std::panic::Location;
16use std::sync::Arc;
17use std::sync::OnceLock;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering;
20use std::time::Duration;
21
22use hyperactor::Actor;
23use hyperactor::Handler;
24use hyperactor::RemoteMessage;
25use hyperactor::RemoteSpawn;
26use hyperactor::accum::StreamingReducerOpts;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Referable;
29use hyperactor::actor::remote::Remote;
30use hyperactor::channel;
31use hyperactor::channel::ChannelAddr;
32use hyperactor::context;
33use hyperactor::mailbox::DialMailboxRouter;
34use hyperactor::mailbox::MailboxServer;
35use hyperactor::reference as hyperactor_reference;
36use hyperactor::supervision::ActorSupervisionEvent;
37use hyperactor_config::CONFIG;
38use hyperactor_config::ConfigAttr;
39use hyperactor_config::attrs::declare_attrs;
40use ndslice::Extent;
41use ndslice::ViewExt as _;
42use ndslice::view;
43use ndslice::view::CollectMeshExt;
44use ndslice::view::MapIntoExt;
45use ndslice::view::Ranked;
46use ndslice::view::Region;
47use serde::Deserialize;
48use serde::Serialize;
49use tokio::sync::Notify;
50use tracing::Instrument;
51use typeuri::Named;
52
53use crate::ActorMesh;
54use crate::ActorMeshRef;
55use crate::CommActor;
56use crate::Error;
57use crate::HostMeshRef;
58use crate::Name;
59use crate::ValueMesh;
60use crate::alloc::Alloc;
61use crate::alloc::AllocExt;
62use crate::alloc::AllocatedProc;
63use crate::assign::Ranks;
64use crate::comm::CommMeshConfig;
65use crate::host_mesh::host_agent::ProcState;
66use crate::host_mesh::mesh_to_rankedvalues_with_default;
67use crate::mesh_controller::ActorMeshController;
68use crate::proc_agent;
69use crate::proc_agent::ActorState;
70use crate::proc_agent::MeshAgentMessageClient;
71use crate::proc_agent::ProcAgent;
72use crate::proc_agent::ReconfigurableMailboxSender;
73use crate::resource;
74use crate::resource::GetRankStatus;
75use crate::resource::Status;
76use crate::supervision::MeshFailure;
77
declare_attrs! {
    /// The maximum idle time between updates while spawning actor
    /// meshes. Overridable via the `HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE`
    /// environment variable or the `actor_spawn_max_idle` config key.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()),
        Some("actor_spawn_max_idle".to_string()),
    ))
    pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while waiting for a response to GetState
    /// from ProcAgent. Overridable via `HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE`
    /// or the `get_actor_state_max_idle` config key.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()),
        Some("get_actor_state_max_idle".to_string()),
    ))
    pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(30);
}
95
/// Name used for the mesh communication actor spawned on each user proc.
///
/// The `CommActor` enables proc-to-proc mesh messaging and is always
/// present as a system actor (`system_children`) on every proc mesh member.
/// `ProcMesh::create` spawns it under this name when `spawn_comm_actor` is set.
pub const COMM_ACTOR_NAME: &str = "comm";
101
/// A reference to a single [`hyperactor::Proc`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ProcRef {
    /// The id of the referenced proc.
    proc_id: hyperactor_reference::ProcId,
    /// The rank of this proc at creation.
    create_rank: usize,
    /// The agent managing this proc.
    agent: hyperactor_reference::ActorRef<ProcAgent>,
}
111
112impl ProcRef {
113    /// Create a new proc ref from the provided id, create rank and agent.
114    pub fn new(
115        proc_id: hyperactor_reference::ProcId,
116        create_rank: usize,
117        agent: hyperactor_reference::ActorRef<ProcAgent>,
118    ) -> Self {
119        Self {
120            proc_id,
121            create_rank,
122            agent,
123        }
124    }
125
126    /// Pings the proc, returning whether it is alive. This will be replaced by a
127    /// finer-grained lifecycle status in the near future.
128    pub(crate) async fn status(&self, cx: &impl context::Actor) -> crate::Result<bool> {
129        let (port, mut rx) = cx.mailbox().open_port();
130        self.agent
131            .status(cx, port.bind())
132            .await
133            .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e))?;
134        loop {
135            let (rank, status) = rx
136                .recv()
137                .await
138                .map_err(|e| Error::CallError(self.agent.actor_id().clone(), e.into()))?;
139            if rank == self.create_rank {
140                break Ok(status);
141            }
142        }
143    }
144
145    pub fn proc_id(&self) -> &hyperactor_reference::ProcId {
146        &self.proc_id
147    }
148
149    pub(crate) fn actor_id(&self, name: &Name) -> hyperactor_reference::ActorId {
150        self.proc_id.actor_id(name.to_string(), 0)
151    }
152
153    /// Generic bound: `A: Referable` - required because we return
154    /// an `ActorRef<A>`.
155    pub(crate) fn attest<A: Referable>(&self, name: &Name) -> hyperactor_reference::ActorRef<A> {
156        hyperactor_reference::ActorRef::attest(self.actor_id(name))
157    }
158}
159
/// A mesh of processes.
#[derive(Debug)]
pub struct ProcMesh {
    // The mesh's name; used in logging (including the `Drop` impl).
    #[allow(dead_code)]
    name: Name,
    // How the underlying procs were obtained: from an `Alloc`, or owned
    // (spawned from a host mesh).
    allocation: ProcMeshAllocation,
    // The name under which the comm actor was spawned, if any.
    #[allow(dead_code)]
    comm_actor_name: Option<Name>,
    // The reference describing this mesh's current membership; exposed
    // via `Deref`.
    current_ref: ProcMeshRef,
}
170
impl ProcMesh {
    /// Construct a `ProcMesh` from an allocation: build the current
    /// reference, install the global supervision sink, publish
    /// telemetry, and (if requested) spawn and configure a comm-actor
    /// mesh for proc-to-proc casting.
    async fn create<C: context::Actor>(
        cx: &C,
        name: Name,
        allocation: ProcMeshAllocation,
        spawn_comm_actor: bool,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        let comm_actor_name = if spawn_comm_actor {
            Some(Name::new(COMM_ACTOR_NAME).unwrap())
        } else {
            None
        };

        let region = allocation.extent().clone().into();
        let ranks = allocation.ranks();

        // Set the global supervision sink to the first ProcAgent's
        // supervision event handler. Last-mesh-wins semantics: if a
        // previous mesh installed a sink, it is replaced.
        if let Some(first) = ranks.first() {
            crate::global_context::set_global_supervision_sink(
                first.agent.port::<ActorSupervisionEvent>(),
            );
        }

        // v0 casting routes through rank 0's comm actor; attest its
        // reference now (the actor itself is spawned further below).
        let root_comm_actor = comm_actor_name.as_ref().map(|name| {
            hyperactor_reference::ActorRef::attest(
                ranks
                    .first()
                    .expect("root mesh cannot be empty")
                    .actor_id(name),
            )
        });
        let host_mesh = allocation.hosts();
        let current_ref = ProcMeshRef::new(
            name.clone(),
            region,
            ranks,
            host_mesh.cloned(),
            None, // this is the root mesh
            None, // comm actor is not alive yet
        )
        .unwrap();

        // Notify telemetry that the ProcAgent mesh was created.
        {
            let name_str = name.to_string();
            let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);

            // Owned meshes have a host-mesh parent; alloc-based meshes do not.
            let (parent_mesh_id, parent_view_json) = match host_mesh {
                Some(hm) => (
                    Some(hyperactor_telemetry::hash_to_u64(&hm.name().to_string())),
                    serde_json::to_string(hm.region()).ok(),
                ),
                None => (None, None),
            };

            hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
                id: mesh_id_hash,
                timestamp: std::time::SystemTime::now(),
                class: "Proc".to_string(),
                given_name: name.name().to_string(),
                full_name: name_str,
                shape_json: serde_json::to_string(&current_ref.region.extent()).unwrap_or_default(),
                parent_mesh_id,
                parent_view_json,
            });

            // Notify telemetry of each ProcAgent actor in this mesh.
            // These are skipped in Proc::spawn_inner. mesh_id directly points to proc mesh.
            let now = std::time::SystemTime::now();
            for rank in current_ref.ranks.iter() {
                let actor_id = rank.agent.actor_id();

                hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
                    id: hyperactor_telemetry::hash_to_u64(actor_id),
                    timestamp: now,
                    mesh_id: mesh_id_hash,
                    rank: rank.create_rank as u64,
                    full_name: actor_id.to_string(),
                    display_name: None,
                });
            }
        }

        let mut proc_mesh = Self {
            name,
            allocation,
            comm_actor_name: comm_actor_name.clone(),
            current_ref,
        };

        if let Some(comm_actor_name) = comm_actor_name {
            // CommActor satisfies `Actor + Referable`, so it can be
            // spawned and safely referenced via ActorRef<CommActor>.
            // It is a system actor that should not have a controller managing it.
            let comm_actor_mesh: ActorMesh<CommActor> = proc_mesh
                .spawn_with_name(cx, comm_actor_name, &Default::default(), None, true)
                .await?;
            let address_book: HashMap<_, _> = comm_actor_mesh
                .iter()
                .map(|(point, actor_ref)| (point.rank(), actor_ref))
                .collect();
            // Now that we have all of the spawned comm actors, kick them all into
            // mesh mode.
            for (rank, comm_actor) in &address_book {
                comm_actor
                    .send(cx, CommMeshConfig::new(*rank, address_book.clone()))
                    .map_err(|e| Error::SendingError(comm_actor.actor_id().clone(), Box::new(e)))?
            }

            // The comm actor is now set up and ready to go.
            proc_mesh.current_ref.root_comm_actor = root_comm_actor;
        }

        Ok(proc_mesh)
    }

    /// Create an owned `ProcMesh` (backed by the given host mesh) from
    /// pre-existing proc refs. "Unchecked": the refs are trusted to
    /// match the extent; only the rank-count check inside
    /// `ProcMeshRef::new` applies.
    pub(crate) async fn create_owned_unchecked<C: context::Actor>(
        cx: &C,
        name: Name,
        extent: Extent,
        hosts: HostMeshRef,
        ranks: Vec<ProcRef>,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        Self::create(
            cx,
            name,
            ProcMeshAllocation::Owned {
                hosts,
                extent,
                ranks: Arc::new(ranks),
            },
            true,
        )
        .await
    }

    /// Process-wide counter used to tag allocations in log output.
    fn alloc_counter() -> &'static AtomicUsize {
        static C: OnceLock<AtomicUsize> = OnceLock::new();
        C.get_or_init(|| AtomicUsize::new(0))
    }

    /// Allocate a new ProcMesh from the provided alloc.
    /// Allocate does not require an owning actor because references are not owned.
    #[track_caller]
    pub async fn allocate<C: context::Actor>(
        cx: &C,
        alloc: Box<dyn Alloc + Send + Sync + 'static>,
        name: &str,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        let caller = Location::caller();
        Self::allocate_inner(cx, alloc, Name::new(name)?, caller).await
    }

    // Use allocate_inner to set field mesh_name in span
    #[hyperactor::instrument(fields(proc_mesh=name.to_string()))]
    async fn allocate_inner<C: context::Actor>(
        cx: &C,
        mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
        name: Name,
        caller: &'static Location<'static>,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        // alloc_id is 1-based and only used for correlating log lines.
        let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1;
        tracing::info!(
            name = "ProcMeshStatus",
            status = "Allocate::Attempt",
            %caller,
            alloc_id,
            shape = ?alloc.shape(),
            "allocating proc mesh"
        );

        let running = alloc
            .initialize()
            .instrument(tracing::info_span!(
                "ProcMeshStatus::Allocate::Initialize",
                alloc_id,
                proc_mesh = %name
            ))
            .await?;

        // Wire the newly created mesh into the proc, so that it is routable.
        // We route all of the relevant prefixes into the proc's forwarder,
        // and serve it on the alloc's transport.
        //
        // This will be removed with direct addressing.
        let proc = cx.instance().proc();

        // First make sure we can serve the proc:
        let proc_channel_addr = {
            let _guard =
                tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered();
            let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?;
            proc.clone().serve(rx);
            tracing::info!(
                name = "ProcMeshStatus",
                status = "Allocate::ChannelServe",
                proc_mesh = %name,
                %addr,
                "proc started listening on addr: {addr}"
            );
            addr
        };

        let bind_allocated_procs = |router: &DialMailboxRouter| {
            // Bind procs whose ProcId address differs from their mailbox
            // serving address. In the v0 bootstrap path, the ProcId embeds
            // the bootstrap channel address while the proc's mailbox is
            // served on a separate address. Without an explicit binding the
            // DialMailboxRouter would direct-dial the bootstrap channel
            // (which expects Allocator2Process, not MessageEnvelope).
            for AllocatedProc { proc_id, addr, .. } in running.iter() {
                if proc_id.addr() != addr {
                    router.bind(proc_id.clone().into(), addr.clone());
                }
            }
        };

        // Temporary for backward compatibility with ranked procs and v0 API.
        // Proc meshes can be allocated either using the root client proc (which
        // has a DialMailboxRouter forwarder) or a mesh agent proc (which has a
        // ReconfigurableMailboxSender forwarder with an inner DialMailboxRouter).
        if let Some(router) = proc.forwarder().downcast_ref() {
            bind_allocated_procs(router);
        } else if let Some(router) = proc
            .forwarder()
            .downcast_ref::<ReconfigurableMailboxSender>()
        {
            bind_allocated_procs(
                router
                    .as_inner()
                    .map_err(|_| Error::UnroutableMesh())?
                    .as_configured()
                    .ok_or(Error::UnroutableMesh())?
                    .downcast_ref()
                    .ok_or(Error::UnroutableMesh())?,
            );
        } else {
            return Err(Error::UnroutableMesh());
        }

        // Set up the mesh agents. Since references are not owned, we don't supervise it.
        // Instead, we just let procs die when they have unhandled supervision events.
        let address_book: HashMap<_, _> = running
            .iter()
            .map(
                |AllocatedProc {
                     addr, mesh_agent, ..
                 }| { (mesh_agent.actor_id().proc_id().clone(), addr.clone()) },
            )
            .collect();

        // Configure every agent, then wait for each rank to acknowledge
        // on the config port before proceeding.
        let (config_handle, mut config_receiver) = cx.mailbox().open_port();
        for (rank, AllocatedProc { mesh_agent, .. }) in running.iter().enumerate() {
            mesh_agent
                .configure(
                    cx,
                    rank,
                    proc_channel_addr.clone(),
                    None, // no supervisor; we just crash
                    address_book.clone(),
                    config_handle.bind(),
                    true,
                )
                .await
                .map_err(Error::ConfigurationError)?;
        }
        let mut completed = Ranks::new(running.len());
        while !completed.is_full() {
            let rank = config_receiver
                .recv()
                .await
                .map_err(|err| Error::ConfigurationError(err.into()))?;
            if completed.insert(rank, rank).is_some() {
                tracing::warn!("multiple completions received for rank {}", rank);
            }
        }

        let ranks: Vec<_> = running
            .into_iter()
            .enumerate()
            .map(|(create_rank, allocated)| ProcRef {
                proc_id: allocated.proc_id,
                create_rank,
                agent: allocated.mesh_agent,
            })
            .collect();

        let stop = Arc::new(Notify::new());
        let extent = alloc.extent().clone();
        let alloc_name = alloc.alloc_name().to_string();

        // Background task that keeps the alloc alive: it drains alloc
        // events until explicitly stopped (via `stop`) or until the
        // alloc itself terminates.
        let alloc_task = {
            let stop = Arc::clone(&stop);

            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = stop.notified() => {
                                // If we are explicitly stopped, the alloc is torn down.
                                if let Err(error) = alloc.stop_and_wait().await {
                                    tracing::error!(
                                        name = "ProcMeshStatus",
                                        alloc_name = %alloc.alloc_name(),
                                        status = "FailedToStopAlloc",
                                        %error,
                                    );
                                }
                                break;
                            }
                            // We are mostly just using this to drive allocation events.
                            proc_state = alloc.next() => {
                                match proc_state {
                                    // The alloc was stopped.
                                    None => break,
                                    Some(proc_state) => {
                                        tracing::debug!(
                                            alloc_name = %alloc.alloc_name(),
                                            "unmonitored allocation event: {}", proc_state);
                                    }
                                }

                            }
                        }
                    }
                }
                .instrument(tracing::info_span!("alloc_monitor")),
            )
        };

        let mesh = Self::create(
            cx,
            name,
            ProcMeshAllocation::Allocated {
                alloc_name,
                stop,
                extent,
                ranks: Arc::new(ranks),
                alloc_task: Some(alloc_task),
            },
            true, // alloc-based meshes support comm actors
        )
        .await;
        match &mesh {
            Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"),
            Err(error) => {
                tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error)
            }
        }
        mesh
    }

    /// Stop this mesh gracefully.
    ///
    /// Alloc-based meshes notify the alloc monitor task and wait for it
    /// to finish tearing the alloc down; owned meshes ask the backing
    /// host mesh to stop the procs.
    pub async fn stop(&mut self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
        // `region` comes from `current_ref` via `Deref`; it is only used
        // in the `Owned` branch below.
        let region = self.region.clone();
        match &mut self.allocation {
            ProcMeshAllocation::Allocated {
                stop,
                alloc_task,
                alloc_name,
                ..
            } => {
                stop.notify_one();
                // Wait for the alloc monitor task to complete, ensuring the
                // alloc has fully stopped before we drop it.
                if let Some(handle) = alloc_task.take() {
                    if let Err(e) = handle.await {
                        tracing::warn!(
                            name = "ProcMeshStatus",
                            proc_mesh = %self.name,
                            alloc_name,
                            %e,
                            "alloc monitor task failed"
                        );
                    }
                }
                tracing::info!(
                    name = "ProcMeshStatus",
                    proc_mesh = %self.name,
                    alloc_name,
                    status = "StoppingAlloc",
                    "alloc {alloc_name} has stopped",
                );

                Ok(())
            }
            ProcMeshAllocation::Owned { hosts, .. } => {
                let procs = self
                    .current_ref
                    .proc_ids()
                    .collect::<Vec<hyperactor_reference::ProcId>>();
                // We use the proc mesh region rather than the host mesh region
                // because the host agent stores one entry per proc, not per host.
                hosts
                    .stop_proc_mesh(cx, &self.name, procs, region, reason)
                    .await
            }
        }
    }

    /// Test-only access to the allocation's proc refs.
    #[cfg(test)]
    pub(crate) fn ranks(&self) -> Arc<Vec<ProcRef>> {
        self.allocation.ranks()
    }
}
590
591impl fmt::Display for ProcMesh {
592    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593        write!(f, "{}", self.current_ref)
594    }
595}
596
impl Deref for ProcMesh {
    type Target = ProcMeshRef;

    /// Expose the current [`ProcMeshRef`], so a `ProcMesh` can be used
    /// anywhere a mesh reference is expected.
    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
604
impl Drop for ProcMesh {
    fn drop(&mut self) {
        // Only emit a log marker here; teardown of the underlying
        // allocation happens in `ProcMesh::stop`, not on drop.
        tracing::info!(
            name = "ProcMeshStatus",
            proc_mesh = %self.name,
            status = "Dropped",
        );
    }
}
614
/// Represents different ways ProcMeshes can be allocated.
enum ProcMeshAllocation {
    /// A mesh that has been allocated from an `Alloc`.
    Allocated {
        /// The name of the alloc from which this mesh was allocated.
        alloc_name: String,

        /// A cancellation token used to stop the task keeping the alloc alive.
        stop: Arc<Notify>,

        /// The extent of the allocation.
        extent: Extent,

        /// The allocated ranks.
        ranks: Arc<Vec<ProcRef>>,

        /// The task handle for the alloc monitor. Used to wait for clean shutdown.
        /// Taken (set to `None`) when the mesh is stopped.
        alloc_task: Option<tokio::task::JoinHandle<()>>,
    },

    /// An owned allocation: this ProcMesh fully owns the set of ranks.
    Owned {
        /// The host mesh from which the proc mesh was spawned.
        hosts: HostMeshRef,
        /// This is purely for storage: `hosts.extent()` returns a computed (by value)
        /// extent.
        extent: Extent,
        /// A proc reference for each rank in the mesh.
        ranks: Arc<Vec<ProcRef>>,
    },
}
645
646impl ProcMeshAllocation {
647    fn extent(&self) -> &Extent {
648        match self {
649            ProcMeshAllocation::Allocated { extent, .. } => extent,
650            ProcMeshAllocation::Owned { extent, .. } => extent,
651        }
652    }
653
654    fn ranks(&self) -> Arc<Vec<ProcRef>> {
655        Arc::clone(match self {
656            ProcMeshAllocation::Allocated { ranks, .. } => ranks,
657            ProcMeshAllocation::Owned { ranks, .. } => ranks,
658        })
659    }
660
661    fn hosts(&self) -> Option<&HostMeshRef> {
662        match self {
663            ProcMeshAllocation::Allocated { .. } => None,
664            ProcMeshAllocation::Owned { hosts, .. } => Some(hosts),
665        }
666    }
667}
668
669impl fmt::Debug for ProcMeshAllocation {
670    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
671        match self {
672            ProcMeshAllocation::Allocated { ranks, .. } => f
673                .debug_struct("ProcMeshAllocation::Allocated")
674                .field("alloc", &"<dyn Alloc>")
675                .field("ranks", ranks)
676                .finish(),
677            ProcMeshAllocation::Owned {
678                hosts,
679                ranks,
680                extent: _,
681            } => f
682                .debug_struct("ProcMeshAllocation::Owned")
683                .field("hosts", hosts)
684                .field("ranks", ranks)
685                .finish(),
686        }
687    }
688}
689
/// A reference to a ProcMesh, consisting of a set of ranked [`ProcRef`]s,
/// arranged into a region. ProcMeshes are named, uniquely identifying the
/// ProcMesh from which the reference was derived.
///
/// ProcMeshes can be sliced to create new ProcMeshes with a subset of the
/// original ranks.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct ProcMeshRef {
    // The name of the mesh this reference was derived from.
    name: Name,
    // The region over which `ranks` is arranged; its cardinality always
    // equals `ranks.len()` (enforced in `Self::new`).
    region: Region,
    ranks: Arc<Vec<ProcRef>>,
    // Some if this was spawned from a host mesh, else none.
    host_mesh: Option<HostMeshRef>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // The root region of this mesh. None means this mesh itself is the root.
    pub(crate) root_region: Option<Region>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // v0 casting requires root mesh rank 0 as the 1st hop, so we need to provide
    // it here. For v1, this can be removed since v1 can use any rank.
    pub(crate) root_comm_actor: Option<hyperactor_reference::ActorRef<CommActor>>,
}
wirevalue::register_type!(ProcMeshRef);
714
715impl ProcMeshRef {
716    /// Create a new ProcMeshRef from the given name, region, ranks, and so on.
717    #[allow(clippy::result_large_err)]
718    fn new(
719        name: Name,
720        region: Region,
721        ranks: Arc<Vec<ProcRef>>,
722        host_mesh: Option<HostMeshRef>,
723        root_region: Option<Region>,
724        root_comm_actor: Option<hyperactor_reference::ActorRef<CommActor>>,
725    ) -> crate::Result<Self> {
726        if region.num_ranks() != ranks.len() {
727            return Err(crate::Error::InvalidRankCardinality {
728                expected: region.num_ranks(),
729                actual: ranks.len(),
730            });
731        }
732        Ok(Self {
733            name,
734            region,
735            ranks,
736            host_mesh,
737            root_region,
738            root_comm_actor,
739        })
740    }
741
742    /// Create a singleton ProcMeshRef, given the provided ProcRef and name.
743    /// This is used to support creating local singleton proc meshes to support `this_proc()`
744    /// in python client actors.
745    pub fn new_singleton(name: Name, proc_ref: ProcRef) -> Self {
746        Self {
747            name,
748            region: Extent::unity().into(),
749            ranks: Arc::new(vec![proc_ref]),
750            host_mesh: None,
751            root_region: None,
752            root_comm_actor: None,
753        }
754    }
755
    /// The root comm actor used as the first casting hop, if one was set up.
    pub(crate) fn root_comm_actor(&self) -> Option<&hyperactor_reference::ActorRef<CommActor>> {
        self.root_comm_actor.as_ref()
    }

    /// The name of the proc mesh this reference was derived from.
    pub fn name(&self) -> &Name {
        &self.name
    }

    /// The name of the backing host mesh, if there is one.
    pub fn host_mesh_name(&self) -> Option<&Name> {
        self.host_mesh.as_ref().map(|h| h.name())
    }

    /// Returns the HostMeshRef that this ProcMeshRef might be backed by.
    /// Returns None if this ProcMeshRef is backed by an Alloc instead of a host mesh.
    pub fn hosts(&self) -> Option<&HostMeshRef> {
        self.host_mesh.as_ref()
    }
773
    /// The current statuses of procs in this mesh.
    ///
    /// Pings each proc via its agent (see [`ProcRef::status`]) and
    /// collects the per-rank liveness results into a single mesh.
    pub async fn status(&self, cx: &impl context::Actor) -> crate::Result<ValueMesh<bool>> {
        // Map each rank to a future, join them, then lift the per-rank
        // `Result`s into one `Result` of a mesh.
        let vm: ValueMesh<_> = self.map_into(|proc_ref| {
            let proc_ref = proc_ref.clone();
            async move { proc_ref.status(cx).await }
        });
        vm.join().await.transpose()
    }
782
783    pub(crate) fn agent_mesh(&self) -> ActorMeshRef<ProcAgent> {
784        let agent_name = self.ranks.first().unwrap().agent.actor_id().name();
785        // This name must match the ProcAgent name, which can change depending on the allocator.
786        // Since we control the agent_name, it is guaranteed to be a valid mesh identifier.
787        // No controller for the ProcAgent mesh.
788        ActorMeshRef::new(Name::new_reserved(agent_name).unwrap(), self.clone(), None)
789    }
790
    /// Query the state of all actors in this mesh matching "name".
    ///
    /// Thin wrapper around [`Self::actor_states_with_keepalive`] with no
    /// keepalive expiry.
    pub async fn actor_states(
        &self,
        cx: &impl context::Actor,
        name: Name,
    ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
        self.actor_states_with_keepalive(cx, name, None).await
    }
799
    /// Query the state of all actors in this mesh matching "name".
    /// If keepalive is Some, use a message that indicates to the recipient
    /// that the owner of the mesh is still alive, along with the expiry time
    /// after which the actor should be considered orphaned. Else, use a normal
    /// state query.
    ///
    /// Each proc-mesh agent replies once; replies that do not arrive within
    /// `GET_ACTOR_STATE_MAX_IDLE` are padded with synthetic `Timeout` states
    /// so the returned mesh always covers the full region.
    pub(crate) async fn actor_states_with_keepalive(
        &self,
        cx: &impl context::Actor,
        name: Name,
        keepalive: Option<std::time::SystemTime>,
    ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
        let agent_mesh = self.agent_mesh();
        // Fresh reply port: each agent sends exactly one State message here.
        let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
        // TODO: Use accumulation to get back a single value (representing whether
        // *any* of the actors failed) instead of a mesh.
        let get_state = resource::GetState::<ActorState> {
            name: name.clone(),
            reply: port.bind(),
        };
        if let Some(expires_after) = keepalive {
            // Wrap the query so the recipient also learns the owner is still
            // alive and when to consider the actor orphaned.
            agent_mesh.cast(
                cx,
                resource::KeepaliveGetState {
                    expires_after,
                    get_state,
                },
            )?;
        } else {
            agent_mesh.cast(cx, get_state)?;
        }
        let expected = self.ranks.len();
        let mut states = Vec::with_capacity(expected);
        let timeout = hyperactor_config::global::get(GET_ACTOR_STATE_MAX_IDLE);
        for _ in 0..expected {
            // The agent runs on the same process as the running actor, so if some
            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
            // the agent will be unresponsive.
            // We handle this by setting a timeout on the recv, and if we don't get a
            // message we assume the agent is dead and return a failed state.
            let state = tokio::time::timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // Handle non-timeout receiver error.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        // Keyed by create_rank so out-of-order replies can be
                        // re-sorted into region order below.
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(Error::NotExist(state.name));
                    }
                }
            } else {
                tracing::error!(
                    "timeout waiting for a message after {:?} from proc mesh agent in mesh {}",
                    timeout,
                    agent_mesh
                );
                // Timeout error, stop reading from the receiver and send back what we have so far,
                // padding with failed states.
                // NOTE(review): the padding below assumes every reply's
                // create_rank falls within 0..self.ranks.len(); for a sliced
                // mesh whose create_ranks reflect the original unsliced mesh,
                // the assert below could trip — TODO confirm.
                let all_ranks = (0..self.ranks.len()).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), expected - states.len());
                while states.len() < expected {
                    // Pop order is arbitrary (HashSet iteration order); this
                    // is harmless since the final sort restores rank order.
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    let agent = agent_mesh.get(rank).expect("agent should exist");
                    let agent_id = agent.actor_id().clone();
                    states.push((
                        // We populate with any ranks leftover at the time of the timeout.
                        rank,
                        resource::State {
                            name: name.clone(),
                            status: resource::Status::Timeout(timeout),
                            // We don't know the ActorId that used to live on this rank.
                            // But we do know the mesh agent id, so we'll use that.
                            state: Some(ActorState {
                                actor_id: agent_id.clone(),
                                create_rank: rank,
                                supervision_events: vec![ActorSupervisionEvent::new(
                                    agent_id,
                                    None,
                                    ActorStatus::generic_failure(format!(
                                        "timeout waiting for message from proc mesh agent while querying for \"{}\". The process likely crashed",
                                        name,
                                    )),
                                    None,
                                )],
                            }),
                        },
                    ));
                }
                break;
            }
        }
        // Ensure that all ranks have replied. Note that if the mesh is sliced,
        // not all create_ranks may be in the mesh.
        // Sort by rank, so that the resulting mesh is ordered.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(self.region.clone())?;
        Ok(vm)
    }
906
907    pub async fn proc_states(
908        &self,
909        cx: &impl context::Actor,
910    ) -> crate::Result<Option<ValueMesh<resource::State<ProcState>>>> {
911        let names = self
912            .proc_ids()
913            .collect::<Vec<hyperactor_reference::ProcId>>();
914        if let Some(host_mesh) = &self.host_mesh {
915            Ok(Some(
916                host_mesh
917                    .proc_states(cx, names, self.region.clone())
918                    .await?,
919            ))
920        } else {
921            Ok(None)
922        }
923    }
924
925    /// Returns an iterator over the proc ids in this mesh.
926    pub(crate) fn proc_ids(&self) -> impl Iterator<Item = hyperactor_reference::ProcId> {
927        self.ranks.iter().map(|proc_ref| proc_ref.proc_id.clone())
928    }
929
930    /// Spawn an actor on all of the procs in this mesh, returning a
931    /// new ActorMesh.
932    ///
933    /// Bounds:
934    /// - `A: Actor` - the actor actually runs inside each proc.
935    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
936    ///   inside the `ActorMesh`.
937    /// - `A::Params: RemoteMessage` - spawn parameters must be
938    ///   serializable and routable.
939    pub async fn spawn<A: RemoteSpawn, C: context::Actor>(
940        &self,
941        cx: &C,
942        name: &str,
943        params: &A::Params,
944    ) -> crate::Result<ActorMesh<A>>
945    where
946        A::Params: RemoteMessage,
947        C::A: Handler<MeshFailure>,
948    {
949        // Spawning from a string is never a system actor.
950        self.spawn_with_name(cx, Name::new(name)?, params, None, false)
951            .await
952    }
953
954    /// Spawn a 'service' actor. Service actors are *singletons*, using
955    /// reserved names. The provided name is used verbatim as the actor's
956    /// name, and thus it may be persistently looked up by constructing
957    /// the appropriate name.
958    ///
959    /// Note: avoid using service actors if possible; the mechanism will
960    /// be replaced by an actor registry.
961    pub async fn spawn_service<A: RemoteSpawn, C: context::Actor>(
962        &self,
963        cx: &C,
964        name: &str,
965        params: &A::Params,
966    ) -> crate::Result<ActorMesh<A>>
967    where
968        A::Params: RemoteMessage,
969        C::A: Handler<MeshFailure>,
970    {
971        self.spawn_with_name(cx, Name::new_reserved(name)?, params, None, false)
972            .await
973    }
974
975    /// Spawn an actor on all procs in this mesh under the given
976    /// [`Name`], returning a new `ActorMesh`.
977    ///
978    /// This is the underlying implementation used by [`spawn`]; it
979    /// differs only in that the actor name is passed explicitly
980    /// rather than as a `&str`.
981    ///
982    /// Bounds:
983    /// - `A: Actor` - the actor actually runs inside each proc.
984    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
985    ///   inside the `ActorMesh`.
986    /// - `A::Params: RemoteMessage` - spawn parameters must be
987    ///   serializable and routable.
988    /// - `C::A: Handler<MeshFailure>` - in order to spawn actors,
989    ///   the actor must accept messages of type `MeshFailure`. This
990    ///   is delivered when the actors spawned in the mesh have a failure that
991    ///   isn't handled.
992    #[hyperactor::instrument(fields(
993        host_mesh=self.host_mesh_name().map(|n| n.to_string()),
994        proc_mesh=self.name.to_string(),
995        actor_name=name.to_string(),
996    ))]
997    pub async fn spawn_with_name<A: RemoteSpawn, C: context::Actor>(
998        &self,
999        cx: &C,
1000        name: Name,
1001        params: &A::Params,
1002        supervision_display_name: Option<String>,
1003        is_system_actor: bool,
1004    ) -> crate::Result<ActorMesh<A>>
1005    where
1006        A::Params: RemoteMessage,
1007        C::A: Handler<MeshFailure>,
1008    {
1009        tracing::info!(
1010            name = "ProcMeshStatus",
1011            status = "ActorMesh::Spawn::Attempt",
1012        );
1013        tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
1014        let result = self
1015            .spawn_with_name_inner(cx, name, params, supervision_display_name, is_system_actor)
1016            .await;
1017        match &result {
1018            Ok(_) => {
1019                tracing::info!(
1020                    name = "ProcMeshStatus",
1021                    status = "ActorMesh::Spawn::Success",
1022                );
1023                tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
1024            }
1025            Err(error) => {
1026                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
1027                tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
1028            }
1029        }
1030        result
1031    }
1032
    /// Implementation backing [`spawn_with_name`]: resolves `A`'s
    /// registered type name, asks every proc's agent to create the actor,
    /// waits (via an accumulating status port) for all ranks to report a
    /// running or terminal status, wires up a per-mesh controller for
    /// non-system actors, and emits telemetry for the new mesh.
    async fn spawn_with_name_inner<A: RemoteSpawn, C: context::Actor>(
        &self,
        cx: &C,
        name: Name,
        params: &A::Params,
        supervision_display_name: Option<String>,
        is_system_actor: bool,
    ) -> crate::Result<ActorMesh<A>>
    where
        C::A: Handler<MeshFailure>,
    {
        let remote = Remote::collect();
        // `RemoteSpawn` + `remote!(A)` ensure that `A` has a
        // `SpawnableActor` entry in this registry, so
        // `name_of::<A>()` can resolve its global type name.
        let actor_type = remote
            .name_of::<A>()
            .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))?
            .to_string();

        // Params travel to each agent as opaque bytes; agents deserialize
        // them when they spawn the actor locally.
        let serialized_params = bincode::serialize(params)?;
        let agent_mesh = self.agent_mesh();

        // Ask every agent to create (or update) the named actor resource.
        agent_mesh.cast(
            cx,
            resource::CreateOrUpdate::<proc_agent::ActorSpec> {
                name: name.clone(),
                rank: Default::default(),
                spec: proc_agent::ActorSpec {
                    actor_type: actor_type.clone(),
                    params_data: serialized_params.clone(),
                },
            },
        )?;

        let region = self.region().clone();
        // Open an accum port that *receives overlays* and *emits full
        // meshes*.
        //
        // NOTE: Mailbox initializes the accumulator state via
        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
        // Accumulator<ValueMesh<T>> implementation detects this on
        // the first update and replaces it with the caller-supplied
        // template (the `self` passed into open_accum_port), which we
        // seed here as "full NotExist over the target region".
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            // Initial state for the accumulator: full mesh seeded to
            // NotExist.
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );

        let mut reply = port.bind();
        // If this proc dies or some other issue renders the reply undeliverable,
        // the reply does not need to be returned to the sender.
        reply.return_undeliverable(false);
        // Send a message to all ranks. They reply with overlays to
        // `port`.
        agent_mesh.cast(
            cx,
            resource::GetRankStatus {
                name: name.clone(),
                reply,
            },
        )?;

        // Used only to report elapsed time in the timeout error path below.
        let start_time = tokio::time::Instant::now();

        // Wait for all ranks to report a terminal or running status.
        // If any proc reports a failure (via supervision) or the mesh
        // times out, `wait()` returns Err with the final snapshot.
        //
        // `rx` is the accumulator output stream: each time reduced
        // overlays are applied, it emits a new StatusMesh snapshot.
        // `wait()` loops on it, deciding when the stream is
        // "complete" (no more NotExist) or times out.
        let (statuses, mut mesh) = match GetRankStatus::wait(
            rx,
            self.ranks.len(),
            hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE),
            region.clone(), // fallback
        )
        .await
        {
            Ok(statuses) => {
                // Spawn succeeds only if no rank has reported a
                // supervision/terminal state. This preserves the old
                // `first_terminating().is_none()` semantics.
                let has_terminating = statuses.values().any(|s| s.is_terminating());
                if !has_terminating {
                    Ok((statuses, ActorMesh::new(self.clone(), name.clone(), None)))
                } else {
                    let legacy = mesh_to_rankedvalues_with_default(
                        &statuses,
                        Status::NotExist,
                        Status::is_not_exist,
                        self.ranks.len(),
                    );
                    Err(Error::ActorSpawnError { statuses: legacy })
                }
            }
            Err(complete) => {
                // Fill remaining ranks with a timeout status, now
                // handled via the legacy shim.
                let elapsed = start_time.elapsed();
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(elapsed),
                    Status::is_not_exist,
                    self.ranks.len(),
                );
                Err(Error::ActorSpawnError { statuses: legacy })
            }
        }?;
        // We don't need controllers for a system actor like the CommActor.
        if !is_system_actor {
            // Spawn a unique mesh manager for each actor mesh, so the type of the
            // mesh can be preserved.
            let controller: ActorMeshController<A> = ActorMeshController::new(
                mesh.deref().clone(),
                supervision_display_name.clone(),
                Some(cx.instance().port().bind()),
                statuses,
            );
            // AI-3: controller name must include mesh identity for
            // proc-wide ActorId uniqueness. A fixed base name alone
            // collides across parents because pid allocation is
            // parent-scoped.
            let controller_name = format!(
                "{}_{}",
                crate::mesh_controller::ACTOR_MESH_CONTROLLER_NAME,
                mesh.name()
            );
            let controller = controller
                .spawn_with_name(cx, &controller_name)
                .map_err(|e| Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
            // Controller and ActorMesh both depend on references from each other, break
            // the cycle by setting the controller after the fact.
            mesh.set_controller(Some(controller.bind()));
        }
        // Notify telemetry that an actor mesh was created.
        {
            let name_str = mesh.name().to_string();

            // Hash the actor mesh name. This is used as mesh_id for both
            // the MeshEvent and the per-actor ActorEvents below.
            let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);

            // Hash the proc mesh name for parent_mesh_id.
            let parent_mesh_id_hash = hyperactor_telemetry::hash_to_u64(&self.name().to_string());

            hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
                id: mesh_id_hash,
                timestamp: std::time::SystemTime::now(),
                class: supervision_display_name
                    .as_deref()
                    .and_then(python_class_from_supervision_name)
                    .unwrap_or(actor_type),
                given_name: mesh.name().name().to_string(),
                full_name: name_str,
                shape_json: serde_json::to_string(&self.region().extent()).unwrap_or_default(),
                parent_mesh_id: Some(parent_mesh_id_hash),
                parent_view_json: serde_json::to_string(self.region()).ok(),
            });

            // Notify telemetry of each actor in this mesh. The rank is
            // the actor's position within the actor mesh (not the proc's
            // create_rank, which reflects the original unsliced mesh).
            let now = std::time::SystemTime::now();
            for (rank, proc_ref) in self.ranks.iter().enumerate() {
                let display_name = supervision_display_name.as_ref().map(|sdn| {
                    let point = self.region().extent().point_of_rank(rank).unwrap();
                    crate::actor_display_name(sdn, &point)
                });
                let actor_id = proc_ref.actor_id(&name);
                hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
                    id: hyperactor_telemetry::hash_to_u64(&actor_id),
                    timestamp: now,
                    mesh_id: mesh_id_hash,
                    rank: rank as u64,
                    full_name: actor_id.to_string(),
                    display_name,
                });
            }
        }

        Ok(mesh)
    }
1224
1225    /// Send stop actors message to all mesh agents for a specific mesh name
1226    #[hyperactor::instrument(fields(
1227        host_mesh = self.host_mesh_name().map(|n| n.to_string()),
1228        proc_mesh = self.name.to_string(),
1229        actor_mesh = mesh_name.to_string(),
1230    ))]
1231    pub(crate) async fn stop_actor_by_name(
1232        &self,
1233        cx: &impl context::Actor,
1234        mesh_name: Name,
1235        reason: String,
1236    ) -> crate::Result<ValueMesh<Status>> {
1237        tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
1238        tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
1239        let result = self.stop_actor_by_name_inner(cx, mesh_name, reason).await;
1240        match &result {
1241            Ok(_) => {
1242                tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
1243                tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
1244            }
1245            Err(error) => {
1246                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
1247                tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
1248            }
1249        }
1250        result
1251    }
1252
    /// Implementation backing [`stop_actor_by_name`]: broadcasts a
    /// `Stop` message to every agent, then waits (via an accumulating
    /// status port) for all ranks to reach a terminal state, returning
    /// the final status mesh on success.
    async fn stop_actor_by_name_inner(
        &self,
        cx: &impl context::Actor,
        mesh_name: Name,
        reason: String,
    ) -> crate::Result<ValueMesh<Status>> {
        let region = self.region().clone();
        let agent_mesh = self.agent_mesh();
        // Ask every agent to stop the named actor, carrying the reason along.
        agent_mesh.cast(
            cx,
            resource::Stop {
                name: mesh_name.clone(),
                reason,
            },
        )?;

        // Open an accum port that *receives overlays* and *emits full
        // meshes*.
        //
        // NOTE: Mailbox initializes the accumulator state via
        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
        // Accumulator<ValueMesh<T>> implementation detects this on
        // the first update and replaces it with the caller-supplied
        // template (the `self` passed into open_accum_port), which we
        // seed here as "full NotExist over the target region".
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            // Initial state for the accumulator: full mesh seeded to
            // NotExist.
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        // Every agent replies with status overlays to `port`.
        agent_mesh.cast(
            cx,
            resource::GetRankStatus {
                name: mesh_name,
                reply: port.bind(),
            },
        )?;
        let start_time = tokio::time::Instant::now();

        // Reuse actor spawn idle time.
        let max_idle_time = hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE);
        match GetRankStatus::wait(
            rx,
            self.ranks.len(),
            max_idle_time,
            region.clone(), // fallback mesh if nothing arrives
        )
        .await
        {
            Ok(statuses) => {
                // Check that all actors are in some terminal state.
                // Failed is ok, because one of these actors may have failed earlier
                // and we're trying to stop the others.
                let all_stopped = statuses.values().all(|s| s.is_terminating());
                if all_stopped {
                    Ok(statuses)
                } else {
                    let legacy = mesh_to_rankedvalues_with_default(
                        &statuses,
                        Status::NotExist,
                        Status::is_not_exist,
                        self.ranks.len(),
                    );
                    Err(Error::ActorStopError { statuses: legacy })
                }
            }
            Err(complete) => {
                // Fill remaining ranks with a timeout status via the
                // legacy shim.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    self.ranks.len(),
                );
                Err(Error::ActorStopError { statuses: legacy })
            }
        }
    }
1336}
1337
1338impl fmt::Display for ProcMeshRef {
1339    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1340        write!(f, "{}{{{}}}", self.name, self.region)
1341    }
1342}
1343
1344impl view::Ranked for ProcMeshRef {
1345    type Item = ProcRef;
1346
1347    fn region(&self) -> &Region {
1348        &self.region
1349    }
1350
1351    fn get(&self, rank: usize) -> Option<&Self::Item> {
1352        self.ranks.get(rank)
1353    }
1354}
1355
1356impl view::RankedSliceable for ProcMeshRef {
1357    fn sliced(&self, region: Region) -> Self {
1358        debug_assert!(region.is_subset(view::Ranked::region(self)));
1359        let ranks = self
1360            .region()
1361            .remap(&region)
1362            .unwrap()
1363            .map(|index| self.get(index).unwrap().clone())
1364            .collect();
1365        Self::new(
1366            self.name.clone(),
1367            region,
1368            Arc::new(ranks),
1369            self.host_mesh.clone(),
1370            Some(self.root_region.as_ref().unwrap_or(&self.region).clone()),
1371            self.root_comm_actor.clone(),
1372        )
1373        .unwrap()
1374    }
1375}
1376
/// Extract a Python class display name from a supervision display name.
///
/// The supervision display name format is `{instance}.<{module}.{ClassName} {mesh_name}>`.
/// Returns `"Python<ClassName>"` if the format matches, `None` otherwise.
fn python_class_from_supervision_name(sdn: &str) -> Option<String> {
    // Take everything after the *last* '<' and require a closing '>'.
    let (_, tail) = sdn.rsplit_once('<')?;
    let inner = tail.strip_suffix('>')?;
    // The first whitespace-separated token is the qualified class path.
    let qualified = inner.split_whitespace().next()?;
    // The class name is the final dot-separated segment; a bare token
    // with no dot is not a qualified Python path, so reject it.
    let (_, class_name) = qualified.rsplit_once('.')?;
    Some(format!("Python<{class_name}>"))
}
1387
#[cfg(test)]
mod tests {
    use hyperactor::Instance;
    use ndslice::ViewExt;
    use ndslice::extent;
    use timed_test::async_timed_test;

    use crate::resource::RankedValues;
    use crate::resource::Status;
    use crate::testactor;
    use crate::testing;

    // Allocates a local 4-rank proc mesh and checks that every agent is
    // alive and reachable, both per-proc and via the mesh-level status.
    #[tokio::test]
    async fn test_proc_mesh_allocate() {
        let (mesh, actor, _router) = testing::local_proc_mesh(extent!(replica = 4)).await;
        assert_eq!(mesh.extent(), extent!(replica = 4));
        assert_eq!(mesh.ranks.len(), 4);

        // All of the agents are alive, and reachable (both ways).
        for proc_ref in mesh.values() {
            assert!(proc_ref.status(&actor).await.unwrap());
        }

        // Same on the proc mesh:
        assert!(
            mesh.status(&actor)
                .await
                .unwrap()
                .values()
                .all(|status| status)
        );
    }

    // Spawns a test actor mesh on a 4-host x 2-gpu proc mesh and verifies
    // the resulting mesh shape end-to-end.
    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        let instance = testing::instance();

        let mut hm = testing::host_mesh(4).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 2))
            .await
            .unwrap();
        let actor_mesh = proc_mesh.spawn(instance, "test", &()).await.unwrap();
        testactor::assert_mesh_shape(actor_mesh).await;

        let _ = hm.shutdown(instance).await;
    }

    // Spawns an actor whose creation always fails and verifies that the
    // spawn error reports a Failed status for all 8 ranks (4 hosts x 2 gpus).
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        let instance = testing::instance();

        let mut hm = testing::host_mesh(4).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 2))
            .await
            .unwrap();
        let err = proc_mesh
            .spawn::<testactor::FailingCreateTestActor, Instance<testing::TestRootClient>>(
                instance,
                "testfail",
                &(),
            )
            .await
            .unwrap_err();
        let statuses = err.into_actor_spawn_error().unwrap();
        assert_eq!(
            statuses,
            RankedValues::from((0..8, Status::Failed("test failure".to_string()))),
        );

        let _ = hm.shutdown(instance).await;
    }

    // Exercises the supervision-name parser on well-formed and malformed
    // inputs; only `{instance}.<{module}.{Class} {mesh}>` should match.
    #[test]
    fn test_python_class_from_supervision_name() {
        use super::python_class_from_supervision_name;

        assert_eq!(
            python_class_from_supervision_name("instance0.<my_module.MyWorker test_mesh>"),
            Some("Python<MyWorker>".to_string()),
        );
        assert_eq!(
            python_class_from_supervision_name(
                "instance0.<package.submodule.TrainingActor mesh_0>"
            ),
            Some("Python<TrainingActor>".to_string()),
        );
        // No angle brackets — not a Python supervision name.
        assert_eq!(python_class_from_supervision_name("plain_name"), None,);
        // Malformed: missing dot-qualified class name.
        assert_eq!(
            python_class_from_supervision_name("instance0.<NoModule mesh>"),
            None,
        );
    }
}