hyperactor_mesh/v1/host_mesh.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use hyperactor::channel::ChannelTransport;
pub mod mesh_agent;

use std::collections::HashSet;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;

use hyperactor::ActorRef;
use hyperactor::Named;
use hyperactor::ProcId;
use hyperactor::channel::ChannelAddr;
use hyperactor::context;
use ndslice::Extent;
use ndslice::Region;
use ndslice::ViewExt;
use ndslice::extent;
use ndslice::view;
use ndslice::view::Ranked;
use ndslice::view::RegionParseError;
use serde::Deserialize;
use serde::Serialize;

use crate::alloc::Alloc;
use crate::bootstrap::BootstrapCommand;
use crate::resource;
use crate::resource::CreateOrUpdateClient;
use crate::resource::GetRankStatusClient;
use crate::resource::RankedValues;
use crate::v1;
use crate::v1::Name;
use crate::v1::ProcMesh;
use crate::v1::ProcMeshRef;
pub use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline;
use crate::v1::host_mesh::mesh_agent::ShutdownHostClient;
use crate::v1::proc_mesh::ProcRef;

/// A reference to a single host.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);

impl HostRef {
    /// The host mesh agent associated with this host.
    fn mesh_agent(&self) -> ActorRef<HostMeshAgent> {
        ActorRef::attest(self.service_proc().actor_id("agent", 0))
    }

    /// The ProcId for the proc with name `name` on this host.
    fn named_proc(&self, name: &Name) -> ProcId {
        ProcId::Direct(self.0.clone(), name.to_string())
    }

    /// The service proc on this host.
    fn service_proc(&self) -> ProcId {
        ProcId::Direct(self.0.clone(), "service".to_string())
    }

    /// Request an orderly teardown of this host and all procs it
    /// spawned.
    ///
    /// This resolves the per-child grace **timeout** and the maximum
    /// termination **concurrency** from config and sends a
    /// [`ShutdownHost`] message to the host's agent. The agent then:
    ///
    /// 1) Performs a graceful termination pass over all tracked
    ///    children (TERM → wait(`timeout`) → KILL), with at most
    ///    `max_in_flight` running concurrently.
    /// 2) After the pass completes, **drops the Host**, which also
    ///    drops the embedded `BootstrapProcManager`. The manager's
    ///    `Drop` serves as a last-resort safety net (it SIGKILLs
    ///    anything that somehow remains).
    ///
    /// This call returns `Ok(())` only after the agent has finished
    /// the termination pass and released the host, so the host is no
    /// longer reachable when this returns.
    async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
        let agent = self.mesh_agent();
        let terminate_timeout =
            hyperactor::config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
        let max_in_flight =
            hyperactor::config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
        agent
            .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
            .await?;
        Ok(())
    }
}

impl std::fmt::Display for HostRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

impl FromStr for HostRef {
    type Err = <ChannelAddr as FromStr>::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(HostRef(ChannelAddr::from_str(s)?))
    }
}

/// An owned mesh of hosts.
///
/// # Lifecycle
/// `HostMesh` owns host lifecycles. Callers **must** invoke
/// [`HostMesh::shutdown`] for deterministic teardown. The `Drop` impl
/// performs **best-effort** cleanup only (spawned via Tokio if
/// available); it is a safety net, not a substitute for orderly
/// shutdown.
///
/// In tests and production, prefer explicit shutdown to guarantee
/// that host agents drop their `BootstrapProcManager`s and that all
/// child procs are reaped.
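///
/// Illustrative lifecycle (a sketch; the alloc source and names here
/// are assumptions, not part of this API's contract):
///
/// ```ignore
/// let host_mesh = HostMesh::allocate(cx, alloc, "workers", None).await?;
/// // ... spawn proc meshes and actors ...
/// host_mesh.shutdown(cx).await?; // deterministic teardown; do not rely on Drop
/// ```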
#[allow(dead_code)]
pub struct HostMesh {
    name: Name,
    extent: Extent,
    allocation: HostMeshAllocation,
    current_ref: HostMeshRef,
}

/// Allocation backing for an owned [`HostMesh`].
///
/// This enum records how the underlying hosts were provisioned, which
/// in turn determines how their lifecycle is managed:
///
/// - `ProcMesh`: Hosts were allocated intrinsically via a
///   [`ProcMesh`]. The `HostMesh` owns the proc mesh and its service
///   procs, and dropping the mesh ensures that all spawned child procs
///   are terminated.
/// - `Owned`: Hosts were constructed externally and "taken" under
///   ownership. The `HostMesh` assumes responsibility for their
///   lifecycle from this point forward, ensuring consistent cleanup on
///   drop.
///
/// Additional variants may be added for other provisioning sources,
/// but in all cases `HostMesh` is an owned resource that guarantees
/// no leaked child processes.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// Hosts were allocated intrinsically via a [`ProcMesh`].
    ///
    /// In this mode, the `HostMesh` owns both the `ProcMesh` itself
    /// and the service procs that implement each host. Dropping the
    /// `HostMesh` also drops the embedded `ProcMesh`, ensuring that
    /// all spawned child procs are terminated cleanly.
    ProcMesh {
        proc_mesh: ProcMesh,
        proc_mesh_ref: ProcMeshRef,
        hosts: Vec<HostRef>,
    },
    /// Hosts were constructed externally and explicitly transferred
    /// under ownership by this `HostMesh`.
    ///
    /// In this mode, the `HostMesh` assumes responsibility for the
    /// provided hosts going forward. Dropping the mesh guarantees
    /// teardown of all associated state and signals to prevent any
    /// leaked processes.
    Owned { hosts: Vec<HostRef> },
}

impl HostMesh {
    /// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
    /// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
    /// proc allocated by it.
    ///
    /// ## Allocation strategy
    ///
    /// Because HostMeshes use direct-addressed procs, and must fully control the procs they are
    /// managing, `HostMesh::allocate` uses a trampoline actor to launch the host, which in turn
    /// runs a [`crate::v1::host_mesh::mesh_agent::HostMeshAgent`] actor to manage the host itself.
    /// The host (and thus all of its procs) is exposed directly through a separate listening
    /// channel, established by the host.
    ///
    /// ```text
    ///                        ┌ ─ ─┌────────────────────┐
    ///                             │allocated Proc:     │
    ///                        │    │ ┌─────────────────┐│
    ///                             │ │TrampolineActor  ││
    ///                        │    │ │ ┌──────────────┐││
    ///                             │ │ │Host          │││
    ///               ┌────┬ ─ ┘    │ │ │ ┌──────────┐ │││
    ///            ┌─▶│Proc│        │ │ │ │HostAgent │ │││
    ///            │  └────┴ ─ ┐    │ │ │ └──────────┘ │││
    ///            │  ┌────┐        │ │ │             ██████
    /// ┌────────┐ ├─▶│Proc│   │    │ │ └──────────────┘││ ▲
    /// │ Client │─┤  └────┘        │ └─────────────────┘│ listening channel
    /// └────────┘ │  ┌────┐   └ ─ ─└────────────────────┘
    ///            ├─▶│Proc│
    ///            │  └────┘
    ///            │  ┌────┐
    ///            └─▶│Proc│
    ///               └────┘
    ///                 ▲
    ///
    ///          `Alloc`-provided
    ///                procs
    /// ```
    ///
    /// ## Lifecycle
    ///
    /// The returned `HostMesh` **owns** the underlying hosts. Call
    /// [`shutdown`](Self::shutdown) to deterministically tear them
    /// down. If you skip shutdown, `Drop` will attempt best-effort
    /// cleanup only. Do not rely on `Drop` for correctness.
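    ///
    /// A minimal sketch of typical use, assuming a `cx` actor context
    /// and a boxed alloc are in scope (names are illustrative; this
    /// mirrors the tests below):
    ///
    /// ```ignore
    /// let host_mesh = HostMesh::allocate(cx, alloc, "test", None).await?;
    /// let proc_mesh = host_mesh.spawn(cx, "procs", Extent::unity()).await?;
    /// // ... use the mesh ...
    /// host_mesh.shutdown(cx).await?;
    /// ```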
    pub async fn allocate(
        cx: &impl context::Actor,
        alloc: Box<dyn Alloc + Send + Sync>,
        name: &str,
        bootstrap_params: Option<BootstrapCommand>,
    ) -> v1::Result<Self> {
        let transport = alloc.transport();
        let extent = alloc.extent().clone();
        let is_local = alloc.is_local();
        let proc_mesh = ProcMesh::allocate(cx, alloc, name).await?;
        let name = Name::new(name);

        // TODO: figure out how to deal with MAST allocs. It requires an extra dimension,
        // into which it launches multiple procs, so we need to always specify an additional
        // sub-host dimension of size 1.

        let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
        let _trampoline_actor_mesh = proc_mesh
            .spawn::<HostMeshAgentProcMeshTrampoline>(
                cx,
                "host_mesh_trampoline",
                &(transport, mesh_agents.bind(), bootstrap_params, is_local),
            )
            .await?;

        // TODO: don't re-rank the hosts
        let mut hosts = Vec::new();
        for _rank in 0..extent.num_ranks() {
            let mesh_agent = mesh_agents_rx.recv().await?;

            let Some((addr, _)) = mesh_agent.actor_id().proc_id().as_direct() else {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    "host mesh agent must be a direct actor".to_string(),
                ));
            };

            let host_ref = HostRef(addr.clone());
            if host_ref.mesh_agent() != mesh_agent {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    format!(
                        "expected mesh agent actor id to be {}",
                        host_ref.mesh_agent().actor_id()
                    ),
                ));
            }
            hosts.push(host_ref);
        }

        let proc_mesh_ref = proc_mesh.clone();
        Ok(Self {
            name,
            extent: extent.clone(),
            allocation: HostMeshAllocation::ProcMesh {
                proc_mesh,
                proc_mesh_ref,
                hosts: hosts.clone(),
            },
            current_ref: HostMeshRef::new(extent.into(), hosts).unwrap(),
        })
    }

    /// Take ownership of an existing host mesh reference.
    ///
    /// Consumes the `HostMeshRef`, captures its region/hosts, and
    /// returns an owned `HostMesh` that assumes lifecycle
    /// responsibility for those hosts (i.e., will shut them down on
    /// Drop).
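    ///
    /// A sketch of adopting an extrinsically bootstrapped mesh (the
    /// addresses and name here are illustrative):
    ///
    /// ```ignore
    /// let mesh_ref: HostMeshRef = "tcp:10.0.0.1:26600,tcp:10.0.0.2:26600@hosts=2/1".parse()?;
    /// let owned = HostMesh::take(Name::new("extrinsic"), mesh_ref);
    /// owned.shutdown(cx).await?;
    /// ```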
    pub fn take(name: impl Into<Name>, mesh: HostMeshRef) -> Self {
        let name = name.into();
        let region = mesh.region().clone();
        let hosts: Vec<HostRef> = mesh.values().collect();

        let current_ref = HostMeshRef::new(region.clone(), hosts.clone())
            .expect("region/hosts cardinality must match");

        Self {
            name,
            extent: region.extent().clone(),
            allocation: HostMeshAllocation::Owned { hosts },
            current_ref,
        }
    }

    /// Request a clean shutdown of all hosts owned by this
    /// `HostMesh`.
    ///
    /// For each host, this sends `ShutdownHost` to its
    /// `HostMeshAgent`. The agent takes and drops its `Host` (via
    /// `Option::take()`), which in turn drops the embedded
    /// `BootstrapProcManager`. On drop, the manager walks its PID
    /// table and sends SIGKILL to any procs it spawned—tying proc
    /// lifetimes to their hosts and preventing leaks.
    pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
        let mut attempted = 0;
        let mut ok = 0;
        for host in self.current_ref.values() {
            attempted += 1;
            if let Err(e) = host.shutdown(cx).await {
                tracing::warn!(host = %host, error = %e, "host shutdown failed");
            } else {
                ok += 1;
            }
        }
        tracing::info!(attempted, ok, "hostmesh shutdown summary");
        Ok(())
    }
}

impl Deref for HostMesh {
    type Target = HostMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}

impl Drop for HostMesh {
    /// Best-effort cleanup for owned host meshes on drop.
    ///
    /// When a `HostMesh` is dropped, it attempts to shut down all
    /// hosts it owns:
    /// - If a Tokio runtime is available, we spawn an ephemeral
    ///   `Proc` + `Instance` and send `ShutdownHost` messages to each
    ///   host. This ensures that the embedded `BootstrapProcManager`s
    ///   are dropped, and all child procs they spawned are killed.
    /// - If no runtime is available, we cannot perform async cleanup
    ///   here; in that case we log a warning and rely on kernel-level
    ///   PDEATHSIG or the individual `BootstrapProcManager`'s `Drop`
    ///   as the final safeguard.
    ///
    /// This path is **last resort**: callers should prefer explicit
    /// [`HostMesh::shutdown`] to guarantee orderly teardown. Drop
    /// only provides opportunistic cleanup to prevent process leaks
    /// if shutdown is skipped.
    fn drop(&mut self) {
        // Snapshot the owned hosts we're responsible for.
        let hosts: Vec<HostRef> = match &self.allocation {
            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
                hosts.clone()
            }
        };

        // Best-effort only when a Tokio runtime is available.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let mesh_name = self.name.clone();
            let allocation_label = match &self.allocation {
                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
                HostMeshAllocation::Owned { .. } => "owned",
            }
            .to_string();

            handle.spawn(async move {
                let span = tracing::info_span!(
                    "hostmesh_drop_cleanup",
                    %mesh_name,
                    allocation = %allocation_label,
                    hosts = hosts.len(),
                );
                let _g = span.enter();

                // Spin up a tiny ephemeral proc+instance to get an
                // Actor context.
                match hyperactor::Proc::direct(
                    ChannelTransport::Unix.any(),
                    "hostmesh-drop".to_string(),
                )
                .await
                {
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "failed to construct ephemeral Proc for drop-cleanup; \
                             relying on PDEATHSIG/manager Drop"
                        );
                    }
                    Ok(proc) => {
                        match proc.instance("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                     relying on PDEATHSIG/manager Drop"
                                );
                            }
                            Ok((instance, _guard)) => {
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        }
                    }
                }
            });
        } else {
            // No runtime here; PDEATHSIG and manager Drop remain the
            // last-resort safety net.
            tracing::warn!(
                hosts = hosts.len(),
                "HostMesh dropped without a tokio runtime; skipping best-effort shutdown"
            );
        }
    }
}

/// A non-owning reference to a mesh of hosts.
///
/// Logically, this is a data structure that contains a set of ranked
/// hosts organized into a [`Region`]. `HostMeshRef`s can be sliced to
/// produce new references that contain a subset of the hosts in the
/// original mesh.
///
/// `HostMeshRef` has a concrete syntax, implemented by its
/// `Display` and `FromStr` implementations.
///
/// This type does **not** control lifecycle. It only describes the
/// topology of hosts. To take ownership and perform deterministic
/// teardown, use [`HostMesh::take`], which returns an owned
/// [`HostMesh`] that guarantees cleanup on `shutdown()` or `Drop`.
///
/// Cloning this type does not confer ownership. If a corresponding
/// owned [`HostMesh`] shuts down the hosts, operations via a cloned
/// `HostMeshRef` may fail because the hosts are no longer running.
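///
/// The concrete syntax pairs a comma-separated host list with a
/// region, separated by `@` (as exercised by the tests below):
///
/// ```ignore
/// let hosts: HostMeshRef = "local:1,local:2,local:3,local:4@replica=2/2,host=2/1".parse()?;
/// assert_eq!(hosts.to_string().parse::<HostMeshRef>()?, hosts);
/// ```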
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    region: Region,
    ranks: Arc<Vec<HostRef>>,
}

impl HostMeshRef {
    /// Create a new (raw) HostMeshRef from the provided region and associated
    /// ranks, which must match in cardinality.
    fn new(region: Region, ranks: Vec<HostRef>) -> v1::Result<Self> {
        if region.num_ranks() != ranks.len() {
            return Err(v1::Error::InvalidRankCardinality {
                expected: region.num_ranks(),
                actual: ranks.len(),
            });
        }
        Ok(Self {
            region,
            ranks: Arc::new(ranks),
        })
    }

    /// Create a new HostMeshRef from an arbitrary set of hosts. This is meant to
    /// enable extrinsic bootstrapping.
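    ///
    /// A sketch (the addresses are illustrative); the resulting mesh
    /// has a single `hosts` dimension:
    ///
    /// ```ignore
    /// let addrs: Vec<ChannelAddr> =
    ///     vec!["tcp:10.0.0.1:26600".parse()?, "tcp:10.0.0.2:26600".parse()?];
    /// let mesh = HostMeshRef::from_hosts(addrs);
    /// assert_eq!(mesh.region().num_ranks(), 2);
    /// ```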
    pub fn from_hosts(hosts: Vec<ChannelAddr>) -> Self {
        Self {
            region: extent!(hosts = hosts.len()).into(),
            ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
        }
    }

    /// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
    /// of the procs to spawn on each host.
    ///
    /// Currently, spawn issues direct calls to each host agent. This will be fixed by
    /// maintaining a comm actor on the host service procs themselves.
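    ///
    /// A sketch: on a mesh with `hosts = 4`, spawning with a `per_host`
    /// extent of `gpus = 3` yields a proc mesh whose extent is the
    /// concatenation of the two, i.e. 12 procs (names are illustrative;
    /// compare the tests below):
    ///
    /// ```ignore
    /// let proc_mesh = host_mesh.spawn(cx, "workers", extent!(gpus = 3)).await?;
    /// // resulting extent: hosts = 4, gpus = 3
    /// ```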
    pub async fn spawn(
        &self,
        cx: &impl context::Actor,
        name: &str,
        per_host: Extent,
    ) -> v1::Result<ProcMesh> {
        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
        if !per_host_labels
            .intersection(&host_labels)
            .collect::<Vec<_>>()
            .is_empty()
        {
            return Err(v1::Error::ConfigurationError(anyhow::anyhow!(
                "per_host dims overlap with existing dims when spawning proc mesh"
            )));
        }

        let extent = self
            .region
            .extent()
            .concat(&per_host)
            .map_err(|err| v1::Error::ConfigurationError(err.into()))?;

        let mesh_name = Name::new(name);
        let mut procs = Vec::new();
        let num_ranks = self.region().num_ranks() * per_host.num_ranks();
        let (port, mut rx) = cx.mailbox().open_accum_port(RankedValues::default());
        // We CreateOrUpdate each proc, and then fence on getting statuses back.
        // This is currently necessary because otherwise there is a race between
        // the procs being created, and subsequent messages becoming unroutable
        // (the agent actor manages the local muxer). We can solve this by allowing
        // buffering in the host-level muxer.
        for (host_rank, host) in self.ranks.iter().enumerate() {
            for per_host_rank in 0..per_host.num_ranks() {
                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
                let proc_name = Name::new(format!("{}-{}", name, per_host_rank));
                host.mesh_agent()
                    .create_or_update(cx, proc_name.clone(), resource::Rank::new(create_rank), ())
                    .await
                    .map_err(|e| {
                        v1::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while creating proc: {}", e),
                        )
                    })?;
                host.mesh_agent()
                    .get_rank_status(cx, proc_name.clone(), port.bind())
                    .await
                    .map_err(|e| {
                        v1::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while querying proc status: {}", e),
                        )
                    })?;
                procs.push(ProcRef::new(
                    host.named_proc(&proc_name),
                    create_rank,
                    // TODO: specify or retrieve from state instead, to avoid attestation.
                    ActorRef::attest(host.named_proc(&proc_name).actor_id("agent", 0)),
                ));
            }
        }

        // fence: wait for everyone to report back.
        loop {
            let statuses = rx.recv().await?;
            if let Some((ranks, status)) =
                statuses.iter().find(|(_, status)| status.is_terminating())
            {
                let rank = ranks.start;
                let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks()));
                return Err(v1::Error::ProcCreationError {
                    proc_name,
                    mesh_agent: self.ranks[rank].mesh_agent(),
                    host_rank: rank / per_host.num_ranks(),
                    status: status.clone(),
                });
            }

            if statuses.rank(num_ranks) == num_ranks {
                break;
            }
        }

        ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await
    }
}

impl view::Ranked for HostMeshRef {
    type Item = HostRef;

    fn region(&self) -> &Region {
        &self.region
    }

    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.ranks.get(rank)
    }
}

impl view::RankedSliceable for HostMeshRef {
    fn sliced(&self, region: Region) -> Self {
        let ranks = self
            .region()
            .remap(&region)
            .unwrap()
            .map(|index| self.get(index).unwrap().clone());
        Self::new(region, ranks.collect()).unwrap()
    }
}

impl std::fmt::Display for HostMeshRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (rank, host) in self.ranks.iter().enumerate() {
            if rank > 0 {
                write!(f, ",")?;
            }
            write!(f, "{}", host)?;
        }
        write!(f, "@{}", self.region)
    }
}

/// The type of error occurring during `HostMeshRef` parsing.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<v1::Error>),

    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl From<v1::Error> for HostMeshRefParseError {
    fn from(err: v1::Error) -> Self {
        Self::InvalidHostMeshRef(Box::new(err))
    }
}

impl FromStr for HostMeshRef {
    type Err = HostMeshRefParseError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (hosts, region) = s
            .split_once('@')
            .ok_or(HostMeshRefParseError::MissingRegion)?;
        let hosts = hosts
            .split(',')
            .map(|host| host.trim())
            .map(|host| host.parse::<HostRef>())
            .collect::<Result<Vec<_>, _>>()?;
        let region = region.parse()?;
        Ok(HostMeshRef::new(region, hosts)?)
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;
    use std::collections::VecDeque;

    use hyperactor::context::Mailbox as _;
    use itertools::Itertools;
    use ndslice::ViewExt;
    use ndslice::extent;
    use tokio::process::Command;

    use super::*;
    use crate::Bootstrap;
    use crate::v1::ActorMesh;
    use crate::v1::testactor;
    use crate::v1::testing;

    #[test]
    fn test_host_mesh_subset() {
        let hosts: HostMeshRef = "local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
            .parse()
            .unwrap();
        assert_eq!(
            hosts.range("replica", 1).unwrap().to_string(),
            "local:3,local:4@2+replica=1/2,host=2/1"
        );
    }

    #[test]
    fn test_host_mesh_ref_parse_roundtrip() {
        let host_mesh_ref = HostMeshRef::new(
            extent!(replica = 2, host = 2).into(),
            vec![
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
                "tcp:127.0.0.1:123".parse().unwrap(),
            ],
        )
        .unwrap();

        assert_eq!(
            host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
            host_mesh_ref
        );
    }

    #[tokio::test]
    async fn test_allocate() {
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let instance = testing::instance().await;

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity())
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2))
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // Host meshes can be dereferenced to produce a concrete ref.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            // Here, the underlying host mesh does not change:
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Validate we can cast:
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                while !expected_actor_ids.is_empty() {
                    let actor_id = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Now forward a message through all directed edges across the two meshes.
            // This tests the full connectivity of all the hosts, procs, and actors
            // involved in these two meshes.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                // Each ordered pair of ports
                .permutations(2)
                // Flatten them to create a path:
                .flatten()
                .collect();

            let expect_visited: Vec<_> = to_visit.clone().into();

            // We are going to send to the first, and then set up a port to receive the last.
            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            let _ = host_mesh.shutdown(&instance).await;
        }
    }

    /// Allocate a new port on localhost. This drops the listener, releasing the socket,
    /// before returning. Hyperactor's channel::net applies SO_REUSEADDR, so we do not have
    /// to wait out the socket's TIME_WAIT state.
    ///
    /// Even so, this is racy.
    fn free_localhost_addr() -> ChannelAddr {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        ChannelAddr::Tcp(listener.local_addr().unwrap())
    }

    #[tokio::test]
    async fn test_extrinsic_allocation() {
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                command: None, // use current binary
                config: None,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }

        let instance = testing::instance().await;
        let host_mesh = HostMeshRef::from_hosts(hosts);

        let proc_mesh = host_mesh
            .spawn(&testing::instance().await, "test", Extent::unity())
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn(&testing::instance().await, "test", &())
            .await
            .unwrap();

        testactor::assert_mesh_shape(actor_mesh).await;

        HostMesh::take(Name::new("extrinsic"), host_mesh)
            .shutdown(&instance)
            .await
            .expect("hosts shutdown");
    }

    #[tokio::test]
    async fn test_failing_proc_allocation() {
        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                // The entire purpose of this is to fail:
                command: Some(BootstrapCommand::from("/bin/false")),
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(hosts);

        let instance = testing::instance().await;

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity())
            .await
            .unwrap_err();
        assert_matches!(
            err, v1::Error::ProcCreationError { status: resource::Status::Failed(msg), .. }
            if msg.contains("failed to configure process: Terminal(Stopped { exit_code: 1")
        );
    }
}