hyperactor_mesh/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::Path;
19use std::path::PathBuf;
20use std::process::Stdio;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::time::Duration;
25use std::time::SystemTime;
26
27use async_trait::async_trait;
28use base64::prelude::*;
29use futures::StreamExt;
30use futures::stream;
31use humantime::format_duration;
32use hyperactor::ActorHandle;
33use hyperactor::ActorId;
34use hyperactor::ActorRef;
35use hyperactor::ProcId;
36use hyperactor::channel;
37use hyperactor::channel::ChannelAddr;
38use hyperactor::channel::ChannelError;
39use hyperactor::channel::ChannelTransport;
40use hyperactor::channel::Rx;
41use hyperactor::channel::Tx;
42use hyperactor::clock::Clock;
43use hyperactor::clock::RealClock;
44use hyperactor::context;
45use hyperactor::host::Host;
46use hyperactor::host::HostError;
47use hyperactor::host::ProcHandle;
48use hyperactor::host::ProcManager;
49use hyperactor::host::TerminateSummary;
50use hyperactor::mailbox::BoxableMailboxSender;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::proc::Proc;
55use hyperactor_config::CONFIG;
56use hyperactor_config::ConfigAttr;
57use hyperactor_config::attrs::Attrs;
58use hyperactor_config::attrs::declare_attrs;
59use hyperactor_config::global::override_or_global;
60use serde::Deserialize;
61use serde::Serialize;
62use tempfile::TempDir;
63use tokio::process::Child;
64use tokio::process::ChildStderr;
65use tokio::process::ChildStdout;
66use tokio::process::Command;
67use tokio::sync::oneshot;
68use tokio::sync::watch;
69use tracing::Instrument;
70use tracing::Level;
71use typeuri::Named;
72
73use crate::logging::OutputTarget;
74use crate::logging::StreamFwder;
75use crate::proc_mesh::mesh_agent::ProcMeshAgent;
76use crate::resource;
77use crate::v1;
78use crate::v1::host_mesh::mesh_agent::HostAgentMode;
79use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
80
81mod mailbox;
82
declare_attrs! {
    /// Enable forwarding child stdout/stderr over the mesh log
    /// channel.
    ///
    /// When `true`: child stdio is piped; [`StreamFwder`]
    /// mirrors output to the parent console and forwards bytes to the
    /// log channel so a `LogForwardActor` can receive them.
    ///
    /// When `false` (default): no channel forwarding occurs. Child
    /// stdio may still be piped if [`MESH_ENABLE_FILE_CAPTURE`] is
    /// `true` or [`MESH_TAIL_LOG_LINES`] > 0; otherwise the child
    /// inherits the parent stdio (no interception).
    ///
    /// This flag does not affect console mirroring: child output
    /// always reaches the parent console—either via inheritance (no
    /// piping) or via [`StreamFwder`] when piping is active.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        py_name: Some("enable_log_forwarding".to_string()),
    })
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    /// When `true`: if stdio is piped, each child's `StreamFwder`
    /// also forwards lines to a host-scoped `FileAppender` managed by
    /// the `BootstrapProcManager`. That appender creates exactly two
    /// files per manager instance—one for stdout and one for
    /// stderr—and **all** child processes' lines are multiplexed into
    /// those two files. This can be combined with
    /// [`MESH_ENABLE_LOG_FORWARDING`] ("stream+local").
    /// Default: `false`.
    ///
    /// Notes:
    /// - The on-disk files are *aggregate*, not per-process.
    ///   Disambiguation is via the optional rank prefix (see
    ///   `PREFIX_WITH_RANK`), which `StreamFwder` prepends to lines
    ///   before writing.
    /// - On local runs, file capture is suppressed unless
    ///   `FORCE_FILE_LOG=true`. In that case `StreamFwder` still
    ///   runs, but the `FileAppender` may be `None` and no files are
    ///   written.
    /// - `MESH_TAIL_LOG_LINES` only controls the in-memory rotating
    ///   buffer used for peeking—independent of file capture.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        py_name: Some("enable_file_capture".to_string()),
    })
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    /// Maximum number of log lines retained in a proc's stderr/stdout
    /// tail buffer. Used by [`StreamFwder`] when wiring child
    /// pipes. Default: 0. A value greater than zero is one of the
    /// conditions under which child stdio is piped (see
    /// [`MESH_ENABLE_LOG_FORWARDING`]).
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        py_name: Some("tail_log_lines".to_string()),
    })
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    /// If enabled (default), bootstrap child processes install
    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel kills them if the
    /// parent dies unexpectedly. This is a **production safety net**
    /// against leaked children; tests usually disable it via
    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
    /// "false")`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        py_name: Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    })
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    /// Maximum number of child terminations to run concurrently
    /// during bulk shutdown. Prevents unbounded spawning of
    /// termination tasks (which could otherwise spike CPU, I/O, or
    /// file descriptor load). Default: 16.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        py_name: Some("mesh_terminate_concurrency".to_string()),
    })
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    /// Per-child grace window for termination. When a shutdown is
    /// requested, the manager sends SIGTERM and waits this long for
    /// the child to exit before escalating to SIGKILL. Default: 10
    /// seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        py_name: Some("mesh_terminate_timeout".to_string()),
    })
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
170
/// Name of the environment variable `HYPERACTOR_MESH_BOOTSTRAP_ADDR`.
// NOTE(review): presumably carries the allocator's bootstrap channel
// address for the v0 bootstrap path — confirm against its reader.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the environment variable `HYPERACTOR_MESH_INDEX`.
// NOTE(review): presumably carries the allocator-assigned process index
// used in `Process2Allocator` envelopes — confirm against its reader.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the environment variable `MONARCH_CLIENT_TRACE_ID`.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
/// A channel used by each process to receive its own stdout and stderr.
/// Because stdout and stderr can only be obtained by the parent process,
/// they need to be streamed back to the process.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
178
/// Messages sent from the process to the allocator. This is an envelope
/// containing the index of the process (i.e., its "address" assigned by
/// the allocator), along with the control message in question.
///
/// Field `0` is the process index; field `1` is the control message.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
185
/// Control messages sent from processes to the allocator.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Initialize a process2allocator session. The process is
    /// listening on the provided channel address, to which
    /// [`Allocator2Process`] messages are sent.
    Hello(ChannelAddr),

    /// A proc with the provided ID was started. Its mailbox is
    /// served at the provided channel address. Procs are started
    /// after instruction by the allocator through the corresponding
    /// [`Allocator2Process`] message.
    StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),

    /// Periodic liveness ping. `exit_if_missed_heartbeat` sends one
    /// every five seconds and exits the process if the send fails.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
203
/// Messages sent from the allocator to a process.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Request to start a new proc with the provided ID, listening
    /// to an address on the indicated channel transport.
    StartProc(ProcId, ChannelTransport),

    /// A request for the process to shut down its procs and exit the
    /// process with the provided code.
    StopAndExit(i32),

    /// A request for the process to immediately exit with the provided
    /// exit code, without first shutting down its procs.
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
220
221async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
222    let tx = match channel::dial(bootstrap_addr.clone()) {
223        Ok(tx) => tx,
224
225        Err(err) => {
226            tracing::error!(
227                "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
228                bootstrap_addr,
229                err
230            );
231            std::process::exit(1);
232        }
233    };
234    tracing::info!(
235        "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
236    );
237    loop {
238        RealClock.sleep(Duration::from_secs(5)).await;
239
240        let result = tx
241            .send(Process2Allocator(
242                bootstrap_index,
243                Process2AllocatorMessage::Heartbeat,
244            ))
245            .await;
246
247        if let Err(err) = result {
248            tracing::error!(
249                "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
250                bootstrap_addr,
251                err
252            );
253            std::process::exit(1);
254        }
255    }
256}
257
/// Unwrap a `Result` inside a function whose return type is
/// `anyhow::Error` itself (not a `Result`).
///
/// Evaluates to the `Ok` payload; on `Err`, returns from the enclosing
/// function with the error converted via `anyhow::Error::from`.
#[macro_export]
macro_rules! ok {
    ($result:expr $(,)?) => {
        match $result {
            Ok(payload) => payload,
            Err(cause) => return ::anyhow::Error::from(cause),
        }
    };
}
267
/// Park the calling task forever.
///
/// The returned future never resolves, so it can stand in for a value
/// of any type `R` at the end of a code path that must not return.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
272
273/// Bootstrap a host in this process, returning a handle to the mesh agent.
274///
275/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
276/// then use `GetProc` on the returned proc mesh agent.
277///
278/// - `addr`: the listening address of the host; this is used to bind the frontend address;
279/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapProcManager::current`];
280/// - `config`: optional runtime config overlay.
281pub async fn host(
282    addr: ChannelAddr,
283    command: Option<BootstrapCommand>,
284    config: Option<Attrs>,
285) -> anyhow::Result<ActorHandle<HostMeshAgent>> {
286    if let Some(attrs) = config {
287        hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
288        tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
289    } else {
290        tracing::debug!("bootstrap: no config snapshot provided (Host)");
291    }
292
293    let command = match command {
294        Some(command) => command,
295        None => BootstrapCommand::current()?,
296    };
297    let manager = BootstrapProcManager::new(command)?;
298
299    // REMOVE(V0): forward unknown destinations to the default sender.
300    let host = Host::new_with_default(manager, addr, Some(crate::router::global().clone().boxed()))
301        .await?;
302    let addr = host.addr().clone();
303    let system_proc = host.system_proc().clone();
304    let host_mesh_agent = system_proc
305        .spawn::<HostMeshAgent>("agent", HostMeshAgent::new(HostAgentMode::Process(host)))?;
306
307    tracing::info!(
308        "serving host at {}, agent: {}",
309        addr,
310        host_mesh_agent.bind::<HostMeshAgent>()
311    );
312
313    Ok(host_mesh_agent)
314}
315
/// Bootstrap configures how a mesh process starts up.
///
/// Every variant may include an optional configuration snapshot
/// (`hyperactor_config::Attrs`). This snapshot is serialized into the
/// bootstrap payload and made available to the child. Interpretation
/// and application of that snapshot is up to the child process; if
/// omitted, the child falls back to environment/default values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a "v1" proc
    Proc {
        /// The ProcId of the proc to be bootstrapped.
        proc_id: ProcId,
        /// The backend address to which messages are forwarded.
        /// See [`hyperactor::host`] for channel topology details.
        backend_addr: ChannelAddr,
        /// The callback address used to indicate successful spawning.
        callback_addr: ChannelAddr,
        /// Directory for storing proc socket files. Procs place their sockets
        /// in this directory, so that they can be looked up by other procs
        /// for direct transfer.
        socket_dir_path: PathBuf,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Defaults.
        config: Option<Attrs>,
    },

    /// Bootstrap as a "v1" host bootstrap. This sets up a new `Host`,
    /// managed by a [`crate::v1::host_mesh::mesh_agent::HostMeshAgent`].
    Host {
        /// The address on which to serve the host.
        addr: ChannelAddr,
        /// If specified, use the provided command instead of
        /// [`BootstrapCommand::current`].
        command: Option<BootstrapCommand>,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `Runtime` layer (see [`host`]).
        config: Option<Attrs>,
    },

    /// Bootstrap as a legacy "v0" proc.
    V0ProcMesh {
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Env/Defaults.
        // NOTE(review): the layer used for this variant is decided in
        // `bootstrap_v0_proc_mesh`; confirm it matches this description.
        config: Option<Attrs>,
    },
}
370
371impl Default for Bootstrap {
372    fn default() -> Self {
373        Bootstrap::V0ProcMesh { config: None }
374    }
375}
376
377impl Bootstrap {
378    /// Serialize the mode into a environment-variable-safe string by
379    /// base64-encoding its JSON representation.
380    #[allow(clippy::result_large_err)]
381    fn to_env_safe_string(&self) -> v1::Result<String> {
382        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
383    }
384
385    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
386    #[allow(clippy::result_large_err)]
387    fn from_env_safe_string(str: &str) -> v1::Result<Self> {
388        let data = BASE64_STANDARD.decode(str)?;
389        let data = std::str::from_utf8(&data)?;
390        Ok(serde_json::from_str(data)?)
391    }
392
393    /// Get a bootstrap configuration from the environment; returns `None`
394    /// if the environment does not specify a boostrap config.
395    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
396        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
397            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
398                Ok(mode) => Ok(Some(mode)),
399                Err(e) => {
400                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
401                }
402            },
403            Err(VarError::NotPresent) => Ok(None),
404            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
405        }
406    }
407
408    /// Inject this bootstrap configuration into the environment of the provided command.
409    pub fn to_env(&self, cmd: &mut Command) {
410        cmd.env(
411            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
412            self.to_env_safe_string().unwrap(),
413        );
414    }
415
416    /// Bootstrap this binary according to this configuration.
417    /// This either runs forever, or returns an error.
418    pub async fn bootstrap(self) -> anyhow::Error {
419        tracing::info!(
420            "bootstrapping mesh process: {}",
421            serde_json::to_string(&self).unwrap()
422        );
423
424        if Debug::is_active() {
425            let mut buf = Vec::new();
426            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
427            #[cfg(unix)]
428            writeln!(
429                &mut buf,
430                "\tparent pid: {}",
431                std::os::unix::process::parent_id()
432            )
433            .unwrap();
434            writeln!(
435                &mut buf,
436                "\tconfig: {}",
437                serde_json::to_string(&self).unwrap()
438            )
439            .unwrap();
440            match std::env::current_exe() {
441                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
442                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
443            }
444            writeln!(&mut buf, "\targs:").unwrap();
445            for arg in std::env::args() {
446                writeln!(&mut buf, "\t\t{}", arg).unwrap();
447            }
448            writeln!(&mut buf, "\tenv:").unwrap();
449            for (key, val) in std::env::vars() {
450                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
451            }
452            let _ = Debug.write(&buf);
453            if let Ok(s) = std::str::from_utf8(&buf) {
454                tracing::info!("{}", s);
455            } else {
456                tracing::info!("{:?}", buf);
457            }
458        }
459
460        match self {
461            Bootstrap::Proc {
462                proc_id,
463                backend_addr,
464                callback_addr,
465                socket_dir_path,
466                config,
467            } => {
468                let entered = tracing::span!(
469                    Level::INFO,
470                    "proc_bootstrap",
471                    %proc_id,
472                    %backend_addr,
473                    %callback_addr,
474                    socket_dir_path = %socket_dir_path.display(),
475                )
476                .entered();
477                if let Some(attrs) = config {
478                    hyperactor_config::global::set(
479                        hyperactor_config::global::Source::ClientOverride,
480                        attrs,
481                    );
482                    tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
483                } else {
484                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
485                }
486
487                if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
488                    // Safety net: normal shutdown is via
489                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
490                    // is a last-resort guard against leaks if that
491                    // protocol is bypassed.
492                    let _ = install_pdeathsig_kill();
493                } else {
494                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
495                }
496
497                let (local_addr, name) = ok!(proc_id
498                    .as_direct()
499                    .ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
500                // TODO provide a direct way to construct these
501                let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
502                let serve_addr = serve_addr.parse().unwrap();
503
504                // The following is a modified host::spawn_proc to support direct
505                // dialing between local procs: 1) we bind each proc to a deterministic
506                // address in socket_dir_path; 2) we use LocalProcDialer to dial these
507                // addresses for local procs.
508                let proc_sender = mailbox::LocalProcDialer::new(
509                    local_addr.clone(),
510                    socket_dir_path,
511                    ok!(MailboxClient::dial(backend_addr)),
512                );
513
514                let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
515
516                let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
517                    .map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
518
519                let span = entered.exit();
520
521                // Finally serve the proc on the same transport as the backend address,
522                // and call back.
523                let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
524                proc.clone().serve(proc_rx);
525                ok!(ok!(channel::dial(callback_addr))
526                    .send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
527                    .instrument(span)
528                    .await
529                    .map_err(ChannelError::from));
530
531                halt().await
532            }
533            Bootstrap::Host {
534                addr,
535                command,
536                config,
537            } => {
538                ok!(host(addr, command, config).await);
539                halt().await
540            }
541            Bootstrap::V0ProcMesh { config } => bootstrap_v0_proc_mesh(config).await,
542        }
543    }
544
545    /// A variant of [`bootstrap`] that logs the error and exits the process
546    /// if bootstrapping fails.
547    pub async fn bootstrap_or_die(self) -> ! {
548        let err = self.bootstrap().await;
549        tracing::error!("failed to bootstrap mesh process: {}", err);
550        std::process::exit(1)
551    }
552}
553
554/// Install "kill me if parent dies" and close the race window.
555pub fn install_pdeathsig_kill() -> io::Result<()> {
556    #[cfg(target_os = "linux")]
557    {
558        // SAFETY: `getppid()` is a simple libc syscall returning the
559        // parent PID; it has no side effects and does not touch memory.
560        let ppid_before = unsafe { libc::getppid() };
561
562        // SAFETY: Calling into libc; does not dereference memory, just
563        // asks the kernel to deliver SIGKILL on parent death.
564        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
565        if rc != 0 {
566            return Err(io::Error::last_os_error());
567        }
568
569        // Race-close: if the parent died between our exec and prctl(),
570        // we won't get a signal, so detect that and exit now.
571        //
572        // If the parent PID changed, the parent has died and we've been
573        // reparented. Note: We cannot assume ppid == 1 means the parent
574        // died, as in container environments (e.g., Kubernetes) the parent
575        // may legitimately run as PID 1.
576        // SAFETY: `getppid()` is a simple libc syscall returning the
577        // parent PID; it has no side effects and does not touch memory.
578        let ppid_after = unsafe { libc::getppid() };
579        if ppid_before != ppid_after {
580            std::process::exit(0);
581        }
582    }
583    Ok(())
584}
585
/// Represents the lifecycle state of a **proc as hosted in an OS
/// process** managed by `BootstrapProcManager`.
///
/// Note: This type is deliberately distinct from [`ProcState`] and
/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
/// consumed from an event stream during allocation. By contrast,
/// [`ProcStatus`] is a **live, queryable view**: it reflects the
/// current observed status of a running proc, as seen through the
/// [`BootstrapProcHandle`] API (stop, kill, status).
///
/// In short:
/// - `ProcState`/`ProcStopReason`: historical / event-driven model
/// - `ProcStatus`: immediate status surface for lifecycle control
///
/// `Stopped`, `Killed` and `Failed` are the terminal states (see
/// [`ProcStatus::is_exit`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// The OS process has been spawned but is not yet fully running.
    /// (Process-level: child handle exists, no confirmation yet.)
    Starting,
    /// The OS process is alive and considered running.
    /// (Process-level: `pid` is known; Proc-level: bootstrap
    /// may still be running.)
    ///
    /// `started_at` is the reference point for the uptime shown by
    /// the `Display` impl.
    Running { pid: u32, started_at: SystemTime },
    /// Ready means bootstrap has completed and the proc is serving.
    /// (Process-level: `pid` is known; Proc-level: bootstrap
    /// completed.)
    Ready {
        /// OS process id of the child.
        pid: u32,
        /// Reference point for the uptime shown by the `Display` impl.
        started_at: SystemTime,
        /// Address shown by the `Display` impl.
        // NOTE(review): presumably the address the proc is served on,
        // as reported through the bootstrap callback — confirm.
        addr: ChannelAddr,
        /// Reference to the proc's `ProcMeshAgent`.
        agent: ActorRef<ProcMeshAgent>,
    },
    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
    /// but the OS process has not yet fully exited. (Proc-level:
    /// shutdown in progress; Process-level: still running.)
    Stopping { pid: u32, started_at: SystemTime },
    /// The process exited with a normal exit code. (Process-level:
    /// exit observed.)
    Stopped {
        /// Exit code reported by the OS process.
        exit_code: i32,
        /// Lines of stderr output kept for diagnostics.
        // NOTE(review): presumably the contents of the stderr tail
        // buffer at exit — confirm where this is populated.
        stderr_tail: Vec<String>,
    },
    /// The process was killed by a signal (e.g. SIGKILL).
    /// (Process-level: abnormal termination.)
    Killed { signal: i32, core_dumped: bool },
    /// The proc or its process failed for some other reason
    /// (bootstrap error, unexpected condition, etc.). (Both levels:
    /// catch-all failure.)
    Failed { reason: String },
}
636
637impl ProcStatus {
638    /// Returns `true` if the proc is in a terminal (exited) state:
639    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
640    /// [`ProcStatus::Failed`].
641    #[inline]
642    pub fn is_exit(&self) -> bool {
643        matches!(
644            self,
645            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
646        )
647    }
648}
649
650impl std::fmt::Display for ProcStatus {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        match self {
653            ProcStatus::Starting => write!(f, "Starting"),
654            ProcStatus::Running { pid, started_at } => {
655                let uptime = started_at
656                    .elapsed()
657                    .map(|d| format!(" up {}", format_duration(d)))
658                    .unwrap_or_default();
659                write!(f, "Running[{pid}]{uptime}")
660            }
661            ProcStatus::Ready {
662                pid,
663                started_at,
664                addr,
665                ..
666            } => {
667                let uptime = started_at
668                    .elapsed()
669                    .map(|d| format!(" up {}", format_duration(d)))
670                    .unwrap_or_default();
671                write!(f, "Ready[{pid}] at {addr}{uptime}")
672            }
673            ProcStatus::Stopping { pid, started_at } => {
674                let uptime = started_at
675                    .elapsed()
676                    .map(|d| format!(" up {}", format_duration(d)))
677                    .unwrap_or_default();
678                write!(f, "Stopping[{pid}]{uptime}")
679            }
680            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
681            ProcStatus::Killed {
682                signal,
683                core_dumped,
684            } => {
685                if *core_dumped {
686                    write!(f, "Killed(sig={signal}, core)")
687                } else {
688                    write!(f, "Killed(sig={signal})")
689                }
690            }
691            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
692        }
693    }
694}
695
696/// Error returned by [`BootstrapProcHandle::ready`].
697#[derive(Debug, Clone)]
698pub enum ReadyError {
699    /// The proc reached a terminal state before `Ready`.
700    Terminal(ProcStatus),
701    /// The internal watch channel closed unexpectedly.
702    ChannelClosed,
703}
704
705impl std::fmt::Display for ReadyError {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        match self {
708            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
709            ReadyError::ChannelClosed => write!(f, "status channel closed"),
710        }
711    }
712}
713impl std::error::Error for ReadyError {}
714
/// A handle to a proc launched by [`BootstrapProcManager`].
///
/// `BootstrapProcHandle` is a lightweight supervisor for an external
/// process: it tracks and broadcasts lifecycle state, and exposes a
/// small control/observation surface. While it may temporarily hold a
/// `tokio::process::Child` (shared behind a mutex) so the exit
/// monitor can `wait()` it, it is **not** the unique owner of the OS
/// process, and dropping a `BootstrapProcHandle` does not by itself
/// terminate the process.
///
/// Cloning is cheap: the status, child and forwarder slots are held
/// behind `Arc`s, so every clone observes and mutates the same state.
///
/// What it pairs together:
/// - the **logical proc identity** (`ProcId`)
/// - the **live status surface** ([`ProcStatus`]), available both as
///   a synchronous snapshot (`status()`) and as an async stream via a
///   `tokio::sync::watch` channel (`watch()` / `changed()`)
///
/// Responsibilities:
/// - Retain the child handle only until the exit monitor claims it,
///   so the OS process can be awaited and its terminal status
///   recorded.
/// - Hold stdout/stderr forwarders until the exit monitor takes them,
///   then join to recover buffered output for diagnostics.
/// - Update status via the `mark_*` transitions and broadcast changes
///   over the watch channel so tasks can `await` lifecycle
///   transitions without polling.
/// - Provide the foundation for higher-level APIs like `wait()`
///   (await terminal) and, later, `terminate()` / `kill()`.
///
/// Notes:
/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
///   it SIGKILLs any still-recorded PIDs; we do not rely on
///   `Child::kill_on_drop`.
///
/// Relationship to types:
/// - [`ProcStatus`]: live status surface, updated by this handle.
/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
///   allocator-facing, historical event log; not directly updated by
///   this type.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Logical identity of the proc in the mesh.
    proc_id: ProcId,
    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
    /// so [`BootstrapProcHandle::status`] can return a synchronous
    /// copy. All mutations flow through
    /// [`BootstrapProcHandle::transition`], which updates this field
    /// under the lock and then broadcasts on the watch channel.
    status: Arc<std::sync::Mutex<ProcStatus>>,
    /// Underlying OS child handle. Held only until the exit monitor
    /// claims it (consumed by `wait()` there). Not relied on for
    /// teardown; manager `Drop` handles best-effort SIGKILL.
    child: Arc<std::sync::Mutex<Option<Child>>>,
    /// Stdout monitor for this proc. Created with `StreamFwder::start`, it
    /// forwards output to a log channel and keeps a bounded ring buffer.
    /// Transferred to the exit monitor, which joins it after `wait()`
    /// to recover buffered lines.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
    /// Stderr monitor for this proc. Same behavior as `stdout_fwder`
    /// but for stderr (used for exit-reason enrichment).
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
    /// Watch sender for status transitions. Every `mark_*` goes
    /// through [`BootstrapProcHandle::transition`], which updates the
    /// snapshot under the lock and then `send`s the new
    /// [`ProcStatus`].
    tx: tokio::sync::watch::Sender<ProcStatus>,
    /// Watch receiver seed. `watch()` clones this so callers can
    /// `borrow()` the current status and `changed().await` future
    /// transitions independently.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
785
786impl fmt::Debug for BootstrapProcHandle {
787    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788        let status = self.status.lock().expect("status mutex poisoned").clone();
789        f.debug_struct("BootstrapProcHandle")
790            .field("proc_id", &self.proc_id)
791            .field("status", &status)
792            .field("child", &"<child>")
793            .field("tx", &"<watch::Sender>")
794            .field("rx", &"<watch::Receiver>")
795            // Intentionally skip stdout_tailer / stderr_tailer (not
796            // Debug).
797            .finish()
798    }
799}
800
801// Locking invariant:
802// - Do not acquire other locks from inside `transition(...)` (it
803//   holds the status mutex).
804// - If you need the child PID during a transition, snapshot it
805//   *before* calling `transition`.
806impl BootstrapProcHandle {
807    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
808    /// OS process hosting a proc.
809    ///
810    /// - Initializes the status to [`ProcStatus::Starting`] since the
811    ///   child process has been created but not yet confirmed running.
812    /// - Wraps the provided [`Child`] handle in an
813    ///   `Arc<Mutex<Option<Child>>>` so lifecycle methods (`wait`,
814    ///   `terminate`, etc.) can consume it later.
815    ///
816    /// This is the canonical entry point used by
817    /// `BootstrapProcManager` when it launches a proc into a new
818    /// process.
819    pub fn new(proc_id: ProcId, child: Child) -> Self {
820        let (tx, rx) = watch::channel(ProcStatus::Starting);
821        Self {
822            proc_id,
823            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
824            child: Arc::new(std::sync::Mutex::new(Some(child))),
825            stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
826            stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
827            tx,
828            rx,
829        }
830    }
831
    /// Return the logical proc identity in the mesh.
    ///
    /// This is the `ProcId` the handle was constructed with; it is
    /// borrowed, not cloned.
    #[inline]
    pub fn proc_id(&self) -> &ProcId {
        &self.proc_id
    }
837
    /// Create a new subscription to this proc's status stream.
    ///
    /// Each call returns a fresh [`watch::Receiver`] (a clone of the
    /// handle's seed receiver) tied to this handle's internal
    /// [`ProcStatus`] channel. The receiver can be awaited on
    /// (`rx.changed().await`) to observe lifecycle transitions as
    /// they occur.
    ///
    /// Notes:
    /// - Multiple subscribers can exist simultaneously and progress
    ///   independently of each other.
    /// - NOTE(review): a `watch` channel retains only the most recent
    ///   value, so a slow subscriber presumably sees the latest status
    ///   rather than every intermediate one — confirm no caller
    ///   relies on observing each transition.
    /// - NOTE(review): the clone inherits the seed receiver's
    ///   "seen" marker, which this handle never advances; the first
    ///   `changed()` on a fresh receiver therefore likely resolves
    ///   immediately once any transition has been published. Read the
    ///   value with `borrow()` after each wake-up rather than assuming
    ///   a new transition occurred.
    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
    ///   use `watch()` when you need to await changes over time.
    #[inline]
    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
        self.rx.clone()
    }
854
855    /// Wait until this proc's status changes.
856    ///
857    /// This is a convenience wrapper around
858    /// [`watch::Receiver::changed`]: it subscribes internally via
859    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
860    /// If no subscribers exist or the channel is closed, this returns
861    /// without error.
862    ///
863    /// Typical usage:
864    /// ```ignore
865    /// handle.changed().await;
866    /// match handle.status() {
867    ///     ProcStatus::Running { .. } => { /* now running */ }
868    ///     ProcStatus::Stopped { .. } => { /* exited */ }
869    ///     _ => {}
870    /// }
871    /// ```
872    #[inline]
873    pub async fn changed(&self) {
874        let _ = self.watch().changed().await;
875    }
876
877    /// Return the OS process ID (`pid`) for this proc.
878    ///
879    /// If the proc is currently `Running`, this returns the cached
880    /// pid even if the `Child` has already been taken by the exit
881    /// monitor. Otherwise, it falls back to `Child::id()` if the
882    /// handle is still present. Returns `None` once the proc has
883    /// exited or if the handle has been consumed.
884    #[inline]
885    pub fn pid(&self) -> Option<u32> {
886        match *self.status.lock().expect("status mutex poisoned") {
887            ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
888            _ => self
889                .child
890                .lock()
891                .expect("child mutex poisoned")
892                .as_ref()
893                .and_then(|c| c.id()),
894        }
895    }
896
897    /// Return a snapshot of the current [`ProcStatus`] for this proc.
898    ///
899    /// This is a *live view* of the lifecycle state as tracked by
900    /// [`BootstrapProcManager`]. It reflects what is currently known
901    /// about the underlying OS process (e.g., `Starting`, `Running`,
902    /// `Stopping`, etc.).
903    ///
904    /// Internally this reads the mutex-guarded status. Use this when
905    /// you just need a synchronous snapshot; use
906    /// [`BootstrapProcHandle::watch`] or
907    /// [`BootstrapProcHandle::changed`] if you want to await
908    /// transitions asynchronously.
909    #[must_use]
910    pub fn status(&self) -> ProcStatus {
911        // Source of truth for now is the mutex. We broadcast via
912        // `watch` in `transition`, but callers that want a
913        // synchronous snapshot should read the guarded value.
914        self.status.lock().expect("status mutex poisoned").clone()
915    }
916
917    /// Atomically apply a state transition while holding the status
918    /// lock, and send the updated value on the watch channel **while
919    /// still holding the lock**. This guarantees the mutex state and
920    /// the broadcast value stay in sync and avoids reordering between
921    /// concurrent transitions.
922    fn transition<F>(&self, f: F) -> bool
923    where
924        F: FnOnce(&mut ProcStatus) -> bool,
925    {
926        let mut guard = self.status.lock().expect("status mutex poisoned");
927        let _before = guard.clone();
928        let changed = f(&mut guard);
929        if changed {
930            // Publish while still holding the lock to preserve order.
931            let _ = self.tx.send(guard.clone());
932        }
933        changed
934    }
935
936    /// Transition this proc into the [`ProcStatus::Running`] state.
937    ///
938    /// Called internally once the child OS process has been spawned
939    /// and we can observe a valid `pid`. Records the `pid` and the
940    /// `started_at` timestamp so that callers can query them later
941    /// via [`BootstrapProcHandle::status`] or
942    /// [`BootstrapProcHandle::pid`].
943    ///
944    /// This is a best-effort marker: it reflects that the process
945    /// exists at the OS level, but does not guarantee that the proc
946    /// has completed bootstrap or is fully ready.
947    pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
948        self.transition(|st| match *st {
949            ProcStatus::Starting => {
950                *st = ProcStatus::Running { pid, started_at };
951                true
952            }
953            _ => {
954                tracing::warn!(
955                    "illegal transition: {:?} -> Running; leaving status unchanged",
956                    *st
957                );
958                false
959            }
960        })
961    }
962
963    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
964    /// state.
965    ///
966    /// This records the process ID, start time, listening address,
967    /// and agent once the proc has successfully started and is ready
968    /// to serve.
969    ///
970    /// Returns `true` if the transition succeeded (from `Starting` or
971    /// `Running`), or `false` if the current state did not allow
972    /// moving to `Ready`. In the latter case the state is left
973    /// unchanged and a warning is logged.
974    pub(crate) fn mark_ready(
975        &self,
976        pid: u32,
977        started_at: SystemTime,
978        addr: ChannelAddr,
979        agent: ActorRef<ProcMeshAgent>,
980    ) -> bool {
981        tracing::info!(proc_id = %self.proc_id, %addr, "{} running on pid {}", self.proc_id, pid);
982        self.transition(|st| match st {
983            ProcStatus::Starting | ProcStatus::Running { .. } => {
984                *st = ProcStatus::Ready {
985                    pid,
986                    started_at,
987                    addr,
988                    agent,
989                };
990                true
991            }
992            _ => {
993                tracing::warn!(
994                    "illegal transition: {:?} -> Ready; leaving status unchanged",
995                    st
996                );
997                false
998            }
999        })
1000    }
1001
1002    /// Record that a stop has been requested for the proc (e.g. a
1003    /// graceful shutdown via SIGTERM), but the underlying process has
1004    /// not yet fully exited.
1005    pub(crate) fn mark_stopping(&self) -> bool {
1006        // Avoid taking out additional locks in .transition().
1007        let child_pid = self.child_pid_snapshot();
1008        let now = hyperactor::clock::RealClock.system_time_now();
1009
1010        self.transition(|st| match *st {
1011            ProcStatus::Running { pid, started_at } => {
1012                *st = ProcStatus::Stopping { pid, started_at };
1013                true
1014            }
1015            ProcStatus::Ready {
1016                pid, started_at, ..
1017            } => {
1018                *st = ProcStatus::Stopping { pid, started_at };
1019                true
1020            }
1021            ProcStatus::Starting => {
1022                if let Some(pid) = child_pid {
1023                    *st = ProcStatus::Stopping {
1024                        pid,
1025                        started_at: now,
1026                    };
1027                    true
1028                } else {
1029                    false
1030                }
1031            }
1032            _ => false,
1033        })
1034    }
1035
1036    /// Record that the process has exited normally with the given
1037    /// exit code.
1038    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1039        self.transition(|st| match *st {
1040            ProcStatus::Starting
1041            | ProcStatus::Running { .. }
1042            | ProcStatus::Ready { .. }
1043            | ProcStatus::Stopping { .. } => {
1044                *st = ProcStatus::Stopped {
1045                    exit_code,
1046                    stderr_tail,
1047                };
1048                true
1049            }
1050            _ => {
1051                tracing::warn!(
1052                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
1053                    *st
1054                );
1055                false
1056            }
1057        })
1058    }
1059
1060    /// Record that the process was killed by the given signal (e.g.
1061    /// SIGKILL, SIGTERM).
1062    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1063        self.transition(|st| match *st {
1064            ProcStatus::Starting
1065            | ProcStatus::Running { .. }
1066            | ProcStatus::Ready { .. }
1067            | ProcStatus::Stopping { .. } => {
1068                *st = ProcStatus::Killed {
1069                    signal,
1070                    core_dumped,
1071                };
1072                true
1073            }
1074            _ => {
1075                tracing::warn!(
1076                    "illegal transition: {:?} -> Killed; leaving status unchanged",
1077                    *st
1078                );
1079                false
1080            }
1081        })
1082    }
1083
1084    /// Record that the proc or its process failed for an unexpected
1085    /// reason (bootstrap error, spawn failure, etc.).
1086    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1087        self.transition(|st| match *st {
1088            ProcStatus::Starting
1089            | ProcStatus::Running { .. }
1090            | ProcStatus::Ready { .. }
1091            | ProcStatus::Stopping { .. } => {
1092                *st = ProcStatus::Failed {
1093                    reason: reason.into(),
1094                };
1095                true
1096            }
1097            _ => {
1098                tracing::warn!(
1099                    "illegal transition: {:?} -> Failed; leaving status unchanged",
1100                    *st
1101                );
1102                false
1103            }
1104        })
1105    }
1106
1107    /// Wait until the proc has reached a terminal state and return
1108    /// it.
1109    ///
1110    /// Terminal means [`ProcStatus::Stopped`],
1111    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
1112    /// current status is already terminal, returns immediately.
1113    ///
1114    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1115    /// owner of the OS process, so you can call `wait()` from
1116    /// multiple tasks concurrently.
1117    ///
1118    /// Implementation detail: listens on this handle's `watch`
1119    /// channel. It snapshots the current status, and if not terminal
1120    /// awaits the next change. If the channel closes unexpectedly,
1121    /// returns the last observed status.
1122    ///
1123    /// Mirrors `tokio::process::Child::wait()`, but yields the
1124    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
1125    #[must_use]
1126    pub async fn wait_inner(&self) -> ProcStatus {
1127        let mut rx = self.watch();
1128        loop {
1129            let st = rx.borrow().clone();
1130            if st.is_exit() {
1131                return st;
1132            }
1133            // If the channel closes, return the last observed value.
1134            if rx.changed().await.is_err() {
1135                return st;
1136            }
1137        }
1138    }
1139
1140    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
1141    ///
1142    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
1143    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
1144    /// becoming `Ready`, this returns
1145    /// `Err(ReadyError::Terminal(status))`. If the internal watch
1146    /// channel closes unexpectedly, this returns
1147    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
1148    /// `Ok(())` when `Ready` is first observed.
1149    ///
1150    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1151    /// owner; multiple tasks may await `ready()` concurrently.
1152    /// `Stopping` is not treated as terminal here; we continue
1153    /// waiting until `Ready` or a terminal state is seen.
1154    ///
1155    /// Companion to [`BootstrapProcHandle::wait_inner`]:
1156    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
1157    /// startup.
1158    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1159        let mut rx = self.watch();
1160        loop {
1161            let st = rx.borrow().clone();
1162            match &st {
1163                ProcStatus::Ready { .. } => return Ok(()),
1164                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1165                _non_terminal => {
1166                    if rx.changed().await.is_err() {
1167                        return Err(ReadyError::ChannelClosed);
1168                    }
1169                }
1170            }
1171        }
1172    }
1173
1174    /// Best-effort, non-blocking snapshot of the child's PID. Locks
1175    /// only the `child` mutex and never touches `status`.
1176    fn child_pid_snapshot(&self) -> Option<u32> {
1177        self.child
1178            .lock()
1179            .expect("child mutex poisoned")
1180            .as_ref()
1181            .and_then(|c| c.id())
1182    }
1183
1184    /// Try to fetch a PID we can signal. Prefer cached PID from
1185    /// status; fall back to Child::id() if still present. None once
1186    /// terminal or if monitor already consumed the Child and we never
1187    /// cached a pid.
1188    fn signalable_pid(&self) -> Option<i32> {
1189        match &*self.status.lock().expect("status mutex poisoned") {
1190            ProcStatus::Running { pid, .. }
1191            | ProcStatus::Ready { pid, .. }
1192            | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1193            _ => self
1194                .child
1195                .lock()
1196                .expect("child mutex poisoned")
1197                .as_ref()
1198                .and_then(|c| c.id())
1199                .map(|p| p as i32),
1200        }
1201    }
1202
1203    /// Send a signal to the pid. Returns Ok(()) if delivered. If
1204    /// ESRCH (no such process), we treat that as "already gone" and
1205    /// return Ok(()) so callers can proceed to observe the terminal
1206    /// state through the monitor.
1207    fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1208        // SAFETY: Sending a POSIX signal; does not dereference
1209        // memory.
1210        let rc = unsafe { libc::kill(pid, sig) };
1211        if rc == 0 {
1212            Ok(())
1213        } else {
1214            let e = std::io::Error::last_os_error();
1215            if let Some(libc::ESRCH) = e.raw_os_error() {
1216                // Process already gone; proceed to wait/observe
1217                // terminal state.
1218                Ok(())
1219            } else {
1220                Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1221            }
1222        }
1223    }
1224
1225    pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1226        *self
1227            .stdout_fwder
1228            .lock()
1229            .expect("stdout_tailer mutex poisoned") = out;
1230        *self
1231            .stderr_fwder
1232            .lock()
1233            .expect("stderr_tailer mutex poisoned") = err;
1234    }
1235
1236    fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1237        let out = self
1238            .stdout_fwder
1239            .lock()
1240            .expect("stdout_tailer mutex poisoned")
1241            .take();
1242        let err = self
1243            .stderr_fwder
1244            .lock()
1245            .expect("stderr_tailer mutex poisoned")
1246            .take();
1247        (out, err)
1248    }
1249
1250    /// Sends a StopAll message to the ProcMeshAgent, which should exit the process.
1251    /// Waits for the successful state change of the process. If the process
1252    /// doesn't reach a terminal state, returns Err.
1253    async fn send_stop_all(
1254        &self,
1255        cx: &impl context::Actor,
1256        agent: ActorRef<ProcMeshAgent>,
1257        timeout: Duration,
1258    ) -> anyhow::Result<ProcStatus> {
1259        // For all of the messages and replies in this function:
1260        // if the proc is already dead, then the message will be undeliverable,
1261        // which should be ignored.
1262        // If this message isn't deliverable to the agent, the process may have
1263        // stopped already. No need to produce any errors, just continue with
1264        // killing the process.
1265        let mut agent_port = agent.port();
1266        agent_port.return_undeliverable(false);
1267        agent_port.send(cx, resource::StopAll {})?;
1268        // The agent handling Stop should exit the process, if it doesn't within
1269        // the time window, we escalate to SIGTERM.
1270        match RealClock.timeout(timeout, self.wait()).await {
1271            Ok(Ok(st)) => Ok(st),
1272            Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1273            Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1274        }
1275    }
1276}
1277
1278#[async_trait]
1279impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1280    type Agent = ProcMeshAgent;
1281    type TerminalStatus = ProcStatus;
1282
    /// Return the logical identity of the proc this handle tracks.
    #[inline]
    fn proc_id(&self) -> &ProcId {
        &self.proc_id
    }
1287
1288    #[inline]
1289    fn addr(&self) -> Option<ChannelAddr> {
1290        match &*self.status.lock().expect("status mutex poisoned") {
1291            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1292            _ => None,
1293        }
1294    }
1295
1296    #[inline]
1297    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1298        match &*self.status.lock().expect("status mutex poisoned") {
1299            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1300            _ => None,
1301        }
1302    }
1303
1304    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
1305    /// state.
1306    ///
1307    /// Returns `Ok(())` once `Ready` is observed.
1308    ///
1309    /// If the proc transitions directly to a terminal state before
1310    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
1311    ///
1312    /// If the internal status watch closes unexpectedly before
1313    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
1314    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1315        match self.ready_inner().await {
1316            Ok(()) => Ok(()),
1317            Err(ReadyError::Terminal(status)) => {
1318                Err(hyperactor::host::ReadyError::Terminal(status))
1319            }
1320            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1321        }
1322    }
1323
1324    /// Wait until this proc reaches a terminal [`ProcStatus`].
1325    ///
1326    /// Returns `Ok(status)` when a terminal state is observed
1327    /// (`Stopped`, `Killed`, or `Failed`).
1328    ///
1329    /// If the internal status watch closes before any terminal state
1330    /// is seen, returns `Err(WaitError::ChannelClosed)`.
1331    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1332        let status = self.wait_inner().await;
1333        if status.is_exit() {
1334            Ok(status)
1335        } else {
1336            Err(hyperactor::host::WaitError::ChannelClosed)
1337        }
1338    }
1339
    /// Attempt to terminate the underlying OS process, escalating
    /// step by step.
    ///
    /// Sequence:
    /// 1. If an agent reference is available (see `agent_ref()`, only
    ///    in the `Ready` state), send it `StopAll` and wait up to
    ///    `timeout` for a terminal status. On success that status is
    ///    returned and no signal is sent.
    /// 2. Otherwise, or if step 1 failed: mark the proc `Stopping`
    ///    (best effort) and send `SIGTERM`.
    /// 3. Wait up to `timeout` for it to exit cleanly.
    /// 4. Escalate to `SIGKILL` if still alive, then wait a short
    ///    hard-coded grace period (`HARD_WAIT_AFTER_KILL`) for the
    ///    terminal status to be recorded.
    ///
    /// The worst case therefore takes roughly `2 * timeout +
    /// HARD_WAIT_AFTER_KILL`.
    ///
    /// If the process was already in a terminal state when called,
    /// returns [`TerminateError::AlreadyTerminated`].
    ///
    /// # Notes
    /// - A signalable PID is required up front, even when the
    ///   graceful `StopAll` path ends up being sufficient.
    /// - If no terminal status is observed after `SIGKILL` (timeout
    ///   or a non-terminal result), the error reported is
    ///   `TerminateError::ChannelClosed`.
    /// - Errors may also be returned if the process PID cannot be
    ///   determined, if signal delivery fails, or if the status
    ///   channel is closed unexpectedly.
    ///
    /// # Parameters
    /// - `cx`: Actor context used to send `StopAll` to the agent.
    /// - `timeout`: Grace period applied to the `StopAll` attempt
    ///   and, separately, to the wait after `SIGTERM` before
    ///   escalating.
    ///
    /// # Returns
    /// - `Ok(ProcStatus)` if the process exited during the
    ///   termination sequence.
    /// - `Err(TerminateError)` if already exited, signaling failed,
    ///   or the channel was lost.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
        const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);

        // If already terminal, return that.
        let st0 = self.status();
        if st0.is_exit() {
            tracing::debug!(?st0, "terminate(): already terminal");
            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
        }

        // Find a PID to signal.
        let pid = self.signalable_pid().ok_or_else(|| {
            let st = self.status();
            tracing::warn!(?st, "terminate(): no signalable pid");
            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
                "no signalable pid (state: {:?})",
                st
            ))
        })?;

        // Before sending SIGTERM, try to close actors normally. Only works if
        // they are in the Ready state and have an Agent we can message.
        let agent = self.agent_ref();
        if let Some(agent) = agent {
            match self.send_stop_all(cx, agent.clone(), timeout).await {
                Ok(st) => return Ok(st),
                Err(e) => {
                    // Variety of possible errors, proceed with SIGTERM.
                    tracing::warn!(
                        "ProcMeshAgent {} could not successfully stop all actors: {}",
                        agent.actor_id(),
                        e,
                    );
                }
            }
        }
        // If the stop all actors message was unsuccessful, we need
        // to actually stop the process. Best-effort mark "Stopping"
        // first (ok if state races).
        let _ = self.mark_stopping();

        // Send SIGTERM (ESRCH is treated as "already gone"). Note that
        // this uses the PID resolved before the StopAll attempt.
        tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
        if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
            tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
            return Err(hyperactor::host::TerminateError::Io(e));
        }
        tracing::debug!(pid, "terminate(): SIGTERM sent");

        // Wait up to the timeout for a terminal state.
        match RealClock.timeout(timeout, self.wait_inner()).await {
            Ok(st) if st.is_exit() => {
                tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
                Ok(st)
            }
            Ok(non_exit) => {
                // wait_inner() should only resolve terminal; treat as
                // channel issue.
                tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
                Err(hyperactor::host::TerminateError::ChannelClosed)
            }
            Err(_elapsed) => {
                // Escalate to SIGKILL, re-resolving the PID since the
                // status may have changed while we were waiting.
                tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
                if let Some(pid2) = self.signalable_pid() {
                    if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
                        tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
                        return Err(hyperactor::host::TerminateError::Io(e));
                    }
                    tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
                } else {
                    tracing::warn!("terminate(): lost pid before SIGKILL escalation");
                }
                // Hard bound after KILL so we can't hang forever.
                match RealClock
                    .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
                    .await
                {
                    Ok(st) if st.is_exit() => {
                        tracing::info!(?st, "terminate(): exited after SIGKILL");
                        Ok(st)
                    }
                    // Covers both a timeout and a non-terminal result.
                    other => {
                        tracing::warn!(
                            ?other,
                            "terminate(): post-KILL wait did not yield terminal"
                        );
                        Err(hyperactor::host::TerminateError::ChannelClosed)
                    }
                }
            }
        }
    }
1467
1468    /// Forcibly kill the underlying OS process with `SIGKILL`.
1469    ///
1470    /// This bypasses any graceful shutdown semantics and immediately
1471    /// delivers a non-catchable `SIGKILL` to the child. It is
1472    /// intended as a last-resort termination mechanism when
1473    /// `terminate()` fails or when no grace period is desired.
1474    ///
1475    /// # Behavior
1476    /// - If the process was already in a terminal state, returns
1477    ///   [`TerminateError::AlreadyTerminated`].
1478    /// - Otherwise attempts to send `SIGKILL` to the current PID.
1479    /// - Then waits for the exit monitor to observe a terminal state.
1480    ///
1481    /// # Notes
1482    /// - This is strictly an **OS-level kill**. It does *not* attempt
1483    ///   proc-level shutdown via `ProcMeshAgent` or actor messages.
1484    ///   That integration will be layered in later.
1485    /// - Errors may be returned if the PID cannot be determined, if
1486    ///   signal delivery fails, or if the status channel closes
1487    ///   unexpectedly.
1488    ///
1489    /// # Returns
1490    /// - `Ok(ProcStatus)` if the process exited after `SIGKILL`.
1491    /// - `Err(TerminateError)` if already exited, signaling failed,
1492    ///   or the channel was lost.
1493    async fn kill(
1494        &self,
1495    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1496        // NOTE: This is a pure OS-level kill (SIGKILL) of the child
1497        // process. It bypasses any proc-level shutdown semantics.
1498        //
1499        // Future work: layer in proc-level stop semantics through
1500        // actor messages once those are implemented.
1501
1502        // If already terminal, return that.
1503        let st0 = self.status();
1504        if st0.is_exit() {
1505            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1506        }
1507
1508        // Try to get a PID and send SIGKILL.
1509        let pid = self.signalable_pid().ok_or_else(|| {
1510            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1511                "no signalable pid (state: {:?})",
1512                self.status()
1513            ))
1514        })?;
1515
1516        if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
1517            return Err(hyperactor::host::TerminateError::Io(e));
1518        }
1519
1520        // Wait for exit monitor to record terminal status.
1521        let st = self.wait_inner().await;
1522        if st.is_exit() {
1523            Ok(st)
1524        } else {
1525            Err(hyperactor::host::TerminateError::ChannelClosed)
1526        }
1527    }
1528}
1529
/// A specification of the command used to bootstrap procs.
///
/// This is plain, serializable data; [`BootstrapCommand::new`] turns
/// it into a runnable `Command`.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path of the program the command is created for.
    pub program: PathBuf,
    /// Optional `arg0` override applied to the command; when `None`,
    /// no override is applied.
    pub arg0: Option<String>,
    /// Arguments appended to the command, in order.
    pub args: Vec<String>,
    /// Environment variables set on the command.
    pub env: HashMap<String, String>,
}
wirevalue::register_type!(BootstrapCommand);
1539
1540impl BootstrapCommand {
1541    /// Creates a bootstrap command specification to replicate the
1542    /// invocation of the currently running process.
1543    pub fn current() -> io::Result<Self> {
1544        let mut args: VecDeque<String> = std::env::args().collect();
1545        let arg0 = args.pop_front();
1546
1547        Ok(Self {
1548            program: std::env::current_exe()?,
1549            arg0,
1550            args: args.into(),
1551            env: std::env::vars().collect(),
1552        })
1553    }
1554
1555    /// Create a new `Command` reflecting this bootstrap command
1556    /// configuration.
1557    pub fn new(&self) -> Command {
1558        let mut cmd = Command::new(&self.program);
1559        if let Some(arg0) = &self.arg0 {
1560            cmd.arg0(arg0);
1561        }
1562        for arg in &self.args {
1563            cmd.arg(arg);
1564        }
1565        for (k, v) in &self.env {
1566            cmd.env(k, v);
1567        }
1568        cmd
1569    }
1570
1571    /// Bootstrap command used for testing, invoking the Buck-built
1572    /// `monarch/hyperactor_mesh/bootstrap` binary.
1573    ///
1574    /// Intended for integration tests where we need to spawn real
1575    /// bootstrap processes under proc manager control. Not available
1576    /// outside of test builds.
1577    #[cfg(test)]
1578    #[cfg(fbcode_build)]
1579    pub(crate) fn test() -> Self {
1580        Self {
1581            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1582            arg0: None,
1583            args: vec![],
1584            env: HashMap::new(),
1585        }
1586    }
1587}
1588
1589impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1590    /// Creates a bootstrap command from the provided path.
1591    fn from(s: T) -> Self {
1592        Self {
1593            program: s.into(),
1594            arg0: None,
1595            args: vec![],
1596            env: HashMap::new(),
1597        }
1598    }
1599}
1600
/// A process manager for launching and supervising **bootstrap
/// processes** (via the [`bootstrap`] entry point).
///
/// `BootstrapProcManager` is the host-side runtime for external procs
/// in a hyperactor mesh. It spawns the configured bootstrap binary,
/// forwards each child's stdout/stderr into the logging channel,
/// tracks lifecycle state through [`BootstrapProcHandle`] /
/// [`ProcStatus`], and ensures best-effort teardown on drop.
///
/// Internally it maintains two maps:
/// - [`children`]: async map from [`ProcId`] to
///   [`BootstrapProcHandle`], used for lifecycle queries (`status`) and
///   exit monitoring.
/// - [`pid_table`]: synchronous map from [`ProcId`] to raw PIDs, used
///   in [`Drop`] to synchronously send `SIGKILL` to any still-running
///   children.
///
/// Together these provide both a queryable, real-time status surface
/// and a best-effort cleanup path, so that child processes are not
/// left orphaned if the manager itself is dropped.
#[derive(Debug)]
pub struct BootstrapProcManager {
    /// The command specification used to bootstrap new processes.
    command: BootstrapCommand,
    /// Async registry of running children, keyed by [`ProcId`]. Holds
    /// [`BootstrapProcHandle`]s so callers can query or monitor
    /// status.
    children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
    /// Synchronous table of raw PIDs, keyed by [`ProcId`]. Entries are
    /// added when a child is spawned and removed by the exit monitor;
    /// the [`Drop`] impl reads it to send `SIGKILL` without needing an
    /// async context.
    pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
    /// File appender shared by all children for file capture of their
    /// output. `None` if file capture is disabled in the global
    /// configuration, or if creating the appender failed.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for storing proc socket files. Procs place their sockets
    /// in this directory, so that they can be looked up by other procs
    /// for direct transfer.
    socket_dir: TempDir,
}
1642
1643impl Drop for BootstrapProcManager {
1644    /// Drop implementation for [`BootstrapProcManager`].
1645    ///
1646    /// Ensures that when the manager itself is dropped, any child
1647    /// processes it spawned are also terminated. This is a
1648    /// best-effort cleanup mechanism: the manager iterates over its
1649    /// recorded PID table and sends `SIGKILL` to each one.
1650    ///
1651    /// Notes:
1652    /// - Uses a synchronous `Mutex` so it can be locked safely in
1653    ///   `Drop` without async context.
1654    /// - Safety: sending `SIGKILL` is low-level but safe here because
1655    ///   it only instructs the OS to terminate the process. The only
1656    ///   risk is PID reuse, in which case an unrelated process might
1657    ///   be signaled. This is accepted for shutdown semantics.
1658    /// - This makes `BootstrapProcManager` robust against leaks:
1659    ///   dropping it should not leave stray child processes running.
1660    fn drop(&mut self) {
1661        if let Ok(table) = self.pid_table.lock() {
1662            for (proc_id, pid) in table.iter() {
1663                // SAFETY: We own these PIDs because they were spawned and recorded
1664                // by this manager. Sending SIGKILL is safe here since it does not
1665                // dereference any memory; it only instructs the OS to terminate the
1666                // process. The worst-case outcome is that the PID has already exited
1667                // and been reused, in which case the signal may affect an unrelated
1668                // process. We accept that risk in Drop semantics because this path
1669                // is only used for cleanup at shutdown.
1670                unsafe {
1671                    libc::kill(*pid as i32, libc::SIGKILL);
1672                }
1673                tracing::info!(
1674                    "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1675                    pid,
1676                    proc_id
1677                );
1678            }
1679        }
1680    }
1681}
1682
1683impl BootstrapProcManager {
1684    /// Construct a new [`BootstrapProcManager`] that will launch
1685    /// procs using the given bootstrap command specification.
1686    ///
1687    /// This is the general entry point when you want to manage procs
1688    /// backed by a specific binary path (e.g. a bootstrap
1689    /// trampoline).
1690    pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1691        let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1692            match crate::logging::FileAppender::new() {
1693                Some(fm) => {
1694                    tracing::info!("file appender created successfully");
1695                    Some(Arc::new(fm))
1696                }
1697                None => {
1698                    tracing::warn!("failed to create file appender");
1699                    None
1700                }
1701            }
1702        } else {
1703            None
1704        };
1705
1706        Ok(Self {
1707            command,
1708            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1709            pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1710            file_appender,
1711            socket_dir: runtime_dir()?,
1712        })
1713    }
1714
1715    /// The bootstrap command used to launch processes.
1716    pub fn command(&self) -> &BootstrapCommand {
1717        &self.command
1718    }
1719
1720    /// The socket directory, where per-proc Unix sockets are placed.
1721    pub fn socket_dir(&self) -> &Path {
1722        self.socket_dir.path()
1723    }
1724
1725    /// Return the current [`ProcStatus`] for the given [`ProcId`], if
1726    /// the proc is known to this manager.
1727    ///
1728    /// This queries the live [`BootstrapProcHandle`] stored in the
1729    /// manager's internal map. It provides an immediate snapshot of
1730    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
1731    /// etc.).
1732    ///
1733    /// Returns `None` if the manager has no record of the proc (e.g.
1734    /// never spawned here, or entry already removed).
1735    pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1736        self.children.lock().await.get(proc_id).map(|h| h.status())
1737    }
1738
1739    fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1740        let pid_table = Arc::clone(&self.pid_table);
1741
1742        let maybe_child = {
1743            let mut guard = handle.child.lock().expect("child mutex");
1744            let taken = guard.take();
1745            debug_assert!(guard.is_none(), "no Child should remain in handles");
1746            taken
1747        };
1748
1749        let Some(mut child) = maybe_child else {
1750            tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1751            return;
1752        };
1753
1754        tokio::spawn(async move {
1755            let wait_res = child.wait().await;
1756
1757            let mut stderr_tail: Vec<String> = Vec::new();
1758            let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1759
1760            if let Some(t) = stderr_mon {
1761                let (lines, _bytes) = t.abort().await;
1762                stderr_tail = lines;
1763            }
1764            if let Some(t) = stdout_mon {
1765                let (_lines, _bytes) = t.abort().await;
1766            }
1767
1768            let tail_str = if stderr_tail.is_empty() {
1769                None
1770            } else {
1771                Some(stderr_tail.join("\n"))
1772            };
1773
1774            let remove_from_pid_table = || {
1775                let pid = if let Ok(mut table) = pid_table.lock() {
1776                    table.remove(&proc_id)
1777                } else {
1778                    None
1779                };
1780
1781                pid.map_or_else(|| "not available".to_string(), |i| format!("{i}"))
1782            };
1783
1784            match wait_res {
1785                Ok(status) => {
1786                    if let Some(sig) = status.signal() {
1787                        let _ = handle.mark_killed(sig, status.core_dumped());
1788                        let pid_str = remove_from_pid_table();
1789                        tracing::info!(
1790                            name = "ProcStatus",
1791                            status = "Exited::KilledBySignal",
1792                            %proc_id,
1793                            tail = tail_str,
1794                            "killed by signal {sig}; proc's pid: {pid_str}"
1795                        );
1796                    } else if let Some(code) = status.code() {
1797                        let _ = handle.mark_stopped(code, stderr_tail);
1798                        let pid_str = remove_from_pid_table();
1799                        tracing::info!(
1800                            name = "ProcStatus",
1801                            status = "Exited::ExitWithCode",
1802                            %proc_id,
1803                            exit_code = code,
1804                            tail = tail_str,
1805                            "proc exited; proc's pid: {pid_str}"
1806                        );
1807                    } else {
1808                        debug_assert!(
1809                            false,
1810                            "unreachable: process terminated with neither signal nor exit code"
1811                        );
1812                        let _ = handle.mark_failed("process exited with unknown status");
1813                        let pid_str = remove_from_pid_table();
1814                        tracing::info!(
1815                            name = "ProcStatus",
1816                            status = "Exited::Unknown",
1817                            %proc_id,
1818                            tail = tail_str,
1819                            "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
1820                        );
1821                    }
1822                }
1823                Err(e) => {
1824                    let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1825                    let pid_str = remove_from_pid_table();
1826                    tracing::info!(
1827                        name = "ProcStatus",
1828                        status = "Exited::WaitFailed",
1829                        %proc_id,
1830                        tail = tail_str,
1831                        "proc {proc_id} wait failed; proc's pid: {pid_str}"
1832                    );
1833                }
1834            }
1835        });
1836    }
1837}
1838
/// The configuration used for bootstrapped procs.
///
/// Passed to `BootstrapProcManager::spawn` as the per-proc
/// configuration.
pub struct BootstrapProcConfig {
    /// The proc's create rank. `spawn` passes it to the stdout/stderr
    /// stream forwarders it starts for the child.
    pub create_rank: usize,

    /// Config values to set on the spawned proc's global config,
    /// at the `ClientOverride` layer. `spawn` also consults these
    /// overrides (falling back to the global config) to decide
    /// whether to enable log forwarding, file capture, and log
    /// tailing for the child.
    pub client_config_override: Attrs,
}
1848
#[async_trait]
impl ProcManager for BootstrapProcManager {
    type Handle = BootstrapProcHandle;

    type Config = BootstrapProcConfig;

    /// Return the [`ChannelTransport`] used by this proc manager.
    ///
    /// For `BootstrapProcManager` this is always
    /// [`ChannelTransport::Unix`], since all procs are spawned
    /// locally on the same host and communicate over Unix domain
    /// sockets.
    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }

    /// Launch a new proc under this [`BootstrapProcManager`].
    ///
    /// Spawns the configured bootstrap binary (`self.command.program`)
    /// in a fresh child process. The environment is populated with
    /// variables that describe the bootstrap context — most
    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries the
    /// encoded [`Bootstrap::Proc`] payload (proc id, backend addr,
    /// callback addr, socket directory, config snapshot).
    /// `HYPERACTOR_PROCESS_NAME` is always set, and
    /// `BOOTSTRAP_LOG_CHANNEL` is set when log forwarding is enabled.
    ///
    /// Responsibilities performed here:
    /// - Create a callback channel so the child can confirm
    ///   successful bootstrap and return its mailbox address plus agent
    ///   reference.
    /// - Spawn the OS process, with stdout/stderr piped when log
    ///   forwarding, file capture, or tailing is enabled (otherwise
    ///   the child inherits the parent's stdio).
    /// - Stamp the new [`BootstrapProcHandle`] as
    ///   [`ProcStatus::Running`] once a PID is observed, and record
    ///   the PID in the manager's PID table.
    /// - Wire piped stdout/stderr into stream forwarders.
    /// - Insert the handle into the manager's children map and start
    ///   an exit monitor to track process termination.
    /// - Start a task that waits for the bootstrap callback and marks
    ///   the handle ready (or failed if the callback never arrives).
    ///
    /// Returns a [`BootstrapProcHandle`] that exposes the child
    /// process's lifecycle (status, wait/ready, termination). Errors
    /// are surfaced as [`HostError`].
    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
    async fn spawn(
        &self,
        proc_id: ProcId,
        backend_addr: ChannelAddr,
        config: BootstrapProcConfig,
    ) -> Result<Self::Handle, HostError> {
        // Channel on which the child reports back once it has
        // bootstrapped (consumed by the callback task below).
        let (callback_addr, mut callback_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;

        // Decide whether we need to capture stdio. Per-proc overrides
        // take precedence over the global configuration.
        let overrides = &config.client_config_override;
        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;

        let mode = Bootstrap::Proc {
            proc_id: proc_id.clone(),
            backend_addr,
            callback_addr,
            socket_dir_path: self.socket_dir.path().to_owned(),
            config: Some(config.client_config_override),
        };
        let mut cmd = self.command.new();
        // The bootstrap mode is handed to the child through its
        // environment.
        cmd.env(
            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
            mode.to_env_safe_string()
                .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
        );
        // Human-readable process name of the form
        // "proc <name> @ <hostname>"; falls back to "unknown_host" if
        // the hostname is unavailable or not valid Unicode.
        cmd.env(
            "HYPERACTOR_PROCESS_NAME",
            format!(
                "proc {} @ {}",
                match &proc_id {
                    ProcId::Direct(_, name) => name.clone(),
                    ProcId::Ranked(world_id, rank) => format!("{world_id}[{rank}]"),
                },
                hostname::get()
                    .unwrap_or_else(|_| "unknown_host".into())
                    .into_string()
                    .unwrap_or("unknown_host".to_string())
            ),
        );

        if need_stdio {
            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
        } else {
            cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
            tracing::info!(
                %proc_id, enable_forwarding, enable_file_capture, tail_size,
                "child stdio NOT captured (forwarding/file_capture/tail all disabled); inheriting parent console"
            );
        }

        // When forwarding is enabled, pick a log channel address and
        // advertise it to the child via `BOOTSTRAP_LOG_CHANNEL`; the
        // same address is handed to the stream forwarders below.
        let log_channel: Option<ChannelAddr> = if enable_forwarding {
            let addr = ChannelAddr::any(ChannelTransport::Unix);
            cmd.env(BOOTSTRAP_LOG_CHANNEL, addr.to_string());
            Some(addr)
        } else {
            None
        };

        let mut child = cmd
            .spawn()
            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
        // Falls back to 0 when no PID is available; this value is
        // only passed to the stream forwarders.
        let pid = child.id().unwrap_or_default();

        let (out_fwder, err_fwder) = if need_stdio {
            // Both pipes were requested above, so they must be present.
            let stdout: ChildStdout = child.stdout.take().expect("stdout piped but missing");
            let stderr: ChildStderr = child.stderr.take().expect("stderr piped but missing");

            // File capture targets; absent when file capture is off
            // or the manager has no file appender.
            let (file_stdout, file_stderr) = if enable_file_capture {
                match self.file_appender.as_deref() {
                    Some(fm) => (
                        Some(fm.addr_for(OutputTarget::Stdout)),
                        Some(fm.addr_for(OutputTarget::Stderr)),
                    ),
                    None => {
                        tracing::warn!("enable_file_capture=true but no FileAppender");
                        (None, None)
                    }
                }
            } else {
                (None, None)
            };

            let out = StreamFwder::start(
                stdout,
                file_stdout, // Option<ChannelAddr>
                OutputTarget::Stdout,
                tail_size,
                log_channel.clone(), // Option<ChannelAddr>
                pid,
                config.create_rank,
            );
            let err = StreamFwder::start(
                stderr,
                file_stderr,
                OutputTarget::Stderr,
                tail_size,
                log_channel.clone(),
                pid,
                config.create_rank,
            );
            (Some(out), Some(err))
        } else {
            (None, None)
        };

        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        // Record the PID both on the handle (status becomes Running)
        // and in the synchronous PID table consulted by `Drop`.
        if let Some(pid) = handle.pid() {
            handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
            if let Ok(mut table) = self.pid_table.lock() {
                table.insert(proc_id.clone(), pid);
            }
        }

        handle.set_stream_monitors(out_fwder, err_fwder);

        // Retain handle for lifecycle mgt. The lock is scoped so it is
        // released before the monitor tasks are started.
        {
            let mut children = self.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }

        // Kick off an exit monitor that updates ProcStatus when the
        // OS process ends
        self.spawn_exit_monitor(proc_id.clone(), handle.clone());

        // Wait for the child's bootstrap callback in the background
        // and translate it into a readiness (or failure) transition
        // on the handle.
        let h = handle.clone();
        let pid_table = Arc::clone(&self.pid_table);
        tokio::spawn(async move {
            match callback_rx.recv().await {
                Ok((addr, agent)) => {
                    let pid = match h.pid() {
                        Some(p) => p,
                        None => {
                            tracing::warn!("mark_ready called with missing pid; using 0");
                            0
                        }
                    };
                    let started_at = RealClock.system_time_now();
                    let _ = h.mark_ready(pid, started_at, addr, agent);
                }
                Err(e) => {
                    // Child never called back; record failure.
                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
                    // Cleanup pid table entry if it was set.
                    if let Ok(mut table) = pid_table.lock() {
                        table.remove(&proc_id);
                    }
                }
            }
        });

        // The handle is returned immediately; readiness is reported
        // asynchronously by the callback task above, and callers
        // observe it through the returned handle.
        Ok(handle)
    }
}
2052
2053#[async_trait]
2054impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2055    /// Attempt to gracefully terminate one child procs managed by
2056    /// this `BootstrapProcManager`.
2057    ///
2058    /// Each child handle is asked to `terminate(timeout)`, which
2059    /// sends SIGTERM, waits up to the deadline, and escalates to
2060    /// SIGKILL if necessary. Termination is attempted concurrently,
2061    /// with at most `max_in_flight` tasks running at once.
2062    ///
2063    /// Logs a warning for each failure.
2064    async fn terminate_proc(
2065        &self,
2066        cx: &impl context::Actor,
2067        proc: &ProcId,
2068        timeout: Duration,
2069    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
2070        // Snapshot to avoid holding the lock across awaits.
2071        let proc_handle: Option<BootstrapProcHandle> = {
2072            let mut guard = self.children.lock().await;
2073            guard.remove(proc)
2074        };
2075
2076        if let Some(h) = proc_handle {
2077            h.terminate(cx, timeout)
2078                .await
2079                .map(|_| (Vec::new(), Vec::new()))
2080                .map_err(|e| e.into())
2081        } else {
2082            Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2083        }
2084    }
2085}
2086
2087#[async_trait]
2088impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2089    /// Attempt to gracefully terminate all child procs managed by
2090    /// this `BootstrapProcManager`.
2091    ///
2092    /// Each child handle is asked to `terminate(timeout)`, which
2093    /// sends SIGTERM, waits up to the deadline, and escalates to
2094    /// SIGKILL if necessary. Termination is attempted concurrently,
2095    /// with at most `max_in_flight` tasks running at once.
2096    ///
2097    /// Returns a [`TerminateSummary`] with counts of how many procs
2098    /// were attempted, how many successfully terminated (including
2099    /// those that were already terminal), and how many failed.
2100    ///
2101    /// Logs a warning for each failure.
2102    async fn terminate_all(
2103        &self,
2104        cx: &impl context::Actor,
2105        timeout: Duration,
2106        max_in_flight: usize,
2107    ) -> TerminateSummary {
2108        // Snapshot to avoid holding the lock across awaits.
2109        let handles: Vec<BootstrapProcHandle> = {
2110            let guard = self.children.lock().await;
2111            guard.values().cloned().collect()
2112        };
2113
2114        let attempted = handles.len();
2115        let mut ok = 0usize;
2116
2117        let results = stream::iter(handles.into_iter().map(|h| async move {
2118            match h.terminate(cx, timeout).await {
2119                Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2120                    // Treat "already terminal" as success.
2121                    true
2122                }
2123                Err(e) => {
2124                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2125                    false
2126                }
2127            }
2128        }))
2129        .buffer_unordered(max_in_flight.max(1))
2130        .collect::<Vec<bool>>()
2131        .await;
2132
2133        for r in results {
2134            if r {
2135                ok += 1;
2136            }
2137        }
2138
2139        TerminateSummary {
2140            attempted,
2141            ok,
2142            failed: attempted.saturating_sub(ok),
2143        }
2144    }
2145}
2146
2147/// Entry point to processes managed by hyperactor_mesh. Any process that is part
2148/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
2149/// the process according to how it is invoked.
2150///
2151/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
2152/// and the process should likely exit:
2153///
2154/// ```ignore
2155/// let err = hyperactor_mesh::bootstrap().await;
2156/// tracing::error("could not bootstrap mesh process: {}", err);
2157/// std::process::exit(1);
2158/// ```
2159///
2160/// Use [`bootstrap_or_die`] to implement this behavior directly.
2161pub async fn bootstrap() -> anyhow::Error {
2162    let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
2163    boot.bootstrap().await
2164}
2165
/// Bootstrap a v0 proc mesh. This launches a control process that responds to
/// Allocator2Process messages, conveying its own state in Process2Allocator messages.
///
/// If `config` is provided, it is installed into the global configuration at
/// the `ClientOverride` layer before anything else happens.
///
/// The bootstrapping process is controlled by the
/// following environment variables:
///
/// - `HYPERACTOR_MESH_BOOTSTRAP_ADDR`: the channel address to which Process2Allocator messages
///   should be sent.
/// - `HYPERACTOR_MESH_INDEX`: an index used to identify this process to the allocator.
///
/// The function returns only on failure: the message loop either exits the
/// process (on `StopAndExit` / `Exit`) or bails out with an error, which is
/// the value returned here.
async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
    // Apply config before entering the nested scope
    if let Some(attrs) = config {
        hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
        tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
    } else {
        tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
    }
    tracing::info!(
        "bootstrap_v0_proc_mesh config:\n{}",
        hyperactor_config::global::attrs()
    );

    // The body lives in a nested fn returning `Result` so that `?` can
    // be used; every exit path from it is an error (or a process exit).
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started by this process. Shared with the signal
        // cleanup future so they can be torn down if cleanup runs.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // Keep the guard alive for the duration of `go`.
        // NOTE(review): the `Duration` argument to `destroy_and_wait`
        // is presumably a timeout — confirm against its definition.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(Duration::from_millis(10), None)
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        // Both variables are required; a missing or unparsable value
        // aborts the bootstrap with an error.
        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator is reachable on.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves to the allocator. `return_channel`
        // receives the message back if it could not be delivered
        // (handled in the `select!` below).
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for either the first allocator message or the outcome
        // of the Hello post, whichever comes first.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    Ok(msg) => {
                        // Hello came back to us: it was not delivered.
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    Err(_) => {
                        // The return channel closed without handing the
                        // message back; carry on and wait for the first
                        // allocator message.
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        // Message loop: there is no `break`, so it ends only via `?`
        // (error) or `std::process::exit`.
        loop {
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    let entered = span.entered();
                    // Serve the new proc on a fresh address of the
                    // requested transport and report it back.
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    drop(handle); // linter appeasement; it is safe to drop this future
                    let span = entered.exit();
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    // Track the proc so it can be stopped later.
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Stop every tracked proc before exiting;
                        // failures are logged and do not prevent exit.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(Duration::from_millis(10), None)
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    // Exit immediately, without stopping procs first.
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    // `go` has no path that returns `Ok`, so this yields its error.
    go().await.unwrap_err()
}
2306
2307/// A variant of [`bootstrap`] that logs the error and exits the process
2308/// if bootstrapping fails.
2309pub async fn bootstrap_or_die() -> ! {
2310    let err = bootstrap().await;
2311    let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2312    tracing::error!("failed to bootstrap mesh process: {}", err);
2313    std::process::exit(1)
2314}
2315
/// Destination for bootstrap debug output.
///
/// `EnumAsInner` is derived for its generated accessors; `is_file()` is
/// what `Debug::is_active` calls to tell whether a log file is attached.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Writes are forwarded to this open file.
    File(std::fs::File),
    /// Writes are discarded.
    Sink,
}
2321
2322impl DebugSink {
2323    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2324        match self {
2325            DebugSink::File(f) => f.write(buf),
2326            DebugSink::Sink => Ok(buf.len()),
2327        }
2328    }
2329    fn flush(&mut self) -> io::Result<()> {
2330        match self {
2331            DebugSink::File(f) => f.flush(),
2332            DebugSink::Sink => Ok(()),
2333        }
2334    }
2335}
2336
2337fn debug_sink() -> &'static Mutex<DebugSink> {
2338    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2339    DEBUG_SINK.get_or_init(|| {
2340        let debug_path = {
2341            let mut p = std::env::temp_dir();
2342            if let Ok(user) = std::env::var("USER") {
2343                p.push(user);
2344            }
2345            std::fs::create_dir_all(&p).ok();
2346            p.push("monarch-bootstrap-debug.log");
2347            p
2348        };
2349        let sink = if debug_path.exists() {
2350            match OpenOptions::new()
2351                .append(true)
2352                .create(true)
2353                .open(debug_path.clone())
2354            {
2355                Ok(f) => DebugSink::File(f),
2356                Err(_e) => {
2357                    eprintln!(
2358                        "failed to open {} for bootstrap debug logging",
2359                        debug_path.display()
2360                    );
2361                    DebugSink::Sink
2362                }
2363            }
2364        } else {
2365            DebugSink::Sink
2366        };
2367        Mutex::new(sink)
2368    })
2369}
2370
/// If true, `Debug` output is also mirrored to stderr, in addition to
/// whatever the debug sink does with it, and `Debug::is_active` reports
/// true unconditionally.
const DEBUG_TO_STDERR: bool = false;
2373
/// A bootstrap-specific debug writer. If the file
/// `monarch-bootstrap-debug.log` exists under the system temp directory
/// (inside a `$USER` subdirectory when `USER` is set), writes are appended
/// to that file; otherwise they are discarded. See `debug_sink` for how
/// the path is built.
struct Debug;
2377
2378impl Debug {
2379    fn is_active() -> bool {
2380        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2381    }
2382}
2383
2384impl Write for Debug {
2385    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2386        let res = debug_sink().lock().unwrap().write(buf);
2387        if DEBUG_TO_STDERR {
2388            let n = match res {
2389                Ok(n) => n,
2390                Err(_) => buf.len(),
2391            };
2392            let _ = io::stderr().write_all(&buf[..n]);
2393        }
2394
2395        res
2396    }
2397    fn flush(&mut self) -> io::Result<()> {
2398        let res = debug_sink().lock().unwrap().flush();
2399        if DEBUG_TO_STDERR {
2400            let _ = io::stderr().flush();
2401        }
2402        res
2403    }
2404}
2405
2406/// Create a new runtime [`TempDir`]. The directory is created in
2407/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2408fn runtime_dir() -> io::Result<TempDir> {
2409    match std::env::var_os("XDG_RUNTIME_DIR") {
2410        Some(runtime_dir) => {
2411            let path = PathBuf::from(runtime_dir);
2412            tempfile::tempdir_in(path)
2413        }
2414        None => tempfile::tempdir(),
2415    }
2416}
2417
2418#[cfg(test)]
2419mod tests {
2420    use std::path::PathBuf;
2421    use std::process::Stdio;
2422
2423    use hyperactor::ActorId;
2424    use hyperactor::ActorRef;
2425    use hyperactor::ProcId;
2426    use hyperactor::RemoteSpawn;
2427    use hyperactor::WorldId;
2428    use hyperactor::channel::ChannelAddr;
2429    use hyperactor::channel::ChannelTransport;
2430    use hyperactor::channel::TcpMode;
2431    use hyperactor::clock::RealClock;
2432    use hyperactor::context::Mailbox as _;
2433    use hyperactor::host::ProcHandle;
2434    use hyperactor::id;
2435    use ndslice::Extent;
2436    use ndslice::ViewExt;
2437    use ndslice::extent;
2438    use tokio::process::Command;
2439
2440    use super::*;
2441    use crate::alloc::AllocSpec;
2442    use crate::alloc::Allocator;
2443    use crate::alloc::ProcessAllocator;
2444    use crate::v1::ActorMesh;
2445    use crate::v1::host_mesh::HostMesh;
2446    use crate::v1::testactor;
2447    use crate::v1::testing;
2448
2449    // Helper: Avoid repeating
2450    // `ChannelAddr::any(ChannelTransport::Unix)`.
2451    fn any_addr_for_test() -> ChannelAddr {
2452        ChannelAddr::any(ChannelTransport::Unix)
2453    }
2454
2455    #[test]
2456    fn test_bootstrap_mode_env_string_none_config_proc() {
2457        let values = [
2458            Bootstrap::default(),
2459            Bootstrap::Proc {
2460                proc_id: id!(foo[0]),
2461                backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2462                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2463                socket_dir_path: PathBuf::from("notexist"),
2464                config: None,
2465            },
2466        ];
2467
2468        for value in values {
2469            let safe = value.to_env_safe_string().unwrap();
2470            let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2471
2472            // Re-encode and compare: deterministic round-trip of the
2473            // wire format.
2474            let safe2 = round.to_env_safe_string().unwrap();
2475            assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2476
2477            // Sanity: the decoded variant is what we expect.
2478            match (&value, &round) {
2479                (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2480                (
2481                    Bootstrap::V0ProcMesh { config: None },
2482                    Bootstrap::V0ProcMesh { config: None },
2483                ) => {}
2484                _ => panic!("decoded variant mismatch: got {:?}", round),
2485            }
2486        }
2487    }
2488
2489    #[test]
2490    fn test_bootstrap_mode_env_string_none_config_host() {
2491        let value = Bootstrap::Host {
2492            addr: ChannelAddr::any(ChannelTransport::Unix),
2493            command: None,
2494            config: None,
2495        };
2496
2497        let safe = value.to_env_safe_string().unwrap();
2498        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2499
2500        // Wire-format round-trip should be identical.
2501        let safe2 = round.to_env_safe_string().unwrap();
2502        assert_eq!(safe, safe2);
2503
2504        // Sanity: decoded variant is Host with None config.
2505        match round {
2506            Bootstrap::Host { config: None, .. } => {}
2507            other => panic!("expected Host with None config, got {:?}", other),
2508        }
2509    }
2510
2511    #[test]
2512    fn test_bootstrap_mode_env_string_invalid() {
2513        // Not valid base64
2514        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2515    }
2516
2517    #[test]
2518    fn test_bootstrap_config_snapshot_roundtrip() {
2519        // Build a small, distinctive Attrs snapshot.
2520        let mut attrs = Attrs::new();
2521        attrs[MESH_TAIL_LOG_LINES] = 123;
2522        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2523
2524        let socket_dir = runtime_dir().unwrap();
2525
2526        // Proc case
2527        {
2528            let original = Bootstrap::Proc {
2529                proc_id: id!(foo[42]),
2530                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2531                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2532                config: Some(attrs.clone()),
2533                socket_dir_path: socket_dir.path().to_owned(),
2534            };
2535            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2536            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2537            match &decoded {
2538                Bootstrap::Proc { config, .. } => {
2539                    let cfg = config.as_ref().expect("expected Some(attrs)");
2540                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2541                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2542                }
2543                other => panic!("unexpected variant after roundtrip: {:?}", other),
2544            }
2545        }
2546
2547        // Host case
2548        {
2549            let original = Bootstrap::Host {
2550                addr: ChannelAddr::any(ChannelTransport::Unix),
2551                command: None,
2552                config: Some(attrs.clone()),
2553            };
2554            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2555            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2556            match &decoded {
2557                Bootstrap::Host { config, .. } => {
2558                    let cfg = config.as_ref().expect("expected Some(attrs)");
2559                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2560                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2561                }
2562                other => panic!("unexpected variant after roundtrip: {:?}", other),
2563            }
2564        }
2565
2566        // V0ProcMesh case
2567        {
2568            let original = Bootstrap::V0ProcMesh {
2569                config: Some(attrs.clone()),
2570            };
2571            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2572            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2573            match &decoded {
2574                Bootstrap::V0ProcMesh { config } => {
2575                    let cfg = config.as_ref().expect("expected Some(attrs)");
2576                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2577                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2578                }
2579                other => panic!("unexpected variant after roundtrip: {:?}", other),
2580            }
2581        }
2582    }
2583
2584    #[tokio::test]
2585    async fn test_child_terminated_on_manager_drop() {
2586        use std::path::PathBuf;
2587        use std::process::Stdio;
2588
2589        use tokio::process::Command;
2590
2591        // Manager; program path is irrelevant for this test.
2592        let command = BootstrapCommand {
2593            program: PathBuf::from("/bin/true"),
2594            ..Default::default()
2595        };
2596        let manager = BootstrapProcManager::new(command).unwrap();
2597
2598        // Spawn a long-running child process (sleep 30) with
2599        // kill_on_drop(true).
2600        let mut cmd = Command::new("/bin/sleep");
2601        cmd.arg("30")
2602            .stdout(Stdio::null())
2603            .stderr(Stdio::null())
2604            .kill_on_drop(true);
2605
2606        let child = cmd.spawn().expect("spawn sleep");
2607        let pid = child.id().expect("pid");
2608
2609        // Insert into the manager's children map (simulates a spawned
2610        // proc).
2611        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
2612        {
2613            let mut children = manager.children.lock().await;
2614            children.insert(
2615                proc_id.clone(),
2616                BootstrapProcHandle::new(proc_id.clone(), child),
2617            );
2618        }
2619
2620        // (Linux-only) Verify the process exists before drop.
2621        #[cfg(target_os = "linux")]
2622        {
2623            let path = format!("/proc/{}", pid);
2624            assert!(
2625                std::fs::metadata(&path).is_ok(),
2626                "expected /proc/{pid} to exist before drop"
2627            );
2628        }
2629
2630        // Drop the manager. We don't rely on `Child::kill_on_drop`.
2631        // The exit monitor owns the Child; the manager's Drop uses
2632        // `pid_table` to best-effort SIGKILL any recorded PIDs. This
2633        // should terminate the child.
2634        drop(manager);
2635
2636        // Poll until the /proc entry disappears OR the state is
2637        // Zombie.
2638        #[cfg(target_os = "linux")]
2639        {
2640            let deadline = std::time::Instant::now() + Duration::from_millis(1500);
2641            let proc_dir = format!("/proc/{}", pid);
2642            let status_file = format!("{}/status", proc_dir);
2643
2644            let mut ok = false;
2645            while std::time::Instant::now() < deadline {
2646                match std::fs::read_to_string(&status_file) {
2647                    Ok(s) => {
2648                        if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
2649                            if state_line.contains('Z') {
2650                                // Only 'Z' (Zombie) is acceptable.
2651                                ok = true;
2652                                break;
2653                            } else {
2654                                // Still alive (e.g. 'R', 'S', etc.). Give it more time.
2655                            }
2656                        }
2657                    }
2658                    Err(_) => {
2659                        // /proc/<pid>/status no longer exists -> fully gone
2660                        ok = true;
2661                        break;
2662                    }
2663                }
2664                RealClock.sleep(Duration::from_millis(100)).await;
2665            }
2666
2667            assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
2668        }
2669
2670        // On non-Linux, absence of panics/hangs is the signal; PID
2671        // probing is platform-specific.
2672    }
2673
    // Checks that a `LogMessage` posted on the bootstrap log channel
    // surfaces as a log line through the test tap.
    #[tokio::test]
    async fn test_v1_child_logging() {
        use hyperactor::channel;
        use hyperactor::mailbox::BoxedMailboxSender;
        use hyperactor::mailbox::DialMailboxRouter;
        use hyperactor::mailbox::MailboxServer;
        use hyperactor::proc::Proc;

        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
        use crate::logging::LogClientActor;
        use crate::logging::LogClientMessageClient;
        use crate::logging::LogForwardActor;
        use crate::logging::LogMessage;
        use crate::logging::OutputTarget;
        use crate::logging::test_tap;

        // Local proc served on a fresh Unix address, with that address
        // bound in the router under the proc's own id.
        let router = DialMailboxRouter::new();
        let (proc_addr, proc_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
        proc.clone().serve(proc_rx);
        router.bind(id!(client[0]).into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // Install the tap before any actor is spawned; `tap_rx` is where
        // the assertion at the end reads the log line from.
        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        test_tap::install(tap_tx);

        // The variable is set before the forwarder is spawned and is left
        // set when the test ends.
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: unit-test scoped env var
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }

        // Spawn the log client and disable aggregation (immediate
        // print + tap push).
        let log_client_actor = LogClientActor::new(()).await.unwrap();
        let log_client: ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        log_client.set_aggregate(&client, None).await.unwrap();

        // Spawn the forwarder in this proc (it will serve
        // BOOTSTRAP_LOG_CHANNEL).
        let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap();
        let _log_forwarder: ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Dial the channel but don't post until we know the forwarder
        // is receiving.
        // NOTE(review): nothing below explicitly waits for the forwarder
        // before posting; presumably the channel queues the message
        // until the forwarder is serving. Confirm.
        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

        // Send a fake log message as if it came from the proc
        // manager's writer.
        tx.post(LogMessage::Log {
            hostname: "testhost".into(),
            pid: 12345,
            output_target: OutputTarget::Stdout,
            payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
        });

        // Assert we see it via the tap.
        let line = RealClock
            .timeout(Duration::from_secs(2), tap_rx.recv())
            .await
            .expect("timed out waiting for log line")
            .expect("tap channel closed unexpectedly");
        assert!(
            line.contains("hello from child"),
            "log line did not appear via LogClientActor; got: {line}"
        );
    }
2746
2747    mod proc_handle {
2748
2749        use hyperactor::ActorId;
2750        use hyperactor::ActorRef;
2751        use hyperactor::ProcId;
2752        use hyperactor::WorldId;
2753        use hyperactor::host::ProcHandle;
2754        use tokio::process::Command;
2755
2756        use super::super::*;
2757        use super::any_addr_for_test;
2758
2759        // Helper: build a ProcHandle with a short-lived child
2760        // process. We don't rely on the actual process; we only
2761        // exercise the status transitions.
2762        fn handle_for_test() -> BootstrapProcHandle {
2763            // Spawn a trivial child that exits immediately.
2764            let child = Command::new("sh")
2765                .arg("-c")
2766                .arg("exit 0")
2767                .stdin(std::process::Stdio::null())
2768                .stdout(std::process::Stdio::null())
2769                .stderr(std::process::Stdio::null())
2770                .spawn()
2771                .expect("failed to spawn test child process");
2772
2773            let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
2774            BootstrapProcHandle::new(proc_id, child)
2775        }
2776
2777        #[tokio::test]
2778        async fn starting_to_running_ok() {
2779            let h = handle_for_test();
2780            assert!(matches!(h.status(), ProcStatus::Starting));
2781            let child_pid = h.pid().expect("child should have a pid");
2782            let child_started_at = RealClock.system_time_now();
2783            assert!(h.mark_running(child_pid, child_started_at));
2784            match h.status() {
2785                ProcStatus::Running { pid, started_at } => {
2786                    assert_eq!(pid, child_pid);
2787                    assert_eq!(started_at, child_started_at);
2788                }
2789                other => panic!("expected Running, got {other:?}"),
2790            }
2791        }
2792
2793        #[tokio::test]
2794        async fn running_to_stopping_to_stopped_ok() {
2795            let h = handle_for_test();
2796            let child_pid = h.pid().expect("child should have a pid");
2797            let child_started_at = RealClock.system_time_now();
2798            assert!(h.mark_running(child_pid, child_started_at));
2799            assert!(h.mark_stopping());
2800            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2801            assert!(h.mark_stopped(0, Vec::new()));
2802            assert!(matches!(
2803                h.status(),
2804                ProcStatus::Stopped { exit_code: 0, .. }
2805            ));
2806        }
2807
2808        #[tokio::test]
2809        async fn running_to_killed_ok() {
2810            let h = handle_for_test();
2811            let child_pid = h.pid().expect("child should have a pid");
2812            let child_started_at = RealClock.system_time_now();
2813            assert!(h.mark_running(child_pid, child_started_at));
2814            assert!(h.mark_killed(9, true));
2815            assert!(matches!(
2816                h.status(),
2817                ProcStatus::Killed {
2818                    signal: 9,
2819                    core_dumped: true
2820                }
2821            ));
2822        }
2823
2824        #[tokio::test]
2825        async fn running_to_failed_ok() {
2826            let h = handle_for_test();
2827            let child_pid = h.pid().expect("child should have a pid");
2828            let child_started_at = RealClock.system_time_now();
2829            assert!(h.mark_running(child_pid, child_started_at));
2830            assert!(h.mark_failed("bootstrap error"));
2831            match h.status() {
2832                ProcStatus::Failed { reason } => {
2833                    assert_eq!(reason, "bootstrap error");
2834                }
2835                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2836            }
2837        }
2838
2839        #[tokio::test]
2840        async fn illegal_transitions_are_rejected() {
2841            let h = handle_for_test();
2842            let child_pid = h.pid().expect("child should have a pid");
2843            let child_started_at = RealClock.system_time_now();
2844            // Starting -> Running is fine; second Running should be rejected.
2845            assert!(h.mark_running(child_pid, child_started_at));
2846            assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
2847            match h.status() {
2848                ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
2849                other => panic!("expected Running, got {other:?}"),
2850            }
2851            // Once Stopped, we can't go to Running/Killed/Failed/etc.
2852            assert!(h.mark_stopping());
2853            assert!(h.mark_stopped(0, Vec::new()));
2854            assert!(!h.mark_running(child_pid, child_started_at));
2855            assert!(!h.mark_killed(9, false));
2856            assert!(!h.mark_failed("nope"));
2857
2858            assert!(matches!(
2859                h.status(),
2860                ProcStatus::Stopped { exit_code: 0, .. }
2861            ));
2862        }
2863
2864        #[tokio::test]
2865        async fn transitions_from_ready_are_legal() {
2866            let h = handle_for_test();
2867            let addr = any_addr_for_test();
2868            // Mark Running.
2869            let pid = h.pid().expect("child should have a pid");
2870            let t0 = RealClock.system_time_now();
2871            assert!(h.mark_running(pid, t0));
2872            // Build a consistent AgentRef for Ready using the
2873            // handle's ProcId.
2874            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2875            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2876            let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2877            // Ready -> Stopping -> Stopped should be legal.
2878            assert!(h.mark_ready(pid, t0, addr, agent_ref));
2879            assert!(h.mark_stopping());
2880            assert!(h.mark_stopped(0, Vec::new()));
2881        }
2882
2883        #[tokio::test]
2884        async fn ready_to_killed_is_legal() {
2885            let h = handle_for_test();
2886            let addr = any_addr_for_test();
2887            // Starting -> Running
2888            let pid = h.pid().expect("child should have a pid");
2889            let t0 = RealClock.system_time_now();
2890            assert!(h.mark_running(pid, t0));
2891            // Build a consistent AgentRef for Ready using the
2892            // handle's ProcId.
2893            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2894            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2895            let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2896            // Running -> Ready
2897            assert!(h.mark_ready(pid, t0, addr, agent));
2898            // Ready -> Killed
2899            assert!(h.mark_killed(9, false));
2900        }
2901
2902        #[tokio::test]
2903        async fn mark_stopping_from_starting_uses_child_pid_when_available() {
2904            let h = handle_for_test();
2905
2906            // In `Starting`, we should still be able to see the PID via
2907            // Child::id().
2908            let child_pid = h
2909                .pid()
2910                .expect("Child::id() should be available in Starting");
2911
2912            // mark_stopping() should transition to Stopping{pid,
2913            // started_at} and stash the same pid we observed.
2914            assert!(
2915                h.mark_stopping(),
2916                "mark_stopping() should succeed from Starting"
2917            );
2918            match h.status() {
2919                ProcStatus::Stopping { pid, started_at } => {
2920                    assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
2921                    assert!(
2922                        started_at <= RealClock.system_time_now(),
2923                        "started_at should be sane"
2924                    );
2925                }
2926                other => panic!("expected Stopping{{..}}; got {other:?}"),
2927            }
2928        }
2929
2930        #[tokio::test]
2931        async fn mark_stopping_noop_when_no_child_pid_available() {
2932            let h = handle_for_test();
2933
2934            // Simulate the exit-monitor having already taken the
2935            // Child so there is no Child::id() and we haven't cached
2936            // a pid yet.
2937            {
2938                let _ = h.child.lock().expect("child mutex").take();
2939            }
2940
2941            // With no observable pid, mark_stopping() must no-op and
2942            // leave us in Starting.
2943            assert!(
2944                !h.mark_stopping(),
2945                "mark_stopping() should no-op from Starting when no pid is observable"
2946            );
2947            assert!(matches!(h.status(), ProcStatus::Starting));
2948        }
2949
2950        #[tokio::test]
2951        async fn mark_failed_from_stopping_is_allowed() {
2952            let h = handle_for_test();
2953
2954            // Drive Starting -> Stopping (pid available via
2955            // Child::id()).
2956            assert!(h.mark_stopping(), "precondition: to Stopping");
2957
2958            // Now allow Stopping -> Failed.
2959            assert!(
2960                h.mark_failed("boom"),
2961                "mark_failed() should succeed from Stopping"
2962            );
2963            match h.status() {
2964                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2965                other => panic!("expected Failed(\"boom\"), got {other:?}"),
2966            }
2967        }
2968    }
2969
2970    #[tokio::test]
2971    async fn test_exit_monitor_updates_status_on_clean_exit() {
2972        let command = BootstrapCommand {
2973            program: PathBuf::from("/bin/true"),
2974            ..Default::default()
2975        };
2976        let manager = BootstrapProcManager::new(command).unwrap();
2977
2978        // Spawn a fast-exiting child.
2979        let mut cmd = Command::new("true");
2980        cmd.stdout(Stdio::null()).stderr(Stdio::null());
2981        let child = cmd.spawn().expect("spawn true");
2982
2983        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
2984        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2985
2986        // Put into manager & start monitor.
2987        {
2988            let mut children = manager.children.lock().await;
2989            children.insert(proc_id.clone(), handle.clone());
2990        }
2991        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2992        {
2993            let guard = handle.child.lock().expect("child mutex");
2994            assert!(
2995                guard.is_none(),
2996                "expected Child to be taken by exit monitor"
2997            );
2998        }
2999
3000        let st = handle.wait_inner().await;
3001        assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
3002    }
3003
3004    #[tokio::test]
3005    async fn test_exit_monitor_updates_status_on_kill() {
3006        let command = BootstrapCommand {
3007            program: PathBuf::from("/bin/sleep"),
3008            ..Default::default()
3009        };
3010        let manager = BootstrapProcManager::new(command).unwrap();
3011
3012        // Spawn a process that will live long enough to kill.
3013        let mut cmd = Command::new("/bin/sleep");
3014        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3015        let child = cmd.spawn().expect("spawn sleep");
3016        let pid = child.id().expect("pid") as i32;
3017
3018        // Register with the manager and start the exit monitor.
3019        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
3020        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3021        {
3022            let mut children = manager.children.lock().await;
3023            children.insert(proc_id.clone(), handle.clone());
3024        }
3025        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3026        {
3027            let guard = handle.child.lock().expect("child mutex");
3028            assert!(
3029                guard.is_none(),
3030                "expected Child to be taken by exit monitor"
3031            );
3032        }
3033        // Send SIGKILL to the process.
3034        #[cfg(unix)]
3035        // SAFETY: We own the child process we just spawned and have
3036        // its PID.
3037        // Sending SIGKILL is safe here because:
3038        //  - the PID comes directly from `child.id()`
3039        //  - the process is guaranteed to be in our test scope
3040        //  - we don't touch any memory via this call, only signal the
3041        //    OS
3042        unsafe {
3043            libc::kill(pid, libc::SIGKILL);
3044        }
3045
3046        let st = handle.wait_inner().await;
3047        match st {
3048            ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
3049            other => panic!("expected Killed(SIGKILL), got {other:?}"),
3050        }
3051    }
3052
3053    #[tokio::test]
3054    async fn watch_notifies_on_status_changes() {
3055        let child = Command::new("sh")
3056            .arg("-c")
3057            .arg("sleep 0.1")
3058            .stdin(std::process::Stdio::null())
3059            .stdout(std::process::Stdio::null())
3060            .stderr(std::process::Stdio::null())
3061            .spawn()
3062            .expect("spawn");
3063
3064        let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
3065        let handle = BootstrapProcHandle::new(proc_id, child);
3066        let mut rx = handle.watch();
3067
3068        // Starting -> Running
3069        let pid = handle.pid().unwrap_or(0);
3070        let now = RealClock.system_time_now();
3071        assert!(handle.mark_running(pid, now));
3072        rx.changed().await.ok(); // Observe the transition.
3073        match &*rx.borrow() {
3074            ProcStatus::Running { pid: p, started_at } => {
3075                assert_eq!(*p, pid);
3076                assert_eq!(*started_at, now);
3077            }
3078            s => panic!("expected Running, got {s:?}"),
3079        }
3080
3081        // Running -> Stopped
3082        assert!(handle.mark_stopped(0, Vec::new()));
3083        rx.changed().await.ok(); // Observe the transition.
3084        assert!(matches!(
3085            &*rx.borrow(),
3086            ProcStatus::Stopped { exit_code: 0, .. }
3087        ));
3088    }
3089
3090    #[tokio::test]
3091    async fn ready_errs_if_process_exits_before_running() {
3092        // Child exits immediately.
3093        let child = Command::new("sh")
3094            .arg("-c")
3095            .arg("exit 7")
3096            .stdin(std::process::Stdio::null())
3097            .stdout(std::process::Stdio::null())
3098            .stderr(std::process::Stdio::null())
3099            .spawn()
3100            .expect("spawn");
3101
3102        let proc_id = ProcId::Direct(
3103            ChannelAddr::any(ChannelTransport::Unix),
3104            "early-exit".into(),
3105        );
3106        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3107
3108        // Simulate the exit monitor doing its job directly here.
3109        // (Equivalent outcome: terminal state before Running.)
3110        assert!(handle.mark_stopped(7, Vec::new()));
3111
3112        // `ready()` should return Err with the terminal status.
3113        match handle.ready_inner().await {
3114            Ok(()) => panic!("ready() unexpectedly succeeded"),
3115            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3116                assert_eq!(exit_code, 7)
3117            }
3118            Err(other) => panic!("expected Stopped(7), got {other:?}"),
3119        }
3120    }
3121
3122    #[tokio::test]
3123    async fn status_unknown_proc_is_none() {
3124        let manager = BootstrapProcManager::new(BootstrapCommand {
3125            program: PathBuf::from("/bin/true"),
3126            ..Default::default()
3127        })
3128        .unwrap();
3129        let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
3130        assert!(manager.status(&unknown).await.is_none());
3131    }
3132
3133    #[tokio::test]
3134    async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
3135        let manager = BootstrapProcManager::new(BootstrapCommand {
3136            program: PathBuf::from("/bin/sleep"),
3137            ..Default::default()
3138        })
3139        .unwrap();
3140
3141        // Long-ish child so it's alive while we "steal" it.
3142        let mut cmd = Command::new("/bin/sleep");
3143        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3144        let child = cmd.spawn().expect("spawn sleep");
3145
3146        let proc_id = ProcId::Direct(
3147            ChannelAddr::any(ChannelTransport::Unix),
3148            "already-taken".into(),
3149        );
3150        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3151
3152        // Insert, then deliberately consume the Child before starting
3153        // the monitor.
3154        {
3155            let mut children = manager.children.lock().await;
3156            children.insert(proc_id.clone(), handle.clone());
3157        }
3158        {
3159            let _ = handle.child.lock().expect("child mutex").take();
3160        }
3161
3162        // This should not panic; monitor will see None and bail.
3163        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3164
3165        // Status should remain whatever it was (Starting in this
3166        // setup).
3167        assert!(matches!(
3168            manager.status(&proc_id).await,
3169            Some(ProcStatus::Starting)
3170        ));
3171    }
3172
3173    #[tokio::test]
3174    async fn pid_none_after_exit_monitor_takes_child() {
3175        let manager = BootstrapProcManager::new(BootstrapCommand {
3176            program: PathBuf::from("/bin/sleep"),
3177            ..Default::default()
3178        })
3179        .unwrap();
3180
3181        let mut cmd = Command::new("/bin/sleep");
3182        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3183        let child = cmd.spawn().expect("spawn sleep");
3184
3185        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
3186        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3187
3188        // Before monitor: we should still be able to read a pid.
3189        assert!(handle.pid().is_some());
3190        {
3191            let mut children = manager.children.lock().await;
3192            children.insert(proc_id.clone(), handle.clone());
3193        }
3194        // `spawn_exit_monitor` takes the Child synchronously before
3195        // spawning the task.
3196        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3197
3198        // Immediately after, the handle should no longer expose a pid
3199        // (Child is gone).
3200        assert!(handle.pid().is_none());
3201    }
3202
3203    #[tokio::test]
3204    async fn starting_may_directly_be_marked_stopped() {
3205        // Unit-level transition check for the "process exited very
3206        // fast" case.
3207        let child = Command::new("sh")
3208            .arg("-c")
3209            .arg("exit 0")
3210            .stdin(std::process::Stdio::null())
3211            .stdout(std::process::Stdio::null())
3212            .stderr(std::process::Stdio::null())
3213            .spawn()
3214            .expect("spawn true");
3215
3216        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
3217        let handle = BootstrapProcHandle::new(proc_id, child);
3218
3219        // Take the child (don't hold the mutex across an await).
3220        let mut c = {
3221            let mut guard = handle.child.lock().expect("child mutex");
3222            guard.take()
3223        }
3224        .expect("child already taken");
3225
3226        let status = c.wait().await.expect("wait");
3227        let code = status.code().unwrap_or(0);
3228        assert!(handle.mark_stopped(code, Vec::new()));
3229
3230        assert!(matches!(
3231            handle.status(),
3232            ProcStatus::Stopped { exit_code: 0, .. }
3233        ));
3234    }
3235
3236    #[tokio::test]
3237    async fn handle_ready_allows_waiters() {
3238        let child = Command::new("sh")
3239            .arg("-c")
3240            .arg("sleep 0.1")
3241            .stdin(std::process::Stdio::null())
3242            .stdout(std::process::Stdio::null())
3243            .stderr(std::process::Stdio::null())
3244            .spawn()
3245            .expect("spawn sleep");
3246        let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
3247        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3248
3249        let pid = handle.pid().expect("child should have a pid");
3250        let started_at = RealClock.system_time_now();
3251        assert!(handle.mark_running(pid, started_at));
3252
3253        let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3254        let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
3255
3256        // Pick any addr to carry in Ready (what the child would have
3257        // called back with).
3258        let ready_addr = any_addr_for_test();
3259
3260        // Stamp Ready and assert ready().await unblocks.
3261        assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
3262        handle
3263            .ready_inner()
3264            .await
3265            .expect("ready_inner() should complete after Ready");
3266
3267        // Sanity-check the Ready fields we control
3268        // (pid/started_at/addr).
3269        match handle.status() {
3270            ProcStatus::Ready {
3271                pid: p,
3272                started_at: t,
3273                addr: a,
3274                ..
3275            } => {
3276                assert_eq!(p, pid);
3277                assert_eq!(t, started_at);
3278                assert_eq!(a, ready_addr);
3279            }
3280            other => panic!("expected Ready, got {other:?}"),
3281        }
3282    }
3283
3284    #[tokio::test]
3285    async fn pid_behavior_across_states_running_ready_then_stopped() {
3286        let child = Command::new("sh")
3287            .arg("-c")
3288            .arg("sleep 0.1")
3289            .stdin(std::process::Stdio::null())
3290            .stdout(std::process::Stdio::null())
3291            .stderr(std::process::Stdio::null())
3292            .spawn()
3293            .expect("spawn");
3294
3295        let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
3296        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3297
3298        // Running → pid() Some
3299        let pid = handle.pid().expect("initial Child::id");
3300        let t0 = RealClock.system_time_now();
3301        assert!(handle.mark_running(pid, t0));
3302        assert_eq!(handle.pid(), Some(pid));
3303
3304        // Ready → pid() still Some even if Child taken
3305        let addr = any_addr_for_test();
3306        let agent = {
3307            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3308            ActorRef::<ProcMeshAgent>::attest(actor_id)
3309        };
3310        assert!(handle.mark_ready(pid, t0, addr, agent));
3311        {
3312            let _ = handle.child.lock().expect("child mutex").take();
3313        }
3314        assert_eq!(handle.pid(), Some(pid));
3315
3316        // Terminal (Stopped) → pid() None
3317        assert!(handle.mark_stopped(0, Vec::new()));
3318        assert_eq!(handle.pid(), None, "pid() should be None once terminal");
3319    }
3320
3321    #[tokio::test]
3322    async fn pid_is_available_in_ready_even_after_child_taken() {
3323        let child = Command::new("sh")
3324            .arg("-c")
3325            .arg("sleep 0.1")
3326            .stdin(std::process::Stdio::null())
3327            .stdout(std::process::Stdio::null())
3328            .stderr(std::process::Stdio::null())
3329            .spawn()
3330            .expect("spawn");
3331
3332        let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
3333        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3334
3335        // Mark Running.
3336        let pid = handle.pid().expect("child should have pid (via Child::id)");
3337        let started_at = RealClock.system_time_now();
3338        assert!(handle.mark_running(pid, started_at));
3339
3340        // Stamp Ready with synthetic addr/agent (attested).
3341        let addr = any_addr_for_test();
3342        let agent = {
3343            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3344            ActorRef::<ProcMeshAgent>::attest(actor_id)
3345        };
3346        assert!(handle.mark_ready(pid, started_at, addr, agent));
3347
3348        // Simulate exit monitor taking the Child (so Child::id() is
3349        // no longer available).
3350        {
3351            let _ = handle.child.lock().expect("child mutex").take();
3352        }
3353
3354        // **Regression check**: pid() must still return the cached
3355        // pid in Ready.
3356        assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
3357    }
3358
3359    #[test]
3360    fn display_running_includes_pid_and_uptime() {
3361        let started_at = RealClock.system_time_now() - Duration::from_secs(42);
3362        let st = ProcStatus::Running {
3363            pid: 1234,
3364            started_at,
3365        };
3366
3367        let s = format!("{}", st);
3368        assert!(s.contains("1234"));
3369        assert!(s.contains("Running"));
3370        assert!(s.contains("42s"));
3371    }
3372
3373    #[test]
3374    fn display_ready_includes_pid_and_addr() {
3375        let started_at = RealClock.system_time_now() - Duration::from_secs(5);
3376        let addr = ChannelAddr::any(ChannelTransport::Unix);
3377        let agent =
3378            ActorRef::attest(ProcId::Direct(addr.clone(), "proc".into()).actor_id("agent", 0));
3379
3380        let st = ProcStatus::Ready {
3381            pid: 4321,
3382            started_at,
3383            addr: addr.clone(),
3384            agent,
3385        };
3386
3387        let s = format!("{}", st);
3388        assert!(s.contains("4321")); // pid
3389        assert!(s.contains(&addr.to_string())); // addr
3390        assert!(s.contains("Ready"));
3391    }
3392
3393    #[test]
3394    fn display_stopped_includes_exit_code() {
3395        let st = ProcStatus::Stopped {
3396            exit_code: 7,
3397            stderr_tail: Vec::new(),
3398        };
3399        let s = format!("{}", st);
3400        assert!(s.contains("Stopped"));
3401        assert!(s.contains("7"));
3402    }
3403
3404    #[test]
3405    fn display_other_variants_does_not_panic() {
3406        let samples = vec![
3407            ProcStatus::Starting,
3408            ProcStatus::Stopping {
3409                pid: 42,
3410                started_at: RealClock.system_time_now(),
3411            },
3412            ProcStatus::Ready {
3413                pid: 42,
3414                started_at: RealClock.system_time_now(),
3415                addr: ChannelAddr::any(ChannelTransport::Unix),
3416                agent: ActorRef::attest(
3417                    ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into())
3418                        .actor_id("agent", 0),
3419                ),
3420            },
3421            ProcStatus::Killed {
3422                signal: 9,
3423                core_dumped: false,
3424            },
3425            ProcStatus::Failed {
3426                reason: "boom".into(),
3427            },
3428        ];
3429
3430        for st in samples {
3431            let _ = format!("{}", st); // Just make sure it doesn't panic.
3432        }
3433    }
3434
3435    #[tokio::test]
3436    async fn proc_handle_ready_ok_through_trait() {
3437        let child = Command::new("sh")
3438            .arg("-c")
3439            .arg("sleep 0.1")
3440            .stdin(Stdio::null())
3441            .stdout(Stdio::null())
3442            .stderr(Stdio::null())
3443            .spawn()
3444            .expect("spawn");
3445
3446        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3447        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3448
3449        // Starting -> Running
3450        let pid = handle.pid().expect("pid");
3451        let t0 = RealClock.system_time_now();
3452        assert!(handle.mark_running(pid, t0));
3453
3454        // Synthesize Ready data
3455        let addr = any_addr_for_test();
3456        let agent: ActorRef<ProcMeshAgent> =
3457            ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3458        assert!(handle.mark_ready(pid, t0, addr, agent));
3459
3460        // Call the trait method (not ready_inner).
3461        let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3462        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3463    }
3464
3465    #[tokio::test]
3466    async fn proc_handle_wait_returns_terminal_status() {
3467        let child = Command::new("sh")
3468            .arg("-c")
3469            .arg("exit 0")
3470            .stdin(Stdio::null())
3471            .stdout(Stdio::null())
3472            .stderr(Stdio::null())
3473            .spawn()
3474            .expect("spawn");
3475
3476        let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3477        let handle = BootstrapProcHandle::new(proc_id, child);
3478
3479        // Drive directly to a terminal state before calling wait()
3480        assert!(handle.mark_stopped(0, Vec::new()));
3481
3482        // Call the trait method (not wait_inner)
3483        let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3484            .await
3485            .expect("wait should return Ok(terminal)");
3486
3487        match st {
3488            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3489            other => panic!("expected Stopped(0), got {other:?}"),
3490        }
3491    }
3492
3493    #[tokio::test]
3494    async fn ready_wrapper_maps_terminal_to_trait_error() {
3495        let child = Command::new("sh")
3496            .arg("-c")
3497            .arg("exit 7")
3498            .stdin(Stdio::null())
3499            .stdout(Stdio::null())
3500            .stderr(Stdio::null())
3501            .spawn()
3502            .expect("spawn");
3503        let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3504        let handle = BootstrapProcHandle::new(proc_id, child);
3505
3506        assert!(handle.mark_stopped(7, Vec::new()));
3507
3508        match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3509            Ok(()) => panic!("expected Err"),
3510            Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3511                exit_code, ..
3512            })) => {
3513                assert_eq!(exit_code, 7);
3514            }
3515            Err(e) => panic!("unexpected error: {e:?}"),
3516        }
3517    }
3518
3519    /// Create a ProcId and a host **backend_addr** channel that the
3520    /// bootstrap child proc will dial to attach its mailbox to the
3521    /// host.
3522    ///
3523    /// - `proc_id`: logical identity for the child proc (pure name;
3524    ///   not an OS pid).
3525    /// - `backend_addr`: a mailbox address served by the **parent
3526    ///   (host) proc** here; the spawned bootstrap process dials this
3527    ///   so its messages route via the host.
3528    async fn make_proc_id_and_backend_addr(
3529        instance: &hyperactor::Instance<()>,
3530        _tag: &str,
3531    ) -> (ProcId, ChannelAddr) {
3532        // Serve a Unix channel as the "backend_addr" and hook it into
3533        // this test proc.
3534        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3535
3536        // Route messages arriving on backend_addr into this test
3537        // proc's mailbox so the bootstrap child can reach the host
3538        // router.
3539        instance.proc().clone().serve(rx);
3540
3541        // We return an arbitrary (but unbound!) unix direct proc id here;
3542        // it is okay, as we're not testing connectivity.
3543        let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
3544        (proc_id, backend_addr)
3545    }
3546
3547    #[tokio::test]
3548    #[cfg(fbcode_build)]
3549    async fn bootstrap_handle_terminate_graceful() {
3550        // Create a root direct-addressed proc + client instance.
3551        let root =
3552            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3553        let (instance, _handle) = root.instance("client").unwrap();
3554
3555        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3556        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3557        let handle = mgr
3558            .spawn(
3559                proc_id.clone(),
3560                backend_addr.clone(),
3561                BootstrapProcConfig {
3562                    create_rank: 0,
3563                    client_config_override: Attrs::new(),
3564                },
3565            )
3566            .await
3567            .expect("spawn bootstrap child");
3568
3569        handle.ready().await.expect("ready");
3570
3571        let deadline = Duration::from_secs(2);
3572        match RealClock
3573            .timeout(deadline * 2, handle.terminate(&instance, deadline))
3574            .await
3575        {
3576            Err(_) => panic!("terminate() future hung"),
3577            Ok(Ok(st)) => {
3578                match st {
3579                    ProcStatus::Stopped { exit_code, .. } => {
3580                        // child called exit(0) on SIGTERM
3581                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3582                    }
3583                    ProcStatus::Killed { signal, .. } => {
3584                        // If the child didn't trap SIGTERM, we'd see
3585                        // SIGTERM (15) here and indeed, this is what
3586                        // we see. Since we call
3587                        // `hyperactor::initialize_with_current_runtime();`
3588                        // we seem unable to trap `SIGTERM` and
3589                        // instead folly intercepts:
3590                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
3591                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
3592                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
3593                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
3594                        // It gets worse. When run with
3595                        // '@fbcode//mode/dev-nosan' it terminates
3596                        // with a SEGFAULT (metamate says this is a
3597                        // well known issue at Meta). So, TL;DR I
3598                        // restore default `SIGTERM` handling after
3599                        // the test exe has called
3600                        // `initialize_with_runtime`.
3601                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3602                    }
3603                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3604                }
3605            }
3606            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3607        }
3608    }
3609
3610    #[tokio::test]
3611    #[cfg(fbcode_build)]
3612    async fn bootstrap_handle_kill_forced() {
3613        // Root proc + client instance (so the child can dial back).
3614        let root =
3615            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3616        let (instance, _handle) = root.instance("client").unwrap();
3617
3618        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3619
3620        // Proc identity + host backend channel the child will dial.
3621        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3622
3623        // Launch the child bootstrap process.
3624        let handle = mgr
3625            .spawn(
3626                proc_id.clone(),
3627                backend_addr.clone(),
3628                BootstrapProcConfig {
3629                    create_rank: 0,
3630                    client_config_override: Attrs::new(),
3631                },
3632            )
3633            .await
3634            .expect("spawn bootstrap child");
3635
3636        // Wait until the child reports Ready (addr+agent returned via
3637        // callback).
3638        handle.ready().await.expect("ready");
3639
3640        // Force-kill the child and assert we observe a Killed
3641        // terminal status.
3642        let deadline = Duration::from_secs(5);
3643        match RealClock.timeout(deadline, handle.kill()).await {
3644            Err(_) => panic!("kill() future hung"),
3645            Ok(Ok(st)) => {
3646                // We expect a KILLED terminal state.
3647                match st {
3648                    ProcStatus::Killed { signal, .. } => {
3649                        // On Linux this should be SIGKILL (9).
3650                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3651                    }
3652                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3653                }
3654            }
3655            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3656        }
3657    }
3658
3659    #[tokio::test]
3660    #[cfg(fbcode_build)]
3661    async fn bootstrap_canonical_simple() {
3662        // SAFETY: unit-test scoped
3663        unsafe {
3664            std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
3665        }
3666        // Create an actor instance we'll use to send and receive
3667        // messages.
3668        let instance = testing::instance();
3669
3670        // Configure a ProcessAllocator with the bootstrap binary.
3671        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
3672            "monarch/hyperactor_mesh/bootstrap",
3673        )));
3674        // Request a new allocation of procs from the ProcessAllocator.
3675        let alloc = allocator
3676            .allocate(AllocSpec {
3677                extent: extent!(replicas = 1),
3678                constraints: Default::default(),
3679                proc_name: None,
3680                transport: ChannelTransport::Unix,
3681                proc_allocation_mode: Default::default(),
3682            })
3683            .await
3684            .unwrap();
3685
3686        // Build a HostMesh with explicit OS-process boundaries (per
3687        // rank):
3688        //
3689        // (1) Allocator → bootstrap proc [OS process #1]
3690        //     `ProcMesh::allocate(..)` starts one OS process per
3691        //     rank; each runs our runtime and the trampoline actor.
3692        //
3693        // (2) Host::serve(..) sets up a Host in the same OS process
3694        //     (no new process). It binds front/back channels, creates
3695        //     an in-process service proc (`Proc::new(..)`), and
3696        //     stores the `BootstrapProcManager` for later spawns.
3697        //
3698        // (3) Install HostMeshAgent (still no new OS process).
3699        //     `host.system_proc().spawn::<HostMeshAgent>("agent",
3700        //     host).await?` creates the HostMeshAgent actor in that
3701        //     service proc.
3702        //
3703        // (4) Collect & assemble. The trampoline returns a
3704        //     direct-addressed `ActorRef<HostMeshAgent>`; we collect
3705        //     one per rank and assemble a `HostMesh`.
3706        //
3707        // Note: When the Host is later asked to start a proc
3708        // (`host.spawn(name)`), it calls `ProcManager::spawn` on the
3709        // stored `BootstrapProcManager`, which does a
3710        // `Command::spawn()` to launch a new OS child process for
3711        // that proc.
3712        let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3713            .await
3714            .unwrap();
3715
3716        // Spawn a ProcMesh named "p0" on the host mesh:
3717        //
3718        // (1) Each HostMeshAgent (running inside its host's service
3719        // proc) receives the request.
3720        //
3721        // (2) The Host calls into its `BootstrapProcManager::spawn`,
3722        // which does `Command::spawn()` to launch a brand-new OS
3723        // process for the proc.
3724        //
3725        // (3) Inside that new process, bootstrap runs and a
3726        // `ProcMeshAgent` is started to manage it.
3727        //
3728        // (4) We collect the per-host procs into a `ProcMesh` and
3729        // return it.
3730        let proc_mesh = host_mesh
3731            .spawn(&instance, "p0", Extent::unity())
3732            .await
3733            .unwrap();
3734
3735        // Note: There is no support for status() in v1.
3736        // assert!(proc_mesh.status(&instance).await.is_err());
3737        // let proc_ref = proc_mesh.values().next().expect("one proc");
3738        // assert!(proc_ref.status(&instance).await.unwrap(), "proc should be alive");
3739
3740        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc mesh:
3741        //
3742        // (1) For each proc (already running in its own OS process),
3743        // the `ProcMeshAgent` receives the request.
3744        //
3745        // (2) It spawns a `TestActor` inside that existing proc (no
3746        // new OS process).
3747        //
3748        // (3) The per-proc actors are collected into an
3749        // `ActorMesh<TestActor>` and returned.
3750        let actor_mesh: ActorMesh<testactor::TestActor> =
3751            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();
3752
3753        // Open a fresh port on the client instance and send a
3754        // GetActorId message to the actor mesh. Each TestActor will
3755        // reply with its actor ID to the bound port. Receive one
3756        // reply and assert it matches the ID of the (single) actor in
3757        // the mesh.
3758        let (port, mut rx) = instance.mailbox().open_port();
3759        actor_mesh
3760            .cast(&instance, testactor::GetActorId(port.bind()))
3761            .unwrap();
3762        let got_id = rx.recv().await.unwrap();
3763        assert_eq!(
3764            got_id,
3765            actor_mesh.values().next().unwrap().actor_id().clone()
3766        );
3767
3768        // **Important**: If we don't shutdown the hosts, the
3769        // BootstrapProcManager's won't send SIGKILLs to their spawned
3770        // children and there will be orphans left behind.
3771        host_mesh.shutdown(&instance).await.expect("host shutdown");
3772    }
3773
3774    #[tokio::test]
3775    async fn exit_tail_is_attached_and_logged() {
3776        hyperactor_telemetry::initialize_logging_for_test();
3777
3778        let lock = hyperactor_config::global::lock();
3779        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
3780
3781        // Spawn a child that writes to stderr then exits 7.
3782        let mut cmd = Command::new("sh");
3783        cmd.arg("-c")
3784            .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
3785            .stdout(Stdio::piped())
3786            .stderr(Stdio::piped());
3787
3788        let child = cmd.spawn().expect("spawn");
3789        let pid = child.id().unwrap_or_default();
3790
3791        // Build a BootstrapProcHandle around this child (like
3792        // manager.spawn() does).
3793        let proc_id = hyperactor::id!(testproc[0]);
3794        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3795
3796        // Wire tailers + dummy writers (stdout/stderr -> sinks), then
3797        // stash them on the handle.
3798        {
3799            // Lock the child to get its pipes.
3800            let mut guard = handle.child.lock().expect("child mutex poisoned");
3801            if let Some(child) = guard.as_mut() {
3802                let out = child.stdout.take().expect("child stdout must be piped");
3803                let err = child.stderr.take().expect("child stderr must be piped");
3804
3805                // Set up logging like ProcManager does
3806                let log_channel = ChannelAddr::any(ChannelTransport::Unix);
3807                let tail_size = hyperactor_config::global::get(MESH_TAIL_LOG_LINES);
3808
3809                // Set up StreamFwders if pipes are available
3810                let stdout_monitor = StreamFwder::start(
3811                    out,
3812                    None,
3813                    OutputTarget::Stdout,
3814                    tail_size,
3815                    Some(log_channel.clone()),
3816                    pid,
3817                    0,
3818                );
3819
3820                let stderr_monitor = StreamFwder::start(
3821                    err,
3822                    None,
3823                    OutputTarget::Stderr,
3824                    tail_size,
3825                    Some(log_channel.clone()),
3826                    pid,
3827                    0,
3828                );
3829
3830                // Set the stream monitors on the handle
3831                handle.set_stream_monitors(Some(stdout_monitor), Some(stderr_monitor));
3832                tracing::info!("set stream monitors");
3833            } else {
3834                panic!("child already taken before wiring tailers");
3835            }
3836        }
3837
3838        // Start an exit monitor (consumes the Child and tailers from
3839        // the handle).
3840        let manager = BootstrapProcManager::new(BootstrapCommand {
3841            program: std::path::PathBuf::from("/bin/true"), // unused in this test
3842            ..Default::default()
3843        })
3844        .unwrap();
3845
3846        // Give the monitors time to start
3847        RealClock.sleep(Duration::from_millis(1000)).await;
3848
3849        // Now start the exit monitor which will take the child and wait for it to exit
3850        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3851
3852        // Await terminal status and assert on exit code and stderr
3853        // tail.
3854        let st = RealClock
3855            .timeout(Duration::from_secs(2), handle.wait_inner())
3856            .await
3857            .expect("wait_inner() timed out (exit monitor stuck?)");
3858
3859        match st {
3860            ProcStatus::Stopped {
3861                exit_code,
3862                stderr_tail,
3863            } => {
3864                assert_eq!(
3865                    exit_code, 7,
3866                    "unexpected exit code; stderr_tail={:?}",
3867                    stderr_tail
3868                );
3869                let tail = stderr_tail.join("\n");
3870                assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
3871                assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
3872            }
3873            other => panic!("expected Stopped(7), got {other:?}"),
3874        }
3875    }
3876
3877    #[tokio::test]
3878    #[cfg(fbcode_build)]
3879    async fn test_host_bootstrap() {
3880        use crate::proc_mesh::mesh_agent::NewClientInstanceClient;
3881        use crate::v1::host_mesh::mesh_agent::GetLocalProcClient;
3882
3883        // Create a local instance just to call the local bootstrap actor.
3884        // We should find a way to avoid this for local handles.
3885        let temp_proc = Proc::local();
3886        let (temp_instance, _) = temp_proc.instance("temp").unwrap();
3887
3888        let handle = host(any_addr_for_test(), Some(BootstrapCommand::test()), None)
3889            .await
3890            .unwrap();
3891
3892        let local_proc = handle.get_local_proc(&temp_instance).await.unwrap();
3893        let _local_instance = local_proc
3894            .new_client_instance(&temp_instance)
3895            .await
3896            .unwrap();
3897    }
3898}