hyperactor_mesh/
host_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::StreamingReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::host::Host;
14use hyperactor::host::LocalProcManager;
15use hyperactor::host::SERVICE_PROC_NAME;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod host_agent;
24
25use std::collections::HashSet;
26use std::hash::Hash;
27use std::ops::Deref;
28use std::ops::DerefMut;
29use std::str::FromStr;
30use std::sync::Arc;
31use std::time::Duration;
32
33use hyperactor::channel::ChannelAddr;
34use hyperactor::context;
35use hyperactor::reference as hyperactor_reference;
36use ndslice::Extent;
37use ndslice::Region;
38use ndslice::ViewExt;
39use ndslice::extent;
40use ndslice::view;
41use ndslice::view::Ranked;
42use ndslice::view::RegionParseError;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::Bootstrap;
48use crate::Name;
49use crate::ProcMesh;
50use crate::ProcMeshRef;
51use crate::ValueMesh;
52use crate::alloc::Alloc;
53use crate::bootstrap::BootstrapCommand;
54use crate::bootstrap::BootstrapProcManager;
55use crate::bootstrap::ProcBind;
56use crate::host_mesh::host_agent::DrainHostClient;
57pub use crate::host_mesh::host_agent::HostAgent;
58use crate::host_mesh::host_agent::HostAgentMode;
59use crate::host_mesh::host_agent::HostMeshAgentProcMeshTrampoline;
60use crate::host_mesh::host_agent::ProcManagerSpawnFn;
61use crate::host_mesh::host_agent::ProcState;
62use crate::host_mesh::host_agent::SetClientConfigClient;
63use crate::host_mesh::host_agent::ShutdownHostClient;
64use crate::mesh_admin::MeshAdminMessageClient;
65use crate::mesh_controller::HostMeshController;
66use crate::mesh_controller::ProcMeshController;
67use crate::proc_agent::ProcAgent;
68use crate::proc_mesh::ProcRef;
69use crate::resource;
70use crate::resource::CreateOrUpdateClient;
71use crate::resource::GetRankStatus;
72use crate::resource::GetRankStatusClient;
73use crate::resource::RankedValues;
74use crate::resource::Status;
75use crate::resource::WaitRankStatusClient;
76use crate::transport::DEFAULT_TRANSPORT;
77
/// Actor name for `HostMeshController` when spawned as a named child.
///
/// The actual spawned actor name is suffixed with the mesh name
/// (`"{HOST_MESH_CONTROLLER_NAME}_{mesh}"`) so that `ActorId`s remain
/// unique when multiple meshes share a proc.
pub const HOST_MESH_CONTROLLER_NAME: &str = "host_mesh_controller";

/// Actor name for `ProcMeshController` when spawned as a named child.
pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
83
declare_attrs! {
    /// The maximum idle time between updates while spawning proc
    /// meshes.
    // NOTE(review): the file-config key here is prefixed "mesh_"
    // while the sibling keys below are not — confirm whether this
    // asymmetry is intentional before renaming (keys are read at
    // runtime and must not be changed casually).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while stopping proc
    /// meshes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while querying host meshes
    /// for their proc states.
    // NOTE(review): `Duration::from_mins` is a recently stabilized /
    // nightly-gated constructor depending on toolchain — confirm it is
    // available at this crate's MSRV (or use `from_secs(60)`).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
109
/// A reference to a single host.
///
/// A host is identified by the `ChannelAddr` its service proc listens
/// on; all proc ids for the host are derived from this address (see
/// `named_proc` / `service_proc` on the impl).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
wirevalue::register_type!(HostRef);
114
impl HostRef {
    /// The host mesh agent associated with this host.
    ///
    /// The reference is *attested*: it is constructed from the
    /// well-known agent actor name on this host's service proc,
    /// rather than being received from the host itself.
    fn mesh_agent(&self) -> hyperactor_reference::ActorRef<HostAgent> {
        hyperactor_reference::ActorRef::attest(
            self.service_proc()
                .actor_id(host_agent::HOST_MESH_AGENT_ACTOR_NAME, 0),
        )
    }

    /// The ProcId for the proc with name `name` on this host.
    fn named_proc(&self, name: &Name) -> hyperactor_reference::ProcId {
        hyperactor_reference::ProcId::with_name(self.0.clone(), name.to_string())
    }

    /// The service proc on this host.
    fn service_proc(&self) -> hyperactor_reference::ProcId {
        hyperactor_reference::ProcId::with_name(self.0.clone(), SERVICE_PROC_NAME)
    }

    /// Request an orderly teardown of this host and all procs it
    /// spawned.
    ///
    /// This resolves the per-child grace **timeout** and the maximum
    /// termination **concurrency** from config and sends a
    /// [`ShutdownHost`] message to the host's agent. The agent then:
    ///
    /// 1) Performs a graceful termination pass over all tracked
    ///    children (TERM → wait(`timeout`) → KILL), with at most
    ///    `max_in_flight` running concurrently.
    /// 2) After the pass completes, **drops the Host**, which also
    ///    drops the embedded `BootstrapProcManager`. The manager's
    ///    `Drop` serves as a last-resort safety net (it SIGKILLs
    ///    anything that somehow remains).
    ///
    /// This call returns `Ok(())` only after the agent has finished
    /// the termination pass and released the host, so the host is no
    /// longer reachable when this returns.
    pub(crate) async fn shutdown(
        &self,
        cx: &impl hyperactor::context::Actor,
    ) -> anyhow::Result<()> {
        let agent = self.mesh_agent();
        let terminate_timeout =
            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
        let max_in_flight =
            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
        // Clamp to a sane range regardless of what the config says:
        // at least one child terminating, at most 256 concurrently.
        agent
            .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
            .await?;
        Ok(())
    }

    /// Drain all user procs on this host but keep the host, service
    /// proc, and networking alive. Used during mesh stop/shutdown so
    /// that forwarder flushes can still reach remote hosts.
    pub(crate) async fn drain(
        &self,
        cx: &impl hyperactor::context::Actor,
        host_mesh_name: Option<Name>,
    ) -> anyhow::Result<()> {
        let agent = self.mesh_agent();
        // Same timeout/concurrency resolution as `shutdown` above.
        let terminate_timeout =
            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
        let max_in_flight =
            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
        agent
            .drain_host(
                cx,
                terminate_timeout,
                max_in_flight.clamp(1, 256),
                host_mesh_name,
            )
            .await?;
        Ok(())
    }
}
191
192impl TryFrom<hyperactor_reference::ActorRef<HostAgent>> for HostRef {
193    type Error = crate::Error;
194
195    fn try_from(value: hyperactor_reference::ActorRef<HostAgent>) -> Result<Self, crate::Error> {
196        let proc_id = value.actor_id().proc_id();
197        Ok(HostRef(proc_id.addr().clone()))
198    }
199}
200
201impl std::fmt::Display for HostRef {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        self.0.fmt(f)
204    }
205}
206
207impl FromStr for HostRef {
208    type Err = <ChannelAddr as FromStr>::Err;
209
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        Ok(HostRef(ChannelAddr::from_str(s)?))
212    }
213}
214
/// An owned mesh of hosts.
///
/// # Lifecycle
/// `HostMesh` owns host lifecycles. Callers **must** invoke
/// [`HostMesh::shutdown`] for deterministic teardown.
///
/// In tests and production, prefer explicit shutdown to guarantee
/// that host agents drop their `BootstrapProcManager`s and that all
/// child procs are reaped. You can use `shutdown_guard` to get a wrapper
/// which will try to do a best-effort shutdown on Drop.
pub struct HostMesh {
    // Stable identity of this mesh; used for telemetry and for
    // controller actor naming.
    name: Name,
    // Logical shape of the mesh (e.g. `hosts = N`).
    extent: Extent,
    // How the hosts were provisioned; determines teardown behavior.
    allocation: HostMeshAllocation,
    // Snapshot reference used to address the current set of hosts.
    current_ref: HostMeshRef,
}
231
/// Allocation backing for an owned [`HostMesh`].
///
/// This enum records how the underlying hosts were provisioned, which
/// in turn determines how their lifecycle is managed:
///
/// - `ProcMesh`: Hosts were allocated intrinsically via a
///   [`ProcMesh`]. The `HostMesh` owns the proc mesh and its service
///   procs, and dropping the mesh ensures that all spawned child procs
///   are terminated.
/// - `Owned`: Hosts were constructed externally and "taken" under
///   ownership. The `HostMesh` assumes responsibility for their
///   lifecycle from this point forward, ensuring consistent cleanup on
///   drop.
///
/// Additional variants may be added for other provisioning sources,
/// but in all cases `HostMesh` is an owned resource that guarantees
/// no leaked child processes.
// Some fields (e.g. `proc_mesh_ref`) are held only to keep resources
// alive and are never read directly, hence the allow.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// Hosts were allocated intrinsically via a [`ProcMesh`].
    ///
    /// In this mode, the `HostMesh` owns both the `ProcMesh` itself
    /// and the service procs that implement each host. Dropping the
    /// `HostMesh` also drops the embedded `ProcMesh`, ensuring that
    /// all spawned child procs are terminated cleanly.
    ProcMesh {
        proc_mesh: ProcMesh,
        proc_mesh_ref: ProcMeshRef,
        hosts: Vec<HostRef>,
    },
    /// Hosts were constructed externally and explicitly transferred
    /// under ownership by this `HostMesh`.
    ///
    /// In this mode, the `HostMesh` assumes responsibility for the
    /// provided hosts going forward. Dropping the mesh guarantees
    /// teardown of all associated state and signals to prevent any
    /// leaked processes.
    Owned { hosts: Vec<HostRef> },
}
271
272impl HostMesh {
273    /// Emit a telemetry event for this host mesh creation.
274    fn notify_created(&self) {
275        let name_str = self.name.to_string();
276        let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
277
278        hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
279            id: mesh_id_hash,
280            timestamp: std::time::SystemTime::now(),
281            class: "Host".to_string(),
282            given_name: self.name.name().to_string(),
283            full_name: name_str,
284            shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
285            parent_mesh_id: None,
286            parent_view_json: None,
287        });
288
289        // Notify telemetry of each HostAgent actor in this mesh.
290        // These are skipped in Proc::spawn_inner. mesh_id directly points to host mesh.
291        let now = std::time::SystemTime::now();
292        for (rank, host) in self.current_ref.hosts().iter().enumerate() {
293            let actor = host.mesh_agent();
294            hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
295                id: hyperactor_telemetry::hash_to_u64(actor.actor_id()),
296                timestamp: now,
297                mesh_id: mesh_id_hash,
298                rank: rank as u64,
299                full_name: actor.actor_id().to_string(),
300                display_name: None,
301            });
302        }
303    }
304
305    /// Bring up a local single-host mesh and, in the launcher
306    /// process, return a `HostMesh` handle for it.
307    ///
308    /// There are two execution modes:
309    ///
310    /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
311    ///   this process was launched as a bootstrap child, we call
312    ///   `boot.bootstrap().await`, which hands control to the
313    ///   bootstrap logic for this process (as defined by the
314    ///   `BootstrapCommand` the parent used to spawn it). if that
315    ///   call returns, we log the error and terminate. this branch
316    ///   does not produce a `HostMesh`.
317    ///
318    /// - launcher mode: otherwise, we are the process that is setting
319    ///   up the mesh. we create a `Host`, spawn a `HostAgent` in
320    ///   it, and build a single-host `HostMesh` around that. that
321    ///   `HostMesh` is returned to the caller.
322    ///
323    /// This API is intended for tests, examples, and local bring-up,
324    /// not production.
325    ///
326    /// TODO: fix up ownership
327    pub async fn local() -> crate::Result<HostMesh> {
328        Self::local_with_bootstrap(BootstrapCommand::current()?).await
329    }
330
331    /// Same as [`local`], but the caller supplies the
332    /// `BootstrapCommand` instead of deriving it from the current
333    /// process.
334    ///
335    /// The provided `bootstrap_cmd` is used when spawning bootstrap
336    /// children and determines the behavior of
337    /// `boot.bootstrap().await` in those children.
338    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
339        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
340            let result = boot.bootstrap().await;
341            if let Err(err) = result {
342                tracing::error!("failed to bootstrap local host mesh process: {}", err);
343            }
344            std::process::exit(1);
345        }
346
347        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
348
349        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
350        let host = Host::new(manager, addr).await?;
351        let addr = host.addr().clone();
352        let system_proc = host.system_proc().clone();
353        let host_mesh_agent = system_proc
354            .spawn(
355                "host_agent",
356                HostAgent::new(HostAgentMode::Process {
357                    host,
358                    shutdown_tx: None,
359                }),
360            )
361            .map_err(crate::Error::SingletonActorSpawnError)?;
362        host_mesh_agent.bind::<HostAgent>();
363
364        let host = HostRef(addr);
365        let host_mesh_ref = HostMeshRef::new(
366            Name::new("local").unwrap(),
367            extent!(hosts = 1).into(),
368            vec![host],
369        )?;
370        Ok(HostMesh::take(host_mesh_ref))
371    }
372
373    /// Create a local in-process host mesh where all procs run in the
374    /// current OS process.
375    ///
376    /// Unlike [`local`] which spawns child processes for each proc,
377    /// this method uses [`LocalProcManager`] to run everything
378    /// in-process. This makes all actors visible in the admin tree
379    /// (useful for debugging with the TUI).
380    ///
381    /// This API is intended for tests, examples, and debugging.
382    pub async fn local_in_process() -> crate::Result<HostMesh> {
383        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
384        Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
385    }
386
387    /// Create a local in-process host mesh with multiple hosts, where
388    /// all procs run in the current OS process using [`LocalProcManager`].
389    ///
390    /// Each address in `addrs` becomes a separate host. The resulting
391    /// mesh has `extent!(hosts = addrs.len())`.
392    ///
393    /// This API is intended for unit tests that need a multi-host mesh
394    /// within a single process.
395    pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
396        let n = addrs.len();
397        let mut host_refs = Vec::with_capacity(n);
398        for addr in addrs {
399            host_refs.push(Self::create_in_process_host(addr).await?);
400        }
401        HostMeshRef::new(
402            Name::new("local").unwrap(),
403            extent!(hosts = n).into(),
404            host_refs,
405        )
406    }
407
408    /// Create a single in-process host at the given address, returning
409    /// a [`HostRef`] for it.
410    async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
411        let spawn: ProcManagerSpawnFn =
412            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
413        let manager = LocalProcManager::new(spawn);
414        let host = Host::new(manager, addr).await?;
415        let addr = host.addr().clone();
416        let system_proc = host.system_proc().clone();
417        let host_mesh_agent = system_proc
418            .spawn(
419                host_agent::HOST_MESH_AGENT_ACTOR_NAME,
420                HostAgent::new(HostAgentMode::Local(host)),
421            )
422            .map_err(crate::Error::SingletonActorSpawnError)?;
423        host_mesh_agent.bind::<HostAgent>();
424        Ok(HostRef(addr))
425    }
426
427    /// Create a new process-based host mesh. Each host is represented by a local process,
428    /// which manages its set of procs. This is not a true host mesh the sense that each host
429    /// is not independent. The intent of `process` is for testing, examples, and experimentation.
430    ///
431    /// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
432    /// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
433    /// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
434    /// is called early in the lifecycle of the process and reached unconditionally.
435    ///
436    /// TODO: thread through ownership
437    pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
438        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
439            let result = boot.bootstrap().await;
440            if let Err(err) = result {
441                tracing::error!("failed to bootstrap process host mesh process: {}", err);
442            }
443            std::process::exit(1);
444        }
445
446        let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
447        let mut hosts = Vec::with_capacity(extent.num_ranks());
448        for _ in 0..extent.num_ranks() {
449            // Note: this can be racy. Possibly we should have a callback channel.
450            let addr = bind_spec.binding_addr();
451            let bootstrap = Bootstrap::Host {
452                addr: addr.clone(),
453                command: Some(command.clone()),
454                config: Some(hyperactor_config::global::attrs()),
455                exit_on_shutdown: false,
456            };
457
458            let mut cmd = command.new();
459            bootstrap.to_env(&mut cmd);
460            cmd.spawn()?;
461            hosts.push(HostRef(addr));
462        }
463
464        let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
465        Ok(HostMesh::take(host_mesh_ref))
466    }
467
468    /// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
469    /// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
470    /// proc allocated by it.
471    ///
472    /// ## Allocation strategy
473    ///
474    /// Because HostMeshes use direct-addressed procs, and must fully control the procs they are
475    /// managing, `HostMesh::allocate` uses a trampoline actor to launch the host, which in turn
476    /// runs a [`crate::host_mesh::host_agent::HostAgent`] actor to manage the host itself.
477    /// The host (and thus all of its procs) are exposed directly through a separate listening
478    /// channel, established by the host.
479    ///
480    /// ```text
481    ///                        ┌ ─ ─┌────────────────────┐
482    ///                             │allocated Proc:     │
483    ///                        │    │ ┌─────────────────┐│
484    ///                             │ │TrampolineActor  ││
485    ///                        │    │ │ ┌──────────────┐││
486    ///                             │ │ │Host          │││
487    ///               ┌────┬ ─ ┘    │ │ │ ┌──────────┐ │││
488    ///            ┌─▶│Proc│        │ │ │ │HostAgent │ │││
489    ///            │  └────┴ ─ ┐    │ │ │ └──────────┘ │││
490    ///            │  ┌────┐        │ │ │             ██████
491    /// ┌────────┐ ├─▶│Proc│   │    │ │ └──────────────┘││ ▲
492    /// │ Client │─┤  └────┘        │ └─────────────────┘│ listening channel
493    /// └────────┘ │  ┌────┐   └ ─ ─└────────────────────┘
494    ///            ├─▶│Proc│
495    ///            │  └────┘
496    ///            │  ┌────┐
497    ///            └─▶│Proc│
498    ///               └────┘
499    ///                 ▲
500    ///
501    ///          `Alloc`-provided
502    ///                procs
503    /// ```
504    ///
505    /// ## Lifecycle
506    ///
507    /// The returned `HostMesh` **owns** the underlying hosts. Call
508    /// [`shutdown`](Self::shutdown) to deterministically tear them
509    /// down. If you skip shutdown, `Drop` will attempt best-effort
510    /// cleanup only. Do not rely on `Drop` for correctness.
511    pub async fn allocate<C: context::Actor>(
512        cx: &C,
513        alloc: Box<dyn Alloc + Send + Sync>,
514        name: &str,
515        bootstrap_params: Option<BootstrapCommand>,
516    ) -> crate::Result<Self>
517    where
518        C::A: Handler<MeshFailure>,
519    {
520        Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
521    }
522
523    // Use allocate_inner to set field mesh_name in span
524    #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
525    async fn allocate_inner<C: context::Actor>(
526        cx: &C,
527        alloc: Box<dyn Alloc + Send + Sync>,
528        name: Name,
529        bootstrap_params: Option<BootstrapCommand>,
530    ) -> crate::Result<Self>
531    where
532        C::A: Handler<MeshFailure>,
533    {
534        tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
535        let transport = alloc.transport();
536        let extent = alloc.extent().clone();
537        let is_local = alloc.is_local();
538        let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;
539
540        // TODO: figure out how to deal with MAST allocs. It requires an extra dimension,
541        // into which it launches multiple procs, so we need to always specify an additional
542        // sub-host dimension of size 1.
543
544        let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
545        let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
546        let _trampoline_actor_mesh = proc_mesh
547            .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
548                cx,
549                trampoline_name,
550                &(transport, mesh_agents.bind(), bootstrap_params, is_local),
551                None,
552                // The trampoline is a system actor and does not need a controller.
553                true,
554            )
555            .await?;
556
557        // TODO: don't re-rank the hosts
558        let mut hosts = Vec::new();
559        for _rank in 0..extent.num_ranks() {
560            let mesh_agent = mesh_agents_rx.recv().await?;
561
562            let addr = mesh_agent.actor_id().proc_id().addr().clone();
563
564            let host_ref = HostRef(addr);
565            if host_ref.mesh_agent() != mesh_agent {
566                return Err(crate::Error::HostMeshAgentConfigurationError(
567                    mesh_agent.actor_id().clone(),
568                    format!(
569                        "expected mesh agent actor id to be {}",
570                        host_ref.mesh_agent().actor_id()
571                    ),
572                ));
573            }
574            hosts.push(host_ref);
575        }
576
577        let proc_mesh_ref = proc_mesh.clone();
578        let mesh = Self {
579            name: name.clone(),
580            extent: extent.clone(),
581            allocation: HostMeshAllocation::ProcMesh {
582                proc_mesh,
583                proc_mesh_ref,
584                hosts: hosts.clone(),
585            },
586            current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
587        };
588
589        // Spawn a unique mesh controller for each proc mesh, so the type of the
590        // mesh can be preserved.
591        let controller = HostMeshController::new(mesh.deref().clone());
592        // hyperactor::proc AI-3: controller name must include mesh
593        // identity for proc-wide ActorId uniqueness.
594        let controller_name = format!("{}_{}", HOST_MESH_CONTROLLER_NAME, mesh.name());
595        let controller_handle = controller
596            .spawn_with_name(cx, &controller_name)
597            .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
598        // Bind the actor's well-known ports (Signal, IntrospectMessage,
599        // Undeliverable). Without this, the controller's mailbox has no
600        // port entries and messages (including introspection queries)
601        // are returned as undeliverable.
602        let _: hyperactor::reference::ActorRef<HostMeshController> = controller_handle.bind();
603
604        tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");
605
606        mesh.notify_created();
607
608        Ok(mesh)
609    }
610
611    /// Take ownership of an existing host mesh reference.
612    ///
613    /// Consumes the `HostMeshRef`, captures its region/hosts, and
614    /// returns an owned `HostMesh` that assumes lifecycle
615    /// responsibility for those hosts (i.e., will shut them down on
616    /// Drop).
617    pub fn take(mesh: HostMeshRef) -> Self {
618        let region = mesh.region().clone();
619        let hosts: Vec<HostRef> = mesh.values().collect();
620
621        let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
622            .expect("region/hosts cardinality must match");
623
624        let result = Self {
625            name: mesh.name,
626            extent: region.extent().clone(),
627            allocation: HostMeshAllocation::Owned { hosts },
628            current_ref,
629        };
630        result.notify_created();
631        result
632    }
633
634    /// Attach to pre-existing workers and push client config.
635    ///
636    /// This is the "simple bootstrap" attach protocol:
637    /// 1. Wraps the provided addresses into a `HostMeshRef`.
638    /// 2. Snapshots `propagatable_attrs()` from the client's global config.
639    /// 3. Pushes the config to each host agent as `Source::ClientOverride`,
640    ///    with a barrier to confirm installation.
641    /// 4. Returns the owned `HostMesh`.
642    ///
643    /// After this returns, host agents have the client's config.
644    pub async fn attach(
645        cx: &impl context::Actor,
646        name: Name,
647        addresses: Vec<ChannelAddr>,
648    ) -> crate::Result<Self> {
649        let mesh_ref = HostMeshRef::from_hosts(name, addresses);
650        let config = hyperactor_config::global::propagatable_attrs();
651        mesh_ref.push_config(cx, config).await;
652        Ok(Self::take(mesh_ref))
653    }
654
655    /// Request a clean shutdown of all hosts owned by this
656    /// `HostMesh`.
657    ///
658    /// Uses a two-phase approach:
659    /// 1. **Terminate children** on every host concurrently. Service
660    ///    infrastructure (host agent, comm proc, networking) stays
661    ///    alive so that forwarder flushes can still reach remote hosts.
662    /// 2. **Shut down hosts** concurrently. No user procs remain, so
663    ///    this is fast and cannot deadlock on cross-host flush
664    ///    timeouts.
665    #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
666    pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
667        let t0 = std::time::Instant::now();
668        tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
669
670        // Phase 1: terminate all user procs while service infrastructure
671        // stays alive so forwarder flushes can complete across hosts.
672        let results = futures::future::join_all(
673            self.current_ref
674                .values()
675                .map(|host| async move { host.drain(cx, None).await }),
676        )
677        .await;
678        let phase1_ms = t0.elapsed().as_millis();
679        for result in &results {
680            if let Err(e) = result {
681                tracing::warn!(
682                    name = "HostMeshStatus",
683                    status = "Shutdown::Drain::Failed",
684                    error = %e,
685                    "drain failed on a host"
686                );
687            }
688        }
689
690        // Phase 2: shut down hosts concurrently. No user procs remain.
691        let t1 = std::time::Instant::now();
692        let results = futures::future::join_all(self.current_ref.values().map(|host| async move {
693            let result = host.shutdown(cx).await;
694            (host, result)
695        }))
696        .await;
697        let phase2_ms = t1.elapsed().as_millis();
698        let total_ms = t0.elapsed().as_millis();
699        let mut failed_hosts = vec![];
700        for (host, result) in &results {
701            if let Err(e) = result {
702                tracing::warn!(
703                    name = "HostMeshStatus",
704                    status = "Shutdown::Host::Failed",
705                    host = %host,
706                    error = %e,
707                    "host shutdown failed"
708                );
709                failed_hosts.push(host);
710            }
711        }
712        if failed_hosts.is_empty() {
713            tracing::info!(
714                name = "HostMeshStatus",
715                status = "Shutdown::Success",
716                phase1_ms,
717                phase2_ms,
718                total_ms,
719            );
720        } else {
721            tracing::error!(
722                name = "HostMeshStatus",
723                status = "Shutdown::Failed",
724                phase1_ms,
725                phase2_ms,
726                total_ms,
727                "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
728                failed_hosts
729            );
730        }
731
732        match &mut self.allocation {
733            HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
734                proc_mesh.stop(cx, "host mesh shutdown".to_string()).await?;
735            }
736            HostMeshAllocation::Owned { .. } => {}
737        }
738        Ok(())
739    }
740
741    /// Consumes and wraps this HostMesh with a HostMeshShutdownGuard, which will
742    /// ensure shutdown is run on Drop.
743    pub fn shutdown_guard(self) -> HostMeshShutdownGuard {
744        HostMeshShutdownGuard(self)
745    }
746
747    /// Stop all hosts owned by this `HostMesh`, draining user procs
748    /// but keeping worker processes and their sockets alive for
749    /// reconnection.
750    ///
751    /// After `stop`, the same worker addresses can be passed to
752    /// [`HostMesh::attach`] to create a new mesh.
753    #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
754    pub async fn stop(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
755        let t0 = std::time::Instant::now();
756        tracing::info!(name = "HostMeshStatus", status = "Stop::Attempt");
757
758        let mesh_name = self.name.clone();
759        let results = futures::future::join_all(self.current_ref.values().map(|host| {
760            let mesh_name = Some(mesh_name.clone());
761            async move { host.drain(cx, mesh_name).await }
762        }))
763        .await;
764        let total_ms = t0.elapsed().as_millis();
765        let mut failed_hosts = vec![];
766        for (i, result) in results.iter().enumerate() {
767            if let Err(e) = result {
768                tracing::warn!(
769                    name = "HostMeshStatus",
770                    status = "Stop::Drain::Failed",
771                    error = %e,
772                    "drain failed on a host"
773                );
774                failed_hosts.push(i);
775            }
776        }
777        if failed_hosts.is_empty() {
778            tracing::info!(name = "HostMeshStatus", status = "Stop::Success", total_ms,);
779        } else {
780            tracing::error!(
781                name = "HostMeshStatus",
782                status = "Stop::Failed",
783                total_ms,
784                "host mesh stop failed; check the logs of the failed hosts for details: {:?}",
785                failed_hosts
786            );
787        }
788
789        // Defuse the Drop impl so it doesn't send ShutdownHost to hosts
790        // we intentionally kept alive. Replace the allocation with an
791        // empty Owned variant so Drop has no hosts to iterate.
792        self.allocation = HostMeshAllocation::Owned { hosts: vec![] };
793
794        Ok(())
795    }
796}
797
798impl HostMesh {
799    /// Set the bootstrap command on the underlying `HostMeshRef`,
800    /// so that future `spawn` calls use it. Unlike
801    /// `HostMeshRef::with_bootstrap` this mutates in place,
802    /// preserving ownership.
803    pub fn set_bootstrap(&mut self, cmd: BootstrapCommand) {
804        self.current_ref = self.current_ref.clone().with_bootstrap(cmd);
805    }
806}
807
// Let a `HostMesh` be used wherever a `HostMeshRef` is expected, by
// dereferencing to the mesh's current (non-owning) reference.
impl Deref for HostMesh {
    type Target = HostMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
815
// `AsRef` conversion for `HostMesh`; goes through the `Deref` impl
// above to borrow the underlying `HostMeshRef`.
impl AsRef<HostMeshRef> for HostMesh {
    fn as_ref(&self) -> &HostMeshRef {
        self
    }
}
821
// Identity `AsRef`, so generic callers taking `impl AsRef<HostMeshRef>`
// accept both `HostMesh` and `HostMeshRef`.
impl AsRef<HostMeshRef> for HostMeshRef {
    fn as_ref(&self) -> &HostMeshRef {
        self
    }
}
827
/// Wrapper around HostMesh that runs shutdown on Drop.
///
/// Obtained via [`HostMesh::shutdown_guard`]; derefs (mutably) to the
/// wrapped mesh, so it can be used in place of a `HostMesh`.
pub struct HostMeshShutdownGuard(pub HostMesh);
830
// Transparent read access to the wrapped `HostMesh`.
impl Deref for HostMeshShutdownGuard {
    type Target = HostMesh;

    fn deref(&self) -> &HostMesh {
        &self.0
    }
}
838
// Transparent mutable access to the wrapped `HostMesh`.
impl DerefMut for HostMeshShutdownGuard {
    fn deref_mut(&mut self) -> &mut HostMesh {
        &mut self.0
    }
}
844
impl Drop for HostMeshShutdownGuard {
    /// Best-effort cleanup for owned host meshes on drop.
    ///
    /// When a `HostMesh` is dropped, it attempts to shut down all
    /// hosts it owns:
    /// - If a Tokio runtime is available, we spawn an ephemeral
    ///   `Proc` + `Instance` and send `ShutdownHost` messages to each
    ///   host. This ensures that the embedded `BootstrapProcManager`s
    ///   are dropped, and all child procs they spawned are killed.
    /// - If no runtime is available, we cannot perform async cleanup
    ///   here; in that case we log a warning and rely on kernel-level
    ///   PDEATHSIG or the individual `BootstrapProcManager`'s `Drop`
    ///   as the final safeguard.
    ///
    /// This path is **last resort**: callers should prefer explicit
    /// [`HostMesh::shutdown`] to guarantee orderly teardown. Drop
    /// only provides opportunistic cleanup to prevent process leaks
    /// if shutdown is skipped.
    fn drop(&mut self) {
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.name,
            status = "Dropping",
        );
        // Snapshot the owned hosts we're responsible for. Both
        // allocation variants carry the same `hosts` list shape.
        let hosts: Vec<HostRef> = match &self.0.allocation {
            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
                hosts.clone()
            }
        };

        // Best-effort only when a Tokio runtime is available.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let mesh_name = self.0.name.clone();
            let allocation_label = match &self.0.allocation {
                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
                HostMeshAllocation::Owned { .. } => "owned",
            }
            .to_string();

            // Fire-and-forget: the cleanup task is detached; Drop
            // returns immediately and never waits on its completion.
            handle.spawn(async move {
                let span = tracing::info_span!(
                    "hostmesh_drop_cleanup",
                    host_mesh = %mesh_name,
                    allocation = %allocation_label,
                    hosts = hosts.len(),
                );
                let _g = span.enter();

                // Spin up a tiny ephemeral proc+instance to get an
                // Actor context.
                match hyperactor::Proc::direct(
                    ChannelTransport::Unix.any(),
                    "hostmesh-drop".to_string(),
                )
                {
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "failed to construct ephemeral Proc for drop-cleanup; \
                             relying on PDEATHSIG/manager Drop"
                        );
                    }
                    Ok(proc) => {
                        // NOTE(review): `_guard` presumably keeps the
                        // instance alive while we use it — confirm.
                        match proc.instance("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                     relying on PDEATHSIG/manager Drop"
                                );
                            }
                            Ok((instance, _guard)) => {
                                // Shut hosts down sequentially, tallying
                                // outcomes for the summary log line.
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        }
                    }
                }
            });
        } else {
            // No runtime here; PDEATHSIG and manager Drop remain the
            // last-resort safety net.
            tracing::warn!(
                host_mesh = %self.0.name,
                hosts = hosts.len(),
                "HostMesh dropped without a Tokio runtime; skipping \
                 best-effort shutdown. This indicates that .shutdown() \
                 on this mesh has not been called before program exit \
                 (perhaps due to a missing call to \
                 'monarch.actor.shutdown_context()'?) This in turn can \
                 lead to backtrace output due to folly SIGTERM \
                 handlers."
            );
        }

        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.name,
            status = "Dropped",
        );
    }
}
969
970/// Helper: legacy shim for error types that still require
971/// RankedValues<Status>. TODO(shayne-fletcher): Delete this
972/// shim once Error::ActorSpawnError carries a StatusMesh
973/// (ValueMesh<Status>) directly. At that point, use the mesh
974/// as-is and remove `mesh_to_rankedvalues_*` calls below.
975/// is_sentinel should return true if the value matches a previous filled in
976/// value. If the input value matches the sentinel, it gets replaced with the
977/// default.
978pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
979    mesh: &ValueMesh<T>,
980    default: T,
981    is_sentinel: F,
982    len: usize,
983) -> RankedValues<T>
984where
985    T: Eq + Clone + 'static,
986    F: Fn(&T) -> bool,
987{
988    let mut out = RankedValues::from((0..len, default));
989    for (i, s) in mesh.values().enumerate() {
990        if !is_sentinel(&s) {
991            out.merge_from(RankedValues::from((i..i + 1, s)));
992        }
993    }
994    out
995}
996
/// A non-owning reference to a mesh of hosts.
///
/// Logically, this is a data structure that contains a set of ranked
/// hosts organized into a [`Region`]. `HostMeshRef`s can be sliced to
/// produce new references that contain a subset of the hosts in the
/// original mesh.
///
/// `HostMeshRef`s have a concrete syntax, implemented by its
/// `Display` and `FromStr` implementations.
///
/// This type does **not** control lifecycle. It only describes the
/// topology of hosts. To take ownership and perform deterministic
/// teardown, use [`HostMesh::take`], which returns an owned
/// [`HostMesh`] that guarantees cleanup on `shutdown()` or `Drop`.
///
/// Cloning this type does not confer ownership. If a corresponding
/// owned [`HostMesh`] shuts down the hosts, operations via a cloned
/// `HostMeshRef` may fail because the hosts are no longer running.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    // The mesh's name.
    name: Name,
    // The region organizing `ranks` into a multidimensional space.
    region: Region,
    // Hosts in rank order; `Arc` keeps clones of the ref cheap.
    ranks: Arc<Vec<HostRef>>,
    /// Bootstrap command to use when spawning procs on this mesh.
    /// When `None`, each host agent uses its own default command.
    /// `#[serde(default)]` keeps older serialized refs (without this
    /// field) deserializable.
    #[serde(default)]
    pub bootstrap_command: Option<BootstrapCommand>,
}
wirevalue::register_type!(HostMeshRef);
1026
1027impl HostMeshRef {
1028    /// Create a new (raw) HostMeshRef from the provided region and associated
1029    /// ranks, which must match in cardinality.
1030    #[allow(clippy::result_large_err)]
1031    fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
1032        if region.num_ranks() != ranks.len() {
1033            return Err(crate::Error::InvalidRankCardinality {
1034                expected: region.num_ranks(),
1035                actual: ranks.len(),
1036            });
1037        }
1038        Ok(Self {
1039            name,
1040            region,
1041            ranks: Arc::new(ranks),
1042            bootstrap_command: None,
1043        })
1044    }
1045
1046    /// Create a new HostMeshRef from an arbitrary set of hosts. This is meant to
1047    /// enable extrinsic bootstrapping.
1048    pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
1049        Self {
1050            name,
1051            region: extent!(hosts = hosts.len()).into(),
1052            ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
1053            bootstrap_command: None,
1054        }
1055    }
1056
1057    /// Create a new HostMeshRef from an arbitrary set of host mesh agents.
1058    pub fn from_host_agents(
1059        name: Name,
1060        agents: Vec<hyperactor_reference::ActorRef<HostAgent>>,
1061    ) -> crate::Result<Self> {
1062        Ok(Self {
1063            name,
1064            region: extent!(hosts = agents.len()).into(),
1065            ranks: Arc::new(
1066                agents
1067                    .into_iter()
1068                    .map(HostRef::try_from)
1069                    .collect::<crate::Result<_>>()?,
1070            ),
1071            bootstrap_command: None,
1072        })
1073    }
1074
1075    /// Create a unit HostMeshRef from a host mesh agent.
1076    pub fn from_host_agent(
1077        name: Name,
1078        agent: hyperactor_reference::ActorRef<HostAgent>,
1079    ) -> crate::Result<Self> {
1080        Ok(Self {
1081            name,
1082            region: Extent::unity().into(),
1083            ranks: Arc::new(vec![HostRef::try_from(agent)?]),
1084            bootstrap_command: None,
1085        })
1086    }
1087
1088    /// Return a new `HostMeshRef` that will use `cmd` when spawning procs,
1089    /// overriding the host agent's default bootstrap command.
1090    pub fn with_bootstrap(self, cmd: BootstrapCommand) -> Self {
1091        Self {
1092            bootstrap_command: Some(cmd),
1093            ..self
1094        }
1095    }
1096
1097    /// Returns the host entries as `(addr_string, ActorRef<HostAgent>)` pairs.
1098    /// Used by `MeshAdminAgent::effective_hosts()` to merge C into the
1099    /// admin's host list (see CH-1 in mesh_admin module doc).
1100    pub(crate) fn host_entries(&self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1101        self.ranks
1102            .iter()
1103            .map(|h| (h.0.to_string(), h.mesh_agent()))
1104            .collect()
1105    }
1106
    /// Push client config to all host agents in this mesh, in parallel.
    ///
    /// Each host installs the attrs as `Source::ClientOverride`.
    /// Idempotent: sending the same attrs twice replaces the layer.
    ///
    /// Sends request-reply to each host and barriers on all replies.
    /// Best-effort: on timeout or error, logs a warning and continues.
    /// Timeout controlled by `MESH_ATTACH_CONFIG_TIMEOUT` (default 10s).
    pub(crate) async fn push_config(
        &self,
        cx: &impl context::Actor,
        attrs: hyperactor_config::attrs::Attrs,
    ) {
        let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
        let hosts: Vec<_> = self.values().collect();
        let num_hosts = hosts.len();

        // Fan out one set_client_config per host; each future resolves
        // to true on success, false on error (after logging a warning).
        let barrier = futures::future::join_all(hosts.into_iter().map(|host| {
            let attrs = attrs.clone();
            let agent_id = host.mesh_agent().actor_id().clone();
            async move {
                match host.mesh_agent().set_client_config(cx, attrs).await {
                    Ok(()) => {
                        tracing::debug!(host = %agent_id, "host agent config installed");
                        true
                    }
                    Err(e) => {
                        tracing::warn!(
                            host = %agent_id,
                            error = %e,
                            "failed to push client config to host agent, \
                             continuing without it",
                        );
                        false
                    }
                }
            }
        }));

        // Barrier with a single overall timeout; either branch only
        // logs — failure is never propagated to the caller.
        match tokio::time::timeout(timeout, barrier).await {
            Ok(results) => {
                let success = results.iter().filter(|&&r| r).count();
                let failed = num_hosts - success;
                tracing::info!(
                    success = success,
                    failed = failed,
                    "push_config barrier complete",
                );
            }
            Err(_) => {
                tracing::warn!(
                    num_hosts = num_hosts,
                    timeout_secs = timeout.as_secs(),
                    "push_config barrier timed out, some hosts may not \
                     have received client config",
                );
            }
        }
    }
1166
1167    /// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
1168    /// of the procs to spawn on each host.
1169    ///
1170    /// `proc_bind`, when provided, is a per-process CPU/NUMA binding
1171    /// configuration. Its length must equal the number of ranks in
1172    /// `per_host`. Each entry maps binding keys (`cpunodebind`,
1173    /// `membind`, `physcpubind`, `cpus`) to their values.
1174    /// Only takes effect when running on Linux.
1175    ///
1176    /// Currently, spawn issues direct calls to each host agent. This will be fixed by
1177    /// maintaining a comm actor on the host service procs themselves.
1178    #[allow(clippy::result_large_err)]
1179    pub async fn spawn<C: context::Actor>(
1180        &self,
1181        cx: &C,
1182        name: &str,
1183        per_host: Extent,
1184        proc_bind: Option<Vec<ProcBind>>,
1185    ) -> crate::Result<ProcMesh>
1186    where
1187        C::A: Handler<MeshFailure>,
1188    {
1189        self.spawn_inner(cx, Name::new(name)?, per_host, proc_bind)
1190            .await
1191    }
1192
1193    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
1194    async fn spawn_inner<C: context::Actor>(
1195        &self,
1196        cx: &C,
1197        proc_mesh_name: Name,
1198        per_host: Extent,
1199        proc_bind: Option<Vec<ProcBind>>,
1200    ) -> crate::Result<ProcMesh>
1201    where
1202        C::A: Handler<MeshFailure>,
1203    {
1204        tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1205        tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1206        let result = self
1207            .spawn_inner_inner(cx, proc_mesh_name, per_host, proc_bind)
1208            .await;
1209        match &result {
1210            Ok(_) => {
1211                tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1212                tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1213            }
1214            Err(error) => {
1215                tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1216                tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1217            }
1218        }
1219        result
1220    }
1221
    /// Core proc-mesh spawn: validates inputs, asks every host agent
    /// to create its procs, fences on status snapshots, then builds
    /// the owned `ProcMesh` plus its controller actor.
    async fn spawn_inner_inner<C: context::Actor>(
        &self,
        cx: &C,
        proc_mesh_name: Name,
        per_host: Extent,
        proc_bind: Option<Vec<ProcBind>>,
    ) -> crate::Result<ProcMesh>
    where
        C::A: Handler<MeshFailure>,
    {
        // Reject per-host dims that collide with the host mesh's own
        // dims: the two extents are concatenated below, so labels
        // must be disjoint.
        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
        if !per_host_labels
            .intersection(&host_labels)
            .collect::<Vec<_>>()
            .is_empty()
        {
            return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
                "per_host dims overlap with existing dims when spawning proc mesh"
            )));
        }
        // One binding entry per per-host rank (bindings repeat per
        // host, not per global rank — see `v[per_host_rank]` below).
        if let Some(proc_bind) = proc_bind.as_ref() {
            if proc_bind.len() != per_host.num_ranks() {
                return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
                    "proc_bind length does not match per_host extent"
                )));
            }
        }

        // Full proc-mesh extent: host dims followed by per-host dims.
        let extent = self
            .region
            .extent()
            .concat(&per_host)
            .map_err(|err| crate::Error::ConfigurationError(err.into()))?;

        let region: Region = extent.clone().into();

        tracing::info!(
            name = "ProcMeshStatus",
            status = "Spawn::Attempt",
            %region,
            "spawning proc mesh"
        );

        let mut procs = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulator outputs full StatusMesh snapshots; seed with
        // NotExist.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );

        // Create or update each proc, then fence on receiving status
        // overlays. This prevents a race where procs become
        // addressable before their local muxers are ready, which
        // could make early messages unroutable. A future improvement
        // would allow buffering in the host-level muxer to eliminate
        // the need for this synchronization step.
        let mut proc_names = Vec::new();
        let client_config_override = hyperactor_config::global::propagatable_attrs();
        for (host_rank, host) in self.ranks.iter().enumerate() {
            for per_host_rank in 0..per_host.num_ranks() {
                // Global rank: hosts are the major dimension.
                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
                // NOTE(review): the name embeds only per_host_rank (not
                // create_rank), so the same name repeats across hosts —
                // presumably each host agent namespaces its procs; confirm.
                let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
                proc_names.push(proc_name.clone());
                let bind = proc_bind.as_ref().map(|v| v[per_host_rank].clone());
                let proc_spec = resource::ProcSpec {
                    client_config_override: client_config_override.clone(),
                    bootstrap_command: self.bootstrap_command.clone(),
                    proc_bind: bind,
                    host_mesh_name: Some(self.name.clone()),
                };
                host.mesh_agent()
                    .create_or_update(
                        cx,
                        proc_name.clone(),
                        resource::Rank::new(create_rank),
                        proc_spec,
                    )
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while creating proc: {}", e),
                        )
                    })?;
                let mut reply_port = port.bind();
                // If this proc dies or some other issue renders the reply undeliverable,
                // the reply does not need to be returned to the sender.
                reply_port.return_undeliverable(false);
                // Request a status overlay for this rank; replies are
                // accumulated into the StatusMesh read from `rx` below.
                host.mesh_agent()
                    .get_rank_status(cx, proc_name.clone(), reply_port)
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while querying proc status: {}", e),
                        )
                    })?;
                let proc_id = host.named_proc(&proc_name);
                tracing::info!(
                    name = "ProcMeshStatus",
                    status = "Spawn::CreatingProc",
                    %proc_id,
                    rank = create_rank,
                );
                procs.push(ProcRef::new(
                    proc_id,
                    create_rank,
                    // TODO: specify or retrieve from state instead, to avoid attestation.
                    hyperactor_reference::ActorRef::attest(
                        host.named_proc(&proc_name)
                            .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
                    ),
                ));
            }
        }

        let start_time = tokio::time::Instant::now();

        // Wait on accumulated StatusMesh snapshots until complete or
        // timeout.
        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
            region.clone(), // fallback mesh if nothing arrives
        )
        .await
        {
            Ok(statuses) => {
                // If any rank is terminating, surface a
                // ProcCreationError pointing at that rank.
                if let Some((rank, status)) = statuses
                    .values()
                    .enumerate()
                    .find(|(_, s)| s.is_terminating())
                {
                    let proc_name = &proc_names[rank];
                    // Map the global rank back to its owning host.
                    let host_rank = rank / per_host.num_ranks();
                    let mesh_agent = self.ranks[host_rank].mesh_agent();
                    let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
                    let mut reply_tx = reply_tx.bind();
                    // If this proc dies or some other issue renders the reply undeliverable,
                    // the reply does not need to be returned to the sender.
                    reply_tx.return_undeliverable(false);
                    mesh_agent
                        .send(
                            cx,
                            resource::GetState {
                                name: proc_name.clone(),
                                reply: reply_tx,
                            },
                        )
                        .map_err(|e| {
                            crate::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
                        })?;
                    // Best-effort state fetch: on timeout or error,
                    // synthesize a placeholder state from the observed
                    // terminating status.
                    let state = match tokio::time::timeout(
                        hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                        reply_rx.recv(),
                    )
                    .await
                    {
                        Ok(Ok(state)) => state,
                        _ => resource::State {
                            name: proc_name.clone(),
                            status,
                            state: None,
                            generation: 0,
                            timestamp: std::time::SystemTime::now(),
                        },
                    };

                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "Spawn::GetRankStatus",
                        rank = host_rank,
                        "rank {} is terminating with state: {}",
                        host_rank,
                        state
                    );

                    return Err(crate::Error::ProcCreationError {
                        state: Box::new(state),
                        host_rank,
                        mesh_agent,
                    });
                }
            }
            Err(complete) => {
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "Spawn::GetRankStatus",
                    "timeout after {:?} when waiting for procs being created",
                    hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                );
                // Fill remaining ranks with a timeout status via the
                // legacy shim.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                return Err(crate::Error::ProcSpawnError { statuses: legacy });
            }
        }

        let mesh =
            ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
        if let Ok(ref mesh) = mesh {
            // Spawn a unique mesh controller for each proc mesh, so the type of the
            // mesh can be preserved.
            let controller = ProcMeshController::new(mesh.deref().clone());
            // hyperactor::proc AI-3: controller name must include mesh
            // identity for proc-wide ActorId uniqueness.
            let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.name());
            let controller_handle = controller
                .spawn_with_name(cx, &controller_name)
                .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
            // Bind the actor's well-known ports (Signal, IntrospectMessage,
            // Undeliverable). Without this, the controller's mailbox has no
            // port entries and messages (including introspection queries)
            // are returned as undeliverable.
            let _: hyperactor::reference::ActorRef<ProcMeshController> = controller_handle.bind();
        }
        mesh
    }
1454
    /// The name of the referenced host mesh.
    pub fn name(&self) -> &Name {
        &self.name
    }
1459
    /// The host references (channel addresses) in rank order.
    ///
    /// The slice length always equals `self.region.num_ranks()`
    /// (enforced by the constructors).
    pub fn hosts(&self) -> &[HostRef] {
        &self.ranks
    }
1464
    /// Stop the proc mesh named `proc_mesh_name` by sending a
    /// `resource::Stop` to the host agent owning each proc in
    /// `procs`, then waiting until every rank in `region` reports a
    /// terminated status.
    ///
    /// Errors if any rank fails to terminate, or if the statuses do
    /// not fully arrive within `PROC_STOP_MAX_IDLE`.
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_name: &Name,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
        reason: String,
    ) -> anyhow::Result<()> {
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulator outputs full StatusMesh snapshots; seed with
        // NotExist.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        for proc_id in procs.into_iter() {
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());
            // The name stored in HostAgent is not the same as the
            // one stored in the ProcMesh. We instead take each proc id
            // and map it to that particular agent.
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());

            // Note that we don't send 1 message per host agent, we send 1 message
            // per proc.
            let host = HostRef(addr);
            host.mesh_agent().send(
                cx,
                resource::Stop {
                    name: proc_name.clone(),
                    reason: reason.clone(),
                },
            )?;
            // Ask the agent to report into the accumulator port once
            // this proc reaches Stopped.
            host.mesh_agent()
                .wait_rank_status(cx, proc_name, Status::Stopped, port.bind())
                .await?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        let start_time = tokio::time::Instant::now();

        // Wait for the accumulated status mesh to cover all ranks,
        // bounded by the configured maximum idle time.
        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(), // fallback mesh if nothing arrives
        )
        .await
        {
            Ok(statuses) => {
                let all_stopped = statuses.values().all(|s| s.is_terminated());
                if !all_stopped {
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(anyhow::anyhow!(
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    ));
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
            }
            Err(complete) => {
                // Fill remaining ranks with a timeout status via the
                // legacy shim.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh before timeout: {:?}",
                    legacy,
                );
                return Err(anyhow::anyhow!(
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_name,
                    legacy
                ));
            }
        }
        Ok(())
    }
1577
    /// Get the state of all procs with Name in this host mesh.
    /// The procs iterator must be in rank order.
    /// The returned ValueMesh will have a non-empty inner state unless there
    /// was a timeout reaching the host mesh agent.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
    ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
        // All GetState replies funnel into this single port.
        let (tx, mut rx) = cx.mailbox().open_port();

        let mut num_ranks = 0;
        let procs: Vec<hyperactor_reference::ProcId> = procs.into_iter().collect();
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());

            // Note that we don't send 1 message per host agent, we send 1 message
            // per proc.
            let host = HostRef(addr);
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            // If this proc dies or some other issue renders the reply undeliverable,
            // the reply does not need to be returned to the sender.
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    crate::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            // The agent runs on the same process as the running actor, so if some
            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
            // the agent will be unresponsive.
            // We handle this by setting a timeout on the recv, and if we don't get a
            // message we assume the agent is dead and return a failed state.
            let state = tokio::time::timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // Handle non-timeout receiver error.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        // Tag each reply with its create_rank so the
                        // final mesh can be rank-ordered below.
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(crate::Error::NotExist(state.name));
                    }
                }
            } else {
                // Timeout error, stop reading from the receiver and send back what we have so far,
                // padding with failed states.
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                // NOTE(review): this asserts (panics) in a non-test path if a
                // duplicate or out-of-range create_rank was received — confirm
                // agents reply exactly once per rank.
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        // We populate with any ranks leftover at the time of the timeout.
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                            generation: 0,
                            timestamp: std::time::SystemTime::now(),
                        },
                    ));
                }
                break;
            }
        }
        // Ensure that all ranks have replied. Note that if the mesh is sliced,
        // not all create_ranks may be in the mesh.
        // Sort by rank, so that the resulting mesh is ordered.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1680}
1681
/// An ordered set of host entries, deduplicated by `HostAgent` `ActorId`
/// in first-seen order.
///
/// Insertion is idempotent by construction — SA-3 (dedup by ActorId)
/// is a property of this type, not a comment on careful control flow.
/// First-seen order is preserved: the first occurrence of a given
/// ActorId wins; subsequent duplicates are silently dropped.
struct HostSet {
    /// Actor ids already inserted; the membership gate for `entries`.
    seen: HashSet<hyperactor_reference::ActorId>,
    /// `(addr, agent)` pairs in first-seen insertion order.
    entries: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
}
1693
1694impl HostSet {
1695    fn new() -> Self {
1696        Self {
1697            seen: HashSet::new(),
1698            entries: Vec::new(),
1699        }
1700    }
1701
1702    /// Insert a host entry. No-op if `ActorId` already present (SA-3).
1703    /// First-seen order is preserved.
1704    fn insert(&mut self, addr: String, agent_ref: hyperactor_reference::ActorRef<HostAgent>) {
1705        if self.seen.insert(agent_ref.actor_id().clone()) {
1706            self.entries.push((addr, agent_ref));
1707        }
1708    }
1709
1710    /// Extend from a `HostMeshRef`. SA-3 applies per entry.
1711    fn extend_from_mesh(&mut self, mesh: &HostMeshRef) {
1712        for h in mesh.hosts() {
1713            self.insert(h.0.to_string(), h.mesh_agent());
1714        }
1715    }
1716
1717    fn into_vec(self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1718        self.entries
1719    }
1720}
1721
1722/// Ordered union of hosts from meshes and optional client host
1723/// entries, deduplicated by `HostAgent` `ActorId` in first-seen
1724/// order.
1725///
1726/// SA-3 dedup and SA-6 client-host merge are structural properties
1727/// of [`HostSet`], not invariants on this function's control flow.
1728fn aggregate_hosts(
1729    meshes: &[impl AsRef<HostMeshRef>],
1730    client_host_entries: Option<Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>>,
1731) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1732    let mut set = HostSet::new();
1733
1734    // SA-3: dedup across all mesh hosts in first-seen order.
1735    for mesh in meshes {
1736        set.extend_from_mesh(mesh.as_ref());
1737    }
1738
1739    // CH-1 / SA-6: client host entries merged after mesh aggregation.
1740    if let Some(entries) = client_host_entries {
1741        for (addr, agent_ref) in entries {
1742            set.insert(addr, agent_ref);
1743        }
1744    }
1745
1746    set.into_vec()
1747}
1748
/// Spawn a [`MeshAdminAgent`] that aggregates hosts from multiple
/// meshes.
///
/// The admin agent runs on the caller's local proc — the `Proc` of
/// the actor context `cx`. Hosts are deduplicated by actor ID across
/// all meshes. Returns the address reported by the spawned agent.
///
/// See the `mesh_admin` module doc for the SA-* (spawn/aggregation),
/// CH-* (client host), and AI-* (admin identity) invariants.
pub async fn spawn_admin(
    meshes: impl IntoIterator<Item = impl AsRef<HostMeshRef>>,
    cx: &impl hyperactor::context::Actor,
    admin_addr: Option<std::net::SocketAddr>,
    telemetry_url: Option<String>,
) -> anyhow::Result<String> {
    let meshes: Vec<_> = meshes.into_iter().collect();
    // SA-1 / SA-2: reject degenerate inputs up front.
    anyhow::ensure!(!meshes.is_empty(), "at least one mesh is required (SA-1)");
    for (i, mesh) in meshes.iter().enumerate() {
        anyhow::ensure!(
            !mesh.as_ref().hosts().is_empty(),
            "mesh at index {} has no hosts (SA-2)",
            i,
        );
    }

    // Merge the client's own host entries, when a global client host
    // is registered (None otherwise).
    let client_entries =
        crate::global_context::try_this_host().map(|client_host| client_host.host_entries());
    let hosts = aggregate_hosts(&meshes, client_entries);

    let root_client_id = cx.mailbox().actor_id().clone();

    // Spawn the admin on the caller's local proc. Placement now
    // follows the caller context rather than mesh topology.
    let local_proc = cx.instance().proc();
    let agent_handle = local_proc.spawn(
        crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
        crate::mesh_admin::MeshAdminAgent::new(
            hosts,
            Some(root_client_id),
            admin_addr,
            telemetry_url,
        ),
    )?;
    // Ask the agent for the address it actually bound; absence is an
    // error.
    let response = agent_handle.get_admin_addr(cx).await?;
    let addr = response
        .addr
        .ok_or_else(|| anyhow::anyhow!("mesh admin agent did not report an address"))?;

    Ok(addr)
}
1799
impl view::Ranked for HostMeshRef {
    type Item = HostRef;

    /// The region describing this mesh's extent and rank layout.
    fn region(&self) -> &Region {
        &self.region
    }

    /// The host at `rank`, or `None` when out of range.
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.ranks.get(rank)
    }
}
1811
impl view::RankedSliceable for HostMeshRef {
    /// Restrict this mesh to `region`, keeping only the hosts whose
    /// ranks fall inside it.
    fn sliced(&self, region: Region) -> Self {
        // `remap` yields, for each rank of the new region, its index
        // in the current region. The unwraps assume `region` is a
        // valid subregion of `self.region()` — presumably a
        // precondition of `sliced`; TODO confirm against the trait
        // docs.
        let ranks = self
            .region()
            .remap(&region)
            .unwrap()
            .map(|index| self.get(index).unwrap().clone());
        // Carry the bootstrap command over; everything else comes
        // from a freshly validated ref over the sliced region.
        Self {
            bootstrap_command: self.bootstrap_command.clone(),
            ..Self::new(self.name.clone(), region, ranks.collect()).unwrap()
        }
    }
}
1825
1826impl std::fmt::Display for HostMeshRef {
1827    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1828        write!(f, "{}:", self.name)?;
1829        for (rank, host) in self.ranks.iter().enumerate() {
1830            if rank > 0 {
1831                write!(f, ",")?;
1832            }
1833            write!(f, "{}", host)?;
1834        }
1835        write!(f, "@{}", self.region)
1836    }
1837}
1838
/// The type of error occurring during `HostMeshRef` parsing.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The region component (text after `@`) failed to parse.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// No `@` separator: the ref has no region component.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// No `:` separator: the ref has no name component.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The mesh name failed to parse.
    #[error(transparent)]
    InvalidName(#[from] crate::NameParseError),

    /// Constructing the `HostMeshRef` failed. Boxed (see the manual
    /// `From<crate::Error>` impl below).
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<crate::Error>),

    /// Any other error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1860
impl From<crate::Error> for HostMeshRefParseError {
    /// Box the error on the way in: `InvalidHostMeshRef` stores
    /// `Box<crate::Error>` (presumably to keep this enum small —
    /// confirm against `clippy::result_large_err` guidance).
    fn from(err: crate::Error) -> Self {
        Self::InvalidHostMeshRef(Box::new(err))
    }
}
1866
1867impl FromStr for HostMeshRef {
1868    type Err = HostMeshRefParseError;
1869
1870    fn from_str(s: &str) -> Result<Self, Self::Err> {
1871        let (name, rest) = s
1872            .split_once(':')
1873            .ok_or(HostMeshRefParseError::MissingName)?;
1874
1875        let name = Name::from_str(name)?;
1876
1877        let (hosts, region) = rest
1878            .split_once('@')
1879            .ok_or(HostMeshRefParseError::MissingRegion)?;
1880        let hosts = hosts
1881            .split(',')
1882            .map(|host| host.trim())
1883            .map(|host| host.parse::<HostRef>())
1884            .collect::<Result<Vec<_>, _>>()?;
1885        let region = region.parse()?;
1886        Ok(HostMeshRef::new(name, region, hosts)?)
1887    }
1888}
1889
1890#[cfg(test)]
1891mod tests {
1892    use std::assert_matches::assert_matches;
1893    use std::collections::HashSet;
1894    use std::collections::VecDeque;
1895
1896    use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1897    use hyperactor::context::Mailbox as _;
1898    use hyperactor_config::attrs::Attrs;
1899    use itertools::Itertools;
1900    use ndslice::ViewExt;
1901    use ndslice::extent;
1902    use timed_test::async_timed_test;
1903    use tokio::process::Command;
1904
1905    use super::*;
1906    use crate::ActorMesh;
1907    use crate::Bootstrap;
1908    use crate::bootstrap::MESH_TAIL_LOG_LINES;
1909    use crate::comm::ENABLE_NATIVE_V1_CASTING;
1910    use crate::resource::Status;
1911    use crate::testactor;
1912    use crate::testactor::GetConfigAttrs;
1913    use crate::testactor::SetConfigAttrs;
1914    use crate::testing;
1915
1916    #[test]
1917    fn test_host_mesh_subset() {
1918        let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1919            .parse()
1920            .unwrap();
1921        assert_eq!(
1922            hosts.range("replica", 1).unwrap().to_string(),
1923            "test:local:3,local:4@2+replica=1/2,host=2/1"
1924        );
1925    }
1926
1927    #[test]
1928    fn test_host_mesh_ref_parse_roundtrip() {
1929        let host_mesh_ref = HostMeshRef::new(
1930            Name::new("test").unwrap(),
1931            extent!(replica = 2, host = 2).into(),
1932            vec![
1933                "tcp:127.0.0.1:123".parse().unwrap(),
1934                "tcp:127.0.0.1:123".parse().unwrap(),
1935                "tcp:127.0.0.1:123".parse().unwrap(),
1936                "tcp:127.0.0.1:123".parse().unwrap(),
1937            ],
1938        )
1939        .unwrap();
1940
1941        assert_eq!(
1942            host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
1943            host_mesh_ref
1944        );
1945    }
1946
    /// Shared body for the allocate round-trip tests below: allocates
    /// a 4-replica host mesh, spawns two proc meshes and actor meshes
    /// on it, validates casting, then forwards a message along every
    /// directed edge between actors before shutting down.
    #[cfg(fbcode_build)]
    async fn execute_allocate(config: &hyperactor_config::global::ConfigLock) {
        let poll = Duration::from_secs(3);
        let get_actor = Duration::from_mins(1);
        let get_proc = Duration::from_mins(1);
        // 3m watchdog total: 3m - (poll + get_actor + get_proc) = 180s - 123s = 57s
        let slack = Duration::from_secs(57);

        let _pdeath_sig =
            config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _poll = config.override_key(crate::mesh_controller::SUPERVISION_POLL_FREQUENCY, poll);
        let _get_actor = config.override_key(crate::proc_mesh::GET_ACTOR_STATE_MAX_IDLE, get_actor);
        let _get_proc = config.override_key(crate::host_mesh::GET_PROC_STATE_MAX_IDLE, get_proc);

        // Must be >= poll + get_actor + get_proc (+ slack).
        let _watchdog = config.override_key(
            crate::actor_mesh::SUPERVISION_WATCHDOG_TIMEOUT,
            poll + get_actor + get_proc + slack,
        );

        let instance = testing::instance();

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            // Unity extent: one proc per host (extent stays replicas=4).
            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity(), None)
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            // Per-host sub-extent gpus=3,extra=2 multiplies into the
            // host extent: 4 * 3 * 2 = 24 procs.
            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2), None)
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // Host meshes can be dereferenced to produce a concrete ref.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            // Here, the underlying host mesh does not change:
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Validate we can cast:
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                // Replies may arrive in any order; retire each id
                // exactly once.
                while !expected_actor_ids.is_empty() {
                    let (actor_id, _seq) = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Now forward a message through all directed edges across the two meshes.
            // This tests the full connectivity of all the hosts, procs, and actors
            // involved in these two meshes.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                // Each ordered pair of ports
                .permutations(2)
                // Flatten them to create a path:
                .flatten()
                .collect();

            let expect_visited: Vec<_> = to_visit.clone().into();

            // We are going to send to the first, and then set up a port to receive the last.
            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            let _ = host_mesh.shutdown(&instance).await;
        }
    }
2061
2062    #[async_timed_test(timeout_secs = 600)]
2063    #[cfg(fbcode_build)]
2064    async fn test_allocate_dest_reorder_buffer_off() {
2065        let config = hyperactor_config::global::lock();
2066        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
2067        execute_allocate(&config).await;
2068    }
2069
2070    #[async_timed_test(timeout_secs = 600)]
2071    #[cfg(fbcode_build)]
2072    async fn test_allocate_dest_reorder_buffer_on() {
2073        let config = hyperactor_config::global::lock();
2074        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
2075        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
2076        execute_allocate(&config).await;
2077    }
2078
2079    /// Allocate a new port on localhost. This drops the listener, releasing the socket,
2080    /// before returning. Hyperactor's channel::net applies SO_REUSEADDR, so we do not hav
2081    /// to wait out the socket's TIMED_WAIT state.
2082    ///
2083    /// Even so, this is racy.
2084    fn free_localhost_addr() -> ChannelAddr {
2085        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2086        ChannelAddr::Tcp(listener.local_addr().unwrap())
2087    }
2088
    /// Shared body for the extrinsic-allocation tests: boots two host
    /// processes out-of-band via the bootstrap binary, constructs a
    /// `HostMeshRef` over their addresses by hand, then spawns and
    /// validates an actor mesh on it before shutting down.
    #[cfg(fbcode_build)]
    async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Launch one bootstrap process per host address.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                command: None, // use current binary
                config: None,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            // Reap the child if the test exits early.
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }

        let instance = testing::instance();
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let proc_mesh = host_mesh
            .spawn(&testing::instance(), "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn(&testing::instance(), "test", &())
            .await
            .unwrap();

        testactor::assert_mesh_shape(actor_mesh).await;

        // Take ownership of the extrinsic ref so the hosts can be
        // shut down through the owned mesh API.
        HostMesh::take(host_mesh)
            .shutdown(&instance)
            .await
            .expect("hosts shutdown");
    }
2131
2132    #[tokio::test]
2133    #[cfg(fbcode_build)]
2134    async fn test_extrinsic_allocation_v0() {
2135        let config = hyperactor_config::global::lock();
2136        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
2137        execute_extrinsic_allocation(&config).await;
2138    }
2139
2140    #[tokio::test]
2141    #[cfg(fbcode_build)]
2142    async fn test_extrinsic_allocation_v1() {
2143        let config = hyperactor_config::global::lock();
2144        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
2145        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
2146        execute_extrinsic_allocation(&config).await;
2147    }
2148
    /// Spawning procs on hosts whose bootstrap command is `false`
    /// (always exits with code 1) must surface a `ProcCreationError`
    /// carrying the child's exit status.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_proc_allocation() {
        let lock = hyperactor_config::global::lock();
        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Launch one bootstrap process per host address.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                // The entire purpose of this is to fail:
                command: Some(BootstrapCommand::from("false")),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity(), None)
            .await
            .unwrap_err();
        // The failure must carry the child's exit code (1, from
        // `false`) in its status message.
        assert_matches!(
            err,
            crate::Error::ProcCreationError { state, .. }
            if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
        );
    }
2187
    /// When one host's bootstrap command hangs (`sleep 60`) while the
    /// other boots normally, the hung rank must time out (per
    /// `PROC_SPAWN_MAX_IDLE`) while the healthy rank reports Running.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_halting_proc_allocation() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();

        // Host 0 gets a command that never becomes ready; host 1 uses
        // the default (current binary).
        for (index, host) in hosts.iter().enumerate() {
            let mut cmd = Command::new(program.clone());
            let command = if index == 0 {
                let mut command = BootstrapCommand::from("sleep");
                command.args.push("60".to_string());
                Some(command)
            } else {
                None
            };
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                command,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity(), None)
            .await
            .unwrap_err();
        // Rank 0 (sleep) timed out; rank 1 came up Running.
        let statuses = err.into_proc_spawn_error().unwrap();
        assert_matches!(
            &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
            &[Status::Timeout(_), Status::Running]
        );
    }
2233
    /// Runtime-layer config overrides cast to a spawned actor mesh
    /// (`SetConfigAttrs`) must win over the client-propagated value
    /// for the overridden key, while untouched keys keep the client's
    /// value.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_client_config_override() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(2),
        );
        let _guard3 = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_mins(1),
        );

        // Unset env vars that were mirrored by TestOverride, so child
        // processes don't inherit them. This allows Runtime layer to
        // override ClientOverride. SAFETY: Single-threaded test under
        // global config lock.
        unsafe {
            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
        }

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(instance, "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(instance, "test", &()).await.unwrap();

        // Override HOST_SPAWN_READY_TIMEOUT (to 3m) at the Runtime
        // layer via a cast; MESSAGE_DELIVERY_TIMEOUT is untouched.
        let mut attrs_override = Attrs::new();
        attrs_override.set(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(3),
        );
        actor_mesh
            .cast(
                instance,
                SetConfigAttrs(bincode::serialize(&attrs_override).unwrap()),
            )
            .unwrap();

        // Read the effective config back from the actors.
        let (tx, mut rx) = instance.open_port();
        actor_mesh
            .cast(instance, GetConfigAttrs(tx.bind()))
            .unwrap();
        let actual_attrs = rx.recv().await.unwrap();
        let actual_attrs = bincode::deserialize::<Attrs>(&actual_attrs).unwrap();

        // Overridden key reflects the Runtime-layer value (3m), not
        // the client's (2m)...
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(3)
        );
        // ...while the untouched key keeps the client's value (1m).
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(1)
        );

        let _ = hm.shutdown(instance).await;
    }
2302
2303    // ---- SA-* invariant tests ----
2304
2305    #[tokio::test]
2306    async fn test_sa1_empty_mesh_set_rejected() {
2307        let instance = testing::instance();
2308        let result = spawn_admin(std::iter::empty::<&HostMeshRef>(), instance, None, None).await;
2309        let err = result.unwrap_err().to_string();
2310        assert!(err.contains("SA-1"), "expected SA-1 error, got: {err}");
2311    }
2312
2313    #[tokio::test]
2314    async fn test_sa2_empty_hosts_rejected() {
2315        let instance = testing::instance();
2316        let mesh = HostMeshRef::from_hosts(Name::new("empty").unwrap(), vec![]);
2317        let result = spawn_admin([&mesh], instance, None, None).await;
2318        let err = result.unwrap_err().to_string();
2319        assert!(err.contains("SA-2"), "expected SA-2 error, got: {err}");
2320    }
2321
2322    /// SA-3: `HostSet::insert` is idempotent — inserting the same
2323    /// `ActorId` twice does not add a duplicate entry, and first-seen
2324    /// order is preserved. This is a structural property of `HostSet`,
2325    /// not an invariant on `aggregate_hosts` control flow.
2326    #[test]
2327    fn test_sa3_host_set_insert_idempotent() {
2328        let addr_a: ChannelAddr = "tcp:127.0.0.1:2001".parse().unwrap();
2329        let addr_b: ChannelAddr = "tcp:127.0.0.1:2002".parse().unwrap();
2330
2331        let ref_a = HostRef(addr_a.clone()).mesh_agent();
2332        let ref_b = HostRef(addr_b.clone()).mesh_agent();
2333
2334        let mut set = HostSet::new();
2335        set.insert(addr_a.to_string(), ref_a.clone());
2336        set.insert(addr_b.to_string(), ref_b.clone());
2337        // Insert ref_a again — should be a no-op (SA-3).
2338        set.insert("duplicate_addr".to_string(), ref_a.clone());
2339
2340        let result = set.into_vec();
2341        assert_eq!(
2342            result.len(),
2343            2,
2344            "SA-3: duplicate ActorId must not add entry"
2345        );
2346        assert_eq!(
2347            result[0].0,
2348            addr_a.to_string(),
2349            "SA-3: first-seen order preserved"
2350        );
2351        assert_eq!(
2352            result[1].0,
2353            addr_b.to_string(),
2354            "SA-3: first-seen order preserved"
2355        );
2356    }
2357
2358    #[test]
2359    fn test_sa3_aggregate_hosts_dedup() {
2360        let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2361        let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2362        let addr_c: ChannelAddr = "tcp:127.0.0.1:1003".parse().unwrap();
2363
2364        // mesh_a: hosts a, b
2365        let mesh_a = HostMeshRef::from_hosts(
2366            Name::new("mesh_a").unwrap(),
2367            vec![addr_a.clone(), addr_b.clone()],
2368        );
2369        // mesh_b: hosts b, c  (b overlaps with mesh_a)
2370        let mesh_b = HostMeshRef::from_hosts(
2371            Name::new("mesh_b").unwrap(),
2372            vec![addr_b.clone(), addr_c.clone()],
2373        );
2374
2375        let result = aggregate_hosts(&[&mesh_a, &mesh_b], None);
2376
2377        // 3 unique hosts: a, b, c — b is deduplicated.
2378        assert_eq!(result.len(), 3, "expected 3 hosts, got {:?}", result);
2379
2380        // First-seen order: a (mesh_a[0]), b (mesh_a[1]), c (mesh_b[1]).
2381        let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2382        assert_eq!(addrs[0], addr_a.to_string());
2383        assert_eq!(addrs[1], addr_b.to_string());
2384        assert_eq!(addrs[2], addr_c.to_string());
2385    }
2386
2387    /// SA-6 / CH-1: client host entries are deduplicated against the
2388    /// already-aggregated mesh host set.
2389    #[test]
2390    fn test_sa6_ch1_client_host_dedup() {
2391        let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2392        let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2393
2394        let mesh = HostMeshRef::from_hosts(
2395            Name::new("mesh").unwrap(),
2396            vec![addr_a.clone(), addr_b.clone()],
2397        );
2398
2399        // Client host entry overlaps with addr_a.
2400        let client_ref = HostRef(addr_a.clone()).mesh_agent();
2401        let client_entries = vec![("client_addr".to_string(), client_ref)];
2402
2403        let result = aggregate_hosts(&[&mesh], Some(client_entries));
2404
2405        // addr_a already in mesh — client entry is deduplicated.
2406        assert_eq!(result.len(), 2, "expected 2 hosts, got {:?}", result);
2407        let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2408        assert_eq!(addrs[0], addr_a.to_string());
2409        assert_eq!(addrs[1], addr_b.to_string());
2410    }
2411}