hyperactor_mesh/v1/
host_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::ReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::clock::Clock;
14use hyperactor::clock::RealClock;
15use hyperactor::host::Host;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod mesh_agent;
24
25use std::collections::HashSet;
26use std::ops::Deref;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::time::Duration;
30
31use hyperactor::ActorRef;
32use hyperactor::ProcId;
33use hyperactor::channel::ChannelAddr;
34use hyperactor::context;
35use ndslice::Extent;
36use ndslice::Region;
37use ndslice::ViewExt;
38use ndslice::extent;
39use ndslice::view;
40use ndslice::view::Ranked;
41use ndslice::view::RegionParseError;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::Bootstrap;
47use crate::alloc::Alloc;
48use crate::bootstrap::BootstrapCommand;
49use crate::bootstrap::BootstrapProcManager;
50use crate::proc_mesh::DEFAULT_TRANSPORT;
51use crate::resource;
52use crate::resource::CreateOrUpdateClient;
53use crate::resource::GetRankStatus;
54use crate::resource::GetRankStatusClient;
55use crate::resource::ProcSpec;
56use crate::resource::RankedValues;
57use crate::resource::Status;
58use crate::v1;
59use crate::v1::Name;
60use crate::v1::ProcMesh;
61use crate::v1::ProcMeshRef;
62use crate::v1::ValueMesh;
63use crate::v1::host_mesh::mesh_agent::HostAgentMode;
64pub use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
65use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline;
66use crate::v1::host_mesh::mesh_agent::ProcState;
67use crate::v1::host_mesh::mesh_agent::ShutdownHostClient;
68use crate::v1::mesh_controller::HostMeshController;
69use crate::v1::mesh_controller::ProcMeshController;
70use crate::v1::proc_mesh::ProcRef;
71
declare_attrs! {
    /// The maximum idle time between updates while spawning proc
    /// meshes.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        py_name: Some("mesh_proc_spawn_max_idle".to_string()),
    })
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while stopping proc
    /// meshes.
    // NOTE(review): unlike `PROC_SPAWN_MAX_IDLE`, this `py_name` has no
    // `mesh_` prefix. The key is externally visible, so confirm whether
    // the difference is intentional before renaming anything.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        py_name: Some("proc_stop_max_idle".to_string()),
    })
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    /// Presumably the maximum idle time between updates while
    /// querying proc states (by analogy with the attributes above;
    /// the use site is not in this part of the file -- TODO confirm).
    /// Defaults to one minute.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        py_name: Some("get_proc_state_max_idle".to_string()),
    })
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
95
/// A reference to a single host.
///
/// This is a newtype over the host's [`ChannelAddr`]. Procs on the
/// host are addressed as `ProcId::Direct` ids built from this
/// address (see `named_proc` and `service_proc`). The `Display` and
/// `FromStr` impls delegate to the wrapped address.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
wirevalue::register_type!(HostRef);
100
101impl HostRef {
102    /// The host mesh agent associated with this host.
103    fn mesh_agent(&self) -> ActorRef<HostMeshAgent> {
104        ActorRef::attest(self.service_proc().actor_id("agent", 0))
105    }
106
107    /// The ProcId for the proc with name `name` on this host.
108    fn named_proc(&self, name: &Name) -> ProcId {
109        ProcId::Direct(self.0.clone(), name.to_string())
110    }
111
112    /// The service proc on this host.
113    fn service_proc(&self) -> ProcId {
114        ProcId::Direct(self.0.clone(), "service".to_string())
115    }
116
117    /// Request an orderly teardown of this host and all procs it
118    /// spawned.
119    ///
120    /// This resolves the per-child grace **timeout** and the maximum
121    /// termination **concurrency** from config and sends a
122    /// [`ShutdownHost`] message to the host's agent. The agent then:
123    ///
124    /// 1) Performs a graceful termination pass over all tracked
125    ///    children (TERM → wait(`timeout`) → KILL), with at most
126    ///    `max_in_flight` running concurrently.
127    /// 2) After the pass completes, **drops the Host**, which also
128    ///    drops the embedded `BootstrapProcManager`. The manager's
129    ///    `Drop` serves as a last-resort safety net (it SIGKILLs
130    ///    anything that somehow remains).
131    ///
132    /// This call returns `Ok(()))` only after the agent has finished
133    /// the termination pass and released the host, so the host is no
134    /// longer reachable when this returns.
135    pub(crate) async fn shutdown(
136        &self,
137        cx: &impl hyperactor::context::Actor,
138    ) -> anyhow::Result<()> {
139        let agent = self.mesh_agent();
140        let terminate_timeout =
141            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
142        let max_in_flight =
143            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
144        agent
145            .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
146            .await?;
147        Ok(())
148    }
149}
150
151impl TryFrom<ActorRef<HostMeshAgent>> for HostRef {
152    type Error = v1::Error;
153
154    fn try_from(value: ActorRef<HostMeshAgent>) -> Result<Self, v1::Error> {
155        let proc_id = value.actor_id().proc_id();
156        match proc_id.as_direct() {
157            Some((addr, _)) => Ok(HostRef(addr.clone())),
158            None => Err(v1::Error::RankedProc(proc_id.clone())),
159        }
160    }
161}
162
163impl std::fmt::Display for HostRef {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        self.0.fmt(f)
166    }
167}
168
169impl FromStr for HostRef {
170    type Err = <ChannelAddr as FromStr>::Err;
171
172    fn from_str(s: &str) -> Result<Self, Self::Err> {
173        Ok(HostRef(ChannelAddr::from_str(s)?))
174    }
175}
176
/// An owned mesh of hosts.
///
/// # Lifecycle
/// `HostMesh` owns host lifecycles. Callers **must** invoke
/// [`HostMesh::shutdown`] for deterministic teardown. The `Drop` impl
/// performs **best-effort** cleanup only (spawned via Tokio if
/// available); it is a safety net, not a substitute for orderly
/// shutdown.
///
/// In tests and production, prefer explicit shutdown to guarantee
/// that host agents drop their `BootstrapProcManager`s and that all
/// child procs are reaped.
///
/// `HostMesh` dereferences to its current [`HostMeshRef`], so the
/// non-owning API is available directly on an owned mesh.
// NOTE(review): which field(s) the `dead_code` allowance covers is not
// evident from this part of the file; `extent` is only written here.
#[allow(dead_code)]
pub struct HostMesh {
    /// The name of this mesh; recorded in lifecycle log events.
    name: Name,
    /// The extent (shape) of the mesh.
    extent: Extent,
    /// How the hosts were provisioned; `shutdown` and `Drop` consult
    /// this to decide what else must be torn down.
    allocation: HostMeshAllocation,
    /// The reference returned by `Deref`, and the set of hosts
    /// iterated by `shutdown`.
    current_ref: HostMeshRef,
}
196
/// Allocation backing for an owned [`HostMesh`].
///
/// This enum records how the underlying hosts were provisioned, which
/// in turn determines how their lifecycle is managed:
///
/// - `ProcMesh`: Hosts were allocated intrinsically via a
///   [`ProcMesh`]. The `HostMesh` owns the proc mesh and its service
///   procs, and dropping the mesh ensures that all spawned child procs
///   are terminated.
/// - `Owned`: Hosts were constructed externally and "taken" under
///   ownership. The `HostMesh` assumes responsibility for their
///   lifecycle from this point forward, ensuring consistent cleanup on
///   drop.
///
/// Additional variants may be added for other provisioning sources,
/// but in all cases `HostMesh` is an owned resource that guarantees
/// no leaked child processes.
// NOTE(review): `proc_mesh_ref` is stored but not read in the visible
// part of this file, which is presumably why `dead_code` is allowed.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// Hosts were allocated intrinsically via a [`ProcMesh`].
    ///
    /// In this mode, the `HostMesh` owns both the `ProcMesh` itself
    /// and the service procs that implement each host. Dropping the
    /// `HostMesh` also drops the embedded `ProcMesh`, ensuring that
    /// all spawned child procs are terminated cleanly.
    ProcMesh {
        /// The proc mesh hosting the trampoline actors; it is stopped
        /// by `HostMesh::shutdown` after the hosts are shut down.
        proc_mesh: ProcMesh,
        /// A reference to `proc_mesh`, captured at allocation time.
        proc_mesh_ref: ProcMeshRef,
        /// The hosts in this mesh; snapshotted by `Drop` for
        /// best-effort cleanup.
        hosts: Vec<HostRef>,
    },
    /// Hosts were constructed externally and explicitly transferred
    /// under ownership by this `HostMesh`.
    ///
    /// In this mode, the `HostMesh` assumes responsibility for the
    /// provided hosts going forward. Dropping the mesh guarantees
    /// teardown of all associated state and signals to prevent any
    /// leaked processes.
    Owned {
        /// The hosts in this mesh; snapshotted by `Drop` for
        /// best-effort cleanup.
        hosts: Vec<HostRef>,
    },
}
236
237impl HostMesh {
238    /// Bring up a local single-host mesh and, in the launcher
239    /// process, return a `HostMesh` handle for it.
240    ///
241    /// There are two execution modes:
242    ///
243    /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
244    ///   this process was launched as a bootstrap child, we call
245    ///   `boot.bootstrap().await`, which hands control to the
246    ///   bootstrap logic for this process (as defined by the
247    ///   `BootstrapCommand` the parent used to spawn it). if that
248    ///   call returns, we log the error and terminate. this branch
249    ///   does not produce a `HostMesh`.
250    ///
251    /// - launcher mode: otherwise, we are the process that is setting
252    ///   up the mesh. we create a `Host`, spawn a `HostMeshAgent` in
253    ///   it, and build a single-host `HostMesh` around that. that
254    ///   `HostMesh` is returned to the caller.
255    ///
256    /// This API is intended for tests, examples, and local bring-up,
257    /// not production.
258    ///
259    /// TODO: fix up ownership
260    pub async fn local() -> v1::Result<HostMesh> {
261        Self::local_with_bootstrap(BootstrapCommand::current()?).await
262    }
263
264    /// Same as [`local`], but the caller supplies the
265    /// `BootstrapCommand` instead of deriving it from the current
266    /// process.
267    ///
268    /// The provided `bootstrap_cmd` is used when spawning bootstrap
269    /// children and determines the behavior of
270    /// `boot.bootstrap().await` in those children.
271    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result<HostMesh> {
272        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
273            let err = boot.bootstrap().await;
274            tracing::error!("failed to bootstrap local host mesh process: {}", err);
275            std::process::exit(1);
276        }
277
278        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
279
280        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
281        let host = Host::new(manager, addr).await?;
282        let addr = host.addr().clone();
283        let system_proc = host.system_proc().clone();
284        let host_mesh_agent = system_proc
285            .spawn("agent", HostMeshAgent::new(HostAgentMode::Process(host)))
286            .map_err(v1::Error::SingletonActorSpawnError)?;
287        host_mesh_agent.bind::<HostMeshAgent>();
288
289        let host = HostRef(addr);
290        let host_mesh_ref = HostMeshRef::new(
291            Name::new("local").unwrap(),
292            extent!(hosts = 1).into(),
293            vec![host],
294        )?;
295        Ok(HostMesh::take(host_mesh_ref))
296    }
297
298    /// Create a new process-based host mesh. Each host is represented by a local process,
299    /// which manages its set of procs. This is not a true host mesh the sense that each host
300    /// is not independent. The intent of `process` is for testing, examples, and experimentation.
301    ///
302    /// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
303    /// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
304    /// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
305    /// is called early in the lifecycle of the process and reached unconditionally.
306    ///
307    /// TODO: thread through ownership
308    pub async fn process(extent: Extent, command: BootstrapCommand) -> v1::Result<HostMesh> {
309        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
310            let err = boot.bootstrap().await;
311            tracing::error!("failed to bootstrap process host mesh process: {}", err);
312            std::process::exit(1);
313        }
314
315        let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
316        let mut hosts = Vec::with_capacity(extent.num_ranks());
317        for _ in 0..extent.num_ranks() {
318            // Note: this can be racy. Possibly we should have a callback channel.
319            let addr = bind_spec.binding_addr();
320            let bootstrap = Bootstrap::Host {
321                addr: addr.clone(),
322                command: Some(command.clone()),
323                config: Some(hyperactor_config::global::attrs()),
324            };
325
326            let mut cmd = command.new();
327            bootstrap.to_env(&mut cmd);
328            cmd.spawn()?;
329            hosts.push(HostRef(addr));
330        }
331
332        let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
333        Ok(HostMesh::take(host_mesh_ref))
334    }
335
336    /// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
337    /// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
338    /// proc allocated by it.
339    ///
340    /// ## Allocation strategy
341    ///
342    /// Because HostMeshes use direct-addressed procs, and must fully control the procs they are
343    /// managing, `HostMesh::allocate` uses a trampoline actor to launch the host, which in turn
344    /// runs a [`crate::v1::host_mesh::mesh_agent::HostMeshAgent`] actor to manage the host itself.
345    /// The host (and thus all of its procs) are exposed directly through a separate listening
346    /// channel, established by the host.
347    ///
348    /// ```text
349    ///                        ┌ ─ ─┌────────────────────┐
350    ///                             │allocated Proc:     │
351    ///                        │    │ ┌─────────────────┐│
352    ///                             │ │TrampolineActor  ││
353    ///                        │    │ │ ┌──────────────┐││
354    ///                             │ │ │Host          │││
355    ///               ┌────┬ ─ ┘    │ │ │ ┌──────────┐ │││
356    ///            ┌─▶│Proc│        │ │ │ │HostAgent │ │││
357    ///            │  └────┴ ─ ┐    │ │ │ └──────────┘ │││
358    ///            │  ┌────┐        │ │ │             ██████
359    /// ┌────────┐ ├─▶│Proc│   │    │ │ └──────────────┘││ ▲
360    /// │ Client │─┤  └────┘        │ └─────────────────┘│ listening channel
361    /// └────────┘ │  ┌────┐   └ ─ ─└────────────────────┘
362    ///            ├─▶│Proc│
363    ///            │  └────┘
364    ///            │  ┌────┐
365    ///            └─▶│Proc│
366    ///               └────┘
367    ///                 ▲
368    ///
369    ///          `Alloc`-provided
370    ///                procs
371    /// ```
372    ///
373    /// ## Lifecycle
374    ///
375    /// The returned `HostMesh` **owns** the underlying hosts. Call
376    /// [`shutdown`](Self::shutdown) to deterministically tear them
377    /// down. If you skip shutdown, `Drop` will attempt best-effort
378    /// cleanup only. Do not rely on `Drop` for correctness.
379    pub async fn allocate<C: context::Actor>(
380        cx: &C,
381        alloc: Box<dyn Alloc + Send + Sync>,
382        name: &str,
383        bootstrap_params: Option<BootstrapCommand>,
384    ) -> v1::Result<Self>
385    where
386        C::A: Handler<MeshFailure>,
387    {
388        Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
389    }
390
    /// Implementation of [`HostMesh::allocate`], taking an already
    /// constructed [`Name`].
    ///
    /// Steps, in order:
    /// 1. Allocate a `ProcMesh` from `alloc`.
    /// 2. Spawn a `HostMeshAgentProcMeshTrampoline` actor on every
    ///    proc, passing it a port on which host mesh agents are
    ///    reported back.
    /// 3. Receive one agent reference per rank and derive a
    ///    [`HostRef`] from each.
    /// 4. Assemble the owned `HostMesh` and spawn its
    ///    `HostMeshController`.
    ///
    /// # Errors
    ///
    /// Fails if proc mesh allocation, the trampoline spawn, or a
    /// receive on the agent port fails; if a reported agent is not on
    /// a direct-addressed proc or does not have the expected actor
    /// id; or if the controller cannot be spawned.
    // This is a separate function from `allocate` so that the
    // `host_mesh` field of the span can be set from the `Name`.
    #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
    async fn allocate_inner<C: context::Actor>(
        cx: &C,
        alloc: Box<dyn Alloc + Send + Sync>,
        name: Name,
        bootstrap_params: Option<BootstrapCommand>,
    ) -> v1::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
        // Read everything needed from `alloc` before it is consumed by
        // `ProcMesh::allocate`.
        let transport = alloc.transport();
        let extent = alloc.extent().clone();
        let is_local = alloc.is_local();
        let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;

        // TODO: figure out how to deal with MAST allocs. It requires an extra dimension,
        // into which it launches multiple procs, so we need to always specify an additional
        // sub-host dimension of size 1.

        // Port on which the host mesh agents are reported back to us.
        let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
        let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
        let _trampoline_actor_mesh = proc_mesh
            .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
                cx,
                trampoline_name,
                &(transport, mesh_agents.bind(), bootstrap_params, is_local),
                None,
                // The trampoline is a system actor and does not need a controller.
                true,
            )
            .await?;

        // TODO: don't re-rank the hosts
        // Hosts are pushed in the order their agents are received from
        // the port, so a host's rank here is its arrival order.
        let mut hosts = Vec::new();
        for _rank in 0..extent.num_ranks() {
            let mesh_agent = mesh_agents_rx.recv().await?;

            let Some((addr, _)) = mesh_agent.actor_id().proc_id().as_direct() else {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    "host mesh agent must be a direct actor".to_string(),
                ));
            };

            // `HostRef::mesh_agent` assumes a fixed actor id (actor
            // "agent", index 0, on the "service" proc); reject agents
            // that do not match, since they could not be reached
            // through the `HostRef` later.
            let host_ref = HostRef(addr.clone());
            if host_ref.mesh_agent() != mesh_agent {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    format!(
                        "expected mesh agent actor id to be {}",
                        host_ref.mesh_agent().actor_id()
                    ),
                ));
            }
            hosts.push(host_ref);
        }

        let proc_mesh_ref = proc_mesh.clone();
        let mesh = Self {
            name: name.clone(),
            extent: extent.clone(),
            allocation: HostMeshAllocation::ProcMesh {
                proc_mesh,
                proc_mesh_ref,
                hosts: hosts.clone(),
            },
            // The loop above pushed exactly `extent.num_ranks()` hosts,
            // so the cardinality check in `HostMeshRef::new` is expected
            // to pass (assuming the region derived from `extent` has the
            // same number of ranks).
            current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
        };

        // Spawn a mesh controller for this host mesh, built from a
        // clone of its `HostMeshRef`, so the type of the mesh can be
        // preserved.
        let controller = HostMeshController::new(mesh.deref().clone());
        controller
            .spawn(cx)
            .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;

        tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");
        Ok(mesh)
    }
472
473    /// Take ownership of an existing host mesh reference.
474    ///
475    /// Consumes the `HostMeshRef`, captures its region/hosts, and
476    /// returns an owned `HostMesh` that assumes lifecycle
477    /// responsibility for those hosts (i.e., will shut them down on
478    /// Drop).
479    pub fn take(mesh: HostMeshRef) -> Self {
480        let region = mesh.region().clone();
481        let hosts: Vec<HostRef> = mesh.values().collect();
482
483        let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
484            .expect("region/hosts cardinality must match");
485
486        Self {
487            name: mesh.name,
488            extent: region.extent().clone(),
489            allocation: HostMeshAllocation::Owned { hosts },
490            current_ref,
491        }
492    }
493
494    /// Request a clean shutdown of all hosts owned by this
495    /// `HostMesh`.
496    ///
497    /// For each host, this sends `ShutdownHost` to its
498    /// `HostMeshAgent`. The agent takes and drops its `Host` (via
499    /// `Option::take()`), which in turn drops the embedded
500    /// `BootstrapProcManager`. On drop, the manager walks its PID
501    /// table and sends SIGKILL to any procs it spawned—tying proc
502    /// lifetimes to their hosts and preventing leaks.
503    #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
504    pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
505        tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
506        let mut failed_hosts = vec![];
507        for host in self.current_ref.values() {
508            if let Err(e) = host.shutdown(cx).await {
509                tracing::warn!(
510                    name = "HostMeshStatus",
511                    status = "Shutdown::Host::Failed",
512                    host = %host,
513                    error = %e,
514                    "host shutdown failed"
515                );
516                failed_hosts.push(host);
517            }
518        }
519        if failed_hosts.is_empty() {
520            tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success");
521        } else {
522            tracing::error!(
523                name = "HostMeshStatus",
524                status = "Shutdown::Failed",
525                "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
526                failed_hosts
527            );
528        }
529
530        match &mut self.allocation {
531            HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
532                proc_mesh.stop(cx).await?;
533            }
534            HostMeshAllocation::Owned { .. } => {}
535        }
536        Ok(())
537    }
538}
539
540impl Deref for HostMesh {
541    type Target = HostMeshRef;
542
543    fn deref(&self) -> &Self::Target {
544        &self.current_ref
545    }
546}
547
548impl Drop for HostMesh {
549    /// Best-effort cleanup for owned host meshes on drop.
550    ///
551    /// When a `HostMesh` is dropped, it attempts to shut down all
552    /// hosts it owns:
553    /// - If a Tokio runtime is available, we spawn an ephemeral
554    ///   `Proc` + `Instance` and send `ShutdownHost` messages to each
555    ///   host. This ensures that the embedded `BootstrapProcManager`s
556    ///   are dropped, and all child procs they spawned are killed.
557    /// - If no runtime is available, we cannot perform async cleanup
558    ///   here; in that case we log a warning and rely on kernel-level
559    ///   PDEATHSIG or the individual `BootstrapProcManager`'s `Drop`
560    ///   as the final safeguard.
561    ///
562    /// This path is **last resort**: callers should prefer explicit
563    /// [`HostMesh::shutdown`] to guarantee orderly teardown. Drop
564    /// only provides opportunistic cleanup to prevent process leaks
565    /// if shutdown is skipped.
566    fn drop(&mut self) {
567        tracing::info!(
568            name = "HostMeshStatus",
569            host_mesh = %self.name,
570            status = "Dropping",
571        );
572        // Snapshot the owned hosts we're responsible for.
573        let hosts: Vec<HostRef> = match &self.allocation {
574            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
575                hosts.clone()
576            }
577        };
578
579        // Best-effort only when a Tokio runtime is available.
580        if let Ok(handle) = tokio::runtime::Handle::try_current() {
581            let mesh_name = self.name.clone();
582            let allocation_label = match &self.allocation {
583                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
584                HostMeshAllocation::Owned { .. } => "owned",
585            }
586            .to_string();
587
588            handle.spawn(async move {
589                let span = tracing::info_span!(
590                    "hostmesh_drop_cleanup",
591                    host_mesh = %mesh_name,
592                    allocation = %allocation_label,
593                    hosts = hosts.len(),
594                );
595                let _g = span.enter();
596
597                // Spin up a tiny ephemeral proc+instance to get an
598                // Actor context.
599                match hyperactor::Proc::direct(
600                    ChannelTransport::Unix.any(),
601                    "hostmesh-drop".to_string(),
602                )
603                {
604                    Err(e) => {
605                        tracing::warn!(
606                            error = %e,
607                            "failed to construct ephemeral Proc for drop-cleanup; \
608                             relying on PDEATHSIG/manager Drop"
609                        );
610                    }
611                    Ok(proc) => {
612                        match proc.instance("drop") {
613                            Err(e) => {
614                                tracing::warn!(
615                                    error = %e,
616                                    "failed to create ephemeral instance for drop-cleanup; \
617                                     relying on PDEATHSIG/manager Drop"
618                                );
619                            }
620                            Ok((instance, _guard)) => {
621                                let mut attempted = 0usize;
622                                let mut ok = 0usize;
623                                let mut err = 0usize;
624
625                                for host in hosts {
626                                    attempted += 1;
627                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
628                                    match host.shutdown(&instance).await {
629                                        Ok(()) => {
630                                            ok += 1;
631                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
632                                        }
633                                        Err(e) => {
634                                            err += 1;
635                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
636                                        }
637                                    }
638                                }
639
640                                tracing::info!(
641                                    attempted, ok, err,
642                                    "hostmesh drop-cleanup summary"
643                                );
644                            }
645                        }
646                    }
647                }
648            });
649        } else {
650            // No runtime here; PDEATHSIG and manager Drop remain the
651            // last-resort safety net.
652            tracing::warn!(
653                host_mesh = %self.name,
654                hosts = hosts.len(),
655                "HostMesh dropped without a Tokio runtime; skipping \
656                 best-effort shutdown. This indicates that .shutdown() \
657                 on this mesh has not been called before program exit \
658                 (perhaps due to a missing call to \
659                 'monarch.actor.shutdown_context()'?) This in turn can \
660                 lead to backtrace output due to folly SIGTERM \
661                 handlers."
662            );
663        }
664
665        tracing::info!(
666            name = "HostMeshStatus",
667            host_mesh = %self.name,
668            status = "Dropped",
669        );
670    }
671}
672
673/// Helper: legacy shim for error types that still require
674/// RankedValues<Status>. TODO(shayne-fletcher): Delete this
675/// shim once Error::ActorSpawnError carries a StatusMesh
676/// (ValueMesh<Status>) directly. At that point, use the mesh
677/// as-is and remove `mesh_to_rankedvalues_*` calls below.
678/// is_sentinel should return true if the value matches a previous filled in
679/// value. If the input value matches the sentinel, it gets replaced with the
680/// default.
681pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
682    mesh: &ValueMesh<T>,
683    default: T,
684    is_sentinel: F,
685    len: usize,
686) -> RankedValues<T>
687where
688    T: Eq + Clone + 'static,
689    F: Fn(&T) -> bool,
690{
691    let mut out = RankedValues::from((0..len, default));
692    for (i, s) in mesh.values().enumerate() {
693        if !is_sentinel(&s) {
694            out.merge_from(RankedValues::from((i..i + 1, s)));
695        }
696    }
697    out
698}
699
/// A non-owning reference to a mesh of hosts.
///
/// Logically, this is a data structure that contains a set of ranked
/// hosts organized into a [`Region`]. `HostMeshRef`s can be sliced to
/// produce new references that contain a subset of the hosts in the
/// original mesh.
///
/// `HostMeshRef`s have a concrete syntax, implemented by its
/// `Display` and `FromStr` implementations.
///
/// This type does **not** control lifecycle. It only describes the
/// topology of hosts. To take ownership and perform deterministic
/// teardown, use [`HostMesh::take`], which returns an owned
/// [`HostMesh`] that guarantees cleanup on `shutdown()` or `Drop`.
///
/// Cloning this type does not confer ownership. If a corresponding
/// owned [`HostMesh`] shuts down the hosts, operations via a cloned
/// `HostMeshRef` may fail because the hosts are no longer running.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    /// The name of the mesh.
    name: Name,
    /// The region describing how the hosts are organized.
    region: Region,
    /// The hosts, one per rank of `region`. `HostMeshRef::new`
    /// enforces that the two agree in cardinality. Held in an `Arc`,
    /// so cloning the reference does not copy the host list.
    ranks: Arc<Vec<HostRef>>,
}
wirevalue::register_type!(HostMeshRef);
725
726impl HostMeshRef {
727    /// Create a new (raw) HostMeshRef from the provided region and associated
728    /// ranks, which must match in cardinality.
729    #[allow(clippy::result_large_err)]
730    fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> v1::Result<Self> {
731        if region.num_ranks() != ranks.len() {
732            return Err(v1::Error::InvalidRankCardinality {
733                expected: region.num_ranks(),
734                actual: ranks.len(),
735            });
736        }
737        Ok(Self {
738            name,
739            region,
740            ranks: Arc::new(ranks),
741        })
742    }
743
744    /// Create a new HostMeshRef from an arbitrary set of hosts. This is meant to
745    /// enable extrinsic bootstrapping.
746    pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
747        Self {
748            name,
749            region: extent!(hosts = hosts.len()).into(),
750            ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
751        }
752    }
753
754    /// Create a new HostMeshRef from an arbitrary set of host mesh agents.
755    pub fn from_host_agents(name: Name, agents: Vec<ActorRef<HostMeshAgent>>) -> v1::Result<Self> {
756        Ok(Self {
757            name,
758            region: extent!(hosts = agents.len()).into(),
759            ranks: Arc::new(
760                agents
761                    .into_iter()
762                    .map(HostRef::try_from)
763                    .collect::<v1::Result<_>>()?,
764            ),
765        })
766    }
767
768    /// Create a unit HostMeshRef from a host mesh agent.
769    pub fn from_host_agent(name: Name, agent: ActorRef<HostMeshAgent>) -> v1::Result<Self> {
770        Ok(Self {
771            name,
772            region: Extent::unity().into(),
773            ranks: Arc::new(vec![HostRef::try_from(agent)?]),
774        })
775    }
776
777    /// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
778    /// of the procs to spawn on each host.
779    ///
780    /// Currently, spawn issues direct calls to each host agent. This will be fixed by
781    /// maintaining a comm actor on the host service procs themselves.
782    #[allow(clippy::result_large_err)]
783    pub async fn spawn<C: context::Actor>(
784        &self,
785        cx: &C,
786        name: &str,
787        per_host: Extent,
788    ) -> v1::Result<ProcMesh>
789    where
790        C::A: Handler<MeshFailure>,
791    {
792        self.spawn_inner(cx, Name::new(name)?, per_host).await
793    }
794
795    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
796    async fn spawn_inner<C: context::Actor>(
797        &self,
798        cx: &C,
799        proc_mesh_name: Name,
800        per_host: Extent,
801    ) -> v1::Result<ProcMesh>
802    where
803        C::A: Handler<MeshFailure>,
804    {
805        tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
806        tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
807        let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
808        match &result {
809            Ok(_) => {
810                tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
811                tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
812            }
813            Err(error) => {
814                tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
815                tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
816            }
817        }
818        result
819    }
820
821    async fn spawn_inner_inner<C: context::Actor>(
822        &self,
823        cx: &C,
824        proc_mesh_name: Name,
825        per_host: Extent,
826    ) -> v1::Result<ProcMesh>
827    where
828        C::A: Handler<MeshFailure>,
829    {
830        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
831        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
832        if !per_host_labels
833            .intersection(&host_labels)
834            .collect::<Vec<_>>()
835            .is_empty()
836        {
837            return Err(v1::Error::ConfigurationError(anyhow::anyhow!(
838                "per_host dims overlap with existing dims when spawning proc mesh"
839            )));
840        }
841
842        let extent = self
843            .region
844            .extent()
845            .concat(&per_host)
846            .map_err(|err| v1::Error::ConfigurationError(err.into()))?;
847
848        let region: Region = extent.clone().into();
849
850        tracing::info!(
851            name = "ProcMeshStatus",
852            status = "Spawn::Attempt",
853            %region,
854            "spawning proc mesh"
855        );
856
857        let mut procs = Vec::new();
858        let num_ranks = region.num_ranks();
859        // Accumulator outputs full StatusMesh snapshots; seed with
860        // NotExist.
861        let (port, rx) = cx.mailbox().open_accum_port_opts(
862            crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
863            Some(ReducerOpts {
864                max_update_interval: Some(Duration::from_millis(50)),
865                initial_update_interval: None,
866            }),
867        );
868
869        // Create or update each proc, then fence on receiving status
870        // overlays. This prevents a race where procs become
871        // addressable before their local muxers are ready, which
872        // could make early messages unroutable. A future improvement
873        // would allow buffering in the host-level muxer to eliminate
874        // the need for this synchronization step.
875        let mut proc_names = Vec::new();
876        let client_config_override = hyperactor_config::global::attrs();
877        for (host_rank, host) in self.ranks.iter().enumerate() {
878            for per_host_rank in 0..per_host.num_ranks() {
879                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
880                let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
881                proc_names.push(proc_name.clone());
882                host.mesh_agent()
883                    .create_or_update(
884                        cx,
885                        proc_name.clone(),
886                        resource::Rank::new(create_rank),
887                        ProcSpec::new(client_config_override.clone()),
888                    )
889                    .await
890                    .map_err(|e| {
891                        v1::Error::HostMeshAgentConfigurationError(
892                            host.mesh_agent().actor_id().clone(),
893                            format!("failed while creating proc: {}", e),
894                        )
895                    })?;
896                let mut reply_port = port.bind();
897                // If this proc dies or some other issue renders the reply undeliverable,
898                // the reply does not need to be returned to the sender.
899                reply_port.return_undeliverable(false);
900                host.mesh_agent()
901                    .get_rank_status(cx, proc_name.clone(), reply_port)
902                    .await
903                    .map_err(|e| {
904                        v1::Error::HostMeshAgentConfigurationError(
905                            host.mesh_agent().actor_id().clone(),
906                            format!("failed while querying proc status: {}", e),
907                        )
908                    })?;
909                let proc_id = host.named_proc(&proc_name);
910                tracing::info!(
911                    name = "ProcMeshStatus",
912                    status = "Spawn::CreatingProc",
913                    %proc_id,
914                    rank = create_rank,
915                );
916                procs.push(ProcRef::new(
917                    proc_id,
918                    create_rank,
919                    // TODO: specify or retrieve from state instead, to avoid attestation.
920                    ActorRef::attest(host.named_proc(&proc_name).actor_id("agent", 0)),
921                ));
922            }
923        }
924
925        let start_time = RealClock.now();
926
927        // Wait on accumulated StatusMesh snapshots until complete or
928        // timeout.
929        match GetRankStatus::wait(
930            rx,
931            num_ranks,
932            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
933            region.clone(), // fallback mesh if nothing arrives
934        )
935        .await
936        {
937            Ok(statuses) => {
938                // If any rank is terminating, surface a
939                // ProcCreationError pointing at that rank.
940                if let Some((rank, status)) = statuses
941                    .values()
942                    .enumerate()
943                    .find(|(_, s)| s.is_terminating())
944                {
945                    let proc_name = &proc_names[rank];
946                    let host_rank = rank / per_host.num_ranks();
947                    let mesh_agent = self.ranks[host_rank].mesh_agent();
948                    let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
949                    let mut reply_tx = reply_tx.bind();
950                    // If this proc dies or some other issue renders the reply undeliverable,
951                    // the reply does not need to be returned to the sender.
952                    reply_tx.return_undeliverable(false);
953                    mesh_agent
954                        .send(
955                            cx,
956                            resource::GetState {
957                                name: proc_name.clone(),
958                                reply: reply_tx,
959                            },
960                        )
961                        .map_err(|e| {
962                            v1::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
963                        })?;
964                    let state = match RealClock
965                        .timeout(
966                            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
967                            reply_rx.recv(),
968                        )
969                        .await
970                    {
971                        Ok(Ok(state)) => state,
972                        _ => resource::State {
973                            name: proc_name.clone(),
974                            status,
975                            state: None,
976                        },
977                    };
978
979                    tracing::error!(
980                        name = "ProcMeshStatus",
981                        status = "Spawn::GetRankStatus",
982                        rank = host_rank,
983                        "rank {} is terminating with state: {}",
984                        host_rank,
985                        state
986                    );
987
988                    return Err(v1::Error::ProcCreationError {
989                        state: Box::new(state),
990                        host_rank,
991                        mesh_agent,
992                    });
993                }
994            }
995            Err(complete) => {
996                tracing::error!(
997                    name = "ProcMeshStatus",
998                    status = "Spawn::GetRankStatus",
999                    "timeout after {:?} when waiting for procs being created",
1000                    hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1001                );
1002                // Fill remaining ranks with a timeout status via the
1003                // legacy shim.
1004                let legacy = mesh_to_rankedvalues_with_default(
1005                    &complete,
1006                    Status::Timeout(start_time.elapsed()),
1007                    Status::is_not_exist,
1008                    num_ranks,
1009                );
1010                return Err(v1::Error::ProcSpawnError { statuses: legacy });
1011            }
1012        }
1013
1014        let mesh =
1015            ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
1016        if let Ok(ref mesh) = mesh {
1017            // Spawn a unique mesh controller for each proc mesh, so the type of the
1018            // mesh can be preserved.
1019            let controller = ProcMeshController::new(mesh.deref().clone());
1020            controller
1021                .spawn(cx)
1022                .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
1023        }
1024        mesh
1025    }
1026
1027    /// The name of the referenced host mesh.
1028    pub fn name(&self) -> &Name {
1029        &self.name
1030    }
1031
1032    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
1033    pub(crate) async fn stop_proc_mesh(
1034        &self,
1035        cx: &impl hyperactor::context::Actor,
1036        proc_mesh_name: &Name,
1037        procs: impl IntoIterator<Item = ProcId>,
1038        region: Region,
1039    ) -> anyhow::Result<()> {
1040        // Accumulator outputs full StatusMesh snapshots; seed with
1041        // NotExist.
1042        let mut proc_names = Vec::new();
1043        let num_ranks = region.num_ranks();
1044        // Accumulator outputs full StatusMesh snapshots; seed with
1045        // NotExist.
1046        let (port, rx) = cx.mailbox().open_accum_port_opts(
1047            crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
1048            Some(ReducerOpts {
1049                max_update_interval: Some(Duration::from_millis(50)),
1050                initial_update_interval: None,
1051            }),
1052        );
1053        for proc_id in procs.into_iter() {
1054            let Some((addr, proc_name)) = proc_id.as_direct() else {
1055                return Err(anyhow::anyhow!(
1056                    "host mesh proc {} must be direct addressed",
1057                    proc_id,
1058                ));
1059            };
1060            // The name stored in HostMeshAgent is not the same as the
1061            // one stored in the ProcMesh. We instead take each proc id
1062            // and map it to that particular agent.
1063            let proc_name = proc_name.parse::<Name>()?;
1064            proc_names.push(proc_name.clone());
1065
1066            // Note that we don't send 1 message per host agent, we send 1 message
1067            // per proc.
1068            let host = HostRef(addr.clone());
1069            host.mesh_agent().send(
1070                cx,
1071                resource::Stop {
1072                    name: proc_name.clone(),
1073                },
1074            )?;
1075            host.mesh_agent()
1076                .get_rank_status(cx, proc_name, port.bind())
1077                .await?;
1078
1079            tracing::info!(
1080                name = "ProcMeshStatus",
1081                %proc_id,
1082                status = "Stop::Sent",
1083            );
1084        }
1085        tracing::info!(
1086            name = "HostMeshStatus",
1087            status = "ProcMesh::Stop::Sent",
1088            "sending Stop to proc mesh for {} procs: {}",
1089            proc_names.len(),
1090            proc_names
1091                .iter()
1092                .map(|n| n.to_string())
1093                .collect::<Vec<_>>()
1094                .join(", ")
1095        );
1096
1097        let start_time = RealClock.now();
1098
1099        match GetRankStatus::wait(
1100            rx,
1101            num_ranks,
1102            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
1103            region.clone(), // fallback mesh if nothing arrives
1104        )
1105        .await
1106        {
1107            Ok(statuses) => {
1108                let all_stopped = statuses.values().all(|s| s.is_terminating());
1109                if !all_stopped {
1110                    tracing::error!(
1111                        name = "ProcMeshStatus",
1112                        status = "FailedToStop",
1113                        "failed to terminate proc mesh: {:?}",
1114                        statuses,
1115                    );
1116                    return Err(anyhow::anyhow!(
1117                        "failed to terminate proc mesh: {:?}",
1118                        statuses,
1119                    ));
1120                }
1121                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
1122            }
1123            Err(complete) => {
1124                // Fill remaining ranks with a timeout status via the
1125                // legacy shim.
1126                let legacy = mesh_to_rankedvalues_with_default(
1127                    &complete,
1128                    Status::Timeout(start_time.elapsed()),
1129                    Status::is_not_exist,
1130                    num_ranks,
1131                );
1132                tracing::error!(
1133                    name = "ProcMeshStatus",
1134                    status = "StoppingTimeout",
1135                    "failed to terminate proc mesh before timeout: {:?}",
1136                    legacy,
1137                );
1138                return Err(anyhow::anyhow!(
1139                    "failed to terminate proc mesh {} before timeout: {:?}",
1140                    proc_mesh_name,
1141                    legacy
1142                ));
1143            }
1144        }
1145        Ok(())
1146    }
1147
    /// Get the state of all procs with Name in this host mesh.
    /// The procs iterator must be in rank order.
    /// The returned ValueMesh will have a non-empty inner state unless there
    /// was a timeout reaching the host mesh agent.
    ///
    /// One `GetState` request is sent per proc to the agent of the host
    /// encoded in the (direct-addressed) proc id; replies are then
    /// collected, sorted by rank and assembled into a mesh over `region`.
    ///
    /// # Errors
    ///
    /// - `ConfigurationError` if a proc id is not direct addressed.
    /// - `CallError` if a request cannot be sent to a host agent.
    /// - `NotExist` if an agent replies without an inner state.
    /// - Any error from parsing a proc name, receiving a reply, or
    ///   assembling the final mesh is propagated.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = ProcId>,
        region: Region,
    ) -> v1::Result<ValueMesh<resource::State<ProcState>>> {
        // A single port receives the replies of all hosts.
        let (tx, mut rx) = cx.mailbox().open_port();

        // Counts the requests sent, i.e. the number of replies expected.
        let mut num_ranks = 0;
        let procs: Vec<ProcId> = procs.into_iter().collect();
        // Requested proc names, in the order the procs were supplied.
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let Some((addr, proc_name)) = proc_id.as_direct() else {
                return Err(v1::Error::ConfigurationError(anyhow::anyhow!(
                    "host mesh proc {} must be direct addressed",
                    proc_id,
                )));
            };

            // Note that we don't send 1 message per host agent, we send 1 message
            // per proc.
            let host = HostRef(addr.clone());
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            // If this proc dies or some other issue renders the reply undeliverable,
            // the reply does not need to be returned to the sender.
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    v1::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        // Collected as (rank, state) pairs; replies may arrive in any order.
        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            // The agent runs on the same process as the running actor, so if some
            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
            // the agent will be unresponsive.
            // We handle this by setting a timeout on the recv, and if we don't get a
            // message we assume the agent is dead and return a failed state.
            let state = RealClock.timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // Handle non-timeout receiver error.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        // Replies are keyed by the rank the proc was created with.
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(v1::Error::NotExist(state.name));
                    }
                }
            } else {
                // Timeout error, stop reading from the receiver and send back what we have so far,
                // padding with failed states.
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                // NOTE(review): the leftover computation below assumes every
                // replied `create_rank` lies in `0..num_ranks`. The comment
                // after this loop says a sliced mesh may hold create_ranks
                // outside that range; in that case the `assert_eq!` looks
                // like it could fire -- confirm against callers.
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        // We populate with any ranks leftover at the time of the timeout.
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                        },
                    ));
                }
                break;
            }
        }
        // Ensure that all ranks have replied. Note that if the mesh is sliced,
        // not all create_ranks may be in the mesh.
        // Sort by rank, so that the resulting mesh is ordered.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1253}
1254
1255impl view::Ranked for HostMeshRef {
1256    type Item = HostRef;
1257
1258    fn region(&self) -> &Region {
1259        &self.region
1260    }
1261
1262    fn get(&self, rank: usize) -> Option<&Self::Item> {
1263        self.ranks.get(rank)
1264    }
1265}
1266
1267impl view::RankedSliceable for HostMeshRef {
1268    fn sliced(&self, region: Region) -> Self {
1269        let ranks = self
1270            .region()
1271            .remap(&region)
1272            .unwrap()
1273            .map(|index| self.get(index).unwrap().clone());
1274        Self::new(self.name.clone(), region, ranks.collect()).unwrap()
1275    }
1276}
1277
1278impl std::fmt::Display for HostMeshRef {
1279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280        write!(f, "{}:", self.name)?;
1281        for (rank, host) in self.ranks.iter().enumerate() {
1282            if rank > 0 {
1283                write!(f, ",")?;
1284            }
1285            write!(f, "{}", host)?;
1286        }
1287        write!(f, "@{}", self.region)
1288    }
1289}
1290
/// The type of error occurring during `HostMeshRef` parsing.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The region part of the input could not be parsed.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// The input has no `@` separating the hosts from the region.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// The input has no `:` separating the mesh name from the hosts.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The mesh name could not be parsed.
    #[error(transparent)]
    InvalidName(#[from] v1::NameParseError),

    /// The parsed parts do not form a valid `HostMeshRef`. The
    /// underlying error is boxed.
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<v1::Error>),

    /// Any other error raised while parsing.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1312
1313impl From<v1::Error> for HostMeshRefParseError {
1314    fn from(err: v1::Error) -> Self {
1315        Self::InvalidHostMeshRef(Box::new(err))
1316    }
1317}
1318
1319impl FromStr for HostMeshRef {
1320    type Err = HostMeshRefParseError;
1321
1322    fn from_str(s: &str) -> Result<Self, Self::Err> {
1323        let (name, rest) = s
1324            .split_once(':')
1325            .ok_or(HostMeshRefParseError::MissingName)?;
1326
1327        let name = Name::from_str(name)?;
1328
1329        let (hosts, region) = rest
1330            .split_once('@')
1331            .ok_or(HostMeshRefParseError::MissingRegion)?;
1332        let hosts = hosts
1333            .split(',')
1334            .map(|host| host.trim())
1335            .map(|host| host.parse::<HostRef>())
1336            .collect::<Result<Vec<_>, _>>()?;
1337        let region = region.parse()?;
1338        Ok(HostMeshRef::new(name, region, hosts)?)
1339    }
1340}
1341
1342#[cfg(test)]
1343mod tests {
1344    use std::assert_matches::assert_matches;
1345    use std::collections::HashSet;
1346    use std::collections::VecDeque;
1347
1348    use hyperactor::context::Mailbox as _;
1349    use hyperactor_config::attrs::Attrs;
1350    use itertools::Itertools;
1351    use ndslice::ViewExt;
1352    use ndslice::extent;
1353    use tokio::process::Command;
1354
1355    use super::*;
1356    use crate::Bootstrap;
1357    use crate::bootstrap::MESH_TAIL_LOG_LINES;
1358    use crate::resource::Status;
1359    use crate::v1::ActorMesh;
1360    use crate::v1::testactor;
1361    use crate::v1::testactor::GetConfigAttrs;
1362    use crate::v1::testactor::SetConfigAttrs;
1363    use crate::v1::testing;
1364
1365    #[test]
1366    fn test_host_mesh_subset() {
1367        let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1368            .parse()
1369            .unwrap();
1370        assert_eq!(
1371            hosts.range("replica", 1).unwrap().to_string(),
1372            "test:local:3,local:4@2+replica=1/2,host=2/1"
1373        );
1374    }
1375
1376    #[test]
1377    fn test_host_mesh_ref_parse_roundtrip() {
1378        let host_mesh_ref = HostMeshRef::new(
1379            Name::new("test").unwrap(),
1380            extent!(replica = 2, host = 2).into(),
1381            vec![
1382                "tcp:127.0.0.1:123".parse().unwrap(),
1383                "tcp:127.0.0.1:123".parse().unwrap(),
1384                "tcp:127.0.0.1:123".parse().unwrap(),
1385                "tcp:127.0.0.1:123".parse().unwrap(),
1386            ],
1387        )
1388        .unwrap();
1389
1390        assert_eq!(
1391            host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
1392            host_mesh_ref
1393        );
1394    }
1395
    /// End-to-end check of host mesh allocation: for every test alloc,
    /// spawn two proc meshes (a unit one and a 3x2 one per host), spawn
    /// test actors on both, and verify shapes, casting, and full
    /// connectivity between all actors before shutting the hosts down.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_allocate() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let instance = testing::instance();

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity())
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            // The second proc mesh adds two per-host dimensions on top of
            // the four hosts: 4 * 3 * 2 = 24 procs.
            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2))
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // Host meshes can be dereferenced to produce a concrete ref.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            // Here, the underlying host mesh does not change:
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Validate we can cast:
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                // Every actor must answer exactly once.
                while !expected_actor_ids.is_empty() {
                    let actor_id = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Now forward a message through all directed edges across the two meshes.
            // This tests the full connectivity of all the hosts, procs, and actors
            // involved in these two meshes.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                // Each ordered pair of ports
                .permutations(2)
                // Flatten them to create a path:
                .flatten()
                .collect();

            let expect_visited: Vec<_> = to_visit.clone().into();

            // We are going to send to the first, and then set up a port to receive the last.
            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            // The shutdown result is deliberately ignored here.
            let _ = host_mesh.shutdown(&instance).await;
        }
    }
1496
1497    /// Allocate a new port on localhost. This drops the listener, releasing the socket,
1498    /// before returning. Hyperactor's channel::net applies SO_REUSEADDR, so we do not hav
1499    /// to wait out the socket's TIMED_WAIT state.
1500    ///
1501    /// Even so, this is racy.
1502    fn free_localhost_addr() -> ChannelAddr {
1503        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1504        ChannelAddr::Tcp(listener.local_addr().unwrap())
1505    }
1506
    /// Extrinsic bootstrapping: start two host processes by hand, build a
    /// `HostMeshRef` from their addresses, spawn a proc mesh and an actor
    /// mesh on it, check the actor mesh shape, then take ownership of the
    /// hosts and shut them down.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_extrinsic_allocation() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Launch one bootstrap process per host address; the handles are
        // kept in `children` and each child is killed when its handle drops.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                command: None, // use current binary
                config: None,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }

        let instance = testing::instance();
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let proc_mesh = host_mesh
            .spawn(&testing::instance(), "test", Extent::unity())
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn(&testing::instance(), "test", &())
            .await
            .unwrap();

        testactor::assert_mesh_shape(actor_mesh).await;

        // Taking the ref yields an owned mesh that can shut the hosts down.
        HostMesh::take(host_mesh)
            .shutdown(&instance)
            .await
            .expect("hosts shutdown");
    }
1550
/// Starts hosts whose proc bootstrap command is `false`, so every proc launch
/// fails, and checks that spawning a proc mesh reports a
/// `ProcCreationError` whose failed status carries the expected message.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_failing_proc_allocation() {
    let config_lock = hyperactor_config::global::lock();
    let _tail_guard = config_lock.override_key(MESH_TAIL_LOG_LINES, 100);

    let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

    let hosts = vec![free_localhost_addr(), free_localhost_addr()];

    // Launch the host processes in address order; the handles are held in
    // `_children` until the end of the test.
    let _children: Vec<_> = hosts
        .iter()
        .map(|addr| {
            let mut cmd = Command::new(program.clone());
            Bootstrap::Host {
                addr: addr.clone(),
                config: None,
                // The entire purpose of this is to fail:
                command: Some(BootstrapCommand::from("false")),
            }
            .to_env(&mut cmd);
            cmd.kill_on_drop(true);
            cmd.spawn().unwrap()
        })
        .collect();

    let mesh_name = Name::new("test").unwrap();
    let host_mesh = HostMeshRef::from_hosts(mesh_name, hosts);

    let instance = testing::instance();

    let spawn_result = host_mesh.spawn(&instance, "test", Extent::unity()).await;
    let err = spawn_result.unwrap_err();
    assert_matches!(
        err,
        v1::Error::ProcCreationError { state, .. }
        if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
    );
}
1588
/// Starts two hosts where the first one is told to run `sleep 60` as its
/// proc command and the second uses the default, then checks that spawning a
/// proc mesh fails with per-rank statuses `[Timeout, Running]`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_halting_proc_allocation() {
    let config_lock = hyperactor_config::global::lock();
    let _idle_guard = config_lock.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));

    let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

    let hosts = vec![free_localhost_addr(), free_localhost_addr()];

    // Launch the host processes in address order; the handles are held in
    // `_children` until the end of the test.
    let _children: Vec<_> = hosts
        .iter()
        .enumerate()
        .map(|(rank, addr)| {
            let mut cmd = Command::new(program.clone());
            // Only the first host gets the `sleep 60` override; the others
            // get no command override.
            let command = (rank == 0).then(|| {
                let mut sleeper = BootstrapCommand::from("sleep");
                sleeper.args.push("60".to_string());
                sleeper
            });
            Bootstrap::Host {
                addr: addr.clone(),
                config: None,
                command,
            }
            .to_env(&mut cmd);
            cmd.kill_on_drop(true);
            cmd.spawn().unwrap()
        })
        .collect();

    let mesh_name = Name::new("test").unwrap();
    let host_mesh = HostMeshRef::from_hosts(mesh_name, hosts);

    let instance = testing::instance();

    let spawn_result = host_mesh.spawn(&instance, "test", Extent::unity()).await;
    let err = spawn_result.unwrap_err();
    let statuses = err.into_proc_spawn_error().unwrap();
    let observed = statuses.materialized_iter(2).cloned().collect::<Vec<_>>();
    assert_matches!(&observed[..], &[Status::Timeout(_), Status::Running]);
}
1633
/// Overrides two timeouts in the client's global config, pushes a further
/// override of one of them to a remote actor via `SetConfigAttrs`, and then
/// reads the actor's config back with `GetConfigAttrs` to check that the
/// pushed value (3 min) and the client value (1 min) are both visible.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_client_config_override() {
    use hyperactor::config::HOST_SPAWN_READY_TIMEOUT;
    use hyperactor::config::MESSAGE_DELIVERY_TIMEOUT;

    let config_lock = hyperactor_config::global::lock();
    let _pdeathsig_guard =
        config_lock.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
    let _ready_guard = config_lock.override_key(HOST_SPAWN_READY_TIMEOUT, Duration::from_mins(2));
    let _delivery_guard =
        config_lock.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_mins(1));

    // TestOverride mirrored the overrides above into env vars. Remove them so
    // that child processes do not inherit them, which lets the Runtime layer
    // override ClientOverride.
    // SAFETY: Single-threaded test under global config lock.
    unsafe {
        std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
        std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
    }

    let instance = testing::instance();

    let proc_meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
    let proc_mesh = proc_meshes.get(1).unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(instance, "test", &()).await.unwrap();

    // Push a new value for the host-spawn-ready timeout to the actors.
    let mut attrs_override = Attrs::new();
    attrs_override.set(HOST_SPAWN_READY_TIMEOUT, Duration::from_mins(3));
    let payload = bincode::serialize(&attrs_override).unwrap();
    actor_mesh.cast(instance, SetConfigAttrs(payload)).unwrap();

    // Ask for the config back and decode the first reply.
    let (tx, mut rx) = instance.open_port();
    actor_mesh
        .cast(instance, GetConfigAttrs(tx.bind()))
        .unwrap();
    let reply = rx.recv().await.unwrap();
    let actual_attrs = bincode::deserialize::<Attrs>(&reply).unwrap();

    assert_eq!(
        *actual_attrs.get(HOST_SPAWN_READY_TIMEOUT).unwrap(),
        Duration::from_mins(3)
    );
    assert_eq!(
        *actual_attrs.get(MESSAGE_DELIVERY_TIMEOUT).unwrap(),
        Duration::from_mins(1)
    );
}
1697}