// hyperactor_mesh/host_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::StreamingReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::host::Host;
14use hyperactor::host::LocalProcManager;
15use hyperactor::host::SERVICE_PROC_NAME;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod host_agent;
24
25use std::collections::HashSet;
26use std::hash::Hash;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30use std::time::Duration;
31
32use hyperactor::channel::ChannelAddr;
33use hyperactor::context;
34use hyperactor::reference as hyperactor_reference;
35use ndslice::Extent;
36use ndslice::Region;
37use ndslice::ViewExt;
38use ndslice::extent;
39use ndslice::view;
40use ndslice::view::Ranked;
41use ndslice::view::RegionParseError;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::Bootstrap;
47use crate::Name;
48use crate::ProcMesh;
49use crate::ProcMeshRef;
50use crate::ValueMesh;
51use crate::alloc::Alloc;
52use crate::bootstrap::BootstrapCommand;
53use crate::bootstrap::BootstrapProcManager;
54pub use crate::host_mesh::host_agent::HostAgent;
55use crate::host_mesh::host_agent::HostAgentMode;
56use crate::host_mesh::host_agent::HostMeshAgentProcMeshTrampoline;
57use crate::host_mesh::host_agent::ProcManagerSpawnFn;
58use crate::host_mesh::host_agent::ProcState;
59use crate::host_mesh::host_agent::SetClientConfigClient;
60use crate::host_mesh::host_agent::ShutdownHostClient;
61use crate::host_mesh::host_agent::SpawnMeshAdminClient;
62use crate::mesh_controller::HostMeshController;
63use crate::mesh_controller::ProcMeshController;
64use crate::proc_agent::ProcAgent;
65use crate::proc_mesh::ProcRef;
66use crate::resource;
67use crate::resource::CreateOrUpdateClient;
68use crate::resource::GetRankStatus;
69use crate::resource::GetRankStatusClient;
70use crate::resource::ProcSpec;
71use crate::resource::RankedValues;
72use crate::resource::Status;
73use crate::transport::DEFAULT_TRANSPORT;
74
/// Actor name for `HostMeshController` when spawned as a named child.
/// The mesh name is appended to this prefix so controller `ActorId`s
/// remain unique within a proc (see `allocate_inner`).
pub const HOST_MESH_CONTROLLER_NAME: &str = "host_mesh_controller";

/// Actor name for `ProcMeshController` when spawned as a named child.
pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
80
declare_attrs! {
    /// The maximum idle time between updates while spawning proc
    /// meshes.
    // NOTE(review): the config key here is "mesh_proc_spawn_max_idle"
    // while the stop attr below uses "proc_stop_max_idle" (no "mesh_"
    // prefix). Inconsistent, but renaming either would break existing
    // configs — left as-is.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while stopping proc
    /// meshes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    /// The maximum idle time between updates while querying host meshes
    /// for their proc states.
    // NOTE(review): `Duration::from_mins` is a nightly-only std API
    // (`duration_constructors`) — presumably this crate builds with a
    // toolchain/feature that provides it; TODO confirm, or use
    // `Duration::from_secs(60)` for stable builds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
106
/// A reference to a single host.
///
/// Wraps the host's `ChannelAddr`; all proc and actor references for
/// the host (service proc, named procs, mesh agent) are derived from
/// this address.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
wirevalue::register_type!(HostRef);
111
112impl HostRef {
113    /// The host mesh agent associated with this host.
114    fn mesh_agent(&self) -> hyperactor_reference::ActorRef<HostAgent> {
115        hyperactor_reference::ActorRef::attest(
116            self.service_proc()
117                .actor_id(host_agent::HOST_MESH_AGENT_ACTOR_NAME, 0),
118        )
119    }
120
121    /// The ProcId for the proc with name `name` on this host.
122    fn named_proc(&self, name: &Name) -> hyperactor_reference::ProcId {
123        hyperactor_reference::ProcId::with_name(self.0.clone(), name.to_string())
124    }
125
126    /// The service proc on this host.
127    fn service_proc(&self) -> hyperactor_reference::ProcId {
128        hyperactor_reference::ProcId::with_name(self.0.clone(), SERVICE_PROC_NAME)
129    }
130
131    /// Request an orderly teardown of this host and all procs it
132    /// spawned.
133    ///
134    /// This resolves the per-child grace **timeout** and the maximum
135    /// termination **concurrency** from config and sends a
136    /// [`ShutdownHost`] message to the host's agent. The agent then:
137    ///
138    /// 1) Performs a graceful termination pass over all tracked
139    ///    children (TERM → wait(`timeout`) → KILL), with at most
140    ///    `max_in_flight` running concurrently.
141    /// 2) After the pass completes, **drops the Host**, which also
142    ///    drops the embedded `BootstrapProcManager`. The manager's
143    ///    `Drop` serves as a last-resort safety net (it SIGKILLs
144    ///    anything that somehow remains).
145    ///
146    /// This call returns `Ok(()))` only after the agent has finished
147    /// the termination pass and released the host, so the host is no
148    /// longer reachable when this returns.
149    pub(crate) async fn shutdown(
150        &self,
151        cx: &impl hyperactor::context::Actor,
152    ) -> anyhow::Result<()> {
153        let agent = self.mesh_agent();
154        let terminate_timeout =
155            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
156        let max_in_flight =
157            hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
158        agent
159            .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
160            .await?;
161        Ok(())
162    }
163}
164
165impl TryFrom<hyperactor_reference::ActorRef<HostAgent>> for HostRef {
166    type Error = crate::Error;
167
168    fn try_from(value: hyperactor_reference::ActorRef<HostAgent>) -> Result<Self, crate::Error> {
169        let proc_id = value.actor_id().proc_id();
170        Ok(HostRef(proc_id.addr().clone()))
171    }
172}
173
174impl std::fmt::Display for HostRef {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        self.0.fmt(f)
177    }
178}
179
180impl FromStr for HostRef {
181    type Err = <ChannelAddr as FromStr>::Err;
182
183    fn from_str(s: &str) -> Result<Self, Self::Err> {
184        Ok(HostRef(ChannelAddr::from_str(s)?))
185    }
186}
187
/// An owned mesh of hosts.
///
/// # Lifecycle
/// `HostMesh` owns host lifecycles. Callers **must** invoke
/// [`HostMesh::shutdown`] for deterministic teardown. The `Drop` impl
/// performs **best-effort** cleanup only (spawned via Tokio if
/// available); it is a safety net, not a substitute for orderly
/// shutdown.
///
/// In tests and production, prefer explicit shutdown to guarantee
/// that host agents drop their `BootstrapProcManager`s and that all
/// child procs are reaped.
#[allow(dead_code)]
pub struct HostMesh {
    /// Unique name of this mesh; used for telemetry and for deriving
    /// the controller actor name.
    name: Name,
    /// Shape of the mesh (dimension sizes / ranks).
    extent: Extent,
    /// How the hosts were provisioned; determines teardown behavior.
    allocation: HostMeshAllocation,
    /// The current reference view over this mesh's hosts.
    current_ref: HostMeshRef,
}
207
/// Allocation backing for an owned [`HostMesh`].
///
/// This enum records how the underlying hosts were provisioned, which
/// in turn determines how their lifecycle is managed:
///
/// - `ProcMesh`: Hosts were allocated intrinsically via a
///   [`ProcMesh`]. The `HostMesh` owns the proc mesh and its service
///   procs, and dropping the mesh ensures that all spawned child procs
///   are terminated.
/// - `Owned`: Hosts were constructed externally and "taken" under
///   ownership. The `HostMesh` assumes responsibility for their
///   lifecycle from this point forward, ensuring consistent cleanup on
///   drop.
///
/// Additional variants may be added for other provisioning sources,
/// but in all cases `HostMesh` is an owned resource that guarantees
/// no leaked child processes.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// Hosts were allocated intrinsically via a [`ProcMesh`].
    ///
    /// In this mode, the `HostMesh` owns both the `ProcMesh` itself
    /// and the service procs that implement each host. Dropping the
    /// `HostMesh` also drops the embedded `ProcMesh`, ensuring that
    /// all spawned child procs are terminated cleanly.
    ProcMesh {
        /// The owned proc mesh backing the hosts.
        proc_mesh: ProcMesh,
        /// A retained reference (clone) of the backing proc mesh.
        proc_mesh_ref: ProcMeshRef,
        /// Host references, in rank order.
        hosts: Vec<HostRef>,
    },
    /// Hosts were constructed externally and explicitly transferred
    /// under ownership by this `HostMesh`.
    ///
    /// In this mode, the `HostMesh` assumes responsibility for the
    /// provided hosts going forward. Dropping the mesh guarantees
    /// teardown of all associated state and signals to prevent any
    /// leaked processes.
    Owned {
        /// Host references taken under this mesh's ownership.
        hosts: Vec<HostRef>,
    },
}
247
248impl HostMesh {
249    /// Emit a telemetry event for this host mesh creation.
250    fn notify_created(&self) {
251        let name_str = self.name.to_string();
252        let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
253
254        hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
255            id: mesh_id_hash,
256            timestamp: std::time::SystemTime::now(),
257            class: "Host".to_string(),
258            given_name: self.name.name().to_string(),
259            full_name: name_str,
260            shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
261            parent_mesh_id: None,
262            parent_view_json: None,
263        });
264
265        // Notify telemetry of each HostAgent actor in this mesh.
266        // These are skipped in Proc::spawn_inner. mesh_id directly points to host mesh.
267        let now = std::time::SystemTime::now();
268        for (rank, host) in self.current_ref.hosts().iter().enumerate() {
269            let actor = host.mesh_agent();
270            hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
271                id: hyperactor_telemetry::hash_to_u64(actor.actor_id()),
272                timestamp: now,
273                mesh_id: mesh_id_hash,
274                rank: rank as u64,
275                full_name: actor.actor_id().to_string(),
276                display_name: None,
277            });
278        }
279    }
280
    /// Bring up a local single-host mesh and, in the launcher
    /// process, return a `HostMesh` handle for it.
    ///
    /// There are two execution modes:
    ///
    /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
    ///   this process was launched as a bootstrap child, we call
    ///   `boot.bootstrap().await`, which hands control to the
    ///   bootstrap logic for this process (as defined by the
    ///   `BootstrapCommand` the parent used to spawn it). if that
    ///   call returns, we log the error and terminate. this branch
    ///   does not produce a `HostMesh`.
    ///
    /// - launcher mode: otherwise, we are the process that is setting
    ///   up the mesh. we create a `Host`, spawn a `HostAgent` in
    ///   it, and build a single-host `HostMesh` around that. that
    ///   `HostMesh` is returned to the caller.
    ///
    /// This API is intended for tests, examples, and local bring-up,
    /// not production.
    ///
    /// # Errors
    /// Returns an error if the current process cannot be described as
    /// a `BootstrapCommand`, or if host bring-up fails downstream.
    ///
    /// TODO: fix up ownership
    pub async fn local() -> crate::Result<HostMesh> {
        // Delegate to the bootstrap-aware variant using this process
        // as the bootstrap command.
        Self::local_with_bootstrap(BootstrapCommand::current()?).await
    }
306
307    /// Same as [`local`], but the caller supplies the
308    /// `BootstrapCommand` instead of deriving it from the current
309    /// process.
310    ///
311    /// The provided `bootstrap_cmd` is used when spawning bootstrap
312    /// children and determines the behavior of
313    /// `boot.bootstrap().await` in those children.
314    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
315        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
316            let result = boot.bootstrap().await;
317            if let Err(err) = result {
318                tracing::error!("failed to bootstrap local host mesh process: {}", err);
319            }
320            std::process::exit(1);
321        }
322
323        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
324
325        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
326        let host = Host::new(manager, addr).await?;
327        let addr = host.addr().clone();
328        let system_proc = host.system_proc().clone();
329        let host_mesh_agent = system_proc
330            .spawn(
331                "host_agent",
332                HostAgent::new(HostAgentMode::Process {
333                    host,
334                    shutdown_tx: None,
335                }),
336            )
337            .map_err(crate::Error::SingletonActorSpawnError)?;
338        host_mesh_agent.bind::<HostAgent>();
339
340        let host = HostRef(addr);
341        let host_mesh_ref = HostMeshRef::new(
342            Name::new("local").unwrap(),
343            extent!(hosts = 1).into(),
344            vec![host],
345        )?;
346        Ok(HostMesh::take(host_mesh_ref))
347    }
348
349    /// Create a local in-process host mesh where all procs run in the
350    /// current OS process.
351    ///
352    /// Unlike [`local`] which spawns child processes for each proc,
353    /// this method uses [`LocalProcManager`] to run everything
354    /// in-process. This makes all actors visible in the admin tree
355    /// (useful for debugging with the TUI).
356    ///
357    /// This API is intended for tests, examples, and debugging.
358    pub async fn local_in_process() -> crate::Result<HostMesh> {
359        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
360        Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
361    }
362
363    /// Create a local in-process host mesh with multiple hosts, where
364    /// all procs run in the current OS process using [`LocalProcManager`].
365    ///
366    /// Each address in `addrs` becomes a separate host. The resulting
367    /// mesh has `extent!(hosts = addrs.len())`.
368    ///
369    /// This API is intended for unit tests that need a multi-host mesh
370    /// within a single process.
371    pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
372        let n = addrs.len();
373        let mut host_refs = Vec::with_capacity(n);
374        for addr in addrs {
375            host_refs.push(Self::create_in_process_host(addr).await?);
376        }
377        HostMeshRef::new(
378            Name::new("local").unwrap(),
379            extent!(hosts = n).into(),
380            host_refs,
381        )
382    }
383
384    /// Create a single in-process host at the given address, returning
385    /// a [`HostRef`] for it.
386    async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
387        let spawn: ProcManagerSpawnFn =
388            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
389        let manager = LocalProcManager::new(spawn);
390        let host = Host::new(manager, addr).await?;
391        let addr = host.addr().clone();
392        let system_proc = host.system_proc().clone();
393        let host_mesh_agent = system_proc
394            .spawn(
395                host_agent::HOST_MESH_AGENT_ACTOR_NAME,
396                HostAgent::new(HostAgentMode::Local(host)),
397            )
398            .map_err(crate::Error::SingletonActorSpawnError)?;
399        host_mesh_agent.bind::<HostAgent>();
400        Ok(HostRef(addr))
401    }
402
    /// Create a new process-based host mesh. Each host is represented by a local process,
    /// which manages its set of procs. This is not a true host mesh in the sense that each host
    /// is not independent. The intent of `process` is for testing, examples, and experimentation.
    ///
    /// The bootstrap command is used to bootstrap both hosts and processes, thus it should be
    /// a command that reaches [`crate::bootstrap_or_die`]. `process` is itself a valid bootstrap
    /// entry point; thus using `BootstrapCommand::current` works correctly as long as `process`
    /// is called early in the lifecycle of the process and reached unconditionally.
    ///
    /// TODO: thread through ownership
    pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
        // Bootstrap-child mode: `bootstrap()` is not expected to
        // return; any return is treated as a failure and the child
        // exits non-zero.
        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
            let result = boot.bootstrap().await;
            if let Err(err) = result {
                tracing::error!("failed to bootstrap process host mesh process: {}", err);
            }
            std::process::exit(1);
        }

        // Launcher mode: spawn one child process per rank, each
        // bootstrapping a host at a freshly bound address.
        let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
        let mut hosts = Vec::with_capacity(extent.num_ranks());
        for _ in 0..extent.num_ranks() {
            // Note: this can be racy. Possibly we should have a callback channel.
            let addr = bind_spec.binding_addr();
            let bootstrap = Bootstrap::Host {
                addr: addr.clone(),
                command: Some(command.clone()),
                config: Some(hyperactor_config::global::attrs()),
                exit_on_shutdown: false,
            };

            let mut cmd = command.new();
            bootstrap.to_env(&mut cmd);
            // NOTE(review): the spawned child handle is dropped here;
            // nothing in this function reaps the host process — TODO
            // confirm teardown is handled via shutdown/drop paths.
            cmd.spawn()?;
            hosts.push(HostRef(addr));
        }

        let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
        Ok(HostMesh::take(host_mesh_ref))
    }
443
    /// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
    /// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
    /// proc allocated by it.
    ///
    /// ## Allocation strategy
    ///
    /// Because HostMeshes use direct-addressed procs, and must fully control the procs they are
    /// managing, `HostMesh::allocate` uses a trampoline actor to launch the host, which in turn
    /// runs a [`crate::host_mesh::host_agent::HostAgent`] actor to manage the host itself.
    /// The host (and thus all of its procs) are exposed directly through a separate listening
    /// channel, established by the host.
    ///
    /// ```text
    ///                        ┌ ─ ─┌────────────────────┐
    ///                             │allocated Proc:     │
    ///                        │    │ ┌─────────────────┐│
    ///                             │ │TrampolineActor  ││
    ///                        │    │ │ ┌──────────────┐││
    ///                             │ │ │Host          │││
    ///               ┌────┬ ─ ┘    │ │ │ ┌──────────┐ │││
    ///            ┌─▶│Proc│        │ │ │ │HostAgent │ │││
    ///            │  └────┴ ─ ┐    │ │ │ └──────────┘ │││
    ///            │  ┌────┐        │ │ │             ██████
    /// ┌────────┐ ├─▶│Proc│   │    │ │ └──────────────┘││ ▲
    /// │ Client │─┤  └────┘        │ └─────────────────┘│ listening channel
    /// └────────┘ │  ┌────┐   └ ─ ─└────────────────────┘
    ///            ├─▶│Proc│
    ///            │  └────┘
    ///            │  ┌────┐
    ///            └─▶│Proc│
    ///               └────┘
    ///                 ▲
    ///
    ///          `Alloc`-provided
    ///                procs
    /// ```
    ///
    /// ## Lifecycle
    ///
    /// The returned `HostMesh` **owns** the underlying hosts. Call
    /// [`shutdown`](Self::shutdown) to deterministically tear them
    /// down. If you skip shutdown, `Drop` will attempt best-effort
    /// cleanup only. Do not rely on `Drop` for correctness.
    ///
    /// ## Parameters
    /// - `cx`: calling actor context; its actor must handle
    ///   [`MeshFailure`] supervision events.
    /// - `alloc`: the allocation providing one proc per host.
    /// - `name`: human-readable mesh name (validated via [`Name::new`]).
    /// - `bootstrap_params`: optional bootstrap command forwarded to
    ///   the trampoline.
    pub async fn allocate<C: context::Actor>(
        cx: &C,
        alloc: Box<dyn Alloc + Send + Sync>,
        name: &str,
        bootstrap_params: Option<BootstrapCommand>,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
    }
498
    // Use allocate_inner to set field mesh_name in span
    #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
    async fn allocate_inner<C: context::Actor>(
        cx: &C,
        alloc: Box<dyn Alloc + Send + Sync>,
        name: Name,
        bootstrap_params: Option<BootstrapCommand>,
    ) -> crate::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
        // Capture alloc properties before it is consumed by
        // ProcMesh::allocate below.
        let transport = alloc.transport();
        let extent = alloc.extent().clone();
        let is_local = alloc.is_local();
        let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;

        // TODO: figure out how to deal with MAST allocs. It requires an extra dimension,
        // into which it launches multiple procs, so we need to always specify an additional
        // sub-host dimension of size 1.

        // Port on which each trampoline reports its host mesh agent
        // back to us.
        let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
        let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
        let _trampoline_actor_mesh = proc_mesh
            .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
                cx,
                trampoline_name,
                &(transport, mesh_agents.bind(), bootstrap_params, is_local),
                None,
                // The trampoline is a system actor and does not need a controller.
                true,
            )
            .await?;

        // TODO: don't re-rank the hosts
        // Collect one agent per rank; order of arrival defines the
        // host ranking.
        let mut hosts = Vec::new();
        for _rank in 0..extent.num_ranks() {
            let mesh_agent = mesh_agents_rx.recv().await?;

            let addr = mesh_agent.actor_id().proc_id().addr().clone();

            // Sanity check: the reported agent must match the agent
            // id we would derive from the host's address.
            let host_ref = HostRef(addr);
            if host_ref.mesh_agent() != mesh_agent {
                return Err(crate::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    format!(
                        "expected mesh agent actor id to be {}",
                        host_ref.mesh_agent().actor_id()
                    ),
                ));
            }
            hosts.push(host_ref);
        }

        let proc_mesh_ref = proc_mesh.clone();
        let mesh = Self {
            name: name.clone(),
            extent: extent.clone(),
            allocation: HostMeshAllocation::ProcMesh {
                proc_mesh,
                proc_mesh_ref,
                hosts: hosts.clone(),
            },
            current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
        };

        // Spawn a unique mesh controller for each proc mesh, so the type of the
        // mesh can be preserved.
        let controller = HostMeshController::new(mesh.deref().clone());
        // AI-3: controller name must include mesh identity for
        // proc-wide ActorId uniqueness.
        let controller_name = format!("{}_{}", HOST_MESH_CONTROLLER_NAME, mesh.name());
        let controller_handle = controller
            .spawn_with_name(cx, &controller_name)
            .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
        // Bind the actor's well-known ports (Signal, IntrospectMessage,
        // Undeliverable). Without this, the controller's mailbox has no
        // port entries and messages (including introspection queries)
        // are returned as undeliverable.
        let _: hyperactor::reference::ActorRef<HostMeshController> = controller_handle.bind();

        tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");

        mesh.notify_created();

        Ok(mesh)
    }
586
587    /// Take ownership of an existing host mesh reference.
588    ///
589    /// Consumes the `HostMeshRef`, captures its region/hosts, and
590    /// returns an owned `HostMesh` that assumes lifecycle
591    /// responsibility for those hosts (i.e., will shut them down on
592    /// Drop).
593    pub fn take(mesh: HostMeshRef) -> Self {
594        let region = mesh.region().clone();
595        let hosts: Vec<HostRef> = mesh.values().collect();
596
597        let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
598            .expect("region/hosts cardinality must match");
599
600        let result = Self {
601            name: mesh.name,
602            extent: region.extent().clone(),
603            allocation: HostMeshAllocation::Owned { hosts },
604            current_ref,
605        };
606        result.notify_created();
607        result
608    }
609
    /// Attach to pre-existing workers and push client config.
    ///
    /// This is the "simple bootstrap" attach protocol:
    /// 1. Wraps the provided addresses into a `HostMeshRef`.
    /// 2. Snapshots `propagatable_attrs()` from the client's global config.
    /// 3. Pushes the config to each host agent as `Source::ClientOverride`,
    ///    with a barrier to confirm installation.
    /// 4. Returns the owned `HostMesh`.
    ///
    /// After this returns, host agents have the client's config and any
    /// subsequent operations on the host's system proc (e.g.
    /// SpawnMeshAdmin) will see it.
    pub async fn attach(
        cx: &impl context::Actor,
        name: Name,
        addresses: Vec<ChannelAddr>,
    ) -> crate::Result<Self> {
        // Step 1: wrap the raw addresses into a mesh reference.
        let mesh_ref = HostMeshRef::from_hosts(name, addresses);
        // Steps 2-3: snapshot propagatable config and push it to every
        // host agent before taking ownership.
        let config = hyperactor_config::global::propagatable_attrs();
        mesh_ref.push_config(cx, config).await;
        // Step 4: assume lifecycle ownership of the attached hosts.
        Ok(Self::take(mesh_ref))
    }
632
    /// Request a clean shutdown of all hosts owned by this
    /// `HostMesh`.
    ///
    /// For each host, this sends `ShutdownHost` to its
    /// `HostAgent`. The agent takes and drops its `Host` (via
    /// `Option::take()`), which in turn drops the embedded
    /// `BootstrapProcManager`. On drop, the manager walks its PID
    /// table and sends SIGKILL to any procs it spawned—tying proc
    /// lifetimes to their hosts and preventing leaks.
    ///
    /// Failures on individual hosts are logged and collected; the
    /// remaining hosts are still shut down, and `Ok(())` is returned
    /// even if some hosts failed (errors are surfaced via logs).
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
    pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
        tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
        // Shut down every host, recording (but not short-circuiting
        // on) per-host failures.
        let mut failed_hosts = vec![];
        for host in self.current_ref.values() {
            if let Err(e) = host.shutdown(cx).await {
                tracing::warn!(
                    name = "HostMeshStatus",
                    status = "Shutdown::Host::Failed",
                    host = %host,
                    error = %e,
                    "host shutdown failed"
                );
                failed_hosts.push(host);
            }
        }
        if failed_hosts.is_empty() {
            tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success");
        } else {
            tracing::error!(
                name = "HostMeshStatus",
                status = "Shutdown::Failed",
                "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
                failed_hosts
            );
        }

        // For intrinsically allocated meshes, also stop the backing
        // proc mesh; externally owned hosts have nothing more to stop.
        match &mut self.allocation {
            HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
                proc_mesh.stop(cx, "host mesh shutdown".to_string()).await?;
            }
            HostMeshAllocation::Owned { .. } => {}
        }
        Ok(())
    }
677}
678
impl Deref for HostMesh {
    type Target = HostMeshRef;

    /// Expose the mesh's current reference view, so `HostMeshRef`
    /// methods can be called directly on an owned `HostMesh`.
    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
686
687impl Drop for HostMesh {
688    /// Best-effort cleanup for owned host meshes on drop.
689    ///
690    /// When a `HostMesh` is dropped, it attempts to shut down all
691    /// hosts it owns:
692    /// - If a Tokio runtime is available, we spawn an ephemeral
693    ///   `Proc` + `Instance` and send `ShutdownHost` messages to each
694    ///   host. This ensures that the embedded `BootstrapProcManager`s
695    ///   are dropped, and all child procs they spawned are killed.
696    /// - If no runtime is available, we cannot perform async cleanup
697    ///   here; in that case we log a warning and rely on kernel-level
698    ///   PDEATHSIG or the individual `BootstrapProcManager`'s `Drop`
699    ///   as the final safeguard.
700    ///
701    /// This path is **last resort**: callers should prefer explicit
702    /// [`HostMesh::shutdown`] to guarantee orderly teardown. Drop
703    /// only provides opportunistic cleanup to prevent process leaks
704    /// if shutdown is skipped.
    fn drop(&mut self) {
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.name,
            status = "Dropping",
        );
        // Snapshot the owned hosts we're responsible for. Both
        // allocation variants carry the same `hosts` list.
        let hosts: Vec<HostRef> = match &self.allocation {
            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
                hosts.clone()
            }
        };

        // Best-effort only when a Tokio runtime is available.
        // NOTE(review): the cleanup task below is fire-and-forget
        // (never joined), so it may be cut short if the runtime shuts
        // down immediately after this drop — TODO confirm acceptable.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let mesh_name = self.name.clone();
            let allocation_label = match &self.allocation {
                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
                HostMeshAllocation::Owned { .. } => "owned",
            }
            .to_string();

            handle.spawn(async move {
                let span = tracing::info_span!(
                    "hostmesh_drop_cleanup",
                    host_mesh = %mesh_name,
                    allocation = %allocation_label,
                    hosts = hosts.len(),
                );
                let _g = span.enter();

                // Spin up a tiny ephemeral proc+instance to get an
                // Actor context.
                match hyperactor::Proc::direct(
                    ChannelTransport::Unix.any(),
                    "hostmesh-drop".to_string(),
                )
                {
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "failed to construct ephemeral Proc for drop-cleanup; \
                             relying on PDEATHSIG/manager Drop"
                        );
                    }
                    Ok(proc) => {
                        match proc.instance("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                     relying on PDEATHSIG/manager Drop"
                                );
                            }
                            Ok((instance, _guard)) => {
                                // Per-host outcome counters for the summary log.
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                // Shut hosts down sequentially; failures are
                                // logged but do not stop the remaining hosts.
                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        }
                    }
                }
            });
        } else {
            // No runtime here; PDEATHSIG and manager Drop remain the
            // last-resort safety net.
            tracing::warn!(
                host_mesh = %self.name,
                hosts = hosts.len(),
                "HostMesh dropped without a Tokio runtime; skipping \
                 best-effort shutdown. This indicates that .shutdown() \
                 on this mesh has not been called before program exit \
                 (perhaps due to a missing call to \
                 'monarch.actor.shutdown_context()'?) This in turn can \
                 lead to backtrace output due to folly SIGTERM \
                 handlers."
            );
        }

        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.name,
            status = "Dropped",
        );
    }
810}
811
812/// Helper: legacy shim for error types that still require
813/// RankedValues<Status>. TODO(shayne-fletcher): Delete this
814/// shim once Error::ActorSpawnError carries a StatusMesh
815/// (ValueMesh<Status>) directly. At that point, use the mesh
816/// as-is and remove `mesh_to_rankedvalues_*` calls below.
817/// is_sentinel should return true if the value matches a previous filled in
818/// value. If the input value matches the sentinel, it gets replaced with the
819/// default.
820pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
821    mesh: &ValueMesh<T>,
822    default: T,
823    is_sentinel: F,
824    len: usize,
825) -> RankedValues<T>
826where
827    T: Eq + Clone + 'static,
828    F: Fn(&T) -> bool,
829{
830    let mut out = RankedValues::from((0..len, default));
831    for (i, s) in mesh.values().enumerate() {
832        if !is_sentinel(&s) {
833            out.merge_from(RankedValues::from((i..i + 1, s)));
834        }
835    }
836    out
837}
838
/// A non-owning reference to a mesh of hosts.
///
/// Logically, this is a data structure that contains a set of ranked
/// hosts organized into a [`Region`]. `HostMeshRef`s can be sliced to
/// produce new references that contain a subset of the hosts in the
/// original mesh.
///
/// `HostMeshRef`s have a concrete syntax, implemented by its
/// `Display` and `FromStr` implementations.
///
/// This type does **not** control lifecycle. It only describes the
/// topology of hosts. To take ownership and perform deterministic
/// teardown, use [`HostMesh::take`], which returns an owned
/// [`HostMesh`] that guarantees cleanup on `shutdown()` or `Drop`.
///
/// Cloning this type does not confer ownership. If a corresponding
/// owned [`HostMesh`] shuts down the hosts, operations via a cloned
/// `HostMeshRef` may fail because the hosts are no longer running.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    /// Identity of the mesh this reference describes.
    name: Name,
    /// The region (shape) organizing the ranked hosts.
    region: Region,
    /// Hosts in rank order; `Arc` makes clones of the reference cheap.
    ranks: Arc<Vec<HostRef>>,
}
wirevalue::register_type!(HostMeshRef);
864
865impl HostMeshRef {
866    /// Create a new (raw) HostMeshRef from the provided region and associated
867    /// ranks, which must match in cardinality.
868    #[allow(clippy::result_large_err)]
869    fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
870        if region.num_ranks() != ranks.len() {
871            return Err(crate::Error::InvalidRankCardinality {
872                expected: region.num_ranks(),
873                actual: ranks.len(),
874            });
875        }
876        Ok(Self {
877            name,
878            region,
879            ranks: Arc::new(ranks),
880        })
881    }
882
883    /// Create a new HostMeshRef from an arbitrary set of hosts. This is meant to
884    /// enable extrinsic bootstrapping.
885    pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
886        Self {
887            name,
888            region: extent!(hosts = hosts.len()).into(),
889            ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
890        }
891    }
892
893    /// Create a new HostMeshRef from an arbitrary set of host mesh agents.
894    pub fn from_host_agents(
895        name: Name,
896        agents: Vec<hyperactor_reference::ActorRef<HostAgent>>,
897    ) -> crate::Result<Self> {
898        Ok(Self {
899            name,
900            region: extent!(hosts = agents.len()).into(),
901            ranks: Arc::new(
902                agents
903                    .into_iter()
904                    .map(HostRef::try_from)
905                    .collect::<crate::Result<_>>()?,
906            ),
907        })
908    }
909
910    /// Create a unit HostMeshRef from a host mesh agent.
911    pub fn from_host_agent(
912        name: Name,
913        agent: hyperactor_reference::ActorRef<HostAgent>,
914    ) -> crate::Result<Self> {
915        Ok(Self {
916            name,
917            region: Extent::unity().into(),
918            ranks: Arc::new(vec![HostRef::try_from(agent)?]),
919        })
920    }
921
922    /// Returns the host entries as `(addr_string, ActorRef<HostAgent>)` pairs.
923    /// Used by `MeshAdminAgent::effective_hosts()` to merge C into the
924    /// admin's host list (see CH-1 in mesh_admin module doc).
925    pub(crate) fn host_entries(&self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
926        self.ranks
927            .iter()
928            .map(|h| (h.0.to_string(), h.mesh_agent()))
929            .collect()
930    }
931
932    /// Push client config to all host agents in this mesh, in parallel.
933    ///
934    /// Each host installs the attrs as `Source::ClientOverride`.
935    /// Idempotent: sending the same attrs twice replaces the layer.
936    ///
937    /// Sends request-reply to each host and barriers on all replies.
938    /// Best-effort: on timeout or error, logs a warning and continues.
939    /// Timeout controlled by `MESH_ATTACH_CONFIG_TIMEOUT` (default 10s).
940    pub(crate) async fn push_config(
941        &self,
942        cx: &impl context::Actor,
943        attrs: hyperactor_config::attrs::Attrs,
944    ) {
945        let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
946        let hosts: Vec<_> = self.values().collect();
947        let num_hosts = hosts.len();
948
949        let barrier = futures::future::join_all(hosts.into_iter().map(|host| {
950            let attrs = attrs.clone();
951            let agent_id = host.mesh_agent().actor_id().clone();
952            async move {
953                match host.mesh_agent().set_client_config(cx, attrs).await {
954                    Ok(()) => {
955                        tracing::debug!(host = %agent_id, "host agent config installed");
956                        true
957                    }
958                    Err(e) => {
959                        tracing::warn!(
960                            host = %agent_id,
961                            error = %e,
962                            "failed to push client config to host agent, \
963                             continuing without it",
964                        );
965                        false
966                    }
967                }
968            }
969        }));
970
971        match tokio::time::timeout(timeout, barrier).await {
972            Ok(results) => {
973                let success = results.iter().filter(|&&r| r).count();
974                let failed = num_hosts - success;
975                tracing::info!(
976                    success = success,
977                    failed = failed,
978                    "push_config barrier complete",
979                );
980            }
981            Err(_) => {
982                tracing::warn!(
983                    num_hosts = num_hosts,
984                    timeout_secs = timeout.as_secs(),
985                    "push_config barrier timed out, some hosts may not \
986                     have received client config",
987                );
988            }
989        }
990    }
991
992    /// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
993    /// of the procs to spawn on each host.
994    ///
995    /// Currently, spawn issues direct calls to each host agent. This will be fixed by
996    /// maintaining a comm actor on the host service procs themselves.
997    #[allow(clippy::result_large_err)]
998    pub async fn spawn<C: context::Actor>(
999        &self,
1000        cx: &C,
1001        name: &str,
1002        per_host: Extent,
1003    ) -> crate::Result<ProcMesh>
1004    where
1005        C::A: Handler<MeshFailure>,
1006    {
1007        self.spawn_inner(cx, Name::new(name)?, per_host).await
1008    }
1009
1010    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
1011    async fn spawn_inner<C: context::Actor>(
1012        &self,
1013        cx: &C,
1014        proc_mesh_name: Name,
1015        per_host: Extent,
1016    ) -> crate::Result<ProcMesh>
1017    where
1018        C::A: Handler<MeshFailure>,
1019    {
1020        tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1021        tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1022        let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
1023        match &result {
1024            Ok(_) => {
1025                tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1026                tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1027            }
1028            Err(error) => {
1029                tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1030                tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1031            }
1032        }
1033        result
1034    }
1035
    /// Core proc-mesh spawn: validates dimensions, asks every host
    /// agent to create its procs, fences on status snapshots until all
    /// ranks report (or times out), and finally wraps the procs in an
    /// owned `ProcMesh` with a per-mesh controller actor.
    async fn spawn_inner_inner<C: context::Actor>(
        &self,
        cx: &C,
        proc_mesh_name: Name,
        per_host: Extent,
    ) -> crate::Result<ProcMesh>
    where
        C::A: Handler<MeshFailure>,
    {
        // The per-host dims are concatenated onto the host mesh's
        // dims below, so the two label sets must be disjoint.
        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
        if !per_host_labels
            .intersection(&host_labels)
            .collect::<Vec<_>>()
            .is_empty()
        {
            return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
                "per_host dims overlap with existing dims when spawning proc mesh"
            )));
        }

        // Full proc-mesh extent: host dims followed by per-host dims.
        let extent = self
            .region
            .extent()
            .concat(&per_host)
            .map_err(|err| crate::Error::ConfigurationError(err.into()))?;

        let region: Region = extent.clone().into();

        tracing::info!(
            name = "ProcMeshStatus",
            status = "Spawn::Attempt",
            %region,
            "spawning proc mesh"
        );

        let mut procs = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulator outputs full StatusMesh snapshots; seed with
        // NotExist.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );

        // Create or update each proc, then fence on receiving status
        // overlays. This prevents a race where procs become
        // addressable before their local muxers are ready, which
        // could make early messages unroutable. A future improvement
        // would allow buffering in the host-level muxer to eliminate
        // the need for this synchronization step.
        let mut proc_names = Vec::new();
        let client_config_override = hyperactor_config::global::propagatable_attrs();
        for (host_rank, host) in self.ranks.iter().enumerate() {
            for per_host_rank in 0..per_host.num_ranks() {
                // Global rank: hosts are the major dimension, per-host
                // procs the minor one.
                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
                // NOTE(review): the name embeds only per_host_rank, so
                // the same name string recurs on every host; presumably
                // Name::new adds per-instance uniqueness — confirm.
                let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
                proc_names.push(proc_name.clone());
                host.mesh_agent()
                    .create_or_update(
                        cx,
                        proc_name.clone(),
                        resource::Rank::new(create_rank),
                        ProcSpec::new(client_config_override.clone()),
                    )
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while creating proc: {}", e),
                        )
                    })?;
                let mut reply_port = port.bind();
                // If this proc dies or some other issue renders the reply undeliverable,
                // the reply does not need to be returned to the sender.
                reply_port.return_undeliverable(false);
                host.mesh_agent()
                    .get_rank_status(cx, proc_name.clone(), reply_port)
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while querying proc status: {}", e),
                        )
                    })?;
                let proc_id = host.named_proc(&proc_name);
                tracing::info!(
                    name = "ProcMeshStatus",
                    status = "Spawn::CreatingProc",
                    %proc_id,
                    rank = create_rank,
                );
                procs.push(ProcRef::new(
                    proc_id,
                    create_rank,
                    // TODO: specify or retrieve from state instead, to avoid attestation.
                    hyperactor_reference::ActorRef::attest(
                        host.named_proc(&proc_name)
                            .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
                    ),
                ));
            }
        }

        let start_time = tokio::time::Instant::now();

        // Wait on accumulated StatusMesh snapshots until complete or
        // timeout.
        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
            region.clone(), // fallback mesh if nothing arrives
        )
        .await
        {
            Ok(statuses) => {
                // If any rank is terminating, surface a
                // ProcCreationError pointing at that rank.
                if let Some((rank, status)) = statuses
                    .values()
                    .enumerate()
                    .find(|(_, s)| s.is_terminating())
                {
                    let proc_name = &proc_names[rank];
                    // Map the global rank back to its owning host.
                    let host_rank = rank / per_host.num_ranks();
                    let mesh_agent = self.ranks[host_rank].mesh_agent();
                    let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
                    let mut reply_tx = reply_tx.bind();
                    // If this proc dies or some other issue renders the reply undeliverable,
                    // the reply does not need to be returned to the sender.
                    reply_tx.return_undeliverable(false);
                    mesh_agent
                        .send(
                            cx,
                            resource::GetState {
                                name: proc_name.clone(),
                                reply: reply_tx,
                            },
                        )
                        .map_err(|e| {
                            crate::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
                        })?;
                    // Ask the host for the failing proc's full state;
                    // if that itself fails or times out, synthesize a
                    // minimal state from the observed status.
                    let state = match tokio::time::timeout(
                        hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                        reply_rx.recv(),
                    )
                    .await
                    {
                        Ok(Ok(state)) => state,
                        _ => resource::State {
                            name: proc_name.clone(),
                            status,
                            state: None,
                        },
                    };

                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "Spawn::GetRankStatus",
                        rank = host_rank,
                        "rank {} is terminating with state: {}",
                        host_rank,
                        state
                    );

                    return Err(crate::Error::ProcCreationError {
                        state: Box::new(state),
                        host_rank,
                        mesh_agent,
                    });
                }
            }
            Err(complete) => {
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "Spawn::GetRankStatus",
                    "timeout after {:?} when waiting for procs being created",
                    hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                );
                // Fill remaining ranks with a timeout status via the
                // legacy shim.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                return Err(crate::Error::ProcSpawnError { statuses: legacy });
            }
        }

        let mesh =
            ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
        if let Ok(ref mesh) = mesh {
            // Spawn a unique mesh controller for each proc mesh, so the type of the
            // mesh can be preserved.
            let controller = ProcMeshController::new(mesh.deref().clone());
            // AI-3: controller name must include mesh identity for
            // proc-wide ActorId uniqueness.
            let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.name());
            let controller_handle = controller
                .spawn_with_name(cx, &controller_name)
                .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
            // Bind the actor's well-known ports (Signal, IntrospectMessage,
            // Undeliverable). Without this, the controller's mailbox has no
            // port entries and messages (including introspection queries)
            // are returned as undeliverable.
            let _: hyperactor::reference::ActorRef<ProcMeshController> = controller_handle.bind();
        }
        mesh
    }
1251
1252    /// The name of the referenced host mesh.
1253    pub fn name(&self) -> &Name {
1254        &self.name
1255    }
1256
1257    /// The host references (channel addresses) in rank order.
1258    pub fn hosts(&self) -> &[HostRef] {
1259        &self.ranks
1260    }
1261
1262    /// Spawn a [`MeshAdminAgent`] on the head host's system proc and
1263    /// return its HTTP address.
1264    ///
1265    /// Sends a `SpawnMeshAdmin` message to `ranks[0]`'s
1266    /// `HostAgent`, which spawns the admin agent on that host's
1267    /// system proc. When `admin_addr` is `Some`, the HTTP server
1268    /// binds to that address; otherwise it reads `MESH_ADMIN_ADDR`
1269    /// from config.
1270    pub async fn spawn_admin(
1271        &self,
1272        cx: &impl hyperactor::context::Actor,
1273        admin_addr: Option<std::net::SocketAddr>,
1274    ) -> anyhow::Result<String> {
1275        let mut hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> = self
1276            .ranks
1277            .iter()
1278            .map(|h| (h.0.to_string(), h.mesh_agent()))
1279            .collect();
1280
1281        // CH-1: see mesh_admin module doc. Include C (the client
1282        // host) so the admin can introspect it. Dedup for C in A.
1283        if let Some(client_host) = crate::global_context::try_this_host() {
1284            for (addr, agent_ref) in client_host.host_entries() {
1285                let agent_id = agent_ref.actor_id();
1286                if !hosts
1287                    .iter()
1288                    .any(|(_, existing)| existing.actor_id() == agent_id)
1289                {
1290                    hosts.push((addr, agent_ref));
1291                }
1292            }
1293        }
1294
1295        let root_client_id = cx.mailbox().actor_id().clone();
1296
1297        let head_agent = self.ranks[0].mesh_agent();
1298        let addr = head_agent
1299            .spawn_mesh_admin(cx, hosts, Some(root_client_id), admin_addr)
1300            .await?;
1301
1302        Ok(addr)
1303    }
1304
    /// Stop every proc in `procs` (which belong to this host mesh),
    /// then fence on status snapshots until all ranks report a
    /// terminating status, or fail with a timeout error listing the
    /// ranks that never stopped.
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_name: &Name,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
        reason: String,
    ) -> anyhow::Result<()> {
        // Names of the procs we asked to stop, collected for logging
        // and error reporting.
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulator outputs full StatusMesh snapshots; seed with
        // NotExist.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        for proc_id in procs.into_iter() {
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());
            // The name stored in HostAgent is not the same as the
            // one stored in the ProcMesh. We instead take each proc id
            // and map it to that particular agent.
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());

            // Note that we don't send 1 message per host agent, we send 1 message
            // per proc.
            let host = HostRef(addr);
            host.mesh_agent().send(
                cx,
                resource::Stop {
                    name: proc_name.clone(),
                    reason: reason.clone(),
                },
            )?;
            // Subscribe this proc's status updates to the accumulator
            // so the fence below can observe its termination.
            host.mesh_agent()
                .get_rank_status(cx, proc_name, port.bind())
                .await?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        let start_time = tokio::time::Instant::now();

        // Fence: wait until every rank has reported, bounded by
        // PROC_STOP_MAX_IDLE.
        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(), // fallback mesh if nothing arrives
        )
        .await
        {
            Ok(statuses) => {
                let all_stopped = statuses.values().all(|s| s.is_terminating());
                if !all_stopped {
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(anyhow::anyhow!(
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    ));
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
            }
            Err(complete) => {
                // Fill remaining ranks with a timeout status via the
                // legacy shim.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh before timeout: {:?}",
                    legacy,
                );
                return Err(anyhow::anyhow!(
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_name,
                    legacy
                ));
            }
        }
        Ok(())
    }
1417
    /// Get the state of all procs with Name in this host mesh.
    /// The procs iterator must be in rank order.
    /// The returned ValueMesh will have a non-empty inner state unless there
    /// was a timeout reaching the host mesh agent.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
    ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
        // All agents reply to the same port; replies are matched back
        // to ranks via the `create_rank` carried in each state.
        let (tx, mut rx) = cx.mailbox().open_port();

        let mut num_ranks = 0;
        let procs: Vec<hyperactor_reference::ProcId> = procs.into_iter().collect();
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());

            // Note that we don't send 1 message per host agent, we send 1 message
            // per proc.
            let host = HostRef(addr);
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            // If this proc dies or some other issue renders the reply undeliverable,
            // the reply does not need to be returned to the sender.
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    crate::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            // The agent runs on the same process as the running actor, so if some
            // fatal event caused the process to crash (e.g. OOM, signal, process exit),
            // the agent will be unresponsive.
            // We handle this by setting a timeout on the recv, and if we don't get a
            // message we assume the agent is dead and return a failed state.
            let state = tokio::time::timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // Handle non-timeout receiver error.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(crate::Error::NotExist(state.name));
                    }
                }
            } else {
                // Timeout error, stop reading from the receiver and send back what we have so far,
                // padding with failed states.
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                // NOTE(review): padding assumes observed create_ranks
                // fall within 0..num_ranks; for a sliced mesh with
                // out-of-range create_ranks the assert below would
                // trip — confirm against callers.
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        // We populate with any ranks leftover at the time of the timeout.
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                        },
                    ));
                }
                break;
            }
        }
        // Ensure that all ranks have replied. Note that if the mesh is sliced,
        // not all create_ranks may be in the mesh.
        // Sort by rank, so that the resulting mesh is ordered.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1518}
1519
// Ranked view over a `HostMeshRef`: rank `i` in `self.region` maps to the
// `HostRef` stored at index `i` of `self.ranks`.
impl view::Ranked for HostMeshRef {
    type Item = HostRef;

    // The region (shape) spanned by this host mesh.
    fn region(&self) -> &Region {
        &self.region
    }

    // The host at `rank`, or `None` when `rank` is out of bounds.
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.ranks.get(rank)
    }
}
1531
1532impl view::RankedSliceable for HostMeshRef {
1533    fn sliced(&self, region: Region) -> Self {
1534        let ranks = self
1535            .region()
1536            .remap(&region)
1537            .unwrap()
1538            .map(|index| self.get(index).unwrap().clone());
1539        Self::new(self.name.clone(), region, ranks.collect()).unwrap()
1540    }
1541}
1542
1543impl std::fmt::Display for HostMeshRef {
1544    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1545        write!(f, "{}:", self.name)?;
1546        for (rank, host) in self.ranks.iter().enumerate() {
1547            if rank > 0 {
1548                write!(f, ",")?;
1549            }
1550            write!(f, "{}", host)?;
1551        }
1552        write!(f, "@{}", self.region)
1553    }
1554}
1555
/// The type of error occurring during `HostMeshRef` parsing.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The region component (after `@`) failed to parse.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// The input had no `@`-separated region component.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// The input had no `:`-separated name component.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The name component failed to parse as a [`Name`].
    #[error(transparent)]
    InvalidName(#[from] crate::NameParseError),

    /// Construction of the parsed `HostMeshRef` failed (boxed to keep
    /// this enum small).
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<crate::Error>),

    /// Any other parse failure.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1577
1578impl From<crate::Error> for HostMeshRefParseError {
1579    fn from(err: crate::Error) -> Self {
1580        Self::InvalidHostMeshRef(Box::new(err))
1581    }
1582}
1583
1584impl FromStr for HostMeshRef {
1585    type Err = HostMeshRefParseError;
1586
1587    fn from_str(s: &str) -> Result<Self, Self::Err> {
1588        let (name, rest) = s
1589            .split_once(':')
1590            .ok_or(HostMeshRefParseError::MissingName)?;
1591
1592        let name = Name::from_str(name)?;
1593
1594        let (hosts, region) = rest
1595            .split_once('@')
1596            .ok_or(HostMeshRefParseError::MissingRegion)?;
1597        let hosts = hosts
1598            .split(',')
1599            .map(|host| host.trim())
1600            .map(|host| host.parse::<HostRef>())
1601            .collect::<Result<Vec<_>, _>>()?;
1602        let region = region.parse()?;
1603        Ok(HostMeshRef::new(name, region, hosts)?)
1604    }
1605}
1606
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;
    use std::collections::VecDeque;

    use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
    use hyperactor::context::Mailbox as _;
    use hyperactor_config::attrs::Attrs;
    use itertools::Itertools;
    use ndslice::ViewExt;
    use ndslice::extent;
    use timed_test::async_timed_test;
    use tokio::process::Command;

    use super::*;
    use crate::ActorMesh;
    use crate::Bootstrap;
    use crate::bootstrap::MESH_TAIL_LOG_LINES;
    use crate::comm::ENABLE_NATIVE_V1_CASTING;
    use crate::resource::Status;
    use crate::testactor;
    use crate::testactor::GetConfigAttrs;
    use crate::testactor::SetConfigAttrs;
    use crate::testing;

    // Slicing a parsed host mesh ref along one dimension yields a ref over
    // the corresponding host subset, with the region offset preserved.
    #[test]
    fn test_host_mesh_subset() {
        let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
            .parse()
            .unwrap();
        assert_eq!(
            hosts.range("replica", 1).unwrap().to_string(),
            "test:local:3,local:4@2+replica=1/2,host=2/1"
        );
    }

    // `Display` and `FromStr` are inverses for `HostMeshRef`.
    #[test]
    fn test_host_mesh_ref_parse_roundtrip() {
        let host_mesh_ref = HostMeshRef::new(
            Name::new("test").unwrap(),
            extent!(replica = 2, host = 2).into(),
            vec![
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
            ],
        )
        .unwrap();

        assert_eq!(
            host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
            host_mesh_ref
        );
    }

    // Shared body for the `test_allocate_*` tests below: allocates a host
    // mesh from each test alloc, spawns two proc meshes (and actor meshes)
    // on it, then exercises casting and full point-to-point connectivity
    // before shutting the hosts down.
    #[cfg(fbcode_build)]
    async fn execute_allocate(config: &hyperactor_config::global::ConfigLock) {
        let poll = Duration::from_secs(3);
        let get_actor = Duration::from_mins(1);
        let get_proc = Duration::from_mins(1);
        // 3m watchdog total: 3m - (poll + get_actor + get_proc) = 180s - 123s = 57s
        let slack = Duration::from_secs(57);

        let _pdeath_sig =
            config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _poll = config.override_key(crate::mesh_controller::SUPERVISION_POLL_FREQUENCY, poll);
        let _get_actor = config.override_key(crate::proc_mesh::GET_ACTOR_STATE_MAX_IDLE, get_actor);
        let _get_proc = config.override_key(crate::host_mesh::GET_PROC_STATE_MAX_IDLE, get_proc);

        // Must be >= poll + get_actor + get_proc (+ slack).
        let _watchdog = config.override_key(
            crate::actor_mesh::SUPERVISION_WATCHDOG_TIMEOUT,
            poll + get_actor + get_proc + slack,
        );

        let instance = testing::instance();

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            // A unity-extent spawn: one proc per host.
            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity())
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            // A multi-dimensional spawn: the per-host extent is appended
            // to the host mesh's own extent (4 hosts x 3 gpus x 2 extra).
            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2))
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // Host meshes can be dereferenced to produce a concrete ref.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            // Here, the underlying host mesh does not change:
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Validate we can cast: every actor in the mesh must report
            // its actor id back exactly once.
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                while !expected_actor_ids.is_empty() {
                    let (actor_id, _seq) = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Now forward a message through all directed edges across the two meshes.
            // This tests the full connectivity of all the hosts, procs, and actors
            // involved in these two meshes.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                // Each ordered pair of ports
                .permutations(2)
                // Flatten them to create a path:
                .flatten()
                .collect();

            let expect_visited: Vec<_> = to_visit.clone().into();

            // We are going to send to the first, and then set up a port to receive the last.
            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            let _ = host_mesh.shutdown(&instance).await;
        }
    }

    // Allocation flow with the destination-actor reordering buffer off
    // (v0 casting path).
    #[async_timed_test(timeout_secs = 600)]
    #[cfg(fbcode_build)]
    async fn test_allocate_dest_reorder_buffer_off() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
        execute_allocate(&config).await;
    }

    // Allocation flow with native v1 casting and the reordering buffer on.
    #[async_timed_test(timeout_secs = 600)]
    #[cfg(fbcode_build)]
    async fn test_allocate_dest_reorder_buffer_on() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        execute_allocate(&config).await;
    }

    /// Allocate a new port on localhost. This drops the listener, releasing the socket,
    /// before returning. Hyperactor's channel::net applies SO_REUSEADDR, so we do not have
    /// to wait out the socket's TIME_WAIT state.
    ///
    /// Even so, this is racy.
    fn free_localhost_addr() -> ChannelAddr {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        ChannelAddr::Tcp(listener.local_addr().unwrap())
    }

    // Shared body for the `test_extrinsic_allocation_*` tests: boots host
    // processes out-of-band via the bootstrap binary, builds a
    // `HostMeshRef` over their addresses, and verifies spawn/shutdown.
    #[cfg(fbcode_build)]
    async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Launch one bootstrap host process per address.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                command: None, // use current binary
                config: None,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }

        let instance = testing::instance();
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let proc_mesh = host_mesh
            .spawn(&testing::instance(), "test", Extent::unity())
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn(&testing::instance(), "test", &())
            .await
            .unwrap();

        testactor::assert_mesh_shape(actor_mesh).await;

        HostMesh::take(host_mesh)
            .shutdown(&instance)
            .await
            .expect("hosts shutdown");
    }

    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_extrinsic_allocation_v0() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
        execute_extrinsic_allocation(&config).await;
    }

    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_extrinsic_allocation_v1() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        execute_extrinsic_allocation(&config).await;
    }

    // Spawning procs whose bootstrap command exits immediately (`false`)
    // must surface a `ProcCreationError` with the child's exit status.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_proc_allocation() {
        let lock = hyperactor_config::global::lock();
        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                // The entire purpose of this is to fail:
                command: Some(BootstrapCommand::from("false")),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity())
            .await
            .unwrap_err();
        assert_matches!(
            err,
            crate::Error::ProcCreationError { state, .. }
            if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
        );
    }

    // A host whose bootstrap command hangs (`sleep 60`) should time out
    // (per PROC_SPAWN_MAX_IDLE) while the healthy host comes up Running.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_halting_proc_allocation() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();

        for (index, host) in hosts.iter().enumerate() {
            let mut cmd = Command::new(program.clone());
            // Only the first host gets the hanging command; the second
            // boots normally.
            let command = if index == 0 {
                let mut command = BootstrapCommand::from("sleep");
                command.args.push("60".to_string());
                Some(command)
            } else {
                None
            };
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                command,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity())
            .await
            .unwrap_err();
        let statuses = err.into_proc_spawn_error().unwrap();
        assert_matches!(
            &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
            &[Status::Timeout(_), Status::Running]
        );
    }

    // Config attrs set via SetConfigAttrs on the client must override
    // inherited config on remote actors, while untouched keys keep the
    // values established by the spawning client's overrides.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_client_config_override() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(2),
        );
        let _guard3 = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_mins(1),
        );

        // Unset env vars that were mirrored by TestOverride, so child
        // processes don't inherit them. This allows Runtime layer to
        // override ClientOverride. SAFETY: Single-threaded test under
        // global config lock.
        unsafe {
            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
        }

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm.spawn(instance, "test", Extent::unity()).await.unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(instance, "test", &()).await.unwrap();

        // Push an override for one key to every actor in the mesh.
        let mut attrs_override = Attrs::new();
        attrs_override.set(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(3),
        );
        actor_mesh
            .cast(
                instance,
                SetConfigAttrs(bincode::serialize(&attrs_override).unwrap()),
            )
            .unwrap();

        // Read the attrs back from one actor and check both keys.
        let (tx, mut rx) = instance.open_port();
        actor_mesh
            .cast(instance, GetConfigAttrs(tx.bind()))
            .unwrap();
        let actual_attrs = rx.recv().await.unwrap();
        let actual_attrs = bincode::deserialize::<Attrs>(&actual_attrs).unwrap();

        // The overridden key reflects the client-set value...
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(3)
        );
        // ...while the untouched key keeps the spawning client's value.
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(1)
        );

        let _ = hm.shutdown(instance).await;
    }
}