Skip to main content

hyperactor_mesh/
host.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module defines [`Host`], which represents all the procs running on a host.
10//! The procs themselves are managed by an implementation of [`ProcManager`], which may,
11//! for example, fork new processes for each proc, or spawn them in the same process
12//! for testing purposes.
13//!
14//! The primary purpose of a host is to manage the lifecycle of these procs, and to
15//! serve as a single front-end for all the procs on a host, multiplexing network
16//! channels.
17//!
18//! ## Channel muxing
19//!
20//! A [`Host`] maintains a single frontend address, through which all procs are accessible
21//! through direct addressing: the id of each proc is the `ProcId(frontend_addr, proc_name)`.
22//! In the following, the frontend address is denoted by `*`. The host listens on `*` and
23//! multiplexes messages based on the proc name. When spawning procs, the host maintains
24//! backend channels with separate addresses. In the diagram `#` is the backend address of
25//! the host, while `#n` is the backend address for proc *n*. The host forwards messages
26//! to the appropriate backend channel, while procs forward messages to the host backend
27//! channel at `#`.
28//!
29//! ```text
30//!                      ┌────────────┐
31//!                  ┌───▶  proc *,1  │
32//!                  │ #1└────────────┘
33//!                  │
34//!  ┌──────────┐    │   ┌────────────┐
35//!  │   Host   │◀───┼───▶  proc *,2  │
36//! *└──────────┘#   │ #2└────────────┘
37//!                  │
38//!                  │   ┌────────────┐
39//!                  └───▶  proc *,3  │
40//!                    #3└────────────┘
41//! ```
42//!
43//! ## Local proc invariant (LP-*)
44//!
45//! - **LP-1 (lazy activation):** The local proc always exists as a
46//!   `ProcId::Direct(addr, LOCAL_PROC_NAME)` and is forwarded
//!   in-process by the host's mailbox muxer. However, it starts with
//!   zero actors. A `ProcAgent` and root client actor are added only
49//!   when `HostMeshAgent::handle(GetLocalProc)` is first called.
50
51use std::collections::HashMap;
52use std::collections::HashSet;
53use std::fmt;
54use std::marker::PhantomData;
55use std::str::FromStr;
56use std::sync::Arc;
57use std::time::Duration;
58
59use async_trait::async_trait;
60use futures::Future;
61use futures::StreamExt;
62use futures::stream;
63use hyperactor::Actor;
64use hyperactor::ActorAddr;
65use hyperactor::ActorHandle;
66use hyperactor::ActorRef;
67use hyperactor::Addr;
68use hyperactor::AttachRequest;
69use hyperactor::BootstrapAssignment;
70use hyperactor::Host2Client;
71use hyperactor::PortHandle;
72use hyperactor::Proc;
73use hyperactor::ProcAddr;
74use hyperactor::actor::Binds;
75use hyperactor::actor::Referable;
76use hyperactor::channel;
77use hyperactor::channel::ChannelAddr;
78use hyperactor::channel::ChannelError;
79use hyperactor::channel::ChannelRx;
80use hyperactor::channel::ChannelTransport;
81use hyperactor::channel::Rx;
82use hyperactor::channel::ServerError;
83use hyperactor::channel::Tx;
84use hyperactor::context;
85use hyperactor::mailbox::BoxableMailboxSender;
86use hyperactor::mailbox::BoxedMailboxSender;
87use hyperactor::mailbox::DialMailboxRouter;
88use hyperactor::mailbox::IntoBoxedMailboxSender as _;
89use hyperactor::mailbox::MailboxClient;
90use hyperactor::mailbox::MailboxRouter;
91use hyperactor::mailbox::MailboxSender;
92use hyperactor::mailbox::MailboxServer;
93use hyperactor::mailbox::MailboxServerError;
94use hyperactor::mailbox::MailboxServerHandle;
95use hyperactor::mailbox::MessageEnvelope;
96use hyperactor::mailbox::Undeliverable;
97/// Name of the local client proc on a host.
98///
99/// See LP-1 (lazy activation) in module doc.
100///
101/// In pure-Rust programs (e.g. sieve, dining_philosophers)
102/// `GetLocalProc` is never sent, so the local proc remains empty
103/// throughout the program's lifetime. Code that inspects the local
104/// proc's actors must not assume they exist.
105pub use hyperactor::proc::LEGACY_LOCAL_PROC_NAME as LOCAL_PROC_NAME;
106/// Name of the system service proc on a host.
107///
108/// Hosts the admin actor layer: HostMeshAgent, MeshAdminAgent, and bridge.
109pub use hyperactor::proc::LEGACY_SERVICE_PROC_NAME as SERVICE_PROC_NAME;
110use tokio::process::Child;
111use tokio::process::Command;
112use tokio::sync::Mutex;
113use tokio::sync::watch;
114use tokio::task::JoinSet;
115
116use crate::mesh_id::ResourceId;
117
118/// [`MailboxSender`] adapter that wraps outbound [`MessageEnvelope`]s
119/// in [`Host2Client::Envelope`] before posting to a
120/// [`DuplexTx<Host2Client>`]. Used on the host side to send messages
121/// to an attached remote proc.
122#[derive(Clone)]
123struct AttachSender(channel::duplex::DuplexTx<Host2Client>);
124
125#[async_trait]
126impl MailboxSender for AttachSender {
127    fn post_unchecked(
128        &self,
129        envelope: MessageEnvelope,
130        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
131    ) {
132        self.0.post(Host2Client::Envelope(envelope));
133    }
134}
135
/// The type of error produced by host operations.
#[derive(Debug, thiserror::Error)]
pub enum HostError {
    /// A channel error occurred during a host operation.
    #[error(transparent)]
    ChannelError(#[from] ChannelError),

    /// A duplex server error occurred during a host operation.
    #[error(transparent)]
    ServerError(#[from] ServerError),

    /// [`Host::serve`] was called more than once.
    #[error("host is already serving")]
    AlreadyServing,

    /// The named proc already exists and cannot be spawned.
    #[error("proc '{0}' already exists")]
    ProcExists(String),

    /// Failures occurring while spawning a subprocess.
    ///
    /// Fields: the proc's address, the command that was run, and the
    /// underlying I/O error.
    #[error("proc '{0}' (command: {1}) failed to spawn process: {2}")]
    ProcessSpawnFailure(ProcAddr, String, #[source] std::io::Error),

    /// Failures occurring while configuring a subprocess.
    ///
    /// [`Host::spawn`] also returns this when a spawned proc fails to
    /// become ready, including when the readiness timeout elapses.
    #[error("proc '{0}' failed to configure process: {1}")]
    ProcessConfigurationFailure(ProcAddr, #[source] anyhow::Error),

    /// Failures occurring while spawning a management actor in a proc.
    #[error("failed to spawn agent on proc '{0}': {1}")]
    AgentSpawnFailure(ProcAddr, #[source] anyhow::Error),

    /// An input parameter was missing.
    ///
    /// Fields: the parameter name and the [`std::env::VarError`]
    /// describing why it could not be read.
    #[error("parameter '{0}' missing: {1}")]
    MissingParameter(String, std::env::VarError),

    /// An input parameter was invalid.
    ///
    /// Fields: the parameter name and the validation error.
    #[error("parameter '{0}' invalid: {1}")]
    InvalidParameter(String, anyhow::Error),
}
175
/// A host, managing the lifecycle of several procs, and their backend
/// routing, as described in this module's documentation.
pub struct Host<M> {
    /// Names of procs spawned through [`Host::spawn`]; used to reject
    /// a second spawn under the same name.
    procs: HashSet<String>,
    /// The address this host accepts messages on (see [`Host::addr`]).
    /// Spawned procs are addressed under it.
    frontend_addr: ChannelAddr,
    /// The backend channel address handed to the proc manager when
    /// spawning procs.
    backend_addr: ChannelAddr,
    /// Routes messages to known procs (local, attached) by prefix.
    router: MailboxRouter,
    /// Addr-based routing for dialed connections (child procs,
    /// remote hosts); used as the fallback when the prefix router
    /// has no match.
    dial_router: DialMailboxRouter,
    /// The proc manager used to spawn child procs.
    manager: M,
    /// The service (system) proc; exposed via [`Host::system_proc`].
    service_proc: Proc,
    /// The local proc (`LOCAL_PROC_NAME`); exposed via
    /// [`Host::local_proc`].
    local_proc: Proc,
    /// The frontend accept state. Consumed by [`Host::serve`], after
    /// which it is `None`.
    frontend: Option<Frontend>,
}
194
/// The frontend server that accepts inbound messages on the host's
/// frontend address. The duplex variant additionally supports remote
/// procs attaching via [`Proc::attach_to_host`]; the simplex variant
/// is used when the transport cannot carry the duplex wire protocol
/// (see [`ChannelTransport::supports_duplex`]).
///
/// Created by [`Host::new_with_default`] and consumed exactly once by
/// [`Host::serve`].
enum Frontend {
    /// Duplex server for transports that support bidirectional links.
    /// Accepts both attach connections and regular inbound message
    /// connections.
    Duplex(channel::duplex::DuplexServer<MessageEnvelope, Host2Client>),
    /// Simplex receiver for transports that do not support the duplex
    /// wire protocol (currently only [`ChannelTransport::Local`]).
    Simplex(ChannelRx<MessageEnvelope>),
}
209
impl<M: ProcManager> Host<M> {
    /// Serve a host using the provided ProcManager, on the provided `addr`.
    /// On success, the host will multiplex messages for procs on the host
    /// on the address of the host.
    ///
    /// Equivalent to [`Host::new_with_default`] with no default sender
    /// and no pre-bound listener.
    pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
        Self::new_with_default(manager, addr, None, None).await
    }

    /// Like [`Host::new`], serves a host using the provided ProcManager, on the provided `addr`.
    /// Unknown destinations are forwarded to the default sender.
    /// When `listener` is `Some`, it is used as the frontend listening socket
    /// instead of binding a new one.
    ///
    /// The backend channel, which spawned procs use to reach the host, is
    /// served immediately. The frontend accept loop is *not* started here;
    /// call [`Host::serve`] to begin accepting inbound connections.
    ///
    /// # Errors
    /// Returns an error if the frontend or backend channel cannot be
    /// served.
    #[hyperactor::instrument(fields(addr=addr.to_string()))]
    pub async fn new_with_default(
        manager: M,
        addr: ChannelAddr,
        default_sender: Option<BoxedMailboxSender>,
        listener: Option<std::net::TcpListener>,
    ) -> Result<Self, HostError> {
        // Transports that cannot carry the duplex byte-stream protocol
        // (currently only `Local`) have no way to support attach and
        // fall back to a simplex channel.
        let (frontend_addr, frontend) = if addr.transport().supports_duplex() {
            let server = channel::duplex::serve::<MessageEnvelope, Host2Client>(addr, listener)?;
            let frontend_addr = server.addr().clone();
            (frontend_addr, Frontend::Duplex(server))
        } else {
            let (frontend_addr, frontend_rx) = channel::serve_with_listener(addr, listener)?;
            (frontend_addr, Frontend::Simplex(frontend_rx))
        };
        // We set up a cascade of routers: first, the outer prefix router
        // delivers to procs bound in-process (the service and local procs
        // below, and later any attached procs), while the dial router
        // manages dialed connections and is consulted when the prefix
        // router has no match.
        let dial_router = match default_sender {
            Some(d) => DialMailboxRouter::new_with_default(d),
            None => DialMailboxRouter::new(),
        };
        let router = MailboxRouter::new();

        // Establish a backend channel on the preferred transport.
        let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;

        // Set up the service (system) proc, often used to manage the host
        // itself, and the local proc. Both are addressed at the frontend
        // address and send through the combined prefix-then-dial router.
        // Their names are not made unique (no hash suffix) because
        // uniqueness is guaranteed by the ChannelAddr component, and the
        // Name type's '-' delimiter must not collide with a hash suffix.
        let combined = router.fallback(dial_router.boxed());
        let service_proc =
            Proc::legacy_service_pseudo_singleton(frontend_addr.clone(), combined.clone());
        let local_proc = Proc::legacy_local_pseudo_singleton(frontend_addr.clone(), combined);
        let service_proc_id = service_proc.proc_addr().clone();
        let local_proc_id = local_proc.proc_addr().clone();

        // Register the local procs' muxers so the router delivers to
        // them without dialing. We bind the muxer (not the Proc) to
        // avoid a flush cycle: Proc.forwarder → MailboxRouter → Proc →
        // Proc.forwarder → …
        router.bind(
            Addr::from(service_proc_id.clone()),
            service_proc.muxer().clone(),
        );
        router.bind(
            Addr::from(local_proc_id.clone()),
            local_proc.muxer().clone(),
        );

        tracing::info!(
            frontend_addr = frontend_addr.to_string(),
            backend_addr = backend_addr.to_string(),
            service_proc_id = service_proc_id.to_string(),
            local_proc_id = local_proc_id.to_string(),
            "serving host"
        );

        let host = Host {
            procs: HashSet::new(),
            frontend_addr,
            backend_addr,
            router,
            dial_router,
            manager,
            service_proc,
            local_proc,
            frontend: Some(frontend),
        };

        // Serve the same router on the backend address. We don't ever need
        // to join this handle because the server is used only to receive
        // messages from procs spawned by this host -- if the host is shutting down,
        // then all its procs should have shut down first, and we don't have to worry
        // about any unacked messages.
        let _backend_handle = host.forwarder().serve(backend_rx);

        Ok(host)
    }

    /// Start serving the frontend accept loop.
    ///
    /// Returns a [`MailboxServerHandle`] on first invocation and
    /// [`HostError::AlreadyServing`] on subsequent invocations.
    /// Callers should retain the handle and join it as part of
    /// orderly shutdown so pending messages flush correctly:
    /// `stop(reason)` signals the accept loop, which cancels
    /// per-connection tasks and waits for them before the handle
    /// resolves.
    ///
    /// For a duplex frontend this runs the attach-capable accept loop.
    /// For a simplex frontend, inbound messages are served directly
    /// through the host's forwarder.
    pub fn serve(&mut self) -> Result<MailboxServerHandle, HostError> {
        let frontend = self.frontend.take().ok_or(HostError::AlreadyServing)?;
        let forwarder = self.forwarder();
        Ok(match frontend {
            Frontend::Duplex(server) => spawn_duplex_accept_loop(
                server,
                self.frontend_addr.clone(),
                self.router.clone(),
                forwarder,
            ),
            Frontend::Simplex(rx) => forwarder.serve(rx),
        })
    }

    /// The underlying proc manager.
    pub fn manager(&self) -> &M {
        &self.manager
    }

    /// The address which accepts messages destined for this host.
    pub fn addr(&self) -> &ChannelAddr {
        &self.frontend_addr
    }

    /// The system proc associated with this host.
    /// This is used to run host-level system services like host managers.
    pub fn system_proc(&self) -> &Proc {
        &self.service_proc
    }

    /// The local proc associated with this host (`LOCAL_PROC_NAME`).
    ///
    /// Starts with zero actors; see invariant LP-1 on
    /// [`LOCAL_PROC_NAME`] for activation semantics.
    pub fn local_proc(&self) -> &Proc {
        &self.local_proc
    }

    /// Spawn a new proc with the given `name` through the proc manager.
    ///
    /// On success, the proc has been spawned and has reported ready. It
    /// is reachable through the returned direct-addressed [`ProcAddr`],
    /// which is derived from `self.addr()` and `name`. A route to the
    /// proc's own address is bound in the dial router. The second
    /// element of the result is a reference to the proc's
    /// [`ManagerAgent`].
    ///
    /// Readiness is awaited for at most
    /// `hyperactor::config::HOST_SPAWN_READY_TIMEOUT`. A value of zero
    /// disables the timeout.
    ///
    /// # Errors
    /// - [`HostError::ProcExists`] if this host already spawned a proc
    ///   named `name`.
    /// - Any error returned by the manager's `spawn`, converted into
    ///   [`HostError`].
    /// - [`HostError::ProcessConfigurationFailure`] if the proc fails to
    ///   become ready or the readiness timeout elapses.
    ///
    /// NOTE(review): on a readiness failure, `name` is not recorded in
    /// `procs` and no route is bound. Nothing here stops the process the
    /// manager already launched; confirm that the manager cleans it up.
    pub async fn spawn(
        &mut self,
        name: String,
        config: M::Config,
    ) -> Result<(ProcAddr, ActorRef<ManagerAgent<M>>), HostError> {
        if self.procs.contains(&name) {
            return Err(HostError::ProcExists(name));
        }

        let proc_id = ResourceId::proc_addr_from_name(self.frontend_addr.clone(), &name);
        let handle = self
            .manager
            .spawn(proc_id.clone(), self.backend_addr.clone(), config)
            .await?;

        // Await readiness (config-driven; 0s disables timeout).
        let to: Duration =
            hyperactor_config::global::get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT);
        let ready = if to == Duration::from_secs(0) {
            ReadyProc::ensure(&handle).await
        } else {
            match tokio::time::timeout(to, ReadyProc::ensure(&handle)).await {
                Ok(result) => result,
                Err(_elapsed) => Err(ReadyProcError::Timeout),
            }
        }
        .map_err(|e| {
            HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
        })?;

        // Only record the proc once it is ready, so a failed spawn can be
        // retried under the same name.
        self.dial_router
            .bind(Addr::from(proc_id.clone()), ready.addr().clone());
        self.procs.insert(name.clone());

        Ok((proc_id, ready.agent_ref().clone()))
    }

    /// A [`MailboxSender`] that first consults the prefix router (for
    /// local and attached procs) and then falls back to the
    /// address-based dial router (for child procs and remote hosts).
    fn forwarder(&self) -> BoxedMailboxSender {
        self.router.fallback(self.dial_router.boxed())
    }
}
401
402/// Spawn the duplex accept loop and wrap it in a
403/// [`MailboxServerHandle`] so callers can stop and join it as part of
404/// orderly shutdown. The stop signal is observed directly by the
405/// accept loop and its per-connection tasks.
406fn spawn_duplex_accept_loop(
407    server: channel::duplex::DuplexServer<MessageEnvelope, Host2Client>,
408    frontend_addr: ChannelAddr,
409    router: MailboxRouter,
410    forwarder: BoxedMailboxSender,
411) -> MailboxServerHandle {
412    let (stopped_tx, stopped_rx) = watch::channel(false);
413    let join_handle = tokio::spawn(async move {
414        duplex_accept_loop(server, frontend_addr, router, forwarder, stopped_rx).await;
415        Ok::<(), MailboxServerError>(())
416    });
417    MailboxServerHandle::from_parts(join_handle, stopped_tx)
418}
419
420/// Wait until `stopped_rx` observes a `true` value, then return. If
421/// the sender is dropped without ever sending `true`, pend forever —
422/// the surrounding `tokio::select!` should only fire on an explicit
423/// stop, not on silent teardown of the handle.
424async fn wait_for_stop(mut stopped_rx: watch::Receiver<bool>) {
425    let ok = stopped_rx.wait_for(|stopped| *stopped).await.is_ok();
426    if !ok {
427        std::future::pending::<()>().await;
428    }
429}
430
431/// [`Rx<MessageEnvelope>`] adapter that yields a single pre-read
432/// envelope before delegating to an inner receiver. Used to re-inject
433/// the first message consumed during connection-type dispatch.
434struct PrependRx<R> {
435    first: Option<MessageEnvelope>,
436    inner: R,
437}
438
439#[async_trait]
440impl<R: channel::Rx<MessageEnvelope> + Send> channel::Rx<MessageEnvelope> for PrependRx<R> {
441    async fn recv(&mut self) -> Result<MessageEnvelope, ChannelError> {
442        if let Some(msg) = self.first.take() {
443            return Ok(msg);
444        }
445        self.inner.recv().await
446    }
447
448    fn addr(&self) -> ChannelAddr {
449        self.inner.addr()
450    }
451
452    async fn join(self) {
453        self.inner.join().await
454    }
455}
456
457/// Accept loop for the host's frontend duplex server.
458///
459/// Each accepted connection is dispatched based on its first message:
460///
461/// - **[`AttachRequest`]**: the client wants to attach as a remote
462///   proc. The host assigns a [`ProcId`], sends a
463///   [`BootstrapAssignment`], and establishes bidirectional routing.
464/// - **Regular [`MessageEnvelope`]**: a normal inbound connection
465///   (e.g., from another host or proc). Messages are routed through
466///   the forwarder. The outbound (tag 0x01) channel is unused.
467///
468/// The attach protocol proceeds as follows:
469///
470/// 1. The remote proc dials the host address.
471/// 2. The host accepts the connection and reads the first message.
472/// 3. The host assigns a unique [`ProcId`] of the form `remote_<uid>`
473///    and sends a [`BootstrapAssignment`] back on the channel.
474/// 4. The host registers the duplex sender in the router so that
475///    outbound messages addressed to the remote proc are forwarded
476///    over the duplex channel.
477/// 5. A per-connection task reads inbound messages from the duplex
478///    channel and routes them through the host's router. Undeliverable
479///    messages are bounced back to the original sender.
480///
481/// When a connection closes or the stop signal fires, the route
482/// entry is removed. All per-connection tasks are joined before this
483/// function returns.
484///
485/// TODO: see [`AttachRequest`] — the attach/simplex discrimination
486/// currently happens by attempting to deserialize the first envelope
487/// as [`AttachRequest`]. A cleaner design, suggested during review,
488/// would be to surface the distinction at the link layer rather than
489/// peek at application-level payloads.
490async fn duplex_accept_loop(
491    mut duplex_server: channel::duplex::DuplexServer<MessageEnvelope, Host2Client>,
492    frontend_addr: ChannelAddr,
493    router: MailboxRouter,
494    forwarder: BoxedMailboxSender,
495    stopped_rx: watch::Receiver<bool>,
496) {
497    let mut tasks = JoinSet::new();
498    loop {
499        let accept = tokio::select! {
500            result = duplex_server.accept() => result,
501            () = wait_for_stop(stopped_rx.clone()) => break,
502        };
503        let (mut duplex_rx, duplex_tx) = match accept {
504            Ok(pair) => pair,
505            Err(e) => {
506                tracing::info!(
507                    frontend_addr = frontend_addr.to_string(),
508                    error = %e,
509                    "duplex accept loop ended"
510                );
511                break;
512            }
513        };
514
515        // Read the first message to determine connection type.
516        let first_msg = match duplex_rx.recv().await {
517            Ok(msg) => msg,
518            Err(e) => {
519                tracing::info!(error = %e, "duplex connection closed before first message");
520                continue;
521            }
522        };
523
524        let is_attach = first_msg.deserialized::<AttachRequest>().is_ok();
525
526        if is_attach {
527            // Attach protocol: assign an identity and set up
528            // bidirectional routing. Use a fresh random uid under the
529            // `remote` label so procs attached to different host
530            // generations sharing a frontend address (e.g., after
531            // restart on the same ip:port) cannot collide.
532            let proc_id = ProcAddr::instance(frontend_addr.clone(), "remote");
533
534            let assignment = BootstrapAssignment {
535                proc_id: proc_id.clone(),
536            };
537            tracing::info!(
538                proc_id = proc_id.to_string(),
539                "duplex accepted attach connection"
540            );
541            duplex_tx.post(Host2Client::Bootstrap(assignment));
542
543            router.bind(Addr::from(proc_id.clone()), AttachSender(duplex_tx));
544
545            let mut handle = forwarder.clone().serve(duplex_rx);
546            let cleanup_router = router.clone();
547            let conn_stop = stopped_rx.clone();
548            tasks.spawn(async move {
549                tokio::select! {
550                    _ = &mut handle => {}
551                    () = wait_for_stop(conn_stop) => {
552                        handle.stop("host duplex cancel");
553                        let _ = handle.await;
554                    }
555                }
556                cleanup_router.unbind(&Addr::from(proc_id.clone()));
557                tracing::info!(
558                    proc_id = proc_id.to_string(),
559                    "attach connection closed, removed route"
560                );
561            });
562        } else {
563            // Regular inbound connection: route messages, no
564            // outbound tag-0x01 traffic. The DuplexTx is held for
565            // the lifetime of the connection: dropping it closes
566            // the session's outbound channel, which causes the
567            // session task to exit and the inbound receiver to
568            // close after a single message.
569            let fwd = forwarder.clone();
570            let conn_stop = stopped_rx.clone();
571            tasks.spawn(async move {
572                let _keep_alive = duplex_tx;
573                let rx = PrependRx {
574                    first: Some(first_msg),
575                    inner: duplex_rx,
576                };
577                let mut handle = fwd.serve(rx);
578                tokio::select! {
579                    _ = &mut handle => {}
580                    () = wait_for_stop(conn_stop) => {
581                        handle.stop("host frontend cancel");
582                        let _ = handle.await;
583                    }
584                }
585            });
586        }
587    }
588
589    while tasks.join_next().await.is_some() {}
590
591    // Drain the duplex server's listener task so every in-flight
592    // dispatch (one per session) finishes its terminal cleanup —
593    // final ack flush + `Closed` emit — before the host exits.
594    // Without this, simply dropping `duplex_server` would detach the
595    // listener and skip the flush, surfacing as undeliverable
596    // message errors on the peer's send-side after `process::exit`.
597    duplex_server.join().await;
598}
599
/// Failure reported by [`ProcHandle::ready`].
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// The proc hit a terminal state before it ever became Ready; carries
    /// that terminal status.
    Terminal(TerminalStatus),
    /// The implementation lost its status channel and can no longer
    /// observe state.
    ChannelClosed,
}

/// Failure reported by [`ready_proc`] while waiting for a proc to become
/// ready and expose its address and agent reference.
#[derive(Debug, Clone)]
pub enum ReadyProcError<TerminalStatus> {
    /// Readiness was not reached before the timeout.
    Timeout,
    /// The underlying `ready()` call itself failed.
    Ready(ReadyError<TerminalStatus>),
    /// `ready()` succeeded, but the handle's `addr()` yielded `None`.
    MissingAddr,
    /// `ready()` succeeded, but the handle's `agent_ref()` yielded
    /// `None`.
    MissingAgentRef,
}

/// Lifts a bare [`ReadyError`] into [`ReadyProcError::Ready`], so `?` can
/// propagate `ready()` failures directly.
impl<T> From<ReadyError<T>> for ReadyProcError<T> {
    fn from(err: ReadyError<T>) -> Self {
        Self::Ready(err)
    }
}
628
/// Error returned by [`ProcHandle::wait`].
///
/// Losing the status channel is currently the only failure mode.
#[derive(Debug, Clone)]
pub enum WaitError {
    /// Implementation lost its status channel / cannot observe state.
    ChannelClosed,
}
635
636/// Error returned by [`ProcHandle::terminate`] and
637/// [`ProcHandle::kill`].
638///
639/// - `Unsupported`: the manager cannot perform the requested proc
640///   signaling (e.g., local/in-process manager that doesn't emulate
641///   kill).
642/// - `AlreadyTerminated(term)`: the proc was already terminal; `term`
643///   is the same value `wait()` would return.
644/// - `ChannelClosed`: the manager lost its lifecycle channel and
645///   cannot reliably observe state transitions.
646/// - `Io(err)`: manager-specific failure delivering the signal or
647///   performing shutdown (e.g., OS error on kill).
648#[derive(Debug)]
649pub enum TerminateError<TerminalStatus> {
650    /// Manager doesn't support signaling (e.g., Local manager).
651    Unsupported,
652    /// A terminal state was already reached while attempting
653    /// terminate/kill.
654    AlreadyTerminated(TerminalStatus),
655    /// Implementation lost its status channel / cannot observe state.
656    ChannelClosed,
657    /// Manager-specific failure to deliver signal or perform
658    /// shutdown.
659    Io(anyhow::Error),
660}
661
662impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
663    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
664        match self {
665            TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
666            TerminateError::AlreadyTerminated(st) => {
667                write!(f, "proc already terminated (status: {st:?})")
668            }
669            TerminateError::ChannelClosed => {
670                write!(f, "lifecycle channel closed; cannot observe state")
671            }
672            TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
673        }
674    }
675}
676
677impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
678    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
679        match self {
680            TerminateError::Io(err) => Some(err.root_cause()),
681            _ => None,
682        }
683    }
684}
685
/// Outcome counts from a bulk termination attempt.
///
/// - `attempted`: how many child procs termination was attempted for.
/// - `ok`: how many of those ended up terminated (including procs that
///   were already terminal).
/// - `failed`: how many could not be terminated (e.g. signaling errors
///   or a lost lifecycle channel).
#[derive(Debug)]
pub struct TerminateSummary {
    /// How many child procs termination was attempted for.
    pub attempted: usize,
    /// How many procs successfully reached a terminal state.
    ///
    /// Counts both procs that exited cleanly after `terminate(timeout)`
    /// and procs that were already terminal before termination was
    /// attempted.
    pub ok: usize,
    /// How many procs could not be terminated.
    ///
    /// This typically stems from a signaling error (e.g., the OS failing
    /// to deliver SIGTERM/SIGKILL) or a lost lifecycle channel, after
    /// which the manager can no longer observe state transitions.
    pub failed: usize,
}

impl fmt::Display for TerminateSummary {
    /// Renders as `attempted=<n> ok=<n> failed=<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let TerminateSummary {
            attempted,
            ok,
            failed,
        } = self;
        write!(f, "attempted={} ok={} failed={}", attempted, ok, failed)
    }
}
723
#[async_trait::async_trait]
/// Trait for managers that can terminate a single proc.
pub trait SingleTerminate: Send + Sync {
    /// Gracefully terminate the given proc.
    ///
    /// Initiates a polite shutdown of `proc`, waits up to `timeout`
    /// for it to complete, then escalates to a forceful stop.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `cx`: the calling actor's context.
    /// - `proc`: address of the proc to terminate.
    /// - `timeout`: Grace period before escalation to a forceful stop.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// # Returns
    /// A tuple of (polite shutdown actors, forceful stop actors), each a
    /// `Vec` of [`ActorAddr`]s.
    ///
    /// # Errors
    /// Returns an error if termination could not be carried out.
    async fn terminate_proc(
        &self,
        cx: &impl context::Actor,
        proc: &ProcAddr,
        timeout: std::time::Duration,
        reason: &str,
    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error>;
}
755
/// Trait for managers that can terminate many child **units** in
/// bulk.
///
/// Implementors provide a concurrency-bounded, graceful shutdown over
/// all currently tracked children (polite stop → wait → forceful
/// stop), returning a summary of outcomes. The exact stop/kill
/// semantics are manager-specific: for example, an OS-process manager
/// might send signals, while an in-process manager might drain/abort
/// tasks.
#[async_trait::async_trait]
pub trait BulkTerminate: Send + Sync {
    /// Gracefully terminate all known children.
    ///
    /// Initiates a polite shutdown for each child, waits up to
    /// `timeout` for completion, then escalates to a forceful stop
    /// for any that remain. Work may be done in parallel, capped by
    /// `max_in_flight`. The returned [`TerminateSummary`] reports how
    /// many children were attempted, succeeded, and failed.
    ///
    /// Implementation notes:
    /// - "Polite shutdown" and "forceful stop" are intentionally
    ///   abstract. Implementors should map these to whatever
    ///   semantics they control (e.g., proc-level drain/abort, RPCs,
    ///   OS signals).
    /// - The operation must be idempotent and tolerate races with
    ///   concurrent termination or external exits.
    ///
    /// # Parameters
    /// - `cx`: the calling actor's context.
    /// - `timeout`: Per-child grace period before escalation to a
    ///   forceful stop.
    /// - `max_in_flight`: Upper bound on concurrent terminations (≥
    ///   1) to prevent resource spikes (I/O, CPU, file descriptors,
    ///   etc.).
    /// - `reason`: Human-readable reason for termination.
    async fn terminate_all(
        &self,
        cx: &impl context::Actor,
        timeout: std::time::Duration,
        max_in_flight: usize,
        reason: &str,
    ) -> TerminateSummary;
}
797
798// Host convenience that's available only when its manager supports
799// bulk termination.
800impl<M: ProcManager + BulkTerminate> Host<M> {
801    /// Gracefully terminate all procs spawned by this host.
802    ///
803    /// Delegates to the underlying manager’s
804    /// [`BulkTerminate::terminate_all`] implementation. Use this to
805    /// perform orderly teardown during scale-down or shutdown.
806    ///
807    /// # Parameters
808    /// - `timeout`: Per-child grace period before escalation.
809    /// - `max_in_flight`: Upper bound on concurrent terminations.
810    ///
811    /// # Returns
812    /// A [`TerminateSummary`] with counts of attempted/ok/failed
813    /// terminations.
814    pub async fn terminate_children(
815        &mut self,
816        cx: &impl context::Actor,
817        timeout: Duration,
818        max_in_flight: usize,
819        reason: &str,
820    ) -> TerminateSummary {
821        let summary = self
822            .manager
823            .terminate_all(cx, timeout, max_in_flight, reason)
824            .await;
825        // Unbind procs from the router so if new procs are made with the same
826        // names, they can use the same slot.
827        for name in self.procs.drain() {
828            let proc_ref = ResourceId::proc_addr_from_name(self.frontend_addr.clone(), &name);
829            self.dial_router.unbind(&Addr::from(proc_ref));
830        }
831        summary
832    }
833}
834
835#[async_trait::async_trait]
836impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
837    async fn terminate_proc(
838        &self,
839        cx: &impl context::Actor,
840        proc: &ProcAddr,
841        timeout: Duration,
842        reason: &str,
843    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
844        self.manager.terminate_proc(cx, proc, timeout, reason).await
845    }
846}
847
848/// Capability proving a proc is ready.
849///
850/// [`ReadyProc::ensure`] validates that `addr()` and `agent_ref()`
851/// are available; this type carries that proof, providing infallible
852/// accessors.
853///
854/// Obtain a `ReadyProc` by calling `ready_proc(&handle).await`.
855pub struct ReadyProc<'a, H: ProcHandle> {
856    handle: &'a H,
857    addr: ChannelAddr,
858    agent_ref: ActorRef<H::Agent>,
859}
860
861impl<'a, H: ProcHandle> ReadyProc<'a, H> {
862    /// Wait for a proc to become ready, then return a capability that
863    /// provides infallible access to `addr()` and `agent_ref()`.
864    ///
865    /// This is the type-safe way to obtain the proc's address and
866    /// agent reference. After this function returns `Ok(ready)`, both
867    /// `ready.addr()` and `ready.agent_ref()` are guaranteed to
868    /// succeed.
869    pub async fn ensure(
870        handle: &'a H,
871    ) -> Result<ReadyProc<'a, H>, ReadyProcError<H::TerminalStatus>> {
872        handle.ready().await?;
873        let addr = handle.addr().ok_or(ReadyProcError::MissingAddr)?;
874        let agent_ref = handle.agent_ref().ok_or(ReadyProcError::MissingAgentRef)?;
875        Ok(ReadyProc {
876            handle,
877            addr,
878            agent_ref,
879        })
880    }
881
882    /// The proc's logical address.
883    pub fn proc_addr(&self) -> &ProcAddr {
884        self.handle.proc_addr()
885    }
886
887    /// The proc's address (guaranteed available after ready).
888    pub fn addr(&self) -> &ChannelAddr {
889        &self.addr
890    }
891
892    /// The agent actor reference (guaranteed available after ready).
893    pub fn agent_ref(&self) -> &ActorRef<H::Agent> {
894        &self.agent_ref
895    }
896}
897
/// Minimal uniform surface for a spawned-**proc** handle returned by
/// a `ProcManager`. Each manager can return its own concrete handle,
/// as long as it exposes these. A **proc** is the Hyperactor runtime
/// plus its actors (lifecycle controlled via `Proc` APIs such as
/// `destroy_and_wait`). A proc **may** be hosted *inside* an OS
/// **process**, but it is conceptually distinct:
///
/// - `LocalProcManager`: runs the proc **in this OS process**; there
///   is no child process to signal. Lifecycle is entirely proc-level.
/// - `ProcessProcManager` (test-only here): launches an **external OS
///   process** which hosts the proc, but this toy manager does
///   **not** wire a control plane for shutdown, nor an exit monitor.
///
/// This trait is therefore written in terms of the **proc**
/// lifecycle:
///
/// - `ready()` resolves when the proc is Ready (mailbox bound; agent
///   available).
/// - `wait()` resolves with the proc's terminal status
///   (Stopped/Killed/Failed).
/// - `terminate()` requests a graceful shutdown of the *proc* and
///   waits up to the deadline; managers that also own a child OS
///   process may escalate to `SIGKILL` if the proc does not exit in
///   time.
/// - `kill()` requests an immediate, forced termination. For
///   in-process procs, this may be implemented as an immediate
///   drain/abort of actor tasks. For external procs, this is
///   typically a `SIGKILL`.
///
/// The shape of the terminal value is `Self::TerminalStatus`.
/// Managers that track rich info (exit code, signal, address, agent)
/// can expose it; trivial managers may use `()`.
///
/// Managers that do not support signaling must return
/// `TerminateError::Unsupported` from `terminate()` and `kill()`.
#[async_trait]
pub trait ProcHandle: Clone + Send + Sync + 'static {
    /// The agent actor type installed in the proc by the manager.
    /// Must implement both:
    /// - [`Actor`], because the agent actually runs inside the proc,
    ///   and
    /// - [`Referable`], so callers can hold `ActorRef<Self::Agent>`.
    type Agent: Actor + Referable;

    /// The type of terminal status produced when the proc exits.
    ///
    /// For example, an external proc manager may use a rich status
    /// enum (e.g. `ProcStatus`), while an in-process manager may use
    /// a trivial unit type. This is the value returned by
    /// [`ProcHandle::wait`] and carried by [`ReadyError::Terminal`].
    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;

    /// The proc's logical address on this host.
    fn proc_addr(&self) -> &ProcAddr;

    /// The proc's address (the one callers bind into the host
    /// router). May return `None` before `ready()` completes.
    /// Guaranteed to return `Some` after `ready()` succeeds.
    ///
    /// **Prefer [`ready_proc()`]** (or [`ReadyProc::ensure`]) for
    /// type-safe access that guarantees availability at compile time.
    fn addr(&self) -> Option<ChannelAddr>;

    /// The agent actor reference hosted in the proc. May return
    /// `None` before `ready()` completes. Guaranteed to return `Some`
    /// after `ready()` succeeds.
    ///
    /// **Prefer [`ready_proc()`]** (or [`ReadyProc::ensure`]) for
    /// type-safe access that guarantees availability at compile time.
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>>;

    /// Resolves when the proc becomes Ready. Multi-waiter,
    /// non-consuming.
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;

    /// Resolves with the terminal status (Stopped/Killed/Failed/etc).
    /// Multi-waiter, non-consuming.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;

    /// Politely stop the proc before the deadline; managers that own
    /// a child OS process may escalate to a forced kill at the
    /// deadline. Idempotent and race-safe: concurrent callers
    /// coalesce; the first terminal outcome wins and all callers
    /// observe it via `wait()`.
    ///
    /// Returns the single terminal status the proc reached (the same
    /// value `wait()` will return). Never fabricates terminal states:
    /// this is only returned after the exit monitor observes
    /// termination.
    ///
    /// # Parameters
    /// - `cx`: The actor context for sending messages.
    /// - `timeout`: Grace period before escalation.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// # Errors
    /// A [`TerminateError`]; e.g. handles without a shutdown path
    /// return `TerminateError::Unsupported`, and the in-process
    /// handle returns `TerminateError::AlreadyTerminated` when its
    /// proc is no longer registered.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;

    /// Force the proc down immediately. For in-process managers this
    /// may abort actor tasks; for external managers this typically
    /// sends `SIGKILL`. Also idempotent/race-safe; the terminal
    /// outcome is the one observed by `wait()`.
    ///
    /// # Errors
    /// Same error surface as [`ProcHandle::terminate`].
    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
}
1004
1005/// A trait describing a manager of procs, responsible for bootstrapping
1006/// procs on a host, and managing their lifetimes. The manager spawns an
1007/// `Agent`-typed actor on each proc, responsible for managing the proc.
1008#[async_trait]
1009pub trait ProcManager {
1010    /// Concrete handle type this manager returns.
1011    type Handle: ProcHandle;
1012
1013    /// Additional configuration for the proc, supported by this manager.
1014    type Config = ();
1015
1016    /// The preferred transport for this ProcManager.
1017    /// In practice this will be [`ChannelTransport::Local`]
1018    /// for testing, and [`ChannelTransport::Unix`] for external
1019    /// processes.
1020    fn transport(&self) -> ChannelTransport;
1021
1022    /// Spawn a new proc with the provided proc id. The proc
1023    /// should use the provided forwarder address for messages
1024    /// destined outside of the proc. The returned address accepts
1025    /// messages destined for the proc.
1026    ///
1027    /// An agent actor is also spawned, and the corresponding actor
1028    /// ref is returned.
1029    async fn spawn(
1030        &self,
1031        proc_id: ProcAddr,
1032        forwarder_addr: ChannelAddr,
1033        config: Self::Config,
1034    ) -> Result<Self::Handle, HostError>;
1035}
1036
/// Type alias for the agent actor managed by a given [`ProcManager`].
///
/// This resolves to the `Agent` type exposed by the manager's
/// associated `Handle` (via [`ProcHandle::Agent`]). It provides a
/// convenient shorthand so call sites can refer to
/// `ActorRef<ManagerAgent<M>>` instead of the more verbose
/// `<M::Handle as ProcHandle>::Agent`.
///
/// `M` must implement [`ProcManager`] for the projection below to
/// resolve.
///
/// # Example
/// ```ignore
/// fn takes_agent_ref<M: ProcManager>(r: ActorRef<ManagerAgent<M>>) { … }
/// ```
pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent; // rust issue #112792
1050
/// Lifecycle status for procs managed by [`LocalProcManager`].
///
/// Used by [`LocalProcManager::request_stop`] to track background
/// teardown progress. Only procs stopped through `request_stop` have a
/// status; procs torn down by other paths (e.g. `terminate_all` or
/// `terminate_proc`) are not recorded here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalProcStatus {
    /// A stop has been requested but teardown is still in progress.
    Stopping,
    /// Teardown completed (successfully or not; failures are only
    /// logged by the background task).
    Stopped,
}
1062
/// A ProcManager that spawns **in-process** procs (test-only).
///
/// The proc runs inside this same OS process; there is **no** child
/// process to signal. Lifecycle is purely proc-level, as implemented
/// by the [`LocalHandle`]s this manager returns:
/// - `terminate(timeout)`: delegates to
///   `Proc::destroy_and_wait(timeout)`, which drains and, at the
///   deadline, aborts remaining actors.
/// - `kill()`: uses a zero deadline to emulate a forced stop via
///   `destroy_and_wait(Duration::ZERO)`.
/// - `wait()`: trivial (no external lifecycle to observe).
///
/// No OS signals are sent or required.
pub struct LocalProcManager<S> {
    /// Live procs keyed by address. Shared (via `Arc`) with every
    /// [`LocalHandle`] this manager spawns.
    procs: Arc<Mutex<HashMap<ProcAddr, Proc>>>,
    /// Status channels for procs stopped through `request_stop`.
    /// NOTE(review): entries appear never to be removed, so this map
    /// grows by one per stopped proc — confirm this is intended.
    stopping: Arc<Mutex<HashMap<ProcAddr, tokio::sync::watch::Sender<LocalProcStatus>>>>,
    /// Factory invoked with each new `Proc` to spawn its agent actor.
    spawn: S,
}
1080
1081impl<S> LocalProcManager<S> {
1082    /// Create a new in-process proc manager with the given agent
1083    /// params.
1084    pub fn new(spawn: S) -> Self {
1085        Self {
1086            procs: Arc::new(Mutex::new(HashMap::new())),
1087            stopping: Arc::new(Mutex::new(HashMap::new())),
1088            spawn,
1089        }
1090    }
1091
1092    /// Non-blocking stop: remove the proc and spawn a background task
1093    /// that tears it down.
1094    ///
1095    /// Status transitions through `Stopping` -> `Stopped` and is
1096    /// observable via [`local_proc_status`] and [`watch`]. Idempotent:
1097    /// no-ops if the proc is already stopping or stopped.
1098    pub async fn request_stop(&self, proc: &ProcAddr, timeout: Duration, reason: &str) {
1099        {
1100            let guard = self.stopping.lock().await;
1101            if guard.contains_key(proc) {
1102                return;
1103            }
1104        }
1105
1106        let mut proc_handle = {
1107            let mut guard = self.procs.lock().await;
1108            match guard.remove(proc) {
1109                Some(p) => p,
1110                None => return,
1111            }
1112        };
1113
1114        let proc_ref: ProcAddr = proc_handle.proc_addr().clone();
1115        let (tx, _) = tokio::sync::watch::channel(LocalProcStatus::Stopping);
1116        self.stopping.lock().await.insert(proc_ref.clone(), tx);
1117
1118        let stopping = Arc::clone(&self.stopping);
1119        let reason = reason.to_string();
1120        tokio::spawn(async move {
1121            if let Err(e) = proc_handle.destroy_and_wait(timeout, &reason).await {
1122                tracing::warn!(error = %e, "request_stop(local): destroy_and_wait failed");
1123            }
1124            if let Some(tx) = stopping.lock().await.get(&proc_ref) {
1125                let _ = tx.send(LocalProcStatus::Stopped);
1126            }
1127        });
1128    }
1129
1130    /// Query the lifecycle status of a proc that was stopped via
1131    /// [`request_stop`].
1132    ///
1133    /// Returns `None` if the proc was never stopped through this path.
1134    pub async fn local_proc_status(&self, proc: &ProcAddr) -> Option<LocalProcStatus> {
1135        self.stopping.lock().await.get(proc).map(|tx| *tx.borrow())
1136    }
1137
1138    /// Subscribe to lifecycle status changes for a proc that was
1139    /// stopped via [`request_stop`].
1140    ///
1141    /// Returns `None` if the proc was never stopped through this path.
1142    pub async fn watch(
1143        &self,
1144        proc: &ProcAddr,
1145    ) -> Option<tokio::sync::watch::Receiver<LocalProcStatus>> {
1146        self.stopping
1147            .lock()
1148            .await
1149            .get(proc)
1150            .map(|tx| tx.subscribe())
1151    }
1152}
1153
1154#[async_trait]
1155impl<S> BulkTerminate for LocalProcManager<S>
1156where
1157    S: Send + Sync,
1158{
1159    async fn terminate_all(
1160        &self,
1161        _cx: &impl context::Actor,
1162        timeout: std::time::Duration,
1163        max_in_flight: usize,
1164        reason: &str,
1165    ) -> TerminateSummary {
1166        // Drain procs so we don't hold the lock across awaits and subsequent
1167        // calls to terminate_all don't try to re-terminate.
1168        let procs: Vec<Proc> = {
1169            let mut guard = self.procs.lock().await;
1170            guard.drain().map(|(_, v)| v).collect()
1171        };
1172
1173        let attempted = procs.len();
1174
1175        let results = stream::iter(procs.into_iter().map(|mut p| async move {
1176            // For local manager, graceful proc-level stop.
1177            match p.destroy_and_wait(timeout, reason).await {
1178                Ok(_) => true,
1179                Err(e) => {
1180                    tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
1181                    false
1182                }
1183            }
1184        }))
1185        .buffer_unordered(max_in_flight.max(1))
1186        .collect::<Vec<bool>>()
1187        .await;
1188
1189        let ok = results.into_iter().filter(|b| *b).count();
1190
1191        TerminateSummary {
1192            attempted,
1193            ok,
1194            failed: attempted.saturating_sub(ok),
1195        }
1196    }
1197}
1198
1199#[async_trait::async_trait]
1200impl<S> SingleTerminate for LocalProcManager<S>
1201where
1202    S: Send + Sync,
1203{
1204    async fn terminate_proc(
1205        &self,
1206        _cx: &impl context::Actor,
1207        proc: &ProcAddr,
1208        timeout: std::time::Duration,
1209        reason: &str,
1210    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
1211        // Snapshot procs so we don't hold the lock across awaits.
1212        let procs: Option<Proc> = {
1213            let mut guard = self.procs.lock().await;
1214            guard.remove(proc)
1215        };
1216        if let Some(mut p) = procs {
1217            p.destroy_and_wait(timeout, reason).await
1218        } else {
1219            Err(anyhow::anyhow!("proc {} doesn't exist", proc))
1220        }
1221    }
1222}
1223
1224/// A lightweight [`ProcHandle`] for procs managed **in-process** via
1225/// [`LocalProcManager`].
1226///
1227/// This handle wraps the minimal identifying state of a spawned proc:
1228/// - its [`ProcId`] (logical identity on the host),
1229/// - the proc's [`ChannelAddr`] (the address callers bind into the
1230///   host router), and
1231/// - the [`ActorAddr`] to the agent actor hosted in the proc.
1232///
1233/// Unlike external handles, `LocalHandle` does **not** manage an OS
1234/// child process. It provides a uniform surface (`proc_id()`,
1235/// `addr()`, `agent_ref()`) and implements `terminate()`/`kill()` by
1236/// calling into the underlying `Proc::destroy_and_wait`, i.e.,
1237/// **proc-level** shutdown.
1238///
1239/// **Type parameter:** `A` is constrained by the `ProcHandle::Agent`
1240/// bound (`Actor + Referable`).
1241pub struct LocalHandle<A: Actor + Referable> {
1242    proc_id: ProcAddr,
1243    addr: ChannelAddr,
1244    agent_ref: ActorRef<A>,
1245    procs: Arc<Mutex<HashMap<ProcAddr, Proc>>>,
1246}
1247
1248// Manual `Clone` to avoid requiring `A: Clone`.
1249impl<A: Actor + Referable> Clone for LocalHandle<A> {
1250    fn clone(&self) -> Self {
1251        Self {
1252            proc_id: self.proc_id.clone(),
1253            addr: self.addr.clone(),
1254            agent_ref: self.agent_ref.clone(),
1255            procs: Arc::clone(&self.procs),
1256        }
1257    }
1258}
1259
1260#[async_trait]
1261impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
1262    /// `Agent = A` (inherits `Actor + Referable` from the trait
1263    /// bound).
1264    type Agent = A;
1265    type TerminalStatus = ();
1266
1267    fn proc_addr(&self) -> &ProcAddr {
1268        &self.proc_id
1269    }
1270
1271    fn addr(&self) -> Option<ChannelAddr> {
1272        Some(self.addr.clone())
1273    }
1274
1275    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1276        Some(self.agent_ref.clone())
1277    }
1278
1279    /// Always resolves immediately: a local proc is created
1280    /// in-process and is usable as soon as the handle exists.
1281    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1282        Ok(())
1283    }
1284    /// Always resolves immediately with `()`: a local proc has no
1285    /// external lifecycle to await. There is no OS child process
1286    /// behind this handle.
1287    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1288        Ok(())
1289    }
1290
1291    async fn terminate(
1292        &self,
1293        _cx: &impl context::Actor,
1294        timeout: Duration,
1295        reason: &str,
1296    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1297        let mut proc = {
1298            let guard = self.procs.lock().await;
1299            match guard.get(self.proc_addr()) {
1300                Some(p) => p.clone(),
1301                None => {
1302                    // The proc was already removed; treat as already
1303                    // terminal.
1304                    return Err(TerminateError::AlreadyTerminated(()));
1305                }
1306            }
1307        };
1308
1309        // Graceful stop of the *proc* (actors) with a deadline. This
1310        // will drain and then abort remaining actors at expiry.
1311        let _ = proc
1312            .destroy_and_wait(timeout, reason)
1313            .await
1314            .map_err(TerminateError::Io)?;
1315
1316        Ok(())
1317    }
1318
1319    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1320        // Forced stop == zero deadline; `destroy_and_wait` will
1321        // immediately abort remaining actors and return.
1322        let mut proc = {
1323            let guard = self.procs.lock().await;
1324            match guard.get(self.proc_addr()) {
1325                Some(p) => p.clone(),
1326                None => return Err(TerminateError::AlreadyTerminated(())),
1327            }
1328        };
1329
1330        let _ = proc
1331            .destroy_and_wait(Duration::from_millis(0), "kill")
1332            .await
1333            .map_err(TerminateError::Io)?;
1334
1335        Ok(())
1336    }
1337}
1338
1339/// Local, in-process ProcManager.
1340///
1341/// **Type bounds:**
1342/// - `A: Actor + Referable + Binds<A>`
1343///   - `Actor`: the agent actually runs inside the proc.
1344///   - `Referable`: callers hold `ActorRef<A>` to the agent; this
1345///     bound is required for typed remote refs.
1346///   - `Binds<A>`: lets the runtime wire the agent's handler ports.
1347/// - `F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send`:
1348///   the spawn closure returns a Send future (we `tokio::spawn` it).
1349/// - `S: Fn(Proc) -> F + Sync`: the factory can be called from
1350///   concurrent contexts.
1351///
1352/// Result handle is `LocalHandle<A>` (whose `Agent = A` via `ProcHandle`).
1353#[async_trait]
1354impl<A, S, F> ProcManager for LocalProcManager<S>
1355where
1356    A: Actor + Referable + Binds<A>,
1357    F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
1358    S: Fn(Proc) -> F + Sync,
1359{
1360    type Handle = LocalHandle<A>;
1361
1362    fn transport(&self) -> ChannelTransport {
1363        ChannelTransport::Local
1364    }
1365
1366    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1367    async fn spawn(
1368        &self,
1369        proc_id: ProcAddr,
1370        forwarder_addr: ChannelAddr,
1371        _config: (),
1372    ) -> Result<Self::Handle, HostError> {
1373        let transport = forwarder_addr.transport();
1374        let proc = Proc::configured(
1375            proc_id.clone(),
1376            MailboxClient::dial(forwarder_addr)?.into_boxed(),
1377        );
1378        let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
1379        self.procs
1380            .lock()
1381            .await
1382            .insert(proc_id.clone(), proc.clone());
1383        let _handle = proc.clone().serve(rx);
1384        let agent_handle = (self.spawn)(proc)
1385            .await
1386            .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1387
1388        Ok(LocalHandle {
1389            proc_id,
1390            addr: proc_addr,
1391            agent_ref: agent_handle.bind(),
1392            procs: Arc::clone(&self.procs),
1393        })
1394    }
1395}
1396
1397/// A ProcManager that manages each proc as a **separate OS process**
1398/// (test-only toy).
1399///
1400/// This implementation launches a child via `Command` and relies on
1401/// `kill_on_drop(true)` so that children are SIGKILLed if the manager
1402/// (or host) drops. There is **no** proc control plane (no RPC to a
1403/// proc agent for shutdown) and **no** exit monitor wired here.
1404/// Consequently:
1405/// - `terminate()` and `kill()` return `Unsupported`.
1406/// - `wait()` is trivial (no lifecycle observation).
1407///
1408/// It follows a simple protocol:
1409///
1410/// Each process is launched with the following environment variables:
1411/// - `HYPERACTOR_HOST_BACKEND_ADDR`: the backend address to which all messages are forwarded,
1412/// - `HYPERACTOR_HOST_PROC_ID`: the proc id to assign the launched proc, and
1413/// - `HYPERACTOR_HOST_CALLBACK_ADDR`: the channel address with which to return the proc's address
1414///
1415/// The launched proc should also spawn an actor to manage it - the details of this are
1416/// implementation dependent, and outside the scope of the process manager.
1417///
1418/// The function [`boot_proc`] provides a convenient implementation of the
1419/// protocol.
1420pub struct ProcessProcManager<A> {
1421    program: std::path::PathBuf,
1422    children: Arc<Mutex<HashMap<ProcAddr, Child>>>,
1423    _phantom: PhantomData<A>,
1424}
1425
1426impl<A> ProcessProcManager<A> {
1427    /// Create a new ProcessProcManager that runs the provided
1428    /// command.
1429    pub fn new(program: std::path::PathBuf) -> Self {
1430        Self {
1431            program,
1432            children: Arc::new(Mutex::new(HashMap::new())),
1433            _phantom: PhantomData,
1434        }
1435    }
1436}
1437
1438impl<A> Drop for ProcessProcManager<A> {
1439    fn drop(&mut self) {
1440        // When the manager is dropped, `children` is dropped, which
1441        // drops each `Child` handle. With `kill_on_drop(true)`, the OS
1442        // will SIGKILL the processes. Nothing else to do here.
1443    }
1444}
1445
1446/// A [`ProcHandle`] implementation for procs managed as separate
1447/// OS processes via [`ProcessProcManager`].
1448///
1449/// This handle records the logical identity and connectivity of an
1450/// external child process:
1451/// - its [`ProcId`] (unique identity on the host),
1452/// - the proc's [`ChannelAddr`] (address registered in the host
1453///   router),
1454/// - and the [`ActorRef`] of the agent actor spawned inside the proc.
1455///
1456/// Unlike [`LocalHandle`], this corresponds to a real OS process
1457/// launched by the manager. In this **toy** implementation the handle
1458/// does not own/monitor the `Child` and there is no shutdown control
1459/// plane. It is a stable, clonable surface exposing the proc's
1460/// identity, address, and agent reference so host code can interact
1461/// uniformly with local/external procs. `terminate()`/`kill()` are
1462/// intentionally `Unsupported` here; process cleanup relies on
1463/// `cmd.kill_on_drop(true)` when launching the child (the OS will
1464/// SIGKILL it if the handle is dropped).
1465///
1466/// The type bound `A: Actor + Referable` comes from the
1467/// [`ProcHandle::Agent`] requirement: `Actor` because the agent
1468/// actually runs inside the proc, and `Referable` because it must
1469/// be referenceable via [`ActorRef<A>`] (i.e., safe to carry as a
1470/// typed remote reference).
1471#[derive(Debug)]
1472pub struct ProcessHandle<A: Actor + Referable> {
1473    proc_id: ProcAddr,
1474    addr: ChannelAddr,
1475    agent_ref: ActorRef<A>,
1476}
1477
1478// Manual `Clone` to avoid requiring `A: Clone`.
1479impl<A: Actor + Referable> Clone for ProcessHandle<A> {
1480    fn clone(&self) -> Self {
1481        Self {
1482            proc_id: self.proc_id.clone(),
1483            addr: self.addr.clone(),
1484            agent_ref: self.agent_ref.clone(),
1485        }
1486    }
1487}
1488
1489#[async_trait]
1490impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
1491    /// Agent must be both an `Actor` (runs in the proc) and a
1492    /// `Referable` (so it can be referenced via `ActorRef<A>`).
1493    type Agent = A;
1494    type TerminalStatus = ();
1495
1496    fn proc_addr(&self) -> &ProcAddr {
1497        &self.proc_id
1498    }
1499
1500    fn addr(&self) -> Option<ChannelAddr> {
1501        Some(self.addr.clone())
1502    }
1503
1504    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1505        Some(self.agent_ref.clone())
1506    }
1507
1508    /// Resolves immediately. `ProcessProcManager::spawn` returns this
1509    /// handle only after the child has called back with (addr,
1510    /// agent), i.e. after readiness.
1511    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1512        Ok(())
1513    }
1514    /// Resolves immediately with `()`. This handle does not track
1515    /// child lifecycle; there is no watcher in this implementation.
1516    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1517        Ok(())
1518    }
1519
1520    async fn terminate(
1521        &self,
1522        _cx: &impl context::Actor,
1523        _deadline: Duration,
1524        _reason: &str,
1525    ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1526        Err(TerminateError::Unsupported)
1527    }
1528
1529    async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1530        Err(TerminateError::Unsupported)
1531    }
1532}
1533
1534#[async_trait]
1535impl<A> ProcManager for ProcessProcManager<A>
1536where
1537    // Agent actor runs in the proc (`Actor`) and must be
1538    // referenceable (`Referable`).
1539    A: Actor + Referable + Sync,
1540{
1541    type Handle = ProcessHandle<A>;
1542
1543    fn transport(&self) -> ChannelTransport {
1544        ChannelTransport::Unix
1545    }
1546
1547    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1548    async fn spawn(
1549        &self,
1550        proc_id: ProcAddr,
1551        forwarder_addr: ChannelAddr,
1552        _config: (),
1553    ) -> Result<Self::Handle, HostError> {
1554        let (callback_addr, mut callback_rx) =
1555            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1556
1557        let mut cmd = Command::new(&self.program);
1558        cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1559        cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1560        cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1561
1562        // Lifetime strategy: mark the child with
1563        // `kill_on_drop(true)` so the OS will send SIGKILL if the
1564        // handle is dropped and retain the `Child` in
1565        // `self.children`, tying its lifetime to the manager/host.
1566        //
1567        // This is the simplest viable policy to avoid orphaned
1568        // subprocesses in CI; more sophisticated lifecycle control
1569        // (graceful shutdown, restart) will be layered on later.
1570
1571        // Kill the child when its handle is dropped.
1572        cmd.kill_on_drop(true);
1573
1574        let child = cmd.spawn().map_err(|e| {
1575            HostError::ProcessSpawnFailure(proc_id.clone(), self.program.display().to_string(), e)
1576        })?;
1577
1578        // Retain the handle so it lives for the life of the
1579        // manager/host.
1580        {
1581            let mut children = self.children.lock().await;
1582            children.insert(proc_id.clone(), child);
1583        }
1584
1585        // Wait for the child's callback with (addr, agent_ref)
1586        let (proc_addr, agent_ref) = callback_rx.recv().await?;
1587
1588        // TODO(production): For a non-test implementation, plumb a
1589        // shutdown path:
1590        // - expose a proc-level graceful stop RPC on the agent and
1591        //   implement `terminate(timeout)` by invoking it and, on
1592        //   deadline, call `Child::kill()`; implement `kill()` as
1593        //   immediate `Child::kill()`.
1594        // - wire an exit monitor so `wait()` resolves with a real
1595        //   terminal status.
1596        Ok(ProcessHandle {
1597            proc_id,
1598            addr: proc_addr,
1599            agent_ref,
1600        })
1601    }
1602}
1603
1604impl<A> ProcessProcManager<A>
1605where
1606    // `Actor`: runs in the proc; `Referable`: referenceable via
1607    // ActorRef; `Binds<A>`: wires ports.
1608    A: Actor + Referable + Binds<A>,
1609{
1610    /// Boot a process in a ProcessProcManager<A>. Should be called from processes spawned
1611    /// by the process manager. `boot_proc` will spawn the provided actor type (with parameters)
1612    /// onto the newly created Proc, and bind its handler. This allows the user to install an agent to
1613    /// manage the proc itself.
1614    pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1615    where
1616        S: FnOnce(Proc) -> F,
1617        F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1618    {
1619        let proc_id: ProcAddr = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1620        let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1621        let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1622        spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1623    }
1624
1625    fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1626    where
1627        T: FromStr<Err = E>,
1628        E: Into<anyhow::Error>,
1629    {
1630        std::env::var(key)
1631            .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1632            .parse()
1633            .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1634    }
1635}
1636
1637/// Spawn a proc at `proc_id` with an `A`-typed agent actor,
1638/// forwarding messages to the provided `backend_addr`,
1639/// and returning the proc's address and agent actor on
1640/// the provided `callback_addr`.
1641#[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string(), callback_addr=callback_addr.to_string()))]
1642pub async fn spawn_proc<A, S, F>(
1643    proc_id: ProcAddr,
1644    backend_addr: ChannelAddr,
1645    callback_addr: ChannelAddr,
1646    spawn: S,
1647) -> Result<Proc, HostError>
1648where
1649    // `Actor`: runs in the proc; `Referable`: allows ActorRef<A>;
1650    // `Binds<A>`: wires ports
1651    A: Actor + Referable + Binds<A>,
1652    S: FnOnce(Proc) -> F,
1653    F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1654{
1655    let backend_transport = backend_addr.transport();
1656    let proc = Proc::configured(
1657        proc_id.clone(),
1658        MailboxClient::dial(backend_addr)?.into_boxed(),
1659    );
1660
1661    let agent_handle = spawn(proc.clone())
1662        .await
1663        .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1664
1665    // Finally serve the proc on the same transport as the backend address,
1666    // and call back.
1667    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1668    proc.clone().serve(proc_rx);
1669    let agent_ref: ActorRef<A> = agent_handle.bind::<A>();
1670    channel::dial::<(ChannelAddr, ActorRef<A>)>(callback_addr)?
1671        .send((proc_addr, agent_ref))
1672        .await
1673        .map_err(ChannelError::from)?;
1674
1675    Ok(proc)
1676}
1677
1678/// Testing support for hosts. This is linked outside of cfg(test)
1679/// as it is needed by an external binary.
1680pub mod testing {
1681    use async_trait::async_trait;
1682    use hyperactor::Actor;
1683    use hyperactor::ActorAddr;
1684    use hyperactor::Context;
1685    use hyperactor::Endpoint as _;
1686    use hyperactor::Handler;
1687    use hyperactor::OncePortRef;
1688    /// Just a simple actor, available in both the bootstrap binary as well as
1689    /// hyperactor tests.
1690    #[derive(Debug, Default)]
1691    #[hyperactor::export(handlers = [OncePortRef<ActorAddr>])]
1692    pub struct EchoActor;
1693
1694    impl Actor for EchoActor {}
1695
1696    #[async_trait]
1697    impl Handler<OncePortRef<ActorAddr>> for EchoActor {
1698        async fn handle(
1699            &mut self,
1700            cx: &Context<Self>,
1701            reply: OncePortRef<ActorAddr>,
1702        ) -> Result<(), anyhow::Error> {
1703            reply.post(cx, cx.self_addr().clone());
1704            Ok(())
1705        }
1706    }
1707}
1708
1709#[cfg(test)]
1710mod tests {
1711    use std::sync::Arc;
1712    use std::time::Duration;
1713
1714    use async_trait::async_trait;
1715    use hyperactor::Actor;
1716    use hyperactor::Context;
1717    use hyperactor::Endpoint as _;
1718    use hyperactor::Handler;
1719    use hyperactor::Instance;
1720    use hyperactor::OncePortRef;
1721    use hyperactor::PortRef;
1722    use hyperactor::channel::ChannelTransport;
1723    use hyperactor::channel::Tx;
1724    use hyperactor::channel::TxStatus;
1725    use hyperactor::context::Mailbox;
1726    use hyperactor::mailbox::Undeliverable;
1727    use hyperactor::port::Port;
1728    use tokio::sync::mpsc;
1729
1730    use super::testing::EchoActor;
1731    use super::*;
1732
1733    /// A PortRef<String> targeting a nonexistent actor. When the
1734    /// collector receives this, it sends a message to the dest; the
1735    /// resulting Undeliverable is captured.
1736    type SendTo = PortRef<String>;
1737
1738    /// Test actor that sends a message to a provided destination and
1739    /// collects the resulting Undeliverable.
1740    #[derive(Debug)]
1741    #[hyperactor::export(handlers = [SendTo])]
1742    struct UndeliverableCollector {
1743        tx: mpsc::UnboundedSender<Undeliverable<MessageEnvelope>>,
1744    }
1745
1746    #[async_trait]
1747    impl Actor for UndeliverableCollector {
1748        async fn handle_undeliverable_message(
1749            &mut self,
1750            _cx: &Instance<Self>,
1751            message: Undeliverable<MessageEnvelope>,
1752        ) -> Result<(), anyhow::Error> {
1753            let _ = self.tx.send(message);
1754            Ok(())
1755        }
1756    }
1757
1758    #[async_trait]
1759    impl Handler<SendTo> for UndeliverableCollector {
1760        async fn handle(&mut self, cx: &Context<Self>, dest: SendTo) -> Result<(), anyhow::Error> {
1761            dest.post(cx, "into-the-void".to_string());
1762            Ok(())
1763        }
1764    }
1765
1766    #[tokio::test]
1767    async fn test_basic() {
1768        let proc_manager =
1769            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
1770        let procs = Arc::clone(&proc_manager.procs);
1771        let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Unix))
1772            .await
1773            .unwrap();
1774
1775        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
1776        assert_eq!(
1777            proc_id1,
1778            ResourceId::proc_addr_from_name(host.addr().clone(), "proc1")
1779        );
1780        assert!(procs.lock().await.contains_key(&proc_id1));
1781
1782        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
1783        assert!(procs.lock().await.contains_key(&proc_id2));
1784
1785        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
1786        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();
1787
1788        // Make sure they can talk to each other:
1789        let (instance1, _handle) = proc1.client("client").unwrap();
1790        let (instance2, _handle) = proc2.client("client").unwrap();
1791
1792        let (port, mut rx) = instance1.mailbox().open_port();
1793
1794        port.bind().post(&instance2, "hello".to_string());
1795        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());
1796
1797        // Make sure that the system proc is also wired in correctly.
1798        let (system_actor, _handle) = host.system_proc().client("test").unwrap();
1799
1800        // system->proc
1801        port.bind()
1802            .post(&system_actor, "hello from the system proc".to_string());
1803        assert_eq!(
1804            rx.recv().await.unwrap(),
1805            "hello from the system proc".to_string()
1806        );
1807
1808        // system->system
1809        let (port, mut rx) = system_actor.mailbox().open_port();
1810        port.bind()
1811            .post(&system_actor, "hello from the system".to_string());
1812        assert_eq!(
1813            rx.recv().await.unwrap(),
1814            "hello from the system".to_string()
1815        );
1816
1817        // proc->system
1818        port.bind()
1819            .post(&instance1, "hello from the instance1".to_string());
1820        assert_eq!(
1821            rx.recv().await.unwrap(),
1822            "hello from the instance1".to_string()
1823        );
1824    }
1825
1826    #[tokio::test]
1827    // TODO: OSS: called `Result::unwrap()` on an `Err` value: ReadFailed { manifest_path: "/meta-pytorch/monarch/target/debug/deps/hyperactor-0e1fe83af739d976.resources.json", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } }
1828    #[cfg_attr(not(fbcode_build), ignore)]
1829    async fn test_process_proc_manager() {
1830        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});
1831
1832        // EchoActor is "host_agent" used to test connectivity.
1833        let process_manager = ProcessProcManager::<EchoActor>::new(
1834            buck_resources::get("monarch/hyperactor_mesh/host_bootstrap").unwrap(),
1835        );
1836        let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
1837            .await
1838            .unwrap();
1839
1840        // Manually serve this: the agent isn't actually doing anything in this case,
1841        // but we are testing connectivity.
1842        host.serve().unwrap();
1843
1844        // (1) Spawn and check invariants.
1845        assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
1846        let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
1847        let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
1848        assert_eq!(echo1.actor_addr().proc_addr(), proc1);
1849        assert_eq!(echo2.actor_addr().proc_addr(), proc2);
1850
1851        // (2) Duplicate name rejection.
1852        let dup = host.spawn("proc1".to_string(), ()).await;
1853        assert!(matches!(dup, Err(HostError::ProcExists(_))));
1854
1855        // (3) Create a standalone client proc and verify echo1 agent responds.
1856        // Request: client proc -> host frontend/router -> echo1 (proc1).
1857        // Reply:   echo1 (proc1) -> host backend -> host router -> client port.
1858        // This confirms that an external proc (created via
1859        // `Proc::direct`) can address a child proc through the host,
1860        // and receive a correct reply.
1861        let client = Proc::direct(
1862            ChannelAddr::any(host.addr().transport()),
1863            "test".to_string(),
1864        )
1865        .unwrap();
1866        let (client_inst, _h) = client.client("test").unwrap();
1867        let (port, rx) = client_inst.mailbox().open_once_port();
1868        echo1.post(&client_inst, port.bind());
1869        let id = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1870            .await
1871            .unwrap()
1872            .unwrap();
1873        assert_eq!(id, *echo1.actor_addr());
1874
1875        // (4) Child <-> external client request -> reply:
1876        // Request: client proc (standalone via `Proc::direct`) ->
1877        //          host frontend/router -> echo2 (proc2).
1878        // Reply:   echo2 (proc2) -> host backend -> host router ->
1879        //          client port (standalone proc).
1880        // This exercises cross-proc routing between a child and an
1881        // external client under the same host.
1882        let (port2, rx2) = client_inst.mailbox().open_once_port();
1883        echo2.post(&client_inst, port2.bind());
1884        let id2 = tokio::time::timeout(Duration::from_secs(5), rx2.recv())
1885            .await
1886            .unwrap()
1887            .unwrap();
1888        assert_eq!(id2, *echo2.actor_addr());
1889
1890        // (5) System -> child request -> cross-proc reply:
1891        // Request: system proc -> host router (frontend) -> echo1
1892        //          (proc1, child).
1893        // Reply: echo1 (proc1) -> proc1 forwarder -> host backend ->
1894        //        host router -> client proc direct addr (Proc::direct) ->
1895        //        client port.
1896        // Because `client_inst` runs in its own proc, the reply
1897        // traverses the host (not local delivery within proc1).
1898        let (sys_inst, _h) = host.system_proc().client("sys-client").unwrap();
1899        let (port3, rx3) = client_inst.mailbox().open_once_port();
1900        // Send from system -> child via a message that ultimately
1901        // replies to client's port
1902        echo1.post(&sys_inst, port3.bind());
1903        let id3 = tokio::time::timeout(Duration::from_secs(5), rx3.recv())
1904            .await
1905            .unwrap()
1906            .unwrap();
1907        assert_eq!(id3, *echo1.actor_addr());
1908    }
1909
1910    #[tokio::test]
1911    async fn local_ready_and_wait_are_immediate() {
1912        // Build a LocalHandle directly.
1913        let addr = ChannelAddr::any(ChannelTransport::Local);
1914        let proc_ref = ResourceId::proc_addr_from_name(addr.clone(), "p");
1915        let actor_ref = proc_ref.actor_addr("host_agent");
1916        let agent_ref = ActorRef::<()>::attest(actor_ref);
1917        let h = LocalHandle::<()> {
1918            proc_id: proc_ref,
1919            addr,
1920            agent_ref,
1921            procs: Arc::new(Mutex::new(HashMap::new())),
1922        };
1923
1924        // ready() resolves immediately
1925        assert!(h.ready().await.is_ok());
1926
1927        // wait() resolves immediately with unit TerminalStatus
1928        assert!(h.wait().await.is_ok());
1929
1930        // Multiple concurrent waiters both succeed
1931        let (r1, r2) = tokio::join!(h.ready(), h.ready());
1932        assert!(r1.is_ok() && r2.is_ok());
1933    }
1934
1935    // --
1936    // Fixtures for `host::spawn` tests.
1937
1938    #[derive(Debug, Clone, Copy)]
1939    enum ReadyMode {
1940        OkAfter(Duration),
1941        ErrTerminal,
1942        ErrChannelClosed,
1943    }
1944
1945    #[derive(Debug, Clone)]
1946    struct TestHandle {
1947        id: ProcAddr,
1948        addr: ChannelAddr,
1949        agent: ActorRef<()>,
1950        mode: ReadyMode,
1951        omit_addr: bool,
1952        omit_agent: bool,
1953    }
1954
1955    #[async_trait::async_trait]
1956    impl ProcHandle for TestHandle {
1957        type Agent = ();
1958        type TerminalStatus = ();
1959
1960        fn proc_addr(&self) -> &ProcAddr {
1961            &self.id
1962        }
1963
1964        fn addr(&self) -> Option<ChannelAddr> {
1965            if self.omit_addr {
1966                None
1967            } else {
1968                Some(self.addr.clone())
1969            }
1970        }
1971
1972        fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1973            if self.omit_agent {
1974                None
1975            } else {
1976                Some(self.agent.clone())
1977            }
1978        }
1979
1980        async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1981            match self.mode {
1982                ReadyMode::OkAfter(d) => {
1983                    if !d.is_zero() {
1984                        tokio::time::sleep(d).await;
1985                    }
1986                    Ok(())
1987                }
1988                ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
1989                ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
1990            }
1991        }
1992        async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1993            Ok(())
1994        }
1995        async fn terminate(
1996            &self,
1997            _cx: &impl context::Actor,
1998            _timeout: Duration,
1999            _reason: &str,
2000        ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
2001            Err(TerminateError::Unsupported)
2002        }
2003        async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
2004            Err(TerminateError::Unsupported)
2005        }
2006    }
2007
2008    #[derive(Debug, Clone)]
2009    struct TestManager {
2010        mode: ReadyMode,
2011        omit_addr: bool,
2012        omit_agent: bool,
2013        transport: ChannelTransport,
2014    }
2015
2016    impl TestManager {
2017        fn local(mode: ReadyMode) -> Self {
2018            Self {
2019                mode,
2020                omit_addr: false,
2021                omit_agent: false,
2022                transport: ChannelTransport::Local,
2023            }
2024        }
2025        fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
2026            self.omit_addr = addr;
2027            self.omit_agent = agent;
2028            self
2029        }
2030    }
2031
2032    #[async_trait::async_trait]
2033    impl ProcManager for TestManager {
2034        type Handle = TestHandle;
2035
2036        fn transport(&self) -> ChannelTransport {
2037            self.transport.clone()
2038        }
2039
2040        async fn spawn(
2041            &self,
2042            proc_id: ProcAddr,
2043            forwarder_addr: ChannelAddr,
2044            _config: (),
2045        ) -> Result<Self::Handle, HostError> {
2046            let agent = ActorRef::<()>::attest(proc_id.actor_addr("host_agent"));
2047            Ok(TestHandle {
2048                id: proc_id,
2049                addr: forwarder_addr,
2050                agent,
2051                mode: self.mode,
2052                omit_addr: self.omit_addr,
2053                omit_agent: self.omit_agent,
2054            })
2055        }
2056    }
2057
2058    #[tokio::test]
2059    async fn host_spawn_times_out_when_configured() {
2060        let cfg = hyperactor_config::global::lock();
2061        let _g = cfg.override_key(
2062            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2063            Duration::from_millis(10),
2064        );
2065
2066        let mut host = Host::new(
2067            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
2068            ChannelAddr::any(ChannelTransport::Local),
2069        )
2070        .await
2071        .unwrap();
2072
2073        let err = host.spawn("t".into(), ()).await.expect_err("must time out");
2074        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2075    }
2076
2077    #[tokio::test]
2078    async fn host_spawn_timeout_zero_disables_and_succeeds() {
2079        let cfg = hyperactor_config::global::lock();
2080        let _g = cfg.override_key(
2081            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2082            Duration::from_secs(0),
2083        );
2084
2085        let mut host = Host::new(
2086            TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
2087            ChannelAddr::any(ChannelTransport::Local),
2088        )
2089        .await
2090        .unwrap();
2091
2092        let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
2093        assert_eq!(agent.actor_addr().proc_addr(), pid);
2094        assert!(host.procs.contains("ok"));
2095    }
2096
2097    #[tokio::test]
2098    async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
2099        let mut host = Host::new(
2100            TestManager::local(ReadyMode::ErrChannelClosed),
2101            ChannelAddr::any(ChannelTransport::Local),
2102        )
2103        .await
2104        .unwrap();
2105
2106        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
2107        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2108    }
2109
2110    #[tokio::test]
2111    async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
2112        let mut host = Host::new(
2113            TestManager::local(ReadyMode::ErrTerminal),
2114            ChannelAddr::any(ChannelTransport::Local),
2115        )
2116        .await
2117        .unwrap();
2118
2119        let err = host.spawn("p".into(), ()).await.expect_err("must fail");
2120        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2121    }
2122
2123    #[tokio::test]
2124    async fn host_spawn_fails_if_ready_but_missing_addr() {
2125        let mut host = Host::new(
2126            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
2127            ChannelAddr::any(ChannelTransport::Local),
2128        )
2129        .await
2130        .unwrap();
2131
2132        let err = host
2133            .spawn("no-addr".into(), ())
2134            .await
2135            .expect_err("must fail");
2136        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2137    }
2138
2139    #[tokio::test]
2140    async fn host_spawn_fails_if_ready_but_missing_agent() {
2141        let mut host = Host::new(
2142            TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
2143            ChannelAddr::any(ChannelTransport::Local),
2144        )
2145        .await
2146        .unwrap();
2147
2148        let err = host
2149            .spawn("no-agent".into(), ())
2150            .await
2151            .expect_err("must fail");
2152        assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2153    }
2154
2155    #[tokio::test]
2156    async fn test_duplex_remote_proc() {
2157        // Create a host with a duplex server.
2158        let proc_manager =
2159            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2160        let mut host = Host::new_with_default(
2161            proc_manager,
2162            ChannelAddr::any(ChannelTransport::Unix),
2163            None,
2164            None,
2165        )
2166        .await
2167        .unwrap();
2168        host.serve().unwrap();
2169
2170        let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2171        assert_eq!(remote_proc.proc_addr().addr(), host.addr());
2172
2173        // (1) Host -> remote: open a port on the remote proc, send from
2174        //     the system instance.
2175        let (system_inst, _h) = host.system_proc().client("test-sender").unwrap();
2176        let (remote_inst, _rh) = remote_proc.client("remote-client").unwrap();
2177
2178        let (remote_port, mut remote_rx) = remote_inst.mailbox().open_port();
2179        let remote_port = remote_port.bind();
2180
2181        remote_port.post(&system_inst, "hello-to-remote".to_string());
2182
2183        let arrived: String = tokio::time::timeout(Duration::from_secs(5), remote_rx.recv())
2184            .await
2185            .expect("timed out waiting for message on remote rx")
2186            .expect("recv failed");
2187        assert_eq!(arrived, "hello-to-remote");
2188
2189        // (2) Remote -> host: open a port on the system instance, send
2190        //     from the remote instance.
2191        let (host_port, mut host_rx) = system_inst.mailbox().open_port();
2192        let host_port = host_port.bind();
2193
2194        host_port.post(&remote_inst, "hello-from-remote".to_string());
2195
2196        let arrived: String = tokio::time::timeout(Duration::from_secs(5), host_rx.recv())
2197            .await
2198            .expect("timed out waiting for inbound message")
2199            .expect("recv failed");
2200        assert_eq!(arrived, "hello-from-remote");
2201    }
2202
2203    #[tokio::test]
2204    async fn test_duplex_undeliverable_from_client() {
2205        // Attached client sends to a nonexistent actor on the host's
2206        // service proc. The message travels client → duplex → host →
2207        // service proc (actor not found) → undeliverable back through
2208        // duplex → client's collector actor.
2209        let proc_manager =
2210            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2211        let mut host = Host::new_with_default(
2212            proc_manager,
2213            ChannelAddr::any(ChannelTransport::Unix),
2214            None,
2215            None,
2216        )
2217        .await
2218        .unwrap();
2219        host.serve().unwrap();
2220
2221        let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2222
2223        // Spawn a collector on the remote proc.
2224        let (undlv_tx, mut undlv_rx) = mpsc::unbounded_channel();
2225        let handle = remote_proc
2226            .spawn("collector", UndeliverableCollector { tx: undlv_tx })
2227            .unwrap();
2228        let collector_ref = handle.bind::<UndeliverableCollector>();
2229
2230        // Tell the collector to send to a nonexistent actor on the
2231        // host's service proc.
2232        let bogus_actor = host.system_proc().proc_addr().actor_addr("no-such-actor");
2233        let bogus_port = bogus_actor.port_addr(Port::from(0u64));
2234        let bogus_dest = PortRef::<String>::attest(bogus_port);
2235
2236        let (trigger_inst, _h) = remote_proc.client("trigger").unwrap();
2237        collector_ref
2238            .port::<SendTo>()
2239            .post(&trigger_inst, bogus_dest);
2240
2241        let undeliverable = tokio::time::timeout(Duration::from_secs(5), undlv_rx.recv())
2242            .await
2243            .expect("timed out waiting for undeliverable")
2244            .expect("channel closed");
2245
2246        assert_eq!(
2247            undeliverable
2248                .into_message()
2249                .expect("expected returned envelope")
2250                .dest()
2251                .actor_id(),
2252            bogus_actor.id()
2253        );
2254    }
2255
2256    #[tokio::test]
2257    async fn test_duplex_undeliverable_from_host() {
2258        // Host sends to a nonexistent actor on the attached remote
2259        // proc. The message travels host → overlay (finds remote proc)
2260        // → duplex → remote proc (actor not found) → undeliverable
2261        // back through duplex → host's collector actor.
2262        let proc_manager =
2263            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2264        let mut host = Host::new_with_default(
2265            proc_manager,
2266            ChannelAddr::any(ChannelTransport::Unix),
2267            None,
2268            None,
2269        )
2270        .await
2271        .unwrap();
2272        host.serve().unwrap();
2273
2274        let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2275
2276        // Spawn a collector on the host's service proc.
2277        let (undlv_tx, mut undlv_rx) = mpsc::unbounded_channel();
2278        let handle = host
2279            .system_proc()
2280            .spawn("collector", UndeliverableCollector { tx: undlv_tx })
2281            .unwrap();
2282        let collector_ref = handle.bind::<UndeliverableCollector>();
2283
2284        // Tell the collector to send to a nonexistent actor on the
2285        // attached remote proc.
2286        let bogus_actor = remote_proc.proc_addr().actor_addr("ghost-actor");
2287        let bogus_port = bogus_actor.port_addr(Port::from(0u64));
2288        let bogus_dest = PortRef::<String>::attest(bogus_port);
2289
2290        let (trigger_inst, _h) = host.system_proc().client("trigger").unwrap();
2291        collector_ref
2292            .port::<SendTo>()
2293            .post(&trigger_inst, bogus_dest);
2294
2295        let undeliverable = tokio::time::timeout(Duration::from_secs(5), undlv_rx.recv())
2296            .await
2297            .expect("timed out waiting for undeliverable")
2298            .expect("channel closed");
2299
2300        assert_eq!(
2301            undeliverable
2302                .into_message()
2303                .expect("expected returned envelope")
2304                .dest()
2305                .actor_id(),
2306            bogus_actor.id()
2307        );
2308    }
2309
2310    #[tokio::test]
2311    async fn test_duplex_teardown() {
2312        // Start a host, attach a remote proc, and exchange a message
2313        // to confirm routing is live. Then stop the serve handle and
2314        // assert it completes within a bounded time, indicating the
2315        // accept loop and per-connection tasks drained cleanly.
2316        let proc_manager =
2317            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2318        let mut host = Host::new_with_default(
2319            proc_manager,
2320            ChannelAddr::any(ChannelTransport::Unix),
2321            None,
2322            None,
2323        )
2324        .await
2325        .unwrap();
2326        let serve_handle = host.serve().unwrap();
2327
2328        // Second call must fail with AlreadyServing.
2329        assert!(matches!(host.serve(), Err(HostError::AlreadyServing)));
2330
2331        let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2332
2333        let (system_inst, _h) = host.system_proc().client("teardown-sender").unwrap();
2334        let (remote_inst, _rh) = remote_proc.client("teardown-client").unwrap();
2335
2336        let (remote_port, mut remote_rx) = remote_inst.mailbox().open_port();
2337        let remote_port = remote_port.bind();
2338        remote_port.post(&system_inst, "pre-stop".to_string());
2339        let arrived: String = tokio::time::timeout(Duration::from_secs(5), remote_rx.recv())
2340            .await
2341            .expect("timed out waiting for message on remote rx")
2342            .expect("recv failed");
2343        assert_eq!(arrived, "pre-stop");
2344
2345        serve_handle.stop("teardown");
2346        tokio::time::timeout(Duration::from_secs(5), serve_handle)
2347            .await
2348            .expect("timed out waiting for serve handle to resolve")
2349            .expect("serve task panicked")
2350            .expect("serve task returned error");
2351    }
2352
2353    /// Repro for the OSS broken-link issue: when the host's duplex
2354    /// frontend shuts down with messages still on the wire, the
2355    /// simplex peer must see a clean close (and pending acks must
2356    /// flush) rather than retry-looping for `MESSAGE_DELIVERY_TIMEOUT`.
2357    ///
2358    /// Before the fix: the peer's `NetTx` got no acks for in-flight
2359    /// messages and no `Closed` response, so it spent the full 30 s
2360    /// `MESSAGE_DELIVERY_TIMEOUT` reconnecting against a dead host.
2361    ///
2362    /// This test posts a message, then stops the serve handle and
2363    /// asserts the simplex `NetTx` transitions to `Closed` quickly —
2364    /// well under `MESSAGE_DELIVERY_TIMEOUT`.
2365    #[tokio::test]
2366    async fn test_simplex_peer_sees_clean_close_on_host_shutdown() {
2367        let proc_manager = LocalProcManager::new(|proc: Proc| async move {
2368            proc.spawn::<EchoActor>("host_agent", EchoActor)
2369        });
2370        let mut host = Host::new_with_default(
2371            proc_manager,
2372            ChannelAddr::any(ChannelTransport::Unix),
2373            None,
2374            None,
2375        )
2376        .await
2377        .unwrap();
2378        let serve_handle = host.serve().unwrap();
2379
2380        // Spawn an EchoActor and send a request from a simplex client.
2381        let echo_handle = host
2382            .system_proc()
2383            .spawn::<EchoActor>("echo", EchoActor)
2384            .unwrap();
2385        let echo_ref = echo_handle.bind::<EchoActor>();
2386
2387        let dial_router = DialMailboxRouter::new();
2388        dial_router.bind(
2389            Addr::from(host.system_proc().proc_addr().clone()),
2390            host.addr().clone(),
2391        );
2392        let client_addr = ChannelAddr::any(ChannelTransport::Unix);
2393        let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
2394        let client_proc_id = ResourceId::proc_addr_from_name(client_listen_addr, "client");
2395        let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
2396        let _client_handle = client_proc.clone().serve(client_rx);
2397
2398        let (client_inst, _h) = client_proc.client("requester").unwrap();
2399        let (reply_port, reply_handle) = client_inst.mailbox().open_once_port::<ActorAddr>();
2400        let reply_port = reply_port.bind();
2401        echo_ref
2402            .port::<OncePortRef<ActorAddr>>()
2403            .post(&client_inst, reply_port);
2404        let _ = tokio::time::timeout(Duration::from_secs(5), reply_handle.recv())
2405            .await
2406            .expect("baseline round-trip timed out")
2407            .expect("baseline recv failed");
2408
2409        // Snapshot the client's outbound NetTx status before shutdown.
2410        let host_tx = channel::dial::<MessageEnvelope>(host.addr().clone()).unwrap();
2411        // Push one message so the lazy-connect kicks in.
2412        let dummy_dest = host
2413            .system_proc()
2414            .proc_addr()
2415            .actor_addr("noop")
2416            .port_addr(Port::from(0u64));
2417        let envelope = MessageEnvelope::serialize(
2418            client_inst.self_addr().clone(),
2419            dummy_dest,
2420            &"warmup".to_string(),
2421            Default::default(),
2422        )
2423        .unwrap();
2424        host_tx.post(envelope);
2425        // Wait briefly for connection to establish.
2426        tokio::time::sleep(Duration::from_millis(200)).await;
2427        assert!(matches!(*host_tx.status().borrow(), TxStatus::Active));
2428
2429        // Shut down the host's frontend. The fix ensures pending
2430        // recv-side acks are flushed AND a `Closed` response is sent,
2431        // so the simplex peer transitions to `Closed` promptly.
2432        serve_handle.stop("test shutdown");
2433        let _ = tokio::time::timeout(Duration::from_secs(5), serve_handle)
2434            .await
2435            .expect("serve handle did not resolve");
2436
2437        // The simplex peer should see Closed within a few seconds —
2438        // not the full MESSAGE_DELIVERY_TIMEOUT (30 s). Wait for the
2439        // status watch to flip.
2440        let mut status = host_tx.status().clone();
2441        tokio::time::timeout(Duration::from_secs(10), async {
2442            loop {
2443                if let TxStatus::Closed(_) = *status.borrow() {
2444                    return;
2445                }
2446                if status.changed().await.is_err() {
2447                    return;
2448                }
2449            }
2450        })
2451        .await
2452        .expect("simplex peer did not see Closed within 10s of host shutdown");
2453
2454        match &*host_tx.status().borrow() {
2455            TxStatus::Closed(_) => {}
2456            other => panic!("expected TxStatus::Closed, got {:?}", other),
2457        }
2458    }
2459
2460    /// Stress repro: many simplex clients send rapid request+reply
2461    /// traffic to the host's duplex frontend and the host shuts down
2462    /// while traffic is in flight. This mirrors the OSS test pattern
2463    /// where `HostMeshShutdownGuard::drop` sends `ShutdownHost`.
2464    #[tokio::test]
2465    async fn test_simplex_clients_during_host_shutdown() {
2466        let proc_manager = LocalProcManager::new(|proc: Proc| async move {
2467            proc.spawn::<EchoActor>("host_agent", EchoActor)
2468        });
2469        let mut host = Host::new_with_default(
2470            proc_manager,
2471            ChannelAddr::any(ChannelTransport::Unix),
2472            None,
2473            None,
2474        )
2475        .await
2476        .unwrap();
2477        let serve_handle = host.serve().unwrap();
2478
2479        let echo_handle = host
2480            .system_proc()
2481            .spawn::<EchoActor>("echo", EchoActor)
2482            .unwrap();
2483        let echo_ref = echo_handle.bind::<EchoActor>();
2484        let host_addr = host.addr().clone();
2485        let echo_actor_id = echo_ref.actor_addr().clone();
2486        let system_proc_id = host.system_proc().proc_addr().clone();
2487
2488        // Spawn N clients, each sending M requests.
2489        const N_CLIENTS: usize = 4;
2490        const M_REQUESTS: usize = 5;
2491
2492        let mut client_tasks = Vec::new();
2493        for ci in 0..N_CLIENTS {
2494            let host_addr = host_addr.clone();
2495            let echo_actor_id = echo_actor_id.clone();
2496            let system_proc_id = system_proc_id.clone();
2497            client_tasks.push(tokio::spawn(async move {
2498                let dial_router = DialMailboxRouter::new();
2499                dial_router.bind(Addr::from(system_proc_id.clone()), host_addr);
2500                let client_addr = ChannelAddr::any(ChannelTransport::Unix);
2501                let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
2502                let client_proc_id =
2503                    ResourceId::proc_addr_from_name(client_listen_addr, format!("client-{}", ci));
2504                let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
2505                let _client_handle = client_proc.clone().serve(client_rx);
2506
2507                let echo_ref = ActorRef::<EchoActor>::attest(echo_actor_id);
2508
2509                for ri in 0..M_REQUESTS {
2510                    let (client_inst, _h) = client_proc.client(&format!("req-{}", ri)).unwrap();
2511                    let (reply_port, reply_handle) =
2512                        client_inst.mailbox().open_once_port::<ActorAddr>();
2513                    let reply_port = reply_port.bind();
2514                    echo_ref
2515                        .port::<OncePortRef<ActorAddr>>()
2516                        .post(&client_inst, reply_port);
2517                    let received =
2518                        tokio::time::timeout(Duration::from_secs(10), reply_handle.recv())
2519                            .await
2520                            .expect("timeout waiting for reply")
2521                            .expect("recv failed");
2522                    assert_eq!(received, *echo_ref.actor_addr());
2523                }
2524            }));
2525        }
2526
2527        for task in client_tasks {
2528            task.await.unwrap();
2529        }
2530
2531        // Shut down. The handle must resolve cleanly.
2532        serve_handle.stop("test cleanup");
2533        tokio::time::timeout(Duration::from_secs(10), serve_handle)
2534            .await
2535            .expect("serve handle did not resolve")
2536            .expect("serve task panicked")
2537            .expect("serve task error");
2538    }
2539
2540    /// Repro for the broken-link errors seen in OSS Python tests:
2541    /// an external simplex `Proc::direct` dialing the host's duplex
2542    /// frontend should be able to round-trip a request + reply.
2543    #[tokio::test]
2544    async fn test_simplex_client_to_duplex_host() {
2545        let proc_manager = LocalProcManager::new(|proc: Proc| async move {
2546            proc.spawn::<EchoActor>("host_agent", EchoActor)
2547        });
2548        let mut host = Host::new_with_default(
2549            proc_manager,
2550            ChannelAddr::any(ChannelTransport::Unix),
2551            None,
2552            None,
2553        )
2554        .await
2555        .unwrap();
2556        let _serve_handle = host.serve().unwrap();
2557
2558        // Spawn an EchoActor on the host's system_proc.
2559        let echo_handle = host
2560            .system_proc()
2561            .spawn::<EchoActor>("echo", EchoActor)
2562            .unwrap();
2563        let echo_ref = echo_handle.bind::<EchoActor>();
2564
2565        // Create an external simplex client proc with a dial router
2566        // bound to the host's frontend address. This mirrors what the
2567        // Python "root client" does: a `Proc::direct` whose forwarder
2568        // is a `DialMailboxRouter` with the host's frontend address as
2569        // a route to the host's procs.
2570        let client_addr = ChannelAddr::any(ChannelTransport::Unix);
2571        let dial_router = DialMailboxRouter::new();
2572        dial_router.bind(
2573            Addr::from(host.system_proc().proc_addr().clone()),
2574            host.addr().clone(),
2575        );
2576        let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
2577        let client_proc_id = ResourceId::proc_addr_from_name(client_listen_addr, "external-client");
2578        let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
2579        let _client_handle = client_proc.clone().serve(client_rx);
2580
2581        let (client_inst, _client_h) = client_proc.client("requester").unwrap();
2582
2583        // Send a request to the echo actor on the host. The reply
2584        // travels back through the host's dial router → simplex dial
2585        // → client's frontend.
2586        let (reply_port, reply_handle) = client_inst.mailbox().open_once_port::<ActorAddr>();
2587        let reply_port = reply_port.bind();
2588        echo_ref
2589            .port::<OncePortRef<ActorAddr>>()
2590            .post(&client_inst, reply_port);
2591
2592        let received = tokio::time::timeout(Duration::from_secs(10), reply_handle.recv())
2593            .await
2594            .expect("timed out waiting for reply")
2595            .expect("recv failed");
2596        assert_eq!(received, *echo_ref.actor_addr());
2597    }
2598}