// hyperactor/host.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines [`Host`], which represents all the procs running on a host.
10//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
11//! for example, fork new processes for each proc, or spawn them in the same process
12//! for testing purposes.
13//!
14//! The primary purpose of a host is to manage the lifecycle of these procs, and to
15//! serve as a single front-end for all the procs on a host, multiplexing network
16//! channels.
17//!
18//! ## Channel muxing
19//!
20//! A [`Host`] maintains a single frontend address, through which all procs are accessible
21//! through direct addressing: the id of each proc is the `ProcId(frontend_addr, proc_name)`.
22//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
23//! multiplexes messages based on the proc name. When spawning procs, the host maintains
24//! backend channels with separate addresses. In the diagram `#` is the backend address of
25//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
26//! to the appropriate backend channel, while procs forward messages to the host backend
27//! channel at `#`.
28//!
29//! ```text
30//!                      ┌────────────┐
31//!                  ┌───▶  proc *,1  │
32//!                  │ #1└────────────┘
33//!                  │
34//!  ┌──────────┐    │   ┌────────────┐
35//!  │   Host   │◀───┼───▶  proc *,2  │
36//! *└──────────┘#   │ #2└────────────┘
37//!                  │
38//!                  │   ┌────────────┐
39//!                  └───▶  proc *,3  │
40//!                    #3└────────────┘
41//! ```
42//!
43//! ## Local proc invariant (LP-*)
44//!
45//! - **LP-1 (lazy activation):** The local proc always exists as a
46//!   `ProcId::Direct(addr, LOCAL_PROC_NAME)` and is forwarded
47//!   in-process by the host's mailbox muxer. However it starts with
48//!   zero actors. A `ProcAgent` and root client actor are added only
49//!   when `HostMeshAgent::handle(GetLocalProc)` is first called.
50
51use std::collections::HashMap;
52use std::collections::HashSet;
53use std::fmt;
54use std::marker::PhantomData;
55use std::str::FromStr;
56use std::sync::Arc;
57use std::time::Duration;
58
59use async_trait::async_trait;
60use futures::Future;
61use futures::StreamExt;
62use futures::stream;
63use tokio::process::Child;
64use tokio::process::Command;
65use tokio::sync::Mutex;
66
67use crate as hyperactor;
68use crate::Actor;
69use crate::ActorHandle;
70use crate::PortHandle;
71use crate::Proc;
72use crate::actor::Binds;
73use crate::actor::Referable;
74use crate::channel;
75use crate::channel::ChannelAddr;
76use crate::channel::ChannelError;
77use crate::channel::ChannelRx;
78use crate::channel::ChannelTransport;
79use crate::channel::Rx;
80use crate::channel::Tx;
81use crate::context;
82use crate::mailbox::BoxableMailboxSender;
83use crate::mailbox::BoxedMailboxSender;
84use crate::mailbox::DialMailboxRouter;
85use crate::mailbox::IntoBoxedMailboxSender as _;
86use crate::mailbox::MailboxClient;
87use crate::mailbox::MailboxSender;
88use crate::mailbox::MailboxServer;
89use crate::mailbox::MailboxServerHandle;
90use crate::mailbox::MessageEnvelope;
91use crate::mailbox::Undeliverable;
92use crate::reference;
93
/// Name of the system service proc on a host — hosts the admin actor
/// layer (HostMeshAgent, MeshAdminAgent, bridge).
pub const SERVICE_PROC_NAME: &str = "service";

/// Name of the local client proc on a host.
///
/// See LP-1 (lazy activation) in module doc.
///
/// In pure-Rust programs (e.g. sieve, dining_philosophers)
/// `GetLocalProc` is never sent, so the local proc remains empty
/// throughout the program's lifetime. Code that inspects the local
/// proc's actors must not assume they exist.
pub const LOCAL_PROC_NAME: &str = "local";
107
/// The type of error produced by host operations.
#[derive(Debug, thiserror::Error)]
pub enum HostError {
    /// A channel error occurred during a host operation.
    #[error(transparent)]
    ChannelError(#[from] ChannelError),

    /// The named proc already exists and cannot be spawned.
    #[error("proc '{0}' already exists")]
    ProcExists(String),

    /// Failures occurring while spawning a subprocess.
    #[error("proc '{0}' failed to spawn process: {1}")]
    ProcessSpawnFailure(reference::ProcId, #[source] std::io::Error),

    /// Failures occurring while configuring a subprocess.
    #[error("proc '{0}' failed to configure process: {1}")]
    ProcessConfigurationFailure(reference::ProcId, #[source] anyhow::Error),

    /// Failures occurring while spawning a management actor in a proc.
    #[error("failed to spawn agent on proc '{0}': {1}")]
    AgentSpawnFailure(reference::ProcId, #[source] anyhow::Error),

    /// An input parameter was missing.
    #[error("parameter '{0}' missing: {1}")]
    MissingParameter(String, std::env::VarError),

    /// An input parameter was invalid.
    #[error("parameter '{0}' invalid: {1}")]
    InvalidParameter(String, anyhow::Error),
}
139
/// A host, managing the lifecycle of several procs, and their backend
/// routing, as described in this module's documentation.
pub struct Host<M> {
    // Names of procs spawned through [`Host::spawn`]; the built-in
    // service and local procs are not tracked here.
    procs: HashSet<String>,
    // Address on which the host accepts messages destined for its procs.
    frontend_addr: ChannelAddr,
    // Address child procs use to forward messages back through the host.
    backend_addr: ChannelAddr,
    // Routes messages to the backend addresses of spawned procs.
    router: DialMailboxRouter,
    // The [`ProcManager`] responsible for bootstrapping procs.
    manager: M,
    // In-process proc named [`SERVICE_PROC_NAME`].
    service_proc: Proc,
    // In-process proc named [`LOCAL_PROC_NAME`]; see invariant LP-1.
    local_proc: Proc,
    // Taken by the first call to [`Host::serve`]; `None` afterwards.
    frontend_rx: Option<ChannelRx<MessageEnvelope>>,
}
152
impl<M: ProcManager> Host<M> {
    /// Serve a host using the provided ProcManager, on the provided `addr`.
    /// On success, the host will multiplex messages for procs on the host
    /// on the address of the host.
    pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
        Self::new_with_default(manager, addr, None).await
    }

    /// Like [`Host::new`], serves a host using the provided ProcManager, on the provided `addr`.
    /// Unknown destinations are forwarded to the default sender.
    #[crate::instrument(fields(addr=addr.to_string()))]
    pub async fn new_with_default(
        manager: M,
        addr: ChannelAddr,
        default_sender: Option<BoxedMailboxSender>,
    ) -> Result<Self, HostError> {
        let (frontend_addr, frontend_rx) = channel::serve(addr)?;

        // We set up a cascade of routers: first, the outer router supports
        // sending to the service proc, while the dial router manages dialed
        // connections.
        let router = match default_sender {
            Some(d) => DialMailboxRouter::new_with_default(d),
            None => DialMailboxRouter::new(),
        };

        // Establish a backend channel on the preferred transport. We currently simply
        // serve the same router on both.
        let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;

        // Set up a system proc. This is often used to manage the host itself.
        // These use with_name (not unique) because their uniqueness is
        // guaranteed by the ChannelAddr component, and the Name type's
        // '-' delimiter must not collide with a hash suffix.
        let service_proc_id =
            reference::ProcId::with_name(frontend_addr.clone(), SERVICE_PROC_NAME);
        let service_proc = Proc::configured(service_proc_id.clone(), router.boxed());

        let local_proc_id = reference::ProcId::with_name(frontend_addr.clone(), LOCAL_PROC_NAME);
        let local_proc = Proc::configured(local_proc_id.clone(), router.boxed());

        tracing::info!(
            frontend_addr = frontend_addr.to_string(),
            backend_addr = backend_addr.to_string(),
            service_proc_id = service_proc_id.to_string(),
            local_proc_id = local_proc_id.to_string(),
            "serving host"
        );

        let host = Host {
            procs: HashSet::new(),
            frontend_addr,
            backend_addr,
            router,
            manager,
            service_proc,
            local_proc,
            frontend_rx: Some(frontend_rx),
        };

        // We serve the same router on both frontend and backend addresses.
        // The backend starts serving immediately; the frontend is started
        // by [`Host::serve`].
        let _backend_handle = host.forwarder().serve(backend_rx);

        Ok(host)
    }

    /// Start serving this host's mailbox on its frontend address.
    /// Returns the server handle on first invocation; afterwards None.
    pub fn serve(&mut self) -> Option<MailboxServerHandle> {
        Some(self.forwarder().serve(self.frontend_rx.take()?))
    }

    /// The underlying proc manager.
    pub fn manager(&self) -> &M {
        &self.manager
    }

    /// The address which accepts messages destined for this host.
    pub fn addr(&self) -> &ChannelAddr {
        &self.frontend_addr
    }

    /// The system proc associated with this host.
    /// This is used to run host-level system services like host managers.
    pub fn system_proc(&self) -> &Proc {
        &self.service_proc
    }

    /// The local proc associated with this host (`LOCAL_PROC_NAME`).
    ///
    /// Starts with zero actors; see invariant LP-1 on
    /// [`LOCAL_PROC_NAME`] for activation semantics.
    pub fn local_proc(&self) -> &Proc {
        &self.local_proc
    }

    /// Spawn a new process with the given `name`. On success, the
    /// proc has been spawned, and is reachable through the returned,
    /// direct-addressed ProcId, which will be
    /// `ProcId(self.addr(), name)`.
    pub async fn spawn(
        &mut self,
        name: String,
        config: M::Config,
    ) -> Result<(reference::ProcId, reference::ActorRef<ManagerAgent<M>>), HostError> {
        if self.procs.contains(&name) {
            return Err(HostError::ProcExists(name));
        }

        let proc_id = reference::ProcId::with_name(self.frontend_addr.clone(), &name);
        let handle = self
            .manager
            .spawn(proc_id.clone(), self.backend_addr.clone(), config)
            .await?;

        // Await readiness (config-driven; 0s disables timeout).
        let to: Duration = hyperactor_config::global::get(crate::config::HOST_SPAWN_READY_TIMEOUT);
        let ready = if to == Duration::from_secs(0) {
            ReadyProc::ensure(&handle).await
        } else {
            match tokio::time::timeout(to, ReadyProc::ensure(&handle)).await {
                Ok(result) => result,
                Err(_elapsed) => Err(ReadyProcError::Timeout),
            }
        }
        .map_err(|e| {
            HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
        })?;

        // Make the new proc reachable through the host router, then
        // record its name so duplicate spawns are rejected.
        self.router
            .bind(proc_id.clone().into(), ready.addr().clone());
        self.procs.insert(name.clone());

        Ok((proc_id, ready.agent_ref().clone()))
    }

    // Build the composite sender that posts service/local-proc traffic
    // in-process and dials everything else.
    fn forwarder(&self) -> ProcOrDial {
        ProcOrDial {
            service_proc: self.service_proc.clone(),
            local_proc: self.local_proc.clone(),
            dialer: self.router.clone(),
        }
    }
}
297
/// A router used to route to the service and local procs in-process,
/// or else fall back to the dial mailbox router.
#[derive(Clone)]
struct ProcOrDial {
    // The host's in-process service proc.
    service_proc: Proc,
    // The host's in-process local proc (see LP-1).
    local_proc: Proc,
    // Fallback router for all other destinations.
    dialer: DialMailboxRouter,
}
306
307#[async_trait]
308impl MailboxSender for ProcOrDial {
309    fn post_unchecked(
310        &self,
311        envelope: MessageEnvelope,
312        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
313    ) {
314        if envelope.dest().actor_id().proc_id() == self.service_proc.proc_id() {
315            self.service_proc.post_unchecked(envelope, return_handle);
316        } else if envelope.dest().actor_id().proc_id() == self.local_proc.proc_id() {
317            self.local_proc.post_unchecked(envelope, return_handle);
318        } else {
319            self.dialer.post_unchecked(envelope, return_handle)
320        }
321    }
322
323    async fn flush(&self) -> Result<(), anyhow::Error> {
324        let (r1, r2, r3) = futures::future::join3(
325            self.service_proc.flush(),
326            self.local_proc.flush(),
327            self.dialer.flush(),
328        )
329        .await;
330        r1?;
331        r2?;
332        r3?;
333        Ok(())
334    }
335}
336
/// Error returned by [`ProcHandle::ready`].
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// The proc reached a terminal state before becoming Ready.
    /// Carries the same value [`ProcHandle::wait`] would report.
    Terminal(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}
345
/// Error returned by [`ReadyProc::ensure`] (the `ready_proc` helper).
#[derive(Debug, Clone)]
pub enum ReadyProcError<TerminalStatus> {
    /// Timed out waiting for ready.
    Timeout,
    /// The underlying `ready()` call failed.
    Ready(ReadyError<TerminalStatus>),
    /// The handle's `addr()` returned `None` after `ready()` succeeded.
    MissingAddr,
    /// The handle's `agent_ref()` returned `None` after `ready()`
    /// succeeded.
    MissingAgentRef,
}
359
360impl<T> From<ReadyError<T>> for ReadyProcError<T> {
361    fn from(e: ReadyError<T>) -> Self {
362        ReadyProcError::Ready(e)
363    }
364}
365
/// Error returned by [`ProcHandle::wait`].
#[derive(Debug, Clone)]
pub enum WaitError {
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}
372
/// Error returned by [`ProcHandle::terminate`] and
/// [`ProcHandle::kill`].
///
/// - `Unsupported`: the manager cannot perform the requested proc
///   signaling (e.g., local/in-process manager that doesn't emulate
///   kill).
/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
///   is the same value `wait()` would return.
/// - `ChannelClosed`: the manager lost its lifecycle channel and
///   cannot reliably observe state transitions.
/// - `Io(err)`: manager-specific failure delivering the signal or
///   performing shutdown (e.g., OS error on kill).
#[derive(Debug)]
pub enum TerminateError<TerminalStatus> {
    /// Manager doesn't support signaling (e.g., Local manager).
    Unsupported,
    /// A terminal state was already reached while attempting
    /// terminate/kill.
    AlreadyTerminated(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
    /// Manager-specific failure to deliver signal or perform
    /// shutdown.
    Io(anyhow::Error),
}
398
399impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
400    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401        match self {
402            TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
403            TerminateError::AlreadyTerminated(st) => {
404                write!(f, "proc already terminated (status: {st:?})")
405            }
406            TerminateError::ChannelClosed => {
407                write!(f, "lifecycle channel closed; cannot observe state")
408            }
409            TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
410        }
411    }
412}
413
414impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
415    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
416        match self {
417            TerminateError::Io(err) => Some(err.root_cause()),
418            _ => None,
419        }
420    }
421}
422
/// Summary of results from a bulk termination attempt.
///
/// - `attempted`: total number of child procs for which termination
///   was attempted.
/// - `ok`: number of procs successfully terminated (includes those
///   that were already in a terminal state).
/// - `failed`: number of procs that could not be terminated (e.g.
///   signaling errors or lost lifecycle channel).
#[derive(Debug)]
pub struct TerminateSummary {
    /// Total number of child procs for which termination was
    /// attempted.
    pub attempted: usize,
    /// Number of procs that successfully reached a terminal state.
    ///
    /// This count includes both procs that exited cleanly after
    /// `terminate(timeout)` and those that were already in a terminal
    /// state before termination was attempted.
    pub ok: usize,
    /// Number of procs that failed to terminate.
    ///
    /// Failures typically arise from signaling errors (e.g., OS
    /// failure to deliver SIGTERM/SIGKILL) or a lost lifecycle
    /// channel, meaning the manager could no longer observe state
    /// transitions.
    pub failed: usize,
}
450
451impl fmt::Display for TerminateSummary {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        write!(
454            f,
455            "attempted={} ok={} failed={}",
456            self.attempted, self.ok, self.failed
457        )
458    }
459}
460
#[async_trait::async_trait]
/// Trait for terminating a single proc.
pub trait SingleTerminate: Send + Sync {
    /// Gracefully terminate the given proc.
    ///
    /// Initiates a polite shutdown for each child, waits up to
    /// `timeout` for completion, then escalates to a forceful stop.
    /// The returned [`TerminateSummary`] reports how
    /// many children were attempted, succeeded, and failed.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation to a
    ///   forceful stop.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// Returns a tuple of (polite shutdown actors vec, forceful stop actors vec)
    async fn terminate_proc(
        &self,
        cx: &impl context::Actor,
        proc: &reference::ProcId,
        timeout: std::time::Duration,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error>;
}
492
/// Trait for managers that can terminate many child **units** in
/// bulk.
///
/// Implementors provide a concurrency-bounded, graceful shutdown over
/// all currently tracked children (polite stop → wait → forceful
/// stop), returning a summary of outcomes. The exact stop/kill
/// semantics are manager-specific: for example, an OS-process manager
/// might send signals, while an in-process manager might drain/abort
/// tasks.
#[async_trait::async_trait]
pub trait BulkTerminate: Send + Sync {
    /// Gracefully terminate all known children.
    ///
    /// Initiates a polite shutdown for each child, waits up to
    /// `timeout` for completion, then escalates to a forceful stop
    /// for any that remain. Work may be done in parallel, capped by
    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
    /// many children were attempted, succeeded, and failed.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation to a
    ///   forceful stop.
    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
    ///   etc.).
    /// - `reason`: Human-readable reason for termination.
    async fn terminate_all(
        &self,
        cx: &impl context::Actor,
        timeout: std::time::Duration,
        max_in_flight: usize,
        reason: &str,
    ) -> TerminateSummary;
}
534
// Host convenience that's available only when its manager supports
// bulk termination.
impl<M: ProcManager + BulkTerminate> Host<M> {
    /// Gracefully terminate all procs spawned by this host.
    ///
    /// Delegates to the underlying manager’s
    /// [`BulkTerminate::terminate_all`] implementation. Use this to
    /// perform orderly teardown during scale-down or shutdown.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation.
    /// - `max_in_flight`: Upper bound on concurrent terminations.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// # Returns
    /// A [`TerminateSummary`] with counts of attempted/ok/failed
    /// terminations.
    pub async fn terminate_children(
        &mut self,
        cx: &impl context::Actor,
        timeout: Duration,
        max_in_flight: usize,
        reason: &str,
    ) -> TerminateSummary {
        let summary = self
            .manager
            .terminate_all(cx, timeout, max_in_flight, reason)
            .await;
        // Unbind procs from the router so if new procs are made with the same
        // names, they can use the same slot.
        // NOTE(review): every tracked name is unbound here, including any
        // procs the summary counts as `failed` — confirm that is intended.
        for name in self.procs.drain() {
            let proc_id = reference::ProcId::with_name(self.frontend_addr.clone(), &name);
            self.router.unbind(&proc_id.into());
        }
        summary
    }
}
571
#[async_trait::async_trait]
impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
    /// Terminate a single proc by delegating directly to the
    /// underlying manager's [`SingleTerminate::terminate_proc`].
    async fn terminate_proc(
        &self,
        cx: &impl context::Actor,
        proc: &reference::ProcId,
        timeout: Duration,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
        self.manager.terminate_proc(cx, proc, timeout, reason).await
    }
}
584
/// Capability proving a proc is ready.
///
/// [`ReadyProc::ensure`] validates that `addr()` and `agent_ref()`
/// are available; this type carries that proof, providing infallible
/// accessors.
///
/// Obtain a `ReadyProc` by calling [`ReadyProc::ensure`] on a handle.
pub struct ReadyProc<'a, H: ProcHandle> {
    // The handle whose readiness was verified.
    handle: &'a H,
    // Address captured from `handle.addr()` after `ready()` succeeded.
    addr: ChannelAddr,
    // Agent ref captured from `handle.agent_ref()` after `ready()`
    // succeeded.
    agent_ref: reference::ActorRef<H::Agent>,
}
597
598impl<'a, H: ProcHandle> ReadyProc<'a, H> {
599    /// Wait for a proc to become ready, then return a capability that
600    /// provides infallible access to `addr()` and `agent_ref()`.
601    ///
602    /// This is the type-safe way to obtain the proc's address and
603    /// agent reference. After this function returns `Ok(ready)`, both
604    /// `ready.addr()` and `ready.agent_ref()` are guaranteed to
605    /// succeed.
606    pub async fn ensure(
607        handle: &'a H,
608    ) -> Result<ReadyProc<'a, H>, ReadyProcError<H::TerminalStatus>> {
609        handle.ready().await?;
610        let addr = handle.addr().ok_or(ReadyProcError::MissingAddr)?;
611        let agent_ref = handle.agent_ref().ok_or(ReadyProcError::MissingAgentRef)?;
612        Ok(ReadyProc {
613            handle,
614            addr,
615            agent_ref,
616        })
617    }
618
619    /// The proc's logical identity.
620    pub fn proc_id(&self) -> &reference::ProcId {
621        self.handle.proc_id()
622    }
623
624    /// The proc's address (guaranteed available after ready).
625    pub fn addr(&self) -> &ChannelAddr {
626        &self.addr
627    }
628
629    /// The agent actor reference (guaranteed available after ready).
630    pub fn agent_ref(&self) -> &reference::ActorRef<H::Agent> {
631        &self.agent_ref
632    }
633}
634
/// Minimal uniform surface for a spawned-**proc** handle returned by
/// a `ProcManager`. Each manager can return its own concrete handle,
/// as long as it exposes these. A **proc** is the Hyperactor runtime
/// + its actors (lifecycle controlled via `Proc` APIs such as
/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
/// **process**, but it is conceptually distinct:
///
/// - `LocalProcManager`: runs the proc **in this OS process**; there
///   is no child process to signal. Lifecycle is entirely proc-level.
/// - `ProcessProcManager` (test-only here): launches an **external OS
///   process** which hosts the proc, but this toy manager does
///   **not** wire a control plane for shutdown, nor an exit monitor.
///
/// This trait is therefore written in terms of the **proc**
/// lifecycle:
///
/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
///   available).
/// - `wait()` resolves with the proc's terminal status
///   (Stopped/Killed/Failed).
/// - `terminate()` requests a graceful shutdown of the *proc* and
///   waits up to the deadline; managers that also own a child OS
///   process may escalate to `SIGKILL` if the proc does not exit in
///   time.
/// - `kill()` requests an immediate, forced termination. For
///   in-process procs, this may be implemented as an immediate
///   drain/abort of actor tasks. For external procs, this is
///   typically a `SIGKILL`.
///
/// The shape of the terminal value is `Self::TerminalStatus`.
/// Managers that track rich info (exit code, signal, address, agent)
/// can expose it; trivial managers may use `()`.
///
/// Managers that do not support signaling must return `Unsupported`.
#[async_trait]
pub trait ProcHandle: Clone + Send + Sync + 'static {
    /// The agent actor type installed in the proc by the manager.
    /// Must implement both:
    /// - [`Actor`], because the agent actually runs inside the proc,
    ///   and
    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
    type Agent: Actor + Referable;

    /// The type of terminal status produced when the proc exits.
    ///
    /// For example, an external proc manager may use a rich status
    /// enum (e.g. `ProcStatus`), while an in-process manager may use
    /// a trivial unit type. This is the value returned by
    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;

    /// The proc's logical identity on this host.
    fn proc_id(&self) -> &reference::ProcId;

    /// The proc's address (the one callers bind into the host
    /// router). May return `None` before `ready()` completes.
    /// Guaranteed to return `Some` after `ready()` succeeds.
    ///
    /// **Prefer [`ReadyProc::ensure`]** for type-safe access that
    /// guarantees availability at compile time.
    fn addr(&self) -> Option<ChannelAddr>;

    /// The agent actor reference hosted in the proc. May return
    /// `None` before `ready()` completes. Guaranteed to return `Some`
    /// after `ready()` succeeds.
    ///
    /// **Prefer [`ReadyProc::ensure`]** for type-safe access that
    /// guarantees availability at compile time.
    fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>>;

    /// Resolves when the proc becomes Ready. Multi-waiter,
    /// non-consuming.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;

    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
    /// Multi-waiter, non-consuming.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;

    /// Politely stop the proc before the deadline; managers that own
    /// a child OS process may escalate to a forced kill at the
    /// deadline. Idempotent and race-safe: concurrent callers
    /// coalesce; the first terminal outcome wins and all callers
    /// observe it via `wait()`.
    ///
    /// Returns the single terminal status the proc reached (the same
    /// value `wait()` will return). Never fabricates terminal states:
    /// this is only returned after the exit monitor observes
    /// termination.
    ///
    /// # Parameters
    /// - `cx`: The actor context for sending messages.
    /// - `timeout`: Grace period before escalation.
    /// - `reason`: Human-readable reason for termination.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;

    /// Force the proc down immediately. For in-process managers this
    /// may abort actor tasks; for external managers this typically
    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
    /// outcome is the one observed by `wait()`.
    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
}
741
/// A trait describing a manager of procs, responsible for bootstrapping
/// procs on a host, and managing their lifetimes. The manager spawns an
/// `Agent`-typed actor on each proc, responsible for managing the proc.
#[async_trait]
pub trait ProcManager {
    /// Concrete handle type this manager returns.
    type Handle: ProcHandle;

    /// Additional configuration for the proc, supported by this manager.
    /// Defaults to `()` for managers that require no extra configuration.
    type Config = ();

    /// The preferred transport for this ProcManager.
    /// In practice this will be [`ChannelTransport::Local`]
    /// for testing, and [`ChannelTransport::Unix`] for external
    /// processes.
    fn transport(&self) -> ChannelTransport;

    /// Spawn a new proc with the provided proc id. The proc
    /// should use the provided forwarder address for messages
    /// destined outside of the proc. The returned address accepts
    /// messages destined for the proc.
    ///
    /// An agent actor is also spawned, and the corresponding actor
    /// ref is returned.
    async fn spawn(
        &self,
        proc_id: reference::ProcId,
        forwarder_addr: ChannelAddr,
        config: Self::Config,
    ) -> Result<Self::Handle, HostError>;
}
773
/// Type alias for the agent actor managed by a given [`ProcManager`].
///
/// This resolves to the `Agent` type exposed by the manager's
/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
/// convenient shorthand so call sites can refer to
/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
/// `<M::Handle as ProcHandle>::Agent`.
///
/// # Example
/// ```ignore
/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
/// ```
pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792
787
/// Lifecycle status for procs managed by [`LocalProcManager`].
///
/// Used by [`LocalProcManager::request_stop`] to track background
/// teardown progress.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalProcStatus {
    /// A stop has been requested but teardown is still in progress.
    Stopping,
    /// Teardown completed.
    Stopped,
}
799
/// A ProcManager that spawns **in-process** procs (test-only).
///
/// The proc runs inside this same OS process; there is **no** child
/// process to signal. Lifecycle is purely proc-level:
/// - `terminate(timeout)`: delegates to
///   `Proc::destroy_and_wait(timeout, None)`, which drains and, at the
///   deadline, aborts remaining actors.
/// - `kill()`: uses a zero deadline to emulate a forced stop via
///   `destroy_and_wait(Duration::ZERO, None)`.
/// - `wait()`: trivial (no external lifecycle to observe).
///
///   No OS signals are sent or required.
pub struct LocalProcManager<S> {
    // Live procs keyed by id; entries are removed by the stop and
    // terminate paths.
    procs: Arc<Mutex<HashMap<reference::ProcId, Proc>>>,
    // Watch senders tracking teardown of procs stopped via
    // `request_stop`. Entries are retained after teardown so status
    // remains queryable.
    stopping: Arc<Mutex<HashMap<reference::ProcId, tokio::sync::watch::Sender<LocalProcStatus>>>>,
    // Factory invoked on each new proc to spawn its agent actor.
    spawn: S,
}
817
818impl<S> LocalProcManager<S> {
819    /// Create a new in-process proc manager with the given agent
820    /// params.
821    pub fn new(spawn: S) -> Self {
822        Self {
823            procs: Arc::new(Mutex::new(HashMap::new())),
824            stopping: Arc::new(Mutex::new(HashMap::new())),
825            spawn,
826        }
827    }
828
829    /// Non-blocking stop: remove the proc and spawn a background task
830    /// that tears it down.
831    ///
832    /// Status transitions through `Stopping` -> `Stopped` and is
833    /// observable via [`local_proc_status`] and [`watch`]. Idempotent:
834    /// no-ops if the proc is already stopping or stopped.
835    pub async fn request_stop(&self, proc: &reference::ProcId, timeout: Duration, reason: &str) {
836        {
837            let guard = self.stopping.lock().await;
838            if guard.contains_key(proc) {
839                return;
840            }
841        }
842
843        let mut proc_handle = {
844            let mut guard = self.procs.lock().await;
845            match guard.remove(proc) {
846                Some(p) => p,
847                None => return,
848            }
849        };
850
851        let proc_id = proc_handle.proc_id().clone();
852        let (tx, _) = tokio::sync::watch::channel(LocalProcStatus::Stopping);
853        self.stopping.lock().await.insert(proc_id.clone(), tx);
854
855        let stopping = Arc::clone(&self.stopping);
856        let reason = reason.to_string();
857        tokio::spawn(async move {
858            if let Err(e) = proc_handle
859                .destroy_and_wait::<()>(timeout, None, &reason)
860                .await
861            {
862                tracing::warn!(error = %e, "request_stop(local): destroy_and_wait failed");
863            }
864            if let Some(tx) = stopping.lock().await.get(&proc_id) {
865                let _ = tx.send(LocalProcStatus::Stopped);
866            }
867        });
868    }
869
870    /// Query the lifecycle status of a proc that was stopped via
871    /// [`request_stop`].
872    ///
873    /// Returns `None` if the proc was never stopped through this path.
874    pub async fn local_proc_status(&self, proc: &reference::ProcId) -> Option<LocalProcStatus> {
875        self.stopping.lock().await.get(proc).map(|tx| *tx.borrow())
876    }
877
878    /// Subscribe to lifecycle status changes for a proc that was
879    /// stopped via [`request_stop`].
880    ///
881    /// Returns `None` if the proc was never stopped through this path.
882    pub async fn watch(
883        &self,
884        proc: &reference::ProcId,
885    ) -> Option<tokio::sync::watch::Receiver<LocalProcStatus>> {
886        self.stopping
887            .lock()
888            .await
889            .get(proc)
890            .map(|tx| tx.subscribe())
891    }
892}
893
894#[async_trait]
895impl<S> BulkTerminate for LocalProcManager<S>
896where
897    S: Send + Sync,
898{
899    async fn terminate_all(
900        &self,
901        _cx: &impl context::Actor,
902        timeout: std::time::Duration,
903        max_in_flight: usize,
904        reason: &str,
905    ) -> TerminateSummary {
906        // Drain procs so we don't hold the lock across awaits and subsequent
907        // calls to terminate_all don't try to re-terminate.
908        let procs: Vec<Proc> = {
909            let mut guard = self.procs.lock().await;
910            guard.drain().map(|(_, v)| v).collect()
911        };
912
913        let attempted = procs.len();
914
915        let results = stream::iter(procs.into_iter().map(|mut p| async move {
916            // For local manager, graceful proc-level stop.
917            match p.destroy_and_wait::<()>(timeout, None, reason).await {
918                Ok(_) => true,
919                Err(e) => {
920                    tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
921                    false
922                }
923            }
924        }))
925        .buffer_unordered(max_in_flight.max(1))
926        .collect::<Vec<bool>>()
927        .await;
928
929        let ok = results.into_iter().filter(|b| *b).count();
930
931        TerminateSummary {
932            attempted,
933            ok,
934            failed: attempted.saturating_sub(ok),
935        }
936    }
937}
938
939#[async_trait::async_trait]
940impl<S> SingleTerminate for LocalProcManager<S>
941where
942    S: Send + Sync,
943{
944    async fn terminate_proc(
945        &self,
946        _cx: &impl context::Actor,
947        proc: &reference::ProcId,
948        timeout: std::time::Duration,
949        reason: &str,
950    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
951        // Snapshot procs so we don't hold the lock across awaits.
952        let procs: Option<Proc> = {
953            let mut guard = self.procs.lock().await;
954            guard.remove(proc)
955        };
956        if let Some(mut p) = procs {
957            p.destroy_and_wait::<()>(timeout, None, reason).await
958        } else {
959            Err(anyhow::anyhow!("proc {} doesn't exist", proc))
960        }
961    }
962}
963
/// A lightweight [`ProcHandle`] for procs managed **in-process** via
/// [`LocalProcManager`].
///
/// This handle wraps the minimal identifying state of a spawned proc:
/// - its [`ProcId`] (logical identity on the host),
/// - the proc's [`ChannelAddr`] (the address callers bind into the
///   host router), and
/// - the [`ActorRef`] to the agent actor hosted in the proc.
///
/// Unlike external handles, `LocalHandle` does **not** manage an OS
/// child process. It provides a uniform surface (`proc_id()`,
/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
/// **proc-level** shutdown.
///
/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
/// bound (`Actor + Referable`).
pub struct LocalHandle<A: Actor + Referable> {
    // Logical identity of the proc on the host.
    proc_id: reference::ProcId,
    // Address on which the proc is served.
    addr: ChannelAddr,
    // Typed reference to the agent actor running in the proc.
    agent_ref: reference::ActorRef<A>,
    // Shared view of the manager's live-proc map; used by
    // terminate/kill to find the underlying `Proc`.
    procs: Arc<Mutex<HashMap<reference::ProcId, Proc>>>,
}
987
988// Manual `Clone` to avoid requiring `A: Clone`.
989impl<A: Actor + Referable> Clone for LocalHandle<A> {
990    fn clone(&self) -> Self {
991        Self {
992            proc_id: self.proc_id.clone(),
993            addr: self.addr.clone(),
994            agent_ref: self.agent_ref.clone(),
995            procs: Arc::clone(&self.procs),
996        }
997    }
998}
999
1000#[async_trait]
1001impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
1002    /// `Agent = A` (inherits `Actor + Referable` from the trait
1003    /// bound).
1004    type Agent = A;
1005    type TerminalStatus = ();
1006
1007    fn proc_id(&self) -> &reference::ProcId {
1008        &self.proc_id
1009    }
1010
1011    fn addr(&self) -> Option<ChannelAddr> {
1012        Some(self.addr.clone())
1013    }
1014
1015    fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
1016        Some(self.agent_ref.clone())
1017    }
1018
1019    /// Always resolves immediately: a local proc is created
1020    /// in-process and is usable as soon as the handle exists.
1021    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1022        Ok(())
1023    }
1024    /// Always resolves immediately with `()`: a local proc has no
1025    /// external lifecycle to await. There is no OS child process
1026    /// behind this handle.
1027    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1028        Ok(())
1029    }
1030
1031    async fn terminate(
1032        &self,
1033        _cx: &impl context::Actor,
1034        timeout: Duration,
1035        reason: &str,
1036    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1037        let mut proc = {
1038            let guard = self.procs.lock().await;
1039            match guard.get(self.proc_id()) {
1040                Some(p) => p.clone(),
1041                None => {
1042                    // The proc was already removed; treat as already
1043                    // terminal.
1044                    return Err(TerminateError::AlreadyTerminated(()));
1045                }
1046            }
1047        };
1048
1049        // Graceful stop of the *proc* (actors) with a deadline. This
1050        // will drain and then abort remaining actors at expiry.
1051        let _ = proc
1052            .destroy_and_wait::<()>(timeout, None, reason)
1053            .await
1054            .map_err(TerminateError::Io)?;
1055
1056        Ok(())
1057    }
1058
1059    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1060        // Forced stop == zero deadline; `destroy_and_wait` will
1061        // immediately abort remaining actors and return.
1062        let mut proc = {
1063            let guard = self.procs.lock().await;
1064            match guard.get(self.proc_id()) {
1065                Some(p) => p.clone(),
1066                None => return Err(TerminateError::AlreadyTerminated(())),
1067            }
1068        };
1069
1070        let _ = proc
1071            .destroy_and_wait::<()>(Duration::from_millis(0), None, "kill")
1072            .await
1073            .map_err(TerminateError::Io)?;
1074
1075        Ok(())
1076    }
1077}
1078
1079/// Local, in-process ProcManager.
1080///
1081/// **Type bounds:**
1082/// - `A: Actor + Referable + Binds<A>`
1083///   - `Actor`: the agent actually runs inside the proc.
1084///   - `Referable`: callers hold `ActorRef<A>` to the agent; this
1085///     bound is required for typed remote refs.
1086///   - `Binds<A>`: lets the runtime wire the agent's message ports.
1087/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
1088///   the spawn closure returns a Send future (we `tokio::spawn` it).
1089/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
1090///   concurrent contexts.
1091///
1092/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
1093#[async_trait]
1094impl<A, S, F> ProcManager for LocalProcManager<S>
1095where
1096    A: Actor + Referable + Binds<A>,
1097    F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
1098    S: Fn(Proc) -> F + Sync,
1099{
1100    type Handle = LocalHandle<A>;
1101
1102    fn transport(&self) -> ChannelTransport {
1103        ChannelTransport::Local
1104    }
1105
1106    #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1107    async fn spawn(
1108        &self,
1109        proc_id: reference::ProcId,
1110        forwarder_addr: ChannelAddr,
1111        _config: (),
1112    ) -> Result<Self::Handle, HostError> {
1113        let transport = forwarder_addr.transport();
1114        let proc = Proc::configured(
1115            proc_id.clone(),
1116            MailboxClient::dial(forwarder_addr)?.into_boxed(),
1117        );
1118        let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
1119        self.procs
1120            .lock()
1121            .await
1122            .insert(proc_id.clone(), proc.clone());
1123        let _handle = proc.clone().serve(rx);
1124        let agent_handle = (self.spawn)(proc)
1125            .await
1126            .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1127
1128        Ok(LocalHandle {
1129            proc_id,
1130            addr: proc_addr,
1131            agent_ref: agent_handle.bind(),
1132            procs: Arc::clone(&self.procs),
1133        })
1134    }
1135}
1136
/// A ProcManager that manages each proc as a **separate OS process**
/// (test-only toy).
///
/// This implementation launches a child via `Command` and relies on
/// `kill_on_drop(true)` so that children are SIGKILLed if the manager
/// (or host) drops. There is **no** proc control plane (no RPC to a
/// proc agent for shutdown) and **no** exit monitor wired here.
/// Consequently:
/// - `terminate()` and `kill()` return `Unsupported`.
/// - `wait()` is trivial (no lifecycle observation).
///
/// It follows a simple protocol:
///
/// Each process is launched with the following environment variables:
/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
///
/// The launched proc should also spawn an actor to manage it - the details of this are
/// implementation dependent, and outside the scope of the process manager.
///
/// The function [`boot_proc`] provides a convenient implementation of the
/// protocol.
pub struct ProcessProcManager<A> {
    // Executable launched for each spawned proc.
    program: std::path::PathBuf,
    // Child handles, retained so `kill_on_drop` ties each child's
    // lifetime to the manager's.
    children: Arc<Mutex<HashMap<reference::ProcId, Child>>>,
    // `A` is the agent actor type; it never lives in this struct.
    _phantom: PhantomData<A>,
}
1165
1166impl<A> ProcessProcManager<A> {
1167    /// Create a new ProcessProcManager that runs the provided
1168    /// command.
1169    pub fn new(program: std::path::PathBuf) -> Self {
1170        Self {
1171            program,
1172            children: Arc::new(Mutex::new(HashMap::new())),
1173            _phantom: PhantomData,
1174        }
1175    }
1176}
1177
impl<A> Drop for ProcessProcManager<A> {
    /// Intentionally empty: dropping the manager drops `children`,
    /// which drops each `Child` handle. With `kill_on_drop(true)` set
    /// at spawn time, the OS will SIGKILL the processes. Nothing else
    /// to do here.
    fn drop(&mut self) {
        // When the manager is dropped, `children` is dropped, which
        // drops each `Child` handle. With `kill_on_drop(true)`, the OS
        // will SIGKILL the processes. Nothing else to do here.
    }
}
1185
/// A [`ProcHandle`] implementation for procs managed as separate
/// OS processes via [`ProcessProcManager`].
///
/// This handle records the logical identity and connectivity of an
/// external child process:
/// - its [`ProcId`] (unique identity on the host),
/// - the proc's [`ChannelAddr`] (address registered in the host
///   router),
/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
///
/// Unlike [`LocalHandle`], this corresponds to a real OS process
/// launched by the manager. In this **toy** implementation the handle
/// does not own/monitor the `Child` and there is no shutdown control
/// plane. It is a stable, clonable surface exposing the proc's
/// identity, address, and agent reference so host code can interact
/// uniformly with local/external procs. `terminate()`/`kill()` are
/// intentionally `Unsupported` here; process cleanup relies on
/// `cmd.kill_on_drop(true)` when launching the child (the OS will
/// SIGKILL it if the handle is dropped).
///
/// The type bound `A: Actor + Referable` comes from the
/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
/// actually runs inside the proc, and `Referable` because it must
/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
/// typed remote reference).
#[derive(Debug)]
pub struct ProcessHandle<A: Actor + Referable> {
    // Logical identity of the proc on the host.
    proc_id: reference::ProcId,
    // Address the child reported back during bootstrap.
    addr: ChannelAddr,
    // Typed reference to the agent actor inside the child.
    agent_ref: reference::ActorRef<A>,
}
1217
1218// Manual `Clone` to avoid requiring `A: Clone`.
1219impl<A: Actor + Referable> Clone for ProcessHandle<A> {
1220    fn clone(&self) -> Self {
1221        Self {
1222            proc_id: self.proc_id.clone(),
1223            addr: self.addr.clone(),
1224            agent_ref: self.agent_ref.clone(),
1225        }
1226    }
1227}
1228
#[async_trait]
impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
    /// Agent must be both an `Actor` (runs in the proc) and a
    /// `Referable` (so it can be referenced via `ActorRef<A>`).
    type Agent = A;
    type TerminalStatus = ();

    fn proc_id(&self) -> &reference::ProcId {
        &self.proc_id
    }

    fn addr(&self) -> Option<ChannelAddr> {
        Some(self.addr.clone())
    }

    fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
        Some(self.agent_ref.clone())
    }

    /// Resolves immediately. `ProcessProcManager::spawn` returns this
    /// handle only after the child has called back with (addr,
    /// agent), i.e. after readiness.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
        Ok(())
    }
    /// Resolves immediately with `()`. This handle does not track
    /// child lifecycle; there is no watcher in this implementation.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
        Ok(())
    }

    /// Always `Unsupported`: there is no shutdown control plane to
    /// the child in this toy implementation.
    async fn terminate(
        &self,
        _cx: &impl context::Actor,
        _deadline: Duration,
        _reason: &str,
    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
        Err(TerminateError::Unsupported)
    }

    /// Always `Unsupported`: the handle does not own the `Child`, so
    /// it cannot signal it; cleanup relies on `kill_on_drop`.
    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
        Err(TerminateError::Unsupported)
    }
}
1273
1274#[async_trait]
1275impl<A> ProcManager for ProcessProcManager<A>
1276where
1277    // Agent actor runs in the proc (`Actor`) and must be
1278    // referenceable (`Referable`).
1279    A: Actor + Referable + Sync,
1280{
1281    type Handle = ProcessHandle<A>;
1282
1283    fn transport(&self) -> ChannelTransport {
1284        ChannelTransport::Unix
1285    }
1286
1287    #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1288    async fn spawn(
1289        &self,
1290        proc_id: reference::ProcId,
1291        forwarder_addr: ChannelAddr,
1292        _config: (),
1293    ) -> Result<Self::Handle, HostError> {
1294        let (callback_addr, mut callback_rx) =
1295            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1296
1297        let mut cmd = Command::new(&self.program);
1298        cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1299        cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1300        cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1301
1302        // Lifetime strategy: mark the child with
1303        // `kill_on_drop(true)` so the OS will send SIGKILL if the
1304        // handle is dropped and retain the `Child` in
1305        // `self.children`, tying its lifetime to the manager/host.
1306        //
1307        // This is the simplest viable policy to avoid orphaned
1308        // subprocesses in CI; more sophisticated lifecycle control
1309        // (graceful shutdown, restart) will be layered on later.
1310
1311        // Kill the child when its handle is dropped.
1312        cmd.kill_on_drop(true);
1313
1314        let child = cmd
1315            .spawn()
1316            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1317
1318        // Retain the handle so it lives for the life of the
1319        // manager/host.
1320        {
1321            let mut children = self.children.lock().await;
1322            children.insert(proc_id.clone(), child);
1323        }
1324
1325        // Wait for the child's callback with (addr, agent_ref)
1326        let (proc_addr, agent_ref) = callback_rx.recv().await?;
1327
1328        // TODO(production): For a non-test implementation, plumb a
1329        // shutdown path:
1330        // - expose a proc-level graceful stop RPC on the agent and
1331        //   implement `terminate(timeout)` by invoking it and, on
1332        //   deadline, call `Child::kill()`; implement `kill()` as
1333        //   immediate `Child::kill()`.
1334        // - wire an exit monitor so `wait()` resolves with a real
1335        //   terminal status.
1336        Ok(ProcessHandle {
1337            proc_id,
1338            addr: proc_addr,
1339            agent_ref,
1340        })
1341    }
1342}
1343
1344impl<A> ProcessProcManager<A>
1345where
1346    // `Actor`: runs in the proc; `Referable`: referenceable via
1347    // ActorRef; `Binds<A>`: wires ports.
1348    A: Actor + Referable + Binds<A>,
1349{
1350    /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
1351    /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
1352    /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
1353    /// manage the proc itself.
1354    pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1355    where
1356        S: FnOnce(Proc) -> F,
1357        F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1358    {
1359        let proc_id: reference::ProcId = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1360        let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1361        let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1362        spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1363    }
1364
1365    fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1366    where
1367        T: FromStr<Err = E>,
1368        E: Into<anyhow::Error>,
1369    {
1370        std::env::var(key)
1371            .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1372            .parse()
1373            .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1374    }
1375}
1376
1377/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
1378/// forwarding messages to the provided `backend_addr`,
1379/// and returning the proc's address and agent actor on
1380/// the provided `callback_addr`.
1381#[crate::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string(), callback_addr=callback_addr.to_string()))]
1382pub async fn spawn_proc<A, S, F>(
1383    proc_id: reference::ProcId,
1384    backend_addr: ChannelAddr,
1385    callback_addr: ChannelAddr,
1386    spawn: S,
1387) -> Result<Proc, HostError>
1388where
1389    // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
1390    // `Binds<A>`: wires ports
1391    A: Actor + Referable + Binds<A>,
1392    S: FnOnce(Proc) -> F,
1393    F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1394{
1395    let backend_transport = backend_addr.transport();
1396    let proc = Proc::configured(
1397        proc_id.clone(),
1398        MailboxClient::dial(backend_addr)?.into_boxed(),
1399    );
1400
1401    let agent_handle = spawn(proc.clone())
1402        .await
1403        .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1404
1405    // Finally serve the proc on the same transport as the backend address,
1406    // and call back.
1407    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1408    proc.clone().serve(proc_rx);
1409    channel::dial(callback_addr)?
1410        .send((proc_addr, agent_handle.bind::<A>()))
1411        .await
1412        .map_err(ChannelError::from)?;
1413
1414    Ok(proc)
1415}
1416
/// Testing support for hosts. This is linked outside of cfg(test)
/// as it is needed by an external binary.
pub mod testing {
    use async_trait::async_trait;

    use crate as hyperactor;
    use crate::Actor;
    use crate::Context;
    use crate::Handler;
    use crate::reference;

    /// Just a simple actor, available in both the bootstrap binary as well as
    /// hyperactor tests. It replies to a `OncePortRef` message with its own
    /// actor id, which lets tests verify end-to-end routing.
    #[derive(Debug, Default)]
    #[hyperactor::export(handlers = [reference::OncePortRef<reference::ActorId>])]
    pub struct EchoActor;

    impl Actor for EchoActor {}

    #[async_trait]
    impl Handler<reference::OncePortRef<reference::ActorId>> for EchoActor {
        /// Send this actor's own id back on the provided reply port.
        async fn handle(
            &mut self,
            cx: &Context<Self>,
            reply: reference::OncePortRef<reference::ActorId>,
        ) -> Result<(), anyhow::Error> {
            reply.send(cx, cx.self_id().clone())?;
            Ok(())
        }
    }
}
1448
1449#[cfg(test)]
1450mod tests {
1451    use std::sync::Arc;
1452    use std::time::Duration;
1453
1454    use super::testing::EchoActor;
1455    use super::*;
1456    use crate::channel::ChannelTransport;
1457    use crate::context::Mailbox;
1458
    #[tokio::test]
    // End-to-end check of LocalProcManager + Host: spawn two procs and
    // verify message routing proc<->proc and proc<->system proc.
    async fn test_basic() {
        let proc_manager =
            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
        let procs = Arc::clone(&proc_manager.procs);
        let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Local))
            .await
            .unwrap();

        // Spawned proc ids are derived from the host's frontend address.
        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
        assert_eq!(
            proc_id1,
            reference::ProcId::with_name(host.addr().clone(), "proc1")
        );
        assert!(procs.lock().await.contains_key(&proc_id1));

        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
        assert!(procs.lock().await.contains_key(&proc_id2));

        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();

        // Make sure they can talk to each other:
        let (instance1, _handle) = proc1.instance("client").unwrap();
        let (instance2, _handle) = proc2.instance("client").unwrap();

        let (port, mut rx) = instance1.mailbox().open_port();

        port.bind().send(&instance2, "hello".to_string()).unwrap();
        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());

        // Make sure that the system proc is also wired in correctly.
        let (system_actor, _handle) = host.system_proc().instance("test").unwrap();

        // system->proc
        port.bind()
            .send(&system_actor, "hello from the system proc".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system proc".to_string()
        );

        // system->system
        let (port, mut rx) = system_actor.mailbox().open_port();
        port.bind()
            .send(&system_actor, "hello from the system".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system".to_string()
        );

        // proc->system
        port.bind()
            .send(&instance1, "hello from the instance1".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the instance1".to_string()
        );
    }
1521
    #[tokio::test]
    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
    #[cfg_attr(not(fbcode_build), ignore)]
    // End-to-end check of ProcessProcManager: spawn real child
    // processes and verify routing through the host in both directions.
    async fn test_process_proc_manager() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        // EchoActor is "host_agent" used to test connectivity.
        let process_manager = ProcessProcManager::<EchoActor>::new(
            buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
        );
        let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
            .await
            .unwrap();

        // Manually serve this: the agent isn't actually doing anything in this case,
        // but we are testing connectivity.
        host.serve();

        // (1) Spawn and check invariants.
        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
        let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
        let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
        assert_eq!(echo1.actor_id().proc_id(), &proc1);
        assert_eq!(echo2.actor_id().proc_id(), &proc2);

        // (2) Duplicate name rejection.
        let dup = host.spawn("proc1".to_string(), ()).await;
        assert!(matches!(dup, Err(HostError::ProcExists(_))));

        // (3) Create a standalone client proc and verify echo1 agent responds.
        // Request: client proc -> host frontend/router -> echo1 (proc1).
        // Reply:   echo1 (proc1) -> host backend -> host router -> client port.
        // This confirms that an external proc (created via
        // `Proc::direct`) can address a child proc through the host,
        // and receive a correct reply.
        let client = Proc::direct(
            ChannelAddr::any(host.addr().transport()),
            "test".to_string(),
        )
        .unwrap();
        let (client_inst, _h) = client.instance("test").unwrap();
        let (port, rx) = client_inst.mailbox().open_once_port();
        echo1.send(&client_inst, port.bind()).unwrap();
        let id = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id, *echo1.actor_id());

        // (4) Child <-> external client request -> reply:
        // Request: client proc (standalone via `Proc::direct`) ->
        //          host frontend/router -> echo2 (proc2).
        // Reply:   echo2 (proc2) -> host backend -> host router ->
        //          client port (standalone proc).
        // This exercises cross-proc routing between a child and an
        // external client under the same host.
        let (port2, rx2) = client_inst.mailbox().open_once_port();
        echo2.send(&client_inst, port2.bind()).unwrap();
        let id2 = tokio::time::timeout(Duration::from_secs(5), rx2.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id2, *echo2.actor_id());

        // (5) System -> child request -> cross-proc reply:
        // Request: system proc -> host router (frontend) -> echo1
        //          (proc1, child).
        // Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
        //        host router -> client proc direct addr (Proc::direct) ->
        //        client port.
        // Because `client_inst` runs in its own proc, the reply
        // traverses the host (not local delivery within proc1).
        let (sys_inst, _h) = host.system_proc().instance("sys-client").unwrap();
        let (port3, rx3) = client_inst.mailbox().open_once_port();
        // Send from system -> child via a message that ultimately
        // replies to client's port
        echo1.send(&sys_inst, port3.bind()).unwrap();
        let id3 = tokio::time::timeout(Duration::from_secs(5), rx3.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id3, *echo1.actor_id());
    }
1605
1606    #[tokio::test]
1607    async fn local_ready_and_wait_are_immediate() {
1608        // Build a LocalHandle directly.
1609        let addr = ChannelAddr::any(ChannelTransport::Local);
1610        let proc_id = reference::ProcId::with_name(addr.clone(), "p");
1611        let agent_ref = reference::ActorRef::<()>::attest(proc_id.actor_id("host_agent", 0));
1612        let h = LocalHandle::<()> {
1613            proc_id,
1614            addr,
1615            agent_ref,
1616            procs: Arc::new(Mutex::new(HashMap::new())),
1617        };
1618
1619        // ready() resolves immediately
1620        assert!(h.ready().await.is_ok());
1621
1622        // wait() resolves immediately with unit TerminalStatus
1623        assert!(h.wait().await.is_ok());
1624
1625        // Multiple concurrent waiters both succeed
1626        let (r1, r2) = tokio::join!(h.ready(), h.ready());
1627        assert!(r1.is_ok() && r2.is_ok());
1628    }
1629
1630    // --
1631    // Fixtures for `host::spawn` tests.
1632
    /// How a `TestHandle`'s `ready()` future should behave in tests.
    #[derive(Debug, Clone, Copy)]
    enum ReadyMode {
        /// Resolve `Ok(())` after the given delay (zero means immediately).
        OkAfter(Duration),
        /// Fail with `ReadyError::Terminal(())`.
        ErrTerminal,
        /// Fail with `ReadyError::ChannelClosed`.
        ErrChannelClosed,
    }
1639
    /// Fake proc handle used to drive `Host::spawn` through specific
    /// readiness outcomes without launching a real proc.
    #[derive(Debug, Clone)]
    struct TestHandle {
        // Identity of the "proc" this handle represents.
        id: reference::ProcId,
        // Backend address reported by `addr()` (unless `omit_addr`).
        addr: ChannelAddr,
        // Agent ref reported by `agent_ref()` (unless `omit_agent`).
        agent: reference::ActorRef<()>,
        // Controls how `ready()` resolves.
        mode: ReadyMode,
        // When true, `addr()` returns `None` even after readiness.
        omit_addr: bool,
        // When true, `agent_ref()` returns `None` even after readiness.
        omit_agent: bool,
    }
1649
1650    #[async_trait::async_trait]
1651    impl ProcHandle for TestHandle {
1652        type Agent = ();
1653        type TerminalStatus = ();
1654
1655        fn proc_id(&self) -> &reference::ProcId {
1656            &self.id
1657        }
1658
1659        fn addr(&self) -> Option<ChannelAddr> {
1660            if self.omit_addr {
1661                None
1662            } else {
1663                Some(self.addr.clone())
1664            }
1665        }
1666
1667        fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
1668            if self.omit_agent {
1669                None
1670            } else {
1671                Some(self.agent.clone())
1672            }
1673        }
1674
1675        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1676            match self.mode {
1677                ReadyMode::OkAfter(d) => {
1678                    if !d.is_zero() {
1679                        tokio::time::sleep(d).await;
1680                    }
1681                    Ok(())
1682                }
1683                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
1684                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
1685            }
1686        }
1687        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1688            Ok(())
1689        }
1690        async fn terminate(
1691            &self,
1692            _cx: &impl context::Actor,
1693            _timeout: Duration,
1694            _reason: &str,
1695        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1696            Err(TerminateError::Unsupported)
1697        }
1698        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1699            Err(TerminateError::Unsupported)
1700        }
1701    }
1702
    /// Fake `ProcManager` that hands out `TestHandle`s configured with
    /// the given readiness behavior and field omissions.
    #[derive(Debug, Clone)]
    struct TestManager {
        // Readiness behavior stamped onto every spawned handle.
        mode: ReadyMode,
        // Forwarded to `TestHandle::omit_addr`.
        omit_addr: bool,
        // Forwarded to `TestHandle::omit_agent`.
        omit_agent: bool,
        // Transport reported to the `Host` (tests use `Local`).
        transport: ChannelTransport,
    }
1710
1711    impl TestManager {
1712        fn local(mode: ReadyMode) -> Self {
1713            Self {
1714                mode,
1715                omit_addr: false,
1716                omit_agent: false,
1717                transport: ChannelTransport::Local,
1718            }
1719        }
1720        fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
1721            self.omit_addr = addr;
1722            self.omit_agent = agent;
1723            self
1724        }
1725    }
1726
1727    #[async_trait::async_trait]
1728    impl ProcManager for TestManager {
1729        type Handle = TestHandle;
1730
1731        fn transport(&self) -> ChannelTransport {
1732            self.transport.clone()
1733        }
1734
1735        async fn spawn(
1736            &self,
1737            proc_id: reference::ProcId,
1738            forwarder_addr: ChannelAddr,
1739            _config: (),
1740        ) -> Result<Self::Handle, HostError> {
1741            let agent = reference::ActorRef::<()>::attest(proc_id.actor_id("host_agent", 0));
1742            Ok(TestHandle {
1743                id: proc_id,
1744                addr: forwarder_addr,
1745                agent,
1746                mode: self.mode,
1747                omit_addr: self.omit_addr,
1748                omit_agent: self.omit_agent,
1749            })
1750        }
1751    }
1752
1753    #[tokio::test]
1754    async fn host_spawn_times_out_when_configured() {
1755        let cfg = hyperactor_config::global::lock();
1756        let _g = cfg.override_key(
1757            crate::config::HOST_SPAWN_READY_TIMEOUT,
1758            Duration::from_millis(10),
1759        );
1760
1761        let mut host = Host::new(
1762            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
1763            ChannelAddr::any(ChannelTransport::Local),
1764        )
1765        .await
1766        .unwrap();
1767
1768        let err = host.spawn("t".into(), ()).await.expect_err("must time out");
1769        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1770    }
1771
1772    #[tokio::test]
1773    async fn host_spawn_timeout_zero_disables_and_succeeds() {
1774        let cfg = hyperactor_config::global::lock();
1775        let _g = cfg.override_key(
1776            crate::config::HOST_SPAWN_READY_TIMEOUT,
1777            Duration::from_secs(0),
1778        );
1779
1780        let mut host = Host::new(
1781            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
1782            ChannelAddr::any(ChannelTransport::Local),
1783        )
1784        .await
1785        .unwrap();
1786
1787        let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
1788        assert_eq!(agent.actor_id().proc_id(), &pid);
1789        assert!(host.procs.contains("ok"));
1790    }
1791
1792    #[tokio::test]
1793    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
1794        let mut host = Host::new(
1795            TestManager::local(ReadyMode::ErrChannelClosed),
1796            ChannelAddr::any(ChannelTransport::Local),
1797        )
1798        .await
1799        .unwrap();
1800
1801        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1802        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1803    }
1804
1805    #[tokio::test]
1806    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
1807        let mut host = Host::new(
1808            TestManager::local(ReadyMode::ErrTerminal),
1809            ChannelAddr::any(ChannelTransport::Local),
1810        )
1811        .await
1812        .unwrap();
1813
1814        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1815        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1816    }
1817
1818    #[tokio::test]
1819    async fn host_spawn_fails_if_ready_but_missing_addr() {
1820        let mut host = Host::new(
1821            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
1822            ChannelAddr::any(ChannelTransport::Local),
1823        )
1824        .await
1825        .unwrap();
1826
1827        let err = host
1828            .spawn("no-addr".into(), ())
1829            .await
1830            .expect_err("must fail");
1831        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1832    }
1833
1834    #[tokio::test]
1835    async fn host_spawn_fails_if_ready_but_missing_agent() {
1836        let mut host = Host::new(
1837            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
1838            ChannelAddr::any(ChannelTransport::Local),
1839        )
1840        .await
1841        .unwrap();
1842
1843        let err = host
1844            .spawn("no-agent".into(), ())
1845            .await
1846            .expect_err("must fail");
1847        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1848    }
1849}