Skip to main content

hyperactor_mesh/
host_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Host-mesh attach and lifecycle.
10//!
11//! ## Host-mesh invariants (HM-*)
12//!
13//! These are the load-bearing semantic contracts of `HostMesh::attach()`
14//! and `HostMeshRef::push_config()`. They describe what callers may
15//! rely on; they do not pin specific mechanisms (the current
16//! implementation chooses a particular send path, error taxonomy, and
17//! timeout shape — those are not invariants and may evolve).
18//!
19//! - **HM-1 (attach-config-complete).** If `HostMesh::attach()` returns
20//!   `Ok`, every attached host has installed the client's propagatable
21//!   config snapshot.
22//!
23//! - **HM-2 (attach-config-fails-closed).** If config push fails on
24//!   any attached host, `HostMesh::attach()` returns `Err`. It must
25//!   not return a partially-configured mesh as success.
26//!
27//! - **HM-3 (in-band-request-failure-surface).** Attach-time
28//!   config-push *request* failure must surface through `attach()` /
29//!   `push_config()` as a structured error. It must not bypass that
30//!   result path by returning the outbound request on the caller's
31//!   `Undeliverable<MessageEnvelope>` channel. The invariant names a
32//!   specific prohibited bypass; what a caller chooses to do with the
33//!   returned `Err` (escalate, retry, abort) is outside scope.
34//!   Cross-cutting: depends on hyperactor undeliverable semantics in
35//!   `hyperactor::reference` and `hyperactor::actor`; the invariant
36//!   still belongs here because the attach contract is owned here.
37//!
38//! - **HM-4 (host-scoped-error-reporting).** Config-push failure
39//!   reported from `push_config()` identifies the failing host(s)
40//!   individually, so callers can act per-host. The contract commits
41//!   to per-host *identity*; the failure-mode taxonomy carried
42//!   alongside it (the `ConfigPushFailure` variant set) is
43//!   implementation detail and may evolve without changing the
44//!   contract.
45
46#![allow(clippy::result_large_err)]
47
48use hyperactor::Actor;
49use hyperactor::ActorRef;
50use hyperactor::Endpoint as _;
51use hyperactor::Handler;
52use hyperactor::accum::StreamingReducerOpts;
53use hyperactor::channel::ChannelTransport;
54use hyperactor::id::Label;
55use hyperactor_config::CONFIG;
56use hyperactor_config::ConfigAttr;
57use hyperactor_config::attrs::declare_attrs;
58use ndslice::view::CollectMeshExt;
59
60use crate::mesh_admin::MeshAdminAgent;
61use crate::supervision::MeshFailure;
62
63pub mod host_agent;
64
65use std::collections::HashSet;
66use std::hash::Hash;
67use std::ops::Deref;
68use std::ops::DerefMut;
69use std::str::FromStr;
70use std::sync::Arc;
71use std::time::Duration;
72
73use hyperactor::ActorAddr;
74use hyperactor::ProcAddr;
75use hyperactor::channel::ChannelAddr;
76use hyperactor::context;
77use ndslice::Extent;
78use ndslice::Region;
79use ndslice::ViewExt;
80use ndslice::extent;
81use ndslice::view;
82use ndslice::view::Ranked;
83use ndslice::view::RegionParseError;
84use serde::Deserialize;
85use serde::Serialize;
86use tracing::Instrument;
87use typeuri::Named;
88
89use crate::Bootstrap;
90use crate::ProcMesh;
91use crate::ValueMesh;
92use crate::bootstrap::BootstrapCommand;
93use crate::bootstrap::BootstrapProcManager;
94use crate::bootstrap::ProcBind;
95use crate::host::Host;
96use crate::host::LocalProcManager;
97use crate::host::SERVICE_PROC_NAME;
98use crate::host_mesh::host_agent::DrainHostClient;
99pub use crate::host_mesh::host_agent::HostAgent;
100use crate::host_mesh::host_agent::HostAgentMode;
101use crate::host_mesh::host_agent::ProcManagerSpawnFn;
102use crate::host_mesh::host_agent::ProcState;
103use crate::host_mesh::host_agent::ShutdownHostClient;
104use crate::mesh_controller::ProcMeshController;
105use crate::mesh_id::HostMeshId;
106use crate::mesh_id::ProcMeshId;
107use crate::mesh_id::ResourceId;
108use crate::proc_agent::ProcAgent;
109use crate::proc_mesh::ProcMeshRef;
110use crate::resource;
111use crate::resource::CreateOrUpdateClient;
112use crate::resource::GetRankStatus;
113use crate::resource::GetRankStatusClient;
114use crate::resource::RankedValues;
115use crate::resource::Status;
116use crate::resource::WaitRankStatusClient;
117use crate::transport::DEFAULT_TRANSPORT;
118
119/// Actor name for `ProcMeshController` when spawned as a named child.
120pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
121
// Idle-timeout attributes for proc-mesh operations. Each attribute is
// declared with a `CONFIG` meta built from two names passed to
// `ConfigAttr::new` (NOTE(review): presumably the environment-variable
// name followed by the config key — confirm against `ConfigAttr`).
declare_attrs! {
    /// The maximum idle time between updates while spawning proc
    /// meshes.
    // NOTE(review): this key carries a `mesh_` prefix while the two keys
    // below do not — confirm whether the difference is intentional.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while stopping proc
    /// meshes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while querying host meshes
    /// for their proc states.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
147
/// A reference to a single host.
///
/// This is a newtype over the host's [`ChannelAddr`]; the agent and proc
/// addresses produced by the methods on this type are all derived from
/// that one address.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(pub(crate) ChannelAddr);
// Registers `HostRef` with `wirevalue`.
// NOTE(review): presumably required so the type can travel in typed
// messages — confirm against the `wirevalue` crate.
wirevalue::register_type!(HostRef);
152
153impl HostRef {
154    /// The host mesh agent associated with this host.
155    pub(crate) fn mesh_agent(&self) -> ActorRef<HostAgent> {
156        ActorRef::attest(
157            self.service_proc()
158                .actor_addr(host_agent::HOST_MESH_AGENT_ACTOR_NAME),
159        )
160    }
161
162    /// The ProcAddr for the proc with name `name` on this host.
163    fn named_proc(&self, id: &ResourceId) -> ProcAddr {
164        id.proc_addr(self.0.clone())
165    }
166
167    /// The service proc on this host.
168    fn service_proc(&self) -> ProcAddr {
169        ResourceId::proc_addr_from_name(self.0.clone(), SERVICE_PROC_NAME)
170    }
171
172    /// Request an orderly teardown of this host and all procs it
173    /// spawned.
174    ///
175    /// This resolves the per-child grace **timeout** and the maximum
176    /// termination **concurrency** from config and sends a
177    /// [`ShutdownHost`] message to the host's agent. The agent then:
178    ///
179    /// 1) Performs a graceful termination pass over all tracked
180    ///    children (TERM → wait(`timeout`) → KILL), with at most
181    ///    `max_in_flight` running concurrently.
182    /// 2) After the pass completes, **drops the Host**, which also
183    ///    drops the embedded `BootstrapProcManager`. The manager's
184    ///    `Drop` serves as a last-resort safety net (it SIGKILLs
185    ///    anything that somehow remains).
186    ///
187    /// This call returns `Ok(()))` only after the agent has finished
188    /// the termination pass and released the host, so the host is no
189    /// longer reachable when this returns.
190    pub(crate) async fn shutdown(
191        &self,
192        cx: &impl hyperactor::context::Actor,
193    ) -> anyhow::Result<()> {
194        let agent = self.mesh_agent();
195        let terminate_timeout =
196            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
197        let max_in_flight =
198            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
199        agent
200            .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
201            .await?;
202        Ok(())
203    }
204
205    /// Drain all user procs on this host but keep the host, service
206    /// proc, and networking alive. Used during mesh stop/shutdown so
207    /// that forwarder flushes can still reach remote hosts.
208    pub(crate) async fn drain(
209        &self,
210        cx: &impl hyperactor::context::Actor,
211        host_mesh_id: Option<crate::mesh_id::HostMeshId>,
212    ) -> anyhow::Result<()> {
213        let agent = self.mesh_agent();
214        let terminate_timeout =
215            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
216        let max_in_flight =
217            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
218        agent
219            .drain_host(
220                cx,
221                terminate_timeout,
222                max_in_flight.clamp(1, 256),
223                host_mesh_id,
224            )
225            .await?;
226        Ok(())
227    }
228}
229
230impl TryFrom<ActorRef<HostAgent>> for HostRef {
231    type Error = crate::Error;
232
233    fn try_from(value: ActorRef<HostAgent>) -> Result<Self, crate::Error> {
234        let proc_id = value.actor_addr().proc_addr();
235        Ok(HostRef(proc_id.addr().clone()))
236    }
237}
238
impl std::fmt::Display for HostRef {
    /// Formats the host by delegating to the wrapped [`ChannelAddr`]'s
    /// `fmt`, handing the caller's formatter straight through.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
244
245impl FromStr for HostRef {
246    type Err = <ChannelAddr as FromStr>::Err;
247
248    fn from_str(s: &str) -> Result<Self, Self::Err> {
249        Ok(HostRef(ChannelAddr::from_str(s)?))
250    }
251}
252
/// Per-host failure modes for the attach-time config push.
///
/// **Implementation detail of [`ConfigPushError`].** The variant
/// taxonomy is *not* part of HM-4 or any other invariant — HM-4
/// commits to per-host *identity* in the error, not to a specific
/// menu of failure subtypes. The variant set may grow, shrink, or be
/// reshaped without changing the contract; tests should not pin to a
/// specific variant unless the fixture deterministically guarantees
/// it.
#[derive(Debug, thiserror::Error)]
pub enum ConfigPushFailure {
    /// Synchronous send failure (e.g. the request port refused the
    /// post). The error is preserved for triage.
    // NOTE(review): `push_config_to_host` in this file only ever returns
    // `ReplyTimedOut` or `ReplyChannelClosed` — confirm this variant is
    // constructed somewhere else.
    #[error("send failed: {0}")]
    SendFailed(#[source] Box<hyperactor::mailbox::MailboxSenderError>),

    /// The awaited reply did not arrive within
    /// `MESH_ATTACH_CONFIG_TIMEOUT`. With request-bounce suppression
    /// (HM-3 mechanism), this is the dominant failure mode for an
    /// unreachable host: the channel-side `BrokenLink` is logged at
    /// debug from `MailboxClient`'s buffer task; the contract surface
    /// is just "did not reply in time".
    #[error("reply timed out after MESH_ATTACH_CONFIG_TIMEOUT")]
    ReplyTimedOut,

    /// The reply receiver closed before any value arrived (sender
    /// side dropped, or the local mailbox tore down).
    #[error("reply channel closed before reply")]
    ReplyChannelClosed,
}
283
/// Aggregated `attach()` config-push failure surface — one entry per
/// host that didn't acknowledge installation.
///
/// HM-4: per-host identity is preserved so callers can act per-host.
/// The host-identity key is `HostRef` (the iteration unit in
/// `push_config()`); the per-host cause is a `ConfigPushFailure`
/// (taxonomy is implementation-detail; see the type's doc).
///
/// The `Display` output is a header line with the failure count followed
/// by one indented line per failed host.
#[derive(Debug)]
pub struct ConfigPushError {
    /// One entry per host whose config push didn't succeed, pairing the
    /// host with the reason its push failed.
    pub failures: Vec<(HostRef, ConfigPushFailure)>,
}
296
297impl std::fmt::Display for ConfigPushError {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        write!(
300            f,
301            "config push failed during attach on {} host(s):",
302            self.failures.len()
303        )?;
304        for (host, failure) in &self.failures {
305            write!(f, "\n  - {}: {}", host, failure)?;
306        }
307        Ok(())
308    }
309}
310
impl std::error::Error for ConfigPushError {
    /// Always returns `None`: this error aggregates several per-host
    /// causes and deliberately exposes no single underlying source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Per-host causes are surfaced through `Display`. A single
        // top-level `source()` would arbitrarily pick one host's
        // cause and obscure the others.
        None
    }
}
319
320/// Push the propagatable client config to a single host, awaiting
321/// the host's installation acknowledgement.
322///
323/// HM-3 mechanism: the outbound request envelope is constructed
324/// manually (rather than via the `RefClient`-derived
325/// `set_client_config` shortcut) so the per-call
326/// `return_undeliverable(false)` setter can suppress the request
327/// bounce. With suppression, the only failure surface is the awaited
328/// reply path — exactly what `push_config()` wants in-band.
329async fn push_config_to_host(
330    cx: &impl context::Actor,
331    host: &HostRef,
332    attrs: hyperactor_config::attrs::Attrs,
333    per_host_timeout: Duration,
334) -> Result<(), ConfigPushFailure> {
335    use crate::host_mesh::host_agent::SetClientConfig;
336
337    let (reply_handle, mut reply_receiver) = hyperactor::mailbox::open_port::<()>(cx);
338    let reply_ref = reply_handle.bind();
339    let msg = SetClientConfig {
340        attrs,
341        done: reply_ref,
342    };
343
344    let mut request_port = host.mesh_agent().port::<SetClientConfig>();
345    // HM-3: bypass the default `return_undeliverable: true` from
346    // `PortRef::attest`. Without this, a transient session failure
347    // (e.g. "never connected") would bounce the request back through
348    // the caller's `Undeliverable<MessageEnvelope>` handler — the
349    // exact bypass HM-3 prohibits.
350    request_port.return_undeliverable(false);
351
352    request_port.post(cx, msg);
353
354    match tokio::time::timeout(per_host_timeout, reply_receiver.recv()).await {
355        Ok(Ok(())) => Ok(()),
356        Ok(Err(_recv_err)) => Err(ConfigPushFailure::ReplyChannelClosed),
357        Err(_elapsed) => Err(ConfigPushFailure::ReplyTimedOut),
358    }
359}
360
/// An owned mesh of hosts.
///
/// # Lifecycle
/// `HostMesh` owns host lifecycles. Callers **must** invoke
/// [`HostMesh::shutdown`] for deterministic teardown.
///
/// In tests and production, prefer explicit shutdown to guarantee
/// that host agents drop their `BootstrapProcManager`s and that all
/// child procs are reaped. You can use `shutdown_guard` to get a wrapper
/// which will try to do a best-effort shutdown on Drop.
pub struct HostMesh {
    /// Identifier of this mesh; also used as the telemetry name.
    id: HostMeshId,
    /// Shape of the mesh, taken from the region of the reference this
    /// mesh was built from.
    extent: Extent,
    /// The hosts this `HostMesh` owns and is responsible for tearing
    /// down on shutdown or drop.
    owned_hosts: Vec<HostRef>,
    /// The reference view of this mesh; this is what `Deref` exposes.
    current_ref: HostMeshRef,
}
379
380impl HostMesh {
381    /// Emit a telemetry event for this host mesh creation.
382    fn notify_created(&self) {
383        let name_str = self.id.to_string();
384        let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
385
386        hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
387            id: mesh_id_hash,
388            timestamp: std::time::SystemTime::now(),
389            class: "Host".to_string(),
390            given_name: self
391                .id
392                .display_label()
393                .map(|l| l.as_str())
394                .unwrap_or("unnamed")
395                .to_string(),
396            full_name: name_str,
397            shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
398            parent_mesh_id: None,
399            parent_view_json: None,
400        });
401
402        // Notify telemetry of each HostAgent actor in this mesh.
403        // These are skipped in Proc::spawn_inner. mesh_id directly points to host mesh.
404        let now = std::time::SystemTime::now();
405        for (rank, host) in self.current_ref.hosts().iter().enumerate() {
406            let actor = host.mesh_agent();
407            hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
408                id: hyperactor_telemetry::hash_to_u64(&actor.actor_addr()),
409                timestamp: now,
410                mesh_id: mesh_id_hash,
411                rank: rank as u64,
412                full_name: actor.actor_addr().to_string(),
413                display_name: None,
414            });
415        }
416    }
417
418    /// Bring up a local single-host mesh and, in the launcher
419    /// process, return a `HostMesh` handle for it.
420    ///
421    /// There are two execution modes:
422    ///
423    /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
424    ///   this process was launched as a bootstrap child, we call
425    ///   `boot.bootstrap().await`, which hands control to the
426    ///   bootstrap logic for this process (as defined by the
427    ///   `BootstrapCommand` the parent used to spawn it). if that
428    ///   call returns, we log the error and terminate. this branch
429    ///   does not produce a `HostMesh`.
430    ///
431    /// - launcher mode: otherwise, we are the process that is setting
432    ///   up the mesh. we create a `Host`, spawn a `HostAgent` in
433    ///   it, and build a single-host `HostMesh` around that. that
434    ///   `HostMesh` is returned to the caller.
435    ///
436    /// This API is intended for tests, examples, and local bring-up,
437    /// not production.
438    ///
439    /// TODO: fix up ownership
440    pub async fn local() -> crate::Result<HostMesh> {
441        Self::local_with_bootstrap(BootstrapCommand::current()?).await
442    }
443
444    /// Same as [`local`], but the caller supplies the
445    /// `BootstrapCommand` instead of deriving it from the current
446    /// process.
447    ///
448    /// The provided `bootstrap_cmd` is used when spawning bootstrap
449    /// children and determines the behavior of
450    /// `boot.bootstrap().await` in those children.
451    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
452        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
453            let result = boot.bootstrap().await;
454            if let Err(err) = result {
455                tracing::error!("failed to bootstrap local host mesh process: {}", err);
456            }
457            std::process::exit(1);
458        }
459
460        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
461
462        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
463        let host = Host::new(manager, addr).await?;
464        let addr = host.addr().clone();
465        let system_proc = host.system_proc().clone();
466        let host_mesh_agent = system_proc
467            .spawn(
468                "host_agent",
469                HostAgent::new(HostAgentMode::Process {
470                    host,
471                    shutdown_tx: None,
472                }),
473            )
474            .map_err(crate::Error::SingletonActorSpawnError)?;
475        host_mesh_agent.bind::<HostAgent>();
476
477        let host = HostRef(addr);
478        let host_mesh_ref = HostMeshRef::new(
479            HostMeshId::instance(Label::new("local").unwrap()),
480            extent!(hosts = 1).into(),
481            vec![host],
482        )?;
483        Ok(HostMesh::take(host_mesh_ref))
484    }
485
486    /// Create a local in-process host mesh where all procs run in the
487    /// current OS process.
488    ///
489    /// Unlike [`local`] which spawns child processes for each proc,
490    /// this method uses [`LocalProcManager`] to run everything
491    /// in-process. This makes all actors visible in the admin tree
492    /// (useful for debugging with the TUI).
493    ///
494    /// This API is intended for tests, examples, and debugging.
495    pub async fn local_in_process() -> crate::Result<HostMesh> {
496        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
497        Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
498    }
499
500    /// Create a local in-process host mesh with multiple hosts, where
501    /// all procs run in the current OS process using [`LocalProcManager`].
502    ///
503    /// Each address in `addrs` becomes a separate host. The resulting
504    /// mesh has `extent!(hosts = addrs.len())`.
505    ///
506    /// This API is intended for unit tests that need a multi-host mesh
507    /// within a single process.
508    pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
509        let n = addrs.len();
510        let mut host_refs = Vec::with_capacity(n);
511        for addr in addrs {
512            host_refs.push(Self::create_in_process_host(addr).await?);
513        }
514        HostMeshRef::new(
515            HostMeshId::instance(Label::new("local").unwrap()),
516            extent!(hosts = n).into(),
517            host_refs,
518        )
519    }
520
521    /// Create a single in-process host at the given address, returning
522    /// a [`HostRef`] for it.
523    async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
524        let spawn: ProcManagerSpawnFn =
525            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
526        let manager = LocalProcManager::new(spawn);
527        let host = Host::new(manager, addr).await?;
528        let addr = host.addr().clone();
529        let system_proc = host.system_proc().clone();
530        let host_mesh_agent = system_proc
531            .spawn(
532                host_agent::HOST_MESH_AGENT_ACTOR_NAME,
533                HostAgent::new(HostAgentMode::Local(host)),
534            )
535            .map_err(crate::Error::SingletonActorSpawnError)?;
536        host_mesh_agent.bind::<HostAgent>();
537        Ok(HostRef(addr))
538    }
539
540    /// Create a new process-based host mesh. Each host is represented by a local process,
541    /// which manages its set of procs. This is not a true host mesh the sense that each host
542    /// is not independent. The intent of `process` is for testing, examples, and experimentation.
543    ///
544    /// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
545    /// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
546    /// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
547    /// is called early in the lifecycle of the process and reached unconditionally.
548    ///
549    /// TODO: thread through ownership
550    pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
551        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
552            let result = boot.bootstrap().await;
553            if let Err(err) = result {
554                tracing::error!("failed to bootstrap process host mesh process: {}", err);
555            }
556            std::process::exit(1);
557        }
558
559        let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
560        let mut hosts = Vec::with_capacity(extent.num_ranks());
561        for _ in 0..extent.num_ranks() {
562            // Note: this can be racy. Possibly we should have a callback channel.
563            let addr = bind_spec.binding_addr();
564            let bootstrap = Bootstrap::Host {
565                addr: addr.clone(),
566                command: Some(command.clone()),
567                config: Some(hyperactor_config::global::attrs()),
568                exit_on_shutdown: false,
569            };
570
571            let mut cmd = command.new();
572            bootstrap.to_env(&mut cmd);
573            cmd.spawn()?;
574            hosts.push(HostRef(addr));
575        }
576
577        let host_mesh_ref = HostMeshRef::new(
578            HostMeshId::instance(Label::new("process").unwrap()),
579            extent.into(),
580            hosts,
581        )?;
582        Ok(HostMesh::take(host_mesh_ref))
583    }
584    /// Take ownership of an existing host mesh reference.
585    ///
586    /// Consumes the `HostMeshRef`, captures its region/hosts, and
587    /// returns an owned `HostMesh` that assumes lifecycle
588    /// responsibility for those hosts (i.e., will shut them down on
589    /// Drop).
590    pub fn take(mesh: HostMeshRef) -> Self {
591        let region = mesh.region().clone();
592        let hosts: Vec<HostRef> = mesh.values().collect();
593
594        let current_ref = HostMeshRef::new(mesh.id.clone(), region.clone(), hosts.clone())
595            .expect("region/hosts cardinality must match");
596
597        let result = Self {
598            id: mesh.id,
599            extent: region.extent().clone(),
600            owned_hosts: hosts,
601            current_ref,
602        };
603        result.notify_created();
604        result
605    }
606
607    /// Attach to pre-existing workers and push client config.
608    ///
609    /// This is the "simple bootstrap" attach protocol:
610    /// 1. Wraps the provided addresses into a `HostMeshRef`.
611    /// 2. Snapshots `propagatable_attrs()` from the client's global config.
612    /// 3. Pushes the config to each host agent as `Source::ClientOverride`,
613    ///    awaiting per-host installation acknowledgement.
614    /// 4. Returns the owned `HostMesh`.
615    ///
616    /// HM-1 / HM-2 / HM-3 / HM-4 (see module docs): if config push
617    /// fails on any host, this returns `Err`. A successful return
618    /// means every attached host installed the propagatable config
619    /// snapshot.
620    pub async fn attach(
621        cx: &impl context::Actor,
622        id: HostMeshId,
623        addresses: Vec<ChannelAddr>,
624    ) -> crate::Result<Self> {
625        let mesh_ref = HostMeshRef::from_hosts(id, addresses);
626        let config = hyperactor_config::global::propagatable_attrs();
627        mesh_ref.push_config(cx, config).await?;
628        Ok(Self::take(mesh_ref))
629    }
630
631    /// Request a clean shutdown of all hosts owned by this
632    /// `HostMesh`.
633    ///
634    /// Uses a two-phase approach:
635    /// 1. **Terminate children** on every host concurrently. Service
636    ///    infrastructure (host agent, comm proc, networking) stays
637    ///    alive so that forwarder flushes can still reach remote hosts.
638    /// 2. **Shut down hosts** concurrently. No user procs remain, so
639    ///    this is fast and cannot deadlock on cross-host flush
640    ///    timeouts.
641    #[hyperactor::instrument(fields(host_mesh=self.id.to_string()))]
642    pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
643        let t0 = std::time::Instant::now();
644        tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
645
646        // Phase 1: terminate all user procs while service infrastructure
647        // stays alive so forwarder flushes can complete across hosts.
648        let results = futures::future::join_all(
649            self.current_ref
650                .values()
651                .map(|host| async move { host.drain(cx, None).await }),
652        )
653        .await;
654        let phase1_ms = t0.elapsed().as_millis();
655        for result in &results {
656            if let Err(e) = result {
657                tracing::warn!(
658                    name = "HostMeshStatus",
659                    status = "Shutdown::Drain::Failed",
660                    error = %e,
661                    "drain failed on a host"
662                );
663            }
664        }
665
666        // Phase 2: shut down hosts concurrently. No user procs remain.
667        let t1 = std::time::Instant::now();
668        let results = futures::future::join_all(self.current_ref.values().map(|host| async move {
669            let result = host.shutdown(cx).await;
670            (host, result)
671        }))
672        .await;
673        let phase2_ms = t1.elapsed().as_millis();
674        let total_ms = t0.elapsed().as_millis();
675        let mut failed_hosts = vec![];
676        for (host, result) in &results {
677            if let Err(e) = result {
678                tracing::warn!(
679                    name = "HostMeshStatus",
680                    status = "Shutdown::Host::Failed",
681                    host = %host,
682                    error = %e,
683                    "host shutdown failed"
684                );
685                failed_hosts.push(host);
686            }
687        }
688        if failed_hosts.is_empty() {
689            tracing::info!(
690                name = "HostMeshStatus",
691                status = "Shutdown::Success",
692                phase1_ms,
693                phase2_ms,
694                total_ms,
695            );
696        } else {
697            tracing::error!(
698                name = "HostMeshStatus",
699                status = "Shutdown::Failed",
700                phase1_ms,
701                phase2_ms,
702                total_ms,
703                "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
704                failed_hosts
705            );
706        }
707
708        Ok(())
709    }
710
    /// Consumes and wraps this HostMesh with a HostMeshShutdownGuard, which will
    /// ensure shutdown is run on Drop.
    ///
    /// The mesh remains reachable through the guard's `Deref`/`DerefMut`
    /// implementations and its public field.
    pub fn shutdown_guard(self) -> HostMeshShutdownGuard {
        HostMeshShutdownGuard(self)
    }
716
717    /// Stop all hosts owned by this `HostMesh`, draining user procs
718    /// but keeping worker processes and their sockets alive for
719    /// reconnection.
720    ///
721    /// After `stop`, the same worker addresses can be passed to
722    /// [`HostMesh::attach`] to create a new mesh.
723    #[hyperactor::instrument(fields(host_mesh=self.id.to_string()))]
724    pub async fn stop(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
725        let t0 = std::time::Instant::now();
726        tracing::info!(name = "HostMeshStatus", status = "Stop::Attempt");
727
728        let mesh_id = self.id.clone();
729        let results = futures::future::join_all(self.current_ref.values().map(|host| {
730            let mesh_id = Some(mesh_id.clone());
731            async move { host.drain(cx, mesh_id).await }
732        }))
733        .await;
734        let total_ms = t0.elapsed().as_millis();
735        let mut failed_hosts = vec![];
736        for (i, result) in results.iter().enumerate() {
737            if let Err(e) = result {
738                tracing::warn!(
739                    name = "HostMeshStatus",
740                    status = "Stop::Drain::Failed",
741                    error = %e,
742                    "drain failed on a host"
743                );
744                failed_hosts.push(i);
745            }
746        }
747        if failed_hosts.is_empty() {
748            tracing::info!(name = "HostMeshStatus", status = "Stop::Success", total_ms,);
749        } else {
750            tracing::error!(
751                name = "HostMeshStatus",
752                status = "Stop::Failed",
753                total_ms,
754                "host mesh stop failed; check the logs of the failed hosts for details: {:?}",
755                failed_hosts
756            );
757        }
758
759        // Defuse the Drop impl so it doesn't send ShutdownHost to hosts
760        // we intentionally kept alive.
761        self.owned_hosts.clear();
762
763        Ok(())
764    }
765}
766
767impl HostMesh {
768    /// Set the bootstrap command on the underlying `HostMeshRef`,
769    /// so that future `spawn` calls use it. Unlike
770    /// `HostMeshRef::with_bootstrap` this mutates in place,
771    /// preserving ownership.
772    pub fn set_bootstrap(&mut self, cmd: BootstrapCommand) {
773        self.current_ref = self.current_ref.clone().with_bootstrap(cmd);
774    }
775}
776
777impl Deref for HostMesh {
778    type Target = HostMeshRef;
779
780    fn deref(&self) -> &Self::Target {
781        &self.current_ref
782    }
783}
784
785impl AsRef<HostMeshRef> for HostMesh {
786    fn as_ref(&self) -> &HostMeshRef {
787        self
788    }
789}
790
791impl AsRef<HostMeshRef> for HostMeshRef {
792    fn as_ref(&self) -> &HostMeshRef {
793        self
794    }
795}
796
/// Wrapper around HostMesh that runs shutdown on Drop.
///
/// The guard dereferences (mutably and immutably) to the wrapped
/// [`HostMesh`]. Its `Drop` impl makes a best-effort attempt to shut
/// down the hosts listed in the mesh's `owned_hosts`.
pub struct HostMeshShutdownGuard(pub HostMesh);
799
800impl Deref for HostMeshShutdownGuard {
801    type Target = HostMesh;
802
803    fn deref(&self) -> &HostMesh {
804        &self.0
805    }
806}
807
808impl DerefMut for HostMeshShutdownGuard {
809    fn deref_mut(&mut self) -> &mut HostMesh {
810        &mut self.0
811    }
812}
813
814impl Drop for HostMeshShutdownGuard {
815    /// Best-effort cleanup for owned host meshes on drop.
816    ///
817    /// When a `HostMesh` is dropped, it attempts to shut down all
818    /// hosts it owns:
819    /// - If a Tokio runtime is available, we spawn an ephemeral
820    ///   `Proc` + `Instance` and send `ShutdownHost` messages to each
821    ///   host. This ensures that the embedded `BootstrapProcManager`s
822    ///   are dropped, and all child procs they spawned are killed.
823    /// - If no runtime is available, we cannot perform async cleanup
824    ///   here; in that case we log a warning and rely on kernel-level
825    ///   PDEATHSIG or the individual `BootstrapProcManager`'s `Drop`
826    ///   as the final safeguard.
827    ///
828    /// This path is **last resort**: callers should prefer explicit
829    /// [`HostMesh::shutdown`] to guarantee orderly teardown. Drop
830    /// only provides opportunistic cleanup to prevent process leaks
831    /// if shutdown is skipped.
832    fn drop(&mut self) {
833        tracing::info!(
834            name = "HostMeshStatus",
835            host_mesh = %self.0.id,
836            status = "Dropping",
837        );
838        // Snapshot the owned hosts we're responsible for.
839        let hosts: Vec<HostRef> = self.0.owned_hosts.clone();
840
841        // Best-effort only when a Tokio runtime is available.
842        if let Ok(handle) = tokio::runtime::Handle::try_current() {
843            let mesh_id = self.0.id.clone();
844            let span = tracing::info_span!(
845                "hostmesh_drop_cleanup",
846                host_mesh = %mesh_id,
847                hosts = hosts.len(),
848            );
849
850            handle.spawn(
851                async move {
852                    // Spin up a tiny ephemeral proc+instance to get an
853                    // Actor context.
854                    match hyperactor::Proc::direct(
855                        ChannelTransport::Unix.any(),
856                        "hostmesh-drop".to_string(),
857                    ) {
858                        Err(e) => {
859                            tracing::warn!(
860                                error = %e,
861                                "failed to construct ephemeral Proc for drop-cleanup; \
862                                 relying on PDEATHSIG/manager Drop"
863                            );
864                        }
865                        Ok(proc) => match proc.client("drop") {
866                            Err(e) => {
867                                tracing::warn!(
868                                    error = %e,
869                                    "failed to create ephemeral instance for drop-cleanup; \
870                                     relying on PDEATHSIG/manager Drop"
871                                );
872                            }
873                            Ok((instance, _guard)) => {
874                                let mut attempted = 0usize;
875                                let mut ok = 0usize;
876                                let mut err = 0usize;
877
878                                for host in hosts {
879                                    attempted += 1;
880                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
881                                    match host.shutdown(&instance).await {
882                                        Ok(()) => {
883                                            ok += 1;
884                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
885                                        }
886                                        Err(e) => {
887                                            err += 1;
888                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
889                                        }
890                                    }
891                                }
892
893                                tracing::info!(
894                                    attempted, ok, err,
895                                    "hostmesh drop-cleanup summary"
896                                );
897                            }
898                        },
899                    }
900                }
901                .instrument(span),
902            );
903        } else {
904            // No runtime here; PDEATHSIG and manager Drop remain the
905            // last-resort safety net.
906            tracing::warn!(
907                host_mesh = %self.0.id,
908                hosts = hosts.len(),
909                "HostMesh dropped without a Tokio runtime; skipping \
910                 best-effort shutdown. This indicates that .shutdown() \
911                 on this mesh has not been called before program exit \
912                 (perhaps due to a missing call to \
913                 'monarch.actor.shutdown_context()'?) This in turn can \
914                 lead to backtrace output due to folly SIGTERM \
915                 handlers."
916            );
917        }
918
919        tracing::info!(
920            name = "HostMeshStatus",
921            host_mesh = %self.0.id,
922            status = "Dropped",
923        );
924    }
925}
926
927/// Helper: legacy shim for error types that still require
928/// RankedValues<Status>. TODO(shayne-fletcher): Delete this
929/// shim once Error::ActorSpawnError carries a StatusMesh
930/// (ValueMesh<Status>) directly. At that point, use the mesh
931/// as-is and remove `mesh_to_rankedvalues_*` calls below.
932/// is_sentinel should return true if the value matches a previous filled in
933/// value. If the input value matches the sentinel, it gets replaced with the
934/// default.
935pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
936    mesh: &ValueMesh<T>,
937    default: T,
938    is_sentinel: F,
939    len: usize,
940) -> RankedValues<T>
941where
942    T: Eq + Clone + 'static,
943    F: Fn(&T) -> bool,
944{
945    let mut out = RankedValues::from((0..len, default));
946    for (i, s) in mesh.values().enumerate() {
947        if !is_sentinel(&s) {
948            out.merge_from(RankedValues::from((i..i + 1, s)));
949        }
950    }
951    out
952}
953
/// A non-owning reference to a mesh of hosts.
///
/// Logically, this is a data structure that contains a set of ranked
/// hosts organized into a [`Region`]. `HostMeshRef`s can be sliced to
/// produce new references that contain a subset of the hosts in the
/// original mesh.
///
/// `HostMeshRef`s have a concrete syntax, implemented by its
/// `Display` and `FromStr` implementations.
///
/// This type does **not** control lifecycle. It only describes the
/// topology of hosts. To take ownership and perform deterministic
/// teardown, use [`HostMesh::take`], which returns an owned
/// [`HostMesh`] that guarantees cleanup on `shutdown()` or `Drop`.
///
/// Cloning this type does not confer ownership. If a corresponding
/// owned [`HostMesh`] shuts down the hosts, operations via a cloned
/// `HostMeshRef` may fail because the hosts are no longer running.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    /// Identity of the referenced host mesh.
    id: HostMeshId,
    /// Region over which the hosts are ranked.
    region: Region,
    /// Host references in rank order. The raw constructor requires
    /// the length to equal `region.num_ranks()`.
    ranks: Arc<Vec<HostRef>>,
    /// Uniform bootstrap command to use when spawning procs on this
    /// mesh. When `None`, each host agent uses its own default
    /// command. Per-proc overrides are supplied at spawn time via the
    /// `per_rank_bootstrap` parameter on [`HostMeshRef::spawn`].
    #[serde(default)]
    pub bootstrap_command: Option<BootstrapCommand>,
}
984
/// A function that produces a per-rank [`BootstrapCommand`], called
/// once per proc during spawn with that proc's [`view::Point`] over
/// the combined `host_extent ⊕ per_host` extent. Returning an error
/// aborts the spawn with that error surfaced as a configuration
/// failure.
///
/// This is an unsized `dyn Fn` type; [`HostMeshRef::spawn`] takes it
/// boxed. The `Send + Sync` bounds are part of the alias.
pub type PerRankBootstrapFn = dyn Fn(view::Point) -> anyhow::Result<BootstrapCommand> + Send + Sync;
// Register `HostMeshRef` with `wirevalue` (see the
// `wirevalue::register_type!` macro for what registration provides).
wirevalue::register_type!(HostMeshRef);
992
993impl HostMeshRef {
994    /// Create a new (raw) HostMeshRef from the provided region and associated
995    /// ranks, which must match in cardinality.
996    #[allow(clippy::result_large_err)]
997    fn new(id: HostMeshId, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
998        if region.num_ranks() != ranks.len() {
999            return Err(crate::Error::InvalidRankCardinality {
1000                expected: region.num_ranks(),
1001                actual: ranks.len(),
1002            });
1003        }
1004        Ok(Self {
1005            id,
1006            region,
1007            ranks: Arc::new(ranks),
1008            bootstrap_command: None,
1009        })
1010    }
1011
1012    /// Create a new HostMeshRef from an arbitrary set of hosts. This is meant to
1013    /// enable extrinsic bootstrapping.
1014    pub fn from_hosts(id: HostMeshId, hosts: Vec<ChannelAddr>) -> Self {
1015        Self {
1016            id,
1017            region: extent!(hosts = hosts.len()).into(),
1018            ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
1019            bootstrap_command: None,
1020        }
1021    }
1022
1023    /// Create a new HostMeshRef from an arbitrary set of host mesh agents.
1024    pub fn from_host_agents(
1025        id: HostMeshId,
1026        agents: Vec<ActorRef<HostAgent>>,
1027    ) -> crate::Result<Self> {
1028        Ok(Self {
1029            id,
1030            region: extent!(hosts = agents.len()).into(),
1031            ranks: Arc::new(
1032                agents
1033                    .into_iter()
1034                    .map(HostRef::try_from)
1035                    .collect::<crate::Result<_>>()?,
1036            ),
1037            bootstrap_command: None,
1038        })
1039    }
1040
1041    /// Create a unit HostMeshRef from a host mesh agent.
1042    pub fn from_host_agent(id: HostMeshId, agent: ActorRef<HostAgent>) -> crate::Result<Self> {
1043        Ok(Self {
1044            id,
1045            region: Extent::unity().into(),
1046            ranks: Arc::new(vec![HostRef::try_from(agent)?]),
1047            bootstrap_command: None,
1048        })
1049    }
1050
1051    /// Return a new `HostMeshRef` that will use `cmd` when spawning procs,
1052    /// overriding the host agent's default bootstrap command.
1053    pub fn with_bootstrap(self, cmd: BootstrapCommand) -> Self {
1054        Self {
1055            bootstrap_command: Some(cmd),
1056            ..self
1057        }
1058    }
1059
1060    /// Returns the host entries as `(addr_string, ActorRef<HostAgent>)` pairs.
1061    /// Used by `MeshAdminAgent::effective_hosts()` to merge C into the
1062    /// admin's host list (see CH-1 in mesh_admin module doc).
1063    pub(crate) fn host_entries(&self) -> Vec<(String, ActorRef<HostAgent>)> {
1064        self.ranks
1065            .iter()
1066            .map(|h| (h.0.to_string(), h.mesh_agent()))
1067            .collect()
1068    }
1069
1070    /// Push client config to all host agents in this mesh, in parallel.
1071    ///
1072    /// Each host installs the attrs as `Source::ClientOverride`.
1073    /// Idempotent: sending the same attrs twice replaces the layer.
1074    ///
1075    /// Implements HM-1, HM-2, HM-3, and HM-4 (see module docs):
1076    /// returns `Err(ConfigPushError)` if any host's push didn't
1077    /// succeed, naming each failing host individually. The outbound
1078    /// request envelope is sent with `return_undeliverable: false` to
1079    /// keep failure in-band on the awaited reply (HM-3 mechanism);
1080    /// per-host wait is bounded by `MESH_ATTACH_CONFIG_TIMEOUT`.
1081    pub(crate) async fn push_config(
1082        &self,
1083        cx: &impl context::Actor,
1084        attrs: hyperactor_config::attrs::Attrs,
1085    ) -> Result<(), ConfigPushError> {
1086        let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
1087        let hosts: Vec<_> = self.values().collect();
1088
1089        let per_host = hosts.into_iter().map(|host| {
1090            let attrs = attrs.clone();
1091            async move {
1092                (
1093                    host.clone(),
1094                    push_config_to_host(cx, &host, attrs, timeout).await,
1095                )
1096            }
1097        });
1098
1099        let results = futures::future::join_all(per_host).await;
1100
1101        let mut failures = Vec::new();
1102        let mut success = 0_usize;
1103        for (host, outcome) in results {
1104            match outcome {
1105                Ok(()) => {
1106                    success += 1;
1107                    tracing::debug!(host = %host, "host agent config installed");
1108                }
1109                Err(failure) => {
1110                    tracing::warn!(host = %host, error = %failure, "config push failed");
1111                    failures.push((host, failure));
1112                }
1113            }
1114        }
1115
1116        if failures.is_empty() {
1117            tracing::info!(success, "push_config complete");
1118            Ok(())
1119        } else {
1120            tracing::info!(
1121                success,
1122                failed = failures.len(),
1123                "push_config complete with failures",
1124            );
1125            Err(ConfigPushError { failures })
1126        }
1127    }
1128
1129    /// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
1130    /// of the procs to spawn on each host.
1131    ///
1132    /// `proc_bind`, when provided, is a per-process CPU/NUMA binding
1133    /// configuration. Its length must equal the number of ranks in
1134    /// `per_host`. Each entry maps binding keys (`cpunodebind`,
1135    /// `membind`, `physcpubind`, `cpus`) to their values.
1136    /// Only takes effect when running on Linux.
1137    ///
1138    /// `per_rank_bootstrap`, when provided, is a function called once
1139    /// per proc to produce that proc's [`BootstrapCommand`]. The
1140    /// function receives a [`view::Point`] over the combined
1141    /// `host_extent ⊕ per_host` extent. Its return value takes
1142    /// precedence over `self.bootstrap_command` for that proc only.
1143    ///
1144    /// Currently, spawn issues direct calls to each host agent. This will be fixed by
1145    /// maintaining a comm actor on the host service procs themselves.
1146    #[allow(clippy::result_large_err)]
1147    pub async fn spawn<C: context::Actor>(
1148        &self,
1149        cx: &C,
1150        name: &str,
1151        per_host: Extent,
1152        proc_bind: Option<Vec<ProcBind>>,
1153        per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1154    ) -> crate::Result<ProcMesh>
1155    where
1156        C::A: Handler<MeshFailure>,
1157    {
1158        self.spawn_inner(
1159            cx,
1160            ProcMeshId::instance(Label::strip(name)),
1161            per_host,
1162            proc_bind,
1163            per_rank_bootstrap,
1164        )
1165        .await
1166    }
1167
1168    #[hyperactor::instrument(fields(host_mesh=self.id.to_string(), proc_mesh=proc_mesh_id.to_string()))]
1169    async fn spawn_inner<C: context::Actor>(
1170        &self,
1171        cx: &C,
1172        proc_mesh_id: ProcMeshId,
1173        per_host: Extent,
1174        proc_bind: Option<Vec<ProcBind>>,
1175        per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1176    ) -> crate::Result<ProcMesh>
1177    where
1178        C::A: Handler<MeshFailure>,
1179    {
1180        tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1181        tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1182        let result = self
1183            .spawn_inner_inner(cx, proc_mesh_id, per_host, proc_bind, per_rank_bootstrap)
1184            .await;
1185        match &result {
1186            Ok(_) => {
1187                tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1188                tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1189            }
1190            Err(error) => {
1191                tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1192                tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1193            }
1194        }
1195        result
1196    }
1197
1198    async fn spawn_inner_inner<C: context::Actor>(
1199        &self,
1200        cx: &C,
1201        proc_mesh_id: ProcMeshId,
1202        per_host: Extent,
1203        proc_bind: Option<Vec<ProcBind>>,
1204        per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1205    ) -> crate::Result<ProcMesh>
1206    where
1207        C::A: Handler<MeshFailure>,
1208    {
1209        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
1210        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
1211        if !per_host_labels
1212            .intersection(&host_labels)
1213            .collect::<Vec<_>>()
1214            .is_empty()
1215        {
1216            return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1217                "per_host dims overlap with existing dims when spawning proc mesh"
1218            )));
1219        }
1220        if let Some(proc_bind) = proc_bind.as_ref()
1221            && proc_bind.len() != per_host.num_ranks()
1222        {
1223            return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1224                "proc_bind length does not match per_host extent"
1225            )));
1226        }
1227
1228        let extent = self
1229            .region
1230            .extent()
1231            .concat(&per_host)
1232            .map_err(|err| crate::Error::ConfigurationError(err.into()))?;
1233
1234        let region: Region = extent.clone().into();
1235
1236        tracing::info!(
1237            name = "ProcMeshStatus",
1238            status = "Spawn::Attempt",
1239            %region,
1240            "spawning proc mesh"
1241        );
1242
1243        let mut procs = Vec::new();
1244        let num_ranks = region.num_ranks();
1245        // Accumulator outputs full StatusMesh snapshots; seed with
1246        // NotExist.
1247        let (port, rx) = cx.mailbox().open_accum_port_opts(
1248            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
1249            StreamingReducerOpts {
1250                max_update_interval: Some(Duration::from_millis(50)),
1251                initial_update_interval: None,
1252            },
1253        );
1254
1255        // Create or update each proc, then fence on receiving status
1256        // overlays. This prevents a race where procs become
1257        // addressable before their local muxers are ready, which
1258        // could make early messages unroutable. A future improvement
1259        // would allow buffering in the host-level muxer to eliminate
1260        // the need for this synchronization step.
1261        let mut proc_names = Vec::new();
1262        let client_config_override = hyperactor_config::global::propagatable_attrs();
1263        for (host_rank, host) in self.ranks.iter().enumerate() {
1264            for per_host_rank in 0..per_host.num_ranks() {
1265                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
1266                let proc_name = ResourceId::instance(Label::strip(&format!(
1267                    "{}-{}",
1268                    proc_mesh_id
1269                        .display_label()
1270                        .map(|l| l.as_str())
1271                        .unwrap_or("unnamed"),
1272                    per_host_rank
1273                )));
1274                proc_names.push(proc_name.clone());
1275                let bind = proc_bind.as_ref().map(|v| v[per_host_rank].clone());
1276                let bootstrap_command = match per_rank_bootstrap.as_ref() {
1277                    Some(f) => Some(
1278                        f(extent
1279                            .point_of_rank(create_rank)
1280                            .expect("rank in combined extent"))
1281                        .map_err(crate::Error::ConfigurationError)?,
1282                    ),
1283                    None => self.bootstrap_command.clone(),
1284                };
1285                let proc_spec = resource::ProcSpec {
1286                    client_config_override: client_config_override.clone(),
1287                    bootstrap_command,
1288                    proc_bind: bind,
1289                    host_mesh_id: Some(self.id.clone()),
1290                };
1291                host.mesh_agent()
1292                    .create_or_update(
1293                        cx,
1294                        proc_name.clone(),
1295                        resource::Rank::new(create_rank),
1296                        proc_spec,
1297                    )
1298                    .await
1299                    .map_err(|e| {
1300                        crate::Error::HostMeshAgentConfigurationError(
1301                            host.mesh_agent().actor_addr().clone(),
1302                            format!("failed while creating proc: {}", e),
1303                        )
1304                    })?;
1305                let mut reply_port = port.bind();
1306                // If this proc dies or some other issue renders the reply undeliverable,
1307                // the reply does not need to be returned to the sender.
1308                reply_port.return_undeliverable(false);
1309                host.mesh_agent()
1310                    .get_rank_status(cx, proc_name.clone(), reply_port)
1311                    .await
1312                    .map_err(|e| {
1313                        crate::Error::HostMeshAgentConfigurationError(
1314                            host.mesh_agent().actor_addr().clone(),
1315                            format!("failed while querying proc status: {}", e),
1316                        )
1317                    })?;
1318                let proc_id = host.named_proc(&proc_name);
1319                tracing::info!(
1320                    name = "ProcMeshStatus",
1321                    status = "Spawn::CreatingProc",
1322                    %proc_id,
1323                    rank = create_rank,
1324                );
1325                procs.push(crate::proc_mesh::ProcRef::new(
1326                    proc_id,
1327                    create_rank,
1328                    // TODO: specify or retrieve from state instead, to avoid attestation.
1329                    ActorRef::attest(
1330                        host.named_proc(&proc_name)
1331                            .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
1332                    ),
1333                ));
1334            }
1335        }
1336
1337        let start_time = tokio::time::Instant::now();
1338
1339        // Wait on accumulated StatusMesh snapshots until complete or
1340        // timeout.
1341        match GetRankStatus::wait(
1342            rx,
1343            num_ranks,
1344            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1345            region.clone(), // fallback mesh if nothing arrives
1346        )
1347        .await
1348        {
1349            Ok(statuses) => {
1350                // If any rank is terminating, surface a
1351                // ProcCreationError pointing at that rank.
1352                if let Some((rank, status)) = statuses
1353                    .values()
1354                    .enumerate()
1355                    .find(|(_, s)| s.is_terminating())
1356                {
1357                    let proc_name = &proc_names[rank];
1358                    let host_rank = rank / per_host.num_ranks();
1359                    let mesh_agent = self.ranks[host_rank].mesh_agent();
1360                    let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
1361                    let mut reply_tx = reply_tx.bind();
1362                    // If this proc dies or some other issue renders the reply undeliverable,
1363                    // the reply does not need to be returned to the sender.
1364                    reply_tx.return_undeliverable(false);
1365                    mesh_agent.post(
1366                        cx,
1367                        resource::GetState {
1368                            id: proc_name.clone(),
1369                            reply: reply_tx,
1370                        },
1371                    );
1372                    let state = match tokio::time::timeout(
1373                        hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1374                        reply_rx.recv(),
1375                    )
1376                    .await
1377                    {
1378                        Ok(Ok(state)) => state,
1379                        _ => resource::State {
1380                            id: proc_name.clone(),
1381                            status,
1382                            state: None,
1383                            generation: 0,
1384                            timestamp: std::time::SystemTime::now(),
1385                        },
1386                    };
1387
1388                    tracing::error!(
1389                        name = "ProcMeshStatus",
1390                        status = "Spawn::GetRankStatus",
1391                        rank = host_rank,
1392                        "rank {} is terminating with state: {}",
1393                        host_rank,
1394                        state
1395                    );
1396
1397                    return Err(crate::Error::ProcCreationError {
1398                        state: Box::new(state),
1399                        host_rank,
1400                        mesh_agent,
1401                    });
1402                }
1403            }
1404            Err(complete) => {
1405                tracing::error!(
1406                    name = "ProcMeshStatus",
1407                    status = "Spawn::GetRankStatus",
1408                    "timeout after {:?} when waiting for procs being created",
1409                    hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1410                );
1411                // Fill remaining ranks with a timeout status via the
1412                // legacy shim.
1413                let legacy = mesh_to_rankedvalues_with_default(
1414                    &complete,
1415                    Status::Timeout(start_time.elapsed()),
1416                    Status::is_not_exist,
1417                    num_ranks,
1418                );
1419                return Err(crate::Error::ProcSpawnError { statuses: legacy });
1420            }
1421        }
1422
1423        let mut mesh = ProcMesh::create(cx, proc_mesh_id, extent, self.clone(), procs).await;
1424        if let Ok(ref mut mesh) = mesh {
1425            // Spawn a unique mesh controller for each proc mesh, so the type of the
1426            // mesh can be preserved. Procs reached a non-terminating state above,
1427            // so seed the controller's per-rank statuses as Running.
1428            let mesh_ref: ProcMeshRef = (**mesh).clone();
1429            let region = ndslice::view::Ranked::region(&mesh_ref).clone();
1430            let initial_statuses: crate::ValueMesh<resource::Status> =
1431                std::iter::repeat_n(resource::Status::Running, region.num_ranks())
1432                    .collect_mesh::<crate::ValueMesh<_>>(region)?;
1433            let controller = ProcMeshController::new(mesh_ref, None, None, initial_statuses);
1434            // hyperactor::proc AI-3: controller name must include mesh
1435            // identity for proc-wide ActorAddr uniqueness.
1436            let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.id());
1437            let controller_handle =
1438                controller
1439                    .spawn_with_name(cx, &controller_name)
1440                    .map_err(|e| {
1441                        crate::Error::ControllerActorSpawnError(mesh.id().resource_id().clone(), e)
1442                    })?;
1443            // Bind the actor's well-known ports (Signal, IntrospectMessage,
1444            // Undeliverable). Without this, the controller's mailbox has no
1445            // port entries and messages (including introspection queries)
1446            // are returned as undeliverable.
1447            let controller_ref: ActorRef<ProcMeshController> = controller_handle.bind();
1448            mesh.set_controller(Some(controller_ref));
1449        }
1450        mesh
1451    }
1452
1453    /// The identity of the referenced host mesh.
1454    pub fn id(&self) -> &HostMeshId {
1455        &self.id
1456    }
1457
1458    /// The host references (channel addresses) in rank order.
1459    pub fn hosts(&self) -> &[HostRef] {
1460        &self.ranks
1461    }
1462
1463    /// Stop every proc in this proc mesh.
1464    ///
1465    /// On success returns the final per-rank `StatusMesh`, in which every
1466    /// rank is guaranteed to be `is_terminated()` (`Stopped`, `Failed`, or
1467    /// `Timeout`). Callers can apply these statuses to controller health
1468    /// state so that subsequent `GetState` queries reflect reality.
1469    ///
1470    /// Returns `crate::Error::ProcMeshStopError` if any rank did not reach
1471    /// a terminal state within `PROC_STOP_MAX_IDLE`; the error carries the
1472    /// best-known per-rank statuses for the same purpose.
1473    #[hyperactor::instrument(fields(host_mesh=self.id.to_string(), proc_mesh=proc_mesh_id.to_string()))]
1474    pub(crate) async fn stop_proc_mesh(
1475        &self,
1476        cx: &impl hyperactor::context::Actor,
1477        proc_mesh_id: &ProcMeshId,
1478        procs: impl IntoIterator<Item = ProcAddr>,
1479        region: Region,
1480        reason: String,
1481    ) -> crate::Result<crate::StatusMesh> {
1482        // Accumulator outputs full StatusMesh snapshots; seed with
1483        // NotExist.
1484        let mut proc_names = Vec::new();
1485        let num_ranks = region.num_ranks();
1486        // Accumulator outputs full StatusMesh snapshots; seed with
1487        // NotExist.
1488        let (port, rx) = cx.mailbox().open_accum_port_opts(
1489            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
1490            StreamingReducerOpts {
1491                max_update_interval: Some(Duration::from_millis(50)),
1492                initial_update_interval: None,
1493            },
1494        );
1495        for proc_id in procs.into_iter() {
1496            let addr = proc_id.addr().clone();
1497            // The name stored in HostAgent is not the same as the
1498            // one stored in the ProcMesh. We instead take each proc id
1499            // and map it to that particular agent.
1500            let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
1501            proc_names.push(proc_resource_id.clone());
1502
1503            // Note that we don't send 1 message per host agent, we send 1 message
1504            // per proc.
1505            let host = HostRef(addr);
1506            host.mesh_agent().post(
1507                cx,
1508                resource::Stop {
1509                    id: proc_resource_id.clone(),
1510                    reason: reason.clone(),
1511                },
1512            );
1513            host.mesh_agent()
1514                .wait_rank_status(cx, proc_resource_id, Status::Stopped, port.bind())
1515                .await
1516                .map_err(|e| crate::Error::CallError(host.mesh_agent().actor_addr().clone(), e))?;
1517
1518            tracing::info!(
1519                name = "ProcMeshStatus",
1520                %proc_id,
1521                status = "Stop::Sent",
1522            );
1523        }
1524        tracing::info!(
1525            name = "HostMeshStatus",
1526            status = "ProcMesh::Stop::Sent",
1527            "sending Stop to proc mesh for {} procs: {}",
1528            proc_names.len(),
1529            proc_names
1530                .iter()
1531                .map(|n| n.to_string())
1532                .collect::<Vec<_>>()
1533                .join(", ")
1534        );
1535
1536        let start_time = tokio::time::Instant::now();
1537
1538        match GetRankStatus::wait(
1539            rx,
1540            num_ranks,
1541            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
1542            region.clone(), // fallback mesh if nothing arrives
1543        )
1544        .await
1545        {
1546            Ok(statuses) => {
1547                let all_stopped = statuses.values().all(|s| s.is_terminated());
1548                if !all_stopped {
1549                    let legacy = mesh_to_rankedvalues_with_default(
1550                        &statuses,
1551                        Status::NotExist,
1552                        Status::is_not_exist,
1553                        num_ranks,
1554                    );
1555                    tracing::error!(
1556                        name = "ProcMeshStatus",
1557                        status = "FailedToStop",
1558                        "failed to terminate proc mesh: {:?}",
1559                        statuses,
1560                    );
1561                    return Err(crate::Error::ProcMeshStopError { statuses: legacy });
1562                }
1563                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
1564                Ok(statuses)
1565            }
1566            Err(complete) => {
1567                // Fill remaining ranks with a timeout status via the
1568                // legacy shim.
1569                let legacy = mesh_to_rankedvalues_with_default(
1570                    &complete,
1571                    Status::Timeout(start_time.elapsed()),
1572                    Status::is_not_exist,
1573                    num_ranks,
1574                );
1575                tracing::error!(
1576                    name = "ProcMeshStatus",
1577                    status = "StoppingTimeout",
1578                    "failed to terminate proc mesh {} before timeout: {:?}",
1579                    proc_mesh_id,
1580                    legacy,
1581                );
1582                Err(crate::Error::ProcMeshStopError { statuses: legacy })
1583            }
1584        }
1585    }
1586
1587    /// Get the state of all procs with Name in this host mesh.
1588    /// The procs iterator must be in rank order.
1589    /// The returned ValueMesh will have a non-empty inner state unless there
1590    /// was a timeout reaching the host mesh agent.
1591    ///
1592    /// If `keepalive` is `Some`, send `KeepaliveGetState` so the host agent
1593    /// extends each proc's expiry time; otherwise send a plain `GetState`.
1594    #[allow(clippy::result_large_err)]
1595    pub(crate) async fn proc_states(
1596        &self,
1597        cx: &impl context::Actor,
1598        procs: impl IntoIterator<Item = ProcAddr>,
1599        region: Region,
1600        keepalive: Option<std::time::SystemTime>,
1601    ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
1602        let (tx, mut rx) = cx.mailbox().open_port();
1603
1604        let mut num_ranks = 0;
1605        let procs: Vec<ProcAddr> = procs.into_iter().collect();
1606        let mut proc_names = Vec::new();
1607        for proc_id in procs.iter() {
1608            num_ranks += 1;
1609            let addr = proc_id.addr().clone();
1610
1611            // Note that we don't send 1 message per host agent, we send 1 message
1612            // per proc.
1613            let host = HostRef(addr);
1614            let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
1615            proc_names.push(proc_resource_id.clone());
1616            let mut reply = tx.bind();
1617            // If this proc dies or some other issue renders the reply undeliverable,
1618            // the reply does not need to be returned to the sender.
1619            reply.return_undeliverable(false);
1620            let mut send_port = host.mesh_agent().port();
1621            // If the message is undeliverable, the timeout below will catch the issue, and the caller
1622            // can handle the error as it pleases. Set this so an undeliverable message doesn't cause
1623            // a supervision crash.
1624            send_port.return_undeliverable(false);
1625            let get_state = resource::GetState {
1626                id: proc_resource_id,
1627                reply,
1628            };
1629            if let Some(expires_after) = keepalive {
1630                let mut keepalive_port = host.mesh_agent().port();
1631                keepalive_port.return_undeliverable(false);
1632                keepalive_port.post(
1633                    cx,
1634                    resource::KeepaliveGetState {
1635                        expires_after,
1636                        get_state,
1637                    },
1638                );
1639            } else {
1640                send_port.post(cx, get_state);
1641            }
1642        }
1643
1644        let mut states = Vec::with_capacity(num_ranks);
1645        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
1646        for _ in 0..num_ranks {
1647            // The agent runs on the same process as the running actor, so if some
1648            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
1649            // the agent will be unresponsive.
1650            // We handle this by setting a timeout on the recv, and if we don't get a
1651            // message we assume the agent is dead and return a failed state.
1652            let state = tokio::time::timeout(timeout, rx.recv()).await;
1653            if let Ok(state) = state {
1654                // Handle non-timeout receiver error.
1655                let state = state?;
1656                match state.state {
1657                    Some(ref inner) => {
1658                        states.push((inner.create_rank, state));
1659                    }
1660                    None => {
1661                        return Err(crate::Error::NotExist(state.id));
1662                    }
1663                }
1664            } else {
1665                // Timeout error, stop reading from the receiver and send back what we have so far,
1666                // padding with failed states.
1667                tracing::warn!(
1668                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
1669                    timeout
1670                );
1671                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
1672                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
1673                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
1674                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
1675                while states.len() < num_ranks {
1676                    let rank = *leftover_ranks
1677                        .pop()
1678                        .expect("leftover ranks should not be empty");
1679                    states.push((
1680                        // We populate with any ranks leftover at the time of the timeout.
1681                        rank,
1682                        resource::State {
1683                            id: proc_names[rank].clone(),
1684                            status: resource::Status::Timeout(timeout),
1685                            state: None,
1686                            generation: 0,
1687                            timestamp: std::time::SystemTime::now(),
1688                        },
1689                    ));
1690                }
1691                break;
1692            }
1693        }
1694        // Ensure that all ranks have replied. Note that if the mesh is sliced,
1695        // not all create_ranks may be in the mesh.
1696        // Sort by rank, so that the resulting mesh is ordered.
1697        states.sort_by_key(|(rank, _)| *rank);
1698        let vm = states
1699            .into_iter()
1700            .map(|(_, state)| state)
1701            .collect_mesh::<ValueMesh<_>>(region)?;
1702        Ok(vm)
1703    }
1704}
1705
1706/// An ordered set of host entries, deduplicated by `HostAgent` `ActorAddr`
1707/// in first-seen order.
1708///
1709/// Insertion is idempotent by construction — SA-3 (dedup by ActorAddr)
1710/// is a property of this type, not a comment on careful control flow.
1711/// First-seen order is preserved: the first occurrence of a given
1712/// ActorAddr wins; subsequent duplicates are silently dropped.
1713struct HostSet {
1714    seen: HashSet<ActorAddr>,
1715    entries: Vec<(String, ActorRef<HostAgent>)>,
1716}
1717
1718impl HostSet {
1719    fn new() -> Self {
1720        Self {
1721            seen: HashSet::new(),
1722            entries: Vec::new(),
1723        }
1724    }
1725
1726    /// Insert a host entry. No-op if `ActorAddr` already present (SA-3).
1727    /// First-seen order is preserved.
1728    fn insert(&mut self, addr: String, agent_ref: ActorRef<HostAgent>) {
1729        if self.seen.insert(agent_ref.actor_addr().clone()) {
1730            self.entries.push((addr, agent_ref));
1731        }
1732    }
1733
1734    /// Extend from a `HostMeshRef`. SA-3 applies per entry.
1735    fn extend_from_mesh(&mut self, mesh: &HostMeshRef) {
1736        for h in mesh.hosts() {
1737            self.insert(h.0.to_string(), h.mesh_agent());
1738        }
1739    }
1740
1741    fn into_vec(self) -> Vec<(String, ActorRef<HostAgent>)> {
1742        self.entries
1743    }
1744}
1745
1746/// Ordered union of hosts from meshes and optional client host
1747/// entries, deduplicated by `HostAgent` `ActorAddr` in first-seen
1748/// order.
1749///
1750/// SA-3 dedup and SA-6 client-host merge are structural properties
1751/// of [`HostSet`], not invariants on this function's control flow.
1752fn aggregate_hosts(
1753    meshes: &[impl AsRef<HostMeshRef>],
1754    client_host_entries: Option<Vec<(String, ActorRef<HostAgent>)>>,
1755) -> Vec<(String, ActorRef<HostAgent>)> {
1756    let mut set = HostSet::new();
1757
1758    // SA-3: dedup across all mesh hosts in first-seen order.
1759    for mesh in meshes {
1760        set.extend_from_mesh(mesh.as_ref());
1761    }
1762
1763    // CH-1 / SA-6: client host entries merged after mesh aggregation.
1764    if let Some(entries) = client_host_entries {
1765        for (addr, agent_ref) in entries {
1766            set.insert(addr, agent_ref);
1767        }
1768    }
1769
1770    set.into_vec()
1771}
1772
1773/// Spawn a [`MeshAdminAgent`] that aggregates hosts from multiple
1774/// meshes.
1775///
1776/// The admin agent runs on the caller's local proc — the `Proc` of
1777/// the actor context `cx`. Hosts are deduplicated by actor ID across
1778/// all meshes.
1779///
1780/// Spawn a `MeshAdminAgent` aggregating topology across one or more
1781/// meshes. Returns a typed `ActorRef<MeshAdminAgent>`. Callers that
1782/// need the admin URL query it via `get_admin_addr`.
1783///
1784/// See the `mesh_admin` module doc for the SA-* (spawn/aggregation),
1785/// CH-* (client host), and AI-* (admin identity) invariants.
1786pub async fn spawn_admin(
1787    meshes: impl IntoIterator<Item = impl AsRef<HostMeshRef>>,
1788    cx: &impl hyperactor::context::Actor,
1789    admin_addr: Option<std::net::SocketAddr>,
1790    telemetry_url: Option<String>,
1791) -> anyhow::Result<ActorRef<MeshAdminAgent>> {
1792    let meshes: Vec<_> = meshes.into_iter().collect();
1793    anyhow::ensure!(!meshes.is_empty(), "at least one mesh is required (SA-1)");
1794    for (i, mesh) in meshes.iter().enumerate() {
1795        anyhow::ensure!(
1796            !mesh.as_ref().hosts().is_empty(),
1797            "mesh at index {} has no hosts (SA-2)",
1798            i,
1799        );
1800    }
1801
1802    let client_entries =
1803        crate::global_context::try_this_host().map(|client_host| client_host.host_entries());
1804    let hosts = aggregate_hosts(&meshes, client_entries);
1805
1806    let root_client_id = cx.mailbox().actor_addr().clone();
1807
1808    // Spawn the admin on the caller's local proc. Placement now
1809    // follows the caller context rather than mesh topology.
1810    let local_proc = cx.instance().proc();
1811    let agent_handle = local_proc.spawn(
1812        crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
1813        crate::mesh_admin::MeshAdminAgent::new(
1814            hosts,
1815            Some(root_client_id),
1816            admin_addr,
1817            telemetry_url,
1818        ),
1819    )?;
1820    let admin_ref = agent_handle.bind();
1821    Ok(admin_ref)
1822}
1823
1824impl view::Ranked for HostMeshRef {
1825    type Item = HostRef;
1826
1827    fn region(&self) -> &Region {
1828        &self.region
1829    }
1830
1831    fn get(&self, rank: usize) -> Option<&Self::Item> {
1832        self.ranks.get(rank)
1833    }
1834}
1835
1836impl view::RankedSliceable for HostMeshRef {
1837    fn sliced(&self, region: Region) -> Self {
1838        let ranks = self
1839            .region()
1840            .remap(&region)
1841            .unwrap()
1842            .map(|index| self.get(index).unwrap().clone());
1843        Self {
1844            bootstrap_command: self.bootstrap_command.clone(),
1845            ..Self::new(self.id.clone(), region, ranks.collect()).unwrap()
1846        }
1847    }
1848}
1849
1850impl std::fmt::Display for HostMeshRef {
1851    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1852        write!(f, "{}:", self.id)?;
1853        for (rank, host) in self.ranks.iter().enumerate() {
1854            if rank > 0 {
1855                write!(f, ",")?;
1856            }
1857            write!(f, "{}", host)?;
1858        }
1859        write!(f, "@{}", self.region)
1860    }
1861}
1862
1863/// The type of error occuring during `HostMeshRef` parsing.
1864#[derive(thiserror::Error, Debug)]
1865pub enum HostMeshRefParseError {
1866    #[error(transparent)]
1867    RegionParseError(#[from] RegionParseError),
1868
1869    #[error("invalid host mesh ref: missing region")]
1870    MissingRegion,
1871
1872    #[error("invalid host mesh ref: missing id")]
1873    MissingId,
1874
1875    #[error(transparent)]
1876    InvalidId(#[from] crate::mesh_id::ResourceIdParseError),
1877
1878    #[error(transparent)]
1879    InvalidHostMeshRef(#[from] Box<crate::Error>),
1880
1881    #[error(transparent)]
1882    Other(#[from] anyhow::Error),
1883}
1884
1885impl From<crate::Error> for HostMeshRefParseError {
1886    fn from(err: crate::Error) -> Self {
1887        Self::InvalidHostMeshRef(Box::new(err))
1888    }
1889}
1890
1891impl FromStr for HostMeshRef {
1892    type Err = HostMeshRefParseError;
1893
1894    fn from_str(s: &str) -> Result<Self, Self::Err> {
1895        let (id_str, rest) = s.split_once(':').ok_or(HostMeshRefParseError::MissingId)?;
1896
1897        let id = HostMeshId::from_str(id_str)?;
1898
1899        let (hosts, region) = rest
1900            .split_once('@')
1901            .ok_or(HostMeshRefParseError::MissingRegion)?;
1902        let hosts = hosts
1903            .split(',')
1904            .map(|host| host.trim())
1905            .map(|host| host.parse::<HostRef>())
1906            .collect::<Result<Vec<_>, _>>()?;
1907        let region = region.parse()?;
1908        Ok(HostMeshRef::new(id, region, hosts)?)
1909    }
1910}
1911
1912#[cfg(test)]
1913mod tests {
1914    #[cfg(fbcode_build)]
1915    use std::assert_matches;
1916
1917    #[cfg(fbcode_build)]
1918    use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1919    #[cfg(fbcode_build)]
1920    use hyperactor_config::attrs::Attrs;
1921    use ndslice::ViewExt;
1922    use ndslice::extent;
1923    #[cfg(fbcode_build)]
1924    use timed_test::assert_no_process_leak;
1925    #[cfg(fbcode_build)]
1926    use tokio::process::Command;
1927
1928    use super::*;
1929    #[cfg(fbcode_build)]
1930    use crate::ActorMesh;
1931    #[cfg(fbcode_build)]
1932    use crate::Bootstrap;
1933    #[cfg(fbcode_build)]
1934    use crate::bootstrap::MESH_TAIL_LOG_LINES;
1935    #[cfg(fbcode_build)]
1936    use crate::comm::ENABLE_NATIVE_V1_CASTING;
1937    #[cfg(fbcode_build)]
1938    use crate::resource::Status;
1939    #[cfg(fbcode_build)]
1940    use crate::testactor;
1941    #[cfg(fbcode_build)]
1942    use crate::testactor::GetConfigAttrs;
1943    #[cfg(fbcode_build)]
1944    use crate::testactor::SetConfigAttrs;
1945    use crate::testing;
1946
1947    #[test]
1948    fn test_host_mesh_subset() {
1949        let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1950            .parse()
1951            .unwrap();
1952        assert_eq!(
1953            hosts.range("replica", 1).unwrap().to_string(),
1954            "test:local:3,local:4@2+replica=1/2,host=2/1"
1955        );
1956    }
1957
1958    #[test]
1959    fn test_host_mesh_ref_parse_roundtrip() {
1960        let host_mesh_ref = HostMeshRef::new(
1961            HostMeshId::singleton(Label::new("test").unwrap()),
1962            extent!(replica = 2, host = 2).into(),
1963            vec![
1964                "tcp:127.0.0.1:123".parse().unwrap(),
1965                "tcp:127.0.0.1:123".parse().unwrap(),
1966                "tcp:127.0.0.1:123".parse().unwrap(),
1967                "tcp:127.0.0.1:123".parse().unwrap(),
1968            ],
1969        )
1970        .unwrap();
1971
1972        let parsed: HostMeshRef = host_mesh_ref.to_string().parse().unwrap();
1973        assert_eq!(parsed.id().to_string(), host_mesh_ref.id().to_string());
1974        assert_eq!(parsed.region(), host_mesh_ref.region());
1975        assert_eq!(parsed.hosts(), host_mesh_ref.hosts());
1976        assert_eq!(parsed.bootstrap_command, host_mesh_ref.bootstrap_command);
1977    }
1978
1979    /// Allocate a new port on localhost. This drops the listener, releasing the socket,
1980    /// before returning. Hyperactor's channel::net applies SO_REUSEADDR, so we do not hav
1981    /// to wait out the socket's TIMED_WAIT state.
1982    ///
1983    /// Even so, this is racy.
1984    fn free_localhost_addr() -> ChannelAddr {
1985        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1986        ChannelAddr::Tcp(listener.local_addr().unwrap())
1987    }
1988
1989    #[cfg(fbcode_build)]
1990    async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
1991        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1992
1993        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1994
1995        let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1996
1997        let mut children = Vec::new();
1998        for host in hosts.iter() {
1999            let mut cmd = Command::new(program.clone());
2000            let boot = Bootstrap::Host {
2001                addr: host.clone(),
2002                command: None, // use current binary
2003                config: None,
2004                exit_on_shutdown: false,
2005            };
2006            boot.to_env(&mut cmd);
2007            cmd.kill_on_drop(true);
2008            children.push(cmd.spawn().unwrap());
2009        }
2010
2011        let instance = testing::instance();
2012        let host_mesh =
2013            HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2014
2015        let proc_mesh = host_mesh
2016            .spawn(&testing::instance(), "test", Extent::unity(), None, None)
2017            .await
2018            .unwrap();
2019
2020        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
2021            .spawn(&testing::instance(), "test", &())
2022            .await
2023            .unwrap();
2024
2025        testactor::assert_mesh_shape(actor_mesh).await;
2026
2027        HostMesh::take(host_mesh)
2028            .shutdown(&instance)
2029            .await
2030            .expect("hosts shutdown");
2031    }
2032
2033    #[tokio::test]
2034    #[cfg(fbcode_build)]
2035    async fn test_extrinsic_allocation_v0() {
2036        let config = hyperactor_config::global::lock();
2037        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
2038        execute_extrinsic_allocation(&config).await;
2039    }
2040
2041    #[tokio::test]
2042    #[cfg(fbcode_build)]
2043    async fn test_extrinsic_allocation_v1() {
2044        let config = hyperactor_config::global::lock();
2045        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
2046        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
2047        execute_extrinsic_allocation(&config).await;
2048    }
2049
2050    #[tokio::test]
2051    #[cfg(fbcode_build)]
2052    async fn test_failing_proc_allocation() {
2053        let lock = hyperactor_config::global::lock();
2054        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
2055
2056        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
2057
2058        let hosts = vec![free_localhost_addr(), free_localhost_addr()];
2059
2060        let mut children = Vec::new();
2061        for host in hosts.iter() {
2062            let mut cmd = Command::new(program.clone());
2063            let boot = Bootstrap::Host {
2064                addr: host.clone(),
2065                config: None,
2066                // The entire purpose of this is to fail:
2067                command: Some(BootstrapCommand::from("false")),
2068                exit_on_shutdown: false,
2069            };
2070            boot.to_env(&mut cmd);
2071            cmd.kill_on_drop(true);
2072            children.push(cmd.spawn().unwrap());
2073        }
2074        let host_mesh =
2075            HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2076
2077        let instance = testing::instance();
2078
2079        let err = host_mesh
2080            .spawn(&instance, "test", Extent::unity(), None, None)
2081            .await
2082            .unwrap_err();
2083        assert_matches!(
2084            err,
2085            crate::Error::ProcCreationError { state, .. }
2086            if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
2087        );
2088    }
2089
2090    #[cfg(fbcode_build)]
2091    #[assert_no_process_leak]
2092    #[tokio::test]
2093    async fn test_halting_proc_allocation() {
2094        let config = hyperactor_config::global::lock();
2095        let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));
2096
2097        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
2098
2099        let hosts = vec![free_localhost_addr(), free_localhost_addr()];
2100
2101        let mut children = Vec::new();
2102
2103        for (index, host) in hosts.iter().enumerate() {
2104            let mut cmd = Command::new(program.clone());
2105            let command = if index == 0 {
2106                let mut command = BootstrapCommand::from("sleep");
2107                command.args.push("60".to_string());
2108                Some(command)
2109            } else {
2110                None
2111            };
2112            let boot = Bootstrap::Host {
2113                addr: host.clone(),
2114                config: None,
2115                command,
2116                exit_on_shutdown: false,
2117            };
2118            boot.to_env(&mut cmd);
2119            cmd.kill_on_drop(true);
2120            children.push(cmd.spawn().unwrap());
2121        }
2122        let host_mesh =
2123            HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2124
2125        let instance = testing::instance();
2126
2127        let err = host_mesh
2128            .spawn(&instance, "test", Extent::unity(), None, None)
2129            .await
2130            .unwrap_err();
2131        let statuses = err.into_proc_spawn_error().unwrap();
2132        assert_matches!(
2133            &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
2134            &[Status::Timeout(_), Status::Running]
2135        );
2136    }
2137
2138    #[tokio::test]
2139    #[cfg(fbcode_build)]
2140    async fn test_client_config_override() {
2141        let config = hyperactor_config::global::lock();
2142        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
2143        let _guard2 = config.override_key(
2144            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2145            Duration::from_mins(2),
2146        );
2147        let _guard3 = config.override_key(
2148            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
2149            Duration::from_mins(1),
2150        );
2151        let _guard4 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_mins(2));
2152
2153        // Unset env vars that were mirrored by TestOverride, so child
2154        // processes don't inherit them. This allows Runtime layer to
2155        // override ClientOverride. SAFETY: Single-threaded test under
2156        // global config lock.
2157        unsafe {
2158            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
2159            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
2160        }
2161
2162        let instance = testing::instance();
2163
2164        let mut hm = testing::host_mesh(2).await;
2165        let proc_mesh = hm
2166            .spawn(instance, "test", Extent::unity(), None, None)
2167            .await
2168            .unwrap();
2169        let proc_ids = proc_mesh
2170            .proc_ids()
2171            .map(|proc_addr| proc_addr.id().clone())
2172            .collect::<Vec<_>>();
2173        let unique_proc_ids = proc_ids.iter().collect::<std::collections::HashSet<_>>();
2174
2175        assert_eq!(proc_ids.len(), 2);
2176        assert_eq!(unique_proc_ids.len(), proc_ids.len());
2177
2178        let actor_mesh: ActorMesh<testactor::TestActor> =
2179            proc_mesh.spawn(instance, "test", &()).await.unwrap();
2180
2181        let mut attrs_override = Attrs::new();
2182        attrs_override.set(
2183            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2184            Duration::from_mins(3),
2185        );
2186        actor_mesh
2187            .cast(
2188                instance,
2189                SetConfigAttrs(
2190                    bincode::serde::encode_to_vec(&attrs_override, bincode::config::legacy())
2191                        .unwrap(),
2192                ),
2193            )
2194            .unwrap();
2195
2196        let (tx, mut rx) = instance.open_port();
2197        actor_mesh
2198            .cast(instance, GetConfigAttrs(tx.bind()))
2199            .unwrap();
2200        let actual_attrs = rx.recv().await.unwrap();
2201        let actual_attrs =
2202            bincode::serde::decode_from_slice::<Attrs, _>(&actual_attrs, bincode::config::legacy())
2203                .map(|(v, _)| v)
2204                .unwrap();
2205
2206        assert_eq!(
2207            *actual_attrs
2208                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
2209                .unwrap(),
2210            Duration::from_mins(3)
2211        );
2212        assert_eq!(
2213            *actual_attrs
2214                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
2215                .unwrap(),
2216            Duration::from_mins(1)
2217        );
2218
2219        let _ = hm.shutdown(instance).await;
2220    }
2221
2222    // ---- HM-* invariant tests ----
2223    //
2224    // HM-1 (attach-config-complete) is covered by
2225    // `test_client_config_override` above: a successful end-to-end
2226    // attach + per-host config-override observation.
2227    //
2228    // The tests below cover HM-2, HM-3, and HM-4 in a single fixture
2229    // — `attach()` against a host address with no listener. Because
2230    // `testing::TestRootClient::handle::<MeshFailure>` panics on any
2231    // supervision event, a passing test is itself the HM-3
2232    // observation: no `Undeliverable<MessageEnvelope>` reached the
2233    // root client (had the bounce escaped, the test would panic).
2234
2235    /// HM-2 / HM-3 / HM-4: `attach()` against an unreachable host
2236    /// returns a structured `Err` that names the failing host, and
2237    /// the calling actor stays alive (no supervision crash from a
2238    /// bounce on the request path).
2239    #[tokio::test]
2240    async fn test_attach_fails_closed_on_unreachable_host() {
2241        let config = hyperactor_config::global::lock();
2242        // Tighten the per-host timeout so the test doesn't sit on
2243        // the 10 s default.
2244        let _guard = config.override_key(
2245            crate::config::MESH_ATTACH_CONFIG_TIMEOUT,
2246            Duration::from_millis(500),
2247        );
2248
2249        let instance = testing::instance();
2250
2251        // `free_localhost_addr` binds a TCP port and immediately
2252        // drops the listener. SO_REUSEADDR + no further bind means
2253        // sends to this address never connect — exactly the
2254        // production-shape failure mode.
2255        let unreachable = free_localhost_addr();
2256
2257        let id = HostMeshId::instance(Label::new("hm_test").unwrap());
2258        let result = HostMesh::attach(instance, id, vec![unreachable.clone()]).await;
2259
2260        // HM-2: attach returns Err on any failed config push.
2261        let err = match result {
2262            Ok(_) => panic!("HM-2: attach must fail when a host is unreachable"),
2263            Err(e) => e,
2264        };
2265
2266        // HM-4: the structured error names the failing host.
2267        let push_err = match err {
2268            crate::Error::ConfigPushFailed(e) => e,
2269            other => panic!("expected ConfigPushFailed, got: {other:?}"),
2270        };
2271        assert_eq!(push_err.failures.len(), 1);
2272        let (failed_host, _failure) = &push_err.failures[0];
2273        assert_eq!(
2274            failed_host,
2275            &HostRef(unreachable),
2276            "HM-4: failure entry must identify the unreachable host"
2277        );
2278        // Intentionally do NOT pin the `_failure` variant — the
2279        // contract commits to per-host identity, not to a specific
2280        // failure-mode subtype (see ConfigPushFailure's doc).
2281
2282        // HM-3: the test process getting here without panicking is
2283        // itself the assertion. `TestRootClient::handle::<MeshFailure>`
2284        // panics on supervision events; if the request bounce had
2285        // escaped through `Undeliverable<MessageEnvelope>`, the
2286        // root-client's default `handle_undeliverable_message` would
2287        // bail with `UndeliverableMessageError::DeliveryFailure`,
2288        // supervision would fire, and we wouldn't be here.
2289    }
2290
2291    #[tokio::test]
2292    async fn test_sa1_empty_mesh_set_rejected() {
2293        let instance = testing::instance();
2294        let result = spawn_admin(std::iter::empty::<&HostMeshRef>(), instance, None, None).await;
2295        let err = result.unwrap_err().to_string();
2296        assert!(err.contains("SA-1"), "expected SA-1 error, got: {err}");
2297    }
2298
2299    #[tokio::test]
2300    async fn test_sa2_empty_hosts_rejected() {
2301        let instance = testing::instance();
2302        let mesh =
2303            HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("empty").unwrap()), vec![]);
2304        let result = spawn_admin([&mesh], instance, None, None).await;
2305        let err = result.unwrap_err().to_string();
2306        assert!(err.contains("SA-2"), "expected SA-2 error, got: {err}");
2307    }
2308
/// SA-3: inserting into a `HostSet` is idempotent with respect to
/// the `ActorAddr`. Re-inserting an already-present agent ref, even
/// under a different address string, adds nothing, and entries come
/// back in first-seen order. This checks a structural property of
/// `HostSet` itself rather than `aggregate_hosts` control flow.
#[test]
fn test_sa3_host_set_insert_idempotent() {
    let first_addr: ChannelAddr = "tcp:127.0.0.1:2001".parse().unwrap();
    let second_addr: ChannelAddr = "tcp:127.0.0.1:2002".parse().unwrap();
    let first_key = first_addr.to_string();
    let second_key = second_addr.to_string();

    let first_agent = HostRef(first_addr).mesh_agent();
    let second_agent = HostRef(second_addr).mesh_agent();

    let mut hosts = HostSet::new();
    hosts.insert(first_key.clone(), first_agent.clone());
    hosts.insert(second_key.clone(), second_agent);
    // The first agent again, under an unrelated key: expected to be
    // ignored (SA-3).
    hosts.insert("duplicate_addr".to_string(), first_agent);

    let entries = hosts.into_vec();
    assert_eq!(
        entries.len(),
        2,
        "SA-3: duplicate ActorAddr must not add entry"
    );
    // The length was pinned to 2 above, so zipping against the two
    // expected keys checks every entry.
    for (entry, expected_key) in entries.iter().zip([first_key, second_key]) {
        assert_eq!(entry.0, expected_key, "SA-3: first-seen order preserved");
    }
}
2344
/// SA-3: when two meshes share a host, `aggregate_hosts` reports
/// that host once and keeps hosts in first-seen order.
#[test]
fn test_sa3_aggregate_hosts_dedup() {
    let [addr_a, addr_b, addr_c]: [ChannelAddr; 3] = [
        "tcp:127.0.0.1:1001",
        "tcp:127.0.0.1:1002",
        "tcp:127.0.0.1:1003",
    ]
    .map(|raw| raw.parse().unwrap());

    // First mesh holds a and b; second mesh holds b and c, so b is
    // the overlapping host.
    let first_mesh = HostMeshRef::from_hosts(
        HostMeshId::singleton(Label::new("mesh-a").unwrap()),
        vec![addr_a.clone(), addr_b.clone()],
    );
    let second_mesh = HostMeshRef::from_hosts(
        HostMeshId::singleton(Label::new("mesh-b").unwrap()),
        vec![addr_b.clone(), addr_c.clone()],
    );

    let hosts = aggregate_hosts(&[&first_mesh, &second_mesh], None);

    // a, b, c: three distinct hosts once b is deduplicated.
    assert_eq!(hosts.len(), 3, "expected 3 hosts, got {:?}", hosts);

    // Expected first-seen order: a and b from the first mesh, then
    // c from the second. The length was pinned to 3 above, so the
    // zip covers every entry.
    let expected_order = [addr_a, addr_b, addr_c].map(|addr| addr.to_string());
    for (entry, expected) in hosts.iter().zip(expected_order) {
        assert_eq!(entry.0, expected);
    }
}
2373
/// SA-6 / CH-1: a client host entry whose agent ref is already
/// present in the aggregated mesh hosts is dropped rather than
/// added as a duplicate.
#[test]
fn test_sa6_ch1_client_host_dedup() {
    let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
    let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();

    let mesh = HostMeshRef::from_hosts(
        HostMeshId::singleton(Label::new("mesh").unwrap()),
        vec![addr_a.clone(), addr_b.clone()],
    );

    // The client entry points at the same agent as `addr_a`, but is
    // registered under a different address string.
    let overlapping_client = (
        "client_addr".to_string(),
        HostRef(addr_a.clone()).mesh_agent(),
    );

    let hosts = aggregate_hosts(&[&mesh], Some(vec![overlapping_client]));

    // Only the two mesh hosts remain; the client entry was folded
    // into the existing `addr_a` entry.
    assert_eq!(hosts.len(), 2, "expected 2 hosts, got {:?}", hosts);
    let expected_order = [addr_a, addr_b].map(|addr| addr.to_string());
    for (entry, expected) in hosts.iter().zip(expected_order) {
        assert_eq!(entry.0, expected);
    }
}
2398}