hyperactor_mesh/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::Path;
19use std::path::PathBuf;
20use std::process::Stdio;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::time::Duration;
25use std::time::SystemTime;
26
27use async_trait::async_trait;
28use base64::prelude::*;
29use futures::StreamExt;
30use futures::stream;
31use humantime::format_duration;
32use hyperactor::ActorId;
33use hyperactor::ActorRef;
34use hyperactor::Named;
35use hyperactor::ProcId;
36use hyperactor::attrs::Attrs;
37use hyperactor::channel;
38use hyperactor::channel::ChannelAddr;
39use hyperactor::channel::ChannelError;
40use hyperactor::channel::ChannelTransport;
41use hyperactor::channel::Rx;
42use hyperactor::channel::Tx;
43use hyperactor::clock::Clock;
44use hyperactor::clock::RealClock;
45use hyperactor::config::CONFIG;
46use hyperactor::config::ConfigAttr;
47use hyperactor::config::global as config;
48use hyperactor::config::global::override_or_global;
49use hyperactor::context;
50use hyperactor::declare_attrs;
51use hyperactor::host::Host;
52use hyperactor::host::HostError;
53use hyperactor::host::ProcHandle;
54use hyperactor::host::ProcManager;
55use hyperactor::host::TerminateSummary;
56use hyperactor::mailbox::IntoBoxedMailboxSender;
57use hyperactor::mailbox::MailboxClient;
58use hyperactor::mailbox::MailboxServer;
59use hyperactor::proc::Proc;
60use serde::Deserialize;
61use serde::Serialize;
62use tempfile::TempDir;
63use tokio::process::Child;
64use tokio::process::ChildStderr;
65use tokio::process::ChildStdout;
66use tokio::process::Command;
67use tokio::sync::oneshot;
68use tokio::sync::watch;
69use tracing::Instrument;
70use tracing::Level;
71
72use crate::logging::OutputTarget;
73use crate::logging::StreamFwder;
74use crate::proc_mesh::mesh_agent::ProcMeshAgent;
75use crate::resource;
76use crate::v1;
77use crate::v1::host_mesh::mesh_agent::HostAgentMode;
78use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
79
80mod mailbox;
81
82declare_attrs! {
83    /// Enable forwarding child stdout/stderr over the mesh log
84    /// channel.
85    ///
86    /// When `true` (default): child stdio is piped; [`StreamFwder`]
87    /// mirrors output to the parent console and forwards bytes to the
88    /// log channel so a `LogForwardActor` can receive them.
89    ///
90    /// When `false`: no channel forwarding occurs. Child stdio may
91    /// still be piped if [`MESH_ENABLE_FILE_CAPTURE`] is `true` or
92    /// [`MESH_TAIL_LOG_LINES`] > 0; otherwise the child inherits the
93    /// parent stdio (no interception).
94    ///
95    /// This flag does not affect console mirroring: child output
96    /// always reaches the parent console—either via inheritance (no
97    /// piping) or via [`StreamFwder`] when piping is active.
98    @meta(CONFIG = ConfigAttr {
99        env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
100        py_name: Some("enable_log_forwarding".to_string()),
101    })
102    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;
103
104    /// When `true`: if stdio is piped, each child's `StreamFwder`
105    /// also forwards lines to a host-scoped `FileAppender` managed by
106    /// the `BootstrapProcManager`. That appender creates exactly two
107    /// files per manager instance—one for stdout and one for
108    /// stderr—and **all** child processes' lines are multiplexed into
109    /// those two files. This can be combined with
110    /// [`MESH_ENABLE_LOG_FORWARDING`] ("stream+local").
111    ///
112    /// Notes:
113    /// - The on-disk files are *aggregate*, not per-process.
114    ///   Disambiguation is via the optional rank prefix (see
115    ///   `PREFIX_WITH_RANK`), which `StreamFwder` prepends to lines
116    ///   before writing.
117    /// - On local runs, file capture is suppressed unless
118    ///   `FORCE_FILE_LOG=true`. In that case `StreamFwder` still
119    ///   runs, but the `FileAppender` may be `None` and no files are
120    ///   written.
121    /// - `MESH_TAIL_LOG_LINES` only controls the in-memory rotating
122    ///   buffer used for peeking—independent of file capture.
123    @meta(CONFIG = ConfigAttr {
124        env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
125        py_name: Some("enable_file_capture".to_string()),
126    })
127    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;
128
129    /// Maximum number of log lines retained in a proc's stderr/stdout
130    /// tail buffer. Used by [`StreamFwder`] when wiring child
131    /// pipes. Default: 100
132    @meta(CONFIG = ConfigAttr {
133        env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
134        py_name: Some("tail_log_lines".to_string()),
135    })
136    pub attr MESH_TAIL_LOG_LINES: usize = 0;
137
138    /// If enabled (default), bootstrap child processes install
139    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
140    /// parent dies unexpectedly. This is a **production safety net**
141    /// against leaked children; tests usually disable it via
142    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
143    /// "false")`.
144    @meta(CONFIG = ConfigAttr {
145        env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
146        py_name: None,
147    })
148    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;
149
150    /// Maximum number of child terminations to run concurrently
151    /// during bulk shutdown. Prevents unbounded spawning of
152    /// termination tasks (which could otherwise spike CPU, I/O, or
153    /// file descriptor load).
154    @meta(CONFIG = ConfigAttr {
155        env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
156        py_name: None,
157    })
158    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;
159
160    /// Per-child grace window for termination. When a shutdown is
161    /// requested, the manager sends SIGTERM and waits this long for
162    /// the child to exit before escalating to SIGKILL.
163    @meta(CONFIG = ConfigAttr {
164        env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
165        py_name: None,
166    })
167    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
168}
169
/// Name of the environment variable that carries the bootstrap
/// channel address.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the environment variable that carries this process's
/// bootstrap index.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the environment variable that carries the client's trace ID.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
/// Name of the channel on which each process receives its own stdout
/// and stderr. Only the parent process can read a child's stdout and
/// stderr, so the parent streams them back to the child over this
/// channel.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
177
178/// Messages sent from the process to the allocator. This is an envelope
179/// containing the index of the process (i.e., its "address" assigned by
180/// the allocator), along with the control message in question.
181#[derive(Debug, Clone, Serialize, Deserialize, Named)]
182pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
183
184/// Control messages sent from processes to the allocator.
185#[derive(Debug, Clone, Serialize, Deserialize, Named)]
186pub(crate) enum Process2AllocatorMessage {
187    /// Initialize a process2allocator session. The process is
188    /// listening on the provided channel address, to which
189    /// [`Allocator2Process`] messages are sent.
190    Hello(ChannelAddr),
191
192    /// A proc with the provided ID was started. Its mailbox is
193    /// served at the provided channel address. Procs are started
194    /// after instruction by the allocator through the corresponding
195    /// [`Allocator2Process`] message.
196    StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),
197
198    Heartbeat,
199}
200
201/// Messages sent from the allocator to a process.
202#[derive(Debug, Clone, Serialize, Deserialize, Named)]
203pub(crate) enum Allocator2Process {
204    /// Request to start a new proc with the provided ID, listening
205    /// to an address on the indicated channel transport.
206    StartProc(ProcId, ChannelTransport),
207
208    /// A request for the process to shut down its procs and exit the
209    /// process with the provided code.
210    StopAndExit(i32),
211
212    /// A request for the process to immediately exit with the provided
213    /// exit code
214    Exit(i32),
215}
216
217async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
218    let tx = match channel::dial(bootstrap_addr.clone()) {
219        Ok(tx) => tx,
220
221        Err(err) => {
222            tracing::error!(
223                "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
224                bootstrap_addr,
225                err
226            );
227            std::process::exit(1);
228        }
229    };
230    tracing::info!(
231        "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
232    );
233    loop {
234        RealClock.sleep(Duration::from_secs(5)).await;
235
236        let result = tx
237            .send(Process2Allocator(
238                bootstrap_index,
239                Process2AllocatorMessage::Heartbeat,
240            ))
241            .await;
242
243        if let Err(err) = result {
244            tracing::error!(
245                "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
246                bootstrap_addr,
247                err
248            );
249            std::process::exit(1);
250        }
251    }
252}
253
/// Evaluate a `Result`, yielding its `Ok` value, or early-`return`
/// its error converted into an [`anyhow::Error`].
///
/// Intended for functions whose return type *is* `anyhow::Error`
/// (such as [`Bootstrap::bootstrap`]), where `?` does not apply.
#[macro_export]
macro_rules! ok {
    ($result:expr $(,)?) => {
        match $result {
            ::std::result::Result::Err(err) => return ::anyhow::Error::from(err),
            ::std::result::Result::Ok(inner) => inner,
        }
    };
}
263
/// A future that never completes. Awaiting it parks the current task
/// forever; it is generic over `R` so it can stand in for any return
/// type.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
268
269/// Bootstrap configures how a mesh process starts up.
270///
271/// Both `Proc` and `Host` variants may include an optional
272/// configuration snapshot (`hyperactor::config::Attrs`). This
273/// snapshot is serialized into the bootstrap payload and made
274/// available to the child. Interpretation and application of that
275/// snapshot is up to the child process; if omitted, the child falls
276/// back to environment/default values.
277#[derive(Clone, Default, Debug, Serialize, Deserialize)]
278pub enum Bootstrap {
279    /// "v1" proc bootstrap
280    Proc {
281        /// The ProcId of the proc to be bootstrapped.
282        proc_id: ProcId,
283        /// The backend address to which messages are forwarded.
284        /// See [`hyperactor::host`] for channel topology details.
285        backend_addr: ChannelAddr,
286        /// The callback address used to indicate successful spawning.
287        callback_addr: ChannelAddr,
288        /// Directory for storing proc socket files. Procs place their sockets
289        /// in this directory, so that they can be looked up by other procs
290        /// for direct transfer.
291        socket_dir_path: PathBuf,
292        /// Optional config snapshot (`hyperactor::config::Attrs`)
293        /// captured by the parent. If present, the child installs it
294        /// as the `ClientOverride` layer so the parent's effective config
295        /// takes precedence over Defaults.
296        config: Option<Attrs>,
297    },
298
299    /// Host bootstrap. This sets up a new `Host`, managed by a
300    /// [`crate::v1::host_mesh::mesh_agent::HostMeshAgent`].
301    Host {
302        /// The address on which to serve the host.
303        addr: ChannelAddr,
304        /// If specified, use the provided command instead of
305        /// [`BootstrapCommand::current`].
306        command: Option<BootstrapCommand>,
307        /// Optional config snapshot (`hyperactor::config::Attrs`)
308        /// captured by the parent. If present, the child installs it
309        /// as the `ClientOverride` layer so the parent's effective config
310        /// takes precedence over Defaults.
311        config: Option<Attrs>,
312    },
313
314    #[default]
315    V0ProcMesh, // pass through to the v0 allocator
316}
317
318impl Bootstrap {
319    /// Serialize the mode into a environment-variable-safe string by
320    /// base64-encoding its JSON representation.
321    #[allow(clippy::result_large_err)]
322    fn to_env_safe_string(&self) -> v1::Result<String> {
323        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
324    }
325
326    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
327    #[allow(clippy::result_large_err)]
328    fn from_env_safe_string(str: &str) -> v1::Result<Self> {
329        let data = BASE64_STANDARD.decode(str)?;
330        let data = std::str::from_utf8(&data)?;
331        Ok(serde_json::from_str(data)?)
332    }
333
334    /// Get a bootstrap configuration from the environment; returns `None`
335    /// if the environment does not specify a boostrap config.
336    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
337        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
338            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
339                Ok(mode) => Ok(Some(mode)),
340                Err(e) => {
341                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
342                }
343            },
344            Err(VarError::NotPresent) => Ok(None),
345            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
346        }
347    }
348
349    /// Inject this bootstrap configuration into the environment of the provided command.
350    pub fn to_env(&self, cmd: &mut Command) {
351        cmd.env(
352            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
353            self.to_env_safe_string().unwrap(),
354        );
355    }
356
357    /// Bootstrap this binary according to this configuration.
358    /// This either runs forever, or returns an error.
359    pub async fn bootstrap(self) -> anyhow::Error {
360        tracing::info!(
361            "bootstrapping mesh process: {}",
362            serde_json::to_string(&self).unwrap()
363        );
364
365        if Debug::is_active() {
366            let mut buf = Vec::new();
367            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
368            #[cfg(unix)]
369            writeln!(
370                &mut buf,
371                "\tparent pid: {}",
372                std::os::unix::process::parent_id()
373            )
374            .unwrap();
375            writeln!(
376                &mut buf,
377                "\tconfig: {}",
378                serde_json::to_string(&self).unwrap()
379            )
380            .unwrap();
381            match std::env::current_exe() {
382                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
383                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
384            }
385            writeln!(&mut buf, "\targs:").unwrap();
386            for arg in std::env::args() {
387                writeln!(&mut buf, "\t\t{}", arg).unwrap();
388            }
389            writeln!(&mut buf, "\tenv:").unwrap();
390            for (key, val) in std::env::vars() {
391                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
392            }
393            let _ = Debug.write(&buf);
394            if let Ok(s) = std::str::from_utf8(&buf) {
395                tracing::info!("{}", s);
396            } else {
397                tracing::info!("{:?}", buf);
398            }
399        }
400
401        match self {
402            Bootstrap::Proc {
403                proc_id,
404                backend_addr,
405                callback_addr,
406                socket_dir_path,
407                config,
408            } => {
409                let entered = tracing::span!(
410                    Level::INFO,
411                    "proc_bootstrap",
412                    %proc_id,
413                    %backend_addr,
414                    %callback_addr,
415                    socket_dir_path = %socket_dir_path.display(),
416                )
417                .entered();
418                if let Some(attrs) = config {
419                    config::set(config::Source::ClientOverride, attrs);
420                    tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
421                } else {
422                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
423                }
424
425                if hyperactor::config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
426                    // Safety net: normal shutdown is via
427                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
428                    // is a last-resort guard against leaks if that
429                    // protocol is bypassed.
430                    let _ = install_pdeathsig_kill();
431                } else {
432                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
433                }
434
435                let (local_addr, name) = ok!(proc_id
436                    .as_direct()
437                    .ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
438                // TODO provide a direct way to construct these
439                let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
440                let serve_addr = serve_addr.parse().unwrap();
441
442                // The following is a modified host::spawn_proc to support direct
443                // dialing between local procs: 1) we bind each proc to a deterministic
444                // address in socket_dir_path; 2) we use LocalProcDialer to dial these
445                // addresses for local procs.
446                let proc_sender = mailbox::LocalProcDialer::new(
447                    local_addr.clone(),
448                    socket_dir_path,
449                    ok!(MailboxClient::dial(backend_addr)),
450                );
451
452                let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
453
454                let span = entered.exit();
455
456                let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
457                    .instrument(span.clone())
458                    .await
459                    .map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
460
461                // Finally serve the proc on the same transport as the backend address,
462                // and call back.
463                let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
464                proc.clone().serve(proc_rx);
465                ok!(ok!(channel::dial(callback_addr))
466                    .send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
467                    .instrument(span)
468                    .await
469                    .map_err(ChannelError::from));
470
471                halt().await
472            }
473            Bootstrap::Host {
474                addr,
475                command,
476                config,
477            } => {
478                if let Some(attrs) = config {
479                    config::set(config::Source::Runtime, attrs);
480                    tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
481                } else {
482                    tracing::debug!("bootstrap: no config snapshot provided (Host)");
483                }
484
485                let command = match command {
486                    Some(command) => command,
487                    None => ok!(BootstrapCommand::current()),
488                };
489                let manager = BootstrapProcManager::new(command).unwrap();
490
491                let (host, _handle) = ok!(Host::serve(manager, addr).await);
492                let addr = host.addr().clone();
493                let host_mesh_agent = ok!(host
494                    .system_proc()
495                    .clone()
496                    .spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
497                    .await);
498
499                tracing::info!(
500                    "serving host at {}, agent: {}",
501                    addr,
502                    host_mesh_agent.bind::<HostMeshAgent>()
503                );
504                halt().await
505            }
506            Bootstrap::V0ProcMesh => bootstrap_v0_proc_mesh().await,
507        }
508    }
509
510    /// A variant of [`bootstrap`] that logs the error and exits the process
511    /// if bootstrapping fails.
512    pub async fn bootstrap_or_die(self) -> ! {
513        let err = self.bootstrap().await;
514        tracing::error!("failed to bootstrap mesh process: {}", err);
515        std::process::exit(1)
516    }
517}
518
519/// Install "kill me if parent dies" and close the race window.
520pub fn install_pdeathsig_kill() -> io::Result<()> {
521    #[cfg(target_os = "linux")]
522    {
523        // SAFETY: Calling into libc; does not dereference memory, just
524        // asks the kernel to deliver SIGKILL on parent death.
525        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
526        if rc != 0 {
527            return Err(io::Error::last_os_error());
528        }
529    }
530    // Race-close: if the parent died between our exec and prctl(),
531    // we won't get a signal, so detect that and exit now.
532    //
533    // If getppid() == 1, we've already been reparented (parent gone).
534    // SAFETY: `getppid()` is a simple libc syscall returning the
535    // parent PID; it has no side effects and does not touch memory.
536    if unsafe { libc::getppid() } == 1 {
537        std::process::exit(0);
538    }
539    Ok(())
540}
541
542/// Represents the lifecycle state of a **proc as hosted in an OS
543/// process** managed by `BootstrapProcManager`.
544///
545/// Note: This type is deliberately distinct from [`ProcState`] and
546/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
547/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
548/// consumed from an event stream during allocation. By contrast,
549/// [`ProcStatus`] is a **live, queryable view**: it reflects the
550/// current observed status of a running proc, as seen through the
551/// [`BootstrapProcHandle`] API (stop, kill, status).
552///
553/// In short:
554/// - `ProcState`/`ProcStopReason`: historical / event-driven model
555/// - `ProcStatus`: immediate status surface for lifecycle control
556#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
557pub enum ProcStatus {
558    /// The OS process has been spawned but is not yet fully running.
559    /// (Process-level: child handle exists, no confirmation yet.)
560    Starting,
561    /// The OS process is alive and considered running.
562    /// (Process-level: `pid` is known; Proc-level: bootstrap
563    /// may still be running.)
564    Running { pid: u32, started_at: SystemTime },
565    /// Ready means bootstrap has completed and the proc is serving.
566    /// (Process-level: `pid` is known; Proc-level: bootstrap
567    /// completed.)
568    Ready {
569        pid: u32,
570        started_at: SystemTime,
571        addr: ChannelAddr,
572        agent: ActorRef<ProcMeshAgent>,
573    },
574    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
575    /// but the OS process has not yet fully exited. (Proc-level:
576    /// shutdown in progress; Process-level: still running.)
577    Stopping { pid: u32, started_at: SystemTime },
578    /// The process exited with a normal exit code. (Process-level:
579    /// exit observed.)
580    Stopped {
581        exit_code: i32,
582        stderr_tail: Vec<String>,
583    },
584    /// The process was killed by a signal (e.g. SIGKILL).
585    /// (Process-level: abnormal termination.)
586    Killed { signal: i32, core_dumped: bool },
587    /// The proc or its process failed for some other reason
588    /// (bootstrap error, unexpected condition, etc.). (Both levels:
589    /// catch-all failure.)
590    Failed { reason: String },
591}
592
593impl ProcStatus {
594    /// Returns `true` if the proc is in a terminal (exited) state:
595    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
596    /// [`ProcStatus::Failed`].
597    #[inline]
598    pub fn is_exit(&self) -> bool {
599        matches!(
600            self,
601            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
602        )
603    }
604}
605
606impl std::fmt::Display for ProcStatus {
607    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
608        match self {
609            ProcStatus::Starting => write!(f, "Starting"),
610            ProcStatus::Running { pid, started_at } => {
611                let uptime = started_at
612                    .elapsed()
613                    .map(|d| format!(" up {}", format_duration(d)))
614                    .unwrap_or_default();
615                write!(f, "Running[{pid}]{uptime}")
616            }
617            ProcStatus::Ready {
618                pid,
619                started_at,
620                addr,
621                ..
622            } => {
623                let uptime = started_at
624                    .elapsed()
625                    .map(|d| format!(" up {}", format_duration(d)))
626                    .unwrap_or_default();
627                write!(f, "Ready[{pid}] at {addr}{uptime}")
628            }
629            ProcStatus::Stopping { pid, started_at } => {
630                let uptime = started_at
631                    .elapsed()
632                    .map(|d| format!(" up {}", format_duration(d)))
633                    .unwrap_or_default();
634                write!(f, "Stopping[{pid}]{uptime}")
635            }
636            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
637            ProcStatus::Killed {
638                signal,
639                core_dumped,
640            } => {
641                if *core_dumped {
642                    write!(f, "Killed(sig={signal}, core)")
643                } else {
644                    write!(f, "Killed(sig={signal})")
645                }
646            }
647            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
648        }
649    }
650}
651
652/// Error returned by [`BootstrapProcHandle::ready`].
653#[derive(Debug, Clone)]
654pub enum ReadyError {
655    /// The proc reached a terminal state before `Ready`.
656    Terminal(ProcStatus),
657    /// The internal watch channel closed unexpectedly.
658    ChannelClosed,
659}
660
661impl std::fmt::Display for ReadyError {
662    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
663        match self {
664            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
665            ReadyError::ChannelClosed => write!(f, "status channel closed"),
666        }
667    }
668}
669impl std::error::Error for ReadyError {}
670
671/// A handle to a proc launched by [`BootstrapProcManager`].
672///
673/// `BootstrapProcHandle` is a lightweight supervisor for an external
674/// process: it tracks and broadcasts lifecycle state, and exposes a
675/// small control/observation surface. While it may temporarily hold a
676/// `tokio::process::Child` (shared behind a mutex) so the exit
677/// monitor can `wait()` it, it is **not** the unique owner of the OS
678/// process, and dropping a `BootstrapProcHandle` does not by itself
679/// terminate the process.
680///
681/// What it pairs together:
682/// - the **logical proc identity** (`ProcId`)
683/// - the **live status surface** ([`ProcStatus`]), available both as
684///   a synchronous snapshot (`status()`) and as an async stream via a
685///   `tokio::sync::watch` channel (`watch()` / `changed()`)
686///
687/// Responsibilities:
688/// - Retain the child handle only until the exit monitor claims it,
689///   so the OS process can be awaited and its terminal status
690///   recorded.
691/// - Hold stdout/stderr tailers until the exit monitor takes them,
692///   then join to recover buffered output for diagnostics.
693/// - Update status via the `mark_*` transitions and broadcast changes
694///   over the watch channel so tasks can `await` lifecycle
695///   transitions without polling.
696/// - Provide the foundation for higher-level APIs like `wait()`
697///   (await terminal) and, later, `terminate()` / `kill()`.
698///
699/// Notes:
700/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
701///   it SIGKILLs any still-recorded PIDs; we do not rely on
702///   `Child::kill_on_drop`.
703///
704/// Relationship to types:
705/// - [`ProcStatus`]: live status surface, updated by this handle.
706/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
707///   allocator-facing, historical event log; not directly updated by
708///   this type.
709#[derive(Clone)]
710pub struct BootstrapProcHandle {
711    /// Logical identity of the proc in the mesh.
712    proc_id: ProcId,
713    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
714    /// so [`BootstrapProcHandle::status`] can return a synchronous
715    /// copy. All mutations now flow through
716    /// [`BootstrapProcHandle::transition`], which updates this field
717    /// under the lock and then broadcasts on the watch channel.
718    status: Arc<std::sync::Mutex<ProcStatus>>,
719    /// Underlying OS child handle. Held only until the exit monitor
720    /// claims it (consumed by `wait()` there). Not relied on for
721    /// teardown; manager `Drop` handles best-effort SIGKILL.
722    child: Arc<std::sync::Mutex<Option<Child>>>,
723    /// Stdout monitor for this proc. Created with `StreamFwder::start`, it
724    /// forwards output to a log channel and keeps a bounded ring buffer.
725    /// Transferred to the exit monitor, which joins it after `wait()`
726    /// to recover buffered lines.
727    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
728    /// Stderr monitor for this proc. Same behavior as `stdout_fwder`
729    /// but for stderr (used for exit-reason enrichment).
730    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
731    /// Watch sender for status transitions. Every `mark_*` goes
732    /// through [`BootstrapProcHandle::transition`], which updates the
733    /// snapshot under the lock and then `send`s the new
734    /// [`ProcStatus`].
735    tx: tokio::sync::watch::Sender<ProcStatus>,
736    /// Watch receiver seed. `watch()` clones this so callers can
737    /// `borrow()` the current status and `changed().await` future
738    /// transitions independently.
739    rx: tokio::sync::watch::Receiver<ProcStatus>,
740}
741
742impl fmt::Debug for BootstrapProcHandle {
743    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744        let status = self.status.lock().expect("status mutex poisoned").clone();
745        f.debug_struct("BootstrapProcHandle")
746            .field("proc_id", &self.proc_id)
747            .field("status", &status)
748            .field("child", &"<child>")
749            .field("tx", &"<watch::Sender>")
750            .field("rx", &"<watch::Receiver>")
751            // Intentionally skip stdout_tailer / stderr_tailer (not
752            // Debug).
753            .finish()
754    }
755}
756
757// Locking invariant:
758// - Do not acquire other locks from inside `transition(...)` (it
759//   holds the status mutex).
760// - If you need the child PID during a transition, snapshot it
761//   *before* calling `transition`.
762impl BootstrapProcHandle {
763    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
764    /// OS process hosting a proc.
765    ///
766    /// - Initializes the status to [`ProcStatus::Starting`] since the
767    ///   child process has been created but not yet confirmed running.
768    /// - Wraps the provided [`Child`] handle in an
769    ///   `Arc<Mutex<Option<Child>>>` so lifecycle methods (`wait`,
770    ///   `terminate`, etc.) can consume it later.
771    ///
772    /// This is the canonical entry point used by
773    /// `BootstrapProcManager` when it launches a proc into a new
774    /// process.
775    pub fn new(proc_id: ProcId, child: Child) -> Self {
776        let (tx, rx) = watch::channel(ProcStatus::Starting);
777        Self {
778            proc_id,
779            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
780            child: Arc::new(std::sync::Mutex::new(Some(child))),
781            stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
782            stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
783            tx,
784            rx,
785        }
786    }
787
788    /// Return the logical proc identity in the mesh.
789    #[inline]
790    pub fn proc_id(&self) -> &ProcId {
791        &self.proc_id
792    }
793
794    /// Create a new subscription to this proc's status stream.
795    ///
796    /// Each call returns a fresh [`watch::Receiver`] tied to this
797    /// handle's internal [`ProcStatus`] channel. The receiver can be
798    /// awaited on (`rx.changed().await`) to observe lifecycle
799    /// transitions as they occur.
800    ///
801    /// Notes:
802    /// - Multiple subscribers can exist simultaneously; each sees
803    ///   every status update in order.
804    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
805    ///   use `watch()` when you need to await changes over time.
806    #[inline]
807    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
808        self.rx.clone()
809    }
810
811    /// Wait until this proc's status changes.
812    ///
813    /// This is a convenience wrapper around
814    /// [`watch::Receiver::changed`]: it subscribes internally via
815    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
816    /// If no subscribers exist or the channel is closed, this returns
817    /// without error.
818    ///
819    /// Typical usage:
820    /// ```ignore
821    /// handle.changed().await;
822    /// match handle.status() {
823    ///     ProcStatus::Running { .. } => { /* now running */ }
824    ///     ProcStatus::Stopped { .. } => { /* exited */ }
825    ///     _ => {}
826    /// }
827    /// ```
828    #[inline]
829    pub async fn changed(&self) {
830        let _ = self.watch().changed().await;
831    }
832
833    /// Return the OS process ID (`pid`) for this proc.
834    ///
835    /// If the proc is currently `Running`, this returns the cached
836    /// pid even if the `Child` has already been taken by the exit
837    /// monitor. Otherwise, it falls back to `Child::id()` if the
838    /// handle is still present. Returns `None` once the proc has
839    /// exited or if the handle has been consumed.
840    #[inline]
841    pub fn pid(&self) -> Option<u32> {
842        match *self.status.lock().expect("status mutex poisoned") {
843            ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
844            _ => self
845                .child
846                .lock()
847                .expect("child mutex poisoned")
848                .as_ref()
849                .and_then(|c| c.id()),
850        }
851    }
852
853    /// Return a snapshot of the current [`ProcStatus`] for this proc.
854    ///
855    /// This is a *live view* of the lifecycle state as tracked by
856    /// [`BootstrapProcManager`]. It reflects what is currently known
857    /// about the underlying OS process (e.g., `Starting`, `Running`,
858    /// `Stopping`, etc.).
859    ///
860    /// Internally this reads the mutex-guarded status. Use this when
861    /// you just need a synchronous snapshot; use
862    /// [`BootstrapProcHandle::watch`] or
863    /// [`BootstrapProcHandle::changed`] if you want to await
864    /// transitions asynchronously.
865    #[must_use]
866    pub fn status(&self) -> ProcStatus {
867        // Source of truth for now is the mutex. We broadcast via
868        // `watch` in `transition`, but callers that want a
869        // synchronous snapshot should read the guarded value.
870        self.status.lock().expect("status mutex poisoned").clone()
871    }
872
873    /// Atomically apply a state transition while holding the status
874    /// lock, and send the updated value on the watch channel **while
875    /// still holding the lock**. This guarantees the mutex state and
876    /// the broadcast value stay in sync and avoids reordering between
877    /// concurrent transitions.
878    fn transition<F>(&self, f: F) -> bool
879    where
880        F: FnOnce(&mut ProcStatus) -> bool,
881    {
882        let mut guard = self.status.lock().expect("status mutex poisoned");
883        let _before = guard.clone();
884        let changed = f(&mut guard);
885        if changed {
886            // Publish while still holding the lock to preserve order.
887            let _ = self.tx.send(guard.clone());
888        }
889        changed
890    }
891
892    /// Transition this proc into the [`ProcStatus::Running`] state.
893    ///
894    /// Called internally once the child OS process has been spawned
895    /// and we can observe a valid `pid`. Records the `pid` and the
896    /// `started_at` timestamp so that callers can query them later
897    /// via [`BootstrapProcHandle::status`] or
898    /// [`BootstrapProcHandle::pid`].
899    ///
900    /// This is a best-effort marker: it reflects that the process
901    /// exists at the OS level, but does not guarantee that the proc
902    /// has completed bootstrap or is fully ready.
903    pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
904        self.transition(|st| match *st {
905            ProcStatus::Starting => {
906                *st = ProcStatus::Running { pid, started_at };
907                true
908            }
909            _ => {
910                tracing::warn!(
911                    "illegal transition: {:?} -> Running; leaving status unchanged",
912                    *st
913                );
914                false
915            }
916        })
917    }
918
919    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
920    /// state.
921    ///
922    /// This records the process ID, start time, listening address,
923    /// and agent once the proc has successfully started and is ready
924    /// to serve.
925    ///
926    /// Returns `true` if the transition succeeded (from `Starting` or
927    /// `Running`), or `false` if the current state did not allow
928    /// moving to `Ready`. In the latter case the state is left
929    /// unchanged and a warning is logged.
930    pub(crate) fn mark_ready(
931        &self,
932        pid: u32,
933        started_at: SystemTime,
934        addr: ChannelAddr,
935        agent: ActorRef<ProcMeshAgent>,
936    ) -> bool {
937        tracing::info!(proc_id = %self.proc_id, %addr, "{} running on pid {}", self.proc_id, pid);
938        self.transition(|st| match st {
939            ProcStatus::Starting | ProcStatus::Running { .. } => {
940                *st = ProcStatus::Ready {
941                    pid,
942                    started_at,
943                    addr,
944                    agent,
945                };
946                true
947            }
948            _ => {
949                tracing::warn!(
950                    "illegal transition: {:?} -> Ready; leaving status unchanged",
951                    st
952                );
953                false
954            }
955        })
956    }
957
958    /// Record that a stop has been requested for the proc (e.g. a
959    /// graceful shutdown via SIGTERM), but the underlying process has
960    /// not yet fully exited.
961    pub(crate) fn mark_stopping(&self) -> bool {
962        // Avoid taking out additional locks in .transition().
963        let child_pid = self.child_pid_snapshot();
964        let now = hyperactor::clock::RealClock.system_time_now();
965
966        self.transition(|st| match *st {
967            ProcStatus::Running { pid, started_at } => {
968                *st = ProcStatus::Stopping { pid, started_at };
969                true
970            }
971            ProcStatus::Ready {
972                pid, started_at, ..
973            } => {
974                *st = ProcStatus::Stopping { pid, started_at };
975                true
976            }
977            ProcStatus::Starting => {
978                if let Some(pid) = child_pid {
979                    *st = ProcStatus::Stopping {
980                        pid,
981                        started_at: now,
982                    };
983                    true
984                } else {
985                    false
986                }
987            }
988            _ => false,
989        })
990    }
991
992    /// Record that the process has exited normally with the given
993    /// exit code.
994    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
995        self.transition(|st| match *st {
996            ProcStatus::Starting
997            | ProcStatus::Running { .. }
998            | ProcStatus::Ready { .. }
999            | ProcStatus::Stopping { .. } => {
1000                *st = ProcStatus::Stopped {
1001                    exit_code,
1002                    stderr_tail,
1003                };
1004                true
1005            }
1006            _ => {
1007                tracing::warn!(
1008                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
1009                    *st
1010                );
1011                false
1012            }
1013        })
1014    }
1015
1016    /// Record that the process was killed by the given signal (e.g.
1017    /// SIGKILL, SIGTERM).
1018    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1019        self.transition(|st| match *st {
1020            ProcStatus::Starting
1021            | ProcStatus::Running { .. }
1022            | ProcStatus::Ready { .. }
1023            | ProcStatus::Stopping { .. } => {
1024                *st = ProcStatus::Killed {
1025                    signal,
1026                    core_dumped,
1027                };
1028                true
1029            }
1030            _ => {
1031                tracing::warn!(
1032                    "illegal transition: {:?} -> Killed; leaving status unchanged",
1033                    *st
1034                );
1035                false
1036            }
1037        })
1038    }
1039
1040    /// Record that the proc or its process failed for an unexpected
1041    /// reason (bootstrap error, spawn failure, etc.).
1042    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1043        self.transition(|st| match *st {
1044            ProcStatus::Starting
1045            | ProcStatus::Running { .. }
1046            | ProcStatus::Ready { .. }
1047            | ProcStatus::Stopping { .. } => {
1048                *st = ProcStatus::Failed {
1049                    reason: reason.into(),
1050                };
1051                true
1052            }
1053            _ => {
1054                tracing::warn!(
1055                    "illegal transition: {:?} -> Failed; leaving status unchanged",
1056                    *st
1057                );
1058                false
1059            }
1060        })
1061    }
1062
1063    /// Wait until the proc has reached a terminal state and return
1064    /// it.
1065    ///
1066    /// Terminal means [`ProcStatus::Stopped`],
1067    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
1068    /// current status is already terminal, returns immediately.
1069    ///
1070    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1071    /// owner of the OS process, so you can call `wait()` from
1072    /// multiple tasks concurrently.
1073    ///
1074    /// Implementation detail: listens on this handle's `watch`
1075    /// channel. It snapshots the current status, and if not terminal
1076    /// awaits the next change. If the channel closes unexpectedly,
1077    /// returns the last observed status.
1078    ///
1079    /// Mirrors `tokio::process::Child::wait()`, but yields the
1080    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
1081    #[must_use]
1082    pub async fn wait_inner(&self) -> ProcStatus {
1083        let mut rx = self.watch();
1084        loop {
1085            let st = rx.borrow().clone();
1086            if st.is_exit() {
1087                return st;
1088            }
1089            // If the channel closes, return the last observed value.
1090            if rx.changed().await.is_err() {
1091                return st;
1092            }
1093        }
1094    }
1095
1096    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
1097    ///
1098    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
1099    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
1100    /// becoming `Ready`, this returns
1101    /// `Err(ReadyError::Terminal(status))`. If the internal watch
1102    /// channel closes unexpectedly, this returns
1103    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
1104    /// `Ok(())` when `Ready` is first observed.
1105    ///
1106    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1107    /// owner; multiple tasks may await `ready()` concurrently.
1108    /// `Stopping` is not treated as terminal here; we continue
1109    /// waiting until `Ready` or a terminal state is seen.
1110    ///
1111    /// Companion to [`BootstrapProcHandle::wait_inner`]:
1112    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
1113    /// startup.
1114    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1115        let mut rx = self.watch();
1116        loop {
1117            let st = rx.borrow().clone();
1118            match &st {
1119                ProcStatus::Ready { .. } => return Ok(()),
1120                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1121                _non_terminal => {
1122                    if rx.changed().await.is_err() {
1123                        return Err(ReadyError::ChannelClosed);
1124                    }
1125                }
1126            }
1127        }
1128    }
1129
1130    /// Best-effort, non-blocking snapshot of the child's PID. Locks
1131    /// only the `child` mutex and never touches `status`.
1132    fn child_pid_snapshot(&self) -> Option<u32> {
1133        self.child
1134            .lock()
1135            .expect("child mutex poisoned")
1136            .as_ref()
1137            .and_then(|c| c.id())
1138    }
1139
1140    /// Try to fetch a PID we can signal. Prefer cached PID from
1141    /// status; fall back to Child::id() if still present. None once
1142    /// terminal or if monitor already consumed the Child and we never
1143    /// cached a pid.
1144    fn signalable_pid(&self) -> Option<i32> {
1145        match &*self.status.lock().expect("status mutex poisoned") {
1146            ProcStatus::Running { pid, .. }
1147            | ProcStatus::Ready { pid, .. }
1148            | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1149            _ => self
1150                .child
1151                .lock()
1152                .expect("child mutex poisoned")
1153                .as_ref()
1154                .and_then(|c| c.id())
1155                .map(|p| p as i32),
1156        }
1157    }
1158
1159    /// Send a signal to the pid. Returns Ok(()) if delivered. If
1160    /// ESRCH (no such process), we treat that as "already gone" and
1161    /// return Ok(()) so callers can proceed to observe the terminal
1162    /// state through the monitor.
1163    fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1164        // SAFETY: Sending a POSIX signal; does not dereference
1165        // memory.
1166        let rc = unsafe { libc::kill(pid, sig) };
1167        if rc == 0 {
1168            Ok(())
1169        } else {
1170            let e = std::io::Error::last_os_error();
1171            if let Some(libc::ESRCH) = e.raw_os_error() {
1172                // Process already gone; proceed to wait/observe
1173                // terminal state.
1174                Ok(())
1175            } else {
1176                Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1177            }
1178        }
1179    }
1180
1181    pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1182        *self
1183            .stdout_fwder
1184            .lock()
1185            .expect("stdout_tailer mutex poisoned") = out;
1186        *self
1187            .stderr_fwder
1188            .lock()
1189            .expect("stderr_tailer mutex poisoned") = err;
1190    }
1191
1192    fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1193        let out = self
1194            .stdout_fwder
1195            .lock()
1196            .expect("stdout_tailer mutex poisoned")
1197            .take();
1198        let err = self
1199            .stderr_fwder
1200            .lock()
1201            .expect("stderr_tailer mutex poisoned")
1202            .take();
1203        (out, err)
1204    }
1205
1206    /// Sends a StopAll message to the ProcMeshAgent, which should exit the process.
1207    /// Waits for the successful state change of the process. If the process
1208    /// doesn't reach a terminal state, returns Err.
1209    async fn send_stop_all(
1210        &self,
1211        cx: &impl context::Actor,
1212        agent: ActorRef<ProcMeshAgent>,
1213        timeout: Duration,
1214    ) -> anyhow::Result<ProcStatus> {
1215        // For all of the messages and replies in this function:
1216        // if the proc is already dead, then the message will be undeliverable,
1217        // which should be ignored.
1218        // If this message isn't deliverable to the agent, the process may have
1219        // stopped already. No need to produce any errors, just continue with
1220        // killing the process.
1221        let mut agent_port = agent.port();
1222        agent_port.return_undeliverable(false);
1223        agent_port.send(cx, resource::StopAll {})?;
1224        // The agent handling Stop should exit the process, if it doesn't within
1225        // the time window, we escalate to SIGTERM.
1226        match RealClock.timeout(timeout, self.wait()).await {
1227            Ok(Ok(st)) => Ok(st),
1228            Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1229            Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1230        }
1231    }
1232}
1233
1234#[async_trait]
1235impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1236    type Agent = ProcMeshAgent;
1237    type TerminalStatus = ProcStatus;
1238
1239    #[inline]
1240    fn proc_id(&self) -> &ProcId {
1241        &self.proc_id
1242    }
1243
1244    #[inline]
1245    fn addr(&self) -> Option<ChannelAddr> {
1246        match &*self.status.lock().expect("status mutex poisoned") {
1247            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1248            _ => None,
1249        }
1250    }
1251
1252    #[inline]
1253    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1254        match &*self.status.lock().expect("status mutex poisoned") {
1255            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1256            _ => None,
1257        }
1258    }
1259
1260    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
1261    /// state.
1262    ///
1263    /// Returns `Ok(())` once `Ready` is observed.
1264    ///
1265    /// If the proc transitions directly to a terminal state before
1266    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
1267    ///
1268    /// If the internal status watch closes unexpectedly before
1269    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
1270    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1271        match self.ready_inner().await {
1272            Ok(()) => Ok(()),
1273            Err(ReadyError::Terminal(status)) => {
1274                Err(hyperactor::host::ReadyError::Terminal(status))
1275            }
1276            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1277        }
1278    }
1279
1280    /// Wait until this proc reaches a terminal [`ProcStatus`].
1281    ///
1282    /// Returns `Ok(status)` when a terminal state is observed
1283    /// (`Stopped`, `Killed`, or `Failed`).
1284    ///
1285    /// If the internal status watch closes before any terminal state
1286    /// is seen, returns `Err(WaitError::ChannelClosed)`.
1287    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1288        let status = self.wait_inner().await;
1289        if status.is_exit() {
1290            Ok(status)
1291        } else {
1292            Err(hyperactor::host::WaitError::ChannelClosed)
1293        }
1294    }
1295
1296    /// Attempt to terminate the underlying OS process.
1297    ///
1298    /// This drives **process-level** teardown only:
1299    /// - Sends `SIGTERM` to the process.
1300    /// - Waits up to `timeout` for it to exit cleanly.
1301    /// - Escalates to `SIGKILL` if still alive, then waits a short
1302    ///   hard-coded grace period (`HARD_WAIT_AFTER_KILL`) to ensure
1303    ///   the process is reaped.
1304    ///
1305    /// If the process was already in a terminal state when called,
1306    /// returns [`TerminateError::AlreadyTerminated`].
1307    ///
1308    /// # Notes
1309    /// - This does *not* attempt a graceful proc-level stop via
1310    ///   `ProcMeshAgent` or other actor messaging. That integration
1311    ///   will come later once proc-level control is wired up.
1312    /// - Errors may also be returned if the process PID cannot be
1313    ///   determined, if signal delivery fails, or if the status
1314    ///   channel is closed unexpectedly.
1315    ///
1316    /// # Parameters
1317    /// - `timeout`: Grace period to wait after `SIGTERM` before
1318    ///   escalating.
1319    ///
1320    /// # Returns
1321    /// - `Ok(ProcStatus)` if the process exited during the
1322    ///   termination sequence.
1323    /// - `Err(TerminateError)` if already exited, signaling failed,
1324    ///   or the channel was lost.
1325    async fn terminate(
1326        &self,
1327        cx: &impl context::Actor,
1328        timeout: Duration,
1329    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1330        const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);
1331
1332        // If already terminal, return that.
1333        let st0 = self.status();
1334        if st0.is_exit() {
1335            tracing::debug!(?st0, "terminate(): already terminal");
1336            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1337        }
1338
1339        // Find a PID to signal.
1340        let pid = self.signalable_pid().ok_or_else(|| {
1341            let st = self.status();
1342            tracing::warn!(?st, "terminate(): no signalable pid");
1343            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1344                "no signalable pid (state: {:?})",
1345                st
1346            ))
1347        })?;
1348
1349        // Best-effort mark "Stopping" (ok if state races).
1350
1351        // Before sending SIGTERM, try to close actors normally. Only works if
1352        // they are in the Ready state and have an Agent we can message.
1353        let agent = self.agent_ref();
1354        if let Some(agent) = agent {
1355            match self.send_stop_all(cx, agent.clone(), timeout).await {
1356                Ok(st) => return Ok(st),
1357                Err(e) => {
1358                    // Variety of possible errors, proceed with SIGTERM.
1359                    tracing::warn!(
1360                        "ProcMeshAgent {} could not successfully stop all actors: {}",
1361                        agent.actor_id(),
1362                        e,
1363                    );
1364                }
1365            }
1366        }
1367        // If the stop all actors message was unsuccessful, we need
1368        // to actually stop the process.
1369        let _ = self.mark_stopping();
1370
1371        // Send SIGTERM (ESRCH is treated as "already gone").
1372        tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
1373        if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
1374            tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
1375            return Err(hyperactor::host::TerminateError::Io(e));
1376        }
1377        tracing::debug!(pid, "terminate(): SIGTERM sent");
1378
1379        // Wait up to the timeout for a terminal state.
1380        match RealClock.timeout(timeout, self.wait_inner()).await {
1381            Ok(st) if st.is_exit() => {
1382                tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
1383                Ok(st)
1384            }
1385            Ok(non_exit) => {
1386                // wait_inner() should only resolve terminal; treat as
1387                // channel issue.
1388                tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
1389                Err(hyperactor::host::TerminateError::ChannelClosed)
1390            }
1391            Err(_elapsed) => {
1392                // Escalate to SIGKILL
1393                tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
1394                if let Some(pid2) = self.signalable_pid() {
1395                    if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
1396                        tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
1397                        return Err(hyperactor::host::TerminateError::Io(e));
1398                    }
1399                    tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
1400                } else {
1401                    tracing::warn!("terminate(): lost pid before SIGKILL escalation");
1402                }
1403                // Hard bound after KILL so we can't hang forever.
1404                match RealClock
1405                    .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
1406                    .await
1407                {
1408                    Ok(st) if st.is_exit() => {
1409                        tracing::info!(?st, "terminate(): exited after SIGKILL");
1410                        Ok(st)
1411                    }
1412                    other => {
1413                        tracing::warn!(
1414                            ?other,
1415                            "terminate(): post-KILL wait did not yield terminal"
1416                        );
1417                        Err(hyperactor::host::TerminateError::ChannelClosed)
1418                    }
1419                }
1420            }
1421        }
1422    }
1423
1424    /// Forcibly kill the underlying OS process with `SIGKILL`.
1425    ///
1426    /// This bypasses any graceful shutdown semantics and immediately
1427    /// delivers a non-catchable `SIGKILL` to the child. It is
1428    /// intended as a last-resort termination mechanism when
1429    /// `terminate()` fails or when no grace period is desired.
1430    ///
1431    /// # Behavior
1432    /// - If the process was already in a terminal state, returns
1433    ///   [`TerminateError::AlreadyTerminated`].
1434    /// - Otherwise attempts to send `SIGKILL` to the current PID.
1435    /// - Then waits for the exit monitor to observe a terminal state.
1436    ///
1437    /// # Notes
1438    /// - This is strictly an **OS-level kill**. It does *not* attempt
1439    ///   proc-level shutdown via `ProcMeshAgent` or actor messages.
1440    ///   That integration will be layered in later.
1441    /// - Errors may be returned if the PID cannot be determined, if
1442    ///   signal delivery fails, or if the status channel closes
1443    ///   unexpectedly.
1444    ///
1445    /// # Returns
1446    /// - `Ok(ProcStatus)` if the process exited after `SIGKILL`.
1447    /// - `Err(TerminateError)` if already exited, signaling failed,
1448    ///   or the channel was lost.
1449    async fn kill(
1450        &self,
1451    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1452        // NOTE: This is a pure OS-level kill (SIGKILL) of the child
1453        // process. It bypasses any proc-level shutdown semantics.
1454        //
1455        // Future work: layer in proc-level stop semantics through
1456        // actor messages once those are implemented.
1457
1458        // If already terminal, return that.
1459        let st0 = self.status();
1460        if st0.is_exit() {
1461            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1462        }
1463
1464        // Try to get a PID and send SIGKILL.
1465        let pid = self.signalable_pid().ok_or_else(|| {
1466            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1467                "no signalable pid (state: {:?})",
1468                self.status()
1469            ))
1470        })?;
1471
1472        if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
1473            return Err(hyperactor::host::TerminateError::Io(e));
1474        }
1475
1476        // Wait for exit monitor to record terminal status.
1477        let st = self.wait_inner().await;
1478        if st.is_exit() {
1479            Ok(st)
1480        } else {
1481            Err(hyperactor::host::TerminateError::ChannelClosed)
1482        }
1483    }
1484}
1485
/// A specification of the command used to bootstrap procs.
///
/// Serializable so it can be shipped to wherever procs are launched;
/// [`BootstrapCommand::new`] turns it into a runnable `Command`.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path of the executable to launch.
    pub program: PathBuf,
    /// When `Some`, passed to `Command::arg0` (overrides the `argv[0]`
    /// the child sees). When `None`, `arg0` is left untouched.
    pub arg0: Option<String>,
    /// Arguments appended, in order, after `argv[0]`.
    pub args: Vec<String>,
    /// Environment variables applied to the built command via
    /// `Command::env`.
    pub env: HashMap<String, String>,
}
1494
1495impl BootstrapCommand {
1496    /// Creates a bootstrap command specification to replicate the
1497    /// invocation of the currently running process.
1498    pub fn current() -> io::Result<Self> {
1499        let mut args: VecDeque<String> = std::env::args().collect();
1500        let arg0 = args.pop_front();
1501
1502        Ok(Self {
1503            program: std::env::current_exe()?,
1504            arg0,
1505            args: args.into(),
1506            env: std::env::vars().collect(),
1507        })
1508    }
1509
1510    /// Create a new `Command` reflecting this bootstrap command
1511    /// configuration.
1512    pub fn new(&self) -> Command {
1513        let mut cmd = Command::new(&self.program);
1514        if let Some(arg0) = &self.arg0 {
1515            cmd.arg0(arg0);
1516        }
1517        for arg in &self.args {
1518            cmd.arg(arg);
1519        }
1520        for (k, v) in &self.env {
1521            cmd.env(k, v);
1522        }
1523        cmd
1524    }
1525
1526    /// Bootstrap command used for testing, invoking the Buck-built
1527    /// `monarch/hyperactor_mesh/bootstrap` binary.
1528    ///
1529    /// Intended for integration tests where we need to spawn real
1530    /// bootstrap processes under proc manager control. Not available
1531    /// outside of test builds.
1532    #[cfg(test)]
1533    #[cfg(fbcode_build)]
1534    pub(crate) fn test() -> Self {
1535        Self {
1536            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1537            arg0: None,
1538            args: vec![],
1539            env: HashMap::new(),
1540        }
1541    }
1542}
1543
1544impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1545    /// Creates a bootstrap command from the provided path.
1546    fn from(s: T) -> Self {
1547        Self {
1548            program: s.into(),
1549            arg0: None,
1550            args: vec![],
1551            env: HashMap::new(),
1552        }
1553    }
1554}
1555
/// A process manager for launching and supervising **bootstrap
/// processes** (via the [`bootstrap`] entry point).
///
/// `BootstrapProcManager` is the host-side runtime for external procs
/// in a hyperactor mesh. It spawns the configured bootstrap binary,
/// optionally captures each child's stdout/stderr (forwarding it over
/// the logging channel and/or to a file appender, depending on
/// configuration), tracks lifecycle state through
/// [`BootstrapProcHandle`] / [`ProcStatus`], and ensures best-effort
/// teardown on drop.
///
/// Internally it maintains two maps:
/// - `children`: async map from [`ProcId`] to
///   [`BootstrapProcHandle`], used for lifecycle queries (`status`) and
///   termination.
/// - `pid_table`: synchronous map from [`ProcId`] to raw PIDs, used
///   in [`Drop`] to synchronously send `SIGKILL` to any still-running
///   children.
///
/// Together these provide both a queryable, real-time status surface
/// and a best-effort cleanup path, so that child processes are not
/// left orphaned if the manager itself is dropped.
#[derive(Debug)]
pub struct BootstrapProcManager {
    /// The command specification used to bootstrap new processes.
    command: BootstrapCommand,
    /// Async registry of spawned children, keyed by [`ProcId`]. Holds
    /// [`BootstrapProcHandle`]s so callers can query or terminate
    /// them.
    children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
    /// Synchronous table of raw PIDs, keyed by [`ProcId`]. Consulted
    /// by the [`Drop`] impl to send `SIGKILL` without needing an
    /// async context; entries are removed when a child's exit is
    /// observed.
    pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
    /// Shared file appender that aggregates captured output from all
    /// children. Only created when `MESH_ENABLE_FILE_CAPTURE` is set at
    /// construction time; `None` if disabled or if creation failed.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for storing proc socket files. Procs place their sockets
    /// in this directory, so that they can be looked up by other procs
    /// for direct transfer. As a `TempDir`, it is removed when the
    /// manager is dropped.
    socket_dir: TempDir,
}
1597
1598impl Drop for BootstrapProcManager {
1599    /// Drop implementation for [`BootstrapProcManager`].
1600    ///
1601    /// Ensures that when the manager itself is dropped, any child
1602    /// processes it spawned are also terminated. This is a
1603    /// best-effort cleanup mechanism: the manager iterates over its
1604    /// recorded PID table and sends `SIGKILL` to each one.
1605    ///
1606    /// Notes:
1607    /// - Uses a synchronous `Mutex` so it can be locked safely in
1608    ///   `Drop` without async context.
1609    /// - Safety: sending `SIGKILL` is low-level but safe here because
1610    ///   it only instructs the OS to terminate the process. The only
1611    ///   risk is PID reuse, in which case an unrelated process might
1612    ///   be signaled. This is accepted for shutdown semantics.
1613    /// - This makes `BootstrapProcManager` robust against leaks:
1614    ///   dropping it should not leave stray child processes running.
1615    fn drop(&mut self) {
1616        if let Ok(table) = self.pid_table.lock() {
1617            for (proc_id, pid) in table.iter() {
1618                // SAFETY: We own these PIDs because they were spawned and recorded
1619                // by this manager. Sending SIGKILL is safe here since it does not
1620                // dereference any memory; it only instructs the OS to terminate the
1621                // process. The worst-case outcome is that the PID has already exited
1622                // and been reused, in which case the signal may affect an unrelated
1623                // process. We accept that risk in Drop semantics because this path
1624                // is only used for cleanup at shutdown.
1625                unsafe {
1626                    libc::kill(*pid as i32, libc::SIGKILL);
1627                }
1628                tracing::info!(
1629                    "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1630                    pid,
1631                    proc_id
1632                );
1633            }
1634        }
1635    }
1636}
1637
1638impl BootstrapProcManager {
1639    /// Construct a new [`BootstrapProcManager`] that will launch
1640    /// procs using the given bootstrap command specification.
1641    ///
1642    /// This is the general entry point when you want to manage procs
1643    /// backed by a specific binary path (e.g. a bootstrap
1644    /// trampoline).
1645    pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1646        let file_appender = if hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1647            match crate::logging::FileAppender::new() {
1648                Some(fm) => {
1649                    tracing::info!("file appender created successfully");
1650                    Some(Arc::new(fm))
1651                }
1652                None => {
1653                    tracing::warn!("failed to create file appender");
1654                    None
1655                }
1656            }
1657        } else {
1658            None
1659        };
1660
1661        Ok(Self {
1662            command,
1663            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1664            pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1665            file_appender,
1666            socket_dir: runtime_dir()?,
1667        })
1668    }
1669
1670    /// The bootstrap command used to launch processes.
1671    pub fn command(&self) -> &BootstrapCommand {
1672        &self.command
1673    }
1674
1675    /// The socket directory, where per-proc Unix sockets are placed.
1676    pub fn socket_dir(&self) -> &Path {
1677        self.socket_dir.path()
1678    }
1679
1680    /// Return the current [`ProcStatus`] for the given [`ProcId`], if
1681    /// the proc is known to this manager.
1682    ///
1683    /// This queries the live [`BootstrapProcHandle`] stored in the
1684    /// manager's internal map. It provides an immediate snapshot of
1685    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
1686    /// etc.).
1687    ///
1688    /// Returns `None` if the manager has no record of the proc (e.g.
1689    /// never spawned here, or entry already removed).
1690    pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1691        self.children.lock().await.get(proc_id).map(|h| h.status())
1692    }
1693
1694    fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1695        let pid_table = Arc::clone(&self.pid_table);
1696
1697        let maybe_child = {
1698            let mut guard = handle.child.lock().expect("child mutex");
1699            let taken = guard.take();
1700            debug_assert!(guard.is_none(), "no Child should remain in handles");
1701            taken
1702        };
1703
1704        let Some(mut child) = maybe_child else {
1705            tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1706            return;
1707        };
1708
1709        tokio::spawn(async move {
1710            let wait_res = child.wait().await;
1711
1712            let mut stderr_tail: Vec<String> = Vec::new();
1713            let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1714
1715            if let Some(t) = stderr_mon {
1716                let (lines, _bytes) = t.abort().await;
1717                stderr_tail = lines;
1718            }
1719            if let Some(t) = stdout_mon {
1720                let (_lines, _bytes) = t.abort().await;
1721            }
1722
1723            let tail_str = if stderr_tail.is_empty() {
1724                None
1725            } else {
1726                Some(stderr_tail.join("\n"))
1727            };
1728
1729            let remove_from_pid_table = || {
1730                let pid = if let Ok(mut table) = pid_table.lock() {
1731                    table.remove(&proc_id)
1732                } else {
1733                    None
1734                };
1735
1736                pid.map_or_else(|| "not available".to_string(), |i| format!("{i}"))
1737            };
1738
1739            match wait_res {
1740                Ok(status) => {
1741                    if let Some(sig) = status.signal() {
1742                        let _ = handle.mark_killed(sig, status.core_dumped());
1743                        let pid_str = remove_from_pid_table();
1744                        tracing::info!(
1745                            name = "ProcStatus",
1746                            status = "Exited::KilledBySignal",
1747                            %proc_id,
1748                            tail = tail_str,
1749                            "killed by signal {sig}; proc's pid: {pid_str}"
1750                        );
1751                    } else if let Some(code) = status.code() {
1752                        let _ = handle.mark_stopped(code, stderr_tail);
1753                        let pid_str = remove_from_pid_table();
1754                        tracing::info!(
1755                            name = "ProcStatus",
1756                            status = "Exited::ExitWithCode",
1757                            %proc_id,
1758                            exit_code = code,
1759                            tail = tail_str,
1760                            "proc exited; proc's pid: {pid_str}"
1761                        );
1762                    } else {
1763                        debug_assert!(
1764                            false,
1765                            "unreachable: process terminated with neither signal nor exit code"
1766                        );
1767                        let _ = handle.mark_failed("process exited with unknown status");
1768                        let pid_str = remove_from_pid_table();
1769                        tracing::info!(
1770                            name = "ProcStatus",
1771                            status = "Exited::Unknown",
1772                            %proc_id,
1773                            tail = tail_str,
1774                            "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
1775                        );
1776                    }
1777                }
1778                Err(e) => {
1779                    let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1780                    let pid_str = remove_from_pid_table();
1781                    tracing::info!(
1782                        name = "ProcStatus",
1783                        status = "Exited::WaitFailed",
1784                        %proc_id,
1785                        tail = tail_str,
1786                        "proc {proc_id} wait failed; proc's pid: {pid_str}"
1787                    );
1788                }
1789            }
1790        });
1791    }
1792}
1793
1794/// The configuration used for bootstrapped procs.
1795pub struct BootstrapProcConfig {
1796    /// The proc's create rank.
1797    pub create_rank: usize,
1798
1799    /// Config values to set on the spawned proc's global config,
1800    /// at the `ClientOverride` layer.
1801    pub client_config_override: Attrs,
1802}
1803
1804#[async_trait]
1805impl ProcManager for BootstrapProcManager {
1806    type Handle = BootstrapProcHandle;
1807
1808    type Config = BootstrapProcConfig;
1809
1810    /// Return the [`ChannelTransport`] used by this proc manager.
1811    ///
1812    /// For `BootstrapProcManager` this is always
1813    /// [`ChannelTransport::Unix`], since all procs are spawned
1814    /// locally on the same host and communicate over Unix domain
1815    /// sockets.
1816    fn transport(&self) -> ChannelTransport {
1817        ChannelTransport::Unix
1818    }
1819
1820    /// Launch a new proc under this [`BootstrapProcManager`].
1821    ///
1822    /// Spawns the configured bootstrap binary (`self.program`) in a
1823    /// fresh child process. The environment is populated with
1824    /// variables that describe the bootstrap context — most
1825    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries a
1826    /// base64-encoded JSON [`Bootstrap::Proc`] payload (proc id,
1827    /// backend addr, callback addr, optional config snapshot).
1828    /// Additional variables like `BOOTSTRAP_LOG_CHANNEL` are also set
1829    /// up for logging and control.
1830    ///
1831    /// Responsibilities performed here:
1832    /// - Create a one-shot callback channel so the child can confirm
1833    ///   successful bootstrap and return its mailbox address plus agent
1834    ///   reference.
1835    /// - Spawn the OS process with stdout/stderr piped.
1836    /// - Stamp the new [`BootstrapProcHandle`] as
1837    ///   [`ProcStatus::Running`] once a PID is observed.
1838    /// - Wire stdout/stderr pipes into local writers and forward them
1839    ///   over the logging channel (`BOOTSTRAP_LOG_CHANNEL`).
1840    /// - Insert the handle into the manager's children map and start
1841    ///   an exit monitor to track process termination.
1842    ///
1843    /// Returns a [`BootstrapProcHandle`] that exposes the child
1844    /// process's lifecycle (status, wait/ready, termination). Errors
1845    /// are surfaced as [`HostError`].
1846    #[tracing::instrument(skip(self, config))]
1847    async fn spawn(
1848        &self,
1849        proc_id: ProcId,
1850        backend_addr: ChannelAddr,
1851        config: BootstrapProcConfig,
1852    ) -> Result<Self::Handle, HostError> {
1853        let (callback_addr, mut callback_rx) =
1854            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1855
1856        // Decide whether we need to capture stdio.
1857        let overrides = &config.client_config_override;
1858        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1859        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1860        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1861        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1862
1863        let mode = Bootstrap::Proc {
1864            proc_id: proc_id.clone(),
1865            backend_addr,
1866            callback_addr,
1867            socket_dir_path: self.socket_dir.path().to_owned(),
1868            config: Some(config.client_config_override),
1869        };
1870        let mut cmd = self.command.new();
1871        cmd.env(
1872            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
1873            mode.to_env_safe_string()
1874                .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
1875        );
1876
1877        if need_stdio {
1878            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1879        } else {
1880            cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
1881            tracing::info!(
1882                %proc_id, enable_forwarding, enable_file_capture, tail_size,
1883                "child stdio NOT captured (forwarding/file_capture/tail all disabled); inheriting parent console"
1884            );
1885        }
1886
1887        let log_channel: Option<ChannelAddr> = if enable_forwarding {
1888            let addr = ChannelAddr::any(ChannelTransport::Unix);
1889            cmd.env(BOOTSTRAP_LOG_CHANNEL, addr.to_string());
1890            Some(addr)
1891        } else {
1892            None
1893        };
1894
1895        let mut child = cmd
1896            .spawn()
1897            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1898        let pid = child.id().unwrap_or_default();
1899
1900        let (out_fwder, err_fwder) = if need_stdio {
1901            let stdout: ChildStdout = child.stdout.take().expect("stdout piped but missing");
1902            let stderr: ChildStderr = child.stderr.take().expect("stderr piped but missing");
1903
1904            let (file_stdout, file_stderr) = if enable_file_capture {
1905                match self.file_appender.as_deref() {
1906                    Some(fm) => (
1907                        Some(fm.addr_for(OutputTarget::Stdout)),
1908                        Some(fm.addr_for(OutputTarget::Stderr)),
1909                    ),
1910                    None => {
1911                        tracing::warn!("enable_file_capture=true but no FileAppender");
1912                        (None, None)
1913                    }
1914                }
1915            } else {
1916                (None, None)
1917            };
1918
1919            let out = StreamFwder::start(
1920                stdout,
1921                file_stdout, // Option<ChannelAddr>
1922                OutputTarget::Stdout,
1923                tail_size,
1924                log_channel.clone(), // Option<ChannelAddr>
1925                pid,
1926                config.create_rank,
1927            );
1928            let err = StreamFwder::start(
1929                stderr,
1930                file_stderr,
1931                OutputTarget::Stderr,
1932                tail_size,
1933                log_channel.clone(),
1934                pid,
1935                config.create_rank,
1936            );
1937            (Some(out), Some(err))
1938        } else {
1939            (None, None)
1940        };
1941
1942        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
1943
1944        if let Some(pid) = handle.pid() {
1945            handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
1946            if let Ok(mut table) = self.pid_table.lock() {
1947                table.insert(proc_id.clone(), pid);
1948            }
1949        }
1950
1951        handle.set_stream_monitors(out_fwder, err_fwder);
1952
1953        // Retain handle for lifecycle mgt.
1954        {
1955            let mut children = self.children.lock().await;
1956            children.insert(proc_id.clone(), handle.clone());
1957        }
1958
1959        // Kick off an exit monitor that updates ProcStatus when the
1960        // OS process ends
1961        self.spawn_exit_monitor(proc_id.clone(), handle.clone());
1962
1963        let h = handle.clone();
1964        let pid_table = Arc::clone(&self.pid_table);
1965        tokio::spawn(async move {
1966            match callback_rx.recv().await {
1967                Ok((addr, agent)) => {
1968                    let pid = match h.pid() {
1969                        Some(p) => p,
1970                        None => {
1971                            tracing::warn!("mark_ready called with missing pid; using 0");
1972                            0
1973                        }
1974                    };
1975                    let started_at = RealClock.system_time_now();
1976                    let _ = h.mark_ready(pid, started_at, addr, agent);
1977                }
1978                Err(e) => {
1979                    // Child never called back; record failure.
1980                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
1981                    // Cleanup pid table entry if it was set.
1982                    if let Ok(mut table) = pid_table.lock() {
1983                        table.remove(&proc_id);
1984                    }
1985                }
1986            }
1987        });
1988
1989        // Callers do `handle.read().await` for mesh readiness.
1990        Ok(handle)
1991    }
1992}
1993
1994#[async_trait]
1995impl hyperactor::host::SingleTerminate for BootstrapProcManager {
1996    /// Attempt to gracefully terminate one child procs managed by
1997    /// this `BootstrapProcManager`.
1998    ///
1999    /// Each child handle is asked to `terminate(timeout)`, which
2000    /// sends SIGTERM, waits up to the deadline, and escalates to
2001    /// SIGKILL if necessary. Termination is attempted concurrently,
2002    /// with at most `max_in_flight` tasks running at once.
2003    ///
2004    /// Logs a warning for each failure.
2005    async fn terminate_proc(
2006        &self,
2007        cx: &impl context::Actor,
2008        proc: &ProcId,
2009        timeout: Duration,
2010    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
2011        // Snapshot to avoid holding the lock across awaits.
2012        let proc_handle: Option<BootstrapProcHandle> = {
2013            let mut guard = self.children.lock().await;
2014            guard.remove(proc)
2015        };
2016
2017        if let Some(h) = proc_handle {
2018            h.terminate(cx, timeout)
2019                .await
2020                .map(|_| (Vec::new(), Vec::new()))
2021                .map_err(|e| e.into())
2022        } else {
2023            Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2024        }
2025    }
2026}
2027
2028#[async_trait]
2029impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2030    /// Attempt to gracefully terminate all child procs managed by
2031    /// this `BootstrapProcManager`.
2032    ///
2033    /// Each child handle is asked to `terminate(timeout)`, which
2034    /// sends SIGTERM, waits up to the deadline, and escalates to
2035    /// SIGKILL if necessary. Termination is attempted concurrently,
2036    /// with at most `max_in_flight` tasks running at once.
2037    ///
2038    /// Returns a [`TerminateSummary`] with counts of how many procs
2039    /// were attempted, how many successfully terminated (including
2040    /// those that were already terminal), and how many failed.
2041    ///
2042    /// Logs a warning for each failure.
2043    async fn terminate_all(
2044        &self,
2045        cx: &impl context::Actor,
2046        timeout: Duration,
2047        max_in_flight: usize,
2048    ) -> TerminateSummary {
2049        // Snapshot to avoid holding the lock across awaits.
2050        let handles: Vec<BootstrapProcHandle> = {
2051            let guard = self.children.lock().await;
2052            guard.values().cloned().collect()
2053        };
2054
2055        let attempted = handles.len();
2056        let mut ok = 0usize;
2057
2058        let results = stream::iter(handles.into_iter().map(|h| async move {
2059            match h.terminate(cx, timeout).await {
2060                Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2061                    // Treat "already terminal" as success.
2062                    true
2063                }
2064                Err(e) => {
2065                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2066                    false
2067                }
2068            }
2069        }))
2070        .buffer_unordered(max_in_flight.max(1))
2071        .collect::<Vec<bool>>()
2072        .await;
2073
2074        for r in results {
2075            if r {
2076                ok += 1;
2077            }
2078        }
2079
2080        TerminateSummary {
2081            attempted,
2082            ok,
2083            failed: attempted.saturating_sub(ok),
2084        }
2085    }
2086}
2087
2088/// Entry point to processes managed by hyperactor_mesh. Any process that is part
2089/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
2090/// the process according to how it is invoked.
2091///
2092/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
2093/// and the process should likely exit:
2094///
2095/// ```ignore
2096/// let err = hyperactor_mesh::bootstrap().await;
2097/// tracing::error("could not bootstrap mesh process: {}", err);
2098/// std::process::exit(1);
2099/// ```
2100///
2101/// Use [`bootstrap_or_die`] to implement this behavior directly.
2102pub async fn bootstrap() -> anyhow::Error {
2103    let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
2104    boot.bootstrap().await
2105}
2106
/// Bootstrap a v0 proc mesh. This launches a control process that responds to
/// Allocator2Process messages, conveying its own state in Process2Allocator messages.
///
/// The bootstrapping process is controlled by the
/// following environment variables (read via `BOOTSTRAP_ADDR_ENV` and
/// `BOOTSTRAP_INDEX_ENV`):
///
/// - `HYPERACTOR_MESH_BOOTSTRAP_ADDR`: the channel address to which Process2Allocator messages
///   should be sent.
/// - `HYPERACTOR_MESH_INDEX`: an index used to identify this process to the allocator.
///
/// Protocol, as implemented below:
/// 1. Serve a local channel and send `Hello(serve_addr)` to the allocator.
/// 2. Handle allocator messages in a loop:
///    - `StartProc`: bootstrap a proc + `ProcMeshAgent`, serve it, and reply
///      with `StartedProc`.
///    - `StopAndExit(code)`: destroy all started procs, then exit with `code`.
///    - `Exit(code)`: exit immediately with `code`.
///
/// The process exits via `std::process::exit` on the stop paths, so this
/// function only ever returns the error that ended the loop.
async fn bootstrap_v0_proc_mesh() -> anyhow::Error {
    // `go` never returns `Ok(())`: the message loop only ends through `?`,
    // an explicit `Err`, or `std::process::exit`. That makes `unwrap_err`
    // below safe.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started so far; shared with the signal-cleanup future below.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // Presumably runs the cleanup future on signal delivery while the
        // guard is alive -- confirm against `register_signal_cleanup_scoped`.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(Duration::from_millis(10), None)
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator uses.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves. If the Hello cannot be delivered, it is handed
        // back on `return_channel`.
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        // NOTE(review): by its name, this watchdog exits the process when
        // allocator heartbeats stop -- confirm in `exit_if_missed_heartbeat`.
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for the first allocator message. If the Hello comes back
        // undelivered (`Ok(msg)`), fail. If the return sender is dropped
        // instead (`Err(_)`), keep waiting for the first message.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    Ok(msg) => {
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    Err(_) => {
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        loop {
            // A receive error ends the loop and is returned to the caller.
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    let entered = span.entered();
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    drop(handle); // linter appeasement; it is safe to drop this future
                    let span = entered.exit();
                    // Report the new proc's agent and address back to the allocator.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    // Track the proc so the stop/cleanup paths can destroy it.
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Same teardown as the signal-cleanup future above.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(Duration::from_millis(10), None)
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    // Exit without destroying procs first.
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    go().await.unwrap_err()
}
2235
2236/// A variant of [`bootstrap`] that logs the error and exits the process
2237/// if bootstrapping fails.
2238pub async fn bootstrap_or_die() -> ! {
2239    let err = bootstrap().await;
2240    let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2241    tracing::error!("failed to bootstrap mesh process: {}", err);
2242    std::process::exit(1)
2243}
2244
/// Destination for bytes written through the bootstrap [`Debug`] writer.
// `EnumAsInner` generates accessors such as `is_file()`, which
// `Debug::is_active` relies on.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Write to this file (opened in append mode by `debug_sink`).
    File(std::fs::File),
    /// Discard all writes.
    Sink,
}
2250
2251impl DebugSink {
2252    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2253        match self {
2254            DebugSink::File(f) => f.write(buf),
2255            DebugSink::Sink => Ok(buf.len()),
2256        }
2257    }
2258    fn flush(&mut self) -> io::Result<()> {
2259        match self {
2260            DebugSink::File(f) => f.flush(),
2261            DebugSink::Sink => Ok(()),
2262        }
2263    }
2264}
2265
2266fn debug_sink() -> &'static Mutex<DebugSink> {
2267    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2268    DEBUG_SINK.get_or_init(|| {
2269        let debug_path = {
2270            let mut p = std::env::temp_dir();
2271            if let Ok(user) = std::env::var("USER") {
2272                p.push(user);
2273            }
2274            std::fs::create_dir_all(&p).ok();
2275            p.push("monarch-bootstrap-debug.log");
2276            p
2277        };
2278        let sink = if debug_path.exists() {
2279            match OpenOptions::new()
2280                .append(true)
2281                .create(true)
2282                .open(debug_path.clone())
2283            {
2284                Ok(f) => DebugSink::File(f),
2285                Err(_e) => {
2286                    eprintln!(
2287                        "failed to open {} for bootstrap debug logging",
2288                        debug_path.display()
2289                    );
2290                    DebugSink::Sink
2291                }
2292            }
2293        } else {
2294            DebugSink::Sink
2295        };
2296        Mutex::new(sink)
2297    })
2298}
2299
/// Compile-time switch: if true, everything written to `Debug` is also
/// mirrored to stderr, in addition to the debug sink. When set,
/// `Debug::is_active` always reports true, whether or not the debug log
/// file is in use.
const DEBUG_TO_STDERR: bool = false;
2302
2303/// A bootstrap specific debug writer. If the file /tmp/monarch-bootstrap-debug.log
2304/// exists, then the writer's destination is that file; otherwise it discards all writes.
2305struct Debug;
2306
2307impl Debug {
2308    fn is_active() -> bool {
2309        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2310    }
2311}
2312
2313impl Write for Debug {
2314    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2315        let res = debug_sink().lock().unwrap().write(buf);
2316        if DEBUG_TO_STDERR {
2317            let n = match res {
2318                Ok(n) => n,
2319                Err(_) => buf.len(),
2320            };
2321            let _ = io::stderr().write_all(&buf[..n]);
2322        }
2323
2324        res
2325    }
2326    fn flush(&mut self) -> io::Result<()> {
2327        let res = debug_sink().lock().unwrap().flush();
2328        if DEBUG_TO_STDERR {
2329            let _ = io::stderr().flush();
2330        }
2331        res
2332    }
2333}
2334
2335/// Create a new runtime [`TempDir`]. The directory is created in
2336/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2337fn runtime_dir() -> io::Result<TempDir> {
2338    match std::env::var_os("XDG_RUNTIME_DIR") {
2339        Some(runtime_dir) => {
2340            let path = PathBuf::from(runtime_dir);
2341            tempfile::tempdir_in(path)
2342        }
2343        None => tempfile::tempdir(),
2344    }
2345}
2346
2347#[cfg(test)]
2348mod tests {
2349    use std::path::PathBuf;
2350    use std::process::Stdio;
2351
2352    use hyperactor::ActorId;
2353    use hyperactor::ActorRef;
2354    use hyperactor::ProcId;
2355    use hyperactor::WorldId;
2356    use hyperactor::channel::ChannelAddr;
2357    use hyperactor::channel::ChannelTransport;
2358    use hyperactor::channel::TcpMode;
2359    use hyperactor::clock::RealClock;
2360    use hyperactor::context::Mailbox as _;
2361    use hyperactor::host::ProcHandle;
2362    use hyperactor::id;
2363    use ndslice::Extent;
2364    use ndslice::ViewExt;
2365    use ndslice::extent;
2366    use tokio::process::Command;
2367
2368    use super::*;
2369    use crate::alloc::AllocSpec;
2370    use crate::alloc::Allocator;
2371    use crate::alloc::ProcessAllocator;
2372    use crate::v1::ActorMesh;
2373    use crate::v1::host_mesh::HostMesh;
2374    use crate::v1::testactor;
2375
2376    // Helper: Avoid repeating
2377    // `ChannelAddr::any(ChannelTransport::Unix)`.
2378    fn any_addr_for_test() -> ChannelAddr {
2379        ChannelAddr::any(ChannelTransport::Unix)
2380    }
2381
2382    #[test]
2383    fn test_bootstrap_mode_env_string_none_config_proc() {
2384        let values = [
2385            Bootstrap::default(),
2386            Bootstrap::Proc {
2387                proc_id: id!(foo[0]),
2388                backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2389                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2390                socket_dir_path: PathBuf::from("notexist"),
2391                config: None,
2392            },
2393        ];
2394
2395        for value in values {
2396            let safe = value.to_env_safe_string().unwrap();
2397            let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2398
2399            // Re-encode and compare: deterministic round-trip of the
2400            // wire format.
2401            let safe2 = round.to_env_safe_string().unwrap();
2402            assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2403
2404            // Sanity: the decoded variant is what we expect.
2405            match (&value, &round) {
2406                (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2407                (Bootstrap::V0ProcMesh, Bootstrap::V0ProcMesh) => {}
2408                _ => panic!("decoded variant mismatch: got {:?}", round),
2409            }
2410        }
2411    }
2412
2413    #[test]
2414    fn test_bootstrap_mode_env_string_none_config_host() {
2415        let value = Bootstrap::Host {
2416            addr: ChannelAddr::any(ChannelTransport::Unix),
2417            command: None,
2418            config: None,
2419        };
2420
2421        let safe = value.to_env_safe_string().unwrap();
2422        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2423
2424        // Wire-format round-trip should be identical.
2425        let safe2 = round.to_env_safe_string().unwrap();
2426        assert_eq!(safe, safe2);
2427
2428        // Sanity: decoded variant is Host with None config.
2429        match round {
2430            Bootstrap::Host { config: None, .. } => {}
2431            other => panic!("expected Host with None config, got {:?}", other),
2432        }
2433    }
2434
2435    #[test]
2436    fn test_bootstrap_mode_env_string_invalid() {
2437        // Not valid base64
2438        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2439    }
2440
2441    #[test]
2442    fn test_bootstrap_env_roundtrip_with_config_proc_and_host() {
2443        // Build a small, distinctive Attrs snapshot.
2444        let mut attrs = Attrs::new();
2445        attrs[MESH_TAIL_LOG_LINES] = 123;
2446        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2447
2448        let socket_dir = runtime_dir().unwrap();
2449
2450        // Proc case
2451        {
2452            let original = Bootstrap::Proc {
2453                proc_id: id!(foo[42]),
2454                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2455                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2456                config: Some(attrs.clone()),
2457                socket_dir_path: socket_dir.path().to_owned(),
2458            };
2459            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2460            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2461            match &decoded {
2462                Bootstrap::Proc { config, .. } => {
2463                    let cfg = config.as_ref().expect("expected Some(attrs)");
2464                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2465                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2466                }
2467                other => panic!("unexpected variant after roundtrip: {:?}", other),
2468            }
2469        }
2470
2471        // Host case
2472        {
2473            let original = Bootstrap::Host {
2474                addr: ChannelAddr::any(ChannelTransport::Unix),
2475                command: None,
2476                config: Some(attrs.clone()),
2477            };
2478            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2479            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2480            match &decoded {
2481                Bootstrap::Host { config, .. } => {
2482                    let cfg = config.as_ref().expect("expected Some(attrs)");
2483                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2484                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2485                }
2486                other => panic!("unexpected variant after roundtrip: {:?}", other),
2487            }
2488        }
2489    }
2490
2491    #[tokio::test]
2492    async fn test_child_terminated_on_manager_drop() {
2493        use std::path::PathBuf;
2494        use std::process::Stdio;
2495
2496        use tokio::process::Command;
2497
2498        // Manager; program path is irrelevant for this test.
2499        let command = BootstrapCommand {
2500            program: PathBuf::from("/bin/true"),
2501            ..Default::default()
2502        };
2503        let manager = BootstrapProcManager::new(command).unwrap();
2504
2505        // Spawn a long-running child process (sleep 30) with
2506        // kill_on_drop(true).
2507        let mut cmd = Command::new("/bin/sleep");
2508        cmd.arg("30")
2509            .stdout(Stdio::null())
2510            .stderr(Stdio::null())
2511            .kill_on_drop(true);
2512
2513        let child = cmd.spawn().expect("spawn sleep");
2514        let pid = child.id().expect("pid");
2515
2516        // Insert into the manager's children map (simulates a spawned
2517        // proc).
2518        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
2519        {
2520            let mut children = manager.children.lock().await;
2521            children.insert(
2522                proc_id.clone(),
2523                BootstrapProcHandle::new(proc_id.clone(), child),
2524            );
2525        }
2526
2527        // (Linux-only) Verify the process exists before drop.
2528        #[cfg(target_os = "linux")]
2529        {
2530            let path = format!("/proc/{}", pid);
2531            assert!(
2532                std::fs::metadata(&path).is_ok(),
2533                "expected /proc/{pid} to exist before drop"
2534            );
2535        }
2536
2537        // Drop the manager. We don't rely on `Child::kill_on_drop`.
2538        // The exit monitor owns the Child; the manager's Drop uses
2539        // `pid_table` to best-effort SIGKILL any recorded PIDs. This
2540        // should terminate the child.
2541        drop(manager);
2542
2543        // Poll until the /proc entry disappears OR the state is
2544        // Zombie.
2545        #[cfg(target_os = "linux")]
2546        {
2547            let deadline = std::time::Instant::now() + Duration::from_millis(1500);
2548            let proc_dir = format!("/proc/{}", pid);
2549            let status_file = format!("{}/status", proc_dir);
2550
2551            let mut ok = false;
2552            while std::time::Instant::now() < deadline {
2553                match std::fs::read_to_string(&status_file) {
2554                    Ok(s) => {
2555                        if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
2556                            if state_line.contains('Z') {
2557                                // Only 'Z' (Zombie) is acceptable.
2558                                ok = true;
2559                                break;
2560                            } else {
2561                                // Still alive (e.g. 'R', 'S', etc.). Give it more time.
2562                            }
2563                        }
2564                    }
2565                    Err(_) => {
2566                        // /proc/<pid>/status no longer exists -> fully gone
2567                        ok = true;
2568                        break;
2569                    }
2570                }
2571                RealClock.sleep(Duration::from_millis(100)).await;
2572            }
2573
2574            assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
2575        }
2576
2577        // On non-Linux, absence of panics/hangs is the signal; PID
2578        // probing is platform-specific.
2579    }
2580
2581    #[tokio::test]
2582    async fn test_v1_child_logging() {
2583        use hyperactor::channel;
2584        use hyperactor::data::Serialized;
2585        use hyperactor::mailbox::BoxedMailboxSender;
2586        use hyperactor::mailbox::DialMailboxRouter;
2587        use hyperactor::mailbox::MailboxServer;
2588        use hyperactor::proc::Proc;
2589
2590        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2591        use crate::logging::LogClientActor;
2592        use crate::logging::LogClientMessageClient;
2593        use crate::logging::LogForwardActor;
2594        use crate::logging::LogMessage;
2595        use crate::logging::OutputTarget;
2596        use crate::logging::test_tap;
2597
2598        let router = DialMailboxRouter::new();
2599        let (proc_addr, proc_rx) =
2600            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2601        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
2602        proc.clone().serve(proc_rx);
2603        router.bind(id!(client[0]).into(), proc_addr.clone());
2604        let (client, _handle) = proc.instance("client").unwrap();
2605
2606        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2607        test_tap::install(tap_tx);
2608
2609        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2610        // SAFETY: unit-test scoped env var
2611        unsafe {
2612            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2613        }
2614
2615        // Spawn the log client and disable aggregation (immediate
2616        // print + tap push).
2617        let log_client: ActorRef<LogClientActor> =
2618            proc.spawn("log_client", ()).await.unwrap().bind();
2619        log_client.set_aggregate(&client, None).await.unwrap();
2620
2621        // Spawn the forwarder in this proc (it will serve
2622        // BOOTSTRAP_LOG_CHANNEL).
2623        let _log_forwarder: ActorRef<LogForwardActor> = proc
2624            .spawn("log_forwarder", log_client.clone())
2625            .await
2626            .unwrap()
2627            .bind();
2628
2629        // Dial the channel but don't post until we know the forwarder
2630        // is receiving.
2631        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2632
2633        // Send a fake log message as if it came from the proc
2634        // manager's writer.
2635        tx.post(LogMessage::Log {
2636            hostname: "testhost".into(),
2637            pid: 12345,
2638            output_target: OutputTarget::Stdout,
2639            payload: Serialized::serialize(&"hello from child".to_string()).unwrap(),
2640        });
2641
2642        // Assert we see it via the tap.
2643        let line = RealClock
2644            .timeout(Duration::from_secs(2), tap_rx.recv())
2645            .await
2646            .expect("timed out waiting for log line")
2647            .expect("tap channel closed unexpectedly");
2648        assert!(
2649            line.contains("hello from child"),
2650            "log line did not appear via LogClientActor; got: {line}"
2651        );
2652    }
2653
2654    mod proc_handle {
2655
2656        use hyperactor::ActorId;
2657        use hyperactor::ActorRef;
2658        use hyperactor::ProcId;
2659        use hyperactor::WorldId;
2660        use hyperactor::host::ProcHandle;
2661        use tokio::process::Command;
2662
2663        use super::super::*;
2664        use super::any_addr_for_test;
2665
2666        // Helper: build a ProcHandle with a short-lived child
2667        // process. We don't rely on the actual process; we only
2668        // exercise the status transitions.
2669        fn handle_for_test() -> BootstrapProcHandle {
2670            // Spawn a trivial child that exits immediately.
2671            let child = Command::new("sh")
2672                .arg("-c")
2673                .arg("exit 0")
2674                .stdin(std::process::Stdio::null())
2675                .stdout(std::process::Stdio::null())
2676                .stderr(std::process::Stdio::null())
2677                .spawn()
2678                .expect("failed to spawn test child process");
2679
2680            let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
2681            BootstrapProcHandle::new(proc_id, child)
2682        }
2683
2684        #[tokio::test]
2685        async fn starting_to_running_ok() {
2686            let h = handle_for_test();
2687            assert!(matches!(h.status(), ProcStatus::Starting));
2688            let child_pid = h.pid().expect("child should have a pid");
2689            let child_started_at = RealClock.system_time_now();
2690            assert!(h.mark_running(child_pid, child_started_at));
2691            match h.status() {
2692                ProcStatus::Running { pid, started_at } => {
2693                    assert_eq!(pid, child_pid);
2694                    assert_eq!(started_at, child_started_at);
2695                }
2696                other => panic!("expected Running, got {other:?}"),
2697            }
2698        }
2699
2700        #[tokio::test]
2701        async fn running_to_stopping_to_stopped_ok() {
2702            let h = handle_for_test();
2703            let child_pid = h.pid().expect("child should have a pid");
2704            let child_started_at = RealClock.system_time_now();
2705            assert!(h.mark_running(child_pid, child_started_at));
2706            assert!(h.mark_stopping());
2707            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2708            assert!(h.mark_stopped(0, Vec::new()));
2709            assert!(matches!(
2710                h.status(),
2711                ProcStatus::Stopped { exit_code: 0, .. }
2712            ));
2713        }
2714
2715        #[tokio::test]
2716        async fn running_to_killed_ok() {
2717            let h = handle_for_test();
2718            let child_pid = h.pid().expect("child should have a pid");
2719            let child_started_at = RealClock.system_time_now();
2720            assert!(h.mark_running(child_pid, child_started_at));
2721            assert!(h.mark_killed(9, true));
2722            assert!(matches!(
2723                h.status(),
2724                ProcStatus::Killed {
2725                    signal: 9,
2726                    core_dumped: true
2727                }
2728            ));
2729        }
2730
2731        #[tokio::test]
2732        async fn running_to_failed_ok() {
2733            let h = handle_for_test();
2734            let child_pid = h.pid().expect("child should have a pid");
2735            let child_started_at = RealClock.system_time_now();
2736            assert!(h.mark_running(child_pid, child_started_at));
2737            assert!(h.mark_failed("bootstrap error"));
2738            match h.status() {
2739                ProcStatus::Failed { reason } => {
2740                    assert_eq!(reason, "bootstrap error");
2741                }
2742                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2743            }
2744        }
2745
2746        #[tokio::test]
2747        async fn illegal_transitions_are_rejected() {
2748            let h = handle_for_test();
2749            let child_pid = h.pid().expect("child should have a pid");
2750            let child_started_at = RealClock.system_time_now();
2751            // Starting -> Running is fine; second Running should be rejected.
2752            assert!(h.mark_running(child_pid, child_started_at));
2753            assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
2754            match h.status() {
2755                ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
2756                other => panic!("expected Running, got {other:?}"),
2757            }
2758            // Once Stopped, we can't go to Running/Killed/Failed/etc.
2759            assert!(h.mark_stopping());
2760            assert!(h.mark_stopped(0, Vec::new()));
2761            assert!(!h.mark_running(child_pid, child_started_at));
2762            assert!(!h.mark_killed(9, false));
2763            assert!(!h.mark_failed("nope"));
2764
2765            assert!(matches!(
2766                h.status(),
2767                ProcStatus::Stopped { exit_code: 0, .. }
2768            ));
2769        }
2770
2771        #[tokio::test]
2772        async fn transitions_from_ready_are_legal() {
2773            let h = handle_for_test();
2774            let addr = any_addr_for_test();
2775            // Mark Running.
2776            let pid = h.pid().expect("child should have a pid");
2777            let t0 = RealClock.system_time_now();
2778            assert!(h.mark_running(pid, t0));
2779            // Build a consistent AgentRef for Ready using the
2780            // handle's ProcId.
2781            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2782            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2783            let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2784            // Ready -> Stopping -> Stopped should be legal.
2785            assert!(h.mark_ready(pid, t0, addr, agent_ref));
2786            assert!(h.mark_stopping());
2787            assert!(h.mark_stopped(0, Vec::new()));
2788        }
2789
2790        #[tokio::test]
2791        async fn ready_to_killed_is_legal() {
2792            let h = handle_for_test();
2793            let addr = any_addr_for_test();
2794            // Starting -> Running
2795            let pid = h.pid().expect("child should have a pid");
2796            let t0 = RealClock.system_time_now();
2797            assert!(h.mark_running(pid, t0));
2798            // Build a consistent AgentRef for Ready using the
2799            // handle's ProcId.
2800            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2801            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2802            let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2803            // Running -> Ready
2804            assert!(h.mark_ready(pid, t0, addr, agent));
2805            // Ready -> Killed
2806            assert!(h.mark_killed(9, false));
2807        }
2808
2809        #[tokio::test]
2810        async fn mark_stopping_from_starting_uses_child_pid_when_available() {
2811            let h = handle_for_test();
2812
2813            // In `Starting`, we should still be able to see the PID via
2814            // Child::id().
2815            let child_pid = h
2816                .pid()
2817                .expect("Child::id() should be available in Starting");
2818
2819            // mark_stopping() should transition to Stopping{pid,
2820            // started_at} and stash the same pid we observed.
2821            assert!(
2822                h.mark_stopping(),
2823                "mark_stopping() should succeed from Starting"
2824            );
2825            match h.status() {
2826                ProcStatus::Stopping { pid, started_at } => {
2827                    assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
2828                    assert!(
2829                        started_at <= RealClock.system_time_now(),
2830                        "started_at should be sane"
2831                    );
2832                }
2833                other => panic!("expected Stopping{{..}}; got {other:?}"),
2834            }
2835        }
2836
2837        #[tokio::test]
2838        async fn mark_stopping_noop_when_no_child_pid_available() {
2839            let h = handle_for_test();
2840
2841            // Simulate the exit-monitor having already taken the
2842            // Child so there is no Child::id() and we haven't cached
2843            // a pid yet.
2844            {
2845                let _ = h.child.lock().expect("child mutex").take();
2846            }
2847
2848            // With no observable pid, mark_stopping() must no-op and
2849            // leave us in Starting.
2850            assert!(
2851                !h.mark_stopping(),
2852                "mark_stopping() should no-op from Starting when no pid is observable"
2853            );
2854            assert!(matches!(h.status(), ProcStatus::Starting));
2855        }
2856
2857        #[tokio::test]
2858        async fn mark_failed_from_stopping_is_allowed() {
2859            let h = handle_for_test();
2860
2861            // Drive Starting -> Stopping (pid available via
2862            // Child::id()).
2863            assert!(h.mark_stopping(), "precondition: to Stopping");
2864
2865            // Now allow Stopping -> Failed.
2866            assert!(
2867                h.mark_failed("boom"),
2868                "mark_failed() should succeed from Stopping"
2869            );
2870            match h.status() {
2871                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2872                other => panic!("expected Failed(\"boom\"), got {other:?}"),
2873            }
2874        }
2875    }
2876
2877    #[tokio::test]
2878    async fn test_exit_monitor_updates_status_on_clean_exit() {
2879        let command = BootstrapCommand {
2880            program: PathBuf::from("/bin/true"),
2881            ..Default::default()
2882        };
2883        let manager = BootstrapProcManager::new(command).unwrap();
2884
2885        // Spawn a fast-exiting child.
2886        let mut cmd = Command::new("true");
2887        cmd.stdout(Stdio::null()).stderr(Stdio::null());
2888        let child = cmd.spawn().expect("spawn true");
2889
2890        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
2891        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2892
2893        // Put into manager & start monitor.
2894        {
2895            let mut children = manager.children.lock().await;
2896            children.insert(proc_id.clone(), handle.clone());
2897        }
2898        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2899        {
2900            let guard = handle.child.lock().expect("child mutex");
2901            assert!(
2902                guard.is_none(),
2903                "expected Child to be taken by exit monitor"
2904            );
2905        }
2906
2907        let st = handle.wait_inner().await;
2908        assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
2909    }
2910
2911    #[tokio::test]
2912    async fn test_exit_monitor_updates_status_on_kill() {
2913        let command = BootstrapCommand {
2914            program: PathBuf::from("/bin/sleep"),
2915            ..Default::default()
2916        };
2917        let manager = BootstrapProcManager::new(command).unwrap();
2918
2919        // Spawn a process that will live long enough to kill.
2920        let mut cmd = Command::new("/bin/sleep");
2921        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
2922        let child = cmd.spawn().expect("spawn sleep");
2923        let pid = child.id().expect("pid") as i32;
2924
2925        // Register with the manager and start the exit monitor.
2926        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
2927        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2928        {
2929            let mut children = manager.children.lock().await;
2930            children.insert(proc_id.clone(), handle.clone());
2931        }
2932        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2933        {
2934            let guard = handle.child.lock().expect("child mutex");
2935            assert!(
2936                guard.is_none(),
2937                "expected Child to be taken by exit monitor"
2938            );
2939        }
2940        // Send SIGKILL to the process.
2941        #[cfg(unix)]
2942        // SAFETY: We own the child process we just spawned and have
2943        // its PID.
2944        // Sending SIGKILL is safe here because:
2945        //  - the PID comes directly from `child.id()`
2946        //  - the process is guaranteed to be in our test scope
2947        //  - we don't touch any memory via this call, only signal the
2948        //    OS
2949        unsafe {
2950            libc::kill(pid, libc::SIGKILL);
2951        }
2952
2953        let st = handle.wait_inner().await;
2954        match st {
2955            ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
2956            other => panic!("expected Killed(SIGKILL), got {other:?}"),
2957        }
2958    }
2959
2960    #[tokio::test]
2961    async fn watch_notifies_on_status_changes() {
2962        let child = Command::new("sh")
2963            .arg("-c")
2964            .arg("sleep 0.1")
2965            .stdin(std::process::Stdio::null())
2966            .stdout(std::process::Stdio::null())
2967            .stderr(std::process::Stdio::null())
2968            .spawn()
2969            .expect("spawn");
2970
2971        let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
2972        let handle = BootstrapProcHandle::new(proc_id, child);
2973        let mut rx = handle.watch();
2974
2975        // Starting -> Running
2976        let pid = handle.pid().unwrap_or(0);
2977        let now = RealClock.system_time_now();
2978        assert!(handle.mark_running(pid, now));
2979        rx.changed().await.ok(); // Observe the transition.
2980        match &*rx.borrow() {
2981            ProcStatus::Running { pid: p, started_at } => {
2982                assert_eq!(*p, pid);
2983                assert_eq!(*started_at, now);
2984            }
2985            s => panic!("expected Running, got {s:?}"),
2986        }
2987
2988        // Running -> Stopped
2989        assert!(handle.mark_stopped(0, Vec::new()));
2990        rx.changed().await.ok(); // Observe the transition.
2991        assert!(matches!(
2992            &*rx.borrow(),
2993            ProcStatus::Stopped { exit_code: 0, .. }
2994        ));
2995    }
2996
2997    #[tokio::test]
2998    async fn ready_errs_if_process_exits_before_running() {
2999        // Child exits immediately.
3000        let child = Command::new("sh")
3001            .arg("-c")
3002            .arg("exit 7")
3003            .stdin(std::process::Stdio::null())
3004            .stdout(std::process::Stdio::null())
3005            .stderr(std::process::Stdio::null())
3006            .spawn()
3007            .expect("spawn");
3008
3009        let proc_id = ProcId::Direct(
3010            ChannelAddr::any(ChannelTransport::Unix),
3011            "early-exit".into(),
3012        );
3013        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3014
3015        // Simulate the exit monitor doing its job directly here.
3016        // (Equivalent outcome: terminal state before Running.)
3017        assert!(handle.mark_stopped(7, Vec::new()));
3018
3019        // `ready()` should return Err with the terminal status.
3020        match handle.ready_inner().await {
3021            Ok(()) => panic!("ready() unexpectedly succeeded"),
3022            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3023                assert_eq!(exit_code, 7)
3024            }
3025            Err(other) => panic!("expected Stopped(7), got {other:?}"),
3026        }
3027    }
3028
3029    #[tokio::test]
3030    async fn status_unknown_proc_is_none() {
3031        let manager = BootstrapProcManager::new(BootstrapCommand {
3032            program: PathBuf::from("/bin/true"),
3033            ..Default::default()
3034        })
3035        .unwrap();
3036        let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
3037        assert!(manager.status(&unknown).await.is_none());
3038    }
3039
3040    #[tokio::test]
3041    async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
3042        let manager = BootstrapProcManager::new(BootstrapCommand {
3043            program: PathBuf::from("/bin/sleep"),
3044            ..Default::default()
3045        })
3046        .unwrap();
3047
3048        // Long-ish child so it's alive while we "steal" it.
3049        let mut cmd = Command::new("/bin/sleep");
3050        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3051        let child = cmd.spawn().expect("spawn sleep");
3052
3053        let proc_id = ProcId::Direct(
3054            ChannelAddr::any(ChannelTransport::Unix),
3055            "already-taken".into(),
3056        );
3057        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3058
3059        // Insert, then deliberately consume the Child before starting
3060        // the monitor.
3061        {
3062            let mut children = manager.children.lock().await;
3063            children.insert(proc_id.clone(), handle.clone());
3064        }
3065        {
3066            let _ = handle.child.lock().expect("child mutex").take();
3067        }
3068
3069        // This should not panic; monitor will see None and bail.
3070        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3071
3072        // Status should remain whatever it was (Starting in this
3073        // setup).
3074        assert!(matches!(
3075            manager.status(&proc_id).await,
3076            Some(ProcStatus::Starting)
3077        ));
3078    }
3079
3080    #[tokio::test]
3081    async fn pid_none_after_exit_monitor_takes_child() {
3082        let manager = BootstrapProcManager::new(BootstrapCommand {
3083            program: PathBuf::from("/bin/sleep"),
3084            ..Default::default()
3085        })
3086        .unwrap();
3087
3088        let mut cmd = Command::new("/bin/sleep");
3089        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3090        let child = cmd.spawn().expect("spawn sleep");
3091
3092        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
3093        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3094
3095        // Before monitor: we should still be able to read a pid.
3096        assert!(handle.pid().is_some());
3097        {
3098            let mut children = manager.children.lock().await;
3099            children.insert(proc_id.clone(), handle.clone());
3100        }
3101        // `spawn_exit_monitor` takes the Child synchronously before
3102        // spawning the task.
3103        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3104
3105        // Immediately after, the handle should no longer expose a pid
3106        // (Child is gone).
3107        assert!(handle.pid().is_none());
3108    }
3109
3110    #[tokio::test]
3111    async fn starting_may_directly_be_marked_stopped() {
3112        // Unit-level transition check for the "process exited very
3113        // fast" case.
3114        let child = Command::new("sh")
3115            .arg("-c")
3116            .arg("exit 0")
3117            .stdin(std::process::Stdio::null())
3118            .stdout(std::process::Stdio::null())
3119            .stderr(std::process::Stdio::null())
3120            .spawn()
3121            .expect("spawn true");
3122
3123        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
3124        let handle = BootstrapProcHandle::new(proc_id, child);
3125
3126        // Take the child (don't hold the mutex across an await).
3127        let mut c = {
3128            let mut guard = handle.child.lock().expect("child mutex");
3129            guard.take()
3130        }
3131        .expect("child already taken");
3132
3133        let status = c.wait().await.expect("wait");
3134        let code = status.code().unwrap_or(0);
3135        assert!(handle.mark_stopped(code, Vec::new()));
3136
3137        assert!(matches!(
3138            handle.status(),
3139            ProcStatus::Stopped { exit_code: 0, .. }
3140        ));
3141    }
3142
3143    #[tokio::test]
3144    async fn handle_ready_allows_waiters() {
3145        let child = Command::new("sh")
3146            .arg("-c")
3147            .arg("sleep 0.1")
3148            .stdin(std::process::Stdio::null())
3149            .stdout(std::process::Stdio::null())
3150            .stderr(std::process::Stdio::null())
3151            .spawn()
3152            .expect("spawn sleep");
3153        let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
3154        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3155
3156        let pid = handle.pid().expect("child should have a pid");
3157        let started_at = RealClock.system_time_now();
3158        assert!(handle.mark_running(pid, started_at));
3159
3160        let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3161        let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
3162
3163        // Pick any addr to carry in Ready (what the child would have
3164        // called back with).
3165        let ready_addr = any_addr_for_test();
3166
3167        // Stamp Ready and assert ready().await unblocks.
3168        assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
3169        handle
3170            .ready_inner()
3171            .await
3172            .expect("ready_inner() should complete after Ready");
3173
3174        // Sanity-check the Ready fields we control
3175        // (pid/started_at/addr).
3176        match handle.status() {
3177            ProcStatus::Ready {
3178                pid: p,
3179                started_at: t,
3180                addr: a,
3181                ..
3182            } => {
3183                assert_eq!(p, pid);
3184                assert_eq!(t, started_at);
3185                assert_eq!(a, ready_addr);
3186            }
3187            other => panic!("expected Ready, got {other:?}"),
3188        }
3189    }
3190
3191    #[tokio::test]
3192    async fn pid_behavior_across_states_running_ready_then_stopped() {
3193        let child = Command::new("sh")
3194            .arg("-c")
3195            .arg("sleep 0.1")
3196            .stdin(std::process::Stdio::null())
3197            .stdout(std::process::Stdio::null())
3198            .stderr(std::process::Stdio::null())
3199            .spawn()
3200            .expect("spawn");
3201
3202        let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
3203        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3204
3205        // Running → pid() Some
3206        let pid = handle.pid().expect("initial Child::id");
3207        let t0 = RealClock.system_time_now();
3208        assert!(handle.mark_running(pid, t0));
3209        assert_eq!(handle.pid(), Some(pid));
3210
3211        // Ready → pid() still Some even if Child taken
3212        let addr = any_addr_for_test();
3213        let agent = {
3214            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3215            ActorRef::<ProcMeshAgent>::attest(actor_id)
3216        };
3217        assert!(handle.mark_ready(pid, t0, addr, agent));
3218        {
3219            let _ = handle.child.lock().expect("child mutex").take();
3220        }
3221        assert_eq!(handle.pid(), Some(pid));
3222
3223        // Terminal (Stopped) → pid() None
3224        assert!(handle.mark_stopped(0, Vec::new()));
3225        assert_eq!(handle.pid(), None, "pid() should be None once terminal");
3226    }
3227
3228    #[tokio::test]
3229    async fn pid_is_available_in_ready_even_after_child_taken() {
3230        let child = Command::new("sh")
3231            .arg("-c")
3232            .arg("sleep 0.1")
3233            .stdin(std::process::Stdio::null())
3234            .stdout(std::process::Stdio::null())
3235            .stderr(std::process::Stdio::null())
3236            .spawn()
3237            .expect("spawn");
3238
3239        let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
3240        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3241
3242        // Mark Running.
3243        let pid = handle.pid().expect("child should have pid (via Child::id)");
3244        let started_at = RealClock.system_time_now();
3245        assert!(handle.mark_running(pid, started_at));
3246
3247        // Stamp Ready with synthetic addr/agent (attested).
3248        let addr = any_addr_for_test();
3249        let agent = {
3250            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3251            ActorRef::<ProcMeshAgent>::attest(actor_id)
3252        };
3253        assert!(handle.mark_ready(pid, started_at, addr, agent));
3254
3255        // Simulate exit monitor taking the Child (so Child::id() is
3256        // no longer available).
3257        {
3258            let _ = handle.child.lock().expect("child mutex").take();
3259        }
3260
3261        // **Regression check**: pid() must still return the cached
3262        // pid in Ready.
3263        assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
3264    }
3265
3266    #[test]
3267    fn display_running_includes_pid_and_uptime() {
3268        let started_at = RealClock.system_time_now() - Duration::from_secs(42);
3269        let st = ProcStatus::Running {
3270            pid: 1234,
3271            started_at,
3272        };
3273
3274        let s = format!("{}", st);
3275        assert!(s.contains("1234"));
3276        assert!(s.contains("Running"));
3277        assert!(s.contains("42s"));
3278    }
3279
3280    #[test]
3281    fn display_ready_includes_pid_and_addr() {
3282        let started_at = RealClock.system_time_now() - Duration::from_secs(5);
3283        let addr = ChannelAddr::any(ChannelTransport::Unix);
3284        let agent =
3285            ActorRef::attest(ProcId::Direct(addr.clone(), "proc".into()).actor_id("agent", 0));
3286
3287        let st = ProcStatus::Ready {
3288            pid: 4321,
3289            started_at,
3290            addr: addr.clone(),
3291            agent,
3292        };
3293
3294        let s = format!("{}", st);
3295        assert!(s.contains("4321")); // pid
3296        assert!(s.contains(&addr.to_string())); // addr
3297        assert!(s.contains("Ready"));
3298    }
3299
3300    #[test]
3301    fn display_stopped_includes_exit_code() {
3302        let st = ProcStatus::Stopped {
3303            exit_code: 7,
3304            stderr_tail: Vec::new(),
3305        };
3306        let s = format!("{}", st);
3307        assert!(s.contains("Stopped"));
3308        assert!(s.contains("7"));
3309    }
3310
3311    #[test]
3312    fn display_other_variants_does_not_panic() {
3313        let samples = vec![
3314            ProcStatus::Starting,
3315            ProcStatus::Stopping {
3316                pid: 42,
3317                started_at: RealClock.system_time_now(),
3318            },
3319            ProcStatus::Ready {
3320                pid: 42,
3321                started_at: RealClock.system_time_now(),
3322                addr: ChannelAddr::any(ChannelTransport::Unix),
3323                agent: ActorRef::attest(
3324                    ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into())
3325                        .actor_id("agent", 0),
3326                ),
3327            },
3328            ProcStatus::Killed {
3329                signal: 9,
3330                core_dumped: false,
3331            },
3332            ProcStatus::Failed {
3333                reason: "boom".into(),
3334            },
3335        ];
3336
3337        for st in samples {
3338            let _ = format!("{}", st); // Just make sure it doesn't panic.
3339        }
3340    }
3341
3342    #[tokio::test]
3343    async fn proc_handle_ready_ok_through_trait() {
3344        let child = Command::new("sh")
3345            .arg("-c")
3346            .arg("sleep 0.1")
3347            .stdin(Stdio::null())
3348            .stdout(Stdio::null())
3349            .stderr(Stdio::null())
3350            .spawn()
3351            .expect("spawn");
3352
3353        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3354        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3355
3356        // Starting -> Running
3357        let pid = handle.pid().expect("pid");
3358        let t0 = RealClock.system_time_now();
3359        assert!(handle.mark_running(pid, t0));
3360
3361        // Synthesize Ready data
3362        let addr = any_addr_for_test();
3363        let agent: ActorRef<ProcMeshAgent> =
3364            ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3365        assert!(handle.mark_ready(pid, t0, addr, agent));
3366
3367        // Call the trait method (not ready_inner).
3368        let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3369        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3370    }
3371
3372    #[tokio::test]
3373    async fn proc_handle_wait_returns_terminal_status() {
3374        let child = Command::new("sh")
3375            .arg("-c")
3376            .arg("exit 0")
3377            .stdin(Stdio::null())
3378            .stdout(Stdio::null())
3379            .stderr(Stdio::null())
3380            .spawn()
3381            .expect("spawn");
3382
3383        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3384        let handle = BootstrapProcHandle::new(proc_id, child);
3385
3386        // Drive directly to a terminal state before calling wait()
3387        assert!(handle.mark_stopped(0, Vec::new()));
3388
3389        // Call the trait method (not wait_inner)
3390        let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3391            .await
3392            .expect("wait should return Ok(terminal)");
3393
3394        match st {
3395            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3396            other => panic!("expected Stopped(0), got {other:?}"),
3397        }
3398    }
3399
3400    #[tokio::test]
3401    async fn ready_wrapper_maps_terminal_to_trait_error() {
3402        let child = Command::new("sh")
3403            .arg("-c")
3404            .arg("exit 7")
3405            .stdin(Stdio::null())
3406            .stdout(Stdio::null())
3407            .stderr(Stdio::null())
3408            .spawn()
3409            .expect("spawn");
3410        let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3411        let handle = BootstrapProcHandle::new(proc_id, child);
3412
3413        assert!(handle.mark_stopped(7, Vec::new()));
3414
3415        match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3416            Ok(()) => panic!("expected Err"),
3417            Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3418                exit_code, ..
3419            })) => {
3420                assert_eq!(exit_code, 7);
3421            }
3422            Err(e) => panic!("unexpected error: {e:?}"),
3423        }
3424    }
3425
3426    /// Create a ProcId and a host **backend_addr** channel that the
3427    /// bootstrap child proc will dial to attach its mailbox to the
3428    /// host.
3429    ///
3430    /// - `proc_id`: logical identity for the child proc (pure name;
3431    ///   not an OS pid).
3432    /// - `backend_addr`: a mailbox address served by the **parent
3433    ///   (host) proc** here; the spawned bootstrap process dials this
3434    ///   so its messages route via the host.
3435    async fn make_proc_id_and_backend_addr(
3436        instance: &hyperactor::Instance<()>,
3437        _tag: &str,
3438    ) -> (ProcId, ChannelAddr) {
3439        // Serve a Unix channel as the "backend_addr" and hook it into
3440        // this test proc.
3441        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3442
3443        // Route messages arriving on backend_addr into this test
3444        // proc's mailbox so the bootstrap child can reach the host
3445        // router.
3446        instance.proc().clone().serve(rx);
3447
3448        // We return an arbitrary (but unbound!) unix direct proc id here;
3449        // it is okay, as we're not testing connectivity.
3450        let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
3451        (proc_id, backend_addr)
3452    }
3453
3454    #[tokio::test]
3455    #[cfg(fbcode_build)]
3456    async fn bootstrap_handle_terminate_graceful() {
3457        // Create a root direct-addressed proc + client instance.
3458        let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3459            .await
3460            .unwrap();
3461        let (instance, _handle) = root.instance("client").unwrap();
3462
3463        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3464        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3465        let handle = mgr
3466            .spawn(
3467                proc_id.clone(),
3468                backend_addr.clone(),
3469                BootstrapProcConfig {
3470                    create_rank: 0,
3471                    client_config_override: Attrs::new(),
3472                },
3473            )
3474            .await
3475            .expect("spawn bootstrap child");
3476
3477        handle.ready().await.expect("ready");
3478
3479        let deadline = Duration::from_secs(2);
3480        match RealClock
3481            .timeout(deadline * 2, handle.terminate(&instance, deadline))
3482            .await
3483        {
3484            Err(_) => panic!("terminate() future hung"),
3485            Ok(Ok(st)) => {
3486                match st {
3487                    ProcStatus::Stopped { exit_code, .. } => {
3488                        // child called exit(0) on SIGTERM
3489                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3490                    }
3491                    ProcStatus::Killed { signal, .. } => {
3492                        // If the child didn't trap SIGTERM, we'd see
3493                        // SIGTERM (15) here and indeed, this is what
3494                        // we see. Since we call
3495                        // `hyperactor::initialize_with_current_runtime();`
3496                        // we seem unable to trap `SIGTERM` and
3497                        // instead folly intercepts:
3498                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
3499                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
3500                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
3501                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
3502                        // It gets worse. When run with
3503                        // '@fbcode//mode/dev-nosan' it terminates
3504                        // with a SEGFAULT (metamate says this is a
3505                        // well known issue at Meta). So, TL;DR I
3506                        // restore default `SIGTERM` handling after
3507                        // the test exe has called
3508                        // `initialize_with_runtime`.
3509                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3510                    }
3511                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3512                }
3513            }
3514            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3515        }
3516    }
3517
3518    #[tokio::test]
3519    #[cfg(fbcode_build)]
3520    async fn bootstrap_handle_kill_forced() {
3521        // Root proc + client instance (so the child can dial back).
3522        let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3523            .await
3524            .unwrap();
3525        let (instance, _handle) = root.instance("client").unwrap();
3526
3527        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3528
3529        // Proc identity + host backend channel the child will dial.
3530        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3531
3532        // Launch the child bootstrap process.
3533        let handle = mgr
3534            .spawn(
3535                proc_id.clone(),
3536                backend_addr.clone(),
3537                BootstrapProcConfig {
3538                    create_rank: 0,
3539                    client_config_override: Attrs::new(),
3540                },
3541            )
3542            .await
3543            .expect("spawn bootstrap child");
3544
3545        // Wait until the child reports Ready (addr+agent returned via
3546        // callback).
3547        handle.ready().await.expect("ready");
3548
3549        // Force-kill the child and assert we observe a Killed
3550        // terminal status.
3551        let deadline = Duration::from_secs(5);
3552        match RealClock.timeout(deadline, handle.kill()).await {
3553            Err(_) => panic!("kill() future hung"),
3554            Ok(Ok(st)) => {
3555                // We expect a KILLED terminal state.
3556                match st {
3557                    ProcStatus::Killed { signal, .. } => {
3558                        // On Linux this should be SIGKILL (9).
3559                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3560                    }
3561                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3562                }
3563            }
3564            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3565        }
3566    }
3567
    /// End-to-end bootstrap smoke test. It:
    /// 1. allocates a one-replica `HostMesh` through a `ProcessAllocator`
    ///    running the bootstrap binary;
    /// 2. spawns a `ProcMesh` ("p0") and an `ActorMesh<TestActor>` ("a0")
    ///    on it;
    /// 3. checks that a `GetActorId` round trip returns the actor's id;
    /// 4. shuts the hosts down.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn bootstrap_canonical_simple() {
        // Disable the bootstrap parent-death signal (per the variable
        // name). NOTE(review): the rationale is not visible here.
        //
        // SAFETY: `set_var` is `unsafe` because mutating the process
        // environment can race with other threads reading or writing it.
        // NOTE(review): this was previously justified as "unit-test
        // scoped", but the variable is set process-wide and never
        // restored. It stays in effect for every other test in this
        // binary that runs afterwards or concurrently. Confirm that no
        // other test depends on the default value.
        unsafe {
            std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
        }
        // Create a "root" direct addressed proc.
        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
            .await
            .unwrap();
        // Create an actor instance we'll use to send and receive
        // messages.
        let (instance, _handle) = proc.instance("client").unwrap();

        // Configure a ProcessAllocator with the bootstrap binary.
        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
            "monarch/hyperactor_mesh/bootstrap",
        )));
        // Request a new allocation of procs from the ProcessAllocator.
        let alloc = allocator
            .allocate(AllocSpec {
                extent: extent!(replicas = 1),
                constraints: Default::default(),
                proc_name: None,
                transport: ChannelTransport::Unix,
                proc_allocation_mode: Default::default(),
            })
            .await
            .unwrap();

        // Build a HostMesh with explicit OS-process boundaries (per
        // rank):
        //
        // (1) Allocator → bootstrap proc [OS process #1]
        //     `ProcMesh::allocate(..)` starts one OS process per
        //     rank; each runs our runtime and the trampoline actor.
        //
        // (2) Host::serve(..) sets up a Host in the same OS process
        //     (no new process). It binds front/back channels, creates
        //     an in-process service proc (`Proc::new(..)`), and
        //     stores the `BootstrapProcManager` for later spawns.
        //
        // (3) Install HostMeshAgent (still no new OS process).
        //     `host.system_proc().spawn::<HostMeshAgent>("agent",
        //     host).await?` creates the HostMeshAgent actor in that
        //     service proc.
        //
        // (4) Collect & assemble. The trampoline returns a
        //     direct-addressed `ActorRef<HostMeshAgent>`; we collect
        //     one per rank and assemble a `HostMesh`.
        //
        // Note: When the Host is later asked to start a proc
        // (`host.spawn(name)`), it calls `ProcManager::spawn` on the
        // stored `BootstrapProcManager`, which does a
        // `Command::spawn()` to launch a new OS child process for
        // that proc.
        let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
            .await
            .unwrap();

        // Spawn a ProcMesh named "p0" on the host mesh:
        //
        // (1) Each HostMeshAgent (running inside its host's service
        // proc) receives the request.
        //
        // (2) The Host calls into its `BootstrapProcManager::spawn`,
        // which does `Command::spawn()` to launch a brand-new OS
        // process for the proc.
        //
        // (3) Inside that new process, bootstrap runs and a
        // `ProcMeshAgent` is started to manage it.
        //
        // (4) We collect the per-host procs into a `ProcMesh` and
        // return it.
        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .unwrap();

        // Note: `status()` is not supported in v1. The checks below are
        // disabled until it is.
        // assert!(proc_mesh.status(&instance).await.is_err());
        // let proc_ref = proc_mesh.values().next().expect("one proc");
        // assert!(proc_ref.status(&instance).await.unwrap(), "proc should be alive");

        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc mesh:
        //
        // (1) For each proc (already running in its own OS process),
        // the `ProcMeshAgent` receives the request.
        //
        // (2) It spawns a `TestActor` inside that existing proc (no
        // new OS process).
        //
        // (3) The per-proc actors are collected into an
        // `ActorMesh<TestActor>` and returned.
        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

        // Open a fresh port on the client instance and send a
        // GetActorId message to the actor mesh. Each TestActor will
        // reply with its actor ID to the bound port. Receive one
        // reply and assert it matches the ID of the (single) actor in
        // the mesh.
        let (port, mut rx) = instance.mailbox().open_port();
        actor_mesh
            .cast(&instance, testactor::GetActorId(port.bind()))
            .unwrap();
        let got_id = rx.recv().await.unwrap();
        assert_eq!(
            got_id,
            actor_mesh.values().next().unwrap().actor_id().clone()
        );

        // **Important**: If we don't shut down the hosts, the
        // BootstrapProcManagers won't send SIGKILLs to their spawned
        // children, and orphans will be left behind.
        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
3686
3687    #[tokio::test]
3688    async fn exit_tail_is_attached_and_logged() {
3689        hyperactor_telemetry::initialize_logging_for_test();
3690
3691        let lock = hyperactor::config::global::lock();
3692        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
3693
3694        // Spawn a child that writes to stderr then exits 7.
3695        let mut cmd = Command::new("sh");
3696        cmd.arg("-c")
3697            .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
3698            .stdout(Stdio::piped())
3699            .stderr(Stdio::piped());
3700
3701        let child = cmd.spawn().expect("spawn");
3702        let pid = child.id().unwrap_or_default();
3703
3704        // Build a BootstrapProcHandle around this child (like
3705        // manager.spawn() does).
3706        let proc_id = hyperactor::id!(testproc[0]);
3707        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3708
3709        // Wire tailers + dummy writers (stdout/stderr -> sinks), then
3710        // stash them on the handle.
3711        {
3712            // Lock the child to get its pipes.
3713            let mut guard = handle.child.lock().expect("child mutex poisoned");
3714            if let Some(child) = guard.as_mut() {
3715                let out = child.stdout.take().expect("child stdout must be piped");
3716                let err = child.stderr.take().expect("child stderr must be piped");
3717
3718                // Set up logging like ProcManager does
3719                let log_channel = ChannelAddr::any(ChannelTransport::Unix);
3720                let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
3721
3722                // Set up StreamFwders if pipes are available
3723                let stdout_monitor = StreamFwder::start(
3724                    out,
3725                    None,
3726                    OutputTarget::Stdout,
3727                    tail_size,
3728                    Some(log_channel.clone()),
3729                    pid,
3730                    0,
3731                );
3732
3733                let stderr_monitor = StreamFwder::start(
3734                    err,
3735                    None,
3736                    OutputTarget::Stderr,
3737                    tail_size,
3738                    Some(log_channel.clone()),
3739                    pid,
3740                    0,
3741                );
3742
3743                // Set the stream monitors on the handle
3744                handle.set_stream_monitors(Some(stdout_monitor), Some(stderr_monitor));
3745                tracing::info!("set stream monitors");
3746            } else {
3747                panic!("child already taken before wiring tailers");
3748            }
3749        }
3750
3751        // Start an exit monitor (consumes the Child and tailers from
3752        // the handle).
3753        let manager = BootstrapProcManager::new(BootstrapCommand {
3754            program: std::path::PathBuf::from("/bin/true"), // unused in this test
3755            ..Default::default()
3756        })
3757        .unwrap();
3758
3759        // Give the monitors time to start
3760        RealClock.sleep(Duration::from_millis(1000)).await;
3761
3762        // Now start the exit monitor which will take the child and wait for it to exit
3763        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3764
3765        // Await terminal status and assert on exit code and stderr
3766        // tail.
3767        let st = RealClock
3768            .timeout(Duration::from_secs(2), handle.wait_inner())
3769            .await
3770            .expect("wait_inner() timed out (exit monitor stuck?)");
3771
3772        match st {
3773            ProcStatus::Stopped {
3774                exit_code,
3775                stderr_tail,
3776            } => {
3777                assert_eq!(
3778                    exit_code, 7,
3779                    "unexpected exit code; stderr_tail={:?}",
3780                    stderr_tail
3781                );
3782                let tail = stderr_tail.join("\n");
3783                assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
3784                assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
3785            }
3786            other => panic!("expected Stopped(7), got {other:?}"),
3787        }
3788    }
3789}