hyperactor/
host.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines [`Host`], which represents all the procs running on a host.
10//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
11//! for example, fork new processes for each proc, or spawn them in the same process
12//! for testing purposes.
13//!
14//! The primary purpose of a host is to manage the lifecycle of these procs, and to
15//! serve as a single front-end for all the procs on a host, multiplexing network
16//! channels.
17//!
18//! ## Channel muxing
19//!
20//! A [`Host`] maintains a single frontend address, through which all procs are accessible
21//! through direct addressing: the id of each proc is the `ProcId::Direct(frontend_addr, proc_name)`.
22//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
23//! multiplexes messages based on the proc name. When spawning procs, the host maintains
24//! backend channels with separate addresses. In the diagram `#` is the backend address of
25//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
26//! to the appropriate backend channel, while procs forward messages to the host backend
27//! channel at `#`.
28//!
29//! ```text
30//!                      ┌────────────┐
31//!                  ┌───▶  proc *,1  │
32//!                  │ #1└────────────┘
33//!                  │
34//!  ┌──────────┐    │   ┌────────────┐
35//!  │   Host   │◀───┼───▶  proc *,2  │
36//! *└──────────┘#   │ #2└────────────┘
37//!                  │
38//!                  │   ┌────────────┐
39//!                  └───▶  proc *,3  │
40//!                    #3└────────────┘
41//! ```
42
43use std::collections::HashMap;
44use std::collections::HashSet;
45use std::fmt;
46use std::marker::PhantomData;
47use std::str::FromStr;
48use std::sync::Arc;
49use std::time::Duration;
50
51use async_trait::async_trait;
52use futures::Future;
53use futures::StreamExt;
54use futures::stream;
55use tokio::process::Child;
56use tokio::process::Command;
57use tokio::sync::Mutex;
58
59use crate as hyperactor;
60use crate::Actor;
61use crate::ActorHandle;
62use crate::ActorId;
63use crate::ActorRef;
64use crate::PortHandle;
65use crate::Proc;
66use crate::ProcId;
67use crate::actor::Binds;
68use crate::actor::Referable;
69use crate::channel;
70use crate::channel::ChannelAddr;
71use crate::channel::ChannelError;
72use crate::channel::ChannelRx;
73use crate::channel::ChannelTransport;
74use crate::channel::Rx;
75use crate::channel::Tx;
76use crate::clock::Clock;
77use crate::clock::RealClock;
78use crate::context;
79use crate::mailbox::BoxableMailboxSender;
80use crate::mailbox::BoxedMailboxSender;
81use crate::mailbox::DialMailboxRouter;
82use crate::mailbox::IntoBoxedMailboxSender as _;
83use crate::mailbox::MailboxClient;
84use crate::mailbox::MailboxSender;
85use crate::mailbox::MailboxServer;
86use crate::mailbox::MailboxServerHandle;
87use crate::mailbox::MessageEnvelope;
88use crate::mailbox::Undeliverable;
89
90/// The type of error produced by host operations.
91#[derive(Debug, thiserror::Error)]
92pub enum HostError {
93    /// A channel error occurred during a host operation.
94    #[error(transparent)]
95    ChannelError(#[from] ChannelError),
96
97    /// The named proc already exists and cannot be spawned.
98    #[error("proc '{0}' already exists")]
99    ProcExists(String),
100
101    /// Failures occuring while spawning a subprocess.
102    #[error("proc '{0}' failed to spawn process: {1}")]
103    ProcessSpawnFailure(ProcId, #[source] std::io::Error),
104
105    /// Failures occuring while configuring a subprocess.
106    #[error("proc '{0}' failed to configure process: {1}")]
107    ProcessConfigurationFailure(ProcId, #[source] anyhow::Error),
108
109    /// Failures occuring while spawning a management actor in a proc.
110    #[error("failed to spawn agent on proc '{0}': {1}")]
111    AgentSpawnFailure(ProcId, #[source] anyhow::Error),
112
113    /// An input parameter was missing.
114    #[error("parameter '{0}' missing: {1}")]
115    MissingParameter(String, std::env::VarError),
116
117    /// An input parameter was invalid.
118    #[error("parameter '{0}' invalid: {1}")]
119    InvalidParameter(String, anyhow::Error),
120}
121
122/// A host, managing the lifecycle of several procs, and their backend
123/// routing, as described in this module's documentation.
124pub struct Host<M> {
125    procs: HashSet<String>,
126    frontend_addr: ChannelAddr,
127    backend_addr: ChannelAddr,
128    router: DialMailboxRouter,
129    manager: M,
130    service_proc: Proc,
131    local_proc: Proc,
132    frontend_rx: Option<ChannelRx<MessageEnvelope>>,
133}
134
135impl<M: ProcManager> Host<M> {
136    /// Serve a host using the provided ProcManager, on the provided `addr`.
137    /// On success, the host will multiplex messages for procs on the host
138    /// on the address of the host.
139    pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
140        Self::new_with_default(manager, addr, None).await
141    }
142
143    /// Like [`new`], serves a host using the provided ProcManager, on the provided `addr`.
144    /// Unknown destinations are forwarded to the default sender.
145    #[crate::instrument(fields(addr=addr.to_string()))]
146    pub async fn new_with_default(
147        manager: M,
148        addr: ChannelAddr,
149        default_sender: Option<BoxedMailboxSender>,
150    ) -> Result<Self, HostError> {
151        let (frontend_addr, frontend_rx) = channel::serve(addr)?;
152
153        // We set up a cascade of routers: first, the outer router supports
154        // sending to the the system proc, while the dial router manages dialed
155        // connections.
156        let router = match default_sender {
157            Some(d) => DialMailboxRouter::new_with_default(d),
158            None => DialMailboxRouter::new(),
159        };
160
161        // Establish a backend channel on the preferred transport. We currently simply
162        // serve the same router on both.
163        let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
164
165        // Set up a system proc. This is often used to manage the host itself.
166        let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
167        let service_proc = Proc::new(service_proc_id.clone(), router.boxed());
168
169        let local_proc_id = ProcId::Direct(frontend_addr.clone(), "local".to_string());
170        let local_proc = Proc::new(local_proc_id.clone(), router.boxed());
171
172        tracing::info!(
173            frontend_addr = frontend_addr.to_string(),
174            backend_addr = backend_addr.to_string(),
175            service_proc_id = service_proc_id.to_string(),
176            local_proc_id = local_proc_id.to_string(),
177            "serving host"
178        );
179
180        let host = Host {
181            procs: HashSet::new(),
182            frontend_addr,
183            backend_addr,
184            router,
185            manager,
186            service_proc,
187            local_proc,
188            frontend_rx: Some(frontend_rx),
189        };
190
191        // We the same router on both frontend and backend addresses.
192        let _backend_handle = host.forwarder().serve(backend_rx);
193
194        Ok(host)
195    }
196
197    /// Start serving this host's mailbox on its frontend address.
198    /// Returns the server handle on first invocation; afterwards None.
199    pub fn serve(&mut self) -> Option<MailboxServerHandle> {
200        Some(self.forwarder().serve(self.frontend_rx.take()?))
201    }
202
203    /// The underlying proc manager.
204    pub fn manager(&self) -> &M {
205        &self.manager
206    }
207
208    /// The address which accepts messages destined for this host.
209    pub fn addr(&self) -> &ChannelAddr {
210        &self.frontend_addr
211    }
212
213    /// The system proc associated with this host.
214    /// This is used to run host-level system services like host managers.
215    pub fn system_proc(&self) -> &Proc {
216        &self.service_proc
217    }
218
219    /// The local proc associated with this host.
220    /// This is the local proc used in processes that are also hosts.
221    pub fn local_proc(&self) -> &Proc {
222        &self.local_proc
223    }
224
225    /// Spawn a new process with the given `name`. On success, the
226    /// proc has been spawned, and is reachable through the returned,
227    /// direct-addressed ProcId, which will be
228    /// `ProcId::Direct(self.addr(), name)`.
229    pub async fn spawn(
230        &mut self,
231        name: String,
232        config: M::Config,
233    ) -> Result<(ProcId, ActorRef<ManagerAgent<M>>), HostError> {
234        if self.procs.contains(&name) {
235            return Err(HostError::ProcExists(name));
236        }
237
238        let proc_id = ProcId::Direct(self.frontend_addr.clone(), name.clone());
239        let handle = self
240            .manager
241            .spawn(proc_id.clone(), self.backend_addr.clone(), config)
242            .await?;
243
244        // Await readiness (config-driven; 0s disables timeout).
245        let to: Duration = hyperactor_config::global::get(crate::config::HOST_SPAWN_READY_TIMEOUT);
246        let ready = if to == Duration::from_secs(0) {
247            ReadyProc::ensure(&handle).await
248        } else {
249            match RealClock.timeout(to, ReadyProc::ensure(&handle)).await {
250                Ok(result) => result,
251                Err(_elapsed) => Err(ReadyProcError::Timeout),
252            }
253        }
254        .map_err(|e| {
255            HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
256        })?;
257
258        self.router
259            .bind(proc_id.clone().into(), ready.addr().clone());
260        self.procs.insert(name);
261
262        Ok((proc_id, ready.agent_ref().clone()))
263    }
264
265    fn forwarder(&self) -> ProcOrDial {
266        ProcOrDial {
267            service_proc: self.service_proc.clone(),
268            local_proc: self.local_proc.clone(),
269            dialer: self.router.clone(),
270        }
271    }
272}
273
274/// A router used to route to the system proc, or else fall back to
275/// the dial mailbox router.
276#[derive(Clone)]
277struct ProcOrDial {
278    service_proc: Proc,
279    local_proc: Proc,
280    dialer: DialMailboxRouter,
281}
282
283impl MailboxSender for ProcOrDial {
284    fn post_unchecked(
285        &self,
286        envelope: MessageEnvelope,
287        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
288    ) {
289        if envelope.dest().actor_id().proc_id() == self.service_proc.proc_id() {
290            self.service_proc.post_unchecked(envelope, return_handle);
291        } else if envelope.dest().actor_id().proc_id() == self.local_proc.proc_id() {
292            self.local_proc.post_unchecked(envelope, return_handle);
293        } else {
294            self.dialer.post_unchecked(envelope, return_handle)
295        }
296    }
297}
298
299/// Error returned by [`ProcHandle::ready`].
300#[derive(Debug, Clone)]
301pub enum ReadyError<TerminalStatus> {
302    /// The proc reached a terminal state before becoming Ready.
303    Terminal(TerminalStatus),
304    /// Implementation lost its status channel / cannot observe state.
305    ChannelClosed,
306}
307
308/// Error returned by [`ready_proc`].
309#[derive(Debug, Clone)]
310pub enum ReadyProcError<TerminalStatus> {
311    /// Timed out waiting for ready.
312    Timeout,
313    /// The underlying `ready()` call failed.
314    Ready(ReadyError<TerminalStatus>),
315    /// The handle's `addr()` returned `None` after `ready()` succeeded.
316    MissingAddr,
317    /// The handle's `agent_ref()` returned `None` after `ready()`
318    /// succeeded.
319    MissingAgentRef,
320}
321
322impl<T> From<ReadyError<T>> for ReadyProcError<T> {
323    fn from(e: ReadyError<T>) -> Self {
324        ReadyProcError::Ready(e)
325    }
326}
327
328/// Error returned by [`ProcHandle::wait`].
329#[derive(Debug, Clone)]
330pub enum WaitError {
331    /// Implementation lost its status channel / cannot observe state.
332    ChannelClosed,
333}
334
335/// Error returned by [`ProcHandle::terminate`] and
336/// [`ProcHandle::kill`].
337///
338/// - `Unsupported`: the manager cannot perform the requested proc
339///   signaling (e.g., local/in-process manager that doesn't emulate
340///   kill).
341/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
342///   is the same value `wait()` would return.
343/// - `ChannelClosed`: the manager lost its lifecycle channel and
344///   cannot reliably observe state transitions.
345/// - `Io(err)`: manager-specific failure delivering the signal or
346///   performing shutdown (e.g., OS error on kill).
347#[derive(Debug)]
348pub enum TerminateError<TerminalStatus> {
349    /// Manager doesn't support signaling (e.g., Local manager).
350    Unsupported,
351    /// A terminal state was already reached while attempting
352    /// terminate/kill.
353    AlreadyTerminated(TerminalStatus),
354    /// Implementation lost its status channel / cannot observe state.
355    ChannelClosed,
356    /// Manager-specific failure to deliver signal or perform
357    /// shutdown.
358    Io(anyhow::Error),
359}
360
361impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
362    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363        match self {
364            TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
365            TerminateError::AlreadyTerminated(st) => {
366                write!(f, "proc already terminated (status: {st:?})")
367            }
368            TerminateError::ChannelClosed => {
369                write!(f, "lifecycle channel closed; cannot observe state")
370            }
371            TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
372        }
373    }
374}
375
376impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
377    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
378        match self {
379            TerminateError::Io(err) => Some(err.root_cause()),
380            _ => None,
381        }
382    }
383}
384
385/// Summary of results from a bulk termination attempt.
386///
387/// - `attempted`: total number of child procs for which termination
388///   was attempted.
389/// - `ok`: number of procs successfully terminated (includes those
390///   that were already in a terminal state).
391/// - `failed`: number of procs that could not be terminated (e.g.
392///   signaling errors or lost lifecycle channel).
393#[derive(Debug)]
394pub struct TerminateSummary {
395    /// Total number of child procs for which termination was
396    /// attempted.
397    pub attempted: usize,
398    /// Number of procs that successfully reached a terminal state.
399    ///
400    /// This count includes both procs that exited cleanly after
401    /// `terminate(timeout)` and those that were already in a terminal
402    /// state before termination was attempted.
403    pub ok: usize,
404    /// Number of procs that failed to terminate.
405    ///
406    /// Failures typically arise from signaling errors (e.g., OS
407    /// failure to deliver SIGTERM/SIGKILL) or a lost lifecycle
408    /// channel, meaning the manager could no longer observe state
409    /// transitions.
410    pub failed: usize,
411}
412
413impl fmt::Display for TerminateSummary {
414    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415        write!(
416            f,
417            "attempted={} ok={} failed={}",
418            self.attempted, self.ok, self.failed
419        )
420    }
421}
422
423#[async_trait::async_trait]
424/// Trait for terminating a single proc.
425pub trait SingleTerminate: Send + Sync {
426    /// Gracefully terminate the given proc.
427    ///
428    /// Initiates a polite shutdown for each child, waits up to
429    /// `timeout` for completion, then escalates to a forceful stop
430    /// The returned [`TerminateSummary`] reports how
431    /// many children were attempted, succeeded, and failed.
432    ///
433    /// Implementation notes:
434    /// - "Polite shutdown" and "forceful stop" are intentionally
435    ///   abstract. Implementors should map these to whatever
436    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
437    ///   OS signals).
438    /// - The operation must be idempotent and tolerate races with
439    ///   concurrent termination or external exits.
440    ///
441    /// # Parameters
442    /// - `timeout`: Per-child grace period before escalation to a
443    ///   forceful stop.
444    /// Returns a tuple of (polite shutdown actors vec, forceful stop actors vec)
445    async fn terminate_proc(
446        &self,
447        cx: &impl context::Actor,
448        proc: &ProcId,
449        timeout: std::time::Duration,
450    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error>;
451}
452
453/// Trait for managers that can terminate many child **units** in
454/// bulk.
455///
456/// Implementors provide a concurrency-bounded, graceful shutdown over
457/// all currently tracked children (polite stop → wait → forceful
458/// stop), returning a summary of outcomes. The exact stop/kill
459/// semantics are manager-specific: for example, an OS-process manager
460/// might send signals, while an in-process manager might drain/abort
461/// tasks.
462#[async_trait::async_trait]
463pub trait BulkTerminate: Send + Sync {
464    /// Gracefully terminate all known children.
465    ///
466    /// Initiates a polite shutdown for each child, waits up to
467    /// `timeout` for completion, then escalates to a forceful stop
468    /// for any that remain. Work may be done in parallel, capped by
469    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
470    /// many children were attempted, succeeded, and failed.
471    ///
472    /// Implementation notes:
473    /// - "Polite shutdown" and "forceful stop" are intentionally
474    ///   abstract. Implementors should map these to whatever
475    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
476    ///   OS signals).
477    /// - The operation must be idempotent and tolerate races with
478    ///   concurrent termination or external exits.
479    ///
480    /// # Parameters
481    /// - `timeout`: Per-child grace period before escalation to a
482    ///   forceful stop.
483    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
484    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
485    ///   etc.).
486    async fn terminate_all(
487        &self,
488        cx: &impl context::Actor,
489        timeout: std::time::Duration,
490        max_in_flight: usize,
491    ) -> TerminateSummary;
492}
493
494// Host convenience that's available only when its manager supports
495// bulk termination.
496impl<M: ProcManager + BulkTerminate> Host<M> {
497    /// Gracefully terminate all procs spawned by this host.
498    ///
499    /// Delegates to the underlying manager’s
500    /// [`BulkTerminate::terminate_all`] implementation. Use this to
501    /// perform orderly teardown during scale-down or shutdown.
502    ///
503    /// # Parameters
504    /// - `timeout`: Per-child grace period before escalation.
505    /// - `max_in_flight`: Upper bound on concurrent terminations.
506    ///
507    /// # Returns
508    /// A [`TerminateSummary`] with counts of attempted/ok/failed
509    /// terminations.
510    pub async fn terminate_children(
511        &self,
512        cx: &impl context::Actor,
513        timeout: Duration,
514        max_in_flight: usize,
515    ) -> TerminateSummary {
516        self.manager.terminate_all(cx, timeout, max_in_flight).await
517    }
518}
519
520#[async_trait::async_trait]
521impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
522    async fn terminate_proc(
523        &self,
524        cx: &impl context::Actor,
525        proc: &ProcId,
526        timeout: Duration,
527    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
528        self.manager.terminate_proc(cx, proc, timeout).await
529    }
530}
531
532/// Capability proving a proc is ready.
533///
534/// [`ReadyProc::ensure`] validates that `addr()` and `agent_ref()`
535/// are available; this type carries that proof, providing infallible
536/// accessors.
537///
538/// Obtain a `ReadyProc` by calling `ready_proc(&handle).await`.
539pub struct ReadyProc<'a, H: ProcHandle> {
540    handle: &'a H,
541    addr: ChannelAddr,
542    agent_ref: ActorRef<H::Agent>,
543}
544
545impl<'a, H: ProcHandle> ReadyProc<'a, H> {
546    /// Wait for a proc to become ready, then return a capability that
547    /// provides infallible access to `addr()` and `agent_ref()`.
548    ///
549    /// This is the type-safe way to obtain the proc's address and
550    /// agent reference. After this function returns `Ok(ready)`, both
551    /// `ready.addr()` and `ready.agent_ref()` are guaranteed to
552    /// succeed.
553    pub async fn ensure(
554        handle: &'a H,
555    ) -> Result<ReadyProc<'a, H>, ReadyProcError<H::TerminalStatus>> {
556        handle.ready().await?;
557        let addr = handle.addr().ok_or(ReadyProcError::MissingAddr)?;
558        let agent_ref = handle.agent_ref().ok_or(ReadyProcError::MissingAgentRef)?;
559        Ok(ReadyProc {
560            handle,
561            addr,
562            agent_ref,
563        })
564    }
565
566    /// The proc's logical identity.
567    pub fn proc_id(&self) -> &ProcId {
568        self.handle.proc_id()
569    }
570
571    /// The proc's address (guaranteed available after ready).
572    pub fn addr(&self) -> &ChannelAddr {
573        &self.addr
574    }
575
576    /// The agent actor reference (guaranteed available after ready).
577    pub fn agent_ref(&self) -> &ActorRef<H::Agent> {
578        &self.agent_ref
579    }
580}
581
582/// Minimal uniform surface for a spawned-**proc** handle returned by
583/// a `ProcManager`. Each manager can return its own concrete handle,
584/// as long as it exposes these. A **proc** is the Hyperactor runtime
585/// + its actors (lifecycle controlled via `Proc` APIs such as
586/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
587/// **process**, but it is conceptually distinct:
588///
589/// - `LocalProcManager`: runs the proc **in this OS process**; there
590///   is no child process to signal. Lifecycle is entirely proc-level.
591/// - `ProcessProcManager` (test-only here): launches an **external OS
592///   process** which hosts the proc, but this toy manager does
593///   **not** wire a control plane for shutdown, nor an exit monitor.
594///
595/// This trait is therefore written in terms of the **proc**
596/// lifecycle:
597///
598/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
599///   available).
600/// - `wait()` resolves with the proc's terminal status
601///   (Stopped/Killed/Failed).
602/// - `terminate()` requests a graceful shutdown of the *proc* and
603///   waits up to the deadline; managers that also own a child OS
604///   process may escalate to `SIGKILL` if the proc does not exit in
605///   time.
606/// - `kill()` requests an immediate, forced termination. For
607///    in-process procs, this may be implemented as an immediate
608///    drain/abort of actor tasks. For external procs, this is
609///    typically a `SIGKILL`.
610///
611/// The shape of the terminal value is `Self::TerminalStatus`.
612/// Managers that track rich info (exit code, signal, address, agent)
613/// can expose it; trivial managers may use `()`.
614///
615/// Managers that do not support signaling must return `Unsupported`.
616#[async_trait]
617pub trait ProcHandle: Clone + Send + Sync + 'static {
618    /// The agent actor type installed in the proc by the manager.
619    /// Must implement both:
620    /// - [`Actor`], because the agent actually runs inside the proc,
621    ///   and
622    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
623    type Agent: Actor + Referable;
624
625    /// The type of terminal status produced when the proc exits.
626    ///
627    /// For example, an external proc manager may use a rich status
628    /// enum (e.g. `ProcStatus`), while an in-process manager may use
629    /// a trivial unit type. This is the value returned by
630    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
631    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;
632
633    /// The proc's logical identity on this host.
634    fn proc_id(&self) -> &ProcId;
635
636    /// The proc's address (the one callers bind into the host
637    /// router). May return `None` before `ready()` completes.
638    /// Guaranteed to return `Some` after `ready()` succeeds.
639    ///
640    /// **Prefer [`ready_proc()`]** for type-safe access that
641    /// guarantees availability at compile time.
642    fn addr(&self) -> Option<ChannelAddr>;
643
644    /// The agent actor reference hosted in the proc. May return
645    /// `None` before `ready()` completes. Guaranteed to return `Some`
646    /// after `ready()` succeeds.
647    ///
648    /// **Prefer [`ready_proc()`]** for type-safe access that
649    /// guarantees availability at compile time.
650    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>>;
651
652    /// Resolves when the proc becomes Ready. Multi-waiter,
653    /// non-consuming.
654    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;
655
656    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
657    /// Multi-waiter, non-consuming.
658    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;
659
660    /// Politely stop the proc before the deadline; managers that own
661    /// a child OS process may escalate to a forced kill at the
662    /// deadline. Idempotent and race-safe: concurrent callers
663    /// coalesce; the first terminal outcome wins and all callers
664    /// observe it via `wait()`.
665    ///
666    /// Returns the single terminal status the proc reached (the same
667    /// value `wait()` will return). Never fabricates terminal states:
668    /// this is only returned after the exit monitor observes
669    /// termination.
670    async fn terminate(
671        &self,
672        cx: &impl context::Actor,
673        timeout: Duration,
674    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
675
676    /// Force the proc down immediately. For in-process managers this
677    /// may abort actor tasks; for external managers this typically
678    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
679    /// outcome is the one observed by `wait()`.
680    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
681}
682
683/// A trait describing a manager of procs, responsible for bootstrapping
684/// procs on a host, and managing their lifetimes. The manager spawns an
685/// `Agent`-typed actor on each proc, responsible for managing the proc.
686#[async_trait]
687pub trait ProcManager {
688    /// Concrete handle type this manager returns.
689    type Handle: ProcHandle;
690
691    /// Additional configuration for the proc, supported by this manager.
692    type Config = ();
693
694    /// The preferred transport for this ProcManager.
695    /// In practice this will be [`ChannelTransport::Local`]
696    /// for testing, and [`ChannelTransport::Unix`] for external
697    /// processes.
698    fn transport(&self) -> ChannelTransport;
699
700    /// Spawn a new proc with the provided proc id. The proc
701    /// should use the provided forwarder address for messages
702    /// destined outside of the proc. The returned address accepts
703    /// messages destined for the proc.
704    ///
705    /// An agent actor is also spawned, and the corresponding actor
706    /// ref is returned.
707    async fn spawn(
708        &self,
709        proc_id: ProcId,
710        forwarder_addr: ChannelAddr,
711        config: Self::Config,
712    ) -> Result<Self::Handle, HostError>;
713}
714
715/// Type alias for the agent actor managed by a given [`ProcManager`].
716///
717/// This resolves to the `Agent` type exposed by the manager's
718/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
719/// convenient shorthand so call sites can refer to
720/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
721/// `<M::Handle as ProcHandle>::Agent`.
722///
723/// # Example
724/// ```ignore
725/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
726/// ```
727pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792
728
729/// A ProcManager that spawns **in-process** procs (test-only).
730///
731/// The proc runs inside this same OS process; there is **no** child
732/// process to signal. Lifecycle is purely proc-level:
733/// - `terminate(timeout)`: delegates to
734///   `Proc::destroy_and_wait(timeout, None)`, which drains and, at the
735///   deadline, aborts remaining actors.
736/// - `kill()`: uses a zero deadline to emulate a forced stop via
737///   `destroy_and_wait(Duration::ZERO, None)`.
738/// - `wait()`: trivial (no external lifecycle to observe).
739///
740///   No OS signals are sent or required.
741pub struct LocalProcManager<S> {
742    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
743    spawn: S,
744}
745
746impl<S> LocalProcManager<S> {
747    /// Create a new in-process proc manager with the given agent
748    /// params.
749    pub fn new(spawn: S) -> Self {
750        Self {
751            procs: Arc::new(Mutex::new(HashMap::new())),
752            spawn,
753        }
754    }
755}
756
757#[async_trait]
758impl<S> BulkTerminate for LocalProcManager<S>
759where
760    S: Send + Sync,
761{
762    async fn terminate_all(
763        &self,
764        _cx: &impl context::Actor,
765        timeout: std::time::Duration,
766        max_in_flight: usize,
767    ) -> TerminateSummary {
768        // Snapshot procs so we don't hold the lock across awaits.
769        let procs: Vec<Proc> = {
770            let guard = self.procs.lock().await;
771            guard.values().cloned().collect()
772        };
773
774        let attempted = procs.len();
775
776        let results = stream::iter(procs.into_iter().map(|mut p| async move {
777            // For local manager, graceful proc-level stop.
778            match p.destroy_and_wait::<()>(timeout, None).await {
779                Ok(_) => true,
780                Err(e) => {
781                    tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
782                    false
783                }
784            }
785        }))
786        .buffer_unordered(max_in_flight.max(1))
787        .collect::<Vec<bool>>()
788        .await;
789
790        let ok = results.into_iter().filter(|b| *b).count();
791
792        TerminateSummary {
793            attempted,
794            ok,
795            failed: attempted.saturating_sub(ok),
796        }
797    }
798}
799
800#[async_trait::async_trait]
801impl<S> SingleTerminate for LocalProcManager<S>
802where
803    S: Send + Sync,
804{
805    async fn terminate_proc(
806        &self,
807        _cx: &impl context::Actor,
808        proc: &ProcId,
809        timeout: std::time::Duration,
810    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
811        // Snapshot procs so we don't hold the lock across awaits.
812        let procs: Option<Proc> = {
813            let mut guard = self.procs.lock().await;
814            guard.remove(proc)
815        };
816        if let Some(mut p) = procs {
817            p.destroy_and_wait::<()>(timeout, None).await
818        } else {
819            Err(anyhow::anyhow!("proc {} doesn't exist", proc))
820        }
821    }
822}
823
824/// A lightweight [`ProcHandle`] for procs managed **in-process** via
825/// [`LocalProcManager`].
826///
827/// This handle wraps the minimal identifying state of a spawned proc:
828/// - its [`ProcId`] (logical identity on the host),
829/// - the proc's [`ChannelAddr`] (the address callers bind into the
830///   host router), and
831/// - the [`ActorRef`] to the agent actor hosted in the proc.
832///
833/// Unlike external handles, `LocalHandle` does **not** manage an OS
834/// child process. It provides a uniform surface (`proc_id()`,
835/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
836/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
837/// **proc-level** shutdown.
838///
839/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
840/// bound (`Actor + Referable`).
841pub struct LocalHandle<A: Actor + Referable> {
842    proc_id: ProcId,
843    addr: ChannelAddr,
844    agent_ref: ActorRef<A>,
845    procs: Arc<Mutex<HashMap<ProcId, Proc>>>,
846}
847
848// Manual `Clone` to avoid requiring `A: Clone`.
849impl<A: Actor + Referable> Clone for LocalHandle<A> {
850    fn clone(&self) -> Self {
851        Self {
852            proc_id: self.proc_id.clone(),
853            addr: self.addr.clone(),
854            agent_ref: self.agent_ref.clone(),
855            procs: Arc::clone(&self.procs),
856        }
857    }
858}
859
860#[async_trait]
861impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
862    /// `Agent = A` (inherits `Actor + Referable` from the trait
863    /// bound).
864    type Agent = A;
865    type TerminalStatus = ();
866
867    fn proc_id(&self) -> &ProcId {
868        &self.proc_id
869    }
870
871    fn addr(&self) -> Option<ChannelAddr> {
872        Some(self.addr.clone())
873    }
874
875    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
876        Some(self.agent_ref.clone())
877    }
878
879    /// Always resolves immediately: a local proc is created
880    /// in-process and is usable as soon as the handle exists.
881    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
882        Ok(())
883    }
884    /// Always resolves immediately with `()`: a local proc has no
885    /// external lifecycle to await. There is no OS child process
886    /// behind this handle.
887    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
888        Ok(())
889    }
890
891    async fn terminate(
892        &self,
893        _cx: &impl context::Actor,
894        timeout: Duration,
895    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
896        let mut proc = {
897            let guard = self.procs.lock().await;
898            match guard.get(self.proc_id()) {
899                Some(p) => p.clone(),
900                None => {
901                    // The proc was already removed; treat as already
902                    // terminal.
903                    return Err(TerminateError::AlreadyTerminated(()));
904                }
905            }
906        };
907
908        // Graceful stop of the *proc* (actors) with a deadline. This
909        // will drain and then abort remaining actors at expiry.
910        let _ = proc
911            .destroy_and_wait::<()>(timeout, None)
912            .await
913            .map_err(TerminateError::Io)?;
914
915        Ok(())
916    }
917
918    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
919        // Forced stop == zero deadline; `destroy_and_wait` will
920        // immediately abort remaining actors and return.
921        let mut proc = {
922            let guard = self.procs.lock().await;
923            match guard.get(self.proc_id()) {
924                Some(p) => p.clone(),
925                None => return Err(TerminateError::AlreadyTerminated(())),
926            }
927        };
928
929        let _ = proc
930            .destroy_and_wait::<()>(Duration::from_millis(0), None)
931            .await
932            .map_err(TerminateError::Io)?;
933
934        Ok(())
935    }
936}
937
938/// Local, in-process ProcManager.
939///
940/// **Type bounds:**
941/// - `A: Actor + Referable + Binds<A>`
942///   - `Actor`: the agent actually runs inside the proc.
943///   - `Referable`: callers hold `ActorRef<A>` to the agent; this
944///     bound is required for typed remote refs.
945///   - `Binds<A>`: lets the runtime wire the agent's message ports.
946/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
947///   the spawn closure returns a Send future (we `tokio::spawn` it).
948/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
949///   concurrent contexts.
950///
951/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
952#[async_trait]
953impl<A, S, F> ProcManager for LocalProcManager<S>
954where
955    A: Actor + Referable + Binds<A>,
956    F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
957    S: Fn(Proc) -> F + Sync,
958{
959    type Handle = LocalHandle<A>;
960
961    fn transport(&self) -> ChannelTransport {
962        ChannelTransport::Local
963    }
964
965    #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
966    async fn spawn(
967        &self,
968        proc_id: ProcId,
969        forwarder_addr: ChannelAddr,
970        _config: (),
971    ) -> Result<Self::Handle, HostError> {
972        let transport = forwarder_addr.transport();
973        let proc = Proc::new(
974            proc_id.clone(),
975            MailboxClient::dial(forwarder_addr)?.into_boxed(),
976        );
977        let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
978        self.procs
979            .lock()
980            .await
981            .insert(proc_id.clone(), proc.clone());
982        let _handle = proc.clone().serve(rx);
983        let agent_handle = (self.spawn)(proc)
984            .await
985            .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
986
987        Ok(LocalHandle {
988            proc_id,
989            addr: proc_addr,
990            agent_ref: agent_handle.bind(),
991            procs: Arc::clone(&self.procs),
992        })
993    }
994}
995
996/// A ProcManager that manages each proc as a **separate OS process**
997/// (test-only toy).
998///
999/// This implementation launches a child via `Command` and relies on
1000/// `kill_on_drop(true)` so that children are SIGKILLed if the manager
1001/// (or host) drops. There is **no** proc control plane (no RPC to a
1002/// proc agent for shutdown) and **no** exit monitor wired here.
1003/// Consequently:
1004/// - `terminate()` and `kill()` return `Unsupported`.
1005/// - `wait()` is trivial (no lifecycle observation).
1006///
1007/// It follows a simple protocol:
1008///
1009/// Each process is launched with the following environment variables:
1010/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
1011/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
1012/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
1013///
1014/// The launched proc should also spawn an actor to manage it - the details of this are
1015/// implementation dependent, and outside the scope of the process manager.
1016///
1017/// The function [`boot_proc`] provides a convenient implementation of the
1018/// protocol.
1019pub struct ProcessProcManager<A> {
1020    program: std::path::PathBuf,
1021    children: Arc<Mutex<HashMap<ProcId, Child>>>,
1022    _phantom: PhantomData<A>,
1023}
1024
1025impl<A> ProcessProcManager<A> {
1026    /// Create a new ProcessProcManager that runs the provided
1027    /// command.
1028    pub fn new(program: std::path::PathBuf) -> Self {
1029        Self {
1030            program,
1031            children: Arc::new(Mutex::new(HashMap::new())),
1032            _phantom: PhantomData,
1033        }
1034    }
1035}
1036
1037impl<A> Drop for ProcessProcManager<A> {
1038    fn drop(&mut self) {
1039        // When the manager is dropped, `children` is dropped, which
1040        // drops each `Child` handle. With `kill_on_drop(true)`, the OS
1041        // will SIGKILL the processes. Nothing else to do here.
1042    }
1043}
1044
1045/// A [`ProcHandle`] implementation for procs managed as separate
1046/// OS processes via [`ProcessProcManager`].
1047///
1048/// This handle records the logical identity and connectivity of an
1049/// external child process:
1050/// - its [`ProcId`] (unique identity on the host),
1051/// - the proc's [`ChannelAddr`] (address registered in the host
1052///   router),
1053/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
1054///
1055/// Unlike [`LocalHandle`], this corresponds to a real OS process
1056/// launched by the manager. In this **toy** implementation the handle
1057/// does not own/monitor the `Child` and there is no shutdown control
1058/// plane. It is a stable, clonable surface exposing the proc's
1059/// identity, address, and agent reference so host code can interact
1060/// uniformly with local/external procs. `terminate()`/`kill()` are
1061/// intentionally `Unsupported` here; process cleanup relies on
1062/// `cmd.kill_on_drop(true)` when launching the child (the OS will
1063/// SIGKILL it if the handle is dropped).
1064///
1065/// The type bound `A: Actor + Referable` comes from the
1066/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
1067/// actually runs inside the proc, and `Referable` because it must
1068/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
1069/// typed remote reference).
1070#[derive(Debug)]
1071pub struct ProcessHandle<A: Actor + Referable> {
1072    proc_id: ProcId,
1073    addr: ChannelAddr,
1074    agent_ref: ActorRef<A>,
1075}
1076
1077// Manual `Clone` to avoid requiring `A: Clone`.
1078impl<A: Actor + Referable> Clone for ProcessHandle<A> {
1079    fn clone(&self) -> Self {
1080        Self {
1081            proc_id: self.proc_id.clone(),
1082            addr: self.addr.clone(),
1083            agent_ref: self.agent_ref.clone(),
1084        }
1085    }
1086}
1087
1088#[async_trait]
1089impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
1090    /// Agent must be both an `Actor` (runs in the proc) and a
1091    /// `Referable` (so it can be referenced via `ActorRef<A>`).
1092    type Agent = A;
1093    type TerminalStatus = ();
1094
1095    fn proc_id(&self) -> &ProcId {
1096        &self.proc_id
1097    }
1098
1099    fn addr(&self) -> Option<ChannelAddr> {
1100        Some(self.addr.clone())
1101    }
1102
1103    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1104        Some(self.agent_ref.clone())
1105    }
1106
1107    /// Resolves immediately. `ProcessProcManager::spawn` returns this
1108    /// handle only after the child has called back with (addr,
1109    /// agent), i.e. after readiness.
1110    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1111        Ok(())
1112    }
1113    /// Resolves immediately with `()`. This handle does not track
1114    /// child lifecycle; there is no watcher in this implementation.
1115    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1116        Ok(())
1117    }
1118
1119    async fn terminate(
1120        &self,
1121        _cx: &impl context::Actor,
1122        _deadline: Duration,
1123    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1124        Err(TerminateError::Unsupported)
1125    }
1126
1127    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1128        Err(TerminateError::Unsupported)
1129    }
1130}
1131
1132#[async_trait]
1133impl<A> ProcManager for ProcessProcManager<A>
1134where
1135    // Agent actor runs in the proc (`Actor`) and must be
1136    // referenceable (`Referable`).
1137    A: Actor + Referable + Sync,
1138{
1139    type Handle = ProcessHandle<A>;
1140
1141    fn transport(&self) -> ChannelTransport {
1142        ChannelTransport::Unix
1143    }
1144
1145    #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1146    async fn spawn(
1147        &self,
1148        proc_id: ProcId,
1149        forwarder_addr: ChannelAddr,
1150        _config: (),
1151    ) -> Result<Self::Handle, HostError> {
1152        let (callback_addr, mut callback_rx) =
1153            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1154
1155        let mut cmd = Command::new(&self.program);
1156        cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1157        cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1158        cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1159
1160        // Lifetime strategy: mark the child with
1161        // `kill_on_drop(true)` so the OS will send SIGKILL if the
1162        // handle is dropped and retain the `Child` in
1163        // `self.children`, tying its lifetime to the manager/host.
1164        //
1165        // This is the simplest viable policy to avoid orphaned
1166        // subprocesses in CI; more sophisticated lifecycle control
1167        // (graceful shutdown, restart) will be layered on later.
1168
1169        // Kill the child when its handle is dropped.
1170        cmd.kill_on_drop(true);
1171
1172        let child = cmd
1173            .spawn()
1174            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1175
1176        // Retain the handle so it lives for the life of the
1177        // manager/host.
1178        {
1179            let mut children = self.children.lock().await;
1180            children.insert(proc_id.clone(), child);
1181        }
1182
1183        // Wait for the child's callback with (addr, agent_ref)
1184        let (proc_addr, agent_ref) = callback_rx.recv().await?;
1185
1186        // TODO(production): For a non-test implementation, plumb a
1187        // shutdown path:
1188        // - expose a proc-level graceful stop RPC on the agent and
1189        //   implement `terminate(timeout)` by invoking it and, on
1190        //   deadline, call `Child::kill()`; implement `kill()` as
1191        //   immediate `Child::kill()`.
1192        // - wire an exit monitor so `wait()` resolves with a real
1193        //   terminal status.
1194        Ok(ProcessHandle {
1195            proc_id,
1196            addr: proc_addr,
1197            agent_ref,
1198        })
1199    }
1200}
1201
1202impl<A> ProcessProcManager<A>
1203where
1204    // `Actor`: runs in the proc; `Referable`: referenceable via
1205    // ActorRef; `Binds<A>`: wires ports.
1206    A: Actor + Referable + Binds<A>,
1207{
1208    /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
1209    /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
1210    /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
1211    /// manage the proc itself.
1212    pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1213    where
1214        S: FnOnce(Proc) -> F,
1215        F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1216    {
1217        let proc_id: ProcId = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1218        let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1219        let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1220        spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1221    }
1222
1223    fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1224    where
1225        T: FromStr<Err = E>,
1226        E: Into<anyhow::Error>,
1227    {
1228        std::env::var(key)
1229            .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1230            .parse()
1231            .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1232    }
1233}
1234
1235/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
1236/// forwarding messages to the provided `backend_addr`,
1237/// and returning the proc's address and agent actor on
1238/// the provided `callback_addr`.
1239#[crate::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string(), callback_addr=callback_addr.to_string()))]
1240pub async fn spawn_proc<A, S, F>(
1241    proc_id: ProcId,
1242    backend_addr: ChannelAddr,
1243    callback_addr: ChannelAddr,
1244    spawn: S,
1245) -> Result<Proc, HostError>
1246where
1247    // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
1248    // `Binds<A>`: wires ports
1249    A: Actor + Referable + Binds<A>,
1250    S: FnOnce(Proc) -> F,
1251    F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1252{
1253    let backend_transport = backend_addr.transport();
1254    let proc = Proc::new(
1255        proc_id.clone(),
1256        MailboxClient::dial(backend_addr)?.into_boxed(),
1257    );
1258
1259    let agent_handle = spawn(proc.clone())
1260        .await
1261        .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1262
1263    // Finally serve the proc on the same transport as the backend address,
1264    // and call back.
1265    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1266    proc.clone().serve(proc_rx);
1267    channel::dial(callback_addr)?
1268        .send((proc_addr, agent_handle.bind::<A>()))
1269        .await
1270        .map_err(ChannelError::from)?;
1271
1272    Ok(proc)
1273}
1274
1275/// Testing support for hosts. This is linked outside of cfg(test)
1276/// as it is needed by an external binary.
1277pub mod testing {
1278    use async_trait::async_trait;
1279
1280    use crate as hyperactor;
1281    use crate::Actor;
1282    use crate::ActorId;
1283    use crate::Context;
1284    use crate::Handler;
1285    use crate::OncePortRef;
1286
1287    /// Just a simple actor, available in both the bootstrap binary as well as
1288    /// hyperactor tests.
1289    #[derive(Debug, Default)]
1290    #[hyperactor::export(handlers = [OncePortRef<ActorId>])]
1291    pub struct EchoActor;
1292
1293    impl Actor for EchoActor {}
1294
1295    #[async_trait]
1296    impl Handler<OncePortRef<ActorId>> for EchoActor {
1297        async fn handle(
1298            &mut self,
1299            cx: &Context<Self>,
1300            reply: OncePortRef<ActorId>,
1301        ) -> Result<(), anyhow::Error> {
1302            reply.send(cx, cx.self_id().clone())?;
1303            Ok(())
1304        }
1305    }
1306}
1307
1308#[cfg(test)]
1309mod tests {
1310    use std::sync::Arc;
1311    use std::time::Duration;
1312
1313    use super::testing::EchoActor;
1314    use super::*;
1315    use crate::channel::ChannelTransport;
1316    use crate::clock::Clock;
1317    use crate::clock::RealClock;
1318    use crate::context::Mailbox;
1319
1320    #[tokio::test]
1321    async fn test_basic() {
1322        let proc_manager =
1323            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()) });
1324        let procs = Arc::clone(&proc_manager.procs);
1325        let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Local))
1326            .await
1327            .unwrap();
1328
1329        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
1330        assert_eq!(
1331            proc_id1,
1332            ProcId::Direct(host.addr().clone(), "proc1".to_string())
1333        );
1334        assert!(procs.lock().await.contains_key(&proc_id1));
1335
1336        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
1337        assert!(procs.lock().await.contains_key(&proc_id2));
1338
1339        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
1340        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();
1341
1342        // Make sure they can talk to each other:
1343        let (instance1, _handle) = proc1.instance("client").unwrap();
1344        let (instance2, _handle) = proc2.instance("client").unwrap();
1345
1346        let (port, mut rx) = instance1.mailbox().open_port();
1347
1348        port.bind().send(&instance2, "hello".to_string()).unwrap();
1349        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());
1350
1351        // Make sure that the system proc is also wired in correctly.
1352        let (system_actor, _handle) = host.system_proc().instance("test").unwrap();
1353
1354        // system->proc
1355        port.bind()
1356            .send(&system_actor, "hello from the system proc".to_string())
1357            .unwrap();
1358        assert_eq!(
1359            rx.recv().await.unwrap(),
1360            "hello from the system proc".to_string()
1361        );
1362
1363        // system->system
1364        let (port, mut rx) = system_actor.mailbox().open_port();
1365        port.bind()
1366            .send(&system_actor, "hello from the system".to_string())
1367            .unwrap();
1368        assert_eq!(
1369            rx.recv().await.unwrap(),
1370            "hello from the system".to_string()
1371        );
1372
1373        // proc->system
1374        port.bind()
1375            .send(&instance1, "hello from the instance1".to_string())
1376            .unwrap();
1377        assert_eq!(
1378            rx.recv().await.unwrap(),
1379            "hello from the instance1".to_string()
1380        );
1381    }
1382
1383    #[tokio::test]
1384    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
1385    #[cfg_attr(not(fbcode_build), ignore)]
1386    async fn test_process_proc_manager() {
1387        hyperactor_telemetry::initialize_logging(crate::clock::ClockKind::default());
1388
1389        // EchoActor is "agent" used to test connectivity.
1390        let process_manager = ProcessProcManager::<EchoActor>::new(
1391            buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
1392        );
1393        let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
1394            .await
1395            .unwrap();
1396
1397        // Manually serve this: the agent isn't actually doing anything in this case,
1398        // but we are testing connectivity.
1399        host.serve();
1400
1401        // (1) Spawn and check invariants.
1402        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
1403        let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
1404        let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
1405        assert_eq!(echo1.actor_id().proc_id(), &proc1);
1406        assert_eq!(echo2.actor_id().proc_id(), &proc2);
1407
1408        // (2) Duplicate name rejection.
1409        let dup = host.spawn("proc1".to_string(), ()).await;
1410        assert!(matches!(dup, Err(HostError::ProcExists(_))));
1411
1412        // (3) Create a standalone client proc and verify echo1 agent responds.
1413        // Request: client proc -> host frontend/router -> echo1 (proc1).
1414        // Reply:   echo1 (proc1) -> host backend -> host router -> client port.
1415        // This confirms that an external proc (created via
1416        // `Proc::direct`) can address a child proc through the host,
1417        // and receive a correct reply.
1418        let client = Proc::direct(
1419            ChannelAddr::any(host.addr().transport()),
1420            "test".to_string(),
1421        )
1422        .unwrap();
1423        let (client_inst, _h) = client.instance("test").unwrap();
1424        let (port, rx) = client_inst.mailbox().open_once_port();
1425        echo1.send(&client_inst, port.bind()).unwrap();
1426        let id = RealClock
1427            .timeout(Duration::from_secs(5), rx.recv())
1428            .await
1429            .unwrap()
1430            .unwrap();
1431        assert_eq!(id, *echo1.actor_id());
1432
1433        // (4) Child <-> external client request -> reply:
1434        // Request: client proc (standalone via `Proc::direct`) ->
1435        //          host frontend/router -> echo2 (proc2).
1436        // Reply:   echo2 (proc2) -> host backend -> host router ->
1437        //          client port (standalone proc).
1438        // This exercises cross-proc routing between a child and an
1439        // external client under the same host.
1440        let (port2, rx2) = client_inst.mailbox().open_once_port();
1441        echo2.send(&client_inst, port2.bind()).unwrap();
1442        let id2 = RealClock
1443            .timeout(Duration::from_secs(5), rx2.recv())
1444            .await
1445            .unwrap()
1446            .unwrap();
1447        assert_eq!(id2, *echo2.actor_id());
1448
1449        // (5) System -> child request -> cross-proc reply:
1450        // Request: system proc -> host router (frontend) -> echo1
1451        //          (proc1, child).
1452        // Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
1453        //        host router -> client proc direct addr (Proc::direct) ->
1454        //        client port.
1455        // Because `client_inst` runs in its own proc, the reply
1456        // traverses the host (not local delivery within proc1).
1457        let (sys_inst, _h) = host.system_proc().instance("sys-client").unwrap();
1458        let (port3, rx3) = client_inst.mailbox().open_once_port();
1459        // Send from system -> child via a message that ultimately
1460        // replies to client's port
1461        echo1.send(&sys_inst, port3.bind()).unwrap();
1462        let id3 = RealClock
1463            .timeout(Duration::from_secs(5), rx3.recv())
1464            .await
1465            .unwrap()
1466            .unwrap();
1467        assert_eq!(id3, *echo1.actor_id());
1468    }
1469
1470    #[tokio::test]
1471    async fn local_ready_and_wait_are_immediate() {
1472        // Build a LocalHandle directly.
1473        let addr = ChannelAddr::any(ChannelTransport::Local);
1474        let proc_id = ProcId::Direct(addr.clone(), "p".into());
1475        let agent_ref = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
1476        let h = LocalHandle::<()> {
1477            proc_id,
1478            addr,
1479            agent_ref,
1480            procs: Arc::new(Mutex::new(HashMap::new())),
1481        };
1482
1483        // ready() resolves immediately
1484        assert!(h.ready().await.is_ok());
1485
1486        // wait() resolves immediately with unit TerminalStatus
1487        assert!(h.wait().await.is_ok());
1488
1489        // Multiple concurrent waiters both succeed
1490        let (r1, r2) = tokio::join!(h.ready(), h.ready());
1491        assert!(r1.is_ok() && r2.is_ok());
1492    }
1493
1494    // --
1495    // Fixtures for `host::spawn` tests.
1496
1497    #[derive(Debug, Clone, Copy)]
1498    enum ReadyMode {
1499        OkAfter(Duration),
1500        ErrTerminal,
1501        ErrChannelClosed,
1502    }
1503
1504    #[derive(Debug, Clone)]
1505    struct TestHandle {
1506        id: ProcId,
1507        addr: ChannelAddr,
1508        agent: ActorRef<()>,
1509        mode: ReadyMode,
1510        omit_addr: bool,
1511        omit_agent: bool,
1512    }
1513
1514    #[async_trait::async_trait]
1515    impl ProcHandle for TestHandle {
1516        type Agent = ();
1517        type TerminalStatus = ();
1518
1519        fn proc_id(&self) -> &ProcId {
1520            &self.id
1521        }
1522
1523        fn addr(&self) -> Option<ChannelAddr> {
1524            if self.omit_addr {
1525                None
1526            } else {
1527                Some(self.addr.clone())
1528            }
1529        }
1530
1531        fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1532            if self.omit_agent {
1533                None
1534            } else {
1535                Some(self.agent.clone())
1536            }
1537        }
1538
1539        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1540            match self.mode {
1541                ReadyMode::OkAfter(d) => {
1542                    if !d.is_zero() {
1543                        RealClock.sleep(d).await;
1544                    }
1545                    Ok(())
1546                }
1547                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
1548                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
1549            }
1550        }
1551        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1552            Ok(())
1553        }
1554        async fn terminate(
1555            &self,
1556            _cx: &impl context::Actor,
1557            _timeout: Duration,
1558        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1559            Err(TerminateError::Unsupported)
1560        }
1561        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
1562            Err(TerminateError::Unsupported)
1563        }
1564    }
1565
1566    #[derive(Debug, Clone)]
1567    struct TestManager {
1568        mode: ReadyMode,
1569        omit_addr: bool,
1570        omit_agent: bool,
1571        transport: ChannelTransport,
1572    }
1573
1574    impl TestManager {
1575        fn local(mode: ReadyMode) -> Self {
1576            Self {
1577                mode,
1578                omit_addr: false,
1579                omit_agent: false,
1580                transport: ChannelTransport::Local,
1581            }
1582        }
1583        fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
1584            self.omit_addr = addr;
1585            self.omit_agent = agent;
1586            self
1587        }
1588    }
1589
1590    #[async_trait::async_trait]
1591    impl ProcManager for TestManager {
1592        type Handle = TestHandle;
1593
1594        fn transport(&self) -> ChannelTransport {
1595            self.transport.clone()
1596        }
1597
1598        async fn spawn(
1599            &self,
1600            proc_id: ProcId,
1601            forwarder_addr: ChannelAddr,
1602            _config: (),
1603        ) -> Result<Self::Handle, HostError> {
1604            let agent = ActorRef::<()>::attest(proc_id.actor_id("agent", 0));
1605            Ok(TestHandle {
1606                id: proc_id,
1607                addr: forwarder_addr,
1608                agent,
1609                mode: self.mode,
1610                omit_addr: self.omit_addr,
1611                omit_agent: self.omit_agent,
1612            })
1613        }
1614    }
1615
1616    #[tokio::test]
1617    async fn host_spawn_times_out_when_configured() {
1618        let cfg = hyperactor_config::global::lock();
1619        let _g = cfg.override_key(
1620            crate::config::HOST_SPAWN_READY_TIMEOUT,
1621            Duration::from_millis(10),
1622        );
1623
1624        let mut host = Host::new(
1625            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
1626            ChannelAddr::any(ChannelTransport::Local),
1627        )
1628        .await
1629        .unwrap();
1630
1631        let err = host.spawn("t".into(), ()).await.expect_err("must time out");
1632        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1633    }
1634
1635    #[tokio::test]
1636    async fn host_spawn_timeout_zero_disables_and_succeeds() {
1637        let cfg = hyperactor_config::global::lock();
1638        let _g = cfg.override_key(
1639            crate::config::HOST_SPAWN_READY_TIMEOUT,
1640            Duration::from_secs(0),
1641        );
1642
1643        let mut host = Host::new(
1644            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
1645            ChannelAddr::any(ChannelTransport::Local),
1646        )
1647        .await
1648        .unwrap();
1649
1650        let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
1651        assert_eq!(agent.actor_id().proc_id(), &pid);
1652        assert!(host.procs.contains("ok"));
1653    }
1654
1655    #[tokio::test]
1656    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
1657        let mut host = Host::new(
1658            TestManager::local(ReadyMode::ErrChannelClosed),
1659            ChannelAddr::any(ChannelTransport::Local),
1660        )
1661        .await
1662        .unwrap();
1663
1664        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1665        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1666    }
1667
1668    #[tokio::test]
1669    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
1670        let mut host = Host::new(
1671            TestManager::local(ReadyMode::ErrTerminal),
1672            ChannelAddr::any(ChannelTransport::Local),
1673        )
1674        .await
1675        .unwrap();
1676
1677        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
1678        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1679    }
1680
1681    #[tokio::test]
1682    async fn host_spawn_fails_if_ready_but_missing_addr() {
1683        let mut host = Host::new(
1684            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
1685            ChannelAddr::any(ChannelTransport::Local),
1686        )
1687        .await
1688        .unwrap();
1689
1690        let err = host
1691            .spawn("no-addr".into(), ())
1692            .await
1693            .expect_err("must fail");
1694        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1695    }
1696
1697    #[tokio::test]
1698    async fn host_spawn_fails_if_ready_but_missing_agent() {
1699        let mut host = Host::new(
1700            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
1701            ChannelAddr::any(ChannelTransport::Local),
1702        )
1703        .await
1704        .unwrap();
1705
1706        let err = host
1707            .spawn("no-agent".into(), ())
1708            .await
1709            .expect_err("must fail");
1710        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
1711    }
1712}