hyperactor/
host.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! This module defines [`Host`], which represents all the procs running on a host.
//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
//! for example, fork new processes for each proc, or spawn them in the same process
//! for testing purposes.
//!
//! The primary purpose of a host is to manage the lifecycle of these procs, and to
//! serve as a single front-end for all the procs on a host, multiplexing network
//! channels.
//!
//! ## Channel muxing
//! A [`Host`] maintains a single frontend address, through which all procs are accessible
//! via direct addressing: the id of each proc is `ProcId::Direct(frontend_addr, proc_name)`.
//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
//! multiplexes messages based on the proc name. When spawning procs, the host maintains
//! backend channels with separate addresses. In the diagram `#` is the backend address of
//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
//! to the appropriate backend channel, while procs forward messages to the host backend
//! channel at `#`.
//!
//! ```text
//!                      ┌────────────┐
//!                  ┌───▶  proc *,1  │
//!                  │ #1└────────────┘
//!                  │
//!  ┌──────────┐    │   ┌────────────┐
//!  │   Host   │◀───┼───▶  proc *,2  │
//! *└──────────┘#   │ #2└────────────┘
//!                  │
//!                  │   ┌────────────┐
//!                  └───▶  proc *,3  │
//!                    #3└────────────┘
//! ```
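//!
//! ## Example
//!
//! A minimal sketch of serving a host and spawning a proc, mirroring the in-process
//! setup used in this module's tests. The agent type (`()`) and the proc name are
//! illustrative only:
//!
//! ```ignore
//! use hyperactor::Proc;
//! use hyperactor::channel::{ChannelAddr, ChannelTransport};
//! use hyperactor::host::{Host, LocalProcManager};
//!
//! // Spawn an agent actor on each new proc; a trivial `()` agent is used here.
//! let manager = LocalProcManager::new(|proc: Proc| async move {
//!     proc.spawn::<()>("agent", ()).await
//! });
//!
//! // Serve the host on an arbitrary local address.
//! let (mut host, _handle) =
//!     Host::serve(manager, ChannelAddr::any(ChannelTransport::Local)).await?;
//!
//! // Spawn a proc; it is reachable at `ProcId::Direct(host.addr(), "worker")`.
//! let (proc_id, _agent) = host.spawn("worker".to_string()).await?;
//! ```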

use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::Future;
use futures::StreamExt;
use futures::stream;
use tokio::process::Child;
use tokio::process::Command;
use tokio::sync::Mutex;

use crate::Actor;
use crate::ActorHandle;
use crate::ActorRef;
use crate::PortHandle;
use crate::Proc;
use crate::ProcId;
use crate::actor::Binds;
use crate::actor::Referable;
use crate::channel;
use crate::channel::ChannelAddr;
use crate::channel::ChannelError;
use crate::channel::ChannelTransport;
use crate::channel::Rx;
use crate::channel::Tx;
use crate::clock::Clock;
use crate::clock::RealClock;
use crate::mailbox::BoxableMailboxSender;
use crate::mailbox::DialMailboxRouter;
use crate::mailbox::IntoBoxedMailboxSender as _;
use crate::mailbox::MailboxClient;
use crate::mailbox::MailboxSender;
use crate::mailbox::MailboxServer;
use crate::mailbox::MailboxServerHandle;
use crate::mailbox::MessageEnvelope;
use crate::mailbox::Undeliverable;

/// The type of error produced by host operations.
#[derive(Debug, thiserror::Error)]
pub enum HostError {
    /// A channel error occurred during a host operation.
    #[error(transparent)]
    ChannelError(#[from] ChannelError),

    /// The named proc already exists and cannot be spawned.
    #[error("proc '{0}' already exists")]
    ProcExists(String),

    /// Failures occurring while spawning a subprocess.
    #[error("proc '{0}' failed to spawn process: {1}")]
    ProcessSpawnFailure(ProcId, #[source] std::io::Error),

    /// Failures occurring while configuring a subprocess.
    #[error("proc '{0}' failed to configure process: {1}")]
    ProcessConfigurationFailure(ProcId, #[source] anyhow::Error),

    /// Failures occurring while spawning a management actor in a proc.
    #[error("failed to spawn agent on proc '{0}': {1}")]
    AgentSpawnFailure(ProcId, #[source] anyhow::Error),

    /// An input parameter was missing.
    #[error("parameter '{0}' missing: {1}")]
    MissingParameter(String, std::env::VarError),

    /// An input parameter was invalid.
    #[error("parameter '{0}' invalid: {1}")]
    InvalidParameter(String, anyhow::Error),
}

/// A host, managing the lifecycle of several procs, and their backend
/// routing, as described in this module's documentation.
#[derive(Debug)]
pub struct Host<M> {
    procs: HashMap<String, ChannelAddr>,
    frontend_addr: ChannelAddr,
    backend_addr: ChannelAddr,
    router: DialMailboxRouter,
    manager: M,
    service_proc: Proc,
}

impl<M: ProcManager> Host<M> {
    /// Serve a host using the provided ProcManager, on the provided `addr`.
    /// On success, the host will multiplex messages for procs on the host
    /// on the address of the host.
    pub async fn serve(
        manager: M,
        addr: ChannelAddr,
    ) -> Result<(Self, MailboxServerHandle), HostError> {
        let (frontend_addr, frontend_rx) = channel::serve(addr)?;

        // We set up a cascade of routers: first, the outer router supports
        // sending to the system proc, while the dial router manages dialed
        // connections.
        let router = DialMailboxRouter::new();

        // Establish a backend channel on the preferred transport. We currently simply
        // serve the same router on both.
        let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;

        // Set up a system proc. This is often used to manage the host itself.
        let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
        let service_proc = Proc::new(service_proc_id.clone(), router.boxed());

        tracing::info!(
            frontend_addr = frontend_addr.to_string(),
            backend_addr = backend_addr.to_string(),
            service_proc_id = service_proc_id.to_string(),
            "serving host"
        );

        let host = Host {
            procs: HashMap::new(),
            frontend_addr,
            backend_addr,
            router: router.clone(),
            manager,
            service_proc: service_proc.clone(),
        };

        let router = ProcOrDial {
            proc: service_proc,
            router,
        };

        // Serve the same router on both frontend and backend addresses.
        let _backend_handle = router.clone().serve(backend_rx);
        let frontend_handle = router.serve(frontend_rx);

        Ok((host, frontend_handle))
    }

    /// The underlying proc manager.
    pub fn manager(&self) -> &M {
        &self.manager
    }

    /// The address which accepts messages destined for this host.
    pub fn addr(&self) -> &ChannelAddr {
        &self.frontend_addr
    }

    /// The system proc associated with this host.
    pub fn system_proc(&self) -> &Proc {
        &self.service_proc
    }

    /// Spawn a new proc with the given `name`. On success, the proc has been
    /// spawned, and is reachable through the returned, direct-addressed ProcId,
    /// which will be `ProcId::Direct(self.addr(), name)`.
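    ///
    /// # Example
    /// A minimal sketch mirroring this module's tests; the proc name is illustrative
    /// and error handling is elided:
    /// ```ignore
    /// let (proc_id, agent) = host.spawn("worker".to_string()).await?;
    /// assert_eq!(proc_id, ProcId::Direct(host.addr().clone(), "worker".to_string()));
    /// ```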
    pub async fn spawn(
        &mut self,
        name: String,
    ) -> Result<(ProcId, ActorRef<ManagerAgent<M>>), HostError> {
        if self.procs.contains_key(&name) {
            return Err(HostError::ProcExists(name));
        }

        let proc_id = ProcId::Direct(self.frontend_addr.clone(), name.clone());
        let handle = self
            .manager
            .spawn(proc_id.clone(), self.backend_addr.clone())
            .await?;

        // Await readiness (config-driven; 0s disables timeout).
        let to: Duration = crate::config::global::get(crate::config::HOST_SPAWN_READY_TIMEOUT);
        let ready: Result<(), HostError> = if to == Duration::from_secs(0) {
            handle.ready().await.map_err(|e| {
                HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
            })
        } else {
            match RealClock.timeout(to, handle.ready()).await {
                Ok(Ok(())) => Ok(()),
                Ok(Err(e)) => Err(HostError::ProcessConfigurationFailure(
                    proc_id.clone(),
                    anyhow::anyhow!("{e:?}"),
                )),
                Err(_) => Err(HostError::ProcessConfigurationFailure(
                    proc_id.clone(),
                    anyhow::anyhow!(format!("timeout waiting for Ready after {to:?}")),
                )),
            }
        };

        ready?;

        // After Ready, addr() + agent_ref() must be present.
        let addr = handle.addr().ok_or_else(|| {
            HostError::ProcessConfigurationFailure(
                proc_id.clone(),
                anyhow::anyhow!("proc reported Ready but no addr() available"),
            )
        })?;
        let agent_ref = handle.agent_ref().ok_or_else(|| {
            HostError::ProcessConfigurationFailure(
                proc_id.clone(),
                anyhow::anyhow!("proc reported Ready but no agent_ref() available"),
            )
        })?;

        self.router.bind(proc_id.clone().into(), addr.clone());
        self.procs.insert(name, addr);

        Ok((proc_id, agent_ref))
    }
}

/// A router used to route to the system proc, or else fall back to
/// the dial mailbox router.
#[derive(Debug, Clone)]
struct ProcOrDial {
    proc: Proc,
    router: DialMailboxRouter,
}

impl MailboxSender for ProcOrDial {
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        if envelope.dest().actor_id().proc_id() == self.proc.proc_id() {
            self.proc.post_unchecked(envelope, return_handle);
        } else {
            self.router.post_unchecked(envelope, return_handle)
        }
    }
}

/// Error returned by [`ProcHandle::ready`].
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// The proc reached a terminal state before becoming Ready.
    Terminal(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}

/// Error returned by [`ProcHandle::wait`].
#[derive(Debug, Clone)]
pub enum WaitError {
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}

/// Error returned by [`ProcHandle::terminate`] and
/// [`ProcHandle::kill`].
///
/// - `Unsupported`: the manager cannot perform the requested proc
///   signaling (e.g., local/in-process manager that doesn't emulate
///   kill).
/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
///   is the same value `wait()` would return.
/// - `ChannelClosed`: the manager lost its lifecycle channel and
///   cannot reliably observe state transitions.
/// - `Io(err)`: manager-specific failure delivering the signal or
///   performing shutdown (e.g., OS error on kill).
#[derive(Debug)]
pub enum TerminateError<TerminalStatus> {
    /// Manager doesn't support signaling (e.g., Local manager).
    Unsupported,
    /// A terminal state was already reached while attempting
    /// terminate/kill.
    AlreadyTerminated(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
    /// Manager-specific failure to deliver signal or perform
    /// shutdown.
    Io(anyhow::Error),
}

impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
            TerminateError::AlreadyTerminated(st) => {
                write!(f, "proc already terminated (status: {st:?})")
            }
            TerminateError::ChannelClosed => {
                write!(f, "lifecycle channel closed; cannot observe state")
            }
            TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
        }
    }
}

impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            TerminateError::Io(err) => Some(err.root_cause()),
            _ => None,
        }
    }
}

/// Summary of results from a bulk termination attempt.
///
/// - `attempted`: total number of child procs for which termination
///   was attempted.
/// - `ok`: number of procs successfully terminated (includes those
///   that were already in a terminal state).
/// - `failed`: number of procs that could not be terminated (e.g.
///   signaling errors or lost lifecycle channel).
#[derive(Debug)]
pub struct TerminateSummary {
    /// Total number of child procs for which termination was
    /// attempted.
    pub attempted: usize,
    /// Number of procs that successfully reached a terminal state.
    ///
    /// This count includes both procs that exited cleanly after
    /// `terminate(timeout)` and those that were already in a terminal
    /// state before termination was attempted.
    pub ok: usize,
    /// Number of procs that failed to terminate.
    ///
    /// Failures typically arise from signaling errors (e.g., OS
    /// failure to deliver SIGTERM/SIGKILL) or a lost lifecycle
    /// channel, meaning the manager could no longer observe state
    /// transitions.
    pub failed: usize,
}

impl fmt::Display for TerminateSummary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "attempted={} ok={} failed={}",
            self.attempted, self.ok, self.failed
        )
    }
}

/// Trait for managers that can terminate many child **units** in
/// bulk.
///
/// Implementors provide a concurrency-bounded, graceful shutdown over
/// all currently tracked children (polite stop → wait → forceful
/// stop), returning a summary of outcomes. The exact stop/kill
/// semantics are manager-specific: for example, an OS-process manager
/// might send signals, while an in-process manager might drain/abort
/// tasks.
#[async_trait::async_trait]
pub trait BulkTerminate: Send + Sync {
    /// Gracefully terminate all known children.
    ///
    /// Initiates a polite shutdown for each child, waits up to
    /// `timeout` for completion, then escalates to a forceful stop
    /// for any that remain. Work may be done in parallel, capped by
    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
    /// many children were attempted, succeeded, and failed.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation to a
    ///   forceful stop.
    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
    ///   etc.).
    async fn terminate_all(
        &self,
        timeout: std::time::Duration,
        max_in_flight: usize,
    ) -> TerminateSummary;
}

// Host convenience that's available only when its manager supports
// bulk termination.
impl<M: ProcManager + BulkTerminate> Host<M> {
    /// Gracefully terminate all procs spawned by this host.
    ///
    /// Delegates to the underlying manager’s
    /// [`BulkTerminate::terminate_all`] implementation. Use this to
    /// perform orderly teardown during scale-down or shutdown.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation.
    /// - `max_in_flight`: Upper bound on concurrent terminations.
    ///
    /// # Returns
    /// A [`TerminateSummary`] with counts of attempted/ok/failed
    /// terminations.
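    ///
    /// # Example
    /// A hedged sketch; the grace period and concurrency cap shown here are arbitrary:
    /// ```ignore
    /// let summary = host.terminate_children(Duration::from_secs(5), 8).await;
    /// tracing::info!("terminated children: {summary}");
    /// ```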
    pub async fn terminate_children(
        &self,
        timeout: Duration,
        max_in_flight: usize,
    ) -> TerminateSummary {
        self.manager.terminate_all(timeout, max_in_flight).await
    }
}

/// Minimal uniform surface for a spawned-**proc** handle returned by
/// a `ProcManager`. Each manager can return its own concrete handle,
/// as long as it exposes these. A **proc** is the Hyperactor runtime
/// + its actors (lifecycle controlled via `Proc` APIs such as
/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
/// **process**, but it is conceptually distinct:
///
/// - `LocalProcManager`: runs the proc **in this OS process**; there
///   is no child process to signal. Lifecycle is entirely proc-level.
/// - `ProcessProcManager` (test-only here): launches an **external OS
///   process** which hosts the proc, but this toy manager does
///   **not** wire a control plane for shutdown, nor an exit monitor.
///
/// This trait is therefore written in terms of the **proc**
/// lifecycle:
///
/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
///   available).
/// - `wait()` resolves with the proc's terminal status
///   (Stopped/Killed/Failed).
/// - `terminate()` requests a graceful shutdown of the *proc* and
///   waits up to the deadline; managers that also own a child OS
///   process may escalate to `SIGKILL` if the proc does not exit in
///   time.
/// - `kill()` requests an immediate, forced termination. For
///   in-process procs, this may be implemented as an immediate
///   drain/abort of actor tasks. For external procs, this is
///   typically a `SIGKILL`.
///
/// The shape of the terminal value is `Self::TerminalStatus`.
/// Managers that track rich info (exit code, signal, address, agent)
/// can expose it; trivial managers may use `()`.
///
/// Managers that do not support signaling must return `Unsupported`.
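///
/// # Example
/// A minimal sketch over some `handle: impl ProcHandle` (error handling elided;
/// the timeout value is illustrative):
/// ```ignore
/// handle.ready().await?;                    // Ready: mailbox bound, agent available.
/// let addr = handle.addr().expect("addr available after Ready");
/// let agent = handle.agent_ref().expect("agent available after Ready");
/// let status = handle.terminate(Duration::from_secs(5)).await?; // graceful stop
/// ```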
#[async_trait]
pub trait ProcHandle: Clone + Send + Sync + 'static {
    /// The agent actor type installed in the proc by the manager.
    /// Must implement both:
    /// - [`Actor`], because the agent actually runs inside the proc,
    ///   and
    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
    type Agent: Actor + Referable;

    /// The type of terminal status produced when the proc exits.
    ///
    /// For example, an external proc manager may use a rich status
    /// enum (e.g. `ProcStatus`), while an in-process manager may use
    /// a trivial unit type. This is the value returned by
    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;

    /// The proc's logical identity on this host.
    fn proc_id(&self) -> &ProcId;

    /// The proc's address (the one callers bind into the host
    /// router).
    fn addr(&self) -> Option<ChannelAddr>;

    /// The agent actor reference hosted in the proc.
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>>;

    /// Resolves when the proc becomes Ready. Multi-waiter,
    /// non-consuming.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;

    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
    /// Multi-waiter, non-consuming.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;

    /// Politely stop the proc before the deadline; managers that own
    /// a child OS process may escalate to a forced kill at the
    /// deadline. Idempotent and race-safe: concurrent callers
    /// coalesce; the first terminal outcome wins and all callers
    /// observe it via `wait()`.
    ///
    /// Returns the single terminal status the proc reached (the same
    /// value `wait()` will return). Never fabricates terminal states:
    /// this is only returned after the exit monitor observes
    /// termination.
    async fn terminate(
        &self,
        timeout: Duration,
    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;

    /// Force the proc down immediately. For in-process managers this
    /// may abort actor tasks; for external managers this typically
    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
    /// outcome is the one observed by `wait()`.
    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
}

/// A trait describing a manager of procs, responsible for bootstrapping
/// procs on a host, and managing their lifetimes. The manager spawns an
/// `Agent`-typed actor on each proc, responsible for managing the proc.
#[async_trait]
pub trait ProcManager {
    /// Concrete handle type this manager returns.
    type Handle: ProcHandle;

    /// The preferred transport for this ProcManager.
    /// In practice this will be [`ChannelTransport::Local`]
    /// for testing, and [`ChannelTransport::Unix`] for external
    /// processes.
    fn transport(&self) -> ChannelTransport;

    /// Spawn a new proc with the provided proc id. The proc
    /// should use the provided forwarder address for messages
    /// destined outside of the proc. The returned address accepts
    /// messages destined for the proc.
    ///
    /// An agent actor is also spawned, and the corresponding actor
    /// ref is returned.
    async fn spawn(
        &self,
        proc_id: ProcId,
        forwarder_addr: ChannelAddr,
    ) -> Result<Self::Handle, HostError>;
}

/// Type alias for the agent actor managed by a given [`ProcManager`].
///
/// This resolves to the `Agent` type exposed by the manager's
/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
/// convenient shorthand so call sites can refer to
/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
/// `<M::Handle as ProcHandle>::Agent`.
///
/// # Example
/// ```ignore
/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
/// ```
pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792

/// A ProcManager that spawns **in-process** procs (test-only).
///
/// The proc runs inside this same OS process; there is **no** child
/// process to signal. Lifecycle is purely proc-level:
/// - `terminate(timeout)`: delegates to
///   `Proc::destroy_and_wait(timeout, None)`, which drains and, at the
///   deadline, aborts remaining actors.
/// - `kill()`: uses a zero deadline to emulate a forced stop via
///   `destroy_and_wait(Duration::ZERO, None)`.
/// - `wait()`: trivial (no external lifecycle to observe).
///
/// No OS signals are sent or required.
pub struct LocalProcManager<S> {
    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
    spawn: S,
}

impl<S> LocalProcManager<S> {
    /// Create a new in-process proc manager with the given agent
    /// spawn function.
    pub fn new(spawn: S) -> Self {
        Self {
            procs: Arc::new(Mutex::new(HashMap::new())),
            spawn,
        }
    }
}

#[async_trait]
impl<S> BulkTerminate for LocalProcManager<S>
where
    S: Send + Sync,
{
    async fn terminate_all(
        &self,
        timeout: std::time::Duration,
        max_in_flight: usize,
    ) -> TerminateSummary {
        // Snapshot procs so we don't hold the lock across awaits.
        let procs: Vec<Proc> = {
            let guard = self.procs.lock().await;
            guard.values().cloned().collect()
        };

        let attempted = procs.len();

        let results = stream::iter(procs.into_iter().map(|mut p| async move {
            // For local manager, graceful proc-level stop.
            match p.destroy_and_wait::<()>(timeout, None).await {
                Ok(_) => true,
                Err(e) => {
                    tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
                    false
                }
            }
        }))
        .buffer_unordered(max_in_flight.max(1))
        .collect::<Vec<bool>>()
        .await;

        let ok = results.into_iter().filter(|b| *b).count();

        TerminateSummary {
            attempted,
            ok,
            failed: attempted.saturating_sub(ok),
        }
    }
}

/// A lightweight [`ProcHandle`] for procs managed **in-process** via
/// [`LocalProcManager`].
///
/// This handle wraps the minimal identifying state of a spawned proc:
/// - its [`ProcId`] (logical identity on the host),
/// - the proc's [`ChannelAddr`] (the address callers bind into the
///   host router), and
/// - the [`ActorRef`] to the agent actor hosted in the proc.
///
/// Unlike external handles, `LocalHandle` does **not** manage an OS
/// child process. It provides a uniform surface (`proc_id()`,
/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
/// **proc-level** shutdown.
///
/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
/// bound (`Actor + Referable`).
#[derive(Debug)]
pub struct LocalHandle<A: Actor + Referable> {
    proc_id: ProcId,
    addr: ChannelAddr,
    agent_ref: ActorRef<A>,
    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
}

// Manual `Clone` to avoid requiring `A: Clone`.
impl<A: Actor + Referable> Clone for LocalHandle<A> {
    fn clone(&self) -> Self {
        Self {
            proc_id: self.proc_id.clone(),
            addr: self.addr.clone(),
            agent_ref: self.agent_ref.clone(),
            procs: Arc::clone(&self.procs),
        }
    }
}

#[async_trait]
impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
    /// `Agent = A` (inherits `Actor + Referable` from the trait
    /// bound).
    type Agent = A;
    type TerminalStatus = ();

    fn proc_id(&self) -> &ProcId {
        &self.proc_id
    }
    fn addr(&self) -> Option<ChannelAddr> {
        Some(self.addr.clone())
    }
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
        Some(self.agent_ref.clone())
    }

    /// Always resolves immediately: a local proc is created
    /// in-process and is usable as soon as the handle exists.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
        Ok(())
    }
    /// Always resolves immediately with `()`: a local proc has no
    /// external lifecycle to await. There is no OS child process
    /// behind this handle.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
        Ok(())
    }

    async fn terminate(
        &self,
        timeout: Duration,
    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
        let mut proc = {
            let guard = self.procs.lock().await;
            match guard.get(self.proc_id()) {
                Some(p) => p.clone(),
                None => {
                    // The proc was already removed; treat as already
                    // terminal.
                    return Err(TerminateError::AlreadyTerminated(()));
                }
            }
        };

        // Graceful stop of the *proc* (actors) with a deadline. This
        // will drain and then abort remaining actors at expiry.
        let _ = proc
            .destroy_and_wait::<()>(timeout, None)
            .await
            .map_err(TerminateError::Io)?;

        Ok(())
    }

    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
        // Forced stop == zero deadline; `destroy_and_wait` will
        // immediately abort remaining actors and return.
        let mut proc = {
            let guard = self.procs.lock().await;
            match guard.get(self.proc_id()) {
                Some(p) => p.clone(),
                None => return Err(TerminateError::AlreadyTerminated(())),
            }
        };

        let _ = proc
            .destroy_and_wait::<()>(Duration::from_millis(0), None)
            .await
            .map_err(TerminateError::Io)?;

        Ok(())
    }
}

/// Local, in-process ProcManager.
///
/// **Type bounds:**
/// - `A: Actor + Referable + Binds<A>`
///   - `Actor`: the agent actually runs inside the proc.
///   - `Referable`: callers hold `ActorRef<A>` to the agent; this
///     bound is required for typed remote refs.
///   - `Binds<A>`: lets the runtime wire the agent's message ports.
/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
///   the spawn closure returns a Send future (we `tokio::spawn` it).
/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
///   concurrent contexts.
///
/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
#[async_trait]
impl<A, S, F> ProcManager for LocalProcManager<S>
where
    A: Actor + Referable + Binds<A>,
    F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
    S: Fn(Proc) -> F + Sync,
{
    type Handle = LocalHandle<A>;

    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Local
    }

    async fn spawn(
        &self,
        proc_id: ProcId,
        forwarder_addr: ChannelAddr,
    ) -> Result<Self::Handle, HostError> {
        let transport = forwarder_addr.transport();
        let proc = Proc::new(
            proc_id.clone(),
            MailboxClient::dial(forwarder_addr)?.into_boxed(),
        );
        let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
        self.procs
            .lock()
            .await
            .insert(proc_id.clone(), proc.clone());
        let _handle = proc.clone().serve(rx);
        let agent_handle = (self.spawn)(proc)
            .await
            .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;

        Ok(LocalHandle {
            proc_id,
            addr: proc_addr,
            agent_ref: agent_handle.bind(),
            procs: Arc::clone(&self.procs),
        })
    }
}

/// A ProcManager that manages each proc as a **separate OS process**
/// (test-only toy).
///
/// This implementation launches a child via `Command` and relies on
/// `kill_on_drop(true)` so that children are SIGKILLed if the manager
/// (or host) drops. There is **no** proc control plane (no RPC to a
/// proc agent for shutdown) and **no** exit monitor wired here.
/// Consequently:
/// - `terminate()` and `kill()` return `Unsupported`.
/// - `wait()` is trivial (no lifecycle observation).
///
/// It follows a simple protocol:
///
/// Each process is launched with the following environment variables:
/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
///
/// The launched proc should also spawn an actor to manage it - the details of this are
/// implementation dependent, and outside the scope of the process manager.
///
/// The function [`boot_proc`] provides a convenient implementation of the
/// protocol.
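///
/// # Example
/// A hedged sketch of the host side, mirroring this module's tests. The bootstrap
/// binary path is illustrative; the binary itself is expected to call
/// [`ProcessProcManager::boot_proc`]:
/// ```ignore
/// let manager = ProcessProcManager::<EchoActor>::new("path/to/bootstrap".into());
/// let (mut host, _handle) =
///     Host::serve(manager, ChannelAddr::any(ChannelTransport::Unix)).await?;
/// let (proc_id, agent) = host.spawn("proc1".to_string()).await?;
/// ```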
pub struct ProcessProcManager<A> {
    program: std::path::PathBuf,
    children: Arc<Mutex<HashMap<ProcId, Child>>>,
    _phantom: PhantomData<A>,
}

impl<A> ProcessProcManager<A> {
    /// Create a new ProcessProcManager that runs the provided
    /// command.
    pub fn new(program: std::path::PathBuf) -> Self {
        Self {
            program,
            children: Arc::new(Mutex::new(HashMap::new())),
            _phantom: PhantomData,
        }
    }
}

impl<A> Drop for ProcessProcManager<A> {
    fn drop(&mut self) {
        // When the manager is dropped, `children` is dropped, which
        // drops each `Child` handle. With `kill_on_drop(true)`, the OS
        // will SIGKILL the processes. Nothing else to do here.
    }
}

/// A [`ProcHandle`] implementation for procs managed as separate
/// OS processes via [`ProcessProcManager`].
///
/// This handle records the logical identity and connectivity of an
/// external child process:
/// - its [`ProcId`] (unique identity on the host),
/// - the proc's [`ChannelAddr`] (address registered in the host
///   router),
/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
///
/// Unlike [`LocalHandle`], this corresponds to a real OS process
/// launched by the manager. In this **toy** implementation the handle
/// does not own/monitor the `Child` and there is no shutdown control
/// plane. It is a stable, clonable surface exposing the proc's
/// identity, address, and agent reference so host code can interact
/// uniformly with local/external procs. `terminate()`/`kill()` are
/// intentionally `Unsupported` here; process cleanup relies on
/// `cmd.kill_on_drop(true)` when launching the child (the OS will
/// SIGKILL it if the handle is dropped).
///
/// The type bound `A: Actor + Referable` comes from the
/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
/// actually runs inside the proc, and `Referable` because it must
/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
/// typed remote reference).
#[derive(Debug)]
pub struct ProcessHandle<A: Actor + Referable> {
    proc_id: ProcId,
    addr: ChannelAddr,
    agent_ref: ActorRef<A>,
}

// Manual `Clone` to avoid requiring `A: Clone`.
impl<A: Actor + Referable> Clone for ProcessHandle<A> {
    fn clone(&self) -> Self {
        Self {
            proc_id: self.proc_id.clone(),
            addr: self.addr.clone(),
            agent_ref: self.agent_ref.clone(),
        }
    }
}

#[async_trait]
impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
    /// Agent must be both an `Actor` (runs in the proc) and a
    /// `Referable` (so it can be referenced via `ActorRef<A>`).
    type Agent = A;
    type TerminalStatus = ();

    fn proc_id(&self) -> &ProcId {
        &self.proc_id
    }
    fn addr(&self) -> Option<ChannelAddr> {
        Some(self.addr.clone())
    }
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
        Some(self.agent_ref.clone())
    }

    /// Resolves immediately. `ProcessProcManager::spawn` returns this
    /// handle only after the child has called back with (addr,
    /// agent), i.e. after readiness.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
        Ok(())
    }
    /// Resolves immediately with `()`. This handle does not track
    /// child lifecycle; there is no watcher in this implementation.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
        Ok(())
    }

    async fn terminate(
        &self,
        _deadline: Duration,
    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
        Err(TerminateError::Unsupported)
    }

    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
        Err(TerminateError::Unsupported)
    }
}

#[async_trait]
impl<A> ProcManager for ProcessProcManager<A>
where
    // Agent actor runs in the proc (`Actor`) and must be
    // referenceable (`Referable`).
    A: Actor + Referable,
{
    type Handle = ProcessHandle<A>;

    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }

    async fn spawn(
        &self,
        proc_id: ProcId,
        forwarder_addr: ChannelAddr,
    ) -> Result<Self::Handle, HostError> {
        let (callback_addr, mut callback_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;

        let mut cmd = Command::new(&self.program);
        cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
        cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
        cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());

        // Lifetime strategy: mark the child with
        // `kill_on_drop(true)` so the OS will send SIGKILL if the
        // handle is dropped and retain the `Child` in
        // `self.children`, tying its lifetime to the manager/host.
        //
        // This is the simplest viable policy to avoid orphaned
        // subprocesses in CI; more sophisticated lifecycle control
        // (graceful shutdown, restart) will be layered on later.

        // Kill the child when its handle is dropped.
        cmd.kill_on_drop(true);

        let child = cmd
            .spawn()
            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;

        // Retain the handle so it lives for the life of the
        // manager/host.
        {
            let mut children = self.children.lock().await;
            children.insert(proc_id.clone(), child);
        }

        // Wait for the child's callback with (addr, agent_ref)
        let (proc_addr, agent_ref) = callback_rx.recv().await?;

        // TODO(production): For a non-test implementation, plumb a
        // shutdown path:
        // - expose a proc-level graceful stop RPC on the agent and
        //   implement `terminate(timeout)` by invoking it and, on
        //   deadline, call `Child::kill()`; implement `kill()` as
        //   immediate `Child::kill()`.
        // - wire an exit monitor so `wait()` resolves with a real
        //   terminal status.
        Ok(ProcessHandle {
            proc_id,
            addr: proc_addr,
            agent_ref,
        })
    }
}

impl<A> ProcessProcManager<A>
where
    // `Actor`: runs in the proc; `Referable`: referenceable via
    // ActorRef; `Binds<A>`: wires ports.
    A: Actor + Referable + Binds<A>,
{
    /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
    /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
    /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
    /// manage the proc itself.
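    ///
    /// # Example
    /// A minimal sketch of a bootstrap binary's entry point. It assumes the process was
    /// launched by a `ProcessProcManager<EchoActor>`; the agent name is illustrative:
    /// ```ignore
    /// let proc = ProcessProcManager::<EchoActor>::boot_proc(|proc| async move {
    ///     // Spawn the agent actor that will manage this proc.
    ///     proc.spawn("agent", ()).await
    /// })
    /// .await?;
    /// ```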
    pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
    where
        S: FnOnce(Proc) -> F,
        F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
    {
        let proc_id: ProcId = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
        let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
        let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
        spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
    }

    fn parse_env<T, E>(key: &str) -> Result<T, HostError>
    where
        T: FromStr<Err = E>,
        E: Into<anyhow::Error>,
    {
        std::env::var(key)
            .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
            .parse()
            .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
    }
}

/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
/// forwarding messages to the provided `backend_addr`,
/// and returning the proc's address and agent actor on
/// the provided `callback_addr`.
pub async fn spawn_proc<A, S, F>(
    proc_id: ProcId,
    backend_addr: ChannelAddr,
    callback_addr: ChannelAddr,
    spawn: S,
) -> Result<Proc, HostError>
where
    // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
    // `Binds<A>`: wires ports
    A: Actor + Referable + Binds<A>,
    S: FnOnce(Proc) -> F,
    F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
{
    let backend_transport = backend_addr.transport();
    let proc = Proc::new(
        proc_id.clone(),
        MailboxClient::dial(backend_addr)?.into_boxed(),
    );

    let agent_handle = spawn(proc.clone())
        .await
        .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;

    // Finally serve the proc on the same transport as the backend address,
    // and call back.
    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
    proc.clone().serve(proc_rx);
    channel::dial(callback_addr)?
        .send((proc_addr, agent_handle.bind::<A>()))
        .await
        .map_err(ChannelError::from)?;

    Ok(proc)
}

/// Testing support for hosts. This is linked outside of cfg(test)
/// as it is needed by an external binary.
pub mod testing {
    use async_trait::async_trait;

    use crate as hyperactor;
    use crate::Actor;
    use crate::ActorId;
    use crate::Context;
    use crate::Handler;
    use crate::OncePortRef;

    /// Just a simple actor, available both in the bootstrap binary and in
    /// hyperactor tests.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(handlers = [OncePortRef<ActorId>])]
    pub struct EchoActor;

    #[async_trait]
    impl Handler<OncePortRef<ActorId>> for EchoActor {
        async fn handle(
            &mut self,
            cx: &Context<Self>,
            reply: OncePortRef<ActorId>,
        ) -> Result<(), anyhow::Error> {
            reply.send(cx, cx.self_id().clone())?;
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::testing::EchoActor;
    use super::*;
    use crate::channel::ChannelTransport;
    use crate::clock::Clock;
    use crate::clock::RealClock;
    use crate::context::Mailbox;

    #[tokio::test]
    async fn test_basic() {
        let proc_manager =
            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
        let procs = Arc::clone(&proc_manager.procs);
        let (mut host, _handle) =
            Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))
                .await
                .unwrap();

        let (proc_id1, _ref) = host.spawn("proc1".to_string()).await.unwrap();
        assert_eq!(
            proc_id1,
            ProcId::Direct(host.addr().clone(), "proc1".to_string())
        );
        assert!(procs.lock().await.contains_key(&proc_id1));

        let (proc_id2, _ref) = host.spawn("proc2".to_string()).await.unwrap();
        assert!(procs.lock().await.contains_key(&proc_id2));

        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();

        // Make sure they can talk to each other:
        let (instance1, _handle) = proc1.instance("client").unwrap();
        let (instance2, _handle) = proc2.instance("client").unwrap();

        let (port, mut rx) = instance1.mailbox().open_port();

        port.bind().send(&instance2, "hello".to_string()).unwrap();
        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());

        // Make sure that the system proc is also wired in correctly.
        let (system_actor, _handle) = host.system_proc().instance("test").unwrap();

        // system->proc
        port.bind()
            .send(&system_actor, "hello from the system proc".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system proc".to_string()
        );

        // system->system
        let (port, mut rx) = system_actor.mailbox().open_port();
        port.bind()
            .send(&system_actor, "hello from the system".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system".to_string()
        );

        // proc->system
        port.bind()
            .send(&instance1, "hello from the instance1".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the instance1".to_string()
        );
    }

    #[tokio::test]
    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
    #[cfg_attr(not(feature = "fb"), ignore)]
    async fn test_process_proc_manager() {
        hyperactor_telemetry::initialize_logging(crate::clock::ClockKind::default());

        // EchoActor is "agent" used to test connectivity.
        let process_manager = ProcessProcManager::<EchoActor>::new(
            buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
        );
        let (mut host, _handle) =
            Host::serve(process_manager, ChannelAddr::any(ChannelTransport::Unix))
                .await
                .unwrap();

        // (1) Spawn and check invariants.
        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
        let (proc1, echo1) = host.spawn("proc1".to_string()).await.unwrap();
        let (proc2, echo2) = host.spawn("proc2".to_string()).await.unwrap();
        assert_eq!(echo1.actor_id().proc_id(), &proc1);
        assert_eq!(echo2.actor_id().proc_id(), &proc2);

        // (2) Duplicate name rejection.
        let dup = host.spawn("proc1".to_string()).await;
        assert!(matches!(dup, Err(HostError::ProcExists(_))));

        // (3) Create a standalone client proc and verify echo1 agent responds.
        // Request: client proc -> host frontend/router -> echo1 (proc1).
        // Reply:   echo1 (proc1) -> host backend -> host router -> client port.
        // This confirms that an external proc (created via
        // `Proc::direct`) can address a child proc through the host,
        // and receive a correct reply.
        let client = Proc::direct(
            ChannelAddr::any(host.addr().transport()),
            "test".to_string(),
        )
        .await
        .unwrap();
        let (client_inst, _h) = client.instance("test").unwrap();
        let (port, rx) = client_inst.mailbox().open_once_port();
        echo1.send(&client_inst, port.bind()).unwrap();
        let id = RealClock
            .timeout(Duration::from_secs(5), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id, *echo1.actor_id());

        // (4) Child <-> external client request -> reply:
        // Request: client proc (standalone via `Proc::direct`) ->
        //          host frontend/router -> echo2 (proc2).
        // Reply:   echo2 (proc2) -> host backend -> host router ->
        //          client port (standalone proc).
        // This exercises cross-proc routing between a child and an
        // external client under the same host.
        let (port2, rx2) = client_inst.mailbox().open_once_port();
        echo2.send(&client_inst, port2.bind()).unwrap();
        let id2 = RealClock
            .timeout(Duration::from_secs(5), rx2.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id2, *echo2.actor_id());

        // (5) System -> child request -> cross-proc reply:
        // Request: system proc -> host router (frontend) -> echo1
        //          (proc1, child).
        // Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
        //        host router -> client proc direct addr (Proc::direct) ->
        //        client port.
        // Because `client_inst` runs in its own proc, the reply
        // traverses the host (not local delivery within proc1).
        let (sys_inst, _h) = host.system_proc().instance("sys-client").unwrap();
        let (port3, rx3) = client_inst.mailbox().open_once_port();
        // Send from system -> child via a message that ultimately
        // replies to client's port
        echo1.send(&sys_inst, port3.bind()).unwrap();
        let id3 = RealClock
            .timeout(Duration::from_secs(5), rx3.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id3, *echo1.actor_id());
    }

    #[tokio::test]
    async fn local_ready_and_wait_are_immediate() {
        // Build a LocalHandle directly.
        let addr = ChannelAddr::any(ChannelTransport::Local);
        let proc_id = ProcId::Direct(addr.clone(), "p".into());
        let agent_ref = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
        let h = LocalHandle::<()> {
            proc_id,
            addr,
            agent_ref,
            procs: Arc::new(Mutex::new(HashMap::new())),
        };

        // ready() resolves immediately
        assert!(h.ready().await.is_ok());

        // wait() resolves immediately with unit TerminalStatus
        assert!(h.wait().await.is_ok());

        // Multiple concurrent waiters both succeed
        let (r1, r2) = tokio::join!(h.ready(), h.ready());
        assert!(r1.is_ok() && r2.is_ok());
    }

    // --
    // Fixtures for `host::spawn` tests.

    #[derive(Debug, Clone, Copy)]
    enum ReadyMode {
        OkAfter(Duration),
        ErrTerminal,
        ErrChannelClosed,
    }

    #[derive(Debug, Clone)]
    struct TestHandle {
        id: ProcId,
        addr: ChannelAddr,
        agent: ActorRef<()>,
        mode: ReadyMode,
        omit_addr: bool,
        omit_agent: bool,
    }

    #[async_trait::async_trait]
    impl ProcHandle for TestHandle {
        type Agent = ();
        type TerminalStatus = ();

        fn proc_id(&self) -> &ProcId {
            &self.id
        }
        fn addr(&self) -> Option<ChannelAddr> {
            if self.omit_addr {
                None
            } else {
                Some(self.addr.clone())
            }
        }
        fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
            if self.omit_agent {
                None
            } else {
                Some(self.agent.clone())
            }
        }
        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
            match self.mode {
                ReadyMode::OkAfter(d) => {
                    if !d.is_zero() {
                        RealClock.sleep(d).await;
                    }
                    Ok(())
                }
                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
            }
        }
        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
            Ok(())
        }
        async fn terminate(
            &self,
            _timeout: Duration,
        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
            Err(TerminateError::Unsupported)
        }
        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
            Err(TerminateError::Unsupported)
        }
    }

    #[derive(Debug, Clone)]
    struct TestManager {
        mode: ReadyMode,
        omit_addr: bool,
        omit_agent: bool,
        transport: ChannelTransport,
    }

    impl TestManager {
        fn local(mode: ReadyMode) -> Self {
            Self {
                mode,
                omit_addr: false,
                omit_agent: false,
                transport: ChannelTransport::Local,
            }
        }
        fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
            self.omit_addr = addr;
            self.omit_agent = agent;
            self
        }
    }

    #[async_trait::async_trait]
    impl ProcManager for TestManager {
        type Handle = TestHandle;

        fn transport(&self) -> ChannelTransport {
            self.transport.clone()
        }
        async fn spawn(
            &self,
            proc_id: ProcId,
            forwarder_addr: ChannelAddr,
        ) -> Result<Self::Handle, HostError> {
            let agent = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
            Ok(TestHandle {
                id: proc_id,
                addr: forwarder_addr,
                agent,
                mode: self.mode,
                omit_addr: self.omit_addr,
                omit_agent: self.omit_agent,
            })
        }
    }

    #[tokio::test]
    async fn host_spawn_times_out_when_configured() {
        let cfg = crate::config::global::lock();
        let _g = cfg.override_key(
            crate::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_millis(10),
        );

        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("t".into()).await.expect_err("must time out");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    #[tokio::test]
    async fn host_spawn_timeout_zero_disables_and_succeeds() {
        let cfg = crate::config::global::lock();
        let _g = cfg.override_key(
            crate::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(0),
        );

        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let (pid, agent) = host.spawn("ok".into()).await.expect("must succeed");
        assert_eq!(agent.actor_id().proc_id(), &pid);
        assert!(host.procs.contains_key("ok"));
    }

    #[tokio::test]
    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::ErrChannelClosed),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("p".into()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    #[tokio::test]
    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::ErrTerminal),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("p".into()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    #[tokio::test]
    async fn host_spawn_fails_if_ready_but_missing_addr() {
        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("no-addr".into()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    #[tokio::test]
    async fn host_spawn_fails_if_ready_but_missing_agent() {
        let (mut host, _h) = Host::serve(
            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("no-agent".into()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }
}
1511}