Skip to main content

hyperactor_mesh/
proc_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::any::type_name;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fmt;
13use std::hash::Hash;
14use std::ops::Deref;
15use std::sync::Arc;
16use std::time::Duration;
17
18use hyperactor::Actor;
19use hyperactor::ActorAddr;
20use hyperactor::ActorRef;
21use hyperactor::Endpoint as _;
22use hyperactor::Handler;
23use hyperactor::ProcAddr;
24use hyperactor::RemoteMessage;
25use hyperactor::RemoteSpawn;
26use hyperactor::accum::StreamingReducerOpts;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Referable;
29use hyperactor::actor::remote::Remote;
30use hyperactor::context;
31use hyperactor::id::Label;
32use hyperactor::supervision::ActorSupervisionEvent;
33use hyperactor_config::CONFIG;
34use hyperactor_config::ConfigAttr;
35use hyperactor_config::attrs::declare_attrs;
36use ndslice::Extent;
37use ndslice::ViewExt as _;
38use ndslice::view;
39use ndslice::view::CollectMeshExt;
40use ndslice::view::Ranked;
41use ndslice::view::Region;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::ActorMesh;
47use crate::ActorMeshRef;
48use crate::CommActor;
49use crate::Error;
50use crate::HostMeshRef;
51use crate::ValueMesh;
52use crate::comm::CommMeshConfig;
53use crate::host_mesh::host_agent::ProcState;
54use crate::host_mesh::mesh_to_rankedvalues_with_default;
55use crate::mesh_controller::ActorMeshController;
56use crate::mesh_id::ActorMeshId;
57use crate::mesh_id::ProcMeshId;
58use crate::proc_agent;
59use crate::proc_agent::ActorState;
60use crate::proc_agent::ProcAgent;
61use crate::resource;
62use crate::resource::GetRankStatus;
63use crate::resource::Status;
64use crate::supervision::MeshFailure;
65
// Tunable idle timeouts used in this file while waiting for per-rank
// replies: `ACTOR_SPAWN_MAX_IDLE` bounds the wait for rank statuses during
// actor spawn, and `GET_ACTOR_STATE_MAX_IDLE` bounds each receive while
// collecting `GetState` replies. Both default to 30 seconds.
declare_attrs! {
    /// The maximum idle time between updates while spawning actor
    /// meshes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()),
        Some("actor_spawn_max_idle".to_string()),
    ))
    pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while waiting for a response to GetState
    /// from ProcAgent.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()),
        Some("get_actor_state_max_idle".to_string()),
    ))
    pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(30);
}
83
/// Name used for the mesh communication actor spawned on each user proc.
///
/// The `CommActor` enables proc-to-proc mesh messaging and is always
/// present as a system actor (`system_children`) on every proc mesh member.
///
/// `ProcMesh::create` turns this name into a singleton [`ActorMeshId`] and
/// spawns the comm actor mesh under it.
pub const COMM_ACTOR_NAME: &str = "comm";
89
/// A reference to a single [`hyperactor::Proc`].
///
/// Bundles the proc's address with the rank it was created at and a
/// reference to the [`ProcAgent`] that manages it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ProcRef {
    /// The address of the referenced proc (returned by `proc_addr`).
    proc_id: ProcAddr,
    /// The rank of this proc at creation.
    create_rank: usize,
    /// The agent managing this proc.
    agent: ActorRef<ProcAgent>,
}
99
100impl ProcRef {
101    /// Create a new proc ref from the provided id, create rank and agent.
102    pub fn new(proc_id: ProcAddr, create_rank: usize, agent: ActorRef<ProcAgent>) -> Self {
103        Self {
104            proc_id,
105            create_rank,
106            agent,
107        }
108    }
109
110    pub fn proc_addr(&self) -> &ProcAddr {
111        &self.proc_id
112    }
113
114    pub(crate) fn actor_addr(&self, id: &ActorMeshId) -> ActorAddr {
115        self.proc_id.actor_addr_uid(id.uid().clone())
116    }
117
118    /// Generic bound: `A: Referable` - required because we return
119    /// an `ActorRef<A>`.
120    pub(crate) fn attest<A: Referable>(&self, id: &ActorMeshId) -> ActorRef<A> {
121        ActorRef::attest(self.actor_addr(id))
122    }
123}
124
/// A mesh of processes.
///
/// The rank layout lives in `current_ref`, which is also what this type
/// dereferences to (see the `Deref` impl), so `ProcMeshRef` methods can be
/// called directly on a `ProcMesh`.
#[derive(Debug)]
pub struct ProcMesh {
    /// Identifier of this mesh.
    #[allow(dead_code)]
    id: ProcMeshId,
    /// Id of the comm actor mesh. `ProcMesh::create` sets this to the
    /// singleton id built from `COMM_ACTOR_NAME`.
    #[allow(dead_code)]
    comm_actor_name: Option<ActorMeshId>,
    /// The reference form of this mesh; the `Deref` target.
    current_ref: ProcMeshRef,
    /// The controller actor managing this mesh, if any. When present,
    /// `stop` delegates to it.
    controller: Option<ActorRef<crate::mesh_controller::ProcMeshController>>,
}
135
136impl ProcMesh {
137    pub(crate) async fn create<C: context::Actor>(
138        cx: &C,
139        id: ProcMeshId,
140        extent: Extent,
141        hosts: HostMeshRef,
142        ranks: Vec<ProcRef>,
143    ) -> crate::Result<Self>
144    where
145        C::A: Handler<MeshFailure>,
146    {
147        let comm_actor_name = ActorMeshId::singleton(Label::new(COMM_ACTOR_NAME).unwrap());
148
149        let region = extent.into();
150        let ranks = Arc::new(ranks);
151
152        // Set the global supervision sink to the first ProcAgent's
153        // supervision event handler. Last-mesh-wins semantics: if a
154        // previous mesh installed a sink, it is replaced.
155        if let Some(first) = ranks.first() {
156            crate::global_context::set_global_supervision_sink(
157                first.agent.port::<ActorSupervisionEvent>(),
158            );
159        }
160
161        let root_comm_actor: ActorRef<CommActor> = ActorRef::attest(
162            ranks
163                .first()
164                .expect("root mesh cannot be empty")
165                .actor_addr(&comm_actor_name),
166        );
167        let current_ref = ProcMeshRef::new(
168            id.clone(),
169            region,
170            ranks,
171            Some(hosts),
172            None, // this is the root mesh
173            None, // comm actor is not alive yet
174        )
175        .unwrap();
176
177        // Notify telemetry that the ProcAgent mesh was created.
178        {
179            let name_str = id.to_string();
180            let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
181
182            let hm = current_ref
183                .host_mesh
184                .as_ref()
185                .expect("ProcMesh always has a host mesh");
186            let parent_mesh_id = hyperactor_telemetry::hash_to_u64(&hm.id().to_string());
187            let parent_view_json = serde_json::to_string(hm.region())
188                .unwrap_or_else(|e| format!("encountered error when serializing region: {}", e));
189
190            hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
191                id: mesh_id_hash,
192                timestamp: std::time::SystemTime::now(),
193                class: "Proc".to_string(),
194                given_name: id
195                    .display_label()
196                    .map(|l| l.as_str())
197                    .unwrap_or("unnamed")
198                    .to_string(),
199                full_name: name_str,
200                shape_json: serde_json::to_string(&current_ref.region.extent()).unwrap_or_default(),
201                parent_mesh_id: Some(parent_mesh_id),
202                parent_view_json: Some(parent_view_json),
203            });
204
205            // Notify telemetry of each ProcAgent actor in this mesh.
206            // These are skipped in Proc::spawn_inner. mesh_id directly points to proc mesh.
207            let now = std::time::SystemTime::now();
208            for rank in current_ref.ranks.iter() {
209                let actor_id = rank.agent.actor_addr();
210
211                hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
212                    id: hyperactor_telemetry::hash_to_u64(&actor_id),
213                    timestamp: now,
214                    mesh_id: mesh_id_hash,
215                    rank: rank.create_rank as u64,
216                    full_name: actor_id.to_string(),
217                    display_name: None,
218                });
219            }
220        }
221
222        let mut proc_mesh = Self {
223            id,
224            comm_actor_name: Some(comm_actor_name.clone()),
225            current_ref,
226            controller: None,
227        };
228
229        // CommActor satisfies `Actor + Referable`, so it can be
230        // spawned and safely referenced via ActorRef<CommActor>.
231        // It is a system actor that should not have a controller managing it.
232        let comm_actor_mesh: ActorMesh<CommActor> = proc_mesh
233            .spawn_with_name(cx, comm_actor_name, &Default::default(), None, true)
234            .await?;
235        let address_book: HashMap<_, _> = comm_actor_mesh
236            .iter()
237            .map(|(point, actor_ref)| (point.rank(), actor_ref))
238            .collect();
239        // Now that we have all of the spawned comm actors, kick them all into
240        // mesh mode.
241        for (rank, comm_actor) in &address_book {
242            comm_actor.post(cx, CommMeshConfig::new(*rank, address_book.clone()));
243        }
244        proc_mesh.current_ref.root_comm_actor = Some(root_comm_actor);
245
246        Ok(proc_mesh)
247    }
248
    /// Set or clear the controller actor managing this mesh.
    ///
    /// Passing `Some(..)` installs the controller that `stop` will delegate
    /// to; passing `None` removes it, so `stop` goes through the host mesh
    /// instead.
    pub(crate) fn set_controller(
        &mut self,
        controller: Option<ActorRef<crate::mesh_controller::ProcMeshController>>,
    ) {
        self.controller = controller;
    }
256
257    /// Stop this mesh gracefully.
258    ///
259    /// If a `ProcMeshController` is present (owned meshes spawned from a host
260    /// mesh), the stop is delegated to the controller via `resource::Stop`;
261    /// the controller's handler awaits `HostMeshRef::stop_proc_mesh`, which
262    /// casts `Stop` + `WaitRankStatus{min_status: Stopped}` to the
263    /// HostAgents and waits up to `PROC_STOP_MAX_IDLE` for every proc to
264    /// reach `Stopped`. We then serialize behind that handler with a
265    /// `GetState` to read the final statuses out of the controller's
266    /// `health_state`.
267    pub async fn stop(&mut self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
268        if let Some(controller) = self.controller.take() {
269            let id = self.id.resource_id().clone();
270            controller.post(
271                cx,
272                resource::Stop {
273                    id: id.clone(),
274                    reason,
275                },
276            );
277
278            // The controller processes messages serially, so by the time it
279            // gets to this `GetState`, its `health_state.statuses` already
280            // reflects the outcome of `stop_proc_mesh` (Stopping, Stopped,
281            // Failed, or Timeout on `PROC_STOP_MAX_IDLE` exhaustion).
282            let (port, mut rx) = cx.mailbox().open_port();
283            controller.post(
284                cx,
285                resource::GetState::<resource::mesh::State<()>> {
286                    id: id.clone(),
287                    reply: port.bind(),
288                },
289            );
290
291            let statuses = rx.recv().await?;
292            let Some(state) = &statuses.state else {
293                anyhow::bail!(
294                    "non-existent state in GetState reply from controller: {}",
295                    controller.actor_addr()
296                );
297            };
298            // `is_terminating` accepts Stopping, Stopped, Failed, and
299            // Timeout. The controller's Stop handler has already awaited
300            // (or timed out) the underlying HostAgent wait, so any rank
301            // still in Running here means the controller never processed
302            // the stop for that rank.
303            let all_stopped = state.statuses.values().all(|s| s.is_terminating());
304            if !all_stopped {
305                anyhow::bail!(
306                    "proc mesh {} not all procs reached terminating state after stop: {:?}",
307                    id,
308                    state.statuses,
309                );
310            }
311            return Ok(());
312        }
313
314        let region = self.region.clone();
315        let procs = self.current_ref.proc_ids().collect::<Vec<ProcAddr>>();
316        // We use the proc mesh region rather than the host mesh region
317        // because the host agent stores one entry per proc, not per host.
318        self.current_ref
319            .host_mesh
320            .as_ref()
321            .expect("ProcMesh always has a host mesh")
322            .stop_proc_mesh(cx, &self.id, procs, region, reason)
323            .await
324            .map(|_| ())
325            .map_err(anyhow::Error::from)
326    }
327
328    #[cfg(test)]
329    pub(crate) fn ranks(&self) -> Arc<Vec<ProcRef>> {
330        Arc::clone(&self.current_ref.ranks)
331    }
332}
333
334impl fmt::Display for ProcMesh {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        write!(f, "{}", self.current_ref)
337    }
338}
339
// A `ProcMesh` dereferences to its `ProcMeshRef`, so the reference's
// methods and fields are reachable directly on the owning mesh.
impl Deref for ProcMesh {
    type Target = ProcMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
347
// Dropping a `ProcMesh` only records a "Dropped" status event for this
// mesh id; this impl sends no stop request to the procs.
impl Drop for ProcMesh {
    fn drop(&mut self) {
        tracing::info!(
            name = "ProcMeshStatus",
            proc_mesh = %self.id,
            status = "Dropped",
        );
    }
}
357
/// A reference to a ProcMesh, consisting of a set of ranked [`ProcRef`]s,
/// arranged into a region. ProcMeshes are named, uniquely identifying the
/// ProcMesh from which the reference was derived.
///
/// ProcMeshes can be sliced to create new ProcMeshes with a subset of the
/// original ranks.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct ProcMeshRef {
    // Identifier of the mesh this reference was derived from.
    id: ProcMeshId,
    // Layout of the ranks; `new` requires `region.num_ranks()` to equal
    // `ranks.len()`.
    region: Region,
    // The procs of this mesh, shared behind an `Arc` so the list itself is
    // not copied when the reference is cloned.
    ranks: Arc<Vec<ProcRef>>,
    // Some if this was spawned from a host mesh, else none.
    host_mesh: Option<HostMeshRef>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // The root region of this mesh. None means this mesh itself is the root.
    pub(crate) root_region: Option<Region>,
    // Temporary: used to fit v1 ActorMesh with v0's casting implementation. This
    // should be removed after we remove the v0 code.
    // v0 casting requires root mesh rank 0 as the 1st hop, so we need to provide
    // it here. For v1, this can be removed since v1 can use any rank.
    pub(crate) root_comm_actor: Option<ActorRef<CommActor>>,
}
wirevalue::register_type!(ProcMeshRef);
382
383impl ProcMeshRef {
384    /// Create a new ProcMeshRef from the given id, region, ranks, and so on.
385    #[allow(clippy::result_large_err)]
386    fn new(
387        id: ProcMeshId,
388        region: Region,
389        ranks: Arc<Vec<ProcRef>>,
390        host_mesh: Option<HostMeshRef>,
391        root_region: Option<Region>,
392        root_comm_actor: Option<ActorRef<CommActor>>,
393    ) -> crate::Result<Self> {
394        if region.num_ranks() != ranks.len() {
395            return Err(crate::Error::InvalidRankCardinality {
396                expected: region.num_ranks(),
397                actual: ranks.len(),
398            });
399        }
400        Ok(Self {
401            id,
402            region,
403            ranks,
404            host_mesh,
405            root_region,
406            root_comm_actor,
407        })
408    }
409
410    /// Create a singleton ProcMeshRef, given the provided ProcRef and id.
411    /// This is used to support creating local singleton proc meshes to support `this_proc()`
412    /// in python client actors.
413    pub fn new_singleton(id: ProcMeshId, proc_ref: ProcRef) -> Self {
414        Self {
415            id,
416            region: Extent::unity().into(),
417            ranks: Arc::new(vec![proc_ref]),
418            host_mesh: None,
419            root_region: None,
420            root_comm_actor: None,
421        }
422    }
423
    /// The root comm actor recorded on this reference, if any.
    ///
    /// `ProcMesh::create` fills this in once the comm actor mesh has been
    /// spawned; `new_singleton` leaves it as `None`.
    pub(crate) fn root_comm_actor(&self) -> Option<&ActorRef<CommActor>> {
        self.root_comm_actor.as_ref()
    }
427
    /// The id of the proc mesh this reference belongs to.
    pub fn id(&self) -> &ProcMeshId {
        &self.id
    }
431
432    pub fn host_mesh_id(&self) -> Option<&crate::mesh_id::HostMeshId> {
433        self.host_mesh.as_ref().map(|h| h.id())
434    }
435
    /// Returns the HostMeshRef that owns this ProcMeshRef, if any.
    ///
    /// `None` for references without a host mesh, such as those built by
    /// `new_singleton`.
    pub fn hosts(&self) -> Option<&HostMeshRef> {
        self.host_mesh.as_ref()
    }
440
441    pub(crate) fn agent_mesh(&self) -> ActorMeshRef<ProcAgent> {
442        let agent_label = self
443            .ranks
444            .first()
445            .unwrap()
446            .agent
447            .actor_addr()
448            .label()
449            .cloned()
450            .unwrap_or_else(|| Label::new(proc_agent::PROC_AGENT_ACTOR_NAME).unwrap());
451        let id = ActorMeshId::singleton(agent_label);
452        ActorMeshRef::new(id, self.clone(), None)
453    }
454
455    /// Query the state of all actors in this mesh matching the given id.
456    pub async fn actor_states(
457        &self,
458        cx: &impl context::Actor,
459        id: ActorMeshId,
460    ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
461        self.actor_states_with_keepalive(cx, id, None).await
462    }
463
464    /// Query the state of all actors in this mesh matching the given id.
465    /// If keepalive is Some, use a message that indicates to the recipient
466    /// that the owner of the mesh is still alive, along with the expiry time
467    /// after which the actor should be considered orphaned. Else, use a normal
468    /// state query.
469    pub(crate) async fn actor_states_with_keepalive(
470        &self,
471        cx: &impl context::Actor,
472        id: ActorMeshId,
473        keepalive: Option<std::time::SystemTime>,
474    ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
475        let agent_mesh = self.agent_mesh();
476        let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
477        let mut port = port.bind();
478        // If this proc dies or some other issue renders the reply undeliverable,
479        // the reply does not need to be returned to the sender.
480        port.return_undeliverable(false);
481        // TODO: Use accumulation to get back a single value (representing whether
482        // *any* of the actors failed) instead of a mesh.
483        let get_state = resource::GetState::<ActorState> {
484            id: id.resource_id().clone(),
485            reply: port,
486        };
487        if let Some(expires_after) = keepalive {
488            agent_mesh.cast(
489                cx,
490                resource::KeepaliveGetState {
491                    expires_after,
492                    get_state,
493                },
494            )?;
495        } else {
496            agent_mesh.cast(cx, get_state)?;
497        }
498        let expected = self.ranks.len();
499        let mut states = Vec::with_capacity(expected);
500        let timeout = hyperactor_config::global::get(GET_ACTOR_STATE_MAX_IDLE);
501        for _ in 0..expected {
502            // The agent runs on the same process as the running actor, so if some
503            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
504            // the agent will be unresponsive.
505            // We handle this by setting a timeout on the recv, and if we don't get a
506            // message we assume the agent is dead and return a failed state.
507            let state = tokio::time::timeout(timeout, rx.recv()).await;
508            if let Ok(state) = state {
509                // Handle non-timeout receiver error.
510                let state = state?;
511                match state.state {
512                    Some(ref inner) => {
513                        states.push((inner.create_rank, state));
514                    }
515                    None => {
516                        return Err(Error::NotExist(state.id));
517                    }
518                }
519            } else {
520                tracing::error!(
521                    "timeout waiting for a message after {:?} from proc mesh agent in mesh {}",
522                    timeout,
523                    agent_mesh
524                );
525                // Timeout error, stop reading from the receiver and send back what we have so far,
526                // padding with failed states.
527                let all_ranks = (0..self.ranks.len()).collect::<HashSet<_>>();
528                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
529                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
530                assert_eq!(leftover_ranks.len(), expected - states.len());
531                while states.len() < expected {
532                    let rank = *leftover_ranks
533                        .pop()
534                        .expect("leftover ranks should not be empty");
535                    let agent = agent_mesh.get(rank).expect("agent should exist");
536                    let agent_id = agent.actor_addr().clone();
537                    states.push((
538                        // We populate with any ranks leftover at the time of the timeout.
539                        rank,
540                        resource::State {
541                            id: id.resource_id().clone(),
542                            status: resource::Status::Timeout(timeout),
543                            // We don't know the ActorAddr that used to live on this rank.
544                            // But we do know the mesh agent id, so we'll use that.
545                            // Use u64::MAX so this synthetic state always wins
546                            // last-writer-wins ordering against real streamed updates.
547                            generation: u64::MAX,
548                            timestamp: std::time::SystemTime::now(),
549                            state: Some(ActorState {
550                                actor_id: agent_id.clone(),
551                                create_rank: rank,
552                                supervision_events: vec![ActorSupervisionEvent::new(
553                                    agent_id,
554                                    None,
555                                    ActorStatus::generic_failure(format!(
556                                        "timeout waiting for message from proc mesh agent while querying for \"{}\". The process likely crashed",
557                                        id,
558                                    )),
559                                    None,
560                                )],
561                            }),
562                        },
563                    ));
564                }
565                break;
566            }
567        }
568        // Ensure that all ranks have replied. Note that if the mesh is sliced,
569        // not all create_ranks may be in the mesh.
570        // Sort by rank, so that the resulting mesh is ordered.
571        states.sort_by_key(|(rank, _)| *rank);
572        let vm = states
573            .into_iter()
574            .map(|(_, state)| state)
575            .collect_mesh::<ValueMesh<_>>(self.region.clone())?;
576        Ok(vm)
577    }
578
579    pub async fn proc_states(
580        &self,
581        cx: &impl context::Actor,
582        keepalive: Option<std::time::SystemTime>,
583    ) -> crate::Result<Option<ValueMesh<resource::State<ProcState>>>> {
584        let names = self.proc_ids().collect::<Vec<ProcAddr>>();
585        if let Some(host_mesh) = &self.host_mesh {
586            Ok(Some(
587                host_mesh
588                    .proc_states(cx, names, self.region.clone(), keepalive)
589                    .await?,
590            ))
591        } else {
592            Ok(None)
593        }
594    }
595
596    /// Returns an iterator over the proc ids in this mesh.
597    pub(crate) fn proc_ids(&self) -> impl Iterator<Item = ProcAddr> {
598        self.ranks.iter().map(|proc_ref| proc_ref.proc_id.clone())
599    }
600
601    /// Spawn an actor on all of the procs in this mesh, returning a
602    /// new ActorMesh.
603    ///
604    /// Bounds:
605    /// - `A: Actor` - the actor actually runs inside each proc.
606    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
607    ///   inside the `ActorMesh`.
608    /// - `A::Params: RemoteMessage` - spawn parameters must be
609    ///   serializable and routable.
610    pub async fn spawn<A: RemoteSpawn, C: context::Actor>(
611        &self,
612        cx: &C,
613        name: &str,
614        params: &A::Params,
615    ) -> crate::Result<ActorMesh<A>>
616    where
617        A::Params: RemoteMessage,
618        C::A: Handler<MeshFailure>,
619    {
620        // Spawning from a string is never a system actor.
621        let id = ActorMeshId::instance(Label::strip(name));
622        self.spawn_with_name(cx, id, params, None, false).await
623    }
624
625    /// Spawn a 'service' actor. Service actors are *singletons*, using
626    /// reserved names. The provided name is used verbatim as the actor's
627    /// name, and thus it may be persistently looked up by constructing
628    /// the appropriate name.
629    ///
630    /// Note: avoid using service actors if possible; the mechanism will
631    /// be replaced by an actor registry.
632    pub async fn spawn_service<A: RemoteSpawn, C: context::Actor>(
633        &self,
634        cx: &C,
635        name: &str,
636        params: &A::Params,
637    ) -> crate::Result<ActorMesh<A>>
638    where
639        A::Params: RemoteMessage,
640        C::A: Handler<MeshFailure>,
641    {
642        let id = ActorMeshId::singleton(Label::strip(name));
643        self.spawn_with_name(cx, id, params, None, false).await
644    }
645
646    /// Spawn an actor on all procs in this mesh under the given
647    /// [`ActorMeshId`](crate::mesh_id::ActorMeshId), returning a new `ActorMesh`.
648    ///
649    /// This is the underlying implementation used by [`spawn`]; it
650    /// differs only in that the actor mesh id is passed explicitly
651    /// rather than as a `&str`.
652    ///
653    /// Bounds:
654    /// - `A: Actor` - the actor actually runs inside each proc.
655    /// - `A: Referable` - so we can return typed `ActorRef<A>`s
656    ///   inside the `ActorMesh`.
657    /// - `A::Params: RemoteMessage` - spawn parameters must be
658    ///   serializable and routable.
659    /// - `C::A: Handler<MeshFailure>` - in order to spawn actors,
660    ///   the actor must accept messages of type `MeshFailure`. This
661    ///   is delivered when the actors spawned in the mesh have a failure that
662    ///   isn't handled.
663    #[hyperactor::instrument(fields(
664        host_mesh=self.host_mesh_id().map(|id| id.to_string()),
665        proc_mesh=self.id.to_string(),
666        actor_name=name.to_string(),
667    ))]
668    pub async fn spawn_with_name<A: RemoteSpawn, C: context::Actor>(
669        &self,
670        cx: &C,
671        name: ActorMeshId,
672        params: &A::Params,
673        supervision_display_name: Option<String>,
674        is_system_actor: bool,
675    ) -> crate::Result<ActorMesh<A>>
676    where
677        A::Params: RemoteMessage,
678        C::A: Handler<MeshFailure>,
679    {
680        tracing::info!(
681            name = "ProcMeshStatus",
682            status = "ActorMesh::Spawn::Attempt",
683        );
684        tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
685        let result = self
686            .spawn_with_name_inner(cx, name, params, supervision_display_name, is_system_actor)
687            .await;
688        match &result {
689            Ok(_) => {
690                tracing::info!(
691                    name = "ProcMeshStatus",
692                    status = "ActorMesh::Spawn::Success",
693                );
694                tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
695            }
696            Err(error) => {
697                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
698                tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
699            }
700        }
701        result
702    }
703
704    async fn spawn_with_name_inner<A: RemoteSpawn, C: context::Actor>(
705        &self,
706        cx: &C,
707        actor_mesh_id: ActorMeshId,
708        params: &A::Params,
709        supervision_display_name: Option<String>,
710        is_system_actor: bool,
711    ) -> crate::Result<ActorMesh<A>>
712    where
713        C::A: Handler<MeshFailure>,
714    {
715        let remote = Remote::collect();
716        // `RemoteSpawn` + `register_spawnable!(A)` ensure that `A` has a
717        // `SpawnableActor` entry in this registry, so
718        // `name_of::<A>()` can resolve its global type name.
719        let actor_type = remote
720            .name_of::<A>()
721            .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))?
722            .to_string();
723
724        let serialized_params = bincode::serde::encode_to_vec(params, bincode::config::legacy())?;
725        let agent_mesh = self.agent_mesh();
726
727        agent_mesh.cast(
728            cx,
729            resource::CreateOrUpdate::<proc_agent::ActorSpec> {
730                id: actor_mesh_id.resource_id().clone(),
731                rank: Default::default(),
732                spec: proc_agent::ActorSpec {
733                    actor_type: actor_type.clone(),
734                    params_data: serialized_params.clone(),
735                },
736            },
737        )?;
738
739        let region = self.region().clone();
740        // Open an accum port that *receives overlays* and *emits full
741        // meshes*.
742        //
743        // NOTE: Mailbox initializes the accumulator state via
744        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
745        // Accumulator<ValueMesh<T>> implementation detects this on
746        // the first update and replaces it with the caller-supplied
747        // template (the `self` passed into open_accum_port), which we
748        // seed here as "full NotExist over the target region".
749        let (port, rx) = cx.mailbox().open_accum_port_opts(
750            // Initial state for the accumulator: full mesh seeded to
751            // NotExist.
752            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
753            StreamingReducerOpts {
754                max_update_interval: Some(Duration::from_millis(50)),
755                initial_update_interval: None,
756            },
757        );
758
759        let mut reply = port.bind();
760        // If this proc dies or some other issue renders the reply undeliverable,
761        // the reply does not need to be returned to the sender.
762        reply.return_undeliverable(false);
763        // Send a message to all ranks. They reply with overlays to
764        // `port`.
765        agent_mesh.cast(
766            cx,
767            resource::GetRankStatus {
768                id: actor_mesh_id.resource_id().clone(),
769                reply,
770            },
771        )?;
772
773        let start_time = tokio::time::Instant::now();
774
775        // Wait for all ranks to report a terminal or running status.
776        // If any proc reports a failure (via supervision) or the mesh
777        // times out, `wait()` returns Err with the final snapshot.
778        //
779        // `rx` is the accumulator output stream: each time reduced
780        // overlays are applied, it emits a new StatusMesh snapshot.
781        // `wait()` loops on it, deciding when the stream is
782        // "complete" (no more NotExist) or times out.
783        let (statuses, mut mesh) = match GetRankStatus::wait(
784            rx,
785            self.ranks.len(),
786            hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE),
787            region.clone(), // fallback
788        )
789        .await
790        {
791            Ok(statuses) => {
792                // Spawn succeeds only if no rank has reported a
793                // supervision/terminal state. This preserves the old
794                // `first_terminating().is_none()` semantics.
795                let has_terminating = statuses.values().any(|s| s.is_terminating());
796                if !has_terminating {
797                    Ok((
798                        statuses,
799                        ActorMesh::new(self.clone(), actor_mesh_id.clone(), None),
800                    ))
801                } else {
802                    let legacy = mesh_to_rankedvalues_with_default(
803                        &statuses,
804                        Status::NotExist,
805                        Status::is_not_exist,
806                        self.ranks.len(),
807                    );
808                    Err(Error::ActorSpawnError { statuses: legacy })
809                }
810            }
811            Err(complete) => {
812                // Fill remaining ranks with a timeout status, now
813                // handled via the legacy shim.
814                let elapsed = start_time.elapsed();
815                let legacy = mesh_to_rankedvalues_with_default(
816                    &complete,
817                    Status::Timeout(elapsed),
818                    Status::is_not_exist,
819                    self.ranks.len(),
820                );
821                Err(Error::ActorSpawnError { statuses: legacy })
822            }
823        }?;
824        // We don't need controllers for a system actor like the CommActor.
825        if !is_system_actor {
826            // Spawn a unique mesh manager for each actor mesh, so the type of the
827            // mesh can be preserved.
828            let controller: ActorMeshController<A> = ActorMeshController::new(
829                mesh.deref().clone(),
830                supervision_display_name.clone(),
831                Some(cx.instance().port().bind()),
832                statuses,
833            );
834            // hyperactor::proc AI-3: controller name must include mesh
835            // identity for proc-wide ActorAddr uniqueness. A fixed base name alone
836            // collides across parents because pid allocation is
837            // parent-scoped.
838            let controller_name = format!(
839                "{}_{}",
840                crate::mesh_controller::ACTOR_MESH_CONTROLLER_NAME,
841                mesh.id()
842            );
843            let controller = controller
844                .spawn_with_name(cx, &controller_name)
845                .map_err(|e| {
846                    Error::ControllerActorSpawnError(mesh.id().resource_id().clone(), e)
847                })?;
848            // Controller and ActorMesh both depend on references from each other, break
849            // the cycle by setting the controller after the fact.
850            mesh.set_controller(Some(controller.bind()));
851        }
852        // Notify telemetry that an actor mesh was created.
853        {
854            let id_str = mesh.id().to_string();
855
856            // Hash the actor mesh id. This is used as mesh_id for both
857            // the MeshEvent and the per-actor ActorEvents below.
858            let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&id_str);
859
860            // Hash the proc mesh id for parent_mesh_id.
861            let parent_mesh_id_hash = hyperactor_telemetry::hash_to_u64(&self.id().to_string());
862
863            hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
864                id: mesh_id_hash,
865                timestamp: std::time::SystemTime::now(),
866                class: supervision_display_name
867                    .as_deref()
868                    .and_then(python_class_from_supervision_name)
869                    .unwrap_or(actor_type),
870                given_name: mesh
871                    .id()
872                    .display_label()
873                    .map(|l| l.as_str())
874                    .unwrap_or("unnamed")
875                    .to_string(),
876                full_name: id_str,
877                shape_json: serde_json::to_string(&self.region().extent()).unwrap_or_default(),
878                parent_mesh_id: Some(parent_mesh_id_hash),
879                parent_view_json: serde_json::to_string(self.region()).ok(),
880            });
881
882            // Notify telemetry of each actor in this mesh. The rank is
883            // the actor's position within the actor mesh (not the proc's
884            // create_rank, which reflects the original unsliced mesh).
885            let now = std::time::SystemTime::now();
886            for (rank, proc_ref) in self.ranks.iter().enumerate() {
887                let display_name = supervision_display_name.as_ref().map(|sdn| {
888                    let point = self.region().extent().point_of_rank(rank).unwrap();
889                    crate::actor_display_name(sdn, &point)
890                });
891                let actor_id = proc_ref.actor_addr(&actor_mesh_id);
892                hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
893                    id: hyperactor_telemetry::hash_to_u64(&actor_id),
894                    timestamp: now,
895                    mesh_id: mesh_id_hash,
896                    rank: rank as u64,
897                    full_name: actor_id.to_string(),
898                    display_name,
899                });
900            }
901        }
902
903        Ok(mesh)
904    }
905
906    /// Send stop actors message to all mesh agents for a specific actor mesh id.
907    #[hyperactor::instrument(fields(
908        host_mesh = self.host_mesh_id().map(|id| id.to_string()),
909        proc_mesh = self.id.to_string(),
910        actor_mesh = actor_mesh_id.to_string(),
911    ))]
912    pub(crate) async fn stop_actor_by_id(
913        &self,
914        cx: &impl context::Actor,
915        actor_mesh_id: ActorMeshId,
916        reason: String,
917    ) -> crate::Result<ValueMesh<Status>> {
918        tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
919        tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
920        let result = self.stop_actor_by_id_inner(cx, actor_mesh_id, reason).await;
921        match &result {
922            Ok(_) => {
923                tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
924                tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
925            }
926            Err(error) => {
927                tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
928                tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
929            }
930        }
931        result
932    }
933
934    async fn stop_actor_by_id_inner(
935        &self,
936        cx: &impl context::Actor,
937        actor_mesh_id: ActorMeshId,
938        reason: String,
939    ) -> crate::Result<ValueMesh<Status>> {
940        let region = self.region().clone();
941        let agent_mesh = self.agent_mesh();
942        agent_mesh.cast(
943            cx,
944            resource::Stop {
945                id: actor_mesh_id.resource_id().clone(),
946                reason,
947            },
948        )?;
949
950        // Open an accum port that *receives overlays* and *emits full
951        // meshes*.
952        //
953        // NOTE: Mailbox initializes the accumulator state via
954        // `Default`, which is an *empty* ValueMesh (0 ranks). Our
955        // Accumulator<ValueMesh<T>> implementation detects this on
956        // the first update and replaces it with the caller-supplied
957        // template (the `self` passed into open_accum_port), which we
958        // seed here as "full NotExist over the target region".
959        let (port, rx) = cx.mailbox().open_accum_port_opts(
960            // Initial state for the accumulator: full mesh seeded to
961            // NotExist.
962            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
963            StreamingReducerOpts {
964                max_update_interval: Some(Duration::from_millis(50)),
965                initial_update_interval: None,
966            },
967        );
968        // Use WaitRankStatus instead of GetRankStatus so agents defer
969        // their reply until the actor reaches terminal state, rather
970        // than replying immediately with Stopping.
971        agent_mesh.cast(
972            cx,
973            resource::WaitRankStatus {
974                id: actor_mesh_id.resource_id().clone(),
975                min_status: Status::Stopped,
976                reply: port.bind(),
977            },
978        )?;
979        let start_time = tokio::time::Instant::now();
980
981        // Reuse actor spawn idle time.
982        let max_idle_time = hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE);
983        match GetRankStatus::wait(
984            rx,
985            self.ranks.len(),
986            max_idle_time,
987            region.clone(), // fallback mesh if nothing arrives
988        )
989        .await
990        {
991            Ok(statuses) => {
992                // Check that all actors are in a terminating state (Stopping
993                // or beyond). Failed is ok, because one of these actors may
994                // have failed earlier and we're trying to stop the others.
995                let all_stopped = statuses.values().all(|s| s.is_terminating());
996                if all_stopped {
997                    Ok(statuses)
998                } else {
999                    let legacy = mesh_to_rankedvalues_with_default(
1000                        &statuses,
1001                        Status::NotExist,
1002                        Status::is_not_exist,
1003                        self.ranks.len(),
1004                    );
1005                    Err(Error::ActorStopError { statuses: legacy })
1006                }
1007            }
1008            Err(complete) => {
1009                // Fill remaining ranks with a timeout status via the
1010                // legacy shim.
1011                let legacy = mesh_to_rankedvalues_with_default(
1012                    &complete,
1013                    Status::Timeout(start_time.elapsed()),
1014                    Status::is_not_exist,
1015                    self.ranks.len(),
1016                );
1017                Err(Error::ActorStopError { statuses: legacy })
1018            }
1019        }
1020    }
1021}
1022
1023impl fmt::Display for ProcMeshRef {
1024    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1025        write!(f, "{}{{{}}}", self.id, self.region)
1026    }
1027}
1028
1029impl view::Ranked for ProcMeshRef {
1030    type Item = ProcRef;
1031
1032    fn region(&self) -> &Region {
1033        &self.region
1034    }
1035
1036    fn get(&self, rank: usize) -> Option<&Self::Item> {
1037        self.ranks.get(rank)
1038    }
1039}
1040
1041impl view::RankedSliceable for ProcMeshRef {
1042    fn sliced(&self, region: Region) -> Self {
1043        debug_assert!(region.is_subset(view::Ranked::region(self)));
1044        let ranks = self
1045            .region()
1046            .remap(&region)
1047            .unwrap()
1048            .map(|index| self.get(index).unwrap().clone())
1049            .collect();
1050        Self::new(
1051            self.id.clone(),
1052            region,
1053            Arc::new(ranks),
1054            self.host_mesh.clone(),
1055            Some(self.root_region.as_ref().unwrap_or(&self.region).clone()),
1056            self.root_comm_actor.clone(),
1057        )
1058        .unwrap()
1059    }
1060}
1061
/// Extract a Python class display name from a supervision display name.
///
/// The supervision display name format is
/// `{instance}.<{module}.{ClassName} {mesh_name}>`. Returns
/// `Some("Python<ClassName>")` if the format matches, `None` otherwise.
///
/// `None` is returned when:
/// - there is no `<`, or the text does not end with `>`;
/// - nothing but whitespace sits between the brackets;
/// - the first token inside the brackets has no `.` separator;
/// - the class segment after the last `.` is empty (e.g. `<pkg. mesh>`),
///   which would otherwise produce the meaningless name `Python<>`.
///
/// Scope note: this function is used only by telemetry
/// (`MeshEvent.class`), which needs the Python class as a
/// structured string and has no structured carrier today. It is
/// not on the supervision rendering path.
///
/// TODO: retained only because the telemetry path needs a
/// structured Python-class string and this is the only available
/// source. A follow-up should add a structured carrier (e.g. an
/// `actor_class` field on `ActorSupervisionEvent`, or a dedicated
/// telemetry-side field) and delete this function.
fn python_class_from_supervision_name(sdn: &str) -> Option<String> {
    // Text between the last '<' and the mandatory trailing '>'.
    let (_, bracketed) = sdn.rsplit_once('<')?;
    let inner = bracketed.strip_suffix('>')?;

    // The first whitespace-delimited token is the dotted class path; the
    // rest (the mesh name) is not needed.
    let qualified = inner.split_whitespace().next()?;

    // Keep only the segment after the last '.', and refuse an empty one so
    // callers fall back to their default instead of getting "Python<>".
    let (_, class_name) = qualified.rsplit_once('.')?;
    if class_name.is_empty() {
        return None;
    }

    Some(format!("Python<{class_name}>"))
}
1083
1084#[cfg(test)]
1085mod tests {
1086    #[cfg(fbcode_build)]
1087    use std::ops::Deref;
1088    #[cfg(fbcode_build)]
1089    use std::time::Duration;
1090
1091    #[cfg(fbcode_build)]
1092    use hyperactor::Instance;
1093    #[cfg(fbcode_build)]
1094    use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1095    #[cfg(fbcode_build)]
1096    use ndslice::ViewExt as _;
1097    #[cfg(fbcode_build)]
1098    use ndslice::extent;
1099    #[cfg(fbcode_build)]
1100    use timed_test::assert_no_process_leak;
1101    #[cfg(fbcode_build)]
1102    use timed_test::async_timed_test;
1103    #[cfg(fbcode_build)]
1104    use uuid::Uuid;
1105
1106    #[cfg(fbcode_build)]
1107    use crate::ActorMesh;
1108    #[cfg(fbcode_build)]
1109    use crate::comm::ENABLE_NATIVE_V1_CASTING;
1110    #[cfg(fbcode_build)]
1111    use crate::host_mesh::PROC_SPAWN_MAX_IDLE;
1112    #[cfg(fbcode_build)]
1113    use crate::resource::RankedValues;
1114    #[cfg(fbcode_build)]
1115    use crate::resource::Status;
1116    #[cfg(fbcode_build)]
1117    use crate::testactor;
1118    #[cfg(fbcode_build)]
1119    use crate::testing;
1120
1121    #[cfg(fbcode_build)]
1122    async fn execute_spawn_actor() {
1123        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});
1124
1125        let instance = testing::instance();
1126
1127        let mut hm = testing::host_mesh(2).await;
1128        let proc_mesh = hm
1129            .spawn(&instance, "test", extent!(gpus = 1), None, None)
1130            .await
1131            .unwrap();
1132        let actor_mesh = proc_mesh.spawn(instance, "test", &()).await.unwrap();
1133        testactor::assert_mesh_shape(actor_mesh).await;
1134
1135        let _ = hm.shutdown(instance).await;
1136    }
1137
1138    #[async_timed_test(timeout_secs = 120)]
1139    #[cfg(fbcode_build)]
1140    async fn test_spawn_actor_v1_casting() {
1141        let config = hyperactor_config::global::lock();
1142        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1143        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1144        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
1145        let _guard4 = config.override_key(
1146            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1147            Duration::from_secs(120),
1148        );
1149        execute_spawn_actor().await;
1150    }
1151
1152    #[async_timed_test(timeout_secs = 120)]
1153    #[cfg(fbcode_build)]
1154    async fn test_spawn_actor_v1_casting_p2p() {
1155        let config = hyperactor_config::global::lock();
1156        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1157        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1158        let _guard3 = config.override_key(crate::config::V1_CAST_POINT_TO_POINT_THRESHOLD, 1024);
1159        let _guard4 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
1160        let _guard5 = config.override_key(
1161            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1162            Duration::from_secs(120),
1163        );
1164        execute_spawn_actor().await;
1165    }
1166
1167    #[async_timed_test(timeout_secs = 120)]
1168    #[cfg(fbcode_build)]
1169    async fn test_spawn_actor_v0_casting() {
1170        let config = hyperactor_config::global::lock();
1171        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
1172        let _guard2 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
1173        let _guard3 = config.override_key(
1174            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1175            Duration::from_secs(120),
1176        );
1177        execute_spawn_actor().await;
1178    }
1179
1180    /// Spawn an actor mesh, then do a random number of casts to bump the seq
1181    /// numbers for all actors participating in the cast. This avoids the test
1182    /// mistakenly passing.
1183    #[cfg(fbcode_build)]
1184    async fn spawn_for_seq_test(
1185        cx: &Instance<testing::TestRootClient>,
1186        proc_mesh: &super::ProcMeshRef,
1187    ) -> ActorMesh<testactor::TestActor> {
1188        let actor_mesh: ActorMesh<testactor::TestActor> =
1189            proc_mesh.spawn(cx, "test", &()).await.unwrap();
1190
1191        let (instance, _) = cx
1192            .proc()
1193            .client(&format!("random_casts_{}", Uuid::now_v7()))
1194            .unwrap();
1195        let n = 1;
1196        for _ in 0..n {
1197            actor_mesh.cast(&instance, ()).unwrap();
1198        }
1199        println!(
1200            "did {} casts with sequencer session id {}",
1201            n,
1202            instance.sequencer().session_id()
1203        );
1204        actor_mesh
1205    }
1206
1207    #[async_timed_test(timeout_secs = 60)]
1208    #[cfg(fbcode_build)]
1209    async fn test_seq_from_same_sender_to_different_meshes() {
1210        let config = hyperactor_config::global::lock();
1211        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1212        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1213        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
1214        let _guard4 = config.override_key(
1215            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1216            Duration::from_secs(60),
1217        );
1218
1219        hyperactor_telemetry::initialize_logging_for_test();
1220        let instance = testing::instance();
1221        let session_id = instance.sequencer().session_id();
1222
1223        let mut hm = testing::host_mesh(2).await;
1224        let proc_mesh = hm
1225            .spawn(&instance, "test", extent!(gpus = 1), None, None)
1226            .await
1227            .unwrap();
1228        let proc_mesh_ref = proc_mesh.deref();
1229
1230        // Sequence numbers are scoped based on the (client, dest) pair.
1231        // So casts to different meshes from the same client instance would
1232        // result in seq 1 for all casts.
1233        let handles = (0..3)
1234            .map(|_| {
1235                let proc_mesh_ref_clone = proc_mesh_ref.clone();
1236                tokio::spawn(async move {
1237                    let actor_mesh = spawn_for_seq_test(instance, &proc_mesh_ref_clone).await;
1238                    let expected_seqs = vec![1; 2];
1239                    testactor::assert_casting_correctness(
1240                        &actor_mesh,
1241                        instance,
1242                        Some((session_id, expected_seqs)),
1243                    )
1244                    .await;
1245                })
1246            })
1247            .collect::<Vec<_>>();
1248        futures::future::join_all(handles).await;
1249
1250        let _ = hm.shutdown(instance).await;
1251    }
1252
1253    /// Verify that the seq numbers are assigned correctly when we cast to
1254    /// different views of the same root mesh.
1255    #[async_timed_test(timeout_secs = 60)]
1256    #[cfg(fbcode_build)]
1257    async fn test_seq_from_same_sender_to_different_views() {
1258        let config = hyperactor_config::global::lock();
1259        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1260        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1261        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
1262        let _guard4 = config.override_key(
1263            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1264            Duration::from_secs(60),
1265        );
1266
1267        hyperactor_telemetry::initialize_logging_for_test();
1268
1269        let instance = testing::instance();
1270        let session_id = instance.sequencer().session_id();
1271
1272        let mut hm = testing::host_mesh(3).await;
1273        let proc_mesh = hm
1274            .spawn(&instance, "test", extent!(gpus = 1), None, None)
1275            .await
1276            .unwrap();
1277
1278        let actor_mesh = spawn_for_seq_test(instance, &proc_mesh).await;
1279
1280        // First cast. The seq should be 1 for all actors.
1281        let expected_seqs = vec![1; 3];
1282        testactor::assert_casting_correctness(
1283            &actor_mesh,
1284            instance,
1285            Some((session_id, expected_seqs)),
1286        )
1287        .await;
1288
1289        // Verify casting to the sliced actor mesh
1290        let sliced_actor_mesh = actor_mesh.range("hosts", 1..3).unwrap();
1291        // Second cast. The seq should be 2 for actors in the sliced mesh.
1292        let expected_seqs = vec![2; 2];
1293        testactor::assert_casting_correctness(
1294            &sliced_actor_mesh,
1295            instance,
1296            Some((session_id, expected_seqs)),
1297        )
1298        .await;
1299
1300        // Verify casting to a different sliced actor mesh
1301        let sliced_actor_mesh = actor_mesh.range("hosts", 0..2).unwrap();
1302        // For actors in the previous sliced mesh, the seq should be 3 since
1303        // this is the third cast for them. For other actors, the seq should
1304        // be 2.
1305        let expected_seqs = vec![2, 3];
1306        testactor::assert_casting_correctness(
1307            &sliced_actor_mesh,
1308            instance,
1309            Some((session_id, expected_seqs)),
1310        )
1311        .await;
1312
1313        let _ = hm.shutdown(instance).await;
1314    }
1315
1316    #[async_timed_test(timeout_secs = 60)]
1317    #[cfg(fbcode_build)]
1318    async fn test_seq_from_different_senders() {
1319        let config = hyperactor_config::global::lock();
1320        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1321        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1322        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
1323        let _guard4 = config.override_key(
1324            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1325            Duration::from_secs(60),
1326        );
1327
1328        hyperactor_telemetry::initialize_logging_for_test();
1329
1330        use hyperactor::Proc;
1331        use hyperactor::channel::ChannelTransport;
1332
1333        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_0".to_string()).unwrap();
1334        let instance = proc
1335            .actor_instance::<testing::TestRootClient>("test_client")
1336            .unwrap()
1337            .instance;
1338        let first_instance = proc
1339            .actor_instance::<testing::TestRootClient>("first_client")
1340            .unwrap()
1341            .instance;
1342        let second_instance = proc
1343            .actor_instance::<testing::TestRootClient>("second_client")
1344            .unwrap()
1345            .instance;
1346        let third_instance = proc
1347            .actor_instance::<testing::TestRootClient>("third_client")
1348            .unwrap()
1349            .instance;
1350
1351        let mut hm = testing::host_mesh(2).await;
1352        let proc_mesh = hm
1353            .spawn(&instance, "test", extent!(gpus = 1), None, None)
1354            .await
1355            .unwrap();
1356
1357        let actor_mesh = spawn_for_seq_test(&instance, &proc_mesh).await;
1358
1359        // Sequence numbers are calculated based on the sequencer, i.e. the
1360        // client name. So three casts would result in seq 1 for all actors.
1361        for inst in [&first_instance, &second_instance, &third_instance] {
1362            let expected_seqs = vec![1; 2];
1363            let session_id = inst.sequencer().session_id();
1364            testactor::assert_casting_correctness(
1365                &actor_mesh,
1366                inst,
1367                Some((session_id, expected_seqs)),
1368            )
1369            .await;
1370        }
1371
1372        let _ = hm.shutdown(&instance).await;
1373    }
1374
1375    #[cfg(fbcode_build)]
1376    #[assert_no_process_leak]
1377    #[tokio::test]
1378    async fn test_failing_spawn_actor() {
1379        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});
1380
1381        let config = hyperactor_config::global::lock();
1382        let _guard = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
1383        let _guard2 = config.override_key(
1384            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1385            Duration::from_secs(60),
1386        );
1387
1388        let instance = testing::instance();
1389
1390        let mut hm = testing::host_mesh(1).await;
1391        let proc_mesh = hm
1392            .spawn(&instance, "test", extent!(gpus = 1), None, None)
1393            .await
1394            .unwrap();
1395        let err = proc_mesh
1396            .spawn::<testactor::FailingCreateTestActor, Instance<testing::TestRootClient>>(
1397                instance,
1398                "testfail",
1399                &(),
1400            )
1401            .await
1402            .unwrap_err();
1403        let statuses = err.into_actor_spawn_error().unwrap();
1404        assert_eq!(
1405            statuses,
1406            RankedValues::from((0..1, Status::Failed("test failure".to_string()))),
1407        );
1408
1409        let _ = hm.shutdown(instance).await;
1410    }
1411
1412    #[test]
1413    fn test_python_class_from_supervision_name() {
1414        use super::python_class_from_supervision_name;
1415
1416        assert_eq!(
1417            python_class_from_supervision_name("instance0.<my_module.MyWorker test_mesh>"),
1418            Some("Python<MyWorker>".to_string()),
1419        );
1420        assert_eq!(
1421            python_class_from_supervision_name(
1422                "instance0.<package.submodule.TrainingActor mesh_0>"
1423            ),
1424            Some("Python<TrainingActor>".to_string()),
1425        );
1426        // No angle brackets — not a Python supervision name.
1427        assert_eq!(python_class_from_supervision_name("plain_name"), None,);
1428        // Malformed: missing dot-qualified class name.
1429        assert_eq!(
1430            python_class_from_supervision_name("instance0.<NoModule mesh>"),
1431            None,
1432        );
1433    }
1434}