Skip to main content

hyperactor_mesh/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! ## Bootstrap invariants (BS-*)
10//!
11//! - **BS-1 (locking):** Do not acquire other locks from inside
12//!   `transition(...)`. The state lock is held for the duration of
13//!   the transition; acquiring another lock risks deadlock.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use anyhow::Context;
34use async_trait::async_trait;
35use base64::prelude::*;
36use futures::StreamExt;
37use futures::stream;
38use humantime::format_duration;
39use hyperactor::ActorAddr;
40use hyperactor::ActorHandle;
41use hyperactor::ActorRef;
42use hyperactor::Endpoint as _;
43use hyperactor::ProcAddr;
44use hyperactor::channel;
45use hyperactor::channel::ChannelAddr;
46use hyperactor::channel::ChannelError;
47use hyperactor::channel::ChannelTransport;
48use hyperactor::channel::Rx;
49use hyperactor::channel::Tx;
50use hyperactor::context;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor_config::CONFIG;
57use hyperactor_config::ConfigAttr;
58use hyperactor_config::attrs::Attrs;
59use hyperactor_config::attrs::declare_attrs;
60use hyperactor_config::global::override_or_global;
61use serde::Deserialize;
62use serde::Serialize;
63use tempfile::TempDir;
64use tokio::process::Command;
65use tokio::sync::watch;
66use tracing::Instrument;
67use tracing::Level;
68use typeuri::Named;
69
70use crate::config::MESH_PROC_LAUNCHER_KIND;
71use crate::host::BulkTerminate;
72use crate::host::Host;
73use crate::host::HostError;
74use crate::host::ProcHandle;
75use crate::host::ProcManager;
76use crate::host::ReadyError as HostReadyError;
77use crate::host::SingleTerminate;
78use crate::host::TerminateError;
79use crate::host::TerminateSummary;
80use crate::host::WaitError;
81use crate::host_mesh::host_agent::HostAgent;
82use crate::host_mesh::host_agent::HostAgentMode;
83use crate::logging::OutputTarget;
84use crate::logging::StreamFwder;
85use crate::proc_agent::ProcAgent;
86use crate::proc_launcher::LaunchOptions;
87use crate::proc_launcher::NativeProcLauncher;
88use crate::proc_launcher::ProcExitKind;
89use crate::proc_launcher::ProcExitResult;
90use crate::proc_launcher::ProcLauncher;
91use crate::proc_launcher::ProcLauncherError;
92use crate::proc_launcher::StdioHandling;
93#[cfg(target_os = "linux")]
94use crate::proc_launcher::SystemdProcLauncher;
95use crate::proc_launcher::format_process_name;
96use crate::resource;
97
98mod mailbox;
99
declare_attrs! {
    /// Enable forwarding child stdout/stderr over the mesh log
    /// channel.
    ///
    /// When `true`: child stdio is piped; [`StreamFwder`]
    /// mirrors output to the parent console and forwards bytes to the
    /// log channel so a `LogForwardActor` can receive them.
    ///
    /// When `false` (default): no channel forwarding occurs. Child
    /// stdio may still be piped if [`MESH_ENABLE_FILE_CAPTURE`] is
    /// `true` or [`MESH_TAIL_LOG_LINES`] > 0; otherwise the child
    /// inherits the parent stdio (no interception).
    ///
    /// This flag does not affect console mirroring: child output
    /// always reaches the parent console—either via inheritance (no
    /// piping) or via [`StreamFwder`] when piping is active.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    /// When `true`: if stdio is piped, each child's `StreamFwder`
    /// also forwards lines to a host-scoped `FileAppender` managed by
    /// the `BootstrapProcManager`. That appender creates exactly two
    /// files per manager instance—one for stdout and one for
    /// stderr—and **all** child processes' lines are multiplexed into
    /// those two files. This can be combined with
    /// [`MESH_ENABLE_LOG_FORWARDING`] ("stream+local").
    ///
    /// Default: `false`.
    ///
    /// Notes:
    /// - The on-disk files are *aggregate*, not per-process.
    ///   Disambiguation is via the optional rank prefix (see
    ///   `PREFIX_WITH_RANK`), which `StreamFwder` prepends to lines
    ///   before writing.
    /// - On local runs, file capture is suppressed unless
    ///   `FORCE_FILE_LOG=true`. In that case `StreamFwder` still
    ///   runs, but the `FileAppender` may be `None` and no files are
    ///   written.
    /// - `MESH_TAIL_LOG_LINES` only controls the in-memory rotating
    ///   buffer used for peeking—independent of file capture.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    /// Maximum number of log lines retained in a proc's stderr/stdout
    /// tail buffer. Used by [`StreamFwder`] when wiring child
    /// pipes. Default: 0.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    /// If enabled (default), bootstrap child processes install
    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
    /// parent dies unexpectedly. This is a **production safety net**
    /// against leaked children; tests usually disable it via
    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
    /// "false")`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    /// Maximum number of child terminations to run concurrently
    /// during bulk shutdown. Prevents unbounded spawning of
    /// termination tasks (which could otherwise spike CPU, I/O, or
    /// file descriptor load). Default: 16.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    /// Per-child grace window for termination. When a shutdown is
    /// requested, the manager sends SIGTERM and waits this long for
    /// the child to exit before escalating to SIGKILL. Default: 10
    /// seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
187
/// Name of the `MONARCH_CLIENT_TRACE_ID` environment variable.
// NOTE(review): presumably carries the client's trace id into spawned
// processes — confirm against the readers/writers of this variable.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// A channel used by each process to receive its own stdout and stderr.
/// Because stdout and stderr can only be obtained by the parent process,
/// they need to be streamed back to the process.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Name of the environment variable that carries the encoded
/// [`Bootstrap`] configuration. The same variable name is read by
/// `Bootstrap::get_from_env` and written by `Bootstrap::to_env`.
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Name of the `HYPERACTOR_PROCESS_NAME` environment variable.
// NOTE(review): presumably the human-readable name given to a spawned
// process — confirm against its users.
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
197
/// Unwrap a `Result`, or return early from the enclosing function
/// with the error converted to an [`anyhow::Error`].
///
/// Note that the early return yields the error value itself (not
/// `Err(..)`), so this macro only type-checks inside functions or
/// closures whose return type is `anyhow::Error`. In functions that
/// return `Result`, use the `?` operator instead.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(value) => value,
            // Returns the bare error, not `Err(error)`; see the macro docs.
            Err(e) => return ::anyhow::Error::from(e),
        }
    };
}
207
/// Suspend the calling task forever.
///
/// The returned future never resolves, so it can stand in for a value
/// of any type `R`; callers use it to park after their work is done.
pub async fn halt<R>() -> R {
    // `pending` never yields a value, so no `R` is ever constructed.
    future::pending::<R>().await
}
212
213/// A handle that waits for a host to finish shutting down.
214///
215/// Obtained from [`host`]. Awaiting [`HostShutdownHandle::join`] blocks until
216/// the [`ShutdownHost`] handler sends back the mailbox server handle, drains
217/// it, and (if `exit_on_shutdown`) calls `process::exit`.
218///
219/// Note: [`DrainHost`] does **not** trigger this handle — a drained host
220/// keeps its mailbox server (and Unix socket) alive so new clients can
221/// reconnect to the same address.
222pub struct HostShutdownHandle {
223    rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
224    exit_on_shutdown: bool,
225}
226
227impl HostShutdownHandle {
228    /// Wait for the host to finish shutting down, drain its mailbox server,
229    /// and optionally exit the process.
230    pub async fn join(self) {
231        match self.rx.await {
232            Ok(mailbox_handle) => {
233                mailbox_handle.stop("host shutting down");
234                let _ = mailbox_handle.await;
235            }
236            Err(_) => {} // sender dropped without sending — nothing to drain
237        }
238        if self.exit_on_shutdown {
239            std::process::exit(0);
240        }
241    }
242}
243
244/// Bootstrap a host in this process, returning a handle to the mesh agent.
245///
246/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
247/// then use `GetProc` on the returned proc mesh agent.
248///
249/// - `addr`: the listening address of the host; this is used to bind the frontend address;
250/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapProcManager::current`];
251/// - `config`: optional runtime config overlay.
252/// - `exit_on_shutdown`: if true, [`HostShutdownHandle::join`] will call `process::exit` after draining.
253/// - `listener`: when `Some`, it is used as the frontend listening socket
254///   instead of binding a new one.
255pub async fn host(
256    addr: ChannelAddr,
257    command: Option<BootstrapCommand>,
258    config: Option<Attrs>,
259    exit_on_shutdown: bool,
260    listener: Option<std::net::TcpListener>,
261) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
262    if let Some(attrs) = config {
263        hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
264        tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
265    } else {
266        tracing::debug!("bootstrap: no config snapshot provided (Host)");
267    }
268
269    let command = match command {
270        Some(command) => command,
271        None => BootstrapCommand::current()?,
272    };
273    let manager = BootstrapProcManager::new(command)?;
274
275    let host = Host::new_with_default(manager, addr, None, listener).await?;
276    let addr = host.addr().clone();
277
278    // The ShutdownHost handler will call host.serve() inside
279    // HostAgent::init (after this.bind::<Self>(), so the handler port is bound
280    // before the frontend starts routing messages), then send the resulting
281    // MailboxServerHandle back here for draining.
282    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();
283
284    let system_proc = host.system_proc().clone();
285    let host_mesh_agent = system_proc.spawn::<HostAgent>(
286        "host_agent",
287        HostAgent::new(HostAgentMode::Process {
288            host,
289            shutdown_tx: Some(shutdown_tx),
290        }),
291    )?;
292
293    tracing::info!(
294        "serving host at {}, agent: {}",
295        addr,
296        host_mesh_agent.bind::<HostAgent>()
297    );
298
299    Ok((
300        host_mesh_agent,
301        HostShutdownHandle {
302            rx: shutdown_rx,
303            exit_on_shutdown,
304        },
305    ))
306}
307
/// Bootstrap configures how a mesh process starts up.
///
/// Both `Proc` and `Host` variants may include an optional
/// configuration snapshot (`hyperactor_config::Attrs`). This
/// snapshot is serialized into the bootstrap payload and made
/// available to the child. Interpretation and application of that
/// snapshot is up to the child process; if omitted, the child falls
/// back to environment/default values.
///
/// The value is transported to the child through an environment
/// variable as base64-encoded JSON (see `to_env` / `get_from_env`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a "v1" proc
    Proc {
        /// The ProcAddr of the proc to be bootstrapped.
        proc_id: ProcAddr,
        /// The backend address to which messages are forwarded.
        /// See [`crate::host`] for channel topology details.
        backend_addr: ChannelAddr,
        /// The callback address used to indicate successful spawning.
        callback_addr: ChannelAddr,
        /// Directory for storing proc socket files. Procs place their sockets
        /// in this directory, so that they can be looked up by other procs
        /// for direct transfer.
        socket_dir_path: PathBuf,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Defaults.
        config: Option<Attrs>,
    },

    /// Bootstrap as a "v1" host bootstrap. This sets up a new `Host`,
    /// managed by a [`crate::host_mesh::host_agent::HostAgent`].
    Host {
        /// The address on which to serve the host.
        addr: ChannelAddr,
        /// If specified, use the provided command instead of
        /// [`BootstrapCommand::current`].
        command: Option<BootstrapCommand>,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, [`host`] installs it
        /// under the `Runtime` config source (unlike the `Proc`
        /// variant, which uses `ClientOverride`).
        config: Option<Attrs>,
        /// If true, exit the process after handling a shutdown request.
        exit_on_shutdown: bool,
    },
}
355
356impl Bootstrap {
357    /// Serialize the mode into a environment-variable-safe string by
358    /// base64-encoding its JSON representation.
359    #[allow(clippy::result_large_err)]
360    pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
361        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
362    }
363
364    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
365    #[allow(clippy::result_large_err)]
366    pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
367        let data = BASE64_STANDARD.decode(str)?;
368        let data = std::str::from_utf8(&data)?;
369        Ok(serde_json::from_str(data)?)
370    }
371
372    /// Get a bootstrap configuration from the environment; returns `None`
373    /// if the environment does not specify a boostrap config.
374    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
375        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
376            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
377                Ok(mode) => Ok(Some(mode)),
378                Err(e) => {
379                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
380                }
381            },
382            Err(VarError::NotPresent) => Ok(None),
383            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
384        }
385    }
386
387    /// Inject this bootstrap configuration into the environment of the provided command.
388    pub fn to_env(&self, cmd: &mut Command) {
389        cmd.env(
390            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
391            self.to_env_safe_string().unwrap(),
392        );
393    }
394
395    /// Bootstrap this binary according to this configuration.
396    /// This runs until all processes are ready to exit, or returns an error.
397    /// The Ok value is the exit code that should be used.
398    pub async fn bootstrap(self) -> anyhow::Result<i32> {
399        tracing::info!(
400            "bootstrapping mesh process: {}",
401            serde_json::to_string(&self).unwrap()
402        );
403
404        if Debug::is_active() {
405            let mut buf = Vec::new();
406            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
407            #[cfg(unix)]
408            writeln!(
409                &mut buf,
410                "\tparent pid: {}",
411                std::os::unix::process::parent_id()
412            )
413            .unwrap();
414            writeln!(
415                &mut buf,
416                "\tconfig: {}",
417                serde_json::to_string(&self).unwrap()
418            )
419            .unwrap();
420            match std::env::current_exe() {
421                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
422                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
423            }
424            writeln!(&mut buf, "\targs:").unwrap();
425            for arg in std::env::args() {
426                writeln!(&mut buf, "\t\t{}", arg).unwrap();
427            }
428            writeln!(&mut buf, "\tenv:").unwrap();
429            for (key, val) in std::env::vars() {
430                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
431            }
432            let _ = Debug.write(&buf);
433            if let Ok(s) = std::str::from_utf8(&buf) {
434                tracing::info!("{}", s);
435            } else {
436                tracing::info!("{:?}", buf);
437            }
438        }
439
440        match self {
441            Bootstrap::Proc {
442                proc_id,
443                backend_addr,
444                callback_addr,
445                socket_dir_path,
446                config,
447            } => {
448                let entered = tracing::span!(
449                    Level::INFO,
450                    "proc_bootstrap",
451                    %proc_id,
452                    %backend_addr,
453                    %callback_addr,
454                    socket_dir_path = %socket_dir_path.display(),
455                )
456                .entered();
457                if let Some(attrs) = config {
458                    hyperactor_config::global::set(
459                        hyperactor_config::global::Source::ClientOverride,
460                        attrs,
461                    );
462                    tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
463                } else {
464                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
465                }
466
467                if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
468                    // Safety net: normal shutdown is via
469                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
470                    // is a last-resort guard against leaks if that
471                    // protocol is bypassed.
472                    let _ = install_pdeathsig_kill();
473                } else {
474                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
475                }
476
477                let local_addr = proc_id.addr().clone();
478                let (serve_addr, _) = local_proc_addr(&socket_dir_path, proc_id.id())?;
479
480                // The following is a modified host::spawn_proc to support direct
481                // dialing between local procs: 1) we bind each proc to a deterministic
482                // address in socket_dir_path; 2) we use LocalProcDialer to dial these
483                // addresses for local procs.
484                let proc_sender = mailbox::LocalProcDialer::new(
485                    local_addr.clone(),
486                    socket_dir_path,
487                    MailboxClient::dial(backend_addr)?,
488                );
489
490                let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
491
492                let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
493                let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
494                    .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
495
496                let span = entered.exit();
497
498                // Finally serve the proc on the same transport as the backend address,
499                // and call back.
500                let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
501                let mailbox_handle = proc.clone().serve(proc_rx);
502                channel::dial(callback_addr)?
503                    .send((proc_addr, agent_handle.bind::<ProcAgent>()))
504                    .instrument(span)
505                    .await
506                    .map_err(ChannelError::from)?;
507
508                // Wait for the StopAll handler to signal the exit code, then
509                // gracefully stop the mailbox server before exiting.
510                let exit_code = shutdown_rx.await.unwrap_or(1);
511                mailbox_handle.stop("process shutting down");
512                let _ = mailbox_handle.await;
513                tracing::info!("bootstrap shutting down with exit code {}", exit_code);
514                // Don't exit the proc, return Ok so the parent function can decide
515                // how to stop.
516                Ok(exit_code)
517            }
518            Bootstrap::Host {
519                addr,
520                command,
521                config,
522                exit_on_shutdown,
523            } => {
524                let (_agent_handle, shutdown) =
525                    host(addr, command, config, exit_on_shutdown, None).await?;
526                shutdown.join().await;
527                halt().await
528            }
529        }
530    }
531
532    /// A variant of [`bootstrap`] that logs the error and exits the process
533    /// if bootstrapping fails.
534    pub async fn bootstrap_or_die(self) -> ! {
535        let exit_code = match self.bootstrap().await {
536            Ok(exit_code) => exit_code,
537            Err(err) => {
538                tracing::error!("failed to bootstrap mesh process: {}", err);
539                1
540            }
541        };
542        std::process::exit(exit_code);
543    }
544}
545
546/// Install "kill me if parent dies" and close the race window.
547pub fn install_pdeathsig_kill() -> io::Result<()> {
548    #[cfg(target_os = "linux")]
549    {
550        // SAFETY: `getppid()` is a simple libc syscall returning the
551        // parent PID; it has no side effects and does not touch memory.
552        let ppid_before = unsafe { libc::getppid() };
553
554        // SAFETY: Calling into libc; does not dereference memory, just
555        // asks the kernel to deliver SIGKILL on parent death.
556        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
557        if rc != 0 {
558            return Err(io::Error::last_os_error());
559        }
560
561        // Race-close: if the parent died between our exec and prctl(),
562        // we won't get a signal, so detect that and exit now.
563        //
564        // If the parent PID changed, the parent has died and we've been
565        // reparented. Note: We cannot assume ppid == 1 means the parent
566        // died, as in container environments (e.g., Kubernetes) the parent
567        // may legitimately run as PID 1.
568        // SAFETY: `getppid()` is a simple libc syscall returning the
569        // parent PID; it has no side effects and does not touch memory.
570        let ppid_after = unsafe { libc::getppid() };
571        if ppid_before != ppid_after {
572            std::process::exit(0);
573        }
574    }
575    Ok(())
576}
577
/// Represents the lifecycle state of a **proc as hosted in an OS
/// process** managed by `BootstrapProcManager`.
///
/// Note: This type is deliberately distinct from [`ProcState`] and
/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
/// consumed from an event stream during allocation. By contrast,
/// [`ProcStatus`] is a **live, queryable view**: it reflects the
/// current observed status of a running proc, as seen through the
/// [`BootstrapProcHandle`] API (stop, kill, status).
///
/// In short:
/// - `ProcState`/`ProcStopReason`: historical / event-driven model
/// - `ProcStatus`: immediate status surface for lifecycle control
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// The OS process has been spawned but is not yet fully running.
    /// (Process-level: child handle exists, no confirmation yet.)
    Starting,
    /// The OS process is alive and considered running.
    /// (Proc-level: bootstrap may still be running.)
    Running {
        /// Reference time from which the displayed uptime is computed.
        started_at: SystemTime,
    },
    /// Ready means bootstrap has completed and the proc is serving.
    /// (Proc-level: bootstrap completed.)
    Ready {
        /// Reference time from which the displayed uptime is computed.
        started_at: SystemTime,
        /// Address shown as "Ready at <addr>" when displayed.
        // NOTE(review): presumably the address the proc serves on, as
        // reported through the bootstrap callback — confirm at the
        // site that constructs `Ready`.
        addr: ChannelAddr,
        /// Reference to the proc's `ProcAgent` actor.
        agent: ActorRef<ProcAgent>,
    },
    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
    /// but the OS process has not yet fully exited. (Proc-level:
    /// shutdown in progress; Process-level: still running.)
    Stopping {
        /// Reference time from which the displayed uptime is computed.
        started_at: SystemTime,
    },
    /// The process exited with a normal exit code. (Process-level:
    /// exit observed.)
    Stopped {
        /// Exit code reported for the process.
        exit_code: i32,
        /// Lines of stderr retained for diagnostics.
        // NOTE(review): presumably the most recent lines from the
        // stderr tail buffer — confirm where `Stopped` is constructed.
        stderr_tail: Vec<String>,
    },
    /// The process was killed by a signal (e.g. SIGKILL).
    /// (Process-level: abnormal termination.)
    Killed {
        /// Number of the signal that terminated the process.
        signal: i32,
        /// Whether a core dump was reported; rendered as ", core" when
        /// displayed.
        core_dumped: bool,
    },
    /// The proc or its process failed for some other reason
    /// (bootstrap error, unexpected condition, etc.). (Both levels:
    /// catch-all failure.)
    Failed {
        /// Human-readable description of the failure.
        reason: String,
    },
}
625
626impl ProcStatus {
627    /// Returns `true` if the proc is in a terminal (exited) state:
628    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
629    /// [`ProcStatus::Failed`].
630    #[inline]
631    pub fn is_exit(&self) -> bool {
632        matches!(
633            self,
634            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
635        )
636    }
637}
638
639impl std::fmt::Display for ProcStatus {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        match self {
642            ProcStatus::Starting => write!(f, "Starting"),
643            ProcStatus::Running { started_at } => {
644                let uptime = started_at
645                    .elapsed()
646                    .map(|d| format!(" up {}", format_duration(d)))
647                    .unwrap_or_default();
648                write!(f, "Running{uptime}")
649            }
650            ProcStatus::Ready {
651                started_at, addr, ..
652            } => {
653                let uptime = started_at
654                    .elapsed()
655                    .map(|d| format!(" up {}", format_duration(d)))
656                    .unwrap_or_default();
657                write!(f, "Ready at {addr}{uptime}")
658            }
659            ProcStatus::Stopping { started_at } => {
660                let uptime = started_at
661                    .elapsed()
662                    .map(|d| format!(" up {}", format_duration(d)))
663                    .unwrap_or_default();
664                write!(f, "Stopping{uptime}")
665            }
666            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
667            ProcStatus::Killed {
668                signal,
669                core_dumped,
670            } => {
671                if *core_dumped {
672                    write!(f, "Killed(sig={signal}, core)")
673                } else {
674                    write!(f, "Killed(sig={signal})")
675                }
676            }
677            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
678        }
679    }
680}
681
682/// Error returned by [`BootstrapProcHandle::ready`].
683#[derive(Debug, Clone)]
684pub enum ReadyError {
685    /// The proc reached a terminal state before `Ready`.
686    Terminal(ProcStatus),
687    /// The internal watch channel closed unexpectedly.
688    ChannelClosed,
689}
690
691impl std::fmt::Display for ReadyError {
692    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
693        match self {
694            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
695            ReadyError::ChannelClosed => write!(f, "status channel closed"),
696        }
697    }
698}
699impl std::error::Error for ReadyError {}
700
/// A handle to a proc launched by [`BootstrapProcManager`].
///
/// `BootstrapProcHandle` is a lightweight supervisor for an external
/// process: it tracks and broadcasts lifecycle state, and exposes a
/// small control/observation surface. While it may temporarily hold a
/// `tokio::process::Child` (shared behind a mutex) so the exit
/// monitor can `wait()` it, it is **not** the unique owner of the OS
/// process, and dropping a `BootstrapProcHandle` does not by itself
/// terminate the process.
///
/// What it pairs together:
/// - the **logical proc identity** (`ProcAddr`)
/// - the **live status surface** ([`ProcStatus`]), available both as
///   a synchronous snapshot (`status()`) and as an async stream via a
///   `tokio::sync::watch` channel (`watch()` / `changed()`)
///
/// Responsibilities:
/// - Retain the child handle only until the exit monitor claims it,
///   so the OS process can be awaited and its terminal status
///   recorded.
/// - Hold stdout/stderr tailers until the exit monitor takes them,
///   then join to recover buffered output for diagnostics.
/// - Update status via the `mark_*` transitions and broadcast changes
///   over the watch channel so tasks can `await` lifecycle
///   transitions without polling.
/// - Provide the foundation for higher-level APIs like `wait()`
///   (await terminal) and, later, `terminate()` / `kill()`.
///
/// Notes:
/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
///   it SIGKILLs any still-recorded PIDs; we do not rely on
///   `Child::kill_on_drop`.
///
/// Relationship to types:
/// - [`ProcStatus`]: live status surface, updated by this handle.
/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
///   allocator-facing, historical event log; not directly updated by
///   this type.
// NOTE(review): the struct as declared has no `Child` field, and the
// `launcher` field docs say the launcher owns the OS child handle and
// PID tracking. The wording above about temporarily holding a
// `tokio::process::Child` (and about manager-level SIGKILL cleanup) may
// be stale — confirm and update.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Logical identity of the proc in the mesh.
    proc_id: ProcAddr,

    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
    /// so [`BootstrapProcHandle::status`] can return a synchronous
    /// copy. All mutations now flow through
    /// [`BootstrapProcHandle::transition`], which updates this field
    /// under the lock and then broadcasts on the watch channel.
    status: Arc<std::sync::Mutex<ProcStatus>>,

    /// Launcher used to terminate/kill the proc. The launcher owns
    /// the actual OS child handle and PID tracking.
    ///
    /// We hold a `Weak` reference so that when `BootstrapProcManager`
    /// drops, the launcher's `Arc` refcount reaches zero and its `Drop`
    /// runs, cleaning up any remaining child processes. If the manager
    /// is gone when we try to terminate/kill, we treat it as a no-op
    /// (the proc is being killed by the launcher's Drop anyway).
    launcher: Weak<dyn ProcLauncher>,

    /// Stdout monitor for this proc. Created with `StreamFwder::start`, it
    /// forwards output to a log channel and keeps a bounded ring buffer.
    /// Transferred to the exit monitor, which joins it after `wait()`
    /// to recover buffered lines.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Stderr monitor for this proc. Same behavior as `stdout_fwder`
    /// but for stderr (used for exit-reason enrichment).
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Watch sender for status transitions. Every `mark_*` goes
    /// through [`BootstrapProcHandle::transition`], which updates the
    /// snapshot under the lock and then `send`s the new
    /// [`ProcStatus`].
    tx: tokio::sync::watch::Sender<ProcStatus>,

    /// Watch receiver seed. `watch()` clones this so callers can
    /// `borrow()` the current status and `changed().await` future
    /// transitions independently.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
782
783impl fmt::Debug for BootstrapProcHandle {
784    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785        let status = self.status.lock().expect("status mutex poisoned").clone();
786        f.debug_struct("BootstrapProcHandle")
787            .field("proc_id", &self.proc_id)
788            .field("status", &status)
789            .field("launcher", &"<dyn ProcLauncher>")
790            .field("tx", &"<watch::Sender>")
791            .field("rx", &"<watch::Receiver>")
792            // Intentionally skip stdout_tailer / stderr_tailer (not
793            // Debug).
794            .finish()
795    }
796}
797
798// See BS-1 in module doc.
799impl BootstrapProcHandle {
800    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
801    /// OS process hosting a proc.
802    ///
803    /// - Initializes the status to [`ProcStatus::Starting`] since the
804    ///   child process has been created but not yet confirmed running.
805    /// - Stores the launcher reference for terminate/kill delegation.
806    ///
807    /// This is the canonical entry point used by
808    /// `BootstrapProcManager` when it launches a proc into a new
809    /// process.
810    pub(crate) fn new(proc_id: ProcAddr, launcher: Weak<dyn ProcLauncher>) -> Self {
811        let (tx, rx) = watch::channel(ProcStatus::Starting);
812        Self {
813            proc_id,
814            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
815            launcher,
816            stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
817            stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
818            tx,
819            rx,
820        }
821    }
822
823    /// Return the logical proc address in the mesh.
824    #[inline]
825    pub fn proc_addr(&self) -> &ProcAddr {
826        &self.proc_id
827    }
828
829    /// Create a new subscription to this proc's status stream.
830    ///
831    /// Each call returns a fresh [`watch::Receiver`] tied to this
832    /// handle's internal [`ProcStatus`] channel. The receiver can be
833    /// awaited on (`rx.changed().await`) to observe lifecycle
834    /// transitions as they occur.
835    ///
836    /// Notes:
837    /// - Multiple subscribers can exist simultaneously; each sees
838    ///   every status update in order.
839    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
840    ///   use `watch()` when you need to await changes over time.
841    #[inline]
842    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
843        self.rx.clone()
844    }
845
846    /// Wait until this proc's status changes.
847    ///
848    /// This is a convenience wrapper around
849    /// [`watch::Receiver::changed`]: it subscribes internally via
850    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
851    /// If no subscribers exist or the channel is closed, this returns
852    /// without error.
853    ///
854    /// Typical usage:
855    /// ```ignore
856    /// handle.changed().await;
857    /// match handle.status() {
858    ///     ProcStatus::Running { .. } => { /* now running */ }
859    ///     ProcStatus::Stopped { .. } => { /* exited */ }
860    ///     _ => {}
861    /// }
862    /// ```
863    #[inline]
864    pub async fn changed(&self) {
865        let _ = self.watch().changed().await;
866    }
867
868    /// Return a snapshot of the current [`ProcStatus`] for this proc.
869    ///
870    /// This is a *live view* of the lifecycle state as tracked by
871    /// [`BootstrapProcManager`]. It reflects what is currently known
872    /// about the underlying OS process (e.g., `Starting`, `Running`,
873    /// `Stopping`, etc.).
874    ///
875    /// Internally this reads the mutex-guarded status. Use this when
876    /// you just need a synchronous snapshot; use
877    /// [`BootstrapProcHandle::watch`] or
878    /// [`BootstrapProcHandle::changed`] if you want to await
879    /// transitions asynchronously.
880    #[must_use]
881    pub fn status(&self) -> ProcStatus {
882        // Source of truth for now is the mutex. We broadcast via
883        // `watch` in `transition`, but callers that want a
884        // synchronous snapshot should read the guarded value.
885        self.status.lock().expect("status mutex poisoned").clone()
886    }
887
888    /// Atomically apply a state transition while holding the status
889    /// lock, and send the updated value on the watch channel **while
890    /// still holding the lock**. This guarantees the mutex state and
891    /// the broadcast value stay in sync and avoids reordering between
892    /// concurrent transitions.
893    fn transition<F>(&self, f: F) -> bool
894    where
895        F: FnOnce(&mut ProcStatus) -> bool,
896    {
897        let mut guard = self.status.lock().expect("status mutex poisoned");
898        let _before = guard.clone();
899        let changed = f(&mut guard);
900        if changed {
901            // Publish while still holding the lock to preserve order.
902            let _ = self.tx.send(guard.clone());
903        }
904        changed
905    }
906
907    /// Transition this proc into the [`ProcStatus::Running`] state.
908    ///
909    /// Called internally once the child OS process has been spawned.
910    /// Records the `started_at` timestamp so that callers can query it
911    /// later via [`BootstrapProcHandle::status`].
912    ///
913    /// This is a best-effort marker: it reflects that the process
914    /// exists at the OS level, but does not guarantee that the proc
915    /// has completed bootstrap or is fully ready.
916    pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
917        self.transition(|st| match *st {
918            ProcStatus::Starting => {
919                *st = ProcStatus::Running { started_at };
920                true
921            }
922            _ => {
923                tracing::warn!(
924                    "illegal transition: {:?} -> Running; leaving status unchanged",
925                    *st
926                );
927                false
928            }
929        })
930    }
931
932    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
933    /// state.
934    ///
935    /// This records the listening address and agent once the proc has
936    /// successfully started and is ready to serve. The `started_at`
937    /// timestamp is derived from the current `Running` state.
938    ///
939    /// Returns `true` if the transition succeeded (from `Starting` or
940    /// `Running`), or `false` if the current state did not allow
941    /// moving to `Ready`. In the latter case the state is left
942    /// unchanged and a warning is logged.
943    pub(crate) fn mark_ready(&self, addr: ChannelAddr, agent: ActorRef<ProcAgent>) -> bool {
944        tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
945        self.transition(|st| match st {
946            ProcStatus::Starting => {
947                // Unexpected: we should be Running before Ready, but
948                // handle gracefully with current time.
949                *st = ProcStatus::Ready {
950                    started_at: std::time::SystemTime::now(),
951                    addr,
952                    agent,
953                };
954                true
955            }
956            ProcStatus::Running { started_at } => {
957                let started_at = *started_at;
958                *st = ProcStatus::Ready {
959                    started_at,
960                    addr,
961                    agent,
962                };
963                true
964            }
965            _ => {
966                tracing::warn!(
967                    "illegal transition: {:?} -> Ready; leaving status unchanged",
968                    st
969                );
970                false
971            }
972        })
973    }
974
975    /// Record that a stop has been requested for the proc (e.g. a
976    /// graceful shutdown via SIGTERM), but the underlying process has
977    /// not yet fully exited.
978    pub(crate) fn mark_stopping(&self) -> bool {
979        let now = std::time::SystemTime::now();
980
981        self.transition(|st| match *st {
982            ProcStatus::Running { started_at } => {
983                *st = ProcStatus::Stopping { started_at };
984                true
985            }
986            ProcStatus::Ready { started_at, .. } => {
987                *st = ProcStatus::Stopping { started_at };
988                true
989            }
990            ProcStatus::Starting => {
991                *st = ProcStatus::Stopping { started_at: now };
992                true
993            }
994            _ => false,
995        })
996    }
997
998    /// Record that the process has exited normally with the given
999    /// exit code.
1000    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1001        self.transition(|st| match *st {
1002            ProcStatus::Starting
1003            | ProcStatus::Running { .. }
1004            | ProcStatus::Ready { .. }
1005            | ProcStatus::Stopping { .. } => {
1006                *st = ProcStatus::Stopped {
1007                    exit_code,
1008                    stderr_tail,
1009                };
1010                true
1011            }
1012            _ => {
1013                tracing::warn!(
1014                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
1015                    *st
1016                );
1017                false
1018            }
1019        })
1020    }
1021
1022    /// Record that the process was killed by the given signal (e.g.
1023    /// SIGKILL, SIGTERM).
1024    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1025        self.transition(|st| match *st {
1026            ProcStatus::Starting
1027            | ProcStatus::Running { .. }
1028            | ProcStatus::Ready { .. }
1029            | ProcStatus::Stopping { .. } => {
1030                *st = ProcStatus::Killed {
1031                    signal,
1032                    core_dumped,
1033                };
1034                true
1035            }
1036            _ => {
1037                tracing::warn!(
1038                    "illegal transition: {:?} -> Killed; leaving status unchanged",
1039                    *st
1040                );
1041                false
1042            }
1043        })
1044    }
1045
1046    /// Record that the proc or its process failed for an unexpected
1047    /// reason (bootstrap error, spawn failure, etc.).
1048    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1049        self.transition(|st| match *st {
1050            ProcStatus::Starting
1051            | ProcStatus::Running { .. }
1052            | ProcStatus::Ready { .. }
1053            | ProcStatus::Stopping { .. } => {
1054                *st = ProcStatus::Failed {
1055                    reason: reason.into(),
1056                };
1057                true
1058            }
1059            _ => {
1060                tracing::warn!(
1061                    "illegal transition: {:?} -> Failed; leaving status unchanged",
1062                    *st
1063                );
1064                false
1065            }
1066        })
1067    }
1068
1069    /// Wait until the proc has reached a terminal state and return
1070    /// it.
1071    ///
1072    /// Terminal means [`ProcStatus::Stopped`],
1073    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
1074    /// current status is already terminal, returns immediately.
1075    ///
1076    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1077    /// owner of the OS process, so you can call `wait()` from
1078    /// multiple tasks concurrently.
1079    ///
1080    /// Implementation detail: listens on this handle's `watch`
1081    /// channel. It snapshots the current status, and if not terminal
1082    /// awaits the next change. If the channel closes unexpectedly,
1083    /// returns the last observed status.
1084    ///
1085    /// Mirrors `tokio::process::Child::wait()`, but yields the
1086    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
1087    #[must_use]
1088    pub async fn wait_inner(&self) -> ProcStatus {
1089        let mut rx = self.watch();
1090        loop {
1091            let st = rx.borrow().clone();
1092            if st.is_exit() {
1093                return st;
1094            }
1095            // If the channel closes, return the last observed value.
1096            if rx.changed().await.is_err() {
1097                return st;
1098            }
1099        }
1100    }
1101
1102    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
1103    ///
1104    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
1105    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
1106    /// becoming `Ready`, this returns
1107    /// `Err(ReadyError::Terminal(status))`. If the internal watch
1108    /// channel closes unexpectedly, this returns
1109    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
1110    /// `Ok(())` when `Ready` is first observed.
1111    ///
1112    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1113    /// owner; multiple tasks may await `ready()` concurrently.
1114    /// `Stopping` is not treated as terminal here; we continue
1115    /// waiting until `Ready` or a terminal state is seen.
1116    ///
1117    /// Companion to [`BootstrapProcHandle::wait_inner`]:
1118    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
1119    /// startup.
1120    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1121        let mut rx = self.watch();
1122        loop {
1123            let st = rx.borrow().clone();
1124            match &st {
1125                ProcStatus::Ready { .. } => return Ok(()),
1126                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1127                _non_terminal => {
1128                    if rx.changed().await.is_err() {
1129                        return Err(ReadyError::ChannelClosed);
1130                    }
1131                }
1132            }
1133        }
1134    }
1135
1136    pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1137        *self
1138            .stdout_fwder
1139            .lock()
1140            .expect("stdout_tailer mutex poisoned") = out;
1141        *self
1142            .stderr_fwder
1143            .lock()
1144            .expect("stderr_tailer mutex poisoned") = err;
1145    }
1146
1147    fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1148        let out = self
1149            .stdout_fwder
1150            .lock()
1151            .expect("stdout_tailer mutex poisoned")
1152            .take();
1153        let err = self
1154            .stderr_fwder
1155            .lock()
1156            .expect("stderr_tailer mutex poisoned")
1157            .take();
1158        (out, err)
1159    }
1160
1161    /// Wait for the proc to exit, escalating to launcher terminate/kill
1162    /// if it doesn't exit within `timeout`.
1163    ///
1164    /// This is a fire-and-forget helper: it assumes a stop signal has
1165    /// already been sent. It waits for exit, then escalates through
1166    /// terminate and kill if needed.
1167    pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
1168        match tokio::time::timeout(timeout, self.wait_inner()).await {
1169            Ok(st) if st.is_exit() => return,
1170            _ => {}
1171        }
1172
1173        let _ = self.mark_stopping();
1174
1175        if let Some(launcher) = self.launcher.upgrade() {
1176            let ref_proc_id: ProcAddr = self.proc_id.clone();
1177            if let Err(e) = launcher.terminate(&ref_proc_id, timeout).await {
1178                tracing::warn!(
1179                    proc_id = %self.proc_id,
1180                    error = %e,
1181                    "wait_or_brutally_kill: launcher terminate failed, trying kill"
1182                );
1183                let _ = launcher.kill(&ref_proc_id).await;
1184            }
1185        }
1186
1187        let _ = self.wait_inner().await;
1188    }
1189
1190    /// Sends a StopAll message to the ProcAgent, which should exit the process.
1191    /// Waits for the successful state change of the process. If the process
1192    /// doesn't reach a terminal state, returns Err.
1193    async fn send_stop_all(
1194        &self,
1195        cx: &impl context::Actor,
1196        agent: ActorRef<ProcAgent>,
1197        timeout: Duration,
1198        reason: &str,
1199    ) -> anyhow::Result<ProcStatus> {
1200        // For all of the messages and replies in this function:
1201        // if the proc is already dead, then the message will be undeliverable,
1202        // which should be ignored.
1203        // If this message isn't deliverable to the agent, the process may have
1204        // stopped already. No need to produce any errors, just continue with
1205        // killing the process.
1206        let mut agent_port = agent.port();
1207        agent_port.return_undeliverable(false);
1208        agent_port.post(
1209            cx,
1210            resource::StopAll {
1211                reason: reason.to_string(),
1212            },
1213        );
1214        // The agent handling Stop should exit the process, if it doesn't within
1215        // the time window, we escalate to SIGTERM.
1216        match tokio::time::timeout(timeout, self.wait()).await {
1217            Ok(Ok(st)) => Ok(st),
1218            Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1219            Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1220        }
1221    }
1222}
1223
1224#[async_trait]
1225impl ProcHandle for BootstrapProcHandle {
1226    type Agent = ProcAgent;
1227    type TerminalStatus = ProcStatus;
1228
1229    #[inline]
1230    fn proc_addr(&self) -> &ProcAddr {
1231        &self.proc_id
1232    }
1233
1234    #[inline]
1235    fn addr(&self) -> Option<ChannelAddr> {
1236        match &*self.status.lock().expect("status mutex poisoned") {
1237            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1238            _ => None,
1239        }
1240    }
1241
1242    #[inline]
1243    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1244        match &*self.status.lock().expect("status mutex poisoned") {
1245            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1246            _ => None,
1247        }
1248    }
1249
1250    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
1251    /// state.
1252    ///
1253    /// Returns `Ok(())` once `Ready` is observed.
1254    ///
1255    /// If the proc transitions directly to a terminal state before
1256    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
1257    ///
1258    /// If the internal status watch closes unexpectedly before
1259    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
1260    async fn ready(&self) -> Result<(), HostReadyError<Self::TerminalStatus>> {
1261        match self.ready_inner().await {
1262            Ok(()) => Ok(()),
1263            Err(ReadyError::Terminal(status)) => Err(HostReadyError::Terminal(status)),
1264            Err(ReadyError::ChannelClosed) => Err(HostReadyError::ChannelClosed),
1265        }
1266    }
1267
1268    /// Wait until this proc reaches a terminal [`ProcStatus`].
1269    ///
1270    /// Returns `Ok(status)` when a terminal state is observed
1271    /// (`Stopped`, `Killed`, or `Failed`).
1272    ///
1273    /// If the internal status watch closes before any terminal state
1274    /// is seen, returns `Err(WaitError::ChannelClosed)`.
1275    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1276        let status = self.wait_inner().await;
1277        if status.is_exit() {
1278            Ok(status)
1279        } else {
1280            Err(WaitError::ChannelClosed)
1281        }
1282    }
1283
1284    /// Attempt to terminate the underlying OS process.
1285    ///
1286    /// This drives **process-level** teardown only:
1287    /// - First attempts graceful shutdown via `ProcAgent` if available.
1288    /// - If that fails or times out, delegates to the launcher's
1289    ///   `terminate()` method, which handles SIGTERM/SIGKILL escalation.
1290    ///
1291    /// If the process was already in a terminal state when called,
1292    /// returns [`TerminateError::AlreadyTerminated`].
1293    ///
1294    /// # Parameters
1295    /// - `timeout`: Grace period to wait after graceful shutdown before
1296    ///   escalating.
1297    /// - `reason`: Human-readable reason for termination.
1298    ///
1299    /// # Returns
1300    /// - `Ok(ProcStatus)` if the process exited during the
1301    ///   termination sequence.
1302    /// - `Err(TerminateError)` if already exited, signaling failed,
1303    ///   or the channel was lost.
1304    async fn terminate(
1305        &self,
1306        cx: &impl context::Actor,
1307        timeout: Duration,
1308        reason: &str,
1309    ) -> Result<ProcStatus, TerminateError<Self::TerminalStatus>> {
1310        // If already terminal, return that.
1311        let st0 = self.status();
1312        if st0.is_exit() {
1313            tracing::debug!(?st0, "terminate(): already terminal");
1314            return Err(TerminateError::AlreadyTerminated(st0));
1315        }
1316
1317        // Before signaling, try to close actors normally. Only works if
1318        // they are in the Ready state and have an Agent we can message.
1319        let agent = self.agent_ref();
1320        if let Some(agent) = agent {
1321            match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
1322                Ok(st) => return Ok(st),
1323                Err(e) => {
1324                    // Variety of possible errors, proceed with launcher termination.
1325                    tracing::warn!(
1326                        "ProcAgent {} could not successfully stop all actors: {}",
1327                        agent.actor_addr(),
1328                        e,
1329                    );
1330                }
1331            }
1332        }
1333
1334        // Mark "Stopping" (ok if state races).
1335        let _ = self.mark_stopping();
1336
1337        // Delegate to launcher for SIGTERM/SIGKILL escalation.
1338        tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
1339        let ref_proc_id: ProcAddr = self.proc_id.clone();
1340        if let Some(launcher) = self.launcher.upgrade() {
1341            if let Err(e) = launcher.terminate(&ref_proc_id, timeout).await {
1342                tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
1343                return Err(TerminateError::Io(anyhow::anyhow!(
1344                    "launcher termination failed: {}",
1345                    e
1346                )));
1347            }
1348        } else {
1349            // Launcher dropped - its Drop is killing all procs anyway.
1350            tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
1351        }
1352
1353        // Wait for the exit monitor to observe terminal state.
1354        let st = self.wait_inner().await;
1355        if st.is_exit() {
1356            tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
1357            Ok(st)
1358        } else {
1359            Err(TerminateError::ChannelClosed)
1360        }
1361    }
1362
1363    /// Forcibly kill the underlying OS process.
1364    ///
1365    /// This bypasses any graceful shutdown semantics and immediately
1366    /// delegates to the launcher's `kill()` method. It is intended as
1367    /// a last-resort termination mechanism when `terminate()` fails or
1368    /// when no grace period is desired.
1369    ///
1370    /// # Behavior
1371    /// - If the process was already in a terminal state, returns
1372    ///   [`TerminateError::AlreadyTerminated`].
1373    /// - Otherwise delegates to the launcher's `kill()` method.
1374    /// - Then waits for the exit monitor to observe a terminal state.
1375    ///
1376    /// # Returns
1377    /// - `Ok(ProcStatus)` if the process exited after kill.
1378    /// - `Err(TerminateError)` if already exited, signaling failed,
1379    ///   or the channel was lost.
1380    async fn kill(&self) -> Result<ProcStatus, TerminateError<Self::TerminalStatus>> {
1381        // If already terminal, return that.
1382        let st0 = self.status();
1383        if st0.is_exit() {
1384            return Err(TerminateError::AlreadyTerminated(st0));
1385        }
1386
1387        // Delegate to launcher for kill.
1388        tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
1389        let ref_proc_id: ProcAddr = self.proc_id.clone();
1390        if let Some(launcher) = self.launcher.upgrade() {
1391            if let Err(e) = launcher.kill(&ref_proc_id).await {
1392                tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
1393                return Err(TerminateError::Io(anyhow::anyhow!(
1394                    "launcher kill failed: {}",
1395                    e
1396                )));
1397            }
1398        } else {
1399            // Launcher dropped - its Drop is killing all procs anyway.
1400            tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
1401        }
1402
1403        // Wait for exit monitor to record terminal status.
1404        let st = self.wait_inner().await;
1405        if st.is_exit() {
1406            Ok(st)
1407        } else {
1408            Err(TerminateError::ChannelClosed)
1409        }
1410    }
1411}
1412
/// A specification of the command used to bootstrap procs.
///
/// This is plain data; [`BootstrapCommand::new`] turns it into a
/// `Command`, and [`BootstrapCommand::current`] fills it in from the
/// running process.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default)]
pub struct BootstrapCommand {
    /// Path of the executable to run.
    pub program: PathBuf,
    /// Optional `argv[0]` override; applied with `Command::arg0`
    /// when present.
    pub arg0: Option<String>,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Environment variables set on the command.
    pub env: HashMap<String, String>,
}
wirevalue::register_type!(BootstrapCommand);
1422
1423impl std::hash::Hash for BootstrapCommand {
1424    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1425        self.program.hash(state);
1426        self.arg0.hash(state);
1427        self.args.hash(state);
1428        let mut pairs: Vec<_> = self.env.iter().collect();
1429        pairs.sort();
1430        pairs.hash(state);
1431    }
1432}
1433
1434impl PartialEq for BootstrapCommand {
1435    fn eq(&self, other: &Self) -> bool {
1436        self.program == other.program
1437            && self.arg0 == other.arg0
1438            && self.args == other.args
1439            && self.env == other.env
1440    }
1441}
1442
1443impl Eq for BootstrapCommand {}
1444
1445impl BootstrapCommand {
1446    /// Creates a bootstrap command specification to replicate the
1447    /// invocation of the currently running process.
1448    pub fn current() -> io::Result<Self> {
1449        let mut args: VecDeque<String> = std::env::args().collect();
1450        let arg0 = args.pop_front();
1451
1452        Ok(Self {
1453            program: std::env::current_exe()?,
1454            arg0,
1455            args: args.into(),
1456            env: std::env::vars().collect(),
1457        })
1458    }
1459
1460    /// Create a new `Command` reflecting this bootstrap command
1461    /// configuration.
1462    pub fn new(&self) -> Command {
1463        let mut cmd = Command::new(&self.program);
1464        if let Some(arg0) = &self.arg0 {
1465            cmd.arg0(arg0);
1466        }
1467        for arg in &self.args {
1468            cmd.arg(arg);
1469        }
1470        for (k, v) in &self.env {
1471            cmd.env(k, v);
1472        }
1473        cmd
1474    }
1475
1476    /// Bootstrap command used for testing, invoking the Buck-built
1477    /// `monarch/hyperactor_mesh/bootstrap` binary.
1478    ///
1479    /// Intended for integration tests where we need to spawn real
1480    /// bootstrap processes under proc manager control. Not available
1481    /// outside of test builds.
1482    #[cfg(test)]
1483    #[cfg(fbcode_build)]
1484    pub(crate) fn test() -> Self {
1485        Self {
1486            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1487            arg0: None,
1488            args: vec![],
1489            env: HashMap::new(),
1490        }
1491    }
1492}
1493
1494impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1495    /// Creates a bootstrap command from the provided path.
1496    fn from(s: T) -> Self {
1497        Self {
1498            program: s.into(),
1499            arg0: None,
1500            args: vec![],
1501            env: HashMap::new(),
1502        }
1503    }
1504}
1505
/// Chooses the built-in process launcher backend used to spawn
/// procs.
///
/// This is an internal "implementation choice" knob for the
/// `ProcLauncher` abstraction. Both backends are expected to honor
/// the same lifecycle contract (launch, observe exit,
/// terminate/kill); they differ only in *how* the OS process is
/// supervised.
///
/// Variants:
/// - [`LauncherKind::Native`]: child processes are spawned and
///   supervised directly with `tokio::process` (traditional
///   parent/child model).
/// - [`LauncherKind::Systemd`]: supervision is delegated to `systemd
///   --user` through transient `.service` units, with lifecycle
///   observed via D-Bus. Linux only.
///
/// Configuration/parsing (see the [`FromStr`] impl):
/// - `""` and `"native"` select [`LauncherKind::Native`] (default).
/// - `"systemd"` selects [`LauncherKind::Systemd`].
/// - Anything else is rejected with
///   [`io::ErrorKind::InvalidInput`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// Spawn and supervise OS children directly (tokio-based
    /// launcher).
    Native,
    /// Spawn via transient `systemd --user` units and observe via
    /// D-Bus.
    #[cfg(target_os = "linux")]
    Systemd,
}

impl FromStr for LauncherKind {
    type Err = io::Error;

    /// Parse a launcher kind from configuration text.
    ///
    /// The input is trimmed and ASCII-lowercased before matching, so
    /// case and surrounding whitespace do not matter:
    /// - `""` or `"native"` → [`LauncherKind::Native`]
    /// - `"systemd"` → [`LauncherKind::Systemd`] (Linux only)
    ///
    /// Any other string yields [`io::ErrorKind::InvalidInput`]; the
    /// message quotes the normalized input and lists the values
    /// accepted on this platform.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "" | "native" => return Ok(Self::Native),
            #[cfg(target_os = "linux")]
            "systemd" => return Ok(Self::Systemd),
            _ => {}
        }

        let accepted = if cfg!(target_os = "linux") {
            "'native' or 'systemd'"
        } else {
            "'native'"
        };
        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown proc launcher kind {normalized:?}; expected {accepted}"),
        ))
    }
}
1569
/// Host-side manager for launching and supervising **bootstrap
/// processes** (via the `bootstrap` entry point).
///
/// `BootstrapProcManager` is responsible for:
/// - choosing and constructing the configured [`ProcLauncher`]
///   backend,
/// - preparing the bootstrap command/environment for each proc,
/// - tracking proc lifecycle state via [`BootstrapProcHandle`] /
///   [`ProcStatus`],
/// - providing status/query APIs over the set of active procs.
///
/// It maintains an async registry mapping [`ProcAddr`] →
/// [`BootstrapProcHandle`] for lifecycle queries and exit
/// observation.
///
/// ## Stdio and cleanup
///
/// Stdio handling and shutdown/cleanup behavior are
/// **launcher-dependent**:
/// - The native launcher may capture/tail stdout/stderr and manages
///   OS child processes directly.
/// - The systemd launcher delegates supervision to systemd transient
///   units on the user manager and does not expose a PID; stdio is
///   managed by systemd.
///
/// On drop/shutdown, process cleanup is *best-effort* and performed
/// via the selected launcher (e.g. direct child termination for
/// native, `StopUnit` for systemd).
pub struct BootstrapProcManager {
    /// The process launcher backend. Initialized on first use via
    /// `launcher()`, or explicitly via `set_launcher()`. Being a
    /// `OnceLock`, it can be set exactly once.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// The default command specification used to bootstrap new
    /// processes. A spawn may override it through
    /// [`BootstrapProcConfig::bootstrap_command`].
    command: BootstrapCommand,

    /// Async registry of running children, keyed by [`ProcAddr`]. Holds
    /// [`BootstrapProcHandle`]s so callers can query or monitor
    /// status.
    children: Arc<tokio::sync::Mutex<HashMap<ProcAddr, BootstrapProcHandle>>>,

    /// Shared file appender that captured child stdout/stderr is
    /// directed to. `None` if file capture was disabled when the
    /// manager was constructed, or if creating the appender failed.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for storing proc socket files. Procs place their
    /// sockets in this directory, so that they can be looked up by
    /// other procs for direct transfer.
    socket_dir: TempDir,
}
1620
1621impl BootstrapProcManager {
1622    /// Construct a new [`BootstrapProcManager`] that will launch
1623    /// procs using the given bootstrap command specification.
1624    ///
1625    /// This is the general entry point when you want to manage procs
1626    /// backed by a specific binary path (e.g. a bootstrap
1627    /// trampoline).
1628    pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1629        let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1630            match crate::logging::FileAppender::new() {
1631                Some(fm) => {
1632                    tracing::info!("file appender created successfully");
1633                    Some(Arc::new(fm))
1634                }
1635                None => {
1636                    tracing::warn!("failed to create file appender");
1637                    None
1638                }
1639            }
1640        } else {
1641            None
1642        };
1643
1644        Ok(Self {
1645            launcher: OnceLock::new(),
1646            command,
1647            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1648            file_appender,
1649            socket_dir: runtime_dir()?,
1650        })
1651    }
1652
1653    /// Install a custom launcher.
1654    ///
1655    /// Returns error if already initialized (by prior
1656    /// `set_launcher()` OR by a spawn that triggered default init via
1657    /// `launcher()`).
1658    ///
1659    /// Must be called before any spawn operation that would
1660    /// initialize the default launcher.
1661    pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
1662        self.launcher.set(launcher).map_err(|_| {
1663            ProcLauncherError::Other(
1664                "launcher already initialized; call set_proc_launcher before first spawn".into(),
1665            )
1666        })
1667    }
1668
1669    /// Get the launcher, initializing with the default if not already
1670    /// set.
1671    ///
1672    /// Once this is called, any subsequent calls to `set_launcher()`
1673    /// will fail.
1674    pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
1675        self.launcher.get_or_init(|| {
1676            let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
1677            let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
1678            tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
1679            match kind {
1680                LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
1681                #[cfg(target_os = "linux")]
1682                LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
1683            }
1684        })
1685    }
1686
    /// The manager's default bootstrap command, i.e. the one supplied
    /// at construction. Borrowed; callers clone if they need an owned
    /// copy.
    pub fn command(&self) -> &BootstrapCommand {
        &self.command
    }
1691
    /// The socket directory, where per-proc Unix sockets are placed.
    ///
    /// This is the path of the manager-owned [`TempDir`].
    pub fn socket_dir(&self) -> &Path {
        self.socket_dir.path()
    }
1696
1697    /// Return the current [`ProcStatus`] for the given [`ProcAddr`], if
1698    /// the proc is known to this manager.
1699    ///
1700    /// This queries the live [`BootstrapProcHandle`] stored in the
1701    /// manager's internal map. It provides an immediate snapshot of
1702    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
1703    /// etc.).
1704    ///
1705    /// Returns `None` if the manager has no record of the proc (e.g.
1706    /// never spawned here, or entry already removed).
1707    pub async fn status(&self, proc_id: &ProcAddr) -> Option<ProcStatus> {
1708        self.children.lock().await.get(proc_id).map(|h| h.status())
1709    }
1710
1711    /// Return a watch receiver for the given proc's status stream,
1712    /// if the proc is known to this manager.
1713    pub async fn watch(
1714        &self,
1715        proc_id: &ProcAddr,
1716    ) -> Option<tokio::sync::watch::Receiver<ProcStatus>> {
1717        self.children.lock().await.get(proc_id).map(|h| h.watch())
1718    }
1719
1720    /// Non-blocking stop: send `StopAll`, then spawn a background task
1721    /// that waits for exit and escalates if needed.
1722    ///
1723    /// The handle stays in `children` so that [`status()`] continues
1724    /// to reflect the live lifecycle (Stopping -> Stopped/Killed/Failed).
1725    /// Idempotent: no-ops if the proc is already stopping or exited.
1726    pub(crate) async fn request_stop(
1727        &self,
1728        cx: &impl context::Actor,
1729        proc: &ProcAddr,
1730        timeout: Duration,
1731        reason: &str,
1732    ) {
1733        let handle = {
1734            let guard = self.children.lock().await;
1735            guard.get(proc).cloned()
1736        };
1737
1738        let Some(handle) = handle else { return };
1739
1740        let status = handle.status();
1741        if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
1742            return;
1743        }
1744
1745        if let Some(agent) = handle.agent_ref() {
1746            let mut agent_port = agent.port();
1747            agent_port.return_undeliverable(false);
1748            let _ = agent_port.post(
1749                cx,
1750                resource::StopAll {
1751                    reason: reason.to_string(),
1752                },
1753            );
1754        }
1755
1756        let _ = handle.mark_stopping();
1757        tokio::spawn(async move {
1758            handle.wait_or_brutally_kill(timeout).await;
1759        });
1760    }
1761
    /// Spawn a background task that waits for the launcher's exit
    /// report on `exit_rx` and records the terminal state on `handle`.
    ///
    /// Mapping from the reported [`ProcExitKind`] to handle updates:
    /// - `Exited { code }` → `mark_stopped(code, stderr_tail)`
    /// - `Signaled { signal, core_dumped }` → `mark_killed(..)`
    /// - `Failed { reason }` → `mark_failed(..)`
    ///
    /// If the sender side of `exit_rx` is dropped without sending, the
    /// handle is marked failed and the task ends.
    ///
    /// Before the status is recorded, the handle's stream monitors are
    /// taken and aborted; the stderr monitor's lines (or, if there are
    /// none, the launcher-provided tail) become the stderr tail that
    /// is logged and, for normal exits, stored with the status.
    fn spawn_exit_monitor(
        &self,
        proc_id: ProcAddr,
        handle: BootstrapProcHandle,
        exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
    ) {
        tokio::spawn(async move {
            // Wait for the launcher to report terminal status.
            let exit_result = match exit_rx.await {
                Ok(res) => res,
                Err(_) => {
                    // exit_rx sender was dropped without sending - launcher error.
                    // NOTE(review): the stream monitors are not taken or
                    // aborted on this path — confirm that is intended.
                    let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
                    tracing::error!(
                        name = "ProcStatus",
                        status = "Exited::ChannelDropped",
                        %proc_id,
                        "exit channel closed without result"
                    );
                    return;
                }
            };

            // Collect stderr tail from StreamFwder if we captured stdio.
            // The launcher may also provide stderr_tail in exit_result;
            // prefer StreamFwder's tail if available (more complete).
            let mut stderr_tail: Vec<String> = Vec::new();
            let (stdout_mon, stderr_mon) = handle.take_stream_monitors();

            // Abort both monitors; only the stderr lines are kept.
            if let Some(t) = stderr_mon {
                let (lines, _bytes) = t.abort().await;
                stderr_tail = lines;
            }
            if let Some(t) = stdout_mon {
                let (_lines, _bytes) = t.abort().await;
            }

            // Fall back to launcher-provided tail if we didn't capture.
            if stderr_tail.is_empty()
                && let Some(tail) = exit_result.stderr_tail
            {
                stderr_tail = tail;
            }

            // Joined copy for logging; built before `stderr_tail` is
            // moved into `mark_stopped` below.
            let tail_str = if stderr_tail.is_empty() {
                None
            } else {
                Some(stderr_tail.join("\n"))
            };

            // Record the terminal status on the handle, then log it.
            // The results of the `mark_*` calls are ignored.
            match exit_result.kind {
                ProcExitKind::Exited { code } => {
                    let _ = handle.mark_stopped(code, stderr_tail);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::ExitWithCode",
                        %proc_id,
                        exit_code = code,
                        tail = tail_str,
                        "proc exited with code {code}"
                    );
                }
                ProcExitKind::Signaled {
                    signal,
                    core_dumped,
                } => {
                    let _ = handle.mark_killed(signal, core_dumped);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::KilledBySignal",
                        %proc_id,
                        tail = tail_str,
                        "killed by signal {signal}"
                    );
                }
                ProcExitKind::Failed { reason } => {
                    let _ = handle.mark_failed(&reason);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::Failed",
                        %proc_id,
                        tail = tail_str,
                        "proc failed: {reason}"
                    );
                }
            }
        });
    }
1850}
1851
1852pub use crate::proc_launcher::ProcBind;
1853
/// The configuration used for bootstrapped procs.
///
/// Passed per spawn to `BootstrapProcManager::spawn`.
pub struct BootstrapProcConfig {
    /// The proc's create rank. Passed to the stdout/stderr stream
    /// forwarders when the child's stdio is captured.
    pub create_rank: usize,

    /// Config values to set on the spawned proc's global config,
    /// at the `ClientOverride` layer. Also consulted on the host
    /// side (override-or-global) to decide whether the child's
    /// stdio is captured, forwarded, or tailed.
    pub client_config_override: Attrs,

    /// Optional per-process CPU/NUMA binding configuration.
    /// When set, the bootstrap command is wrapped with `numactl`
    /// (on NUMA systems) or `taskset` (Linux fallback) before launch.
    pub proc_bind: Option<ProcBind>,
    /// Optional bootstrap command override. When set, this command is used
    /// to spawn the proc instead of the manager's default bootstrap command.
    pub bootstrap_command: Option<BootstrapCommand>,
}
1871
1872#[async_trait]
1873impl ProcManager for BootstrapProcManager {
1874    type Handle = BootstrapProcHandle;
1875
1876    type Config = BootstrapProcConfig;
1877
    /// Return the [`ChannelTransport`] used by this proc manager.
    ///
    /// For `BootstrapProcManager` this is always
    /// [`ChannelTransport::Unix`], since all procs are spawned
    /// locally on the same host and communicate over Unix domain
    /// sockets. The value is a constant; it does not depend on
    /// manager state.
    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }
1887
1888    /// Launch a new proc under this [`BootstrapProcManager`].
1889    ///
1890    /// Spawns the configured bootstrap binary (`self.program`) in a
1891    /// fresh child process. The environment is populated with
1892    /// variables that describe the bootstrap context — most
1893    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries a
1894    /// base64-encoded JSON [`Bootstrap::Proc`] payload (proc id,
1895    /// backend addr, callback addr, optional config snapshot).
1896    /// Additional variables like `BOOTSTRAP_LOG_CHANNEL` are also set
1897    /// up for logging and control.
1898    ///
1899    /// Responsibilities performed here:
1900    /// - Create a one-shot callback channel so the child can confirm
1901    ///   successful bootstrap and return its mailbox address plus agent
1902    ///   reference.
1903    /// - Spawn the OS process with stdout/stderr piped.
1904    /// - Stamp the new [`BootstrapProcHandle`] as
1905    ///   [`ProcStatus::Running`] once a PID is observed.
1906    /// - Wire stdout/stderr pipes into local writers and forward them
1907    ///   over the logging channel (`BOOTSTRAP_LOG_CHANNEL`).
1908    /// - Insert the handle into the manager's children map and start
1909    ///   an exit monitor to track process termination.
1910    ///
1911    /// Returns a [`BootstrapProcHandle`] that exposes the child
1912    /// process's lifecycle (status, wait/ready, termination). Errors
1913    /// are surfaced as [`HostError`].
1914    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
1915    async fn spawn(
1916        &self,
1917        proc_id: ProcAddr,
1918        backend_addr: ChannelAddr,
1919        config: BootstrapProcConfig,
1920    ) -> Result<Self::Handle, HostError> {
1921        let (callback_addr, mut callback_rx) = channel::serve::<(ChannelAddr, ActorRef<ProcAgent>)>(
1922            ChannelAddr::any(ChannelTransport::Unix),
1923        )?;
1924
1925        // Decide whether we need to capture stdio.
1926        let overrides = &config.client_config_override;
1927        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1928        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1929        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1930        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1931
1932        let mode = Bootstrap::Proc {
1933            proc_id: proc_id.clone(),
1934            backend_addr,
1935            callback_addr,
1936            socket_dir_path: self.socket_dir.path().to_owned(),
1937            config: Some(config.client_config_override.clone()),
1938        };
1939
1940        // Build LaunchOptions for the launcher.
1941        let bootstrap_payload = mode
1942            .to_env_safe_string()
1943            .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;
1944
1945        let opts = LaunchOptions {
1946            bootstrap_payload,
1947            process_name: format_process_name(&proc_id.clone()),
1948            command: config
1949                .bootstrap_command
1950                .as_ref()
1951                .unwrap_or(&self.command)
1952                .clone(),
1953            want_stdio: need_stdio,
1954            tail_lines: tail_size,
1955            log_channel: if enable_forwarding {
1956                Some(ChannelAddr::any(ChannelTransport::Unix))
1957            } else {
1958                None
1959            },
1960            proc_bind: config.proc_bind.clone(),
1961        };
1962
1963        // Launch via the configured launcher backend.
1964        tracing::info!(proc_id = %proc_id, "launching proc with opts={opts:?}");
1965        let ref_proc_id: ProcAddr = proc_id.clone();
1966        let launch_result = self
1967            .launcher()
1968            .launch(&ref_proc_id, opts.clone())
1969            .await
1970            .map_err(|e| {
1971                let io_err = match e {
1972                    ProcLauncherError::Launch(io_err) => io_err,
1973                    other => std::io::Error::other(other.to_string()),
1974                };
1975                HostError::ProcessSpawnFailure(
1976                    proc_id.clone(),
1977                    format!("{:?}", opts.command),
1978                    io_err,
1979                )
1980            })?;
1981
1982        // Wire up StreamFwders if stdio was captured.
1983        let (out_fwder, err_fwder) = match launch_result.stdio {
1984            StdioHandling::Captured { stdout, stderr } => {
1985                let (file_stdout, file_stderr) = if enable_file_capture {
1986                    match self.file_appender.as_deref() {
1987                        Some(fm) => (
1988                            Some(fm.addr_for(OutputTarget::Stdout)),
1989                            Some(fm.addr_for(OutputTarget::Stderr)),
1990                        ),
1991                        None => {
1992                            tracing::warn!("enable_file_capture=true but no FileAppender");
1993                            (None, None)
1994                        }
1995                    }
1996                } else {
1997                    (None, None)
1998                };
1999
2000                let out = StreamFwder::start(
2001                    stdout,
2002                    file_stdout,
2003                    OutputTarget::Stdout,
2004                    tail_size,
2005                    opts.log_channel.clone(),
2006                    &ref_proc_id,
2007                    config.create_rank,
2008                );
2009                let err = StreamFwder::start(
2010                    stderr,
2011                    file_stderr,
2012                    OutputTarget::Stderr,
2013                    tail_size,
2014                    opts.log_channel.clone(),
2015                    &ref_proc_id,
2016                    config.create_rank,
2017                );
2018                (Some(out), Some(err))
2019            }
2020            StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
2021                if !need_stdio {
2022                    tracing::info!(
2023                        %proc_id, enable_forwarding, enable_file_capture, tail_size,
2024                        "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
2025                    );
2026                }
2027                (None, None)
2028            }
2029        };
2030
2031        // Create handle with launcher reference for terminate/kill delegation.
2032        let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
2033        handle.mark_running(launch_result.started_at);
2034        handle.set_stream_monitors(out_fwder, err_fwder);
2035
2036        // Retain handle for lifecycle management.
2037        {
2038            let mut children = self.children.lock().await;
2039            children.insert(proc_id.clone(), handle.clone());
2040        }
2041
2042        // Kick off an exit monitor that updates ProcStatus when the
2043        // launcher reports terminal status.
2044        self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);
2045
2046        // Handle callback from child proc when it confirms bootstrap.
2047        let h = handle.clone();
2048        tokio::spawn(async move {
2049            match callback_rx.recv().await {
2050                Ok((addr, agent)) => {
2051                    let _ = h.mark_ready(addr, agent);
2052                }
2053                Err(e) => {
2054                    // Child never called back; record failure.
2055                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
2056                }
2057            }
2058        });
2059
2060        // Callers do `handle.read().await` for mesh readiness.
2061        Ok(handle)
2062    }
2063}
2064
2065#[async_trait]
2066impl SingleTerminate for BootstrapProcManager {
2067    /// Attempt to gracefully terminate one child procs managed by
2068    /// this `BootstrapProcManager`.
2069    ///
2070    /// Each child handle is asked to `terminate(timeout)`, which
2071    /// sends SIGTERM, waits up to the deadline, and escalates to
2072    /// SIGKILL if necessary. Termination is attempted concurrently,
2073    /// with at most `max_in_flight` tasks running at once.
2074    ///
2075    /// Logs a warning for each failure.
2076    async fn terminate_proc(
2077        &self,
2078        cx: &impl context::Actor,
2079        proc: &ProcAddr,
2080        timeout: Duration,
2081        reason: &str,
2082    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
2083        // Snapshot to avoid holding the lock across awaits.
2084        let proc_handle: Option<BootstrapProcHandle> = {
2085            let mut guard = self.children.lock().await;
2086            guard.remove(proc)
2087        };
2088
2089        if let Some(h) = proc_handle {
2090            h.terminate(cx, timeout, reason)
2091                .await
2092                .map(|_| (Vec::new(), Vec::new()))
2093                .map_err(|e| e.into())
2094        } else {
2095            Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2096        }
2097    }
2098}
2099
2100#[async_trait]
2101impl BulkTerminate for BootstrapProcManager {
2102    /// Attempt to gracefully terminate all child procs managed by
2103    /// this `BootstrapProcManager`.
2104    ///
2105    /// Each child handle is asked to `terminate(timeout)`, which
2106    /// sends SIGTERM, waits up to the deadline, and escalates to
2107    /// SIGKILL if necessary. Termination is attempted concurrently,
2108    /// with at most `max_in_flight` tasks running at once.
2109    ///
2110    /// Returns a [`TerminateSummary`] with counts of how many procs
2111    /// were attempted, how many successfully terminated (including
2112    /// those that were already terminal), and how many failed.
2113    ///
2114    /// Logs a warning for each failure.
2115    async fn terminate_all(
2116        &self,
2117        cx: &impl context::Actor,
2118        timeout: Duration,
2119        max_in_flight: usize,
2120        reason: &str,
2121    ) -> TerminateSummary {
2122        // Drain the children list to avoid holding the lock across awaits and
2123        // avoid subsequent calls from trying to terminate again.
2124        let handles: Vec<BootstrapProcHandle> = {
2125            let mut guard = self.children.lock().await;
2126            guard.drain().map(|(_, v)| v).collect()
2127        };
2128
2129        let attempted = handles.len();
2130        let mut ok = 0usize;
2131
2132        let results = stream::iter(handles.into_iter().map(|h| async move {
2133            match h.terminate(cx, timeout, reason).await {
2134                Ok(_) | Err(TerminateError::AlreadyTerminated(_)) => {
2135                    // Treat "already terminal" as success.
2136                    true
2137                }
2138                Err(e) => {
2139                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2140                    false
2141                }
2142            }
2143        }))
2144        .buffer_unordered(max_in_flight.max(1))
2145        .collect::<Vec<bool>>()
2146        .await;
2147
2148        for r in results {
2149            if r {
2150                ok += 1;
2151            }
2152        }
2153
2154        TerminateSummary {
2155            attempted,
2156            ok,
2157            failed: attempted.saturating_sub(ok),
2158        }
2159    }
2160}
2161
2162/// Entry point to processes managed by hyperactor_mesh. Any process that is part
2163/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
2164/// the process according to how it is invoked.
2165///
2166/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
2167/// and the process should likely exit:
2168///
2169/// ```ignore
2170/// let err = hyperactor_mesh::bootstrap().await;
2171/// tracing::error("could not bootstrap mesh process: {}", err);
2172/// std::process::exit(1);
2173/// ```
2174///
2175/// Use [`bootstrap_or_die`] to implement this behavior directly.
2176/// Else if the bootstrap returns Ok, the process has cleaned up successfully and
2177/// should exit the "main" of the program.
2178pub async fn bootstrap() -> anyhow::Result<i32> {
2179    let Some(boot) = Bootstrap::get_from_env()? else {
2180        anyhow::bail!(
2181            "bootstrap: no bootstrap mode configured (HYPERACTOR_MESH_BOOTSTRAP_MODE unset)"
2182        );
2183    };
2184    boot.bootstrap().await
2185}
2186
2187/// A variant of [`bootstrap`] that logs the error and exits the process
2188/// if bootstrapping fails.
2189pub async fn bootstrap_or_die() -> ! {
2190    match bootstrap().await {
2191        Ok(exit_code) => std::process::exit(exit_code),
2192        Err(err) => {
2193            let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2194            tracing::error!("failed to bootstrap mesh process: {}", err);
2195            std::process::exit(1);
2196        }
2197    }
2198}
2199
2200#[derive(enum_as_inner::EnumAsInner)]
2201enum DebugSink {
2202    File(std::fs::File),
2203    Sink,
2204}
2205
2206impl DebugSink {
2207    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2208        match self {
2209            DebugSink::File(f) => f.write(buf),
2210            DebugSink::Sink => Ok(buf.len()),
2211        }
2212    }
2213    fn flush(&mut self) -> io::Result<()> {
2214        match self {
2215            DebugSink::File(f) => f.flush(),
2216            DebugSink::Sink => Ok(()),
2217        }
2218    }
2219}
2220
2221fn debug_sink() -> &'static Mutex<DebugSink> {
2222    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2223    DEBUG_SINK.get_or_init(|| {
2224        let debug_path = {
2225            let mut p = std::env::temp_dir();
2226            if let Ok(user) = std::env::var("USER") {
2227                p.push(user);
2228            }
2229            std::fs::create_dir_all(&p).ok();
2230            p.push("monarch-bootstrap-debug.log");
2231            p
2232        };
2233        let sink = if debug_path.exists() {
2234            match OpenOptions::new()
2235                .append(true)
2236                .create(true)
2237                .open(debug_path.clone())
2238            {
2239                Ok(f) => DebugSink::File(f),
2240                Err(_e) => {
2241                    eprintln!(
2242                        "failed to open {} for bootstrap debug logging",
2243                        debug_path.display()
2244                    );
2245                    DebugSink::Sink
2246                }
2247            }
2248        } else {
2249            DebugSink::Sink
2250        };
2251        Mutex::new(sink)
2252    })
2253}
2254
/// If true, `Debug` output is also mirrored to stderr, in addition to
/// being written to the debug sink.
const DEBUG_TO_STDERR: bool = false;

/// A bootstrap specific debug writer. Its destination is the sink
/// returned by `debug_sink()`: the file `monarch-bootstrap-debug.log`
/// under the system temp dir (inside a `$USER` subdirectory when
/// `USER` is set), provided that file already exists when the sink is
/// first resolved; otherwise all writes are discarded.
struct Debug;

impl Debug {
    /// Whether debug output currently goes anywhere: either stderr
    /// mirroring is compiled in, or the debug sink is backed by a
    /// file.
    ///
    /// Panics if the sink mutex is poisoned.
    fn is_active() -> bool {
        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
    }
}
2267
2268impl Write for Debug {
2269    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2270        let res = debug_sink().lock().unwrap().write(buf);
2271        if DEBUG_TO_STDERR {
2272            let n = match res {
2273                Ok(n) => n,
2274                Err(_) => buf.len(),
2275            };
2276            let _ = io::stderr().write_all(&buf[..n]);
2277        }
2278
2279        res
2280    }
2281    fn flush(&mut self) -> io::Result<()> {
2282        let res = debug_sink().lock().unwrap().flush();
2283        if DEBUG_TO_STDERR {
2284            let _ = io::stderr().flush();
2285        }
2286        res
2287    }
2288}
2289
2290/// Build the bind/dial [`ChannelAddr`] for a local proc within `socket_dir`.
2291pub(crate) fn local_proc_addr(
2292    socket_dir: &Path,
2293    proc_id: &hyperactor::id::ProcId,
2294) -> anyhow::Result<(ChannelAddr, PathBuf)> {
2295    let path = proc_id.to_path_elem(socket_dir);
2296    let addr = std::os::unix::net::SocketAddr::from_pathname(path.clone())
2297        .with_context(|| {
2298            format!(
2299                "constructing unix socket address for proc {proc_id} \
2300            at {} ({} bytes); path must fit within SUN_LEN \
2301             (108 on Linux, 104 on macOS)",
2302                path.display(),
2303                path.as_os_str().len()
2304            )
2305        })?
2306        .into();
2307    Ok((addr, path))
2308}
2309
2310/// Create a new runtime [`TempDir`]. The directory is created in
2311/// `$XDG_RUNTIME_DIR` if set and the directory exists, otherwise
2312/// falling back to the system tempdir.
2313fn runtime_dir() -> io::Result<TempDir> {
2314    if let Some(runtime_dir) = std::env::var_os("XDG_RUNTIME_DIR") {
2315        let path = PathBuf::from(runtime_dir);
2316        if path.is_dir() {
2317            return tempfile::tempdir_in(path);
2318        }
2319    }
2320    tempfile::tempdir()
2321}
2322
2323#[cfg(test)]
2324mod tests {
2325    use std::path::PathBuf;
2326
2327    use hyperactor::RemoteSpawn;
2328    use hyperactor::channel::ChannelAddr;
2329    use hyperactor::channel::ChannelTransport;
2330    use hyperactor::channel::TcpMode;
2331    use hyperactor::testing::ids::test_proc_id;
2332    use hyperactor::testing::ids::test_proc_id_with_addr;
2333    use hyperactor_config::Flattrs;
2334
2335    use super::*;
2336
2337    #[test]
2338    fn test_bootstrap_mode_env_string_none_config_proc() {
2339        let value = Bootstrap::Proc {
2340            proc_id: test_proc_id("foo_0"),
2341            backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2342            callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2343            socket_dir_path: PathBuf::from("notexist"),
2344            config: None,
2345        };
2346
2347        let safe = value.to_env_safe_string().unwrap();
2348        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2349
2350        // Re-encode and compare: deterministic round-trip of the
2351        // wire format.
2352        let safe2 = round.to_env_safe_string().unwrap();
2353        assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2354
2355        // Sanity: the decoded variant is what we expect.
2356        match round {
2357            Bootstrap::Proc { config: None, .. } => {}
2358            other => panic!("expected Proc with None config, got {:?}", other),
2359        }
2360    }
2361
2362    #[test]
2363    fn test_bootstrap_mode_env_string_none_config_host() {
2364        let value = Bootstrap::Host {
2365            addr: ChannelAddr::any(ChannelTransport::Unix),
2366            command: None,
2367            config: None,
2368            exit_on_shutdown: false,
2369        };
2370
2371        let safe = value.to_env_safe_string().unwrap();
2372        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2373
2374        // Wire-format round-trip should be identical.
2375        let safe2 = round.to_env_safe_string().unwrap();
2376        assert_eq!(safe, safe2);
2377
2378        // Sanity: decoded variant is Host with None config.
2379        match round {
2380            Bootstrap::Host { config: None, .. } => {}
2381            other => panic!("expected Host with None config, got {:?}", other),
2382        }
2383    }
2384
2385    #[test]
2386    fn test_bootstrap_mode_env_string_invalid() {
2387        // Not valid base64
2388        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2389    }
2390
2391    #[test]
2392    fn test_bootstrap_config_snapshot_roundtrip() {
2393        // Build a small, distinctive Attrs snapshot.
2394        let mut attrs = Attrs::new();
2395        attrs[MESH_TAIL_LOG_LINES] = 123;
2396        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2397
2398        let socket_dir = runtime_dir().unwrap();
2399
2400        // Proc case
2401        {
2402            let original = Bootstrap::Proc {
2403                proc_id: test_proc_id("foo_42"),
2404                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2405                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2406                config: Some(attrs.clone()),
2407                socket_dir_path: socket_dir.path().to_owned(),
2408            };
2409            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2410            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2411            match &decoded {
2412                Bootstrap::Proc { config, .. } => {
2413                    let cfg = config.as_ref().expect("expected Some(attrs)");
2414                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2415                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2416                }
2417                other => panic!("unexpected variant after roundtrip: {:?}", other),
2418            }
2419        }
2420
2421        // Host case
2422        {
2423            let original = Bootstrap::Host {
2424                addr: ChannelAddr::any(ChannelTransport::Unix),
2425                command: None,
2426                config: Some(attrs.clone()),
2427                exit_on_shutdown: false,
2428            };
2429            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2430            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2431            match &decoded {
2432                Bootstrap::Host { config, .. } => {
2433                    let cfg = config.as_ref().expect("expected Some(attrs)");
2434                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2435                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2436                }
2437                other => panic!("unexpected variant after roundtrip: {:?}", other),
2438            }
2439        }
2440    }
2441
2442    #[tokio::test]
2443    async fn test_v1_child_logging() {
2444        use hyperactor::channel;
2445        use hyperactor::mailbox::BoxedMailboxSender;
2446        use hyperactor::mailbox::DialMailboxRouter;
2447        use hyperactor::mailbox::MailboxServer;
2448        use hyperactor::proc::Proc;
2449
2450        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2451        use crate::logging::LogClientActor;
2452        use crate::logging::LogClientMessageClient;
2453        use crate::logging::LogForwardActor;
2454        use crate::logging::LogMessage;
2455        use crate::logging::OutputTarget;
2456        use crate::logging::test_tap;
2457
2458        let router = DialMailboxRouter::new();
2459        let (proc_addr, proc_rx) =
2460            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2461        let proc = Proc::configured(
2462            test_proc_id("client_0"),
2463            BoxedMailboxSender::new(router.clone()),
2464        );
2465        proc.clone().serve(proc_rx);
2466        let proc_ref: ProcAddr = test_proc_id("client_0");
2467        router.bind(proc_ref, proc_addr.clone());
2468        let (client, _handle) = proc.client("client").unwrap();
2469
2470        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2471        test_tap::install(tap_tx);
2472
2473        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2474        // SAFETY: unit-test scoped env var
2475        unsafe {
2476            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2477        }
2478
2479        // Spawn the log client and disable aggregation (immediate
2480        // print + tap push).
2481        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
2482        let log_client: ActorRef<LogClientActor> =
2483            proc.spawn("log_client", log_client_actor).unwrap().bind();
2484        log_client.set_aggregate(&client, None).await.unwrap();
2485
2486        // Spawn the forwarder in this proc (it will serve
2487        // BOOTSTRAP_LOG_CHANNEL).
2488        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
2489            .await
2490            .unwrap();
2491        let _log_forwarder: ActorRef<LogForwardActor> = proc
2492            .spawn("log_forwarder", log_forwarder_actor)
2493            .unwrap()
2494            .bind();
2495
2496        // Dial the channel but don't post until we know the forwarder
2497        // is receiving.
2498        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2499
2500        // Send a fake log message as if it came from the proc
2501        // manager's writer.
2502        tx.post(LogMessage::Log {
2503            hostname: "testhost".into(),
2504            proc_id: "testproc[0]".into(),
2505            output_target: OutputTarget::Stdout,
2506            payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
2507        });
2508
2509        // Assert we see it via the tap.
2510        let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
2511            .await
2512            .expect("timed out waiting for log line")
2513            .expect("tap channel closed unexpectedly");
2514        assert!(
2515            line.contains("hello from child"),
2516            "log line did not appear via LogClientActor; got: {line}"
2517        );
2518    }
2519
2520    mod proc_handle {
2521
2522        use std::sync::Arc;
2523        use std::time::Duration;
2524
2525        use async_trait::async_trait;
2526        use hyperactor::ActorRef;
2527        use hyperactor::ProcAddr;
2528        use hyperactor::testing::ids::test_proc_id;
2529
2530        use super::super::*;
2531        use crate::host::ProcHandle;
2532        use crate::proc_launcher::LaunchOptions;
2533        use crate::proc_launcher::LaunchResult;
2534        use crate::proc_launcher::ProcLauncher;
2535        use crate::proc_launcher::ProcLauncherError;
2536
2537        /// A test launcher that panics on any method call.
2538        ///
2539        /// This is used by unit tests that only exercise status
2540        /// transitions on `BootstrapProcHandle` without actually
2541        /// launching or terminating processes. If any launcher method
2542        /// is called, the test will panic—indicating an unexpected
2543        /// code path.
2544        struct TestProcLauncher;
2545
2546        #[async_trait]
2547        impl ProcLauncher for TestProcLauncher {
2548            async fn launch(
2549                &self,
2550                _proc_id: &ProcAddr,
2551                _opts: LaunchOptions,
2552            ) -> Result<LaunchResult, ProcLauncherError> {
2553                panic!("TestProcLauncher::launch should not be called in unit tests");
2554            }
2555
2556            async fn terminate(
2557                &self,
2558                _proc_id: &ProcAddr,
2559                _timeout: Duration,
2560            ) -> Result<(), ProcLauncherError> {
2561                panic!("TestProcLauncher::terminate should not be called in unit tests");
2562            }
2563
2564            async fn kill(&self, _proc_id: &ProcAddr) -> Result<(), ProcLauncherError> {
2565                panic!("TestProcLauncher::kill should not be called in unit tests");
2566            }
2567        }
2568
2569        // Helper: build a ProcHandle for state-transition unit tests.
2570        //
2571        // This creates a handle with a no-op test launcher. The
2572        // launcher will panic if any of its methods are called,
2573        // ensuring tests only exercise status transitions and not
2574        // actual process lifecycle.
2575        fn handle_for_test() -> BootstrapProcHandle {
2576            let proc_id: ProcAddr = test_proc_id("0");
2577            let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
2578            BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
2579        }
2580
2581        #[tokio::test]
2582        async fn starting_to_running_ok() {
2583            let h = handle_for_test();
2584            assert!(matches!(h.status(), ProcStatus::Starting));
2585            let child_started_at = std::time::SystemTime::now();
2586            assert!(h.mark_running(child_started_at));
2587            match h.status() {
2588                ProcStatus::Running { started_at } => {
2589                    assert_eq!(started_at, child_started_at);
2590                }
2591                other => panic!("expected Running, got {other:?}"),
2592            }
2593        }
2594
2595        #[tokio::test]
2596        async fn running_to_stopping_to_stopped_ok() {
2597            let h = handle_for_test();
2598            let child_started_at = std::time::SystemTime::now();
2599            assert!(h.mark_running(child_started_at));
2600            assert!(h.mark_stopping());
2601            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2602            assert!(h.mark_stopped(0, Vec::new()));
2603            assert!(matches!(
2604                h.status(),
2605                ProcStatus::Stopped { exit_code: 0, .. }
2606            ));
2607        }
2608
2609        #[tokio::test]
2610        async fn running_to_killed_ok() {
2611            let h = handle_for_test();
2612            let child_started_at = std::time::SystemTime::now();
2613            assert!(h.mark_running(child_started_at));
2614            assert!(h.mark_killed(9, true));
2615            assert!(matches!(
2616                h.status(),
2617                ProcStatus::Killed {
2618                    signal: 9,
2619                    core_dumped: true
2620                }
2621            ));
2622        }
2623
2624        #[tokio::test]
2625        async fn running_to_failed_ok() {
2626            let h = handle_for_test();
2627            let child_started_at = std::time::SystemTime::now();
2628            assert!(h.mark_running(child_started_at));
2629            assert!(h.mark_failed("bootstrap error"));
2630            match h.status() {
2631                ProcStatus::Failed { reason } => {
2632                    assert_eq!(reason, "bootstrap error");
2633                }
2634                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2635            }
2636        }
2637
2638        #[tokio::test]
2639        async fn illegal_transitions_are_rejected() {
2640            let h = handle_for_test();
2641            let child_started_at = std::time::SystemTime::now();
2642            // Starting -> Running is fine; second Running should be rejected.
2643            assert!(h.mark_running(child_started_at));
2644            assert!(!h.mark_running(std::time::SystemTime::now()));
2645            assert!(matches!(h.status(), ProcStatus::Running { .. }));
2646            // Once Stopped, we can't go to Running/Killed/Failed/etc.
2647            assert!(h.mark_stopping());
2648            assert!(h.mark_stopped(0, Vec::new()));
2649            assert!(!h.mark_running(child_started_at));
2650            assert!(!h.mark_killed(9, false));
2651            assert!(!h.mark_failed("nope"));
2652
2653            assert!(matches!(
2654                h.status(),
2655                ProcStatus::Stopped { exit_code: 0, .. }
2656            ));
2657        }
2658
2659        #[tokio::test]
2660        async fn transitions_from_ready_are_legal() {
2661            let h = handle_for_test();
2662            let addr = ChannelAddr::any(ChannelTransport::Unix);
2663            // Mark Running.
2664            let t0 = std::time::SystemTime::now();
2665            assert!(h.mark_running(t0));
2666            // Build a consistent AgentRef for Ready using the
2667            // handle's ProcAddr.
2668            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_addr(&h);
2669            let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2670            let agent_ref: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2671            // Ready -> Stopping -> Stopped should be legal.
2672            assert!(h.mark_ready(addr, agent_ref));
2673            assert!(h.mark_stopping());
2674            assert!(h.mark_stopped(0, Vec::new()));
2675        }
2676
2677        #[tokio::test]
2678        async fn ready_to_killed_is_legal() {
2679            let h = handle_for_test();
2680            let addr = ChannelAddr::any(ChannelTransport::Unix);
2681            // Starting -> Running
2682            let t0 = std::time::SystemTime::now();
2683            assert!(h.mark_running(t0));
2684            // Build a consistent AgentRef for Ready using the
2685            // handle's ProcAddr.
2686            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_addr(&h);
2687            let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2688            let agent: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2689            // Running -> Ready
2690            assert!(h.mark_ready(addr, agent));
2691            // Ready -> Killed
2692            assert!(h.mark_killed(9, false));
2693        }
2694
2695        #[tokio::test]
2696        async fn mark_failed_from_stopping_is_allowed() {
2697            let h = handle_for_test();
2698
2699            // Drive Starting -> Stopping.
2700            assert!(h.mark_stopping(), "precondition: to Stopping");
2701
2702            // Now allow Stopping -> Failed.
2703            assert!(
2704                h.mark_failed("boom"),
2705                "mark_failed() should succeed from Stopping"
2706            );
2707            match h.status() {
2708                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2709                other => panic!("expected Failed(\"boom\"), got {other:?}"),
2710            }
2711        }
2712    }
2713
2714    /// A test launcher that panics on any method call.
2715    ///
2716    /// This is used by unit tests that only exercise status
2717    /// transitions on `BootstrapProcHandle` without actually
2718    /// launching or terminating processes.
2719    struct TestLauncher;
2720
2721    #[async_trait::async_trait]
2722    impl crate::proc_launcher::ProcLauncher for TestLauncher {
2723        async fn launch(
2724            &self,
2725            _proc_id: &ProcAddr,
2726            _opts: crate::proc_launcher::LaunchOptions,
2727        ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
2728        {
2729            panic!("TestLauncher::launch should not be called in unit tests");
2730        }
2731
2732        async fn terminate(
2733            &self,
2734            _proc_id: &ProcAddr,
2735            _timeout: std::time::Duration,
2736        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2737            panic!("TestLauncher::terminate should not be called in unit tests");
2738        }
2739
2740        async fn kill(
2741            &self,
2742            _proc_id: &ProcAddr,
2743        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2744            panic!("TestLauncher::kill should not be called in unit tests");
2745        }
2746    }
2747
2748    fn test_handle(proc_id: ProcAddr) -> BootstrapProcHandle {
2749        let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
2750            std::sync::Arc::new(TestLauncher);
2751        BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
2752    }
2753
2754    #[tokio::test]
2755    async fn watch_notifies_on_status_changes() {
2756        let proc_id = test_proc_id("1");
2757        let handle = test_handle(proc_id);
2758        let mut rx = handle.watch();
2759
2760        // Starting -> Running
2761        let now = std::time::SystemTime::now();
2762        assert!(handle.mark_running(now));
2763        rx.changed().await.ok(); // Observe the transition.
2764        match &*rx.borrow() {
2765            ProcStatus::Running { started_at } => {
2766                assert_eq!(*started_at, now);
2767            }
2768            s => panic!("expected Running, got {s:?}"),
2769        }
2770
2771        // Running -> Stopped
2772        assert!(handle.mark_stopped(0, Vec::new()));
2773        rx.changed().await.ok(); // Observe the transition.
2774        assert!(matches!(
2775            &*rx.borrow(),
2776            ProcStatus::Stopped { exit_code: 0, .. }
2777        ));
2778    }
2779
2780    #[tokio::test]
2781    async fn ready_errs_if_process_exits_before_running() {
2782        let proc_id =
2783            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
2784        let handle = test_handle(proc_id);
2785
2786        // Simulate the exit monitor doing its job directly here.
2787        // (Equivalent outcome: terminal state before Running.)
2788        assert!(handle.mark_stopped(7, Vec::new()));
2789
2790        // `ready()` should return Err with the terminal status.
2791        match handle.ready_inner().await {
2792            Ok(()) => panic!("ready() unexpectedly succeeded"),
2793            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2794                assert_eq!(exit_code, 7)
2795            }
2796            Err(other) => panic!("expected Stopped(7), got {other:?}"),
2797        }
2798    }
2799
2800    #[tokio::test]
2801    async fn status_unknown_proc_is_none() {
2802        let manager = BootstrapProcManager::new(BootstrapCommand {
2803            program: PathBuf::from("/bin/true"),
2804            ..Default::default()
2805        })
2806        .unwrap();
2807        let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
2808        assert!(manager.status(&unknown).await.is_none());
2809    }
2810
2811    #[tokio::test]
2812    async fn handle_ready_allows_waiters() {
2813        let proc_id = test_proc_id("42");
2814        let handle = test_handle(proc_id.clone());
2815
2816        let started_at = std::time::SystemTime::now();
2817        assert!(handle.mark_running(started_at));
2818
2819        let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2820        let agent_ref: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2821
2822        // Pick any addr to carry in Ready (what the child would have
2823        // called back with).
2824        let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
2825
2826        // Stamp Ready and assert ready().await unblocks.
2827        assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
2828        handle
2829            .ready_inner()
2830            .await
2831            .expect("ready_inner() should complete after Ready");
2832
2833        // Sanity-check the Ready fields we control
2834        // (started_at/addr).
2835        match handle.status() {
2836            ProcStatus::Ready {
2837                started_at: t,
2838                addr: a,
2839                ..
2840            } => {
2841                assert_eq!(t, started_at);
2842                assert_eq!(a, ready_addr);
2843            }
2844            other => panic!("expected Ready, got {other:?}"),
2845        }
2846    }
2847
2848    #[test]
2849    fn display_running_includes_uptime() {
2850        let started_at = std::time::SystemTime::now() - Duration::from_secs(42);
2851        let st = ProcStatus::Running { started_at };
2852
2853        let s = format!("{}", st);
2854        assert!(s.contains("Running"));
2855        assert!(s.contains("42s"));
2856    }
2857
2858    #[test]
2859    fn display_ready_includes_addr() {
2860        let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
2861        let addr = ChannelAddr::any(ChannelTransport::Unix);
2862        let agent = ActorRef::attest(
2863            test_proc_id_with_addr(addr.clone(), "proc")
2864                .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
2865        );
2866
2867        let st = ProcStatus::Ready {
2868            started_at,
2869            addr: addr.clone(),
2870            agent,
2871        };
2872
2873        let s = format!("{}", st);
2874        assert!(s.contains(&addr.to_string())); // addr
2875        assert!(s.contains("Ready"));
2876    }
2877
2878    #[test]
2879    fn display_stopped_includes_exit_code() {
2880        let st = ProcStatus::Stopped {
2881            exit_code: 7,
2882            stderr_tail: Vec::new(),
2883        };
2884        let s = format!("{}", st);
2885        assert!(s.contains("Stopped"));
2886        assert!(s.contains("7"));
2887    }
2888
2889    #[test]
2890    fn display_other_variants_does_not_panic() {
2891        let samples = vec![
2892            ProcStatus::Starting,
2893            ProcStatus::Stopping {
2894                started_at: std::time::SystemTime::now(),
2895            },
2896            ProcStatus::Ready {
2897                started_at: std::time::SystemTime::now(),
2898                addr: ChannelAddr::any(ChannelTransport::Unix),
2899                agent: ActorRef::attest(
2900                    test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
2901                        .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
2902                ),
2903            },
2904            ProcStatus::Killed {
2905                signal: 9,
2906                core_dumped: false,
2907            },
2908            ProcStatus::Failed {
2909                reason: "boom".into(),
2910            },
2911        ];
2912
2913        for st in samples {
2914            let _ = format!("{}", st); // Just make sure it doesn't panic.
2915        }
2916    }
2917
2918    #[tokio::test]
2919    async fn proc_handle_ready_ok_through_trait() {
2920        let proc_id =
2921            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
2922        let handle = test_handle(proc_id.clone());
2923
2924        // Starting -> Running
2925        let t0 = std::time::SystemTime::now();
2926        assert!(handle.mark_running(t0));
2927
2928        // Synthesize Ready data
2929        let addr = ChannelAddr::any(ChannelTransport::Unix);
2930        let agent: ActorRef<ProcAgent> =
2931            ActorRef::attest(proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME));
2932        assert!(handle.mark_ready(addr, agent));
2933
2934        // Call the trait method (not ready_inner).
2935        let r = <BootstrapProcHandle as ProcHandle>::ready(&handle).await;
2936        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
2937    }
2938
2939    #[tokio::test]
2940    async fn proc_handle_wait_returns_terminal_status() {
2941        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
2942        let handle = test_handle(proc_id);
2943
2944        // Drive directly to a terminal state before calling wait()
2945        assert!(handle.mark_stopped(0, Vec::new()));
2946
2947        // Call the trait method (not wait_inner)
2948        let st = <BootstrapProcHandle as ProcHandle>::wait(&handle)
2949            .await
2950            .expect("wait should return Ok(terminal)");
2951
2952        match st {
2953            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
2954            other => panic!("expected Stopped(0), got {other:?}"),
2955        }
2956    }
2957
2958    #[tokio::test]
2959    async fn ready_wrapper_maps_terminal_to_trait_error() {
2960        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
2961        let handle = test_handle(proc_id);
2962
2963        assert!(handle.mark_stopped(7, Vec::new()));
2964
2965        match <BootstrapProcHandle as ProcHandle>::ready(&handle).await {
2966            Ok(()) => panic!("expected Err"),
2967            Err(HostReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2968                assert_eq!(exit_code, 7);
2969            }
2970            Err(e) => panic!("unexpected error: {e:?}"),
2971        }
2972    }
2973
2974    /// Create a ProcAddr and a host **backend_addr** channel that the
2975    /// bootstrap child proc will dial to attach its mailbox to the
2976    /// host.
2977    ///
2978    /// - `proc_id`: logical identity for the child proc (pure name;
2979    ///   not an OS pid).
2980    /// - `backend_addr`: a mailbox address served by the **parent
2981    ///   (host) proc** here; the spawned bootstrap process dials this
2982    ///   so its messages route via the host.
2983    #[cfg(fbcode_build)]
2984    async fn make_proc_id_and_backend_addr(
2985        instance: &hyperactor::Instance<()>,
2986        _tag: &str,
2987    ) -> (ProcAddr, ChannelAddr) {
2988        // Serve a Unix channel as the "backend_addr" and hook it into
2989        // this test proc.
2990        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2991
2992        // Route messages arriving on backend_addr into this test
2993        // proc's mailbox so the bootstrap child can reach the host
2994        // router.
2995        instance.proc().clone().serve(rx);
2996
2997        // We return an arbitrary (but unbound!) unix direct proc id here;
2998        // it is okay, as we're not testing connectivity.
2999        let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3000        (proc_id, backend_addr)
3001    }
3002
3003    #[tokio::test]
3004    #[cfg(fbcode_build)]
3005    async fn bootstrap_handle_terminate_graceful() {
3006        // Create a root direct-addressed proc + client instance.
3007        let root =
3008            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3009        let (instance, _handle) = root.client("client").unwrap();
3010
3011        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3012        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3013        let handle = mgr
3014            .spawn(
3015                proc_id.clone(),
3016                backend_addr.clone(),
3017                BootstrapProcConfig {
3018                    create_rank: 0,
3019                    client_config_override: Attrs::new(),
3020                    proc_bind: None,
3021                    bootstrap_command: None,
3022                },
3023            )
3024            .await
3025            .expect("spawn bootstrap child");
3026
3027        handle.ready().await.expect("ready");
3028
3029        let deadline = Duration::from_secs(2);
3030        match tokio::time::timeout(
3031            deadline * 2,
3032            handle.terminate(&instance, deadline, "test terminate"),
3033        )
3034        .await
3035        {
3036            Err(_) => panic!("terminate() future hung"),
3037            Ok(Ok(st)) => {
3038                match st {
3039                    ProcStatus::Stopped { exit_code, .. } => {
3040                        // child called exit(0) on SIGTERM
3041                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3042                    }
3043                    ProcStatus::Killed { signal, .. } => {
3044                        // If the child didn't trap SIGTERM, we'd see
3045                        // SIGTERM (15) here and indeed, this is what
3046                        // we see. Since we call
3047                        // `hyperactor::initialize_with_current_runtime();`
3048                        // we seem unable to trap `SIGTERM` and
3049                        // instead folly intercepts:
3050                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
3051                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
3052                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
3053                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
3054                        // It gets worse. When run with
3055                        // '@fbcode//mode/dev-nosan' it terminates
3056                        // with a SEGFAULT (metamate says this is a
3057                        // well known issue at Meta). So, TL;DR I
3058                        // restore default `SIGTERM` handling after
3059                        // the test exe has called
3060                        // `initialize_with_runtime`.
3061                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3062                    }
3063                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3064                }
3065            }
3066            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3067        }
3068    }
3069
3070    #[tokio::test]
3071    #[cfg(fbcode_build)]
3072    async fn bootstrap_handle_kill_forced() {
3073        // Root proc + client instance (so the child can dial back).
3074        let root =
3075            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3076        let (instance, _handle) = root.client("client").unwrap();
3077
3078        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3079
3080        // Proc identity + host backend channel the child will dial.
3081        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3082
3083        // Launch the child bootstrap process.
3084        let handle = mgr
3085            .spawn(
3086                proc_id.clone(),
3087                backend_addr.clone(),
3088                BootstrapProcConfig {
3089                    create_rank: 0,
3090                    client_config_override: Attrs::new(),
3091                    proc_bind: None,
3092                    bootstrap_command: None,
3093                },
3094            )
3095            .await
3096            .expect("spawn bootstrap child");
3097
3098        // Wait until the child reports Ready (addr+agent returned via
3099        // callback).
3100        handle.ready().await.expect("ready");
3101
3102        // Force-kill the child and assert we observe a Killed
3103        // terminal status.
3104        let deadline = Duration::from_secs(5);
3105        match tokio::time::timeout(deadline, handle.kill()).await {
3106            Err(_) => panic!("kill() future hung"),
3107            Ok(Ok(st)) => {
3108                // We expect a KILLED terminal state.
3109                match st {
3110                    ProcStatus::Killed { signal, .. } => {
3111                        // On Linux this should be SIGKILL (9).
3112                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3113                    }
3114                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3115                }
3116            }
3117            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3118        }
3119    }
3120
3121    #[tokio::test]
3122    #[cfg(fbcode_build)]
3123    async fn test_host_bootstrap() {
3124        use crate::host_mesh::host_agent::GetLocalProcClient;
3125        use crate::proc_agent::NewClientInstanceClient;
3126
3127        // Create a local instance just to call the local bootstrap actor.
3128        // We should find a way to avoid this for local handles.
3129        let temp_proc = Proc::isolated();
3130        let (temp_instance, _) = temp_proc.client("temp").unwrap();
3131
3132        let handle = host(
3133            ChannelAddr::any(ChannelTransport::Unix),
3134            Some(BootstrapCommand::test()),
3135            None,
3136            false,
3137            None,
3138        )
3139        .await
3140        .unwrap();
3141
3142        let local_proc = handle.0.get_local_proc(&temp_instance).await.unwrap();
3143        let _local_instance = local_proc
3144            .new_client_instance(&temp_instance)
3145            .await
3146            .unwrap();
3147    }
3148
3149    // BootstrapProcManager OnceLock Semantics Tests
3150    //
3151    // These tests verify the "install exactly once / default locks
3152    // in" behavior of the proc launcher OnceLock.
3153
3154    use std::time::Duration;
3155
3156    use crate::proc_launcher::LaunchOptions;
3157    use crate::proc_launcher::LaunchResult;
3158    use crate::proc_launcher::ProcExitKind;
3159    use crate::proc_launcher::ProcExitResult;
3160    use crate::proc_launcher::ProcLauncher;
3161    use crate::proc_launcher::ProcLauncherError;
3162    use crate::proc_launcher::StdioHandling;
3163
3164    /// A dummy proc launcher for testing. Does not actually launch
3165    /// anything.
3166    #[allow(dead_code)]
3167    struct DummyLauncher {
3168        /// Marker value to identify this instance.
3169        marker: u64,
3170    }
3171
3172    impl DummyLauncher {
3173        #[allow(dead_code)]
3174        fn new(marker: u64) -> Self {
3175            Self { marker }
3176        }
3177
3178        #[allow(dead_code)]
3179        fn marker(&self) -> u64 {
3180            self.marker
3181        }
3182    }
3183
3184    #[async_trait::async_trait]
3185    impl ProcLauncher for DummyLauncher {
3186        async fn launch(
3187            &self,
3188            _proc_id: &ProcAddr,
3189            _opts: LaunchOptions,
3190        ) -> Result<LaunchResult, ProcLauncherError> {
3191            let (tx, rx) = tokio::sync::oneshot::channel();
3192            // Immediately send exit result
3193            let _ = tx.send(ProcExitResult {
3194                kind: ProcExitKind::Exited { code: 0 },
3195                stderr_tail: Some(vec![]),
3196            });
3197            Ok(LaunchResult {
3198                pid: None,
3199                started_at: std::time::SystemTime::now(),
3200                stdio: StdioHandling::ManagedByLauncher,
3201                exit_rx: rx,
3202            })
3203        }
3204
3205        async fn terminate(
3206            &self,
3207            _proc_id: &ProcAddr,
3208            _timeout: Duration,
3209        ) -> Result<(), ProcLauncherError> {
3210            Ok(())
3211        }
3212
3213        async fn kill(&self, _proc_id: &ProcAddr) -> Result<(), ProcLauncherError> {
3214            Ok(())
3215        }
3216    }
3217
3218    // set_launcher() then launcher() returns the same Arc.
3219    #[test]
3220    #[cfg(fbcode_build)]
3221    fn test_set_launcher_then_get() {
3222        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3223
3224        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
3225        let custom_ptr = Arc::as_ptr(&custom);
3226
3227        // Install the custom launcher
3228        manager.set_launcher(custom).unwrap();
3229
3230        // Get the launcher and verify it's the same Arc
3231        let got = manager.launcher();
3232        let got_ptr = Arc::as_ptr(got);
3233
3234        assert_eq!(
3235            custom_ptr, got_ptr,
3236            "launcher() should return the same Arc that was set"
3237        );
3238    }
3239
3240    // launcher() first (forces default init), then set_launcher()
3241    // fails.
3242    #[test]
3243    #[cfg(fbcode_build)]
3244    fn test_get_launcher_then_set_fails() {
3245        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3246
3247        // Force default initialization by calling launcher()
3248        let _ = manager.launcher();
3249
3250        // Now try to set a custom launcher - should fail
3251        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
3252        let result = manager.set_launcher(custom);
3253
3254        assert!(
3255            result.is_err(),
3256            "set_launcher should fail after launcher() was called"
3257        );
3258
3259        // Verify error message mentions the cause
3260        let err = result.unwrap_err();
3261        let err_msg = err.to_string();
3262        assert!(
3263            err_msg.contains("already initialized"),
3264            "error should mention 'already initialized', got: {}",
3265            err_msg
3266        );
3267    }
3268
3269    // set_launcher() twice fails.
3270    #[test]
3271    #[cfg(fbcode_build)]
3272    fn test_set_launcher_twice_fails() {
3273        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3274
3275        let first: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(1));
3276        let second: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(2));
3277
3278        // First set should succeed
3279        manager.set_launcher(first).unwrap();
3280
3281        // Second set should fail
3282        let result = manager.set_launcher(second);
3283        assert!(result.is_err(), "second set_launcher should fail");
3284
3285        // Verify error message
3286        let err = result.unwrap_err();
3287        let err_msg = err.to_string();
3288        assert!(
3289            err_msg.contains("already initialized"),
3290            "error should mention 'already initialized', got: {}",
3291            err_msg
3292        );
3293    }
3294
3295    /// OnceLock is empty before any call.
3296    #[test]
3297    #[cfg(fbcode_build)]
3298    fn test_launcher_initially_empty() {
3299        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3300
3301        // At this point, the OnceLock should be empty (not yet
3302        // initialized) We can verify this by successfully calling
3303        // set_launcher
3304        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));
3305        let result = manager.set_launcher(custom);
3306
3307        assert!(
3308            result.is_ok(),
3309            "set_launcher should succeed on fresh manager"
3310        );
3311    }
3312
3313    /// launcher() returns the same Arc on repeated calls.
3314    #[test]
3315    #[cfg(fbcode_build)]
3316    fn test_launcher_idempotent() {
3317        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3318
3319        // Call launcher() twice
3320        let first = manager.launcher();
3321        let second = manager.launcher();
3322
3323        // Should return the same Arc (same pointer)
3324        assert!(
3325            Arc::ptr_eq(first, second),
3326            "launcher() should return the same Arc on repeated calls"
3327        );
3328    }
3329}