hyperactor/
host.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines [`Host`], which represents all the procs running on a host.
10//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
11//! for example, fork new processes for each proc, or spawn them in the same process
12//! for testing purposes.
13//!
14//! The primary purpose of a host is to manage the lifecycle of these procs, and to
15//! serve as a single front-end for all the procs on a host, multiplexing network
16//! channels.
17//!
18//! ## Channel muxing
19//!
20//! A [`Host`] maintains a single frontend address, through which all procs are accessible
21//! through direct addressing: the id of each proc is the `ProcId::Direct(frontend_addr, proc_name)`.
22//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
23//! multiplexes messages based on the proc name. When spawning procs, the host maintains
24//! backend channels with separate addresses. In the diagram `#` is the backend address of
25//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
26//! to the appropriate backend channel, while procs forward messages to the host backend
27//! channel at `#`.
28//!
29//! ```text
30//!                      ┌────────────┐
31//!                  ┌───▶  proc *,1  │
32//!                  │ #1└────────────┘
33//!                  │
34//!  ┌──────────┐    │   ┌────────────┐
35//!  │   Host   │◀───┼───▶  proc *,2  │
36//! *└──────────┘#   │ #2└────────────┘
37//!                  │
38//!                  │   ┌────────────┐
39//!                  └───▶  proc *,3  │
40//!                    #3└────────────┘
41//! ```
42
43use std::collections::HashMap;
44use std::fmt;
45use std::marker::PhantomData;
46use std::str::FromStr;
47use std::sync::Arc;
48use std::time::Duration;
49
50use async_trait::async_trait;
51use futures::Future;
52use futures::StreamExt;
53use futures::stream;
54use tokio::process::Child;
55use tokio::process::Command;
56use tokio::sync::Mutex;
57
58use crate::Actor;
59use crate::ActorHandle;
60use crate::ActorId;
61use crate::ActorRef;
62use crate::PortHandle;
63use crate::Proc;
64use crate::ProcId;
65use crate::actor::Binds;
66use crate::actor::Referable;
67use crate::channel;
68use crate::channel::ChannelAddr;
69use crate::channel::ChannelError;
70use crate::channel::ChannelTransport;
71use crate::channel::Rx;
72use crate::channel::Tx;
73use crate::clock::Clock;
74use crate::clock::RealClock;
75use crate::context;
76use crate::mailbox::BoxableMailboxSender;
77use crate::mailbox::DialMailboxRouter;
78use crate::mailbox::IntoBoxedMailboxSender as _;
79use crate::mailbox::MailboxClient;
80use crate::mailbox::MailboxSender;
81use crate::mailbox::MailboxServer;
82use crate::mailbox::MailboxServerHandle;
83use crate::mailbox::MessageEnvelope;
84use crate::mailbox::Undeliverable;
85
86/// The type of error produced by host operations.
87#[derive(Debug, thiserror::Error)]
88pub enum HostError {
89    /// A channel error occurred during a host operation.
90    #[error(transparent)]
91    ChannelError(#[from] ChannelError),
92
93    /// The named proc already exists and cannot be spawned.
94    #[error("proc '{0}' already exists")]
95    ProcExists(String),
96
97    /// Failures occuring while spawning a subprocess.
98    #[error("proc '{0}' failed to spawn process: {1}")]
99    ProcessSpawnFailure(ProcId, #[source] std::io::Error),
100
101    /// Failures occuring while configuring a subprocess.
102    #[error("proc '{0}' failed to configure process: {1}")]
103    ProcessConfigurationFailure(ProcId, #[source] anyhow::Error),
104
105    /// Failures occuring while spawning a management actor in a proc.
106    #[error("failed to spawn agent on proc '{0}': {1}")]
107    AgentSpawnFailure(ProcId, #[source] anyhow::Error),
108
109    /// An input parameter was missing.
110    #[error("parameter '{0}' missing: {1}")]
111    MissingParameter(String, std::env::VarError),
112
113    /// An input parameter was invalid.
114    #[error("parameter '{0}' invalid: {1}")]
115    InvalidParameter(String, anyhow::Error),
116}
117
118/// A host, managing the lifecycle of several procs, and their backend
119/// routing, as described in this module's documentation.
120#[derive(Debug)]
121pub struct Host<M> {
122    procs: HashMap<String, ChannelAddr>,
123    frontend_addr: ChannelAddr,
124    backend_addr: ChannelAddr,
125    router: DialMailboxRouter,
126    manager: M,
127    service_proc: Proc,
128}
129
130impl<M: ProcManager> Host<M> {
131    /// Serve a host using the provided ProcManager, on the provided `addr`.
132    /// On success, the host will multiplex messages for procs on the host
133    /// on the address of the host.
134    #[tracing::instrument(skip(manager))]
135    pub async fn serve(
136        manager: M,
137        addr: ChannelAddr,
138    ) -> Result<(Self, MailboxServerHandle), HostError> {
139        let (frontend_addr, frontend_rx) = channel::serve(addr)?;
140
141        // We set up a cascade of routers: first, the outer router supports
142        // sending to the the system proc, while the dial router manages dialed
143        // connections.
144        let router = DialMailboxRouter::new();
145
146        // Establish a backend channel on the preferred transport. We currently simply
147        // serve the same router on both.
148        let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
149
150        // Set up a system proc. This is often used to manage the host itself.
151        let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
152        let service_proc = Proc::new(service_proc_id.clone(), router.boxed());
153
154        tracing::info!(
155            frontend_addr = frontend_addr.to_string(),
156            backend_addr = backend_addr.to_string(),
157            service_proc_id = service_proc_id.to_string(),
158            "serving host"
159        );
160
161        let host = Host {
162            procs: HashMap::new(),
163            frontend_addr,
164            backend_addr,
165            router: router.clone(),
166            manager,
167            service_proc: service_proc.clone(),
168        };
169
170        let router = ProcOrDial {
171            proc: service_proc,
172            router,
173        };
174
175        // Serve the same router on both frontend and backend addresses.
176        let _backend_handle = router.clone().serve(backend_rx);
177        let frontend_handle = router.serve(frontend_rx);
178
179        Ok((host, frontend_handle))
180    }
181
182    /// The underlying proc manager.
183    pub fn manager(&self) -> &M {
184        &self.manager
185    }
186
187    /// The address which accepts messages destined for this host.
188    pub fn addr(&self) -> &ChannelAddr {
189        &self.frontend_addr
190    }
191
192    /// The system proc associated with this host.
193    pub fn system_proc(&self) -> &Proc {
194        &self.service_proc
195    }
196
197    /// Spawn a new process with the given `name`. On success, the proc has been
198    /// spawned, and is reachable through the returned, direct-addressed ProcId,
199    /// which will be `ProcId::Direct(self.addr(), name)`.
200    pub async fn spawn(
201        &mut self,
202        name: String,
203        config: M::Config,
204    ) -> Result<(ProcId, ActorRef<ManagerAgent<M>>), HostError> {
205        if self.procs.contains_key(&name) {
206            return Err(HostError::ProcExists(name));
207        }
208
209        let proc_id = ProcId::Direct(self.frontend_addr.clone(), name.clone());
210        let handle = self
211            .manager
212            .spawn(proc_id.clone(), self.backend_addr.clone(), config)
213            .await?;
214
215        // Await readiness (config-driven; 0s disables timeout).
216        let to: Duration = crate::config::global::get(crate::config::HOST_SPAWN_READY_TIMEOUT);
217        let ready: Result<(), HostError> = if to == Duration::from_secs(0) {
218            handle.ready().await.map_err(|e| {
219                HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
220            })
221        } else {
222            match RealClock.timeout(to, handle.ready()).await {
223                Ok(Ok(())) => Ok(()),
224                Ok(Err(e)) => Err(HostError::ProcessConfigurationFailure(
225                    proc_id.clone(),
226                    anyhow::anyhow!("{e:?}"),
227                )),
228                Err(_) => Err(HostError::ProcessConfigurationFailure(
229                    proc_id.clone(),
230                    anyhow::anyhow!(format!("timeout waiting for Ready after {to:?}")),
231                )),
232            }
233        };
234
235        ready?;
236
237        // After Ready, addr() + agent_ref() must be present.
238        let addr = handle.addr().ok_or_else(|| {
239            HostError::ProcessConfigurationFailure(
240                proc_id.clone(),
241                anyhow::anyhow!("proc reported Ready but no addr() available"),
242            )
243        })?;
244        let agent_ref = handle.agent_ref().ok_or_else(|| {
245            HostError::ProcessConfigurationFailure(
246                proc_id.clone(),
247                anyhow::anyhow!("proc reported Ready but no agent_ref() available"),
248            )
249        })?;
250
251        self.router.bind(proc_id.clone().into(), addr.clone());
252        self.procs.insert(name, addr);
253
254        Ok((proc_id, agent_ref))
255    }
256}
257
258/// A router used to route to the system proc, or else fall back to
259/// the dial mailbox router.
260#[derive(Debug, Clone)]
261struct ProcOrDial {
262    proc: Proc,
263    router: DialMailboxRouter,
264}
265
266impl MailboxSender for ProcOrDial {
267    fn post_unchecked(
268        &self,
269        envelope: MessageEnvelope,
270        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
271    ) {
272        if envelope.dest().actor_id().proc_id() == self.proc.proc_id() {
273            self.proc.post_unchecked(envelope, return_handle);
274        } else {
275            self.router.post_unchecked(envelope, return_handle)
276        }
277    }
278}
279
/// Reasons [`ProcHandle::ready`] can fail.
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// A terminal state was reached before the proc ever became Ready; the
    /// payload is that terminal status.
    Terminal(TerminalStatus),
    /// The status channel was lost, so the implementation can no longer
    /// observe the proc's state.
    ChannelClosed,
}
288
/// Reasons [`ProcHandle::wait`] can fail.
#[derive(Debug, Clone)]
pub enum WaitError {
    /// The status channel was lost, so the implementation can no longer
    /// observe the proc's state.
    ChannelClosed,
}
295
296/// Error returned by [`ProcHandle::terminate`] and
297/// [`ProcHandle::kill`].
298///
299/// - `Unsupported`: the manager cannot perform the requested proc
300///   signaling (e.g., local/in-process manager that doesn't emulate
301///   kill).
302/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
303///   is the same value `wait()` would return.
304/// - `ChannelClosed`: the manager lost its lifecycle channel and
305///   cannot reliably observe state transitions.
306/// - `Io(err)`: manager-specific failure delivering the signal or
307///   performing shutdown (e.g., OS error on kill).
308#[derive(Debug)]
309pub enum TerminateError<TerminalStatus> {
310    /// Manager doesn't support signaling (e.g., Local manager).
311    Unsupported,
312    /// A terminal state was already reached while attempting
313    /// terminate/kill.
314    AlreadyTerminated(TerminalStatus),
315    /// Implementation lost its status channel / cannot observe state.
316    ChannelClosed,
317    /// Manager-specific failure to deliver signal or perform
318    /// shutdown.
319    Io(anyhow::Error),
320}
321
322impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324        match self {
325            TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
326            TerminateError::AlreadyTerminated(st) => {
327                write!(f, "proc already terminated (status: {st:?})")
328            }
329            TerminateError::ChannelClosed => {
330                write!(f, "lifecycle channel closed; cannot observe state")
331            }
332            TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
333        }
334    }
335}
336
337impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
338    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
339        match self {
340            TerminateError::Io(err) => Some(err.root_cause()),
341            _ => None,
342        }
343    }
344}
345
/// Outcome counters for a bulk termination attempt.
///
/// `Display` renders the three counters as
/// `attempted=<n> ok=<n> failed=<n>`.
#[derive(Debug)]
pub struct TerminateSummary {
    /// How many child procs termination was attempted for.
    pub attempted: usize,
    /// How many procs reached a terminal state.
    ///
    /// Counts both procs that exited after `terminate(timeout)` and procs
    /// that were already terminal before termination was attempted.
    pub ok: usize,
    /// How many procs could not be terminated.
    ///
    /// Typical causes are signaling errors (e.g., the OS failing to deliver
    /// SIGTERM/SIGKILL) or a lost lifecycle channel, after which the manager
    /// can no longer observe state transitions.
    pub failed: usize,
}

impl fmt::Display for TerminateSummary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            attempted,
            ok,
            failed,
        } = self;
        write!(f, "attempted={} ok={} failed={}", attempted, ok, failed)
    }
}
383
384#[async_trait::async_trait]
385/// Trait for terminating a single proc.
386pub trait SingleTerminate: Send + Sync {
387    /// Gracefully terminate the given proc.
388    ///
389    /// Initiates a polite shutdown for each child, waits up to
390    /// `timeout` for completion, then escalates to a forceful stop
391    /// The returned [`TerminateSummary`] reports how
392    /// many children were attempted, succeeded, and failed.
393    ///
394    /// Implementation notes:
395    /// - "Polite shutdown" and "forceful stop" are intentionally
396    ///   abstract. Implementors should map these to whatever
397    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
398    ///   OS signals).
399    /// - The operation must be idempotent and tolerate races with
400    ///   concurrent termination or external exits.
401    ///
402    /// # Parameters
403    /// - `timeout`: Per-child grace period before escalation to a
404    ///   forceful stop.
405    /// Returns a tuple of (polite shutdown actors vec, forceful stop actors vec)
406    async fn terminate_proc(
407        &self,
408        cx: &impl context::Actor,
409        proc: &ProcId,
410        timeout: std::time::Duration,
411    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error>;
412}
413
414/// Trait for managers that can terminate many child **units** in
415/// bulk.
416///
417/// Implementors provide a concurrency-bounded, graceful shutdown over
418/// all currently tracked children (polite stop → wait → forceful
419/// stop), returning a summary of outcomes. The exact stop/kill
420/// semantics are manager-specific: for example, an OS-process manager
421/// might send signals, while an in-process manager might drain/abort
422/// tasks.
423#[async_trait::async_trait]
424pub trait BulkTerminate: Send + Sync {
425    /// Gracefully terminate all known children.
426    ///
427    /// Initiates a polite shutdown for each child, waits up to
428    /// `timeout` for completion, then escalates to a forceful stop
429    /// for any that remain. Work may be done in parallel, capped by
430    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
431    /// many children were attempted, succeeded, and failed.
432    ///
433    /// Implementation notes:
434    /// - "Polite shutdown" and "forceful stop" are intentionally
435    ///   abstract. Implementors should map these to whatever
436    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
437    ///   OS signals).
438    /// - The operation must be idempotent and tolerate races with
439    ///   concurrent termination or external exits.
440    ///
441    /// # Parameters
442    /// - `timeout`: Per-child grace period before escalation to a
443    ///   forceful stop.
444    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
445    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
446    ///   etc.).
447    async fn terminate_all(
448        &self,
449        cx: &impl context::Actor,
450        timeout: std::time::Duration,
451        max_in_flight: usize,
452    ) -> TerminateSummary;
453}
454
455// Host convenience that's available only when its manager supports
456// bulk termination.
457impl<M: ProcManager + BulkTerminate> Host<M> {
458    /// Gracefully terminate all procs spawned by this host.
459    ///
460    /// Delegates to the underlying manager’s
461    /// [`BulkTerminate::terminate_all`] implementation. Use this to
462    /// perform orderly teardown during scale-down or shutdown.
463    ///
464    /// # Parameters
465    /// - `timeout`: Per-child grace period before escalation.
466    /// - `max_in_flight`: Upper bound on concurrent terminations.
467    ///
468    /// # Returns
469    /// A [`TerminateSummary`] with counts of attempted/ok/failed
470    /// terminations.
471    pub async fn terminate_children(
472        &self,
473        cx: &impl context::Actor,
474        timeout: Duration,
475        max_in_flight: usize,
476    ) -> TerminateSummary {
477        self.manager.terminate_all(cx, timeout, max_in_flight).await
478    }
479}
480
481#[async_trait::async_trait]
482impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
483    async fn terminate_proc(
484        &self,
485        cx: &impl context::Actor,
486        proc: &ProcId,
487        timeout: Duration,
488    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
489        self.manager.terminate_proc(cx, proc, timeout).await
490    }
491}
492
493/// Minimal uniform surface for a spawned-**proc** handle returned by
494/// a `ProcManager`. Each manager can return its own concrete handle,
495/// as long as it exposes these. A **proc** is the Hyperactor runtime
496/// + its actors (lifecycle controlled via `Proc` APIs such as
497/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
498/// **process**, but it is conceptually distinct:
499///
500/// - `LocalProcManager`: runs the proc **in this OS process**; there
501///   is no child process to signal. Lifecycle is entirely proc-level.
502/// - `ProcessProcManager` (test-only here): launches an **external OS
503///   process** which hosts the proc, but this toy manager does
504///   **not** wire a control plane for shutdown, nor an exit monitor.
505///
506/// This trait is therefore written in terms of the **proc**
507/// lifecycle:
508///
509/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
510///   available).
511/// - `wait()` resolves with the proc's terminal status
512///   (Stopped/Killed/Failed).
513/// - `terminate()` requests a graceful shutdown of the *proc* and
514///   waits up to the deadline; managers that also own a child OS
515///   process may escalate to `SIGKILL` if the proc does not exit in
516///   time.
517/// - `kill()` requests an immediate, forced termination. For
518///    in-process procs, this may be implemented as an immediate
519///    drain/abort of actor tasks. For external procs, this is
520///    typically a `SIGKILL`.
521///
522/// The shape of the terminal value is `Self::TerminalStatus`.
523/// Managers that track rich info (exit code, signal, address, agent)
524/// can expose it; trivial managers may use `()`.
525///
526/// Managers that do not support signaling must return `Unsupported`.
527#[async_trait]
528pub trait ProcHandle: Clone + Send + Sync + 'static {
529    /// The agent actor type installed in the proc by the manager.
530    /// Must implement both:
531    /// - [`Actor`], because the agent actually runs inside the proc,
532    ///   and
533    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
534    type Agent: Actor + Referable;
535
536    /// The type of terminal status produced when the proc exits.
537    ///
538    /// For example, an external proc manager may use a rich status
539    /// enum (e.g. `ProcStatus`), while an in-process manager may use
540    /// a trivial unit type. This is the value returned by
541    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
542    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;
543
544    /// The proc's logical identity on this host.
545    fn proc_id(&self) -> &ProcId;
546
547    /// The proc's address (the one callers bind into the host
548    /// router).
549    fn addr(&self) -> Option<ChannelAddr>;
550
551    /// The agent actor reference hosted in the proc.
552    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>>;
553
554    /// Resolves when the proc becomes Ready. Multi-waiter,
555    /// non-consuming.
556    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;
557
558    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
559    /// Multi-waiter, non-consuming.
560    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;
561
562    /// Politely stop the proc before the deadline; managers that own
563    /// a child OS process may escalate to a forced kill at the
564    /// deadline. Idempotent and race-safe: concurrent callers
565    /// coalesce; the first terminal outcome wins and all callers
566    /// observe it via `wait()`.
567    ///
568    /// Returns the single terminal status the proc reached (the same
569    /// value `wait()` will return). Never fabricates terminal states:
570    /// this is only returned after the exit monitor observes
571    /// termination.
572    async fn terminate(
573        &self,
574        cx: &impl context::Actor,
575        timeout: Duration,
576    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
577
578    /// Force the proc down immediately. For in-process managers this
579    /// may abort actor tasks; for external managers this typically
580    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
581    /// outcome is the one observed by `wait()`.
582    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
583}
584
585/// A trait describing a manager of procs, responsible for bootstrapping
586/// procs on a host, and managing their lifetimes. The manager spawns an
587/// `Agent`-typed actor on each proc, responsible for managing the proc.
588#[async_trait]
589pub trait ProcManager {
590    /// Concrete handle type this manager returns.
591    type Handle: ProcHandle;
592
593    /// Additional configuration for the proc, supported by this manager.
594    type Config = ();
595
596    /// The preferred transport for this ProcManager.
597    /// In practice this will be [`ChannelTransport::Local`]
598    /// for testing, and [`ChannelTransport::Unix`] for external
599    /// processes.
600    fn transport(&self) -> ChannelTransport;
601
602    /// Spawn a new proc with the provided proc id. The proc
603    /// should use the provided forwarder address for messages
604    /// destined outside of the proc. The returned address accepts
605    /// messages destined for the proc.
606    ///
607    /// An agent actor is also spawned, and the corresponding actor
608    /// ref is returned.
609    async fn spawn(
610        &self,
611        proc_id: ProcId,
612        forwarder_addr: ChannelAddr,
613        config: Self::Config,
614    ) -> Result<Self::Handle, HostError>;
615}
616
617/// Type alias for the agent actor managed by a given [`ProcManager`].
618///
619/// This resolves to the `Agent` type exposed by the manager's
620/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
621/// convenient shorthand so call sites can refer to
622/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
623/// `<M::Handle as ProcHandle>::Agent`.
624///
625/// # Example
626/// ```ignore
627/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
628/// ```
629pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792
630
631/// A ProcManager that spawns **in-process** procs (test-only).
632///
633/// The proc runs inside this same OS process; there is **no** child
634/// process to signal. Lifecycle is purely proc-level:
635/// - `terminate(timeout)`: delegates to
636///   `Proc::destroy_and_wait(timeout, None)`, which drains and, at the
637///   deadline, aborts remaining actors.
638/// - `kill()`: uses a zero deadline to emulate a forced stop via
639///   `destroy_and_wait(Duration::ZERO, None)`.
640/// - `wait()`: trivial (no external lifecycle to observe).
641///
642///   No OS signals are sent or required.
643pub struct LocalProcManager<S> {
644    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
645    spawn: S,
646}
647
648impl<S> LocalProcManager<S> {
649    /// Create a new in-process proc manager with the given agent
650    /// params.
651    pub fn new(spawn: S) -> Self {
652        Self {
653            procs: Arc::new(Mutex::new(HashMap::new())),
654            spawn,
655        }
656    }
657}
658
659#[async_trait]
660impl<S> BulkTerminate for LocalProcManager<S>
661where
662    S: Send + Sync,
663{
664    async fn terminate_all(
665        &self,
666        _cx: &impl context::Actor,
667        timeout: std::time::Duration,
668        max_in_flight: usize,
669    ) -> TerminateSummary {
670        // Snapshot procs so we don't hold the lock across awaits.
671        let procs: Vec<Proc> = {
672            let guard = self.procs.lock().await;
673            guard.values().cloned().collect()
674        };
675
676        let attempted = procs.len();
677
678        let results = stream::iter(procs.into_iter().map(|mut p| async move {
679            // For local manager, graceful proc-level stop.
680            match p.destroy_and_wait::<()>(timeout, None).await {
681                Ok(_) => true,
682                Err(e) => {
683                    tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
684                    false
685                }
686            }
687        }))
688        .buffer_unordered(max_in_flight.max(1))
689        .collect::<Vec<bool>>()
690        .await;
691
692        let ok = results.into_iter().filter(|b| *b).count();
693
694        TerminateSummary {
695            attempted,
696            ok,
697            failed: attempted.saturating_sub(ok),
698        }
699    }
700}
701
702#[async_trait::async_trait]
703impl<S> SingleTerminate for LocalProcManager<S>
704where
705    S: Send + Sync,
706{
707    async fn terminate_proc(
708        &self,
709        _cx: &impl context::Actor,
710        proc: &ProcId,
711        timeout: std::time::Duration,
712    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
713        // Snapshot procs so we don't hold the lock across awaits.
714        let procs: Option<Proc> = {
715            let mut guard = self.procs.lock().await;
716            guard.remove(proc)
717        };
718        if let Some(mut p) = procs {
719            p.destroy_and_wait::<()>(timeout, None).await
720        } else {
721            Err(anyhow::anyhow!("proc {} doesn't exist", proc))
722        }
723    }
724}
725
726/// A lightweight [`ProcHandle`] for procs managed **in-process** via
727/// [`LocalProcManager`].
728///
729/// This handle wraps the minimal identifying state of a spawned proc:
730/// - its [`ProcId`] (logical identity on the host),
731/// - the proc's [`ChannelAddr`] (the address callers bind into the
732///   host router), and
733/// - the [`ActorRef`] to the agent actor hosted in the proc.
734///
735/// Unlike external handles, `LocalHandle` does **not** manage an OS
736/// child process. It provides a uniform surface (`proc_id()`,
737/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
738/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
739/// **proc-level** shutdown.
740///
741/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
742/// bound (`Actor + Referable`).
743#[derive(Debug)]
744pub struct LocalHandle<A: Actor + Referable> {
745    proc_id: ProcId,
746    addr: ChannelAddr,
747    agent_ref: ActorRef<A>,
748    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
749}
750
751// Manual `Clone` to avoid requiring `A: Clone`.
752impl<A: Actor + Referable> Clone for LocalHandle<A> {
753    fn clone(&self) -> Self {
754        Self {
755            proc_id: self.proc_id.clone(),
756            addr: self.addr.clone(),
757            agent_ref: self.agent_ref.clone(),
758            procs: Arc::clone(&self.procs),
759        }
760    }
761}
762
763#[async_trait]
764impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
765    /// `Agent = A` (inherits `Actor + Referable` from the trait
766    /// bound).
767    type Agent = A;
768    type TerminalStatus = ();
769
770    fn proc_id(&self) -> &ProcId {
771        &self.proc_id
772    }
773    fn addr(&self) -> Option<ChannelAddr> {
774        Some(self.addr.clone())
775    }
776    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
777        Some(self.agent_ref.clone())
778    }
779
780    /// Always resolves immediately: a local proc is created
781    /// in-process and is usable as soon as the handle exists.
782    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
783        Ok(())
784    }
785    /// Always resolves immediately with `()`: a local proc has no
786    /// external lifecycle to await. There is no OS child process
787    /// behind this handle.
788    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
789        Ok(())
790    }
791
792    async fn terminate(
793        &self,
794        _cx: &impl context::Actor,
795        timeout: Duration,
796    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
797        let mut proc = {
798            let guard = self.procs.lock().await;
799            match guard.get(self.proc_id()) {
800                Some(p) => p.clone(),
801                None => {
802                    // The proc was already removed; treat as already
803                    // terminal.
804                    return Err(TerminateError::AlreadyTerminated(()));
805                }
806            }
807        };
808
809        // Graceful stop of the *proc* (actors) with a deadline. This
810        // will drain and then abort remaining actors at expiry.
811        let _ = proc
812            .destroy_and_wait::<()>(timeout, None)
813            .await
814            .map_err(TerminateError::Io)?;
815
816        Ok(())
817    }
818
819    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
820        // Forced stop == zero deadline; `destroy_and_wait` will
821        // immediately abort remaining actors and return.
822        let mut proc = {
823            let guard = self.procs.lock().await;
824            match guard.get(self.proc_id()) {
825                Some(p) => p.clone(),
826                None => return Err(TerminateError::AlreadyTerminated(())),
827            }
828        };
829
830        let _ = proc
831            .destroy_and_wait::<()>(Duration::from_millis(0), None)
832            .await
833            .map_err(TerminateError::Io)?;
834
835        Ok(())
836    }
837}
838
839/// Local, in-process ProcManager.
840///
841/// **Type bounds:**
842/// - `A: Actor + Referable + Binds<A>`
843///   - `Actor`: the agent actually runs inside the proc.
844///   - `Referable`: callers hold `ActorRef<A>` to the agent; this
845///     bound is required for typed remote refs.
846///   - `Binds<A>`: lets the runtime wire the agent's message ports.
847/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
848///   the spawn closure returns a Send future (we `tokio::spawn` it).
849/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
850///   concurrent contexts.
851///
852/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
853#[async_trait]
854impl<A, S, F> ProcManager for LocalProcManager<S>
855where
856    A: Actor + Referable + Binds<A>,
857    F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
858    S: Fn(Proc) -> F + Sync,
859{
860    type Handle = LocalHandle<A>;
861
862    fn transport(&self) -> ChannelTransport {
863        ChannelTransport::Local
864    }
865
866    #[tracing::instrument(skip(self, _config))]
867    async fn spawn(
868        &self,
869        proc_id: ProcId,
870        forwarder_addr: ChannelAddr,
871        _config: (),
872    ) -> Result<Self::Handle, HostError> {
873        let transport = forwarder_addr.transport();
874        let proc = Proc::new(
875            proc_id.clone(),
876            MailboxClient::dial(forwarder_addr)?.into_boxed(),
877        );
878        let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
879        self.procs
880            .lock()
881            .await
882            .insert(proc_id.clone(), proc.clone());
883        let _handle = proc.clone().serve(rx);
884        let agent_handle = (self.spawn)(proc)
885            .await
886            .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
887
888        Ok(LocalHandle {
889            proc_id,
890            addr: proc_addr,
891            agent_ref: agent_handle.bind(),
892            procs: Arc::clone(&self.procs),
893        })
894    }
895}
896
/// A ProcManager that manages each proc as a **separate OS process**
/// (test-only toy).
///
/// This implementation launches a child via `Command` and relies on
/// `kill_on_drop(true)`, so a child is killed when its `Child` handle
/// is dropped. The handles are retained by the manager, so this
/// happens when the manager (or the host owning it) drops. There is
/// **no** proc control plane (no RPC to a proc agent for shutdown)
/// and **no** exit monitor wired here.
/// Consequently:
/// - `terminate()` and `kill()` return `Unsupported`.
/// - `wait()` is trivial (no lifecycle observation).
///
/// It follows a simple protocol:
///
/// Each process is launched with the following environment variables:
/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
///
/// The launched proc should also spawn an actor to manage it - the details of this are
/// implementation dependent, and outside the scope of the process manager.
///
/// The function [`boot_proc`] provides a convenient implementation of the
/// protocol.
pub struct ProcessProcManager<A> {
    /// Path of the executable launched for every spawned proc.
    program: std::path::PathBuf,
    /// `Child` handles of the launched processes, keyed by proc id.
    /// Each handle is stored here right after the child is spawned.
    children: Arc<Mutex<HashMap<ProcId, Child>>>,
    /// Records the agent type `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
925
926impl<A> ProcessProcManager<A> {
927    /// Create a new ProcessProcManager that runs the provided
928    /// command.
929    pub fn new(program: std::path::PathBuf) -> Self {
930        Self {
931            program,
932            children: Arc::new(Mutex::new(HashMap::new())),
933            _phantom: PhantomData,
934        }
935    }
936}
937
/// Intentionally empty `Drop`: process cleanup is delegated to the
/// retained `Child` handles.
impl<A> Drop for ProcessProcManager<A> {
    fn drop(&mut self) {
        // Nothing to do here. Once this body returns, the `children`
        // field is dropped; if that was the last reference to the
        // shared map, every retained `Child` handle is dropped with
        // it. The children are launched with `kill_on_drop(true)`,
        // so dropping a handle kills its process.
    }
}
945
/// A [`ProcHandle`] implementation for procs managed as separate
/// OS processes via [`ProcessProcManager`].
///
/// This handle records the logical identity and connectivity of an
/// external child process:
/// - its [`ProcId`] (unique identity on the host),
/// - the proc's [`ChannelAddr`] (address registered in the host
///   router),
/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
///
/// Unlike [`LocalHandle`], this corresponds to a real OS process
/// launched by the manager. In this **toy** implementation the handle
/// does not own/monitor the `Child` and there is no shutdown control
/// plane. It is a stable, clonable surface exposing the proc's
/// identity, address, and agent reference so host code can interact
/// uniformly with local/external procs. `terminate()`/`kill()` are
/// intentionally `Unsupported` here; process cleanup relies on
/// `cmd.kill_on_drop(true)` when launching the child: the process is
/// killed when the `Child` handle retained by the manager is dropped
/// (dropping a `ProcessHandle` has no effect on the process).
///
/// The type bound `A: Actor + Referable` comes from the
/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
/// actually runs inside the proc, and `Referable` because it must
/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
/// typed remote reference).
#[derive(Debug)]
pub struct ProcessHandle<A: Actor + Referable> {
    /// Id assigned to the proc when it was spawned.
    proc_id: ProcId,
    /// Address the child reported back over the callback channel.
    addr: ChannelAddr,
    /// Agent reference the child reported back over the callback
    /// channel.
    agent_ref: ActorRef<A>,
}
977
978// Manual `Clone` to avoid requiring `A: Clone`.
979impl<A: Actor + Referable> Clone for ProcessHandle<A> {
980    fn clone(&self) -> Self {
981        Self {
982            proc_id: self.proc_id.clone(),
983            addr: self.addr.clone(),
984            agent_ref: self.agent_ref.clone(),
985        }
986    }
987}
988
989#[async_trait]
990impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
991    /// Agent must be both an `Actor` (runs in the proc) and a
992    /// `Referable` (so it can be referenced via `ActorRef<A>`).
993    type Agent = A;
994    type TerminalStatus = ();
995
996    fn proc_id(&self) -> &ProcId {
997        &self.proc_id
998    }
999    fn addr(&self) -> Option<ChannelAddr> {
1000        Some(self.addr.clone())
1001    }
1002    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1003        Some(self.agent_ref.clone())
1004    }
1005
1006    /// Resolves immediately. `ProcessProcManager::spawn` returns this
1007    /// handle only after the child has called back with (addr,
1008    /// agent), i.e. after readiness.
1009    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1010        Ok(())
1011    }
1012    /// Resolves immediately with `()`. This handle does not track
1013    /// child lifecycle; there is no watcher in this implementation.
1014    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1015        Ok(())
1016    }
1017
1018    async fn terminate(
1019        &self,
1020        _cx: &impl context::Actor,
1021        _deadline: Duration,
1022    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1023        Err(TerminateError::Unsupported)
1024    }
1025
1026    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1027        Err(TerminateError::Unsupported)
1028    }
1029}
1030
1031#[async_trait]
1032impl<A> ProcManager for ProcessProcManager<A>
1033where
1034    // Agent actor runs in the proc (`Actor`) and must be
1035    // referenceable (`Referable`).
1036    A: Actor + Referable,
1037{
1038    type Handle = ProcessHandle<A>;
1039
1040    fn transport(&self) -> ChannelTransport {
1041        ChannelTransport::Unix
1042    }
1043
1044    #[tracing::instrument(skip(self, _config))]
1045    async fn spawn(
1046        &self,
1047        proc_id: ProcId,
1048        forwarder_addr: ChannelAddr,
1049        _config: (),
1050    ) -> Result<Self::Handle, HostError> {
1051        let (callback_addr, mut callback_rx) =
1052            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1053
1054        let mut cmd = Command::new(&self.program);
1055        cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1056        cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1057        cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1058
1059        // Lifetime strategy: mark the child with
1060        // `kill_on_drop(true)` so the OS will send SIGKILL if the
1061        // handle is dropped and retain the `Child` in
1062        // `self.children`, tying its lifetime to the manager/host.
1063        //
1064        // This is the simplest viable policy to avoid orphaned
1065        // subprocesses in CI; more sophisticated lifecycle control
1066        // (graceful shutdown, restart) will be layered on later.
1067
1068        // Kill the child when its handle is dropped.
1069        cmd.kill_on_drop(true);
1070
1071        let child = cmd
1072            .spawn()
1073            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1074
1075        // Retain the handle so it lives for the life of the
1076        // manager/host.
1077        {
1078            let mut children = self.children.lock().await;
1079            children.insert(proc_id.clone(), child);
1080        }
1081
1082        // Wait for the child's callback with (addr, agent_ref)
1083        let (proc_addr, agent_ref) = callback_rx.recv().await?;
1084
1085        // TODO(production): For a non-test implementation, plumb a
1086        // shutdown path:
1087        // - expose a proc-level graceful stop RPC on the agent and
1088        //   implement `terminate(timeout)` by invoking it and, on
1089        //   deadline, call `Child::kill()`; implement `kill()` as
1090        //   immediate `Child::kill()`.
1091        // - wire an exit monitor so `wait()` resolves with a real
1092        //   terminal status.
1093        Ok(ProcessHandle {
1094            proc_id,
1095            addr: proc_addr,
1096            agent_ref,
1097        })
1098    }
1099}
1100
1101impl<A> ProcessProcManager<A>
1102where
1103    // `Actor`: runs in the proc; `Referable`: referenceable via
1104    // ActorRef; `Binds<A>`: wires ports.
1105    A: Actor + Referable + Binds<A>,
1106{
1107    /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
1108    /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
1109    /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
1110    /// manage the proc itself.
1111    pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1112    where
1113        S: FnOnce(Proc) -> F,
1114        F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1115    {
1116        let proc_id: ProcId = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1117        let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1118        let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1119        spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1120    }
1121
1122    fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1123    where
1124        T: FromStr<Err = E>,
1125        E: Into<anyhow::Error>,
1126    {
1127        std::env::var(key)
1128            .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1129            .parse()
1130            .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1131    }
1132}
1133
1134/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
1135/// forwarding messages to the provided `backend_addr`,
1136/// and returning the proc's address and agent actor on
1137/// the provided `callback_addr`.
1138#[tracing::instrument(skip(spawn))]
1139pub async fn spawn_proc<A, S, F>(
1140    proc_id: ProcId,
1141    backend_addr: ChannelAddr,
1142    callback_addr: ChannelAddr,
1143    spawn: S,
1144) -> Result<Proc, HostError>
1145where
1146    // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
1147    // `Binds<A>`: wires ports
1148    A: Actor + Referable + Binds<A>,
1149    S: FnOnce(Proc) -> F,
1150    F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1151{
1152    let backend_transport = backend_addr.transport();
1153    let proc = Proc::new(
1154        proc_id.clone(),
1155        MailboxClient::dial(backend_addr)?.into_boxed(),
1156    );
1157
1158    let agent_handle = spawn(proc.clone())
1159        .await
1160        .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1161
1162    // Finally serve the proc on the same transport as the backend address,
1163    // and call back.
1164    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1165    proc.clone().serve(proc_rx);
1166    channel::dial(callback_addr)?
1167        .send((proc_addr, agent_handle.bind::<A>()))
1168        .await
1169        .map_err(ChannelError::from)?;
1170
1171    Ok(proc)
1172}
1173
1174/// Testing support for hosts. This is linked outside of cfg(test)
1175/// as it is needed by an external binary.
1176pub mod testing {
1177    use async_trait::async_trait;
1178
1179    use crate as hyperactor;
1180    use crate::Actor;
1181    use crate::ActorId;
1182    use crate::Context;
1183    use crate::Handler;
1184    use crate::OncePortRef;
1185
1186    /// Just a simple actor, available in both the bootstrap binary as well as
1187    /// hyperactor tests.
1188    #[derive(Debug, Default, Actor)]
1189    #[hyperactor::export(handlers = [OncePortRef<ActorId>])]
1190    pub struct EchoActor;
1191
1192    #[async_trait]
1193    impl Handler<OncePortRef<ActorId>> for EchoActor {
1194        async fn handle(
1195            &mut self,
1196            cx: &Context<Self>,
1197            reply: OncePortRef<ActorId>,
1198        ) -> Result<(), anyhow::Error> {
1199            reply.send(cx, cx.self_id().clone())?;
1200            Ok(())
1201        }
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use std::sync::Arc;
1208    use std::time::Duration;
1209
1210    use super::testing::EchoActor;
1211    use super::*;
1212    use crate::channel::ChannelTransport;
1213    use crate::clock::Clock;
1214    use crate::clock::RealClock;
1215    use crate::context::Mailbox;
1216
1217    #[tokio::test]
1218    async fn test_basic() {
1219        let proc_manager =
1220            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
1221        let procs = Arc::clone(&proc_manager.procs);
1222        let (mut host, _handle) =
1223            Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))
1224                .await
1225                .unwrap();
1226
1227        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
1228        assert_eq!(
1229            proc_id1,
1230            ProcId::Direct(host.addr().clone(), "proc1".to_string())
1231        );
1232        assert!(procs.lock().await.contains_key(&proc_id1));
1233
1234        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
1235        assert!(procs.lock().await.contains_key(&proc_id2));
1236
1237        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
1238        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();
1239
1240        // Make sure they can talk to each other:
1241        let (instance1, _handle) = proc1.instance("client").unwrap();
1242        let (instance2, _handle) = proc2.instance("client").unwrap();
1243
1244        let (port, mut rx) = instance1.mailbox().open_port();
1245
1246        port.bind().send(&instance2, "hello".to_string()).unwrap();
1247        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());
1248
1249        // Make sure that the system proc is also wired in correctly.
1250        let (system_actor, _handle) = host.system_proc().instance("test").unwrap();
1251
1252        // system->proc
1253        port.bind()
1254            .send(&system_actor, "hello from the system proc".to_string())
1255            .unwrap();
1256        assert_eq!(
1257            rx.recv().await.unwrap(),
1258            "hello from the system proc".to_string()
1259        );
1260
1261        // system->system
1262        let (port, mut rx) = system_actor.mailbox().open_port();
1263        port.bind()
1264            .send(&system_actor, "hello from the system".to_string())
1265            .unwrap();
1266        assert_eq!(
1267            rx.recv().await.unwrap(),
1268            "hello from the system".to_string()
1269        );
1270
1271        // proc->system
1272        port.bind()
1273            .send(&instance1, "hello from the instance1".to_string())
1274            .unwrap();
1275        assert_eq!(
1276            rx.recv().await.unwrap(),
1277            "hello from the instance1".to_string()
1278        );
1279    }
1280
1281    #[tokio::test]
1282    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
1283    #[cfg_attr(not(fbcode_build), ignore)]
1284    async fn test_process_proc_manager() {
1285        hyperactor_telemetry::initialize_logging(crate::clock::ClockKind::default());
1286
1287        // EchoActor is "agent" used to test connectivity.
1288        let process_manager = ProcessProcManager::<EchoActor>::new(
1289            buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
1290        );
1291        let (mut host, _handle) =
1292            Host::serve(process_manager, ChannelAddr::any(ChannelTransport::Unix))
1293                .await
1294                .unwrap();
1295
1296        // (1) Spawn and check invariants.
1297        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
1298        let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
1299        let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
1300        assert_eq!(echo1.actor_id().proc_id(), &proc1);
1301        assert_eq!(echo2.actor_id().proc_id(), &proc2);
1302
1303        // (2) Duplicate name rejection.
1304        let dup = host.spawn("proc1".to_string(), ()).await;
1305        assert!(matches!(dup, Err(HostError::ProcExists(_))));
1306
1307        // (3) Create a standalone client proc and verify echo1 agent responds.
1308        // Request: client proc -> host frontend/router -> echo1 (proc1).
1309        // Reply:   echo1 (proc1) -> host backend -> host router -> client port.
1310        // This confirms that an external proc (created via
1311        // `Proc::direct`) can address a child proc through the host,
1312        // and receive a correct reply.
1313        let client = Proc::direct(
1314            ChannelAddr::any(host.addr().transport()),
1315            "test".to_string(),
1316        )
1317        .await
1318        .unwrap();
1319        let (client_inst, _h) = client.instance("test").unwrap();
1320        let (port, rx) = client_inst.mailbox().open_once_port();
1321        echo1.send(&client_inst, port.bind()).unwrap();
1322        let id = RealClock
1323            .timeout(Duration::from_secs(5), rx.recv())
1324            .await
1325            .unwrap()
1326            .unwrap();
1327        assert_eq!(id, *echo1.actor_id());
1328
1329        // (4) Child <-> external client request -> reply:
1330        // Request: client proc (standalone via `Proc::direct`) ->
1331        //          host frontend/router -> echo2 (proc2).
1332        // Reply:   echo2 (proc2) -> host backend -> host router ->
1333        //          client port (standalone proc).
1334        // This exercises cross-proc routing between a child and an
1335        // external client under the same host.
1336        let (port2, rx2) = client_inst.mailbox().open_once_port();
1337        echo2.send(&client_inst, port2.bind()).unwrap();
1338        let id2 = RealClock
1339            .timeout(Duration::from_secs(5), rx2.recv())
1340            .await
1341            .unwrap()
1342            .unwrap();
1343        assert_eq!(id2, *echo2.actor_id());
1344
1345        // (5) System -> child request -> cross-proc reply:
1346        // Request: system proc -> host router (frontend) -> echo1
1347        //          (proc1, child).
1348        // Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
1349        //        host router -> client proc direct addr (Proc::direct) ->
1350        //        client port.
1351        // Because `client_inst` runs in its own proc, the reply
1352        // traverses the host (not local delivery within proc1).
1353        let (sys_inst, _h) = host.system_proc().instance("sys-client").unwrap();
1354        let (port3, rx3) = client_inst.mailbox().open_once_port();
1355        // Send from system -> child via a message that ultimately
1356        // replies to client's port
1357        echo1.send(&sys_inst, port3.bind()).unwrap();
1358        let id3 = RealClock
1359            .timeout(Duration::from_secs(5), rx3.recv())
1360            .await
1361            .unwrap()
1362            .unwrap();
1363        assert_eq!(id3, *echo1.actor_id());
1364    }
1365
1366    #[tokio::test]
1367    async fn local_ready_and_wait_are_immediate() {
1368        // Build a LocalHandle directly.
1369        let addr = ChannelAddr::any(ChannelTransport::Local);
1370        let proc_id = ProcId::Direct(addr.clone(), "p".into());
1371        let agent_ref = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
1372        let h = LocalHandle::<()> {
1373            proc_id,
1374            addr,
1375            agent_ref,
1376            procs: Arc::new(Mutex::new(HashMap::new())),
1377        };
1378
1379        // ready() resolves immediately
1380        assert!(h.ready().await.is_ok());
1381
1382        // wait() resolves immediately with unit TerminalStatus
1383        assert!(h.wait().await.is_ok());
1384
1385        // Multiple concurrent waiters both succeed
1386        let (r1, r2) = tokio::join!(h.ready(), h.ready());
1387        assert!(r1.is_ok() && r2.is_ok());
1388    }
1389
1390    // --
1391    // Fixtures for `host::spawn` tests.
1392
    /// Scripted outcome of `TestHandle::ready()`.
    #[derive(Debug, Clone, Copy)]
    enum ReadyMode {
        /// `ready()` returns `Ok(())`, after sleeping for the given
        /// duration when it is non-zero.
        OkAfter(Duration),
        /// `ready()` fails with `ReadyError::Terminal(())`.
        ErrTerminal,
        /// `ready()` fails with `ReadyError::ChannelClosed`.
        ErrChannelClosed,
    }
1399
    /// Proc handle fixture whose readiness outcome and exposed
    /// fields are configurable.
    #[derive(Debug, Clone)]
    struct TestHandle {
        /// Id returned by `proc_id()`.
        id: ProcId,
        /// Address returned by `addr()` unless `omit_addr` is set.
        addr: ChannelAddr,
        /// Agent reference returned by `agent_ref()` unless
        /// `omit_agent` is set.
        agent: ActorRef<()>,
        /// Outcome produced by `ready()`.
        mode: ReadyMode,
        /// When true, `addr()` returns `None`.
        omit_addr: bool,
        /// When true, `agent_ref()` returns `None`.
        omit_agent: bool,
    }
1409
1410    #[async_trait::async_trait]
1411    impl ProcHandle for TestHandle {
1412        type Agent = ();
1413        type TerminalStatus = ();
1414
1415        fn proc_id(&self) -> &ProcId {
1416            &self.id
1417        }
1418        fn addr(&self) -> Option<ChannelAddr> {
1419            if self.omit_addr {
1420                None
1421            } else {
1422                Some(self.addr.clone())
1423            }
1424        }
1425        fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1426            if self.omit_agent {
1427                None
1428            } else {
1429                Some(self.agent.clone())
1430            }
1431        }
1432        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1433            match self.mode {
1434                ReadyMode::OkAfter(d) => {
1435                    if !d.is_zero() {
1436                        RealClock.sleep(d).await;
1437                    }
1438                    Ok(())
1439                }
1440                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
1441                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
1442            }
1443        }
1444        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1445            Ok(())
1446        }
1447        async fn terminate(
1448            &self,
1449            _cx: &impl context::Actor,
1450            _timeout: Duration,
1451        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1452            Err(TerminateError::Unsupported)
1453        }
1454        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1455            Err(TerminateError::Unsupported)
1456        }
1457    }
1458
    /// Proc manager fixture that hands out `TestHandle`s configured
    /// from its own fields.
    #[derive(Debug, Clone)]
    struct TestManager {
        /// Readiness outcome copied into every spawned handle.
        mode: ReadyMode,
        /// Copied into every spawned handle's `omit_addr`.
        omit_addr: bool,
        /// Copied into every spawned handle's `omit_agent`.
        omit_agent: bool,
        /// Transport reported by `transport()`.
        transport: ChannelTransport,
    }
1466
1467    impl TestManager {
1468        fn local(mode: ReadyMode) -> Self {
1469            Self {
1470                mode,
1471                omit_addr: false,
1472                omit_agent: false,
1473                transport: ChannelTransport::Local,
1474            }
1475        }
1476        fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
1477            self.omit_addr = addr;
1478            self.omit_agent = agent;
1479            self
1480        }
1481    }
1482
1483    #[async_trait::async_trait]
1484    impl ProcManager for TestManager {
1485        type Handle = TestHandle;
1486
1487        fn transport(&self) -> ChannelTransport {
1488            self.transport.clone()
1489        }
1490
1491        async fn spawn(
1492            &self,
1493            proc_id: ProcId,
1494            forwarder_addr: ChannelAddr,
1495            _config: (),
1496        ) -> Result<Self::Handle, HostError> {
1497            let agent = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
1498            Ok(TestHandle {
1499                id: proc_id,
1500                addr: forwarder_addr,
1501                agent,
1502                mode: self.mode,
1503                omit_addr: self.omit_addr,
1504                omit_agent: self.omit_agent,
1505            })
1506        }
1507    }
1508
1509    #[tokio::test]
1510    async fn host_spawn_times_out_when_configured() {
1511        let cfg = crate::config::global::lock();
1512        let _g = cfg.override_key(
1513            crate::config::HOST_SPAWN_READY_TIMEOUT,
1514            Duration::from_millis(10),
1515        );
1516
1517        let (mut host, _h) = Host::serve(
1518            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
1519            ChannelAddr::any(ChannelTransport::Local),
1520        )
1521        .await
1522        .unwrap();
1523
1524        let err = host.spawn("t".into(), ()).await.expect_err("must time out");
1525        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1526    }
1527
1528    #[tokio::test]
1529    async fn host_spawn_timeout_zero_disables_and_succeeds() {
1530        let cfg = crate::config::global::lock();
1531        let _g = cfg.override_key(
1532            crate::config::HOST_SPAWN_READY_TIMEOUT,
1533            Duration::from_secs(0),
1534        );
1535
1536        let (mut host, _h) = Host::serve(
1537            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
1538            ChannelAddr::any(ChannelTransport::Local),
1539        )
1540        .await
1541        .unwrap();
1542
1543        let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
1544        assert_eq!(agent.actor_id().proc_id(), &pid);
1545        assert!(host.procs.contains_key("ok"));
1546    }
1547
1548    #[tokio::test]
1549    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
1550        let (mut host, _h) = Host::serve(
1551            TestManager::local(ReadyMode::ErrChannelClosed),
1552            ChannelAddr::any(ChannelTransport::Local),
1553        )
1554        .await
1555        .unwrap();
1556
1557        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1558        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1559    }
1560
1561    #[tokio::test]
1562    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
1563        let (mut host, _h) = Host::serve(
1564            TestManager::local(ReadyMode::ErrTerminal),
1565            ChannelAddr::any(ChannelTransport::Local),
1566        )
1567        .await
1568        .unwrap();
1569
1570        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1571        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1572    }
1573
1574    #[tokio::test]
1575    async fn host_spawn_fails_if_ready_but_missing_addr() {
1576        let (mut host, _h) = Host::serve(
1577            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
1578            ChannelAddr::any(ChannelTransport::Local),
1579        )
1580        .await
1581        .unwrap();
1582
1583        let err = host
1584            .spawn("no-addr".into(), ())
1585            .await
1586            .expect_err("must fail");
1587        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1588    }
1589
1590    #[tokio::test]
1591    async fn host_spawn_fails_if_ready_but_missing_agent() {
1592        let (mut host, _h) = Host::serve(
1593            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
1594            ChannelAddr::any(ChannelTransport::Local),
1595        )
1596        .await
1597        .unwrap();
1598
1599        let err = host
1600            .spawn("no-agent".into(), ())
1601            .await
1602            .expect_err("must fail");
1603        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1604    }
1605}