// hyperactor/host.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines [`Host`], which represents all the procs running on a host.
10//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
11//! for example, fork new processes for each proc, or spawn them in the same process
12//! for testing purposes.
13//!
14//! The primary purpose of a host is to manage the lifecycle of these procs, and to
15//! serve as a single front-end for all the procs on a host, multiplexing network
16//! channels.
17//!
18//! ## Channel muxing
19//!
20//! A [`Host`] maintains a single frontend address, through which all procs are accessible
21//! through direct addressing: the id of each proc is the `ProcId(frontend_addr, proc_name)`.
22//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
23//! multiplexes messages based on the proc name. When spawning procs, the host maintains
24//! backend channels with separate addresses. In the diagram `#` is the backend address of
25//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
26//! to the appropriate backend channel, while procs forward messages to the host backend
27//! channel at `#`.
28//!
29//! ```text
30//! ┌────────────┐
31//! ┌───▶ proc *,1 │
32//! │ #1└────────────┘
33//! │
34//! ┌──────────┐ │ ┌────────────┐
35//! │ Host │◀───┼───▶ proc *,2 │
36//! *└──────────┘# │ #2└────────────┘
37//! │
38//! │ ┌────────────┐
39//! └───▶ proc *,3 │
40//! #3└────────────┘
41//! ```
42//!
43//! ## Local proc invariant (LP-*)
44//!
45//! - **LP-1 (lazy activation):** The local proc always exists as a
46//! `ProcId::Direct(addr, LOCAL_PROC_NAME)` and is forwarded
47//! in-process by the host's mailbox muxer. However it starts with
48//! zero actors. A `ProcAgent` and root client actor are added only
49//! when `HostMeshAgent::handle(GetLocalProc)` is first called.
50
51use std::collections::HashMap;
52use std::collections::HashSet;
53use std::fmt;
54use std::marker::PhantomData;
55use std::str::FromStr;
56use std::sync::Arc;
57use std::time::Duration;
58
59use async_trait::async_trait;
60use futures::Future;
61use futures::StreamExt;
62use futures::stream;
63use tokio::process::Child;
64use tokio::process::Command;
65use tokio::sync::Mutex;
66
67use crate as hyperactor;
68use crate::Actor;
69use crate::ActorHandle;
70use crate::PortHandle;
71use crate::Proc;
72use crate::actor::Binds;
73use crate::actor::Referable;
74use crate::channel;
75use crate::channel::ChannelAddr;
76use crate::channel::ChannelError;
77use crate::channel::ChannelRx;
78use crate::channel::ChannelTransport;
79use crate::channel::Rx;
80use crate::channel::Tx;
81use crate::context;
82use crate::mailbox::BoxableMailboxSender;
83use crate::mailbox::BoxedMailboxSender;
84use crate::mailbox::DialMailboxRouter;
85use crate::mailbox::IntoBoxedMailboxSender as _;
86use crate::mailbox::MailboxClient;
87use crate::mailbox::MailboxSender;
88use crate::mailbox::MailboxServer;
89use crate::mailbox::MailboxServerHandle;
90use crate::mailbox::MessageEnvelope;
91use crate::mailbox::Undeliverable;
92use crate::reference;
93
/// Name of the system service proc on a host — hosts the admin actor
/// layer (HostMeshAgent, MeshAdminAgent, bridge).
///
/// Used as the proc-name component of the service proc's id:
/// `ProcId::with_name(frontend_addr, SERVICE_PROC_NAME)`.
pub const SERVICE_PROC_NAME: &str = "service";

/// Name of the local client proc on a host.
///
/// See LP-1 (lazy activation) in module doc.
///
/// In pure-Rust programs (e.g. sieve, dining_philosophers)
/// `GetLocalProc` is never sent, so the local proc remains empty
/// throughout the program's lifetime. Code that inspects the local
/// proc's actors must not assume they exist.
pub const LOCAL_PROC_NAME: &str = "local";
107
/// The type of error produced by host operations.
#[derive(Debug, thiserror::Error)]
pub enum HostError {
    /// A channel error occurred during a host operation.
    #[error(transparent)]
    ChannelError(#[from] ChannelError),

    /// The named proc already exists and cannot be spawned.
    #[error("proc '{0}' already exists")]
    ProcExists(String),

    /// Failures occurring while spawning a subprocess.
    #[error("proc '{0}' failed to spawn process: {1}")]
    ProcessSpawnFailure(reference::ProcId, #[source] std::io::Error),

    /// Failures occurring while configuring a subprocess.
    #[error("proc '{0}' failed to configure process: {1}")]
    ProcessConfigurationFailure(reference::ProcId, #[source] anyhow::Error),

    /// Failures occurring while spawning a management actor in a proc.
    #[error("failed to spawn agent on proc '{0}': {1}")]
    AgentSpawnFailure(reference::ProcId, #[source] anyhow::Error),

    /// An input parameter was missing.
    #[error("parameter '{0}' missing: {1}")]
    MissingParameter(String, std::env::VarError),

    /// An input parameter was invalid.
    #[error("parameter '{0}' invalid: {1}")]
    InvalidParameter(String, anyhow::Error),
}
139
/// A host, managing the lifecycle of several procs, and their backend
/// routing, as described in this module's documentation.
pub struct Host<M> {
    // Names of procs created through `spawn`; used to reject duplicates.
    procs: HashSet<String>,
    // Address on which the host accepts messages for all of its procs.
    frontend_addr: ChannelAddr,
    // Address handed to spawned procs as their forwarder destination.
    backend_addr: ChannelAddr,
    // Routes messages to the backend address bound for each spawned proc.
    router: DialMailboxRouter,
    // Manages the lifecycle of this host's procs.
    manager: M,
    // In-process system proc (named `SERVICE_PROC_NAME`).
    service_proc: Proc,
    // In-process local proc (named `LOCAL_PROC_NAME`); starts empty (LP-1).
    local_proc: Proc,
    // Consumed by the first call to `serve`; `None` afterwards.
    frontend_rx: Option<ChannelRx<MessageEnvelope>>,
}
152
153impl<M: ProcManager> Host<M> {
154 /// Serve a host using the provided ProcManager, on the provided `addr`.
155 /// On success, the host will multiplex messages for procs on the host
156 /// on the address of the host.
157 pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
158 Self::new_with_default(manager, addr, None).await
159 }
160
161 /// Like [`new`], serves a host using the provided ProcManager, on the provided `addr`.
162 /// Unknown destinations are forwarded to the default sender.
163 #[crate::instrument(fields(addr=addr.to_string()))]
164 pub async fn new_with_default(
165 manager: M,
166 addr: ChannelAddr,
167 default_sender: Option<BoxedMailboxSender>,
168 ) -> Result<Self, HostError> {
169 let (frontend_addr, frontend_rx) = channel::serve(addr)?;
170
171 // We set up a cascade of routers: first, the outer router supports
172 // sending to the the system proc, while the dial router manages dialed
173 // connections.
174 let router = match default_sender {
175 Some(d) => DialMailboxRouter::new_with_default(d),
176 None => DialMailboxRouter::new(),
177 };
178
179 // Establish a backend channel on the preferred transport. We currently simply
180 // serve the same router on both.
181 let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
182
183 // Set up a system proc. This is often used to manage the host itself.
184 // These use with_name (not unique) because their uniqueness is
185 // guaranteed by the ChannelAddr component, and the Name type's
186 // '-' delimiter must not collide with a hash suffix.
187 let service_proc_id =
188 reference::ProcId::with_name(frontend_addr.clone(), SERVICE_PROC_NAME);
189 let service_proc = Proc::configured(service_proc_id.clone(), router.boxed());
190
191 let local_proc_id = reference::ProcId::with_name(frontend_addr.clone(), LOCAL_PROC_NAME);
192 let local_proc = Proc::configured(local_proc_id.clone(), router.boxed());
193
194 tracing::info!(
195 frontend_addr = frontend_addr.to_string(),
196 backend_addr = backend_addr.to_string(),
197 service_proc_id = service_proc_id.to_string(),
198 local_proc_id = local_proc_id.to_string(),
199 "serving host"
200 );
201
202 let host = Host {
203 procs: HashSet::new(),
204 frontend_addr,
205 backend_addr,
206 router,
207 manager,
208 service_proc,
209 local_proc,
210 frontend_rx: Some(frontend_rx),
211 };
212
213 // We the same router on both frontend and backend addresses.
214 let _backend_handle = host.forwarder().serve(backend_rx);
215
216 Ok(host)
217 }
218
219 /// Start serving this host's mailbox on its frontend address.
220 /// Returns the server handle on first invocation; afterwards None.
221 pub fn serve(&mut self) -> Option<MailboxServerHandle> {
222 Some(self.forwarder().serve(self.frontend_rx.take()?))
223 }
224
225 /// The underlying proc manager.
226 pub fn manager(&self) -> &M {
227 &self.manager
228 }
229
230 /// The address which accepts messages destined for this host.
231 pub fn addr(&self) -> &ChannelAddr {
232 &self.frontend_addr
233 }
234
235 /// The system proc associated with this host.
236 /// This is used to run host-level system services like host managers.
237 pub fn system_proc(&self) -> &Proc {
238 &self.service_proc
239 }
240
241 /// The local proc associated with this host (`LOCAL_PROC_NAME`).
242 ///
243 /// Starts with zero actors; see invariant LP-1 on
244 /// [`LOCAL_PROC_NAME`] for activation semantics.
245 pub fn local_proc(&self) -> &Proc {
246 &self.local_proc
247 }
248
249 /// Spawn a new process with the given `name`. On success, the
250 /// proc has been spawned, and is reachable through the returned,
251 /// direct-addressed ProcId, which will be
252 /// `ProcId(self.addr(), name)`.
253 pub async fn spawn(
254 &mut self,
255 name: String,
256 config: M::Config,
257 ) -> Result<(reference::ProcId, reference::ActorRef<ManagerAgent<M>>), HostError> {
258 if self.procs.contains(&name) {
259 return Err(HostError::ProcExists(name));
260 }
261
262 let proc_id = reference::ProcId::with_name(self.frontend_addr.clone(), &name);
263 let handle = self
264 .manager
265 .spawn(proc_id.clone(), self.backend_addr.clone(), config)
266 .await?;
267
268 // Await readiness (config-driven; 0s disables timeout).
269 let to: Duration = hyperactor_config::global::get(crate::config::HOST_SPAWN_READY_TIMEOUT);
270 let ready = if to == Duration::from_secs(0) {
271 ReadyProc::ensure(&handle).await
272 } else {
273 match tokio::time::timeout(to, ReadyProc::ensure(&handle)).await {
274 Ok(result) => result,
275 Err(_elapsed) => Err(ReadyProcError::Timeout),
276 }
277 }
278 .map_err(|e| {
279 HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
280 })?;
281
282 self.router
283 .bind(proc_id.clone().into(), ready.addr().clone());
284 self.procs.insert(name.clone());
285
286 Ok((proc_id, ready.agent_ref().clone()))
287 }
288
289 fn forwarder(&self) -> ProcOrDial {
290 ProcOrDial {
291 service_proc: self.service_proc.clone(),
292 local_proc: self.local_proc.clone(),
293 dialer: self.router.clone(),
294 }
295 }
296}
297
/// A router used to route to the system proc, or else fall back to
/// the dial mailbox router.
#[derive(Clone)]
struct ProcOrDial {
    // In-process service proc; destinations matching its id are
    // delivered directly.
    service_proc: Proc,
    // In-process local proc; destinations matching its id are
    // delivered directly.
    local_proc: Proc,
    // Fallback sender for all other destinations.
    dialer: DialMailboxRouter,
}
306
307impl MailboxSender for ProcOrDial {
308 fn post_unchecked(
309 &self,
310 envelope: MessageEnvelope,
311 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
312 ) {
313 if envelope.dest().actor_id().proc_id() == self.service_proc.proc_id() {
314 self.service_proc.post_unchecked(envelope, return_handle);
315 } else if envelope.dest().actor_id().proc_id() == self.local_proc.proc_id() {
316 self.local_proc.post_unchecked(envelope, return_handle);
317 } else {
318 self.dialer.post_unchecked(envelope, return_handle)
319 }
320 }
321}
322
/// Error returned by [`ProcHandle::ready`].
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// The proc reached a terminal state before becoming Ready.
    /// Carries the same terminal value that `wait()` would report.
    Terminal(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}
331
/// Error returned by [`ReadyProc::ensure`].
#[derive(Debug, Clone)]
pub enum ReadyProcError<TerminalStatus> {
    /// Timed out waiting for ready.
    Timeout,
    /// The underlying `ready()` call failed.
    Ready(ReadyError<TerminalStatus>),
    /// The handle's `addr()` returned `None` after `ready()` succeeded.
    MissingAddr,
    /// The handle's `agent_ref()` returned `None` after `ready()`
    /// succeeded.
    MissingAgentRef,
}
345
346impl<T> From<ReadyError<T>> for ReadyProcError<T> {
347 fn from(e: ReadyError<T>) -> Self {
348 ReadyProcError::Ready(e)
349 }
350}
351
/// Error returned by [`ProcHandle::wait`].
///
/// The only failure mode is losing the ability to observe the proc's
/// lifecycle; a normal exit is reported as a terminal status, not an
/// error.
#[derive(Debug, Clone)]
pub enum WaitError {
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}
358
/// Error returned by [`ProcHandle::terminate`] and
/// [`ProcHandle::kill`].
///
/// - `Unsupported`: the manager cannot perform the requested proc
///   signaling (e.g., local/in-process manager that doesn't emulate
///   kill).
/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
///   is the same value `wait()` would return.
/// - `ChannelClosed`: the manager lost its lifecycle channel and
///   cannot reliably observe state transitions.
/// - `Io(err)`: manager-specific failure delivering the signal or
///   performing shutdown (e.g., OS error on kill).
#[derive(Debug)]
pub enum TerminateError<TerminalStatus> {
    /// Manager doesn't support signaling (e.g., Local manager).
    Unsupported,
    /// A terminal state was already reached while attempting
    /// terminate/kill.
    AlreadyTerminated(TerminalStatus),
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
    /// Manager-specific failure to deliver signal or perform
    /// shutdown (e.g., an OS error delivering SIGTERM/SIGKILL).
    Io(anyhow::Error),
}
384
385impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 match self {
388 TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
389 TerminateError::AlreadyTerminated(st) => {
390 write!(f, "proc already terminated (status: {st:?})")
391 }
392 TerminateError::ChannelClosed => {
393 write!(f, "lifecycle channel closed; cannot observe state")
394 }
395 TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
396 }
397 }
398}
399
400impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
401 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
402 match self {
403 TerminateError::Io(err) => Some(err.root_cause()),
404 _ => None,
405 }
406 }
407}
408
/// Summary of results from a bulk termination attempt.
///
/// - `attempted`: total number of child procs for which termination
///   was attempted.
/// - `ok`: number of procs successfully terminated (includes those
///   that were already in a terminal state).
/// - `failed`: number of procs that could not be terminated (e.g.
///   signaling errors or lost lifecycle channel).
///
/// Displays as `attempted={} ok={} failed={}`.
#[derive(Debug)]
pub struct TerminateSummary {
    /// Total number of child procs for which termination was
    /// attempted.
    pub attempted: usize,
    /// Number of procs that successfully reached a terminal state.
    ///
    /// This count includes both procs that exited cleanly after
    /// `terminate(timeout)` and those that were already in a terminal
    /// state before termination was attempted.
    pub ok: usize,
    /// Number of procs that failed to terminate.
    ///
    /// Failures typically arise from signaling errors (e.g., OS
    /// failure to deliver SIGTERM/SIGKILL) or a lost lifecycle
    /// channel, meaning the manager could no longer observe state
    /// transitions.
    pub failed: usize,
}
436
437impl fmt::Display for TerminateSummary {
438 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439 write!(
440 f,
441 "attempted={} ok={} failed={}",
442 self.attempted, self.ok, self.failed
443 )
444 }
445}
446
#[async_trait::async_trait]
/// Trait for terminating a single proc.
pub trait SingleTerminate: Send + Sync {
    /// Gracefully terminate the given proc.
    ///
    /// Initiates a polite shutdown of the proc, waits up to
    /// `timeout` for completion, then escalates to a forceful stop.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `timeout`: Grace period before escalation to a forceful
    ///   stop.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// # Returns
    /// A tuple of (politely-shut-down actor ids, forcefully-stopped
    /// actor ids).
    async fn terminate_proc(
        &self,
        cx: &impl context::Actor,
        proc: &reference::ProcId,
        timeout: std::time::Duration,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error>;
}
478
/// Trait for managers that can terminate many child **units** in
/// bulk.
///
/// Implementors provide a concurrency-bounded, graceful shutdown over
/// all currently tracked children (polite stop → wait → forceful
/// stop), returning a summary of outcomes. The exact stop/kill
/// semantics are manager-specific: for example, an OS-process manager
/// might send signals, while an in-process manager might drain/abort
/// tasks.
#[async_trait::async_trait]
pub trait BulkTerminate: Send + Sync {
    /// Gracefully terminate all known children.
    ///
    /// Initiates a polite shutdown for each child, waits up to
    /// `timeout` for completion, then escalates to a forceful stop
    /// for any that remain. Work may be done in parallel, capped by
    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
    /// many children were attempted, succeeded, and failed.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `timeout`: Per-child grace period before escalation to a
    ///   forceful stop.
    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
    ///   etc.).
    /// - `reason`: Human-readable reason for termination.
    async fn terminate_all(
        &self,
        cx: &impl context::Actor,
        timeout: std::time::Duration,
        max_in_flight: usize,
        reason: &str,
    ) -> TerminateSummary;
}
520
521// Host convenience that's available only when its manager supports
522// bulk termination.
523impl<M: ProcManager + BulkTerminate> Host<M> {
524 /// Gracefully terminate all procs spawned by this host.
525 ///
526 /// Delegates to the underlying manager’s
527 /// [`BulkTerminate::terminate_all`] implementation. Use this to
528 /// perform orderly teardown during scale-down or shutdown.
529 ///
530 /// # Parameters
531 /// - `timeout`: Per-child grace period before escalation.
532 /// - `max_in_flight`: Upper bound on concurrent terminations.
533 ///
534 /// # Returns
535 /// A [`TerminateSummary`] with counts of attempted/ok/failed
536 /// terminations.
537 pub async fn terminate_children(
538 &self,
539 cx: &impl context::Actor,
540 timeout: Duration,
541 max_in_flight: usize,
542 reason: &str,
543 ) -> TerminateSummary {
544 self.manager
545 .terminate_all(cx, timeout, max_in_flight, reason)
546 .await
547 }
548}
549
550#[async_trait::async_trait]
551impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
552 async fn terminate_proc(
553 &self,
554 cx: &impl context::Actor,
555 proc: &reference::ProcId,
556 timeout: Duration,
557 reason: &str,
558 ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
559 self.manager.terminate_proc(cx, proc, timeout, reason).await
560 }
561}
562
/// Capability proving a proc is ready.
///
/// [`ReadyProc::ensure`] validates that `addr()` and `agent_ref()`
/// are available; this type carries that proof, providing infallible
/// accessors.
///
/// Obtain a `ReadyProc` by calling `ReadyProc::ensure(&handle).await`.
pub struct ReadyProc<'a, H: ProcHandle> {
    // Borrow of the underlying handle, used for `proc_id()`.
    handle: &'a H,
    // Address validated by `ensure`; always present.
    addr: ChannelAddr,
    // Agent reference validated by `ensure`; always present.
    agent_ref: reference::ActorRef<H::Agent>,
}
575
576impl<'a, H: ProcHandle> ReadyProc<'a, H> {
577 /// Wait for a proc to become ready, then return a capability that
578 /// provides infallible access to `addr()` and `agent_ref()`.
579 ///
580 /// This is the type-safe way to obtain the proc's address and
581 /// agent reference. After this function returns `Ok(ready)`, both
582 /// `ready.addr()` and `ready.agent_ref()` are guaranteed to
583 /// succeed.
584 pub async fn ensure(
585 handle: &'a H,
586 ) -> Result<ReadyProc<'a, H>, ReadyProcError<H::TerminalStatus>> {
587 handle.ready().await?;
588 let addr = handle.addr().ok_or(ReadyProcError::MissingAddr)?;
589 let agent_ref = handle.agent_ref().ok_or(ReadyProcError::MissingAgentRef)?;
590 Ok(ReadyProc {
591 handle,
592 addr,
593 agent_ref,
594 })
595 }
596
597 /// The proc's logical identity.
598 pub fn proc_id(&self) -> &reference::ProcId {
599 self.handle.proc_id()
600 }
601
602 /// The proc's address (guaranteed available after ready).
603 pub fn addr(&self) -> &ChannelAddr {
604 &self.addr
605 }
606
607 /// The agent actor reference (guaranteed available after ready).
608 pub fn agent_ref(&self) -> &reference::ActorRef<H::Agent> {
609 &self.agent_ref
610 }
611}
612
/// Minimal uniform surface for a spawned-**proc** handle returned by
/// a `ProcManager`. Each manager can return its own concrete handle,
/// as long as it exposes these. A **proc** is the Hyperactor runtime
/// + its actors (lifecycle controlled via `Proc` APIs such as
/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
/// **process**, but it is conceptually distinct:
///
/// - `LocalProcManager`: runs the proc **in this OS process**; there
///   is no child process to signal. Lifecycle is entirely proc-level.
/// - `ProcessProcManager` (test-only here): launches an **external OS
///   process** which hosts the proc, but this toy manager does
///   **not** wire a control plane for shutdown, nor an exit monitor.
///
/// This trait is therefore written in terms of the **proc**
/// lifecycle:
///
/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
///   available).
/// - `wait()` resolves with the proc's terminal status
///   (Stopped/Killed/Failed).
/// - `terminate()` requests a graceful shutdown of the *proc* and
///   waits up to the deadline; managers that also own a child OS
///   process may escalate to `SIGKILL` if the proc does not exit in
///   time.
/// - `kill()` requests an immediate, forced termination. For
///   in-process procs, this may be implemented as an immediate
///   drain/abort of actor tasks. For external procs, this is
///   typically a `SIGKILL`.
///
/// The shape of the terminal value is `Self::TerminalStatus`.
/// Managers that track rich info (exit code, signal, address, agent)
/// can expose it; trivial managers may use `()`.
///
/// Managers that do not support signaling must return `Unsupported`.
#[async_trait]
pub trait ProcHandle: Clone + Send + Sync + 'static {
    /// The agent actor type installed in the proc by the manager.
    /// Must implement both:
    /// - [`Actor`], because the agent actually runs inside the proc,
    ///   and
    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
    type Agent: Actor + Referable;

    /// The type of terminal status produced when the proc exits.
    ///
    /// For example, an external proc manager may use a rich status
    /// enum (e.g. `ProcStatus`), while an in-process manager may use
    /// a trivial unit type. This is the value returned by
    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;

    /// The proc's logical identity on this host.
    fn proc_id(&self) -> &reference::ProcId;

    /// The proc's address (the one callers bind into the host
    /// router). May return `None` before `ready()` completes.
    /// Guaranteed to return `Some` after `ready()` succeeds.
    ///
    /// **Prefer [`ReadyProc::ensure`]** for type-safe access that
    /// guarantees availability at compile time.
    fn addr(&self) -> Option<ChannelAddr>;

    /// The agent actor reference hosted in the proc. May return
    /// `None` before `ready()` completes. Guaranteed to return `Some`
    /// after `ready()` succeeds.
    ///
    /// **Prefer [`ReadyProc::ensure`]** for type-safe access that
    /// guarantees availability at compile time.
    fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>>;

    /// Resolves when the proc becomes Ready. Multi-waiter,
    /// non-consuming.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;

    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
    /// Multi-waiter, non-consuming.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;

    /// Politely stop the proc before the deadline; managers that own
    /// a child OS process may escalate to a forced kill at the
    /// deadline. Idempotent and race-safe: concurrent callers
    /// coalesce; the first terminal outcome wins and all callers
    /// observe it via `wait()`.
    ///
    /// Returns the single terminal status the proc reached (the same
    /// value `wait()` will return). Never fabricates terminal states:
    /// this is only returned after the exit monitor observes
    /// termination.
    ///
    /// # Parameters
    /// - `cx`: The actor context for sending messages.
    /// - `timeout`: Grace period before escalation.
    /// - `reason`: Human-readable reason for termination.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;

    /// Force the proc down immediately. For in-process managers this
    /// may abort actor tasks; for external managers this typically
    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
    /// outcome is the one observed by `wait()`.
    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
}
719
/// A trait describing a manager of procs, responsible for bootstrapping
/// procs on a host, and managing their lifetimes. The manager spawns an
/// `Agent`-typed actor on each proc, responsible for managing the proc.
#[async_trait]
pub trait ProcManager {
    /// Concrete handle type this manager returns.
    type Handle: ProcHandle;

    /// Additional configuration for the proc, supported by this manager.
    /// Defaults to `()` for managers needing no extra configuration.
    type Config = ();

    /// The preferred transport for this ProcManager.
    /// In practice this will be [`ChannelTransport::Local`]
    /// for testing, and [`ChannelTransport::Unix`] for external
    /// processes.
    fn transport(&self) -> ChannelTransport;

    /// Spawn a new proc with the provided proc id. The proc
    /// should use the provided forwarder address for messages
    /// destined outside of the proc. The returned address accepts
    /// messages destined for the proc.
    ///
    /// An agent actor is also spawned, and the corresponding actor
    /// ref is returned.
    async fn spawn(
        &self,
        proc_id: reference::ProcId,
        forwarder_addr: ChannelAddr,
        config: Self::Config,
    ) -> Result<Self::Handle, HostError>;
}
751
/// Type alias for the agent actor managed by a given [`ProcManager`].
///
/// This resolves to the `Agent` type exposed by the manager's
/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
/// convenient shorthand so call sites can refer to
/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
/// fully-qualified `<M::Handle as ProcHandle>::Agent` path.
///
/// # Example
/// ```ignore
/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
/// ```
pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792
765
/// Lifecycle status for procs managed by [`LocalProcManager`].
///
/// Used by [`LocalProcManager::request_stop`] to track background
/// teardown progress.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalProcStatus {
    /// A stop has been requested but teardown is still in progress.
    Stopping,
    /// Teardown completed; this is the terminal state.
    Stopped,
}
777
/// A ProcManager that spawns **in-process** procs (test-only).
///
/// The proc runs inside this same OS process; there is **no** child
/// process to signal. Lifecycle is purely proc-level:
/// - `terminate(timeout)`: delegates to
///   `Proc::destroy_and_wait(timeout, None)`, which drains and, at the
///   deadline, aborts remaining actors.
/// - `kill()`: uses a zero deadline to emulate a forced stop via
///   `destroy_and_wait(Duration::ZERO, None)`.
/// - `wait()`: trivial (no external lifecycle to observe).
///
/// No OS signals are sent or required.
pub struct LocalProcManager<S> {
    // Live procs keyed by id; shared with background teardown tasks.
    procs: Arc<Mutex<HashMap<reference::ProcId, Proc>>>,
    // Stop bookkeeping for `request_stop` / `local_proc_status`.
    stopping: Arc<Mutex<HashMap<reference::ProcId, LocalProcStatus>>>,
    // Spawn strategy for new procs. NOTE(review): its use is not
    // visible in this chunk — presumably drives agent creation in
    // `ProcManager::spawn`; confirm against the rest of the file.
    spawn: S,
}
795
impl<S> LocalProcManager<S> {
    /// Create a new in-process proc manager with the given agent
    /// params.
    pub fn new(spawn: S) -> Self {
        Self {
            procs: Arc::new(Mutex::new(HashMap::new())),
            stopping: Arc::new(Mutex::new(HashMap::new())),
            spawn,
        }
    }

    /// Non-blocking stop: remove the proc and spawn a background task
    /// that tears it down.
    ///
    /// Status transitions through `Stopping` -> `Stopped` and is
    /// observable via [`local_proc_status`]. Idempotent: no-ops if
    /// the proc is already stopping or stopped.
    pub async fn request_stop(&self, proc: &reference::ProcId, timeout: Duration, reason: &str) {
        // Fast path: a stop was already requested (or completed).
        // NOTE(review): two callers can both pass this check; the
        // `remove` below is the real gate — only one obtains the proc,
        // the other returns early.
        {
            let guard = self.stopping.lock().await;
            if guard.contains_key(proc) {
                return;
            }
        }

        // Take the proc out of the live set, holding the lock only
        // within this scope (never across an await).
        let mut proc_handle = {
            let mut guard = self.procs.lock().await;
            match guard.remove(proc) {
                Some(p) => p,
                None => return,
            }
        };

        // Mark as Stopping before the (possibly slow) teardown begins.
        let proc_id = proc_handle.proc_id().clone();
        self.stopping
            .lock()
            .await
            .insert(proc_id.clone(), LocalProcStatus::Stopping);

        // Tear down in the background so this call never blocks on
        // actor drain/abort.
        let stopping = Arc::clone(&self.stopping);
        let reason = reason.to_string();
        tokio::spawn(async move {
            if let Err(e) = proc_handle
                .destroy_and_wait::<()>(timeout, None, &reason)
                .await
            {
                tracing::warn!(error = %e, "request_stop(local): destroy_and_wait failed");
            }
            // Record the terminal state even if teardown reported an
            // error; the proc has already been removed from the map.
            stopping
                .lock()
                .await
                .insert(proc_id, LocalProcStatus::Stopped);
        });
    }

    /// Query the lifecycle status of a proc that was stopped via
    /// [`request_stop`].
    ///
    /// Returns `None` if the proc was never stopped through this path.
    pub async fn local_proc_status(&self, proc: &reference::ProcId) -> Option<LocalProcStatus> {
        self.stopping.lock().await.get(proc).copied()
    }
}
859
860#[async_trait]
861impl<S> BulkTerminate for LocalProcManager<S>
862where
863 S: Send + Sync,
864{
865 async fn terminate_all(
866 &self,
867 _cx: &impl context::Actor,
868 timeout: std::time::Duration,
869 max_in_flight: usize,
870 reason: &str,
871 ) -> TerminateSummary {
872 // Snapshot procs so we don't hold the lock across awaits.
873 let procs: Vec<Proc> = {
874 let guard = self.procs.lock().await;
875 guard.values().cloned().collect()
876 };
877
878 let attempted = procs.len();
879
880 let results = stream::iter(procs.into_iter().map(|mut p| async move {
881 // For local manager, graceful proc-level stop.
882 match p.destroy_and_wait::<()>(timeout, None, reason).await {
883 Ok(_) => true,
884 Err(e) => {
885 tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
886 false
887 }
888 }
889 }))
890 .buffer_unordered(max_in_flight.max(1))
891 .collect::<Vec<bool>>()
892 .await;
893
894 let ok = results.into_iter().filter(|b| *b).count();
895
896 TerminateSummary {
897 attempted,
898 ok,
899 failed: attempted.saturating_sub(ok),
900 }
901 }
902}
903
904#[async_trait::async_trait]
905impl<S> SingleTerminate for LocalProcManager<S>
906where
907 S: Send + Sync,
908{
909 async fn terminate_proc(
910 &self,
911 _cx: &impl context::Actor,
912 proc: &reference::ProcId,
913 timeout: std::time::Duration,
914 reason: &str,
915 ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
916 // Snapshot procs so we don't hold the lock across awaits.
917 let procs: Option<Proc> = {
918 let mut guard = self.procs.lock().await;
919 guard.remove(proc)
920 };
921 if let Some(mut p) = procs {
922 p.destroy_and_wait::<()>(timeout, None, reason).await
923 } else {
924 Err(anyhow::anyhow!("proc {} doesn't exist", proc))
925 }
926 }
927}
928
/// A lightweight [`ProcHandle`] for procs managed **in-process** via
/// [`LocalProcManager`].
///
/// This handle wraps the minimal identifying state of a spawned proc:
/// - its [`ProcId`] (logical identity on the host),
/// - the proc's [`ChannelAddr`] (the address callers bind into the
///   host router), and
/// - the [`ActorRef`] to the agent actor hosted in the proc.
///
/// Unlike external handles, `LocalHandle` does **not** manage an OS
/// child process. It provides a uniform surface (`proc_id()`,
/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
/// **proc-level** shutdown.
///
/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
/// bound (`Actor + Referable`).
pub struct LocalHandle<A: Actor + Referable> {
    // Logical identity of the proc on the host.
    proc_id: reference::ProcId,
    // Address at which the proc is served; returned by `addr()`.
    addr: ChannelAddr,
    // Reference to the agent actor hosted in the proc.
    agent_ref: reference::ActorRef<A>,
    // Shared registry of live procs, used by `terminate()`/`kill()`
    // to look up the underlying `Proc`.
    procs: Arc<Mutex<HashMap<reference::ProcId, Proc>>>,
}
952
953// Manual `Clone` to avoid requiring `A: Clone`.
954impl<A: Actor + Referable> Clone for LocalHandle<A> {
955 fn clone(&self) -> Self {
956 Self {
957 proc_id: self.proc_id.clone(),
958 addr: self.addr.clone(),
959 agent_ref: self.agent_ref.clone(),
960 procs: Arc::clone(&self.procs),
961 }
962 }
963}
964
#[async_trait]
impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
    /// `Agent = A` (inherits `Actor + Referable` from the trait
    /// bound).
    type Agent = A;
    /// Local procs carry no terminal-status payload.
    type TerminalStatus = ();

    fn proc_id(&self) -> &reference::ProcId {
        &self.proc_id
    }

    fn addr(&self) -> Option<ChannelAddr> {
        Some(self.addr.clone())
    }

    fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
        Some(self.agent_ref.clone())
    }

    /// Always resolves immediately: a local proc is created
    /// in-process and is usable as soon as the handle exists.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
        Ok(())
    }
    /// Always resolves immediately with `()`: a local proc has no
    /// external lifecycle to await. There is no OS child process
    /// behind this handle.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
        Ok(())
    }

    /// Gracefully stop the proc's actors within `timeout`.
    ///
    /// NOTE(review): the proc is looked up with `get` and is *not*
    /// removed from the shared map on success, so a subsequent call
    /// will find it again and re-run `destroy_and_wait` rather than
    /// returning `AlreadyTerminated` — confirm this is intended.
    async fn terminate(
        &self,
        _cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
        // Clone the proc out of the map so the lock is not held
        // across the await below.
        let mut proc = {
            let guard = self.procs.lock().await;
            match guard.get(self.proc_id()) {
                Some(p) => p.clone(),
                None => {
                    // The proc was already removed; treat as already
                    // terminal.
                    return Err(TerminateError::AlreadyTerminated(()));
                }
            }
        };

        // Graceful stop of the *proc* (actors) with a deadline. This
        // will drain and then abort remaining actors at expiry.
        let _ = proc
            .destroy_and_wait::<()>(timeout, None, reason)
            .await
            .map_err(TerminateError::Io)?;

        Ok(())
    }

    /// Forcibly stop the proc by destroying it with a zero deadline.
    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
        // Forced stop == zero deadline; `destroy_and_wait` will
        // immediately abort remaining actors and return.
        let mut proc = {
            let guard = self.procs.lock().await;
            match guard.get(self.proc_id()) {
                Some(p) => p.clone(),
                None => return Err(TerminateError::AlreadyTerminated(())),
            }
        };

        let _ = proc
            .destroy_and_wait::<()>(Duration::from_millis(0), None, "kill")
            .await
            .map_err(TerminateError::Io)?;

        Ok(())
    }
}
1043
1044/// Local, in-process ProcManager.
1045///
1046/// **Type bounds:**
1047/// - `A: Actor + Referable + Binds<A>`
1048/// - `Actor`: the agent actually runs inside the proc.
1049/// - `Referable`: callers hold `ActorRef<A>` to the agent; this
1050/// bound is required for typed remote refs.
1051/// - `Binds<A>`: lets the runtime wire the agent's message ports.
1052/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
1053/// the spawn closure returns a Send future (we `tokio::spawn` it).
1054/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
1055/// concurrent contexts.
1056///
1057/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
1058#[async_trait]
1059impl<A, S, F> ProcManager for LocalProcManager<S>
1060where
1061 A: Actor + Referable + Binds<A>,
1062 F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
1063 S: Fn(Proc) -> F + Sync,
1064{
1065 type Handle = LocalHandle<A>;
1066
1067 fn transport(&self) -> ChannelTransport {
1068 ChannelTransport::Local
1069 }
1070
1071 #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1072 async fn spawn(
1073 &self,
1074 proc_id: reference::ProcId,
1075 forwarder_addr: ChannelAddr,
1076 _config: (),
1077 ) -> Result<Self::Handle, HostError> {
1078 let transport = forwarder_addr.transport();
1079 let proc = Proc::configured(
1080 proc_id.clone(),
1081 MailboxClient::dial(forwarder_addr)?.into_boxed(),
1082 );
1083 let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
1084 self.procs
1085 .lock()
1086 .await
1087 .insert(proc_id.clone(), proc.clone());
1088 let _handle = proc.clone().serve(rx);
1089 let agent_handle = (self.spawn)(proc)
1090 .await
1091 .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1092
1093 Ok(LocalHandle {
1094 proc_id,
1095 addr: proc_addr,
1096 agent_ref: agent_handle.bind(),
1097 procs: Arc::clone(&self.procs),
1098 })
1099 }
1100}
1101
/// A ProcManager that manages each proc as a **separate OS process**
/// (test-only toy).
///
/// This implementation launches a child via `Command` and relies on
/// `kill_on_drop(true)` so that children are SIGKILLed if the manager
/// (or host) drops. There is **no** proc control plane (no RPC to a
/// proc agent for shutdown) and **no** exit monitor wired here.
/// Consequently:
/// - `terminate()` and `kill()` return `Unsupported`.
/// - `wait()` is trivial (no lifecycle observation).
///
/// It follows a simple protocol:
///
/// Each process is launched with the following environment variables:
/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
///
/// The launched proc should also spawn an actor to manage it - the details of this are
/// implementation dependent, and outside the scope of the process manager.
///
/// The function [`boot_proc`] provides a convenient implementation of the
/// protocol.
pub struct ProcessProcManager<A> {
    // Binary launched once per spawned proc.
    program: std::path::PathBuf,
    // Live children keyed by proc id. Each `Child` is spawned with
    // `kill_on_drop(true)`, so dropping this map reaps the processes.
    children: Arc<Mutex<HashMap<reference::ProcId, Child>>>,
    // Carries the agent type parameter only; no `A` value is stored.
    _phantom: PhantomData<A>,
}
1130
1131impl<A> ProcessProcManager<A> {
1132 /// Create a new ProcessProcManager that runs the provided
1133 /// command.
1134 pub fn new(program: std::path::PathBuf) -> Self {
1135 Self {
1136 program,
1137 children: Arc::new(Mutex::new(HashMap::new())),
1138 _phantom: PhantomData,
1139 }
1140 }
1141}
1142
impl<A> Drop for ProcessProcManager<A> {
    fn drop(&mut self) {
        // When the manager is dropped, `children` is dropped, which
        // drops each `Child` handle. With `kill_on_drop(true)`, the OS
        // will SIGKILL the processes. Nothing else to do here.
        //
        // `children` sits behind an `Arc`, but no other component in
        // this file retains a clone of it (`ProcessHandle` does not),
        // so dropping the manager releases the last reference.
    }
}
1150
/// A [`ProcHandle`] implementation for procs managed as separate
/// OS processes via [`ProcessProcManager`].
///
/// This handle records the logical identity and connectivity of an
/// external child process:
/// - its [`ProcId`] (unique identity on the host),
/// - the proc's [`ChannelAddr`] (address registered in the host
///   router),
/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
///
/// Unlike [`LocalHandle`], this corresponds to a real OS process
/// launched by the manager. In this **toy** implementation the handle
/// does not own/monitor the `Child` and there is no shutdown control
/// plane. It is a stable, clonable surface exposing the proc's
/// identity, address, and agent reference so host code can interact
/// uniformly with local/external procs. `terminate()`/`kill()` are
/// intentionally `Unsupported` here; process cleanup relies on
/// `cmd.kill_on_drop(true)` when launching the child (the OS will
/// SIGKILL it if the handle is dropped).
///
/// The type bound `A: Actor + Referable` comes from the
/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
/// actually runs inside the proc, and `Referable` because it must
/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
/// typed remote reference).
#[derive(Debug)]
pub struct ProcessHandle<A: Actor + Referable> {
    // Identity of the proc within the host.
    proc_id: reference::ProcId,
    // Address reported by the child over the callback channel.
    addr: ChannelAddr,
    // Agent actor spawned inside the child process.
    agent_ref: reference::ActorRef<A>,
}
1182
1183// Manual `Clone` to avoid requiring `A: Clone`.
1184impl<A: Actor + Referable> Clone for ProcessHandle<A> {
1185 fn clone(&self) -> Self {
1186 Self {
1187 proc_id: self.proc_id.clone(),
1188 addr: self.addr.clone(),
1189 agent_ref: self.agent_ref.clone(),
1190 }
1191 }
1192}
1193
1194#[async_trait]
1195impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
1196 /// Agent must be both an `Actor` (runs in the proc) and a
1197 /// `Referable` (so it can be referenced via `ActorRef<A>`).
1198 type Agent = A;
1199 type TerminalStatus = ();
1200
1201 fn proc_id(&self) -> &reference::ProcId {
1202 &self.proc_id
1203 }
1204
1205 fn addr(&self) -> Option<ChannelAddr> {
1206 Some(self.addr.clone())
1207 }
1208
1209 fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
1210 Some(self.agent_ref.clone())
1211 }
1212
1213 /// Resolves immediately. `ProcessProcManager::spawn` returns this
1214 /// handle only after the child has called back with (addr,
1215 /// agent), i.e. after readiness.
1216 async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1217 Ok(())
1218 }
1219 /// Resolves immediately with `()`. This handle does not track
1220 /// child lifecycle; there is no watcher in this implementation.
1221 async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1222 Ok(())
1223 }
1224
1225 async fn terminate(
1226 &self,
1227 _cx: &impl context::Actor,
1228 _deadline: Duration,
1229 _reason: &str,
1230 ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1231 Err(TerminateError::Unsupported)
1232 }
1233
1234 async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1235 Err(TerminateError::Unsupported)
1236 }
1237}
1238
1239#[async_trait]
1240impl<A> ProcManager for ProcessProcManager<A>
1241where
1242 // Agent actor runs in the proc (`Actor`) and must be
1243 // referenceable (`Referable`).
1244 A: Actor + Referable + Sync,
1245{
1246 type Handle = ProcessHandle<A>;
1247
1248 fn transport(&self) -> ChannelTransport {
1249 ChannelTransport::Unix
1250 }
1251
1252 #[crate::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1253 async fn spawn(
1254 &self,
1255 proc_id: reference::ProcId,
1256 forwarder_addr: ChannelAddr,
1257 _config: (),
1258 ) -> Result<Self::Handle, HostError> {
1259 let (callback_addr, mut callback_rx) =
1260 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1261
1262 let mut cmd = Command::new(&self.program);
1263 cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1264 cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1265 cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1266
1267 // Lifetime strategy: mark the child with
1268 // `kill_on_drop(true)` so the OS will send SIGKILL if the
1269 // handle is dropped and retain the `Child` in
1270 // `self.children`, tying its lifetime to the manager/host.
1271 //
1272 // This is the simplest viable policy to avoid orphaned
1273 // subprocesses in CI; more sophisticated lifecycle control
1274 // (graceful shutdown, restart) will be layered on later.
1275
1276 // Kill the child when its handle is dropped.
1277 cmd.kill_on_drop(true);
1278
1279 let child = cmd
1280 .spawn()
1281 .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1282
1283 // Retain the handle so it lives for the life of the
1284 // manager/host.
1285 {
1286 let mut children = self.children.lock().await;
1287 children.insert(proc_id.clone(), child);
1288 }
1289
1290 // Wait for the child's callback with (addr, agent_ref)
1291 let (proc_addr, agent_ref) = callback_rx.recv().await?;
1292
1293 // TODO(production): For a non-test implementation, plumb a
1294 // shutdown path:
1295 // - expose a proc-level graceful stop RPC on the agent and
1296 // implement `terminate(timeout)` by invoking it and, on
1297 // deadline, call `Child::kill()`; implement `kill()` as
1298 // immediate `Child::kill()`.
1299 // - wire an exit monitor so `wait()` resolves with a real
1300 // terminal status.
1301 Ok(ProcessHandle {
1302 proc_id,
1303 addr: proc_addr,
1304 agent_ref,
1305 })
1306 }
1307}
1308
1309impl<A> ProcessProcManager<A>
1310where
1311 // `Actor`: runs in the proc; `Referable`: referenceable via
1312 // ActorRef; `Binds<A>`: wires ports.
1313 A: Actor + Referable + Binds<A>,
1314{
1315 /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
1316 /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
1317 /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
1318 /// manage the proc itself.
1319 pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1320 where
1321 S: FnOnce(Proc) -> F,
1322 F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1323 {
1324 let proc_id: reference::ProcId = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1325 let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1326 let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1327 spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1328 }
1329
1330 fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1331 where
1332 T: FromStr<Err = E>,
1333 E: Into<anyhow::Error>,
1334 {
1335 std::env::var(key)
1336 .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1337 .parse()
1338 .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1339 }
1340}
1341
1342/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
1343/// forwarding messages to the provided `backend_addr`,
1344/// and returning the proc's address and agent actor on
1345/// the provided `callback_addr`.
1346#[crate::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string(), callback_addr=callback_addr.to_string()))]
1347pub async fn spawn_proc<A, S, F>(
1348 proc_id: reference::ProcId,
1349 backend_addr: ChannelAddr,
1350 callback_addr: ChannelAddr,
1351 spawn: S,
1352) -> Result<Proc, HostError>
1353where
1354 // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
1355 // `Binds<A>`: wires ports
1356 A: Actor + Referable + Binds<A>,
1357 S: FnOnce(Proc) -> F,
1358 F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1359{
1360 let backend_transport = backend_addr.transport();
1361 let proc = Proc::configured(
1362 proc_id.clone(),
1363 MailboxClient::dial(backend_addr)?.into_boxed(),
1364 );
1365
1366 let agent_handle = spawn(proc.clone())
1367 .await
1368 .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1369
1370 // Finally serve the proc on the same transport as the backend address,
1371 // and call back.
1372 let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1373 proc.clone().serve(proc_rx);
1374 channel::dial(callback_addr)?
1375 .send((proc_addr, agent_handle.bind::<A>()))
1376 .await
1377 .map_err(ChannelError::from)?;
1378
1379 Ok(proc)
1380}
1381
/// Testing support for hosts. This is linked outside of cfg(test)
/// as it is needed by an external binary.
pub mod testing {
    use async_trait::async_trait;

    use crate as hyperactor;
    use crate::Actor;
    use crate::Context;
    use crate::Handler;
    use crate::reference;

    /// Just a simple actor, available in both the bootstrap binary as well as
    /// hyperactor tests.
    ///
    /// It handles exactly one message type — a once-port — and replies
    /// with its own actor id, which makes it handy for verifying
    /// host routing and connectivity.
    #[derive(Debug, Default)]
    #[hyperactor::export(handlers = [reference::OncePortRef<reference::ActorId>])]
    pub struct EchoActor;

    impl Actor for EchoActor {}

    #[async_trait]
    impl Handler<reference::OncePortRef<reference::ActorId>> for EchoActor {
        /// Reply to the provided once-port with this actor's own id.
        async fn handle(
            &mut self,
            cx: &Context<Self>,
            reply: reference::OncePortRef<reference::ActorId>,
        ) -> Result<(), anyhow::Error> {
            reply.send(cx, cx.self_id().clone())?;
            Ok(())
        }
    }
}
1413
1414#[cfg(test)]
1415mod tests {
1416 use std::sync::Arc;
1417 use std::time::Duration;
1418
1419 use super::testing::EchoActor;
1420 use super::*;
1421 use crate::channel::ChannelTransport;
1422 use crate::context::Mailbox;
1423
    // End-to-end check of the local manager: spawn two procs through
    // the host, then verify message routing proc<->proc and in both
    // directions between the system proc and a child proc.
    #[tokio::test]
    async fn test_basic() {
        let proc_manager =
            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
        let procs = Arc::clone(&proc_manager.procs);
        let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Local))
            .await
            .unwrap();

        // Spawned procs are direct-addressed off the host's frontend
        // address and registered in the manager's live-proc map.
        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
        assert_eq!(
            proc_id1,
            reference::ProcId::with_name(host.addr().clone(), "proc1")
        );
        assert!(procs.lock().await.contains_key(&proc_id1));

        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
        assert!(procs.lock().await.contains_key(&proc_id2));

        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();

        // Make sure they can talk to each other:
        let (instance1, _handle) = proc1.instance("client").unwrap();
        let (instance2, _handle) = proc2.instance("client").unwrap();

        let (port, mut rx) = instance1.mailbox().open_port();

        port.bind().send(&instance2, "hello".to_string()).unwrap();
        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());

        // Make sure that the system proc is also wired in correctly.
        let (system_actor, _handle) = host.system_proc().instance("test").unwrap();

        // system->proc
        port.bind()
            .send(&system_actor, "hello from the system proc".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system proc".to_string()
        );

        // system->system
        let (port, mut rx) = system_actor.mailbox().open_port();
        port.bind()
            .send(&system_actor, "hello from the system".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system".to_string()
        );

        // proc->system
        port.bind()
            .send(&instance1, "hello from the instance1".to_string())
            .unwrap();
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the instance1".to_string()
        );
    }
1486
    // Integration test of the OS-process manager: spawns real child
    // processes from a bootstrap binary and exercises routing between
    // children, an external client proc, and the system proc.
    #[tokio::test]
    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_process_proc_manager() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        // EchoActor is "host_agent" used to test connectivity.
        let process_manager = ProcessProcManager::<EchoActor>::new(
            buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
        );
        let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
            .await
            .unwrap();

        // Manually serve this: the agent isn't actually doing anything in this case,
        // but we are testing connectivity.
        host.serve();

        // (1) Spawn and check invariants.
        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
        let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
        let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
        assert_eq!(echo1.actor_id().proc_id(), &proc1);
        assert_eq!(echo2.actor_id().proc_id(), &proc2);

        // (2) Duplicate name rejection.
        let dup = host.spawn("proc1".to_string(), ()).await;
        assert!(matches!(dup, Err(HostError::ProcExists(_))));

        // (3) Create a standalone client proc and verify echo1 agent responds.
        //     Request: client proc -> host frontend/router -> echo1 (proc1).
        //     Reply: echo1 (proc1) -> host backend -> host router -> client port.
        //     This confirms that an external proc (created via
        //     `Proc::direct`) can address a child proc through the host,
        //     and receive a correct reply.
        let client = Proc::direct(
            ChannelAddr::any(host.addr().transport()),
            "test".to_string(),
        )
        .unwrap();
        let (client_inst, _h) = client.instance("test").unwrap();
        let (port, rx) = client_inst.mailbox().open_once_port();
        echo1.send(&client_inst, port.bind()).unwrap();
        let id = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id, *echo1.actor_id());

        // (4) Child <-> external client request -> reply:
        //     Request: client proc (standalone via `Proc::direct`) ->
        //     host frontend/router -> echo2 (proc2).
        //     Reply: echo2 (proc2) -> host backend -> host router ->
        //     client port (standalone proc).
        //     This exercises cross-proc routing between a child and an
        //     external client under the same host.
        let (port2, rx2) = client_inst.mailbox().open_once_port();
        echo2.send(&client_inst, port2.bind()).unwrap();
        let id2 = tokio::time::timeout(Duration::from_secs(5), rx2.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id2, *echo2.actor_id());

        // (5) System -> child request -> cross-proc reply:
        //     Request: system proc -> host router (frontend) -> echo1
        //     (proc1, child).
        //     Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
        //     host router -> client proc direct addr (Proc::direct) ->
        //     client port.
        //     Because `client_inst` runs in its own proc, the reply
        //     traverses the host (not local delivery within proc1).
        let (sys_inst, _h) = host.system_proc().instance("sys-client").unwrap();
        let (port3, rx3) = client_inst.mailbox().open_once_port();
        // Send from system -> child via a message that ultimately
        // replies to client's port
        echo1.send(&sys_inst, port3.bind()).unwrap();
        let id3 = tokio::time::timeout(Duration::from_secs(5), rx3.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id3, *echo1.actor_id());
    }
1570
    // `LocalHandle::ready`/`wait` must resolve immediately and be
    // safe to await concurrently.
    #[tokio::test]
    async fn local_ready_and_wait_are_immediate() {
        // Build a LocalHandle directly.
        let addr = ChannelAddr::any(ChannelTransport::Local);
        let proc_id = reference::ProcId::with_name(addr.clone(), "p");
        let agent_ref = reference::ActorRef::<()>::attest(proc_id.actor_id("host_agent", 0));
        let h = LocalHandle::<()> {
            proc_id,
            addr,
            agent_ref,
            procs: Arc::new(Mutex::new(HashMap::new())),
        };

        // ready() resolves immediately
        assert!(h.ready().await.is_ok());

        // wait() resolves immediately with unit TerminalStatus
        assert!(h.wait().await.is_ok());

        // Multiple concurrent waiters both succeed
        let (r1, r2) = tokio::join!(h.ready(), h.ready());
        assert!(r1.is_ok() && r2.is_ok());
    }
1594
1595 // --
1596 // Fixtures for `host::spawn` tests.
1597
1598 #[derive(Debug, Clone, Copy)]
1599 enum ReadyMode {
1600 OkAfter(Duration),
1601 ErrTerminal,
1602 ErrChannelClosed,
1603 }
1604
    // A scripted `ProcHandle` used to exercise the readiness and
    // validation paths of `Host::spawn`.
    #[derive(Debug, Clone)]
    struct TestHandle {
        id: reference::ProcId,
        addr: ChannelAddr,
        agent: reference::ActorRef<()>,
        // How `ready()` should behave (see `ReadyMode`).
        mode: ReadyMode,
        // When set, `addr()` returns `None`, simulating a handle that
        // became ready without reporting an address.
        omit_addr: bool,
        // When set, `agent_ref()` returns `None`.
        omit_agent: bool,
    }
1614
    #[async_trait::async_trait]
    impl ProcHandle for TestHandle {
        type Agent = ();
        type TerminalStatus = ();

        fn proc_id(&self) -> &reference::ProcId {
            &self.id
        }

        // Optionally withhold the address to test validation.
        fn addr(&self) -> Option<ChannelAddr> {
            if self.omit_addr {
                None
            } else {
                Some(self.addr.clone())
            }
        }

        // Optionally withhold the agent ref to test validation.
        fn agent_ref(&self) -> Option<reference::ActorRef<Self::Agent>> {
            if self.omit_agent {
                None
            } else {
                Some(self.agent.clone())
            }
        }

        // Scripted readiness per `ReadyMode`.
        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
            match self.mode {
                ReadyMode::OkAfter(d) => {
                    if !d.is_zero() {
                        tokio::time::sleep(d).await;
                    }
                    Ok(())
                }
                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
            }
        }
        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
            Ok(())
        }
        // Termination is out of scope for these fixtures.
        async fn terminate(
            &self,
            _cx: &impl context::Actor,
            _timeout: Duration,
            _reason: &str,
        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
            Err(TerminateError::Unsupported)
        }
        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
            Err(TerminateError::Unsupported)
        }
    }
1667
    // A `ProcManager` whose handles behave per `ReadyMode`; used to
    // drive `Host::spawn` through success, timeout, and error paths.
    #[derive(Debug, Clone)]
    struct TestManager {
        mode: ReadyMode,
        omit_addr: bool,
        omit_agent: bool,
        transport: ChannelTransport,
    }
1675
1676 impl TestManager {
1677 fn local(mode: ReadyMode) -> Self {
1678 Self {
1679 mode,
1680 omit_addr: false,
1681 omit_agent: false,
1682 transport: ChannelTransport::Local,
1683 }
1684 }
1685 fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
1686 self.omit_addr = addr;
1687 self.omit_agent = agent;
1688 self
1689 }
1690 }
1691
    #[async_trait::async_trait]
    impl ProcManager for TestManager {
        type Handle = TestHandle;

        fn transport(&self) -> ChannelTransport {
            self.transport.clone()
        }

        // Fabricates a handle (no real proc is created); the handle's
        // behavior is scripted by this manager's fields.
        async fn spawn(
            &self,
            proc_id: reference::ProcId,
            forwarder_addr: ChannelAddr,
            _config: (),
        ) -> Result<Self::Handle, HostError> {
            let agent = reference::ActorRef::<()>::attest(proc_id.actor_id("host_agent", 0));
            Ok(TestHandle {
                id: proc_id,
                addr: forwarder_addr,
                agent,
                mode: self.mode,
                omit_addr: self.omit_addr,
                omit_agent: self.omit_agent,
            })
        }
    }
1717
    // A handle whose readiness exceeds `HOST_SPAWN_READY_TIMEOUT`
    // must surface as a configuration failure from `Host::spawn`.
    #[tokio::test]
    async fn host_spawn_times_out_when_configured() {
        let cfg = hyperactor_config::global::lock();
        let _g = cfg.override_key(
            crate::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_millis(10),
        );

        let mut host = Host::new(
            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("t".into(), ()).await.expect_err("must time out");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    // A zero timeout disables the readiness deadline entirely, so a
    // slow-but-eventually-ready handle still succeeds.
    #[tokio::test]
    async fn host_spawn_timeout_zero_disables_and_succeeds() {
        let cfg = hyperactor_config::global::lock();
        let _g = cfg.override_key(
            crate::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(0),
        );

        let mut host = Host::new(
            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
        assert_eq!(agent.actor_id().proc_id(), &pid);
        assert!(host.procs.contains("ok"));
    }
1756
    // `ReadyError::ChannelClosed` from the handle maps to a
    // configuration failure.
    #[tokio::test]
    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
        let mut host = Host::new(
            TestManager::local(ReadyMode::ErrChannelClosed),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    // `ReadyError::Terminal` from the handle likewise maps to a
    // configuration failure.
    #[tokio::test]
    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
        let mut host = Host::new(
            TestManager::local(ReadyMode::ErrTerminal),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    // A handle that reports ready but omits its address is rejected.
    #[tokio::test]
    async fn host_spawn_fails_if_ready_but_missing_addr() {
        let mut host = Host::new(
            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host
            .spawn("no-addr".into(), ())
            .await
            .expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }

    // A handle that reports ready but omits its agent ref is rejected.
    #[tokio::test]
    async fn host_spawn_fails_if_ready_but_missing_agent() {
        let mut host = Host::new(
            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
            ChannelAddr::any(ChannelTransport::Local),
        )
        .await
        .unwrap();

        let err = host
            .spawn("no-agent".into(), ())
            .await
            .expect_err("must fail");
        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
    }
1814}