hyperactor_mesh/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::PathBuf;
19use std::process::Stdio;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::sync::OnceLock;
23use std::time::Duration;
24use std::time::SystemTime;
25
26use async_trait::async_trait;
27use base64::prelude::*;
28use futures::StreamExt;
29use futures::stream;
30use humantime::format_duration;
31use hyperactor::ActorRef;
32use hyperactor::Named;
33use hyperactor::ProcId;
34use hyperactor::attrs::Attrs;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::channel::ChannelTransport;
38use hyperactor::channel::Rx;
39use hyperactor::channel::Tx;
40use hyperactor::clock::Clock;
41use hyperactor::clock::RealClock;
42use hyperactor::config::CONFIG_ENV_VAR;
43use hyperactor::config::global as config;
44use hyperactor::declare_attrs;
45use hyperactor::host;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::MailboxServer;
52use hyperactor::proc::Proc;
53use serde::Deserialize;
54use serde::Serialize;
55use tokio::process::Child;
56use tokio::process::Command;
57use tokio::sync::oneshot;
58use tokio::sync::watch;
59
60use crate::alloc::logtailer::LogTailer;
61use crate::logging::create_log_writers;
62use crate::proc_mesh::mesh_agent::ProcMeshAgent;
63use crate::v1;
64use crate::v1::host_mesh::mesh_agent::HostAgentMode;
65use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
66
67declare_attrs! {
68    /// If enabled (default), bootstrap child processes install
69    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
70    /// parent dies unexpectedly. This is a **production safety net**
71    /// against leaked children; tests usually disable it via
72    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
73    /// "false")`.
74    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string())
75    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;
76
77    /// Maximum number of log lines retained in a proc's stderr/stdout
78    /// tail buffer. Used by [`LogTailer::tee`] when wiring child
79    /// pipes. Default: 100
80    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TAIL_LOG_LINES".to_string())
81    pub attr MESH_TAIL_LOG_LINES: usize = 100;
82
83    /// Maximum number of child terminations to run concurrently
84    /// during bulk shutdown. Prevents unbounded spawning of
85    /// termination tasks (which could otherwise spike CPU, I/O, or
86    /// file descriptor load).
87    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string())
88    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;
89
90    /// Per-child grace window for termination. When a shutdown is
91    /// requested, the manager sends SIGTERM and waits this long for
92    /// the child to exit before escalating to SIGKILL.
93    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string())
94    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
95}
96
/// Name of the `HYPERACTOR_MESH_BOOTSTRAP_ADDR` environment variable.
// NOTE(review): presumably carries the allocator's bootstrap channel
// address for a child process; confirm against the reader.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the `HYPERACTOR_MESH_INDEX` environment variable.
// NOTE(review): presumably carries the index the allocator assigned
// to the process; confirm against the reader.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the `MONARCH_CLIENT_TRACE_ID` environment variable.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
/// Name of the channel on which a process receives its own stdout
/// and stderr. Only the parent process can obtain a child's stdout
/// and stderr, so they have to be streamed back to the child.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
104
105/// Messages sent from the process to the allocator. This is an envelope
106/// containing the index of the process (i.e., its "address" assigned by
107/// the allocator), along with the control message in question.
108#[derive(Debug, Clone, Serialize, Deserialize, Named)]
109pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
110
111/// Control messages sent from processes to the allocator.
112#[derive(Debug, Clone, Serialize, Deserialize, Named)]
113pub(crate) enum Process2AllocatorMessage {
114    /// Initialize a process2allocator session. The process is
115    /// listening on the provided channel address, to which
116    /// [`Allocator2Process`] messages are sent.
117    Hello(ChannelAddr),
118
119    /// A proc with the provided ID was started. Its mailbox is
120    /// served at the provided channel address. Procs are started
121    /// after instruction by the allocator through the corresponding
122    /// [`Allocator2Process`] message.
123    StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),
124
125    Heartbeat,
126}
127
128/// Messages sent from the allocator to a process.
129#[derive(Debug, Clone, Serialize, Deserialize, Named)]
130pub(crate) enum Allocator2Process {
131    /// Request to start a new proc with the provided ID, listening
132    /// to an address on the indicated channel transport.
133    StartProc(ProcId, ChannelTransport),
134
135    /// A request for the process to shut down its procs and exit the
136    /// process with the provided code.
137    StopAndExit(i32),
138
139    /// A request for the process to immediately exit with the provided
140    /// exit code
141    Exit(i32),
142}
143
144async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
145    let tx = match channel::dial(bootstrap_addr.clone()) {
146        Ok(tx) => tx,
147
148        Err(err) => {
149            tracing::error!(
150                "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
151                bootstrap_addr,
152                err
153            );
154            std::process::exit(1);
155        }
156    };
157    tracing::info!(
158        "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
159    );
160    loop {
161        RealClock.sleep(Duration::from_secs(5)).await;
162
163        let result = tx
164            .send(Process2Allocator(
165                bootstrap_index,
166                Process2AllocatorMessage::Heartbeat,
167            ))
168            .await;
169
170        if let Err(err) = result {
171            tracing::error!(
172                "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
173                bootstrap_addr,
174                err
175            );
176            std::process::exit(1);
177        }
178    }
179}
180
/// Unwrap a `Result` inside a function whose return type is
/// `anyhow::Error` itself (not `Result`).
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it returns
/// `anyhow::Error::from(e)` from the *enclosing function*, so it can
/// only be used where the function's return type is `anyhow::Error`.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Err(err) => {
                return ::anyhow::Error::from(err);
            }
            Ok(unwrapped) => unwrapped,
        }
    };
}
190
/// Suspend the calling task forever.
///
/// The returned future never resolves, so the nominal output type
/// `R` is never actually produced; it only lets `halt().await` stand
/// in for a value of whatever type the call site expects.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
195
196/// Bootstrap configures how a mesh process starts up.
197///
198/// Both `Proc` and `Host` variants may include an optional
199/// configuration snapshot (`hyperactor::config::Attrs`). This
200/// snapshot is serialized into the bootstrap payload and made
201/// available to the child. Interpretation and application of that
202/// snapshot is up to the child process; if omitted, the child falls
203/// back to environment/default values.
204#[derive(Clone, Default, Debug, Serialize, Deserialize)]
205pub enum Bootstrap {
206    /// "v1" proc bootstrap
207    Proc {
208        /// The ProcId of the proc to be bootstrapped.
209        proc_id: ProcId,
210        /// The backend address to which messages are forwarded.
211        /// See [`hyperactor::host`] for channel topology details.
212        backend_addr: ChannelAddr,
213        /// The callback address used to indicate successful spawning.
214        callback_addr: ChannelAddr,
215        /// Optional config snapshot (`hyperactor::config::Attrs`)
216        /// captured by the parent. If present, the child installs it
217        /// as the `Runtime` layer so the parent's effective config
218        /// takes precedence over Env/File/Defaults.
219        config: Option<Attrs>,
220    },
221
222    /// Host bootstrap. This sets up a new `Host`, managed by a
223    /// [`crate::v1::host_mesh::mesh_agent::HostMeshAgent`].
224    Host {
225        /// The address on which to serve the host.
226        addr: ChannelAddr,
227        /// If specified, use the provided command instead of
228        /// [`BootstrapCommand::current`].
229        command: Option<BootstrapCommand>,
230        /// Optional config snapshot (`hyperactor::config::Attrs`)
231        /// captured by the parent. If present, the child installs it
232        /// as the `Runtime` layer so the parent’s effective config
233        /// takes precedence over Env/File/Defaults.
234        config: Option<Attrs>,
235    },
236
237    #[default]
238    V0ProcMesh, // pass through to the v0 allocator
239}
240
241impl Bootstrap {
242    /// Serialize the mode into a environment-variable-safe string by
243    /// base64-encoding its JSON representation.
244    fn to_env_safe_string(&self) -> v1::Result<String> {
245        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
246    }
247
248    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
249    fn from_env_safe_string(str: &str) -> v1::Result<Self> {
250        let data = BASE64_STANDARD.decode(str)?;
251        let data = std::str::from_utf8(&data)?;
252        Ok(serde_json::from_str(data)?)
253    }
254
255    /// Get a bootstrap configuration from the environment; returns `None`
256    /// if the environment does not specify a boostrap config.
257    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
258        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
259            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
260                Ok(mode) => Ok(Some(mode)),
261                Err(e) => {
262                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
263                }
264            },
265            Err(VarError::NotPresent) => Ok(None),
266            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
267        }
268    }
269
270    /// Inject this bootstrap configuration into the environment of the provided command.
271    pub fn to_env(&self, cmd: &mut Command) {
272        cmd.env(
273            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
274            self.to_env_safe_string().unwrap(),
275        );
276    }
277
278    /// Bootstrap this binary according to this configuration.
279    /// This either runs forever, or returns an error.
280    pub async fn bootstrap(self) -> anyhow::Error {
281        tracing::info!(
282            "bootstrapping mesh process: {}",
283            serde_json::to_string(&self).unwrap()
284        );
285
286        if Debug::is_active() {
287            let mut buf = Vec::new();
288            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
289            #[cfg(unix)]
290            writeln!(
291                &mut buf,
292                "\tparent pid: {}",
293                std::os::unix::process::parent_id()
294            )
295            .unwrap();
296            writeln!(
297                &mut buf,
298                "\tconfig: {}",
299                serde_json::to_string(&self).unwrap()
300            )
301            .unwrap();
302            match std::env::current_exe() {
303                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
304                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
305            }
306            writeln!(&mut buf, "\targs:").unwrap();
307            for arg in std::env::args() {
308                writeln!(&mut buf, "\t\t{}", arg).unwrap();
309            }
310            writeln!(&mut buf, "\tenv:").unwrap();
311            for (key, val) in std::env::vars() {
312                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
313            }
314            let _ = Debug.write(&buf);
315            if let Ok(s) = std::str::from_utf8(&buf) {
316                tracing::info!("{}", s);
317            } else {
318                tracing::info!("{:?}", buf);
319            }
320        }
321
322        match self {
323            Bootstrap::Proc {
324                proc_id,
325                backend_addr,
326                callback_addr,
327                config,
328            } => {
329                if let Some(attrs) = config {
330                    config::set(config::Source::Runtime, attrs);
331                    tracing::debug!("bootstrap: installed Runtime config snapshot (Proc)");
332                } else {
333                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
334                }
335
336                if hyperactor::config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
337                    // Safety net: normal shutdown is via
338                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
339                    // is a last-resort guard against leaks if that
340                    // protocol is bypassed.
341                    let _ = install_pdeathsig_kill();
342                } else {
343                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
344                }
345
346                let result =
347                    host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
348                        ProcMeshAgent::boot_v1(proc).await
349                    })
350                    .await;
351                match result {
352                    Ok(_proc) => halt().await,
353                    Err(e) => e.into(),
354                }
355            }
356            Bootstrap::Host {
357                addr,
358                command,
359                config,
360            } => {
361                if let Some(attrs) = config {
362                    config::set(config::Source::Runtime, attrs);
363                    tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
364                } else {
365                    tracing::debug!("bootstrap: no config snapshot provided (Host)");
366                }
367
368                let command = match command {
369                    Some(command) => command,
370                    None => ok!(BootstrapCommand::current()),
371                };
372                let manager = BootstrapProcManager::new(command);
373                let (host, _handle) = ok!(Host::serve(manager, addr).await);
374                let addr = host.addr().clone();
375                let host_mesh_agent = ok!(host
376                    .system_proc()
377                    .clone()
378                    .spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
379                    .await);
380
381                tracing::info!(
382                    "serving host at {}, agent: {}",
383                    addr,
384                    host_mesh_agent.bind::<HostMeshAgent>()
385                );
386                halt().await
387            }
388            Bootstrap::V0ProcMesh => bootstrap_v0_proc_mesh().await,
389        }
390    }
391
392    /// A variant of [`bootstrap`] that logs the error and exits the process
393    /// if bootstrapping fails.
394    pub async fn bootstrap_or_die(self) -> ! {
395        let err = self.bootstrap().await;
396        tracing::error!("failed to bootstrap mesh process: {}", err);
397        std::process::exit(1)
398    }
399}
400
401/// Install "kill me if parent dies" and close the race window.
402pub fn install_pdeathsig_kill() -> io::Result<()> {
403    #[cfg(target_os = "linux")]
404    {
405        // SAFETY: Calling into libc; does not dereference memory, just
406        // asks the kernel to deliver SIGKILL on parent death.
407        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
408        if rc != 0 {
409            return Err(io::Error::last_os_error());
410        }
411    }
412    // Race-close: if the parent died between our exec and prctl(),
413    // we won't get a signal, so detect that and exit now.
414    //
415    // If getppid() == 1, we've already been reparented (parent gone).
416    // SAFETY: `getppid()` is a simple libc syscall returning the
417    // parent PID; it has no side effects and does not touch memory.
418    if unsafe { libc::getppid() } == 1 {
419        std::process::exit(0);
420    }
421    Ok(())
422}
423
424/// Represents the lifecycle state of a **proc as hosted in an OS
425/// process** managed by `BootstrapProcManager`.
426///
427/// Note: This type is deliberately distinct from [`ProcState`] and
428/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
429/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
430/// consumed from an event stream during allocation. By contrast,
431/// [`ProcStatus`] is a **live, queryable view**: it reflects the
432/// current observed status of a running proc, as seen through the
433/// [`BootstrapProcHandle`] API (stop, kill, status).
434///
435/// In short:
436/// - `ProcState`/`ProcStopReason`: historical / event-driven model
437/// - `ProcStatus`: immediate status surface for lifecycle control
438#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
439pub enum ProcStatus {
440    /// The OS process has been spawned but is not yet fully running.
441    /// (Process-level: child handle exists, no confirmation yet.)
442    Starting,
443    /// The OS process is alive and considered running.
444    /// (Process-level: `pid` is known; Proc-level: bootstrap
445    /// may still be running.)
446    Running { pid: u32, started_at: SystemTime },
447    /// Ready means bootstrap has completed and the proc is serving.
448    /// (Process-level: `pid` is known; Proc-level: bootstrap
449    /// completed.)
450    Ready {
451        pid: u32,
452        started_at: SystemTime,
453        addr: ChannelAddr,
454        agent: ActorRef<ProcMeshAgent>,
455    },
456    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
457    /// but the OS process has not yet fully exited. (Proc-level:
458    /// shutdown in progress; Process-level: still running.)
459    Stopping { pid: u32, started_at: SystemTime },
460    /// The process exited with a normal exit code. (Process-level:
461    /// exit observed.)
462    Stopped {
463        exit_code: i32,
464        stderr_tail: Vec<String>,
465    },
466    /// The process was killed by a signal (e.g. SIGKILL).
467    /// (Process-level: abnormal termination.)
468    Killed { signal: i32, core_dumped: bool },
469    /// The proc or its process failed for some other reason
470    /// (bootstrap error, unexpected condition, etc.). (Both levels:
471    /// catch-all failure.)
472    Failed { reason: String },
473}
474
475impl ProcStatus {
476    /// Returns `true` if the proc is in a terminal (exited) state:
477    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
478    /// [`ProcStatus::Failed`].
479    #[inline]
480    pub fn is_exit(&self) -> bool {
481        matches!(
482            self,
483            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
484        )
485    }
486}
487
488impl std::fmt::Display for ProcStatus {
489    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490        match self {
491            ProcStatus::Starting => write!(f, "Starting"),
492            ProcStatus::Running { pid, started_at } => {
493                let uptime = started_at
494                    .elapsed()
495                    .map(|d| format!(" up {}", format_duration(d)))
496                    .unwrap_or_default();
497                write!(f, "Running[{pid}]{uptime}")
498            }
499            ProcStatus::Ready {
500                pid,
501                started_at,
502                addr,
503                ..
504            } => {
505                let uptime = started_at
506                    .elapsed()
507                    .map(|d| format!(" up {}", format_duration(d)))
508                    .unwrap_or_default();
509                write!(f, "Ready[{pid}] at {addr}{uptime}")
510            }
511            ProcStatus::Stopping { pid, started_at } => {
512                let uptime = started_at
513                    .elapsed()
514                    .map(|d| format!(" up {}", format_duration(d)))
515                    .unwrap_or_default();
516                write!(f, "Stopping[{pid}]{uptime}")
517            }
518            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
519            ProcStatus::Killed {
520                signal,
521                core_dumped,
522            } => {
523                if *core_dumped {
524                    write!(f, "Killed(sig={signal}, core)")
525                } else {
526                    write!(f, "Killed(sig={signal})")
527                }
528            }
529            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
530        }
531    }
532}
533
534/// Error returned by [`BootstrapProcHandle::ready`].
535#[derive(Debug, Clone)]
536pub enum ReadyError {
537    /// The proc reached a terminal state before `Ready`.
538    Terminal(ProcStatus),
539    /// The internal watch channel closed unexpectedly.
540    ChannelClosed,
541}
542
543impl std::fmt::Display for ReadyError {
544    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545        match self {
546            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
547            ReadyError::ChannelClosed => write!(f, "status channel closed"),
548        }
549    }
550}
551impl std::error::Error for ReadyError {}
552
553/// A handle to a proc launched by [`BootstrapProcManager`].
554///
555/// `BootstrapProcHandle` is a lightweight supervisor for an external
556/// process: it tracks and broadcasts lifecycle state, and exposes a
557/// small control/observation surface. While it may temporarily hold a
558/// `tokio::process::Child` (shared behind a mutex) so the exit
559/// monitor can `wait()` it, it is **not** the unique owner of the OS
560/// process, and dropping a `BootstrapProcHandle` does not by itself
561/// terminate the process.
562///
563/// What it pairs together:
564/// - the **logical proc identity** (`ProcId`)
565/// - the **live status surface** ([`ProcStatus`]), available both as
566///   a synchronous snapshot (`status()`) and as an async stream via a
567///   `tokio::sync::watch` channel (`watch()` / `changed()`)
568///
569/// Responsibilities:
570/// - Retain the child handle only until the exit monitor claims it,
571///   so the OS process can be awaited and its terminal status
572///   recorded.
573/// - Hold stdout/stderr tailers until the exit monitor takes them,
574///   then join to recover buffered output for diagnostics.
575/// - Update status via the `mark_*` transitions and broadcast changes
576///   over the watch channel so tasks can `await` lifecycle
577///   transitions without polling.
578/// - Provide the foundation for higher-level APIs like `wait()`
579///   (await terminal) and, later, `terminate()` / `kill()`.
580///
581/// Notes:
582/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
583///   it SIGKILLs any still-recorded PIDs; we do not rely on
584///   `Child::kill_on_drop`.
585///
586/// Relationship to types:
587/// - [`ProcStatus`]: live status surface, updated by this handle.
588/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
589///   allocator-facing, historical event log; not directly updated by
590///   this type.
591#[derive(Clone)]
592pub struct BootstrapProcHandle {
593    /// Logical identity of the proc in the mesh.
594    proc_id: ProcId,
595    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
596    /// so [`BootstrapProcHandle::status`] can return a synchronous
597    /// copy. All mutations now flow through
598    /// [`BootstrapProcHandle::transition`], which updates this field
599    /// under the lock and then broadcasts on the watch channel.
600    status: Arc<std::sync::Mutex<ProcStatus>>,
601    /// Underlying OS child handle. Held only until the exit monitor
602    /// claims it (consumed by `wait()` there). Not relied on for
603    /// teardown; manager `Drop` handles best-effort SIGKILL.
604    child: Arc<std::sync::Mutex<Option<Child>>>,
605    /// Stdout tailer for this proc. Created with `LogTailer::tee`, it
606    /// forwards output to a writer and keeps a bounded ring buffer.
607    /// Transferred to the exit monitor, which joins it after `wait()`
608    /// to recover buffered lines.
609    stdout_tailer: Arc<std::sync::Mutex<Option<LogTailer>>>,
610    /// Stderr tailer for this proc. Same behavior as `stdout_tailer`
611    /// but for stderr (used for exit-reason enrichment).
612    stderr_tailer: Arc<std::sync::Mutex<Option<LogTailer>>>,
613    /// Watch sender for status transitions. Every `mark_*` goes
614    /// through [`BootstrapProcHandle::transition`], which updates the
615    /// snapshot under the lock and then `send`s the new
616    /// [`ProcStatus`].
617    tx: tokio::sync::watch::Sender<ProcStatus>,
618    /// Watch receiver seed. `watch()` clones this so callers can
619    /// `borrow()` the current status and `changed().await` future
620    /// transitions independently.
621    rx: tokio::sync::watch::Receiver<ProcStatus>,
622}
623
624impl fmt::Debug for BootstrapProcHandle {
625    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
626        let status = self.status.lock().expect("status mutex poisoned").clone();
627        f.debug_struct("BootstrapProcHandle")
628            .field("proc_id", &self.proc_id)
629            .field("status", &status)
630            .field("child", &"<child>")
631            .field("tx", &"<watch::Sender>")
632            .field("rx", &"<watch::Receiver>")
633            // Intentionally skip stdout_tailer / stderr_tailer (not
634            // Debug).
635            .finish()
636    }
637}
638
639// Locking invariant:
640// - Do not acquire other locks from inside `transition(...)` (it
641//   holds the status mutex).
642// - If you need the child PID during a transition, snapshot it
643//   *before* calling `transition`.
644impl BootstrapProcHandle {
645    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
646    /// OS process hosting a proc.
647    ///
648    /// - Initializes the status to [`ProcStatus::Starting`] since the
649    ///   child process has been created but not yet confirmed running.
650    /// - Wraps the provided [`Child`] handle in an
651    ///   `Arc<Mutex<Option<Child>>>` so lifecycle methods (`wait`,
652    ///   `terminate`, etc.) can consume it later.
653    ///
654    /// This is the canonical entry point used by
655    /// `BootstrapProcManager` when it launches a proc into a new
656    /// process.
657    pub fn new(proc_id: ProcId, child: Child) -> Self {
658        let (tx, rx) = watch::channel(ProcStatus::Starting);
659        Self {
660            proc_id,
661            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
662            child: Arc::new(std::sync::Mutex::new(Some(child))),
663            stdout_tailer: Arc::new(std::sync::Mutex::new(None)),
664            stderr_tailer: Arc::new(std::sync::Mutex::new(None)),
665            tx,
666            rx,
667        }
668    }
669
670    /// Return the logical proc identity in the mesh.
671    #[inline]
672    pub fn proc_id(&self) -> &ProcId {
673        &self.proc_id
674    }
675
676    /// Create a new subscription to this proc's status stream.
677    ///
678    /// Each call returns a fresh [`watch::Receiver`] tied to this
679    /// handle's internal [`ProcStatus`] channel. The receiver can be
680    /// awaited on (`rx.changed().await`) to observe lifecycle
681    /// transitions as they occur.
682    ///
683    /// Notes:
684    /// - Multiple subscribers can exist simultaneously; each sees
685    ///   every status update in order.
686    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
687    ///   use `watch()` when you need to await changes over time.
688    #[inline]
689    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
690        self.rx.clone()
691    }
692
693    /// Wait until this proc's status changes.
694    ///
695    /// This is a convenience wrapper around
696    /// [`watch::Receiver::changed`]: it subscribes internally via
697    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
698    /// If no subscribers exist or the channel is closed, this returns
699    /// without error.
700    ///
701    /// Typical usage:
702    /// ```ignore
703    /// handle.changed().await;
704    /// match handle.status() {
705    ///     ProcStatus::Running { .. } => { /* now running */ }
706    ///     ProcStatus::Stopped { .. } => { /* exited */ }
707    ///     _ => {}
708    /// }
709    /// ```
710    #[inline]
711    pub async fn changed(&self) {
712        let _ = self.watch().changed().await;
713    }
714
715    /// Return the OS process ID (`pid`) for this proc.
716    ///
717    /// If the proc is currently `Running`, this returns the cached
718    /// pid even if the `Child` has already been taken by the exit
719    /// monitor. Otherwise, it falls back to `Child::id()` if the
720    /// handle is still present. Returns `None` once the proc has
721    /// exited or if the handle has been consumed.
722    #[inline]
723    pub fn pid(&self) -> Option<u32> {
724        match *self.status.lock().expect("status mutex poisoned") {
725            ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
726            _ => self
727                .child
728                .lock()
729                .expect("child mutex poisoned")
730                .as_ref()
731                .and_then(|c| c.id()),
732        }
733    }
734
735    /// Return a snapshot of the current [`ProcStatus`] for this proc.
736    ///
737    /// This is a *live view* of the lifecycle state as tracked by
738    /// [`BootstrapProcManager`]. It reflects what is currently known
739    /// about the underlying OS process (e.g., `Starting`, `Running`,
740    /// `Stopping`, etc.).
741    ///
742    /// Internally this reads the mutex-guarded status. Use this when
743    /// you just need a synchronous snapshot; use
744    /// [`BootstrapProcHandle::watch`] or
745    /// [`BootstrapProcHandle::changed`] if you want to await
746    /// transitions asynchronously.
747    #[must_use]
748    pub fn status(&self) -> ProcStatus {
749        // Source of truth for now is the mutex. We broadcast via
750        // `watch` in `transition`, but callers that want a
751        // synchronous snapshot should read the guarded value.
752        self.status.lock().expect("status mutex poisoned").clone()
753    }
754
755    /// Atomically apply a state transition while holding the status
756    /// lock, and send the updated value on the watch channel **while
757    /// still holding the lock**. This guarantees the mutex state and
758    /// the broadcast value stay in sync and avoids reordering between
759    /// concurrent transitions.
760    fn transition<F>(&self, f: F) -> bool
761    where
762        F: FnOnce(&mut ProcStatus) -> bool,
763    {
764        let mut guard = self.status.lock().expect("status mutex poisoned");
765        let _before = guard.clone();
766        let changed = f(&mut guard);
767        if changed {
768            // Publish while still holding the lock to preserve order.
769            let _ = self.tx.send(guard.clone());
770        }
771        changed
772    }
773
774    /// Transition this proc into the [`ProcStatus::Running`] state.
775    ///
776    /// Called internally once the child OS process has been spawned
777    /// and we can observe a valid `pid`. Records the `pid` and the
778    /// `started_at` timestamp so that callers can query them later
779    /// via [`BootstrapProcHandle::status`] or
780    /// [`BootstrapProcHandle::pid`].
781    ///
782    /// This is a best-effort marker: it reflects that the process
783    /// exists at the OS level, but does not guarantee that the proc
784    /// has completed bootstrap or is fully ready.
785    pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
786        self.transition(|st| match *st {
787            ProcStatus::Starting => {
788                *st = ProcStatus::Running { pid, started_at };
789                true
790            }
791            _ => {
792                tracing::warn!(
793                    "illegal transition: {:?} -> Running; leaving status unchanged",
794                    *st
795                );
796                false
797            }
798        })
799    }
800
801    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
802    /// state.
803    ///
804    /// This records the process ID, start time, listening address,
805    /// and agent once the proc has successfully started and is ready
806    /// to serve.
807    ///
808    /// Returns `true` if the transition succeeded (from `Starting` or
809    /// `Running`), or `false` if the current state did not allow
810    /// moving to `Ready`. In the latter case the state is left
811    /// unchanged and a warning is logged.
812    pub(crate) fn mark_ready(
813        &self,
814        pid: u32,
815        started_at: SystemTime,
816        addr: ChannelAddr,
817        agent: ActorRef<ProcMeshAgent>,
818    ) -> bool {
819        self.transition(|st| match st {
820            ProcStatus::Starting | ProcStatus::Running { .. } => {
821                *st = ProcStatus::Ready {
822                    pid,
823                    started_at,
824                    addr,
825                    agent,
826                };
827                true
828            }
829            _ => {
830                tracing::warn!(
831                    "illegal transition: {:?} -> Ready; leaving status unchanged",
832                    st
833                );
834                false
835            }
836        })
837    }
838
839    /// Record that a stop has been requested for the proc (e.g. a
840    /// graceful shutdown via SIGTERM), but the underlying process has
841    /// not yet fully exited.
842    pub(crate) fn mark_stopping(&self) -> bool {
843        // Avoid taking out additional locks in .transition().
844        let child_pid = self.child_pid_snapshot();
845        let now = hyperactor::clock::RealClock.system_time_now();
846
847        self.transition(|st| match *st {
848            ProcStatus::Running { pid, started_at } => {
849                *st = ProcStatus::Stopping { pid, started_at };
850                true
851            }
852            ProcStatus::Ready {
853                pid, started_at, ..
854            } => {
855                *st = ProcStatus::Stopping { pid, started_at };
856                true
857            }
858            ProcStatus::Starting => {
859                if let Some(pid) = child_pid {
860                    *st = ProcStatus::Stopping {
861                        pid,
862                        started_at: now,
863                    };
864                    true
865                } else {
866                    false
867                }
868            }
869            _ => false,
870        })
871    }
872
873    /// Record that the process has exited normally with the given
874    /// exit code.
875    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
876        self.transition(|st| match *st {
877            ProcStatus::Starting
878            | ProcStatus::Running { .. }
879            | ProcStatus::Ready { .. }
880            | ProcStatus::Stopping { .. } => {
881                *st = ProcStatus::Stopped {
882                    exit_code,
883                    stderr_tail,
884                };
885                true
886            }
887            _ => {
888                tracing::warn!(
889                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
890                    *st
891                );
892                false
893            }
894        })
895    }
896
897    /// Record that the process was killed by the given signal (e.g.
898    /// SIGKILL, SIGTERM).
899    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
900        self.transition(|st| match *st {
901            ProcStatus::Starting
902            | ProcStatus::Running { .. }
903            | ProcStatus::Ready { .. }
904            | ProcStatus::Stopping { .. } => {
905                *st = ProcStatus::Killed {
906                    signal,
907                    core_dumped,
908                };
909                true
910            }
911            _ => {
912                tracing::warn!(
913                    "illegal transition: {:?} -> Killed; leaving status unchanged",
914                    *st
915                );
916                false
917            }
918        })
919    }
920
921    /// Record that the proc or its process failed for an unexpected
922    /// reason (bootstrap error, spawn failure, etc.).
923    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
924        self.transition(|st| match *st {
925            ProcStatus::Starting
926            | ProcStatus::Running { .. }
927            | ProcStatus::Ready { .. }
928            | ProcStatus::Stopping { .. } => {
929                *st = ProcStatus::Failed {
930                    reason: reason.into(),
931                };
932                true
933            }
934            _ => {
935                tracing::warn!(
936                    "illegal transition: {:?} -> Failed; leaving status unchanged",
937                    *st
938                );
939                false
940            }
941        })
942    }
943
944    /// Wait until the proc has reached a terminal state and return
945    /// it.
946    ///
947    /// Terminal means [`ProcStatus::Stopped`],
948    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
949    /// current status is already terminal, returns immediately.
950    ///
951    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
952    /// owner of the OS process, so you can call `wait()` from
953    /// multiple tasks concurrently.
954    ///
955    /// Implementation detail: listens on this handle's `watch`
956    /// channel. It snapshots the current status, and if not terminal
957    /// awaits the next change. If the channel closes unexpectedly,
958    /// returns the last observed status.
959    ///
960    /// Mirrors `tokio::process::Child::wait()`, but yields the
961    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
962    #[must_use]
963    pub async fn wait_inner(&self) -> ProcStatus {
964        let mut rx = self.watch();
965        loop {
966            let st = rx.borrow().clone();
967            if st.is_exit() {
968                return st;
969            }
970            // If the channel closes, return the last observed value.
971            if rx.changed().await.is_err() {
972                return st;
973            }
974        }
975    }
976
977    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
978    ///
979    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
980    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
981    /// becoming `Ready`, this returns
982    /// `Err(ReadyError::Terminal(status))`. If the internal watch
983    /// channel closes unexpectedly, this returns
984    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
985    /// `Ok(())` when `Ready` is first observed.
986    ///
987    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
988    /// owner; multiple tasks may await `ready()` concurrently.
989    /// `Stopping` is not treated as terminal here; we continue
990    /// waiting until `Ready` or a terminal state is seen.
991    ///
992    /// Companion to [`BootstrapProcHandle::wait_inner`]:
993    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
994    /// startup.
995    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
996        let mut rx = self.watch();
997        loop {
998            let st = rx.borrow().clone();
999            match &st {
1000                ProcStatus::Ready { .. } => return Ok(()),
1001                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1002                _non_terminal => {
1003                    if rx.changed().await.is_err() {
1004                        return Err(ReadyError::ChannelClosed);
1005                    }
1006                }
1007            }
1008        }
1009    }
1010
1011    /// Best-effort, non-blocking snapshot of the child's PID. Locks
1012    /// only the `child` mutex and never touches `status`.
1013    fn child_pid_snapshot(&self) -> Option<u32> {
1014        self.child
1015            .lock()
1016            .expect("child mutex poisoned")
1017            .as_ref()
1018            .and_then(|c| c.id())
1019    }
1020
1021    /// Try to fetch a PID we can signal. Prefer cached PID from
1022    /// status; fall back to Child::id() if still present. None once
1023    /// terminal or if monitor already consumed the Child and we never
1024    /// cached a pid.
1025    fn signalable_pid(&self) -> Option<i32> {
1026        match &*self.status.lock().expect("status mutex poisoned") {
1027            ProcStatus::Running { pid, .. }
1028            | ProcStatus::Ready { pid, .. }
1029            | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1030            _ => self
1031                .child
1032                .lock()
1033                .expect("child mutex poisoned")
1034                .as_ref()
1035                .and_then(|c| c.id())
1036                .map(|p| p as i32),
1037        }
1038    }
1039
1040    /// Send a signal to the pid. Returns Ok(()) if delivered. If
1041    /// ESRCH (no such process), we treat that as "already gone" and
1042    /// return Ok(()) so callers can proceed to observe the terminal
1043    /// state through the monitor.
1044    fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1045        // SAFETY: Sending a POSIX signal; does not dereference
1046        // memory.
1047        let rc = unsafe { libc::kill(pid, sig) };
1048        if rc == 0 {
1049            Ok(())
1050        } else {
1051            let e = std::io::Error::last_os_error();
1052            if let Some(libc::ESRCH) = e.raw_os_error() {
1053                // Process already gone; proceed to wait/observe
1054                // terminal state.
1055                Ok(())
1056            } else {
1057                Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1058            }
1059        }
1060    }
1061
1062    pub fn set_tailers(&self, out: Option<LogTailer>, err: Option<LogTailer>) {
1063        *self
1064            .stdout_tailer
1065            .lock()
1066            .expect("stdout_tailer mutex poisoned") = out;
1067        *self
1068            .stderr_tailer
1069            .lock()
1070            .expect("stderr_tailer mutex poisoned") = err;
1071    }
1072
1073    fn take_tailers(&self) -> (Option<LogTailer>, Option<LogTailer>) {
1074        let out = self
1075            .stdout_tailer
1076            .lock()
1077            .expect("stdout_tailer mutex poisoned")
1078            .take();
1079        let err = self
1080            .stderr_tailer
1081            .lock()
1082            .expect("stderr_tailer mutex poisoned")
1083            .take();
1084        (out, err)
1085    }
1086}
1087
1088#[async_trait]
1089impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1090    type Agent = ProcMeshAgent;
1091    type TerminalStatus = ProcStatus;
1092
1093    #[inline]
1094    fn proc_id(&self) -> &ProcId {
1095        &self.proc_id
1096    }
1097
1098    #[inline]
1099    fn addr(&self) -> Option<ChannelAddr> {
1100        match &*self.status.lock().expect("status mutex poisoned") {
1101            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1102            _ => None,
1103        }
1104    }
1105
1106    #[inline]
1107    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1108        match &*self.status.lock().expect("status mutex poisoned") {
1109            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1110            _ => None,
1111        }
1112    }
1113
1114    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
1115    /// state.
1116    ///
1117    /// Returns `Ok(())` once `Ready` is observed.
1118    ///
1119    /// If the proc transitions directly to a terminal state before
1120    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
1121    ///
1122    /// If the internal status watch closes unexpectedly before
1123    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
1124    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1125        match self.ready_inner().await {
1126            Ok(()) => Ok(()),
1127            Err(ReadyError::Terminal(status)) => {
1128                Err(hyperactor::host::ReadyError::Terminal(status))
1129            }
1130            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1131        }
1132    }
1133
1134    /// Wait until this proc reaches a terminal [`ProcStatus`].
1135    ///
1136    /// Returns `Ok(status)` when a terminal state is observed
1137    /// (`Stopped`, `Killed`, or `Failed`).
1138    ///
1139    /// If the internal status watch closes before any terminal state
1140    /// is seen, returns `Err(WaitError::ChannelClosed)`.
1141    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1142        let status = self.wait_inner().await;
1143        if status.is_exit() {
1144            Ok(status)
1145        } else {
1146            Err(hyperactor::host::WaitError::ChannelClosed)
1147        }
1148    }
1149
1150    /// Attempt to terminate the underlying OS process.
1151    ///
1152    /// This drives **process-level** teardown only:
1153    /// - Sends `SIGTERM` to the process.
1154    /// - Waits up to `timeout` for it to exit cleanly.
1155    /// - Escalates to `SIGKILL` if still alive, then waits a short
1156    ///   hard-coded grace period (`HARD_WAIT_AFTER_KILL`) to ensure
1157    ///   the process is reaped.
1158    ///
1159    /// If the process was already in a terminal state when called,
1160    /// returns [`TerminateError::AlreadyTerminated`].
1161    ///
1162    /// # Notes
1163    /// - This does *not* attempt a graceful proc-level stop via
1164    ///   `ProcMeshAgent` or other actor messaging. That integration
1165    ///   will come later once proc-level control is wired up.
1166    /// - Errors may also be returned if the process PID cannot be
1167    ///   determined, if signal delivery fails, or if the status
1168    ///   channel is closed unexpectedly.
1169    ///
1170    /// # Parameters
1171    /// - `timeout`: Grace period to wait after `SIGTERM` before
1172    ///   escalating.
1173    ///
1174    /// # Returns
1175    /// - `Ok(ProcStatus)` if the process exited during the
1176    ///   termination sequence.
1177    /// - `Err(TerminateError)` if already exited, signaling failed,
1178    ///   or the channel was lost.
1179    async fn terminate(
1180        &self,
1181        timeout: Duration,
1182    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1183        const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);
1184
1185        // If already terminal, return that.
1186        let st0 = self.status();
1187        if st0.is_exit() {
1188            tracing::debug!(?st0, "terminate(): already terminal");
1189            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1190        }
1191
1192        // Find a PID to signal.
1193        let pid = self.signalable_pid().ok_or_else(|| {
1194            let st = self.status();
1195            tracing::warn!(?st, "terminate(): no signalable pid");
1196            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1197                "no signalable pid (state: {:?})",
1198                st
1199            ))
1200        })?;
1201
1202        // Best-effort mark "Stopping" (ok if state races).
1203        let _ = self.mark_stopping();
1204
1205        // Send SIGTERM (ESRCH is treated as "already gone").
1206        tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
1207        if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
1208            tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
1209            return Err(hyperactor::host::TerminateError::Io(e));
1210        }
1211        tracing::debug!(pid, "terminate(): SIGTERM sent");
1212
1213        // Wait up to the timeout for a terminal state.
1214        match RealClock.timeout(timeout, self.wait_inner()).await {
1215            Ok(st) if st.is_exit() => {
1216                tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
1217                Ok(st)
1218            }
1219            Ok(non_exit) => {
1220                // wait_inner() should only resolve terminal; treat as
1221                // channel issue.
1222                tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
1223                Err(hyperactor::host::TerminateError::ChannelClosed)
1224            }
1225            Err(_elapsed) => {
1226                // Escalate to SIGKILL
1227                tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
1228                if let Some(pid2) = self.signalable_pid() {
1229                    if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
1230                        tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
1231                        return Err(hyperactor::host::TerminateError::Io(e));
1232                    }
1233                    tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
1234                } else {
1235                    tracing::warn!("terminate(): lost pid before SIGKILL escalation");
1236                }
1237                // Hard bound after KILL so we can't hang forever.
1238                match RealClock
1239                    .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
1240                    .await
1241                {
1242                    Ok(st) if st.is_exit() => {
1243                        tracing::info!(?st, "terminate(): exited after SIGKILL");
1244                        Ok(st)
1245                    }
1246                    other => {
1247                        tracing::warn!(
1248                            ?other,
1249                            "terminate(): post-KILL wait did not yield terminal"
1250                        );
1251                        Err(hyperactor::host::TerminateError::ChannelClosed)
1252                    }
1253                }
1254            }
1255        }
1256    }
1257
1258    /// Forcibly kill the underlying OS process with `SIGKILL`.
1259    ///
1260    /// This bypasses any graceful shutdown semantics and immediately
1261    /// delivers a non-catchable `SIGKILL` to the child. It is
1262    /// intended as a last-resort termination mechanism when
1263    /// `terminate()` fails or when no grace period is desired.
1264    ///
1265    /// # Behavior
1266    /// - If the process was already in a terminal state, returns
1267    ///   [`TerminateError::AlreadyTerminated`].
1268    /// - Otherwise attempts to send `SIGKILL` to the current PID.
1269    /// - Then waits for the exit monitor to observe a terminal state.
1270    ///
1271    /// # Notes
1272    /// - This is strictly an **OS-level kill**. It does *not* attempt
1273    ///   proc-level shutdown via `ProcMeshAgent` or actor messages.
1274    ///   That integration will be layered in later.
1275    /// - Errors may be returned if the PID cannot be determined, if
1276    ///   signal delivery fails, or if the status channel closes
1277    ///   unexpectedly.
1278    ///
1279    /// # Returns
1280    /// - `Ok(ProcStatus)` if the process exited after `SIGKILL`.
1281    /// - `Err(TerminateError)` if already exited, signaling failed,
1282    ///   or the channel was lost.
1283    async fn kill(
1284        &self,
1285    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1286        // NOTE: This is a pure OS-level kill (SIGKILL) of the child
1287        // process. It bypasses any proc-level shutdown semantics.
1288        //
1289        // Future work: layer in proc-level stop semantics through
1290        // actor messages once those are implemented.
1291
1292        // If already terminal, return that.
1293        let st0 = self.status();
1294        if st0.is_exit() {
1295            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1296        }
1297
1298        // Try to get a PID and send SIGKILL.
1299        let pid = self.signalable_pid().ok_or_else(|| {
1300            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1301                "no signalable pid (state: {:?})",
1302                self.status()
1303            ))
1304        })?;
1305
1306        if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
1307            return Err(hyperactor::host::TerminateError::Io(e));
1308        }
1309
1310        // Wait for exit monitor to record terminal status.
1311        let st = self.wait_inner().await;
1312        if st.is_exit() {
1313            Ok(st)
1314        } else {
1315            Err(hyperactor::host::TerminateError::ChannelClosed)
1316        }
1317    }
1318}
1319
/// A specification of the command used to bootstrap procs.
///
/// [`BootstrapCommand::current`] fills every field from the running
/// process, while the `From` conversion from a path sets only
/// `program` and leaves the other fields empty.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path of the program to run.
    pub program: PathBuf,
    /// Optional zeroth argument; `current()` stores the process's
    /// first command-line argument here.
    // NOTE(review): presumably used as the child's argv[0] when
    // set; confirm against the spawn code.
    pub arg0: Option<String>,
    /// The remaining arguments, i.e. everything after the zeroth.
    pub args: Vec<String>,
    /// Environment variables as name/value pairs.
    // NOTE(review): whether these extend or replace the child's
    // inherited environment is decided by the spawn code; confirm
    // there.
    pub env: HashMap<String, String>,
}
1328
1329impl BootstrapCommand {
1330    /// Creates a bootstrap command specification to replicate the
1331    /// invocation of the currently running process.
1332    pub fn current() -> io::Result<Self> {
1333        let mut args: VecDeque<String> = std::env::args().collect();
1334        let arg0 = args.pop_front();
1335
1336        Ok(Self {
1337            program: std::env::current_exe()?,
1338            arg0,
1339            args: args.into(),
1340            env: std::env::vars().collect(),
1341        })
1342    }
1343
1344    /// Bootstrap command used for testing, invoking the Buck-built
1345    /// `monarch/hyperactor_mesh/bootstrap` binary.
1346    ///
1347    /// Intended for integration tests where we need to spawn real
1348    /// bootstrap processes under proc manager control. Not available
1349    /// outside of test builds.
1350    #[cfg(test)]
1351    pub(crate) fn test() -> Self {
1352        Self {
1353            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1354            arg0: None,
1355            args: vec![],
1356            env: HashMap::new(),
1357        }
1358    }
1359}
1360
1361impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1362    /// Creates a bootstrap command from the provided path.
1363    fn from(s: T) -> Self {
1364        Self {
1365            program: s.into(),
1366            arg0: None,
1367            args: vec![],
1368            env: HashMap::new(),
1369        }
1370    }
1371}
1372
/// A process manager for launching and supervising **bootstrap
/// processes** (via the [`bootstrap`] entry point).
///
/// `BootstrapProcManager` is the host-side runtime for external procs
/// in a hyperactor mesh. It spawns the configured bootstrap binary,
/// forwards each child's stdout/stderr into the logging channel,
/// tracks lifecycle state through [`BootstrapProcHandle`] /
/// [`ProcStatus`], and ensures best-effort teardown on drop.
///
/// Internally it maintains two maps:
/// - `children`: async map from [`ProcId`] to
///   [`BootstrapProcHandle`], used for lifecycle queries (`status`) and
///   exit monitoring.
/// - `pid_table`: synchronous map from [`ProcId`] to raw PIDs, used
///   in [`Drop`] to synchronously send `SIGKILL` to any still-running
///   children.
///
/// Together these provide both a queryable, real-time status surface
/// and a best-effort cleanup path that aims to leave no child
/// process orphaned when the manager itself is dropped.
#[derive(Debug)]
pub struct BootstrapProcManager {
    /// The command specification used to bootstrap new processes.
    command: BootstrapCommand,
    /// Async registry of children, keyed by [`ProcId`]. Holds
    /// [`BootstrapProcHandle`]s so callers can query or monitor
    /// status.
    children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
    /// Synchronous table of raw PIDs, keyed by [`ProcId`]. Read in
    /// the [`Drop`] impl to send `SIGKILL` without needing async
    /// context; the exit monitor removes a proc's entry once its
    /// process has been waited on. Shared via `Arc` so the monitor
    /// task can hold its own reference.
    pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
}
1406
1407impl Drop for BootstrapProcManager {
1408    /// Drop implementation for [`BootstrapProcManager`].
1409    ///
1410    /// Ensures that when the manager itself is dropped, any child
1411    /// processes it spawned are also terminated. This is a
1412    /// best-effort cleanup mechanism: the manager iterates over its
1413    /// recorded PID table and sends `SIGKILL` to each one.
1414    ///
1415    /// Notes:
1416    /// - Uses a synchronous `Mutex` so it can be locked safely in
1417    ///   `Drop` without async context.
1418    /// - Safety: sending `SIGKILL` is low-level but safe here because
1419    ///   it only instructs the OS to terminate the process. The only
1420    ///   risk is PID reuse, in which case an unrelated process might
1421    ///   be signaled. This is accepted for shutdown semantics.
1422    /// - This makes `BootstrapProcManager` robust against leaks:
1423    ///   dropping it should not leave stray child processes running.
1424    fn drop(&mut self) {
1425        if let Ok(table) = self.pid_table.lock() {
1426            for (proc_id, pid) in table.iter() {
1427                // SAFETY: We own these PIDs because they were spawned and recorded
1428                // by this manager. Sending SIGKILL is safe here since it does not
1429                // dereference any memory; it only instructs the OS to terminate the
1430                // process. The worst-case outcome is that the PID has already exited
1431                // and been reused, in which case the signal may affect an unrelated
1432                // process. We accept that risk in Drop semantics because this path
1433                // is only used for cleanup at shutdown.
1434                unsafe {
1435                    libc::kill(*pid as i32, libc::SIGKILL);
1436                }
1437                tracing::info!(
1438                    "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1439                    pid,
1440                    proc_id
1441                );
1442            }
1443        }
1444    }
1445}
1446
1447impl BootstrapProcManager {
1448    /// Construct a new [`BootstrapProcManager`] that will launch
1449    /// procs using the given bootstrap command specification.
1450    ///
1451    /// This is the general entry point when you want to manage procs
1452    /// backed by a specific binary path (e.g. a bootstrap
1453    /// trampoline).
1454    pub(crate) fn new(command: BootstrapCommand) -> Self {
1455        Self {
1456            command,
1457            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1458            pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1459        }
1460    }
1461
1462    /// The bootstrap command used to launch processes.
1463    pub fn command(&self) -> &BootstrapCommand {
1464        &self.command
1465    }
1466
1467    /// Return the current [`ProcStatus`] for the given [`ProcId`], if
1468    /// the proc is known to this manager.
1469    ///
1470    /// This querprocies the live [`BootstrapProcHandle`] stored in the
1471    /// manager's internal map. It provides an immediate snapshot of
1472    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
1473    /// etc.).
1474    ///
1475    /// Returns `None` if the manager has no record of the proc (e.g.
1476    /// never spawned here, or entry already removed).
1477    pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1478        self.children.lock().await.get(proc_id).map(|h| h.status())
1479    }
1480
1481    fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1482        let pid_table = Arc::clone(&self.pid_table);
1483
1484        let (stdout_tailer, stderr_tailer) = handle.take_tailers();
1485
1486        let maybe_child = {
1487            let mut guard = handle.child.lock().expect("child mutex");
1488            let taken = guard.take();
1489            debug_assert!(guard.is_none(), "no Child should remain in handles");
1490            taken
1491        };
1492
1493        let Some(mut child) = maybe_child else {
1494            tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1495            return;
1496        };
1497
1498        tokio::spawn(async move {
1499            let wait_res = child.wait().await;
1500
1501            let mut stderr_tail: Vec<String> = Vec::new();
1502            if let Some(t) = stderr_tailer {
1503                let (lines, _bytes) = t.join().await;
1504                stderr_tail = lines;
1505            }
1506            if let Some(t) = stdout_tailer {
1507                let (_lines, _bytes) = t.join().await;
1508            }
1509
1510            match wait_res {
1511                Ok(status) => {
1512                    if let Some(sig) = status.signal() {
1513                        let _ = handle.mark_killed(sig, status.core_dumped());
1514                        if let Ok(mut table) = pid_table.lock() {
1515                            table.remove(&proc_id);
1516                        }
1517                        if stderr_tail.is_empty() {
1518                            tracing::debug!("proc {proc_id} killed by signal {sig}");
1519                        } else {
1520                            let tail = stderr_tail.join("\n");
1521                            tracing::debug!(
1522                                "proc {proc_id} killed by signal {sig}; stderr tail:\n{tail}"
1523                            );
1524                        }
1525                    } else if let Some(code) = status.code() {
1526                        let _ = handle.mark_stopped(code, stderr_tail.clone());
1527                        if let Ok(mut table) = pid_table.lock() {
1528                            table.remove(&proc_id);
1529                        }
1530                        let tail_str = if stderr_tail.is_empty() {
1531                            None
1532                        } else {
1533                            Some(stderr_tail.join("\n"))
1534                        };
1535                        if code == 0 {
1536                            tracing::debug!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited");
1537                        } else {
1538                            tracing::info!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited");
1539                        }
1540                    } else {
1541                        debug_assert!(
1542                            false,
1543                            "unreachable: process terminated with neither signal nor exit code"
1544                        );
1545                        tracing::error!(
1546                            "proc {proc_id}: unreachable exit status (no code, no signal)"
1547                        );
1548                        let _ = handle.mark_failed("process exited with unknown status");
1549                        if let Ok(mut table) = pid_table.lock() {
1550                            table.remove(&proc_id);
1551                        }
1552                        if stderr_tail.is_empty() {
1553                            tracing::warn!("proc {proc_id} unknown exit");
1554                        } else {
1555                            let tail = stderr_tail.join("\n");
1556                            tracing::warn!("proc {proc_id} unknown exit; stderr tail:\n{tail}");
1557                        }
1558                    }
1559                }
1560                Err(e) => {
1561                    let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1562                    if let Ok(mut table) = pid_table.lock() {
1563                        table.remove(&proc_id);
1564                    }
1565                    if stderr_tail.is_empty() {
1566                        tracing::info!("proc {proc_id} wait failed");
1567                    } else {
1568                        let tail = stderr_tail.join("\n");
1569                        tracing::info!("proc {proc_id} wait failed; stderr tail:\n{tail}");
1570                    }
1571                }
1572            }
1573        });
1574    }
1575}
1576
1577#[async_trait]
1578impl ProcManager for BootstrapProcManager {
1579    type Handle = BootstrapProcHandle;
1580
1581    /// Return the [`ChannelTransport`] used by this proc manager.
1582    ///
1583    /// For `BootstrapProcManager` this is always
1584    /// [`ChannelTransport::Unix`], since all procs are spawned
1585    /// locally on the same host and communicate over Unix domain
1586    /// sockets.
1587    fn transport(&self) -> ChannelTransport {
1588        ChannelTransport::Unix
1589    }
1590
1591    /// Launch a new proc under this [`BootstrapProcManager`].
1592    ///
1593    /// Spawns the configured bootstrap binary (`self.program`) in a
1594    /// fresh child process. The environment is populated with
1595    /// variables that describe the bootstrap context — most
1596    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries a
1597    /// base64-encoded JSON [`Bootstrap::Proc`] payload (proc id,
1598    /// backend addr, callback addr, optional config snapshot).
1599    /// Additional variables like `BOOTSTRAP_LOG_CHANNEL` are also set
1600    /// up for logging and control.
1601    ///
1602    /// Responsibilities performed here:
1603    /// - Create a one-shot callback channel so the child can confirm
1604    ///   successful bootstrap and return its mailbox address plus agent
1605    ///   reference.
1606    /// - Spawn the OS process with stdout/stderr piped.
1607    /// - Stamp the new [`BootstrapProcHandle`] as
1608    ///   [`ProcStatus::Running`] once a PID is observed.
1609    /// - Wire stdout/stderr pipes into local writers and forward them
1610    ///   over the logging channel (`BOOTSTRAP_LOG_CHANNEL`).
1611    /// - Insert the handle into the manager's children map and start
1612    ///   an exit monitor to track process termination.
1613    ///
1614    /// Returns a [`BootstrapProcHandle`] that exposes the child
1615    /// process's lifecycle (status, wait/ready, termination). Errors
1616    /// are surfaced as [`HostError`].
1617    async fn spawn(
1618        &self,
1619        proc_id: ProcId,
1620        backend_addr: ChannelAddr,
1621    ) -> Result<Self::Handle, HostError> {
1622        let (callback_addr, mut callback_rx) =
1623            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1624
1625        let cfg = hyperactor::config::global::attrs();
1626
1627        let mode = Bootstrap::Proc {
1628            proc_id: proc_id.clone(),
1629            backend_addr,
1630            callback_addr,
1631            config: Some(cfg),
1632        };
1633        let mut cmd = Command::new(&self.command.program);
1634        if let Some(arg0) = &self.command.arg0 {
1635            cmd.arg0(arg0);
1636        }
1637        for arg in &self.command.args {
1638            cmd.arg(arg);
1639        }
1640        for (k, v) in &self.command.env {
1641            cmd.env(k, v);
1642        }
1643        cmd.env(
1644            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
1645            mode.to_env_safe_string()
1646                .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
1647        )
1648        .stdout(Stdio::piped())
1649        .stderr(Stdio::piped());
1650
1651        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1652        cmd.env(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1653
1654        let child = cmd
1655            .spawn()
1656            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1657        let pid = child.id().unwrap_or_default();
1658
1659        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
1660
1661        if let Some(pid) = handle.pid() {
1662            handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
1663            if let Ok(mut table) = self.pid_table.lock() {
1664                table.insert(proc_id.clone(), pid);
1665            }
1666        }
1667
1668        // Writers: tee to local (stdout/stderr or file) + send over
1669        // channel
1670        let (out_writer, err_writer) = create_log_writers(0, log_channel.clone(), pid)
1671            .unwrap_or_else(|_| (Box::new(tokio::io::stdout()), Box::new(tokio::io::stderr())));
1672
1673        let mut stdout_tailer: Option<LogTailer> = None;
1674        let mut stderr_tailer: Option<LogTailer> = None;
1675
1676        // Take the pipes from the child.
1677        {
1678            let mut guard = handle.child.lock().expect("child mutex poisoned");
1679            if let Some(child) = guard.as_mut() {
1680                // LogTailer::tee forwards to our writers and keeps a
1681                // tail buffer.
1682                let max_tail_lines = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
1683                if let Some(out) = child.stdout.take() {
1684                    stdout_tailer = Some(LogTailer::tee(max_tail_lines, out, out_writer));
1685                }
1686                if let Some(err) = child.stderr.take() {
1687                    stderr_tailer = Some(LogTailer::tee(max_tail_lines, err, err_writer));
1688                }
1689                // Make the tailers visible to the exit monitor.
1690                handle.set_tailers(stdout_tailer.take(), stderr_tailer.take());
1691            } else {
1692                tracing::debug!("proc {proc_id}: child already taken before wiring IO");
1693            }
1694        }
1695
1696        // Retain handle for lifecycle mgt.
1697        {
1698            let mut children = self.children.lock().await;
1699            children.insert(proc_id.clone(), handle.clone());
1700        }
1701
1702        // Kick off an exit monitor that updates ProcStatus when the
1703        // OS process ends
1704        self.spawn_exit_monitor(proc_id.clone(), handle.clone());
1705
1706        let h = handle.clone();
1707        let pid_table = Arc::clone(&self.pid_table);
1708        tokio::spawn(async move {
1709            match callback_rx.recv().await {
1710                Ok((addr, agent)) => {
1711                    let pid = match h.pid() {
1712                        Some(p) => p,
1713                        None => {
1714                            tracing::warn!("mark_ready called with missing pid; using 0");
1715                            0
1716                        }
1717                    };
1718                    let started_at = RealClock.system_time_now();
1719                    let _ = h.mark_ready(pid, started_at, addr, agent);
1720                }
1721                Err(e) => {
1722                    // Child never called back; record failure.
1723                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
1724                    // Cleanup pid table entry if it was set.
1725                    if let Ok(mut table) = pid_table.lock() {
1726                        table.remove(&proc_id);
1727                    }
1728                }
1729            }
1730        });
1731
1732        // Callers do `handle.read().await` for mesh readiness.
1733        Ok(handle)
1734    }
1735}
1736
1737#[async_trait]
1738impl hyperactor::host::BulkTerminate for BootstrapProcManager {
1739    /// Attempt to gracefully terminate all child procs managed by
1740    /// this `BootstrapProcManager`.
1741    ///
1742    /// Each child handle is asked to `terminate(timeout)`, which
1743    /// sends SIGTERM, waits up to the deadline, and escalates to
1744    /// SIGKILL if necessary. Termination is attempted concurrently,
1745    /// with at most `max_in_flight` tasks running at once.
1746    ///
1747    /// Returns a [`TerminateSummary`] with counts of how many procs
1748    /// were attempted, how many successfully terminated (including
1749    /// those that were already terminal), and how many failed.
1750    ///
1751    /// Logs a warning for each failure.
1752    async fn terminate_all(&self, timeout: Duration, max_in_flight: usize) -> TerminateSummary {
1753        // Snapshot to avoid holding the lock across awaits.
1754        let handles: Vec<BootstrapProcHandle> = {
1755            let guard = self.children.lock().await;
1756            guard.values().cloned().collect()
1757        };
1758
1759        let attempted = handles.len();
1760        let mut ok = 0usize;
1761
1762        let results = stream::iter(handles.into_iter().map(|h| async move {
1763            match h.terminate(timeout).await {
1764                Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
1765                    // Treat "already terminal" as success.
1766                    true
1767                }
1768                Err(e) => {
1769                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
1770                    false
1771                }
1772            }
1773        }))
1774        .buffer_unordered(max_in_flight.max(1))
1775        .collect::<Vec<bool>>()
1776        .await;
1777
1778        for r in results {
1779            if r {
1780                ok += 1;
1781            }
1782        }
1783
1784        TerminateSummary {
1785            attempted,
1786            ok,
1787            failed: attempted.saturating_sub(ok),
1788        }
1789    }
1790}
1791
1792/// Entry point to processes managed by hyperactor_mesh. Any process that is part
1793/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
1794/// the process according to how it is invoked.
1795///
1796/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
1797/// and the process should likely exit:
1798///
1799/// ```ignore
1800/// let err = hyperactor_mesh::bootstrap().await;
1801/// tracing::error("could not bootstrap mesh process: {}", err);
1802/// std::process::exit(1);
1803/// ```
1804///
1805/// Use [`bootstrap_or_die`] to implement this behavior directly.
1806pub async fn bootstrap() -> anyhow::Error {
1807    let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
1808    boot.bootstrap().await
1809}
1810
1811/// Bootstrap a v0 proc mesh. This launches a control process that responds to
1812/// Allocator2Process messages, conveying its own state in Process2Allocator messages.
1813///
1814/// The bootstrapping process is controlled by the
1815/// following environment variables:
1816///
1817/// - `HYPERACTOR_MESH_BOOTSTRAP_ADDR`: the channel address to which Process2Allocator messages
1818///   should be sent.
1819/// - `HYPERACTOR_MESH_INDEX`: an index used to identify this process to the allocator.
1820async fn bootstrap_v0_proc_mesh() -> anyhow::Error {
1821    pub async fn go() -> Result<(), anyhow::Error> {
1822        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
1823        let procs_for_cleanup = procs.clone();
1824        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
1825            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
1826                if let Err(err) = proc_to_stop
1827                    .destroy_and_wait::<()>(Duration::from_millis(10), None)
1828                    .await
1829                {
1830                    tracing::error!(
1831                        "error while stopping proc {}: {}",
1832                        proc_to_stop.proc_id(),
1833                        err
1834                    );
1835                }
1836            }
1837        }));
1838
1839        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
1840            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
1841            .parse()?;
1842        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
1843            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
1844            .parse()?;
1845        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());
1846        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
1847        let tx = channel::dial(bootstrap_addr.clone())?;
1848
1849        let (rtx, mut return_channel) = oneshot::channel();
1850        tx.try_post(
1851            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
1852            rtx,
1853        )?;
1854        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));
1855
1856        let mut the_msg;
1857
1858        tokio::select! {
1859            msg = rx.recv() => {
1860                the_msg = msg;
1861            }
1862            returned_msg = &mut return_channel => {
1863                match returned_msg {
1864                    Ok(msg) => {
1865                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
1866                    }
1867                    Err(_) => {
1868                        the_msg = rx.recv().await;
1869                    }
1870                }
1871            }
1872        }
1873        loop {
1874            let _ = hyperactor::tracing::info_span!("wait_for_next_message_from_mesh_agent");
1875            match the_msg? {
1876                Allocator2Process::StartProc(proc_id, listen_transport) => {
1877                    let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone()).await?;
1878                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
1879                    let handle = proc.clone().serve(proc_rx);
1880                    drop(handle); // linter appeasement; it is safe to drop this future
1881                    tx.send(Process2Allocator(
1882                        bootstrap_index,
1883                        Process2AllocatorMessage::StartedProc(
1884                            proc_id.clone(),
1885                            mesh_agent.bind(),
1886                            proc_addr,
1887                        ),
1888                    ))
1889                    .await?;
1890                    procs.lock().await.push(proc);
1891                }
1892                Allocator2Process::StopAndExit(code) => {
1893                    tracing::info!("stopping procs with code {code}");
1894                    {
1895                        for proc_to_stop in procs.lock().await.iter_mut() {
1896                            if let Err(err) = proc_to_stop
1897                                .destroy_and_wait::<()>(Duration::from_millis(10), None)
1898                                .await
1899                            {
1900                                tracing::error!(
1901                                    "error while stopping proc {}: {}",
1902                                    proc_to_stop.proc_id(),
1903                                    err
1904                                );
1905                            }
1906                        }
1907                    }
1908                    tracing::info!("exiting with {code}");
1909                    std::process::exit(code);
1910                }
1911                Allocator2Process::Exit(code) => {
1912                    tracing::info!("exiting with {code}");
1913                    std::process::exit(code);
1914                }
1915            }
1916            the_msg = rx.recv().await;
1917        }
1918    }
1919
1920    go().await.unwrap_err()
1921}
1922
1923/// A variant of [`bootstrap`] that logs the error and exits the process
1924/// if bootstrapping fails.
1925pub async fn bootstrap_or_die() -> ! {
1926    let err = bootstrap().await;
1927    let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
1928    tracing::error!("failed to bootstrap mesh process: {}", err);
1929    std::process::exit(1)
1930}
1931
/// Destination for bootstrap debug output.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Debug output is written to this file.
    File(std::fs::File),
    /// Debug output is discarded.
    Sink,
}
1937
1938impl DebugSink {
1939    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1940        match self {
1941            DebugSink::File(f) => f.write(buf),
1942            DebugSink::Sink => Ok(buf.len()),
1943        }
1944    }
1945    fn flush(&mut self) -> io::Result<()> {
1946        match self {
1947            DebugSink::File(f) => f.flush(),
1948            DebugSink::Sink => Ok(()),
1949        }
1950    }
1951}
1952
1953fn debug_sink() -> &'static Mutex<DebugSink> {
1954    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
1955    DEBUG_SINK.get_or_init(|| {
1956        let debug_path = {
1957            let mut p = std::env::temp_dir();
1958            if let Ok(user) = std::env::var("USER") {
1959                p.push(user);
1960            }
1961            std::fs::create_dir_all(&p).ok();
1962            p.push("monarch-bootstrap-debug.log");
1963            p
1964        };
1965        let sink = if debug_path.exists() {
1966            match OpenOptions::new()
1967                .append(true)
1968                .create(true)
1969                .open(debug_path.clone())
1970            {
1971                Ok(f) => DebugSink::File(f),
1972                Err(_e) => {
1973                    eprintln!(
1974                        "failed to open {} for bootstrap debug logging",
1975                        debug_path.display()
1976                    );
1977                    DebugSink::Sink
1978                }
1979            }
1980        } else {
1981            DebugSink::Sink
1982        };
1983        Mutex::new(sink)
1984    })
1985}
1986
1987/// If true, send `Debug` messages to stderr.
1988const DEBUG_TO_STDERR: bool = false;
1989
1990/// A bootstrap specific debug writer. If the file /tmp/monarch-bootstrap-debug.log
1991/// exists, then the writer's destination is that file; otherwise it discards all writes.
1992struct Debug;
1993
1994impl Debug {
1995    fn is_active() -> bool {
1996        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
1997    }
1998}
1999
2000impl Write for Debug {
2001    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2002        let res = debug_sink().lock().unwrap().write(buf);
2003        if DEBUG_TO_STDERR {
2004            let n = match res {
2005                Ok(n) => n,
2006                Err(_) => buf.len(),
2007            };
2008            let _ = io::stderr().write_all(&buf[..n]);
2009        }
2010
2011        res
2012    }
2013    fn flush(&mut self) -> io::Result<()> {
2014        let res = debug_sink().lock().unwrap().flush();
2015        if DEBUG_TO_STDERR {
2016            let _ = io::stderr().flush();
2017        }
2018        res
2019    }
2020}
2021
2022#[cfg(test)]
2023mod tests {
2024    use std::path::PathBuf;
2025    use std::process::Stdio;
2026
2027    use hyperactor::ActorId;
2028    use hyperactor::ActorRef;
2029    use hyperactor::ProcId;
2030    use hyperactor::WorldId;
2031    use hyperactor::channel::ChannelAddr;
2032    use hyperactor::channel::ChannelTransport;
2033    use hyperactor::clock::RealClock;
2034    use hyperactor::context::Mailbox as _;
2035    use hyperactor::host::ProcHandle;
2036    use hyperactor::id;
2037    use ndslice::Extent;
2038    use ndslice::ViewExt;
2039    use ndslice::extent;
2040    use tokio::io;
2041    use tokio::process::Command;
2042
2043    use super::*;
2044    use crate::alloc::AllocSpec;
2045    use crate::alloc::Allocator;
2046    use crate::alloc::ProcessAllocator;
2047    use crate::v1::ActorMesh;
2048    use crate::v1::host_mesh::HostMesh;
2049    use crate::v1::testactor;
2050
2051    // Helper: Avoid repeating
2052    // `ChannelAddr::any(ChannelTransport::Unix)`.
2053    fn any_addr_for_test() -> ChannelAddr {
2054        ChannelAddr::any(ChannelTransport::Unix)
2055    }
2056
2057    #[test]
2058    fn test_bootstrap_mode_env_string_none_config_proc() {
2059        let values = [
2060            Bootstrap::default(),
2061            Bootstrap::Proc {
2062                proc_id: id!(foo[0]),
2063                backend_addr: ChannelAddr::any(ChannelTransport::Tcp),
2064                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2065                config: None,
2066            },
2067        ];
2068
2069        for value in values {
2070            let safe = value.to_env_safe_string().unwrap();
2071            let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2072
2073            // Re-encode and compare: deterministic round-trip of the
2074            // wire format.
2075            let safe2 = round.to_env_safe_string().unwrap();
2076            assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2077
2078            // Sanity: the decoded variant is what we expect.
2079            match (&value, &round) {
2080                (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2081                (Bootstrap::V0ProcMesh, Bootstrap::V0ProcMesh) => {}
2082                _ => panic!("decoded variant mismatch: got {:?}", round),
2083            }
2084        }
2085    }
2086
2087    #[test]
2088    fn test_bootstrap_mode_env_string_none_config_host() {
2089        let value = Bootstrap::Host {
2090            addr: ChannelAddr::any(ChannelTransport::Unix),
2091            command: None,
2092            config: None,
2093        };
2094
2095        let safe = value.to_env_safe_string().unwrap();
2096        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2097
2098        // Wire-format round-trip should be identical.
2099        let safe2 = round.to_env_safe_string().unwrap();
2100        assert_eq!(safe, safe2);
2101
2102        // Sanity: decoded variant is Host with None config.
2103        match round {
2104            Bootstrap::Host { config: None, .. } => {}
2105            other => panic!("expected Host with None config, got {:?}", other),
2106        }
2107    }
2108
2109    #[test]
2110    fn test_bootstrap_mode_env_string_invalid() {
2111        // Not valid base64
2112        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2113    }
2114
2115    #[test]
2116    fn test_bootstrap_env_roundtrip_with_config_proc_and_host() {
2117        // Build a small, distinctive Attrs snapshot.
2118        let mut attrs = Attrs::new();
2119        attrs[MESH_TAIL_LOG_LINES] = 123;
2120        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2121
2122        // Proc case
2123        {
2124            let original = Bootstrap::Proc {
2125                proc_id: id!(foo[42]),
2126                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2127                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2128                config: Some(attrs.clone()),
2129            };
2130            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2131            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2132            match &decoded {
2133                Bootstrap::Proc { config, .. } => {
2134                    let cfg = config.as_ref().expect("expected Some(attrs)");
2135                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2136                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2137                }
2138                other => panic!("unexpected variant after roundtrip: {:?}", other),
2139            }
2140        }
2141
2142        // Host case
2143        {
2144            let original = Bootstrap::Host {
2145                addr: ChannelAddr::any(ChannelTransport::Unix),
2146                command: None,
2147                config: Some(attrs.clone()),
2148            };
2149            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2150            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2151            match &decoded {
2152                Bootstrap::Host { config, .. } => {
2153                    let cfg = config.as_ref().expect("expected Some(attrs)");
2154                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2155                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2156                }
2157                other => panic!("unexpected variant after roundtrip: {:?}", other),
2158            }
2159        }
2160    }
2161
2162    #[tokio::test]
2163    async fn test_child_terminated_on_manager_drop() {
2164        use std::path::PathBuf;
2165        use std::process::Stdio;
2166
2167        use tokio::process::Command;
2168        use tokio::time::Duration;
2169
2170        // Manager; program path is irrelevant for this test.
2171        let command = BootstrapCommand {
2172            program: PathBuf::from("/bin/true"),
2173            ..Default::default()
2174        };
2175        let manager = BootstrapProcManager::new(command);
2176
2177        // Spawn a long-running child process (sleep 30) with
2178        // kill_on_drop(true).
2179        let mut cmd = Command::new("/bin/sleep");
2180        cmd.arg("30")
2181            .stdout(Stdio::null())
2182            .stderr(Stdio::null())
2183            .kill_on_drop(true);
2184
2185        let child = cmd.spawn().expect("spawn sleep");
2186        let pid = child.id().expect("pid");
2187
2188        // Insert into the manager's children map (simulates a spawned
2189        // proc).
2190        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
2191        {
2192            let mut children = manager.children.lock().await;
2193            children.insert(
2194                proc_id.clone(),
2195                BootstrapProcHandle::new(proc_id.clone(), child),
2196            );
2197        }
2198
2199        // (Linux-only) Verify the process exists before drop.
2200        #[cfg(target_os = "linux")]
2201        {
2202            let path = format!("/proc/{}", pid);
2203            assert!(
2204                std::fs::metadata(&path).is_ok(),
2205                "expected /proc/{pid} to exist before drop"
2206            );
2207        }
2208
2209        // Drop the manager. We don't rely on `Child::kill_on_drop`.
2210        // The exit monitor owns the Child; the manager's Drop uses
2211        // `pid_table` to best-effort SIGKILL any recorded PIDs. This
2212        // should terminate the child.
2213        drop(manager);
2214
2215        // Poll until the /proc entry disappears OR the state is
2216        // Zombie.
2217        #[cfg(target_os = "linux")]
2218        {
2219            let deadline = std::time::Instant::now() + Duration::from_millis(1500);
2220            let proc_dir = format!("/proc/{}", pid);
2221            let status_file = format!("{}/status", proc_dir);
2222
2223            let mut ok = false;
2224            while std::time::Instant::now() < deadline {
2225                match std::fs::read_to_string(&status_file) {
2226                    Ok(s) => {
2227                        if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
2228                            if state_line.contains('Z') {
2229                                // Only 'Z' (Zombie) is acceptable.
2230                                ok = true;
2231                                break;
2232                            } else {
2233                                // Still alive (e.g. 'R', 'S', etc.). Give it more time.
2234                            }
2235                        }
2236                    }
2237                    Err(_) => {
2238                        // /proc/<pid>/status no longer exists -> fully gone
2239                        ok = true;
2240                        break;
2241                    }
2242                }
2243                RealClock.sleep(Duration::from_millis(100)).await;
2244            }
2245
2246            assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
2247        }
2248
2249        // On non-Linux, absence of panics/hangs is the signal; PID
2250        // probing is platform-specific.
2251    }
2252
2253    #[tokio::test]
2254    async fn test_v1_child_logging() {
2255        use hyperactor::channel;
2256        use hyperactor::data::Serialized;
2257        use hyperactor::mailbox::BoxedMailboxSender;
2258        use hyperactor::mailbox::DialMailboxRouter;
2259        use hyperactor::mailbox::MailboxServer;
2260        use hyperactor::proc::Proc;
2261
2262        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2263        use crate::logging::LogClientActor;
2264        use crate::logging::LogClientMessageClient;
2265        use crate::logging::LogForwardActor;
2266        use crate::logging::LogMessage;
2267        use crate::logging::OutputTarget;
2268        use crate::logging::test_tap;
2269
2270        let router = DialMailboxRouter::new();
2271        let (proc_addr, proc_rx) =
2272            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2273        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
2274        proc.clone().serve(proc_rx);
2275        router.bind(id!(client[0]).into(), proc_addr.clone());
2276        let (client, _handle) = proc.instance("client").unwrap();
2277
2278        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2279        test_tap::install(tap_tx);
2280
2281        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2282        // SAFETY: unit-test scoped env var
2283        unsafe {
2284            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2285        }
2286
2287        // Spawn the log client and disable aggregation (immediate
2288        // print + tap push).
2289        let log_client: ActorRef<LogClientActor> =
2290            proc.spawn("log_client", ()).await.unwrap().bind();
2291        log_client.set_aggregate(&client, None).await.unwrap();
2292
2293        // Spawn the forwarder in this proc (it will serve
2294        // BOOTSTRAP_LOG_CHANNEL).
2295        let _log_forwarder: ActorRef<LogForwardActor> = proc
2296            .spawn("log_forwarder", log_client.clone())
2297            .await
2298            .unwrap()
2299            .bind();
2300
2301        // Dial the channel but don't post until we know the forwarder
2302        // is receiving.
2303        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2304
2305        // Send a fake log message as if it came from the proc
2306        // manager's writer.
2307        tx.post(LogMessage::Log {
2308            hostname: "testhost".into(),
2309            pid: 12345,
2310            output_target: OutputTarget::Stdout,
2311            payload: Serialized::serialize(&"hello from child".to_string()).unwrap(),
2312        });
2313
2314        // Assert we see it via the tap.
2315        let line = RealClock
2316            .timeout(Duration::from_secs(2), tap_rx.recv())
2317            .await
2318            .expect("timed out waiting for log line")
2319            .expect("tap channel closed unexpectedly");
2320        assert!(
2321            line.contains("hello from child"),
2322            "log line did not appear via LogClientActor; got: {line}"
2323        );
2324    }
2325
2326    mod proc_handle {
2327
2328        use hyperactor::ActorId;
2329        use hyperactor::ActorRef;
2330        use hyperactor::ProcId;
2331        use hyperactor::WorldId;
2332        use hyperactor::host::ProcHandle;
2333        use tokio::process::Command;
2334
2335        use super::super::*;
2336        use super::any_addr_for_test;
2337
2338        // Helper: build a ProcHandle with a short-lived child
2339        // process. We don't rely on the actual process; we only
2340        // exercise the status transitions.
2341        fn handle_for_test() -> BootstrapProcHandle {
2342            // Spawn a trivial child that exits immediately.
2343            let child = Command::new("sh")
2344                .arg("-c")
2345                .arg("exit 0")
2346                .stdin(std::process::Stdio::null())
2347                .stdout(std::process::Stdio::null())
2348                .stderr(std::process::Stdio::null())
2349                .spawn()
2350                .expect("failed to spawn test child process");
2351
2352            let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
2353            BootstrapProcHandle::new(proc_id, child)
2354        }
2355
2356        #[tokio::test]
2357        async fn starting_to_running_ok() {
2358            let h = handle_for_test();
2359            assert!(matches!(h.status(), ProcStatus::Starting));
2360            let child_pid = h.pid().expect("child should have a pid");
2361            let child_started_at = RealClock.system_time_now();
2362            assert!(h.mark_running(child_pid, child_started_at));
2363            match h.status() {
2364                ProcStatus::Running { pid, started_at } => {
2365                    assert_eq!(pid, child_pid);
2366                    assert_eq!(started_at, child_started_at);
2367                }
2368                other => panic!("expected Running, got {other:?}"),
2369            }
2370        }
2371
2372        #[tokio::test]
2373        async fn running_to_stopping_to_stopped_ok() {
2374            let h = handle_for_test();
2375            let child_pid = h.pid().expect("child should have a pid");
2376            let child_started_at = RealClock.system_time_now();
2377            assert!(h.mark_running(child_pid, child_started_at));
2378            assert!(h.mark_stopping());
2379            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2380            assert!(h.mark_stopped(0, Vec::new()));
2381            assert!(matches!(
2382                h.status(),
2383                ProcStatus::Stopped { exit_code: 0, .. }
2384            ));
2385        }
2386
2387        #[tokio::test]
2388        async fn running_to_killed_ok() {
2389            let h = handle_for_test();
2390            let child_pid = h.pid().expect("child should have a pid");
2391            let child_started_at = RealClock.system_time_now();
2392            assert!(h.mark_running(child_pid, child_started_at));
2393            assert!(h.mark_killed(9, true));
2394            assert!(matches!(
2395                h.status(),
2396                ProcStatus::Killed {
2397                    signal: 9,
2398                    core_dumped: true
2399                }
2400            ));
2401        }
2402
2403        #[tokio::test]
2404        async fn running_to_failed_ok() {
2405            let h = handle_for_test();
2406            let child_pid = h.pid().expect("child should have a pid");
2407            let child_started_at = RealClock.system_time_now();
2408            assert!(h.mark_running(child_pid, child_started_at));
2409            assert!(h.mark_failed("bootstrap error"));
2410            match h.status() {
2411                ProcStatus::Failed { reason } => {
2412                    assert_eq!(reason, "bootstrap error");
2413                }
2414                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2415            }
2416        }
2417
2418        #[tokio::test]
2419        async fn illegal_transitions_are_rejected() {
2420            let h = handle_for_test();
2421            let child_pid = h.pid().expect("child should have a pid");
2422            let child_started_at = RealClock.system_time_now();
2423            // Starting -> Running is fine; second Running should be rejected.
2424            assert!(h.mark_running(child_pid, child_started_at));
2425            assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
2426            match h.status() {
2427                ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
2428                other => panic!("expected Running, got {other:?}"),
2429            }
2430            // Once Stopped, we can't go to Running/Killed/Failed/etc.
2431            assert!(h.mark_stopping());
2432            assert!(h.mark_stopped(0, Vec::new()));
2433            assert!(!h.mark_running(child_pid, child_started_at));
2434            assert!(!h.mark_killed(9, false));
2435            assert!(!h.mark_failed("nope"));
2436
2437            assert!(matches!(
2438                h.status(),
2439                ProcStatus::Stopped { exit_code: 0, .. }
2440            ));
2441        }
2442
2443        #[tokio::test]
2444        async fn transitions_from_ready_are_legal() {
2445            let h = handle_for_test();
2446            let addr = any_addr_for_test();
2447            // Mark Running.
2448            let pid = h.pid().expect("child should have a pid");
2449            let t0 = RealClock.system_time_now();
2450            assert!(h.mark_running(pid, t0));
2451            // Build a consistent AgentRef for Ready using the
2452            // handle's ProcId.
2453            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2454            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2455            let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2456            // Ready -> Stopping -> Stopped should be legal.
2457            assert!(h.mark_ready(pid, t0, addr, agent_ref));
2458            assert!(h.mark_stopping());
2459            assert!(h.mark_stopped(0, Vec::new()));
2460        }
2461
2462        #[tokio::test]
2463        async fn ready_to_killed_is_legal() {
2464            let h = handle_for_test();
2465            let addr = any_addr_for_test();
2466            // Starting -> Running
2467            let pid = h.pid().expect("child should have a pid");
2468            let t0 = RealClock.system_time_now();
2469            assert!(h.mark_running(pid, t0));
2470            // Build a consistent AgentRef for Ready using the
2471            // handle's ProcId.
2472            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2473            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2474            let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2475            // Running -> Ready
2476            assert!(h.mark_ready(pid, t0, addr, agent));
2477            // Ready -> Killed
2478            assert!(h.mark_killed(9, false));
2479        }
2480
2481        #[tokio::test]
2482        async fn mark_stopping_from_starting_uses_child_pid_when_available() {
2483            let h = handle_for_test();
2484
2485            // In `Starting`, we should still be able to see the PID via
2486            // Child::id().
2487            let child_pid = h
2488                .pid()
2489                .expect("Child::id() should be available in Starting");
2490
2491            // mark_stopping() should transition to Stopping{pid,
2492            // started_at} and stash the same pid we observed.
2493            assert!(
2494                h.mark_stopping(),
2495                "mark_stopping() should succeed from Starting"
2496            );
2497            match h.status() {
2498                ProcStatus::Stopping { pid, started_at } => {
2499                    assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
2500                    assert!(
2501                        started_at <= RealClock.system_time_now(),
2502                        "started_at should be sane"
2503                    );
2504                }
2505                other => panic!("expected Stopping{{..}}; got {other:?}"),
2506            }
2507        }
2508
2509        #[tokio::test]
2510        async fn mark_stopping_noop_when_no_child_pid_available() {
2511            let h = handle_for_test();
2512
2513            // Simulate the exit-monitor having already taken the
2514            // Child so there is no Child::id() and we haven't cached
2515            // a pid yet.
2516            {
2517                let _ = h.child.lock().expect("child mutex").take();
2518            }
2519
2520            // With no observable pid, mark_stopping() must no-op and
2521            // leave us in Starting.
2522            assert!(
2523                !h.mark_stopping(),
2524                "mark_stopping() should no-op from Starting when no pid is observable"
2525            );
2526            assert!(matches!(h.status(), ProcStatus::Starting));
2527        }
2528
2529        #[tokio::test]
2530        async fn mark_failed_from_stopping_is_allowed() {
2531            let h = handle_for_test();
2532
2533            // Drive Starting -> Stopping (pid available via
2534            // Child::id()).
2535            assert!(h.mark_stopping(), "precondition: to Stopping");
2536
2537            // Now allow Stopping -> Failed.
2538            assert!(
2539                h.mark_failed("boom"),
2540                "mark_failed() should succeed from Stopping"
2541            );
2542            match h.status() {
2543                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2544                other => panic!("expected Failed(\"boom\"), got {other:?}"),
2545            }
2546        }
2547    }
2548
2549    #[tokio::test]
2550    async fn test_exit_monitor_updates_status_on_clean_exit() {
2551        let command = BootstrapCommand {
2552            program: PathBuf::from("/bin/true"),
2553            ..Default::default()
2554        };
2555        let manager = BootstrapProcManager::new(command);
2556
2557        // Spawn a fast-exiting child.
2558        let mut cmd = Command::new("true");
2559        cmd.stdout(Stdio::null()).stderr(Stdio::null());
2560        let child = cmd.spawn().expect("spawn true");
2561
2562        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
2563        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2564
2565        // Put into manager & start monitor.
2566        {
2567            let mut children = manager.children.lock().await;
2568            children.insert(proc_id.clone(), handle.clone());
2569        }
2570        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2571        {
2572            let guard = handle.child.lock().expect("child mutex");
2573            assert!(
2574                guard.is_none(),
2575                "expected Child to be taken by exit monitor"
2576            );
2577        }
2578
2579        let st = handle.wait_inner().await;
2580        assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
2581    }
2582
2583    #[tokio::test]
2584    async fn test_exit_monitor_updates_status_on_kill() {
2585        let command = BootstrapCommand {
2586            program: PathBuf::from("/bin/sleep"),
2587            ..Default::default()
2588        };
2589        let manager = BootstrapProcManager::new(command);
2590
2591        // Spawn a process that will live long enough to kill.
2592        let mut cmd = Command::new("/bin/sleep");
2593        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
2594        let child = cmd.spawn().expect("spawn sleep");
2595        let pid = child.id().expect("pid") as i32;
2596
2597        // Register with the manager and start the exit monitor.
2598        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
2599        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2600        {
2601            let mut children = manager.children.lock().await;
2602            children.insert(proc_id.clone(), handle.clone());
2603        }
2604        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2605        {
2606            let guard = handle.child.lock().expect("child mutex");
2607            assert!(
2608                guard.is_none(),
2609                "expected Child to be taken by exit monitor"
2610            );
2611        }
2612        // Send SIGKILL to the process.
2613        #[cfg(unix)]
2614        // SAFETY: We own the child process we just spawned and have
2615        // its PID.
2616        // Sending SIGKILL is safe here because:
2617        //  - the PID comes directly from `child.id()`
2618        //  - the process is guaranteed to be in our test scope
2619        //  - we don't touch any memory via this call, only signal the
2620        //    OS
2621        unsafe {
2622            libc::kill(pid, libc::SIGKILL);
2623        }
2624
2625        let st = handle.wait_inner().await;
2626        match st {
2627            ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
2628            other => panic!("expected Killed(SIGKILL), got {other:?}"),
2629        }
2630    }
2631
2632    #[tokio::test]
2633    async fn watch_notifies_on_status_changes() {
2634        let child = Command::new("sh")
2635            .arg("-c")
2636            .arg("sleep 0.1")
2637            .stdin(std::process::Stdio::null())
2638            .stdout(std::process::Stdio::null())
2639            .stderr(std::process::Stdio::null())
2640            .spawn()
2641            .expect("spawn");
2642
2643        let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
2644        let handle = BootstrapProcHandle::new(proc_id, child);
2645        let mut rx = handle.watch();
2646
2647        // Starting -> Running
2648        let pid = handle.pid().unwrap_or(0);
2649        let now = RealClock.system_time_now();
2650        assert!(handle.mark_running(pid, now));
2651        rx.changed().await.ok(); // Observe the transition.
2652        match &*rx.borrow() {
2653            ProcStatus::Running { pid: p, started_at } => {
2654                assert_eq!(*p, pid);
2655                assert_eq!(*started_at, now);
2656            }
2657            s => panic!("expected Running, got {s:?}"),
2658        }
2659
2660        // Running -> Stopped
2661        assert!(handle.mark_stopped(0, Vec::new()));
2662        rx.changed().await.ok(); // Observe the transition.
2663        assert!(matches!(
2664            &*rx.borrow(),
2665            ProcStatus::Stopped { exit_code: 0, .. }
2666        ));
2667    }
2668
2669    #[tokio::test]
2670    async fn ready_errs_if_process_exits_before_running() {
2671        // Child exits immediately.
2672        let child = Command::new("sh")
2673            .arg("-c")
2674            .arg("exit 7")
2675            .stdin(std::process::Stdio::null())
2676            .stdout(std::process::Stdio::null())
2677            .stderr(std::process::Stdio::null())
2678            .spawn()
2679            .expect("spawn");
2680
2681        let proc_id = ProcId::Direct(
2682            ChannelAddr::any(ChannelTransport::Unix),
2683            "early-exit".into(),
2684        );
2685        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2686
2687        // Simulate the exit monitor doing its job directly here.
2688        // (Equivalent outcome: terminal state before Running.)
2689        assert!(handle.mark_stopped(7, Vec::new()));
2690
2691        // `ready()` should return Err with the terminal status.
2692        match handle.ready_inner().await {
2693            Ok(()) => panic!("ready() unexpectedly succeeded"),
2694            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2695                assert_eq!(exit_code, 7)
2696            }
2697            Err(other) => panic!("expected Stopped(7), got {other:?}"),
2698        }
2699    }
2700
2701    #[tokio::test]
2702    async fn status_unknown_proc_is_none() {
2703        let manager = BootstrapProcManager::new(BootstrapCommand {
2704            program: PathBuf::from("/bin/true"),
2705            ..Default::default()
2706        });
2707        let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
2708        assert!(manager.status(&unknown).await.is_none());
2709    }
2710
2711    #[tokio::test]
2712    async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
2713        let manager = BootstrapProcManager::new(BootstrapCommand {
2714            program: PathBuf::from("/bin/sleep"),
2715            ..Default::default()
2716        });
2717
2718        // Long-ish child so it's alive while we "steal" it.
2719        let mut cmd = Command::new("/bin/sleep");
2720        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
2721        let child = cmd.spawn().expect("spawn sleep");
2722
2723        let proc_id = ProcId::Direct(
2724            ChannelAddr::any(ChannelTransport::Unix),
2725            "already-taken".into(),
2726        );
2727        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2728
2729        // Insert, then deliberately consume the Child before starting
2730        // the monitor.
2731        {
2732            let mut children = manager.children.lock().await;
2733            children.insert(proc_id.clone(), handle.clone());
2734        }
2735        {
2736            let _ = handle.child.lock().expect("child mutex").take();
2737        }
2738
2739        // This should not panic; monitor will see None and bail.
2740        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2741
2742        // Status should remain whatever it was (Starting in this
2743        // setup).
2744        assert!(matches!(
2745            manager.status(&proc_id).await,
2746            Some(ProcStatus::Starting)
2747        ));
2748    }
2749
2750    #[tokio::test]
2751    async fn pid_none_after_exit_monitor_takes_child() {
2752        let manager = BootstrapProcManager::new(BootstrapCommand {
2753            program: PathBuf::from("/bin/sleep"),
2754            ..Default::default()
2755        });
2756
2757        let mut cmd = Command::new("/bin/sleep");
2758        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
2759        let child = cmd.spawn().expect("spawn sleep");
2760
2761        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
2762        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2763
2764        // Before monitor: we should still be able to read a pid.
2765        assert!(handle.pid().is_some());
2766        {
2767            let mut children = manager.children.lock().await;
2768            children.insert(proc_id.clone(), handle.clone());
2769        }
2770        // `spawn_exit_monitor` takes the Child synchronously before
2771        // spawning the task.
2772        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2773
2774        // Immediately after, the handle should no longer expose a pid
2775        // (Child is gone).
2776        assert!(handle.pid().is_none());
2777    }
2778
2779    #[tokio::test]
2780    async fn starting_may_directly_be_marked_stopped() {
2781        // Unit-level transition check for the "process exited very
2782        // fast" case.
2783        let child = Command::new("sh")
2784            .arg("-c")
2785            .arg("exit 0")
2786            .stdin(std::process::Stdio::null())
2787            .stdout(std::process::Stdio::null())
2788            .stderr(std::process::Stdio::null())
2789            .spawn()
2790            .expect("spawn true");
2791
2792        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
2793        let handle = BootstrapProcHandle::new(proc_id, child);
2794
2795        // Take the child (don't hold the mutex across an await).
2796        let mut c = {
2797            let mut guard = handle.child.lock().expect("child mutex");
2798            guard.take()
2799        }
2800        .expect("child already taken");
2801
2802        let status = c.wait().await.expect("wait");
2803        let code = status.code().unwrap_or(0);
2804        assert!(handle.mark_stopped(code, Vec::new()));
2805
2806        assert!(matches!(
2807            handle.status(),
2808            ProcStatus::Stopped { exit_code: 0, .. }
2809        ));
2810    }
2811
2812    #[tokio::test]
2813    async fn handle_ready_allows_waiters() {
2814        let child = Command::new("sh")
2815            .arg("-c")
2816            .arg("sleep 0.1")
2817            .stdin(std::process::Stdio::null())
2818            .stdout(std::process::Stdio::null())
2819            .stderr(std::process::Stdio::null())
2820            .spawn()
2821            .expect("spawn sleep");
2822        let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
2823        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2824
2825        let pid = handle.pid().expect("child should have a pid");
2826        let started_at = RealClock.system_time_now();
2827        assert!(handle.mark_running(pid, started_at));
2828
2829        let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2830        let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2831
2832        // Pick any addr to carry in Ready (what the child would have
2833        // called back with).
2834        let ready_addr = any_addr_for_test();
2835
2836        // Stamp Ready and assert ready().await unblocks.
2837        assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
2838        handle
2839            .ready_inner()
2840            .await
2841            .expect("ready_inner() should complete after Ready");
2842
2843        // Sanity-check the Ready fields we control
2844        // (pid/started_at/addr).
2845        match handle.status() {
2846            ProcStatus::Ready {
2847                pid: p,
2848                started_at: t,
2849                addr: a,
2850                ..
2851            } => {
2852                assert_eq!(p, pid);
2853                assert_eq!(t, started_at);
2854                assert_eq!(a, ready_addr);
2855            }
2856            other => panic!("expected Ready, got {other:?}"),
2857        }
2858    }
2859
2860    #[tokio::test]
2861    async fn pid_behavior_across_states_running_ready_then_stopped() {
2862        let child = Command::new("sh")
2863            .arg("-c")
2864            .arg("sleep 0.1")
2865            .stdin(std::process::Stdio::null())
2866            .stdout(std::process::Stdio::null())
2867            .stderr(std::process::Stdio::null())
2868            .spawn()
2869            .expect("spawn");
2870
2871        let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
2872        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2873
2874        // Running → pid() Some
2875        let pid = handle.pid().expect("initial Child::id");
2876        let t0 = RealClock.system_time_now();
2877        assert!(handle.mark_running(pid, t0));
2878        assert_eq!(handle.pid(), Some(pid));
2879
2880        // Ready → pid() still Some even if Child taken
2881        let addr = any_addr_for_test();
2882        let agent = {
2883            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2884            ActorRef::<ProcMeshAgent>::attest(actor_id)
2885        };
2886        assert!(handle.mark_ready(pid, t0, addr, agent));
2887        {
2888            let _ = handle.child.lock().expect("child mutex").take();
2889        }
2890        assert_eq!(handle.pid(), Some(pid));
2891
2892        // Terminal (Stopped) → pid() None
2893        assert!(handle.mark_stopped(0, Vec::new()));
2894        assert_eq!(handle.pid(), None, "pid() should be None once terminal");
2895    }
2896
2897    #[tokio::test]
2898    async fn pid_is_available_in_ready_even_after_child_taken() {
2899        let child = Command::new("sh")
2900            .arg("-c")
2901            .arg("sleep 0.1")
2902            .stdin(std::process::Stdio::null())
2903            .stdout(std::process::Stdio::null())
2904            .stderr(std::process::Stdio::null())
2905            .spawn()
2906            .expect("spawn");
2907
2908        let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
2909        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2910
2911        // Mark Running.
2912        let pid = handle.pid().expect("child should have pid (via Child::id)");
2913        let started_at = RealClock.system_time_now();
2914        assert!(handle.mark_running(pid, started_at));
2915
2916        // Stamp Ready with synthetic addr/agent (attested).
2917        let addr = any_addr_for_test();
2918        let agent = {
2919            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2920            ActorRef::<ProcMeshAgent>::attest(actor_id)
2921        };
2922        assert!(handle.mark_ready(pid, started_at, addr, agent));
2923
2924        // Simulate exit monitor taking the Child (so Child::id() is
2925        // no longer available).
2926        {
2927            let _ = handle.child.lock().expect("child mutex").take();
2928        }
2929
2930        // **Regression check**: pid() must still return the cached
2931        // pid in Ready.
2932        assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
2933    }
2934
2935    #[test]
2936    fn display_running_includes_pid_and_uptime() {
2937        let started_at = RealClock.system_time_now() - Duration::from_secs(42);
2938        let st = ProcStatus::Running {
2939            pid: 1234,
2940            started_at,
2941        };
2942
2943        let s = format!("{}", st);
2944        assert!(s.contains("1234"));
2945        assert!(s.contains("Running"));
2946        assert!(s.contains("42s"));
2947    }
2948
2949    #[test]
2950    fn display_ready_includes_pid_and_addr() {
2951        let started_at = RealClock.system_time_now() - Duration::from_secs(5);
2952        let addr = ChannelAddr::any(ChannelTransport::Unix);
2953        let agent =
2954            ActorRef::attest(ProcId::Direct(addr.clone(), "proc".into()).actor_id("agent", 0));
2955
2956        let st = ProcStatus::Ready {
2957            pid: 4321,
2958            started_at,
2959            addr: addr.clone(),
2960            agent,
2961        };
2962
2963        let s = format!("{}", st);
2964        assert!(s.contains("4321")); // pid
2965        assert!(s.contains(&addr.to_string())); // addr
2966        assert!(s.contains("Ready"));
2967    }
2968
2969    #[test]
2970    fn display_stopped_includes_exit_code() {
2971        let st = ProcStatus::Stopped {
2972            exit_code: 7,
2973            stderr_tail: Vec::new(),
2974        };
2975        let s = format!("{}", st);
2976        assert!(s.contains("Stopped"));
2977        assert!(s.contains("7"));
2978    }
2979
2980    #[test]
2981    fn display_other_variants_does_not_panic() {
2982        let samples = vec![
2983            ProcStatus::Starting,
2984            ProcStatus::Stopping {
2985                pid: 42,
2986                started_at: RealClock.system_time_now(),
2987            },
2988            ProcStatus::Ready {
2989                pid: 42,
2990                started_at: RealClock.system_time_now(),
2991                addr: ChannelAddr::any(ChannelTransport::Unix),
2992                agent: ActorRef::attest(
2993                    ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into())
2994                        .actor_id("agent", 0),
2995                ),
2996            },
2997            ProcStatus::Killed {
2998                signal: 9,
2999                core_dumped: false,
3000            },
3001            ProcStatus::Failed {
3002                reason: "boom".into(),
3003            },
3004        ];
3005
3006        for st in samples {
3007            let _ = format!("{}", st); // Just make sure it doesn't panic.
3008        }
3009    }
3010
3011    #[tokio::test]
3012    async fn proc_handle_ready_ok_through_trait() {
3013        let child = Command::new("sh")
3014            .arg("-c")
3015            .arg("sleep 0.1")
3016            .stdin(Stdio::null())
3017            .stdout(Stdio::null())
3018            .stderr(Stdio::null())
3019            .spawn()
3020            .expect("spawn");
3021
3022        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3023        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3024
3025        // Starting -> Running
3026        let pid = handle.pid().expect("pid");
3027        let t0 = RealClock.system_time_now();
3028        assert!(handle.mark_running(pid, t0));
3029
3030        // Synthesize Ready data
3031        let addr = any_addr_for_test();
3032        let agent: ActorRef<ProcMeshAgent> =
3033            ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3034        assert!(handle.mark_ready(pid, t0, addr, agent));
3035
3036        // Call the trait method (not ready_inner).
3037        let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3038        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3039    }
3040
3041    #[tokio::test]
3042    async fn proc_handle_wait_returns_terminal_status() {
3043        let child = Command::new("sh")
3044            .arg("-c")
3045            .arg("exit 0")
3046            .stdin(Stdio::null())
3047            .stdout(Stdio::null())
3048            .stderr(Stdio::null())
3049            .spawn()
3050            .expect("spawn");
3051
3052        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3053        let handle = BootstrapProcHandle::new(proc_id, child);
3054
3055        // Drive directly to a terminal state before calling wait()
3056        assert!(handle.mark_stopped(0, Vec::new()));
3057
3058        // Call the trait method (not wait_inner)
3059        let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3060            .await
3061            .expect("wait should return Ok(terminal)");
3062
3063        match st {
3064            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3065            other => panic!("expected Stopped(0), got {other:?}"),
3066        }
3067    }
3068
3069    #[tokio::test]
3070    async fn ready_wrapper_maps_terminal_to_trait_error() {
3071        let child = Command::new("sh")
3072            .arg("-c")
3073            .arg("exit 7")
3074            .stdin(Stdio::null())
3075            .stdout(Stdio::null())
3076            .stderr(Stdio::null())
3077            .spawn()
3078            .expect("spawn");
3079        let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3080        let handle = BootstrapProcHandle::new(proc_id, child);
3081
3082        assert!(handle.mark_stopped(7, Vec::new()));
3083
3084        match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3085            Ok(()) => panic!("expected Err"),
3086            Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3087                exit_code, ..
3088            })) => {
3089                assert_eq!(exit_code, 7);
3090            }
3091            Err(e) => panic!("unexpected error: {e:?}"),
3092        }
3093    }
3094
3095    /// Create a ProcId and a host **backend_addr** channel that the
3096    /// bootstrap child proc will dial to attach its mailbox to the
3097    /// host.
3098    ///
3099    /// - `proc_id`: logical identity for the child proc (pure name;
3100    ///   not an OS pid).
3101    /// - `backend_addr`: a mailbox address served by the **parent
3102    ///   (host) proc** here; the spawned bootstrap process dials this
3103    ///   so its messages route via the host.
3104    async fn make_proc_id_and_backend_addr(
3105        instance: &hyperactor::Instance<()>,
3106        _tag: &str,
3107    ) -> (ProcId, ChannelAddr) {
3108        let proc_id = id!(bootstrap_child[0]);
3109
3110        // Serve a Unix channel as the "backend_addr" and hook it into
3111        // this test proc.
3112        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3113
3114        // Route messages arriving on backend_addr into this test
3115        // proc's mailbox so the bootstrap child can reach the host
3116        // router.
3117        instance.proc().clone().serve(rx);
3118
3119        (proc_id, backend_addr)
3120    }
3121
3122    #[tokio::test]
3123    async fn bootstrap_handle_terminate_graceful() {
3124        // Create a root direct-addressed proc + client instance.
3125        let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3126            .await
3127            .unwrap();
3128        let (instance, _handle) = root.instance("client").unwrap();
3129
3130        let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3131        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3132        let handle = mgr
3133            .spawn(proc_id.clone(), backend_addr.clone())
3134            .await
3135            .expect("spawn bootstrap child");
3136
3137        handle.ready().await.expect("ready");
3138
3139        let deadline = Duration::from_secs(2);
3140        match RealClock
3141            .timeout(deadline * 2, handle.terminate(deadline))
3142            .await
3143        {
3144            Err(_) => panic!("terminate() future hung"),
3145            Ok(Ok(st)) => {
3146                match st {
3147                    ProcStatus::Stopped { exit_code, .. } => {
3148                        // child called exit(0) on SIGTERM
3149                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3150                    }
3151                    ProcStatus::Killed { signal, .. } => {
3152                        // If the child didn't trap SIGTERM, we'd see
3153                        // SIGTERM (15) here and indeed, this is what
3154                        // we see. Since we call
3155                        // `hyperactor::initialize_with_current_runtime();`
3156                        // we seem unable to trap `SIGTERM` and
3157                        // instead folly intercepts:
3158                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
3159                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
3160                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
3161                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
3162                        // It gets worse. When run with
3163                        // '@fbcode//mode/dev-nosan' it terminates
3164                        // with a SEGFAULT (metamate says this is a
3165                        // well known issue at Meta). So, TL;DR I
3166                        // restore default `SIGTERM` handling after
3167                        // the test exe has called
3168                        // `initialize_with_runtime`.
3169                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3170                    }
3171                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3172                }
3173            }
3174            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3175        }
3176    }
3177
3178    #[tokio::test]
3179    async fn bootstrap_handle_kill_forced() {
3180        // Root proc + client instance (so the child can dial back).
3181        let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3182            .await
3183            .unwrap();
3184        let (instance, _handle) = root.instance("client").unwrap();
3185
3186        let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3187
3188        // Proc identity + host backend channel the child will dial.
3189        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3190
3191        // Launch the child bootstrap process.
3192        let handle = mgr
3193            .spawn(proc_id.clone(), backend_addr.clone())
3194            .await
3195            .expect("spawn bootstrap child");
3196
3197        // Wait until the child reports Ready (addr+agent returned via
3198        // callback).
3199        handle.ready().await.expect("ready");
3200
3201        // Force-kill the child and assert we observe a Killed
3202        // terminal status.
3203        let deadline = Duration::from_secs(5);
3204        match RealClock.timeout(deadline, handle.kill()).await {
3205            Err(_) => panic!("kill() future hung"),
3206            Ok(Ok(st)) => {
3207                // We expect a KILLED terminal state.
3208                match st {
3209                    ProcStatus::Killed { signal, .. } => {
3210                        // On Linux this should be SIGKILL (9).
3211                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3212                    }
3213                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3214                }
3215            }
3216            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3217        }
3218    }
3219
3220    #[tokio::test]
3221    async fn bootstrap_cannonical_simple() {
3222        // SAFETY: unit-test scoped
3223        unsafe {
3224            std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
3225        }
3226        // Create a "root" direct addressed proc.
3227        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3228            .await
3229            .unwrap();
3230        // Create an actor instance we'll use to send and receive
3231        // messages.
3232        let (instance, _handle) = proc.instance("client").unwrap();
3233
3234        // Configure a ProcessAllocator with the bootstrap binary.
3235        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
3236            "monarch/hyperactor_mesh/bootstrap",
3237        )));
3238        // Request a new allocation of procs from the ProcessAllocator.
3239        let alloc = allocator
3240            .allocate(AllocSpec {
3241                extent: extent!(replicas = 1),
3242                constraints: Default::default(),
3243                proc_name: None,
3244                transport: ChannelTransport::Unix,
3245            })
3246            .await
3247            .unwrap();
3248
3249        // Build a HostMesh with explicit OS-process boundaries (per
3250        // rank):
3251        //
3252        // (1) Allocator → bootstrap proc [OS process #1]
3253        //     `ProcMesh::allocate(..)` starts one OS process per
3254        //     rank; each runs our runtime and the trampoline actor.
3255        //
3256        // (2) Host::serve(..) sets up a Host in the same OS process
3257        //     (no new process). It binds front/back channels, creates
3258        //     an in-process service proc (`Proc::new(..)`), and
3259        //     stores the `BootstrapProcManager` for later spawns.
3260        //
3261        // (3) Install HostMeshAgent (still no new OS process).
3262        //     `host.system_proc().spawn::<HostMeshAgent>("agent",
3263        //     host).await?` creates the HostMeshAgent actor in that
3264        //     service proc.
3265        //
3266        // (4) Collect & assemble. The trampoline returns a
3267        //     direct-addressed `ActorRef<HostMeshAgent>`; we collect
3268        //     one per rank and assemble a `HostMesh`.
3269        //
3270        // Note: When the Host is later asked to start a proc
3271        // (`host.spawn(name)`), it calls `ProcManager::spawn` on the
3272        // stored `BootstrapProcManager`, which does a
3273        // `Command::spawn()` to launch a new OS child process for
3274        // that proc.
3275        let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3276            .await
3277            .unwrap();
3278
3279        // Spawn a ProcMesh named "p0" on the host mesh:
3280        //
3281        // (1) Each HostMeshAgent (running inside its host's service
3282        // proc) receives the request.
3283        //
3284        // (2) The Host calls into its `BootstrapProcManager::spawn`,
3285        // which does `Command::spawn()` to launch a brand-new OS
3286        // process for the proc.
3287        //
3288        // (3) Inside that new process, bootstrap runs and a
3289        // `ProcMeshAgent` is started to manage it.
3290        //
3291        // (4) We collect the per-host procs into a `ProcMesh` and
3292        // return it.
3293        let proc_mesh = host_mesh
3294            .spawn(&instance, "p0", Extent::unity())
3295            .await
3296            .unwrap();
3297
3298        // Note: There is no support for status() in v1.
3299        // assert!(proc_mesh.status(&instance).await.is_err());
3300        // let proc_ref = proc_mesh.values().next().expect("one proc");
3301        // assert!(proc_ref.status(&instance).await.unwrap(), "proc should be alive");
3302
3303        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc mesh:
3304        //
3305        // (1) For each proc (already running in its own OS process),
3306        // the `ProcMeshAgent` receives the request.
3307        //
3308        // (2) It spawns a `TestActor` inside that existing proc (no
3309        // new OS process).
3310        //
3311        // (3) The per-proc actors are collected into an
3312        // `ActorMesh<TestActor>` and returned.
3313        let actor_mesh: ActorMesh<testactor::TestActor> =
3314            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();
3315
3316        // Open a fresh port on the client instance and send a
3317        // GetActorId message to the actor mesh. Each TestActor will
3318        // reply with its actor ID to the bound port. Receive one
3319        // reply and assert it matches the ID of the (single) actor in
3320        // the mesh.
3321        let (port, mut rx) = instance.mailbox().open_port();
3322        actor_mesh
3323            .cast(&instance, testactor::GetActorId(port.bind()))
3324            .unwrap();
3325        let got_id = rx.recv().await.unwrap();
3326        assert_eq!(
3327            got_id,
3328            actor_mesh.values().next().unwrap().actor_id().clone()
3329        );
3330
3331        // **Important**: If we don't shutdown the hosts, the
3332        // BootstrapProcManager's won't send SIGKILLs to their spawned
3333        // children and there will be orphans left behind.
3334        host_mesh.shutdown(&instance).await.expect("host shutdown");
3335    }
3336
3337    #[tokio::test]
3338    async fn exit_tail_is_attached_and_logged() {
3339        // Spawn a child that writes to stderr then exits 7.
3340        let mut cmd = Command::new("sh");
3341        cmd.arg("-c")
3342            .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
3343            .stdout(Stdio::piped())
3344            .stderr(Stdio::piped());
3345
3346        let child = cmd.spawn().expect("spawn");
3347
3348        // Build a BootstrapProcHandle around this child (like
3349        // manager.spawn() does).
3350        let proc_id = hyperactor::id!(testproc[0]);
3351        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3352
3353        // Wire tailers + dummy writers (stdout/stderr -> sinks), then
3354        // stash them on the handle.
3355        {
3356            // Lock the child to get its pipes.
3357            let mut guard = handle.child.lock().expect("child mutex poisoned");
3358            if let Some(child) = guard.as_mut() {
3359                let out = child.stdout.take().expect("child stdout must be piped");
3360                let err = child.stderr.take().expect("child stderr must be piped");
3361
3362                // Use sinks as our "writers" (we don't care about
3363                // forwarding in this test)
3364                let out_writer: Box<dyn io::AsyncWrite + Send + Unpin> = Box::new(io::sink());
3365                let err_writer: Box<dyn io::AsyncWrite + Send + Unpin> = Box::new(io::sink());
3366
3367                // Create the tailers (they spawn background drainers).
3368                let max_tail_lines = 3;
3369                let out_tailer = LogTailer::tee(max_tail_lines, out, out_writer);
3370                let err_tailer = LogTailer::tee(max_tail_lines, err, err_writer);
3371
3372                // Make them visible to the exit monitor (so it can
3373                // join on exit).
3374                handle.set_tailers(Some(out_tailer), Some(err_tailer));
3375            } else {
3376                panic!("child already taken before wiring tailers");
3377            }
3378        }
3379
3380        // Start an exit monitor (consumes the Child and tailers from
3381        // the handle).
3382        let manager = BootstrapProcManager::new(BootstrapCommand {
3383            program: std::path::PathBuf::from("/bin/true"), // unused in this test
3384            ..Default::default()
3385        });
3386        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3387
3388        // Await terminal status and assert on exit code and stderr
3389        // tail.
3390        let st = RealClock
3391            .timeout(Duration::from_secs(2), handle.wait_inner())
3392            .await
3393            .expect("wait_inner() timed out (exit monitor stuck?)");
3394        match st {
3395            ProcStatus::Stopped {
3396                exit_code,
3397                stderr_tail,
3398            } => {
3399                assert_eq!(
3400                    exit_code, 7,
3401                    "unexpected exit code; stderr_tail={:?}",
3402                    stderr_tail
3403                );
3404                let tail = stderr_tail.join("\n");
3405                assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
3406                assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
3407            }
3408            other => panic!("expected Stopped(7), got {other:?}"),
3409        }
3410    }
3411}