hyperactor_mesh/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! ## Bootstrap invariants (BS-*)
10//!
11//! - **BS-1 (locking):** Do not acquire other locks from inside
12//!   `transition(...)`. The state lock is held for the duration of
13//!   the transition; acquiring another lock risks deadlock.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use async_trait::async_trait;
34use base64::prelude::*;
35use futures::StreamExt;
36use futures::stream;
37use humantime::format_duration;
38use hyperactor::ActorHandle;
39use hyperactor::channel;
40use hyperactor::channel::ChannelAddr;
41use hyperactor::channel::ChannelError;
42use hyperactor::channel::ChannelTransport;
43use hyperactor::channel::Rx;
44use hyperactor::channel::Tx;
45use hyperactor::context;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor::reference as hyperactor_reference;
57use hyperactor_config::CONFIG;
58use hyperactor_config::ConfigAttr;
59use hyperactor_config::attrs::Attrs;
60use hyperactor_config::attrs::declare_attrs;
61use hyperactor_config::global::override_or_global;
62use serde::Deserialize;
63use serde::Serialize;
64use tempfile::TempDir;
65use tokio::process::Command;
66use tokio::sync::oneshot;
67use tokio::sync::watch;
68use tracing::Instrument;
69use tracing::Level;
70use typeuri::Named;
71
72use crate::config::MESH_PROC_LAUNCHER_KIND;
73use crate::host_mesh::host_agent::HostAgent;
74use crate::host_mesh::host_agent::HostAgentMode;
75use crate::logging::OutputTarget;
76use crate::logging::StreamFwder;
77use crate::proc_agent::ProcAgent;
78use crate::proc_launcher::LaunchOptions;
79use crate::proc_launcher::NativeProcLauncher;
80use crate::proc_launcher::ProcExitKind;
81use crate::proc_launcher::ProcExitResult;
82use crate::proc_launcher::ProcLauncher;
83use crate::proc_launcher::ProcLauncherError;
84use crate::proc_launcher::StdioHandling;
85#[cfg(target_os = "linux")]
86use crate::proc_launcher::SystemdProcLauncher;
87use crate::proc_launcher::format_process_name;
88use crate::resource;
89
90mod mailbox;
91
// Configuration attributes controlling child stdio handling, parent-death
// signalling, and bulk-termination behavior for bootstrapped processes.
declare_attrs! {
    /// Enable forwarding child stdout/stderr over the mesh log
    /// channel.
    ///
    /// When `true`: child stdio is piped; [`StreamFwder`]
    /// mirrors output to the parent console and forwards bytes to the
    /// log channel so a `LogForwardActor` can receive them.
    ///
    /// When `false` (default): no channel forwarding occurs. Child
    /// stdio may still be piped if [`MESH_ENABLE_FILE_CAPTURE`] is
    /// `true` or [`MESH_TAIL_LOG_LINES`] > 0; otherwise the child
    /// inherits the parent stdio (no interception).
    ///
    /// This flag does not affect console mirroring: child output
    /// always reaches the parent console—either via inheritance (no
    /// piping) or via [`StreamFwder`] when piping is active.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    /// When `true`: if stdio is piped, each child's `StreamFwder`
    /// also forwards lines to a host-scoped `FileAppender` managed by
    /// the `BootstrapProcManager`. That appender creates exactly two
    /// files per manager instance—one for stdout and one for
    /// stderr—and **all** child processes' lines are multiplexed into
    /// those two files. This can be combined with
    /// [`MESH_ENABLE_LOG_FORWARDING`] ("stream+local").
    /// Default: `false`.
    ///
    /// Notes:
    /// - The on-disk files are *aggregate*, not per-process.
    ///   Disambiguation is via the optional rank prefix (see
    ///   `PREFIX_WITH_RANK`), which `StreamFwder` prepends to lines
    ///   before writing.
    /// - On local runs, file capture is suppressed unless
    ///   `FORCE_FILE_LOG=true`. In that case `StreamFwder` still
    ///   runs, but the `FileAppender` may be `None` and no files are
    ///   written.
    /// - `MESH_TAIL_LOG_LINES` only controls the in-memory rotating
    ///   buffer used for peeking—independent of file capture.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    /// Maximum number of log lines retained in a proc's stderr/stdout
    /// tail buffer. Used by [`StreamFwder`] when wiring child
    /// pipes. Default: 0
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    /// If enabled (default), bootstrap child processes install
    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel kills them if the
    /// parent dies unexpectedly. This is a **production safety net**
    /// against leaked children; tests usually disable it via
    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
    /// "false")`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    /// Maximum number of child terminations to run concurrently
    /// during bulk shutdown. Prevents unbounded spawning of
    /// termination tasks (which could otherwise spike CPU, I/O, or
    /// file descriptor load). Default: 16.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    /// Per-child grace window for termination. When a shutdown is
    /// requested, the manager sends SIGTERM and waits this long for
    /// the child to exit before escalating to SIGKILL.
    /// Default: 10 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
179
/// Name of the environment variable `HYPERACTOR_MESH_BOOTSTRAP_ADDR`.
/// NOTE(review): presumably the allocator address a bootstrapped process
/// dials — its readers are not visible in this part of the file; confirm.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the environment variable `HYPERACTOR_MESH_INDEX`.
/// NOTE(review): presumably the index the allocator assigned to the
/// process (cf. [`Process2Allocator`]); confirm against its readers.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the environment variable `MONARCH_CLIENT_TRACE_ID`.
/// NOTE(review): presumably propagates the client's trace id to child
/// processes; confirm against its readers.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// A channel used by each process to receive its own stdout and stderr.
/// Because stdout and stderr can only be obtained by the parent process,
/// they need to be streamed back to the process.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Name of the environment variable that carries the serialized
/// [`Bootstrap`] configuration; it is read by [`Bootstrap::get_from_env`]
/// and written by [`Bootstrap::to_env`].
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Name of the environment variable `HYPERACTOR_PROCESS_NAME`.
/// NOTE(review): presumably a human-readable process name set by the
/// launcher (cf. `format_process_name`); confirm against its users.
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
191
/// Messages sent from the process to the allocator. This is an envelope
/// containing the index of the process (i.e., its "address" assigned by
/// the allocator), along with the control message in question.
///
/// Field `0` is the process index; field `1` is the control message.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
198
/// Control messages sent from processes to the allocator.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Initialize a process2allocator session. The process is
    /// listening on the provided channel address, to which
    /// [`Allocator2Process`] messages are sent.
    Hello(ChannelAddr),

    /// A proc with the provided ID was started. Its mailbox is
    /// served at the provided channel address. Procs are started
    /// after instruction by the allocator through the corresponding
    /// [`Allocator2Process`] message.
    StartedProc(
        hyperactor_reference::ProcId,
        hyperactor_reference::ActorRef<ProcAgent>,
        ChannelAddr,
    ),

    /// Periodic liveness message. `exit_if_missed_heartbeat` sends one
    /// every five seconds and exits the process when sending fails.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
220
/// Messages sent from the allocator to a process.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Request to start a new proc with the provided ID, listening
    /// to an address on the indicated channel transport.
    StartProc(hyperactor_reference::ProcId, ChannelTransport),

    /// A request for the process to shut down its procs and exit the
    /// process with the provided code.
    StopAndExit(i32),

    /// A request for the process to immediately exit with the provided
    /// exit code.
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
237
238async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
239    let tx = match channel::dial(bootstrap_addr.clone()) {
240        Ok(tx) => tx,
241
242        Err(err) => {
243            tracing::error!(
244                "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
245                bootstrap_addr,
246                err
247            );
248            std::process::exit(1);
249        }
250    };
251    tracing::info!(
252        "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
253    );
254    loop {
255        tokio::time::sleep(Duration::from_secs(5)).await;
256
257        let result = tx
258            .send(Process2Allocator(
259                bootstrap_index,
260                Process2AllocatorMessage::Heartbeat,
261            ))
262            .await;
263
264        if let Err(err) = result {
265            tracing::error!(
266                "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
267                bootstrap_addr,
268                err
269            );
270            std::process::exit(1);
271        }
272    }
273}
274
/// Unwrap a `Result`, or bail out of the enclosing function with the error.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it executes
/// `return ::anyhow::Error::from(e)` — note that it returns the *bare*
/// error, not `Err(..)`, so it is only usable inside functions whose
/// return type is `anyhow::Error` itself.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(unwrapped) => unwrapped,
            Err(failure) => return ::anyhow::Error::from(failure),
        }
    };
}
284
/// A future that never completes, usable wherever a value of any type
/// `R` is expected (e.g. to park a task forever after its work is done).
async fn halt<R>() -> R {
    // `pending` never resolves, so no `R` ever has to be produced.
    future::pending::<R>().await
}
289
290/// A handle that waits for a host to finish shutting down.
291///
292/// Obtained from [`host`]. Awaiting [`HostShutdownHandle::join`] blocks until
293/// the [`ShutdownHost`] handler sends back the mailbox server handle, drains
294/// it, and (if `exit_on_shutdown`) calls `process::exit`.
295///
296/// Note: [`DrainHost`] does **not** trigger this handle — a drained host
297/// keeps its mailbox server (and Unix socket) alive so new clients can
298/// reconnect to the same address.
299pub struct HostShutdownHandle {
300    rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
301    exit_on_shutdown: bool,
302}
303
304impl HostShutdownHandle {
305    /// Wait for the host to finish shutting down, drain its mailbox server,
306    /// and optionally exit the process.
307    pub async fn join(self) {
308        match self.rx.await {
309            Ok(mailbox_handle) => {
310                mailbox_handle.stop("host shutting down");
311                let _ = mailbox_handle.await;
312            }
313            Err(_) => {} // sender dropped without sending — nothing to drain
314        }
315        if self.exit_on_shutdown {
316            std::process::exit(0);
317        }
318    }
319}
320
321/// Bootstrap a host in this process, returning a handle to the mesh agent.
322///
323/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
324/// then use `GetProc` on the returned proc mesh agent.
325///
326/// - `addr`: the listening address of the host; this is used to bind the frontend address;
327/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapProcManager::current`];
328/// - `config`: optional runtime config overlay.
329/// - `exit_on_shutdown`: if true, [`HostShutdownHandle::join`] will call `process::exit` after draining.
330pub async fn host(
331    addr: ChannelAddr,
332    command: Option<BootstrapCommand>,
333    config: Option<Attrs>,
334    exit_on_shutdown: bool,
335) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
336    if let Some(attrs) = config {
337        hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
338        tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
339    } else {
340        tracing::debug!("bootstrap: no config snapshot provided (Host)");
341    }
342
343    let command = match command {
344        Some(command) => command,
345        None => BootstrapCommand::current()?,
346    };
347    let manager = BootstrapProcManager::new(command)?;
348
349    let host = Host::new(manager, addr).await?;
350    let addr = host.addr().clone();
351
352    // The ShutdownHost handler will call host.serve() inside
353    // HostAgent::init (after this.bind::<Self>(), so the actor port is bound
354    // before the frontend starts routing messages), then send the resulting
355    // MailboxServerHandle back here for draining.
356    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();
357
358    let system_proc = host.system_proc().clone();
359    let host_mesh_agent = system_proc.spawn::<HostAgent>(
360        "host_agent",
361        HostAgent::new(HostAgentMode::Process {
362            host,
363            shutdown_tx: Some(shutdown_tx),
364        }),
365    )?;
366
367    tracing::info!(
368        "serving host at {}, agent: {}",
369        addr,
370        host_mesh_agent.bind::<HostAgent>()
371    );
372
373    Ok((
374        host_mesh_agent,
375        HostShutdownHandle {
376            rx: shutdown_rx,
377            exit_on_shutdown,
378        },
379    ))
380}
381
/// Bootstrap configures how a mesh process starts up.
///
/// Every variant carries an optional configuration snapshot
/// (`hyperactor_config::Attrs`). This snapshot is serialized into
/// the bootstrap payload and made available to the child.
/// Interpretation and application of that snapshot is up to the
/// child process; if omitted, the child falls back to
/// environment/default values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a "v1" proc
    Proc {
        /// The ProcId of the proc to be bootstrapped.
        proc_id: hyperactor_reference::ProcId,
        /// The backend address to which messages are forwarded.
        /// See [`hyperactor::host`] for channel topology details.
        backend_addr: ChannelAddr,
        /// The callback address used to indicate successful spawning.
        callback_addr: ChannelAddr,
        /// Directory for storing proc socket files. Procs place their sockets
        /// in this directory, so that they can be looked up by other procs
        /// for direct transfer.
        socket_dir_path: PathBuf,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Defaults.
        config: Option<Attrs>,
    },

    /// Bootstrap as a "v1" host bootstrap. This sets up a new `Host`,
    /// managed by a [`crate::host_mesh::host_agent::HostAgent`].
    Host {
        /// The address on which to serve the host.
        addr: ChannelAddr,
        /// If specified, use the provided command instead of
        /// [`BootstrapCommand::current`].
        command: Option<BootstrapCommand>,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, [`host`] installs it
        /// into the global config under `Source::Runtime`.
        /// NOTE(review): the `Proc` variant installs its snapshot as
        /// `ClientOverride`; confirm the differing layer is intended.
        config: Option<Attrs>,
        /// If true, exit the process after handling a shutdown request.
        exit_on_shutdown: bool,
    },

    /// Bootstrap as a legacy "v0" proc.
    V0ProcMesh {
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Env/Defaults.
        /// NOTE(review): the v0 path (`bootstrap_v0_proc_mesh`) is not
        /// visible from here; confirm which layer it actually installs.
        config: Option<Attrs>,
    },
}
438
439impl Default for Bootstrap {
440    fn default() -> Self {
441        Bootstrap::V0ProcMesh { config: None }
442    }
443}
444
445impl Bootstrap {
446    /// Serialize the mode into a environment-variable-safe string by
447    /// base64-encoding its JSON representation.
448    #[allow(clippy::result_large_err)]
449    pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
450        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
451    }
452
453    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
454    #[allow(clippy::result_large_err)]
455    pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
456        let data = BASE64_STANDARD.decode(str)?;
457        let data = std::str::from_utf8(&data)?;
458        Ok(serde_json::from_str(data)?)
459    }
460
461    /// Get a bootstrap configuration from the environment; returns `None`
462    /// if the environment does not specify a boostrap config.
463    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
464        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
465            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
466                Ok(mode) => Ok(Some(mode)),
467                Err(e) => {
468                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
469                }
470            },
471            Err(VarError::NotPresent) => Ok(None),
472            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
473        }
474    }
475
476    /// Inject this bootstrap configuration into the environment of the provided command.
477    pub fn to_env(&self, cmd: &mut Command) {
478        cmd.env(
479            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
480            self.to_env_safe_string().unwrap(),
481        );
482    }
483
484    /// Bootstrap this binary according to this configuration.
485    /// This runs until all processes are ready to exit, or returns an error.
486    /// The Ok value is the exit code that should be used.
487    pub async fn bootstrap(self) -> anyhow::Result<i32> {
488        tracing::info!(
489            "bootstrapping mesh process: {}",
490            serde_json::to_string(&self).unwrap()
491        );
492
493        if Debug::is_active() {
494            let mut buf = Vec::new();
495            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
496            #[cfg(unix)]
497            writeln!(
498                &mut buf,
499                "\tparent pid: {}",
500                std::os::unix::process::parent_id()
501            )
502            .unwrap();
503            writeln!(
504                &mut buf,
505                "\tconfig: {}",
506                serde_json::to_string(&self).unwrap()
507            )
508            .unwrap();
509            match std::env::current_exe() {
510                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
511                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
512            }
513            writeln!(&mut buf, "\targs:").unwrap();
514            for arg in std::env::args() {
515                writeln!(&mut buf, "\t\t{}", arg).unwrap();
516            }
517            writeln!(&mut buf, "\tenv:").unwrap();
518            for (key, val) in std::env::vars() {
519                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
520            }
521            let _ = Debug.write(&buf);
522            if let Ok(s) = std::str::from_utf8(&buf) {
523                tracing::info!("{}", s);
524            } else {
525                tracing::info!("{:?}", buf);
526            }
527        }
528
529        match self {
530            Bootstrap::Proc {
531                proc_id,
532                backend_addr,
533                callback_addr,
534                socket_dir_path,
535                config,
536            } => {
537                let entered = tracing::span!(
538                    Level::INFO,
539                    "proc_bootstrap",
540                    %proc_id,
541                    %backend_addr,
542                    %callback_addr,
543                    socket_dir_path = %socket_dir_path.display(),
544                )
545                .entered();
546                if let Some(attrs) = config {
547                    hyperactor_config::global::set(
548                        hyperactor_config::global::Source::ClientOverride,
549                        attrs,
550                    );
551                    tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
552                } else {
553                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
554                }
555
556                if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
557                    // Safety net: normal shutdown is via
558                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
559                    // is a last-resort guard against leaks if that
560                    // protocol is bypassed.
561                    let _ = install_pdeathsig_kill();
562                } else {
563                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
564                }
565
566                let (local_addr, name) = (proc_id.addr().clone(), proc_id.name());
567                // TODO provide a direct way to construct these
568                let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
569                let serve_addr = serve_addr.parse().unwrap();
570
571                // The following is a modified host::spawn_proc to support direct
572                // dialing between local procs: 1) we bind each proc to a deterministic
573                // address in socket_dir_path; 2) we use LocalProcDialer to dial these
574                // addresses for local procs.
575                let proc_sender = mailbox::LocalProcDialer::new(
576                    local_addr.clone(),
577                    socket_dir_path,
578                    MailboxClient::dial(backend_addr)?,
579                );
580
581                let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
582
583                let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
584                let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
585                    .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
586
587                let span = entered.exit();
588
589                // Finally serve the proc on the same transport as the backend address,
590                // and call back.
591                let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
592                let mailbox_handle = proc.clone().serve(proc_rx);
593                channel::dial(callback_addr)?
594                    .send((proc_addr, agent_handle.bind::<ProcAgent>()))
595                    .instrument(span)
596                    .await
597                    .map_err(ChannelError::from)?;
598
599                // Wait for the StopAll handler to signal the exit code, then
600                // gracefully stop the mailbox server before exiting.
601                let exit_code = shutdown_rx.await.unwrap_or(1);
602                mailbox_handle.stop("process shutting down");
603                let _ = mailbox_handle.await;
604                tracing::info!("bootstrap shutting down with exit code {}", exit_code);
605                // Don't exit the proc, return Ok so the parent function can decide
606                // how to stop.
607                Ok(exit_code)
608            }
609            Bootstrap::Host {
610                addr,
611                command,
612                config,
613                exit_on_shutdown,
614            } => {
615                let (_agent_handle, shutdown) =
616                    host(addr, command, config, exit_on_shutdown).await?;
617                shutdown.join().await;
618                halt().await
619            }
620            Bootstrap::V0ProcMesh { config } => Err(bootstrap_v0_proc_mesh(config).await),
621        }
622    }
623
624    /// A variant of [`bootstrap`] that logs the error and exits the process
625    /// if bootstrapping fails.
626    pub async fn bootstrap_or_die(self) -> ! {
627        let exit_code = match self.bootstrap().await {
628            Ok(exit_code) => exit_code,
629            Err(err) => {
630                tracing::error!("failed to bootstrap mesh process: {}", err);
631                1
632            }
633        };
634        std::process::exit(exit_code);
635    }
636}
637
638/// Install "kill me if parent dies" and close the race window.
639pub fn install_pdeathsig_kill() -> io::Result<()> {
640    #[cfg(target_os = "linux")]
641    {
642        // SAFETY: `getppid()` is a simple libc syscall returning the
643        // parent PID; it has no side effects and does not touch memory.
644        let ppid_before = unsafe { libc::getppid() };
645
646        // SAFETY: Calling into libc; does not dereference memory, just
647        // asks the kernel to deliver SIGKILL on parent death.
648        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
649        if rc != 0 {
650            return Err(io::Error::last_os_error());
651        }
652
653        // Race-close: if the parent died between our exec and prctl(),
654        // we won't get a signal, so detect that and exit now.
655        //
656        // If the parent PID changed, the parent has died and we've been
657        // reparented. Note: We cannot assume ppid == 1 means the parent
658        // died, as in container environments (e.g., Kubernetes) the parent
659        // may legitimately run as PID 1.
660        // SAFETY: `getppid()` is a simple libc syscall returning the
661        // parent PID; it has no side effects and does not touch memory.
662        let ppid_after = unsafe { libc::getppid() };
663        if ppid_before != ppid_after {
664            std::process::exit(0);
665        }
666    }
667    Ok(())
668}
669
/// Represents the lifecycle state of a **proc as hosted in an OS
/// process** managed by `BootstrapProcManager`.
///
/// Note: This type is deliberately distinct from [`ProcState`] and
/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
/// consumed from an event stream during allocation. By contrast,
/// [`ProcStatus`] is a **live, queryable view**: it reflects the
/// current observed status of a running proc, as seen through the
/// [`BootstrapProcHandle`] API (stop, kill, status).
///
/// In short:
/// - `ProcState`/`ProcStopReason`: historical / event-driven model
/// - `ProcStatus`: immediate status surface for lifecycle control
///
/// `Stopped`, `Killed` and `Failed` are the terminal states (see
/// [`ProcStatus::is_exit`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// The OS process has been spawned but is not yet fully running.
    /// (Process-level: child handle exists, no confirmation yet.)
    Starting,
    /// The OS process is alive and considered running.
    /// (Proc-level: bootstrap may still be running.)
    ///
    /// `started_at` is the reference point for the uptime shown by the
    /// `Display` impl.
    Running { started_at: SystemTime },
    /// Ready means bootstrap has completed and the proc is serving.
    /// (Proc-level: bootstrap completed.)
    ///
    /// `addr` is shown by the `Display` impl as the address the proc is
    /// "Ready at"; `agent` references the proc's `ProcAgent`.
    Ready {
        started_at: SystemTime,
        addr: ChannelAddr,
        agent: hyperactor_reference::ActorRef<ProcAgent>,
    },
    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
    /// but the OS process has not yet fully exited. (Proc-level:
    /// shutdown in progress; Process-level: still running.)
    Stopping { started_at: SystemTime },
    /// The process exited with a normal exit code. (Process-level:
    /// exit observed.)
    ///
    /// NOTE(review): `stderr_tail` presumably holds the last captured
    /// stderr lines (cf. `MESH_TAIL_LOG_LINES`); confirm where it is
    /// populated. The `Display` impl does not print it.
    Stopped {
        exit_code: i32,
        stderr_tail: Vec<String>,
    },
    /// The process was killed by a signal (e.g. SIGKILL).
    /// (Process-level: abnormal termination.)
    Killed { signal: i32, core_dumped: bool },
    /// The proc or its process failed for some other reason
    /// (bootstrap error, unexpected condition, etc.). (Both levels:
    /// catch-all failure.)
    Failed { reason: String },
}
717
718impl ProcStatus {
719    /// Returns `true` if the proc is in a terminal (exited) state:
720    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
721    /// [`ProcStatus::Failed`].
722    #[inline]
723    pub fn is_exit(&self) -> bool {
724        matches!(
725            self,
726            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
727        )
728    }
729}
730
731impl std::fmt::Display for ProcStatus {
732    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733        match self {
734            ProcStatus::Starting => write!(f, "Starting"),
735            ProcStatus::Running { started_at } => {
736                let uptime = started_at
737                    .elapsed()
738                    .map(|d| format!(" up {}", format_duration(d)))
739                    .unwrap_or_default();
740                write!(f, "Running{uptime}")
741            }
742            ProcStatus::Ready {
743                started_at, addr, ..
744            } => {
745                let uptime = started_at
746                    .elapsed()
747                    .map(|d| format!(" up {}", format_duration(d)))
748                    .unwrap_or_default();
749                write!(f, "Ready at {addr}{uptime}")
750            }
751            ProcStatus::Stopping { started_at } => {
752                let uptime = started_at
753                    .elapsed()
754                    .map(|d| format!(" up {}", format_duration(d)))
755                    .unwrap_or_default();
756                write!(f, "Stopping{uptime}")
757            }
758            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
759            ProcStatus::Killed {
760                signal,
761                core_dumped,
762            } => {
763                if *core_dumped {
764                    write!(f, "Killed(sig={signal}, core)")
765                } else {
766                    write!(f, "Killed(sig={signal})")
767                }
768            }
769            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
770        }
771    }
772}
773
/// Error returned by [`BootstrapProcHandle::ready_inner`] when the
/// proc cannot be observed reaching [`ProcStatus::Ready`].
#[derive(Debug, Clone)]
pub enum ReadyError {
    /// The proc reached a terminal state (one for which
    /// [`ProcStatus::is_exit`] is true) before `Ready` was observed.
    /// Carries that terminal status.
    Terminal(ProcStatus),
    /// The status watch channel closed while still waiting for
    /// `Ready`.
    ChannelClosed,
}
782
783impl std::fmt::Display for ReadyError {
784    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
785        match self {
786            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
787            ReadyError::ChannelClosed => write!(f, "status channel closed"),
788        }
789    }
790}
// Marker impl: `ReadyError` has no underlying source error, so the
// default `std::error::Error` methods are sufficient.
impl std::error::Error for ReadyError {}
792
793/// A handle to a proc launched by [`BootstrapProcManager`].
794///
795/// `BootstrapProcHandle` is a lightweight supervisor for an external
796/// process: it tracks and broadcasts lifecycle state, and exposes a
797/// small control/observation surface. While it may temporarily hold a
798/// `tokio::process::Child` (shared behind a mutex) so the exit
799/// monitor can `wait()` it, it is **not** the unique owner of the OS
800/// process, and dropping a `BootstrapProcHandle` does not by itself
801/// terminate the process.
802///
803/// What it pairs together:
804/// - the **logical proc identity** (`ProcId`)
805/// - the **live status surface** ([`ProcStatus`]), available both as
806///   a synchronous snapshot (`status()`) and as an async stream via a
807///   `tokio::sync::watch` channel (`watch()` / `changed()`)
808///
809/// Responsibilities:
810/// - Retain the child handle only until the exit monitor claims it,
811///   so the OS process can be awaited and its terminal status
812///   recorded.
813/// - Hold stdout/stderr tailers until the exit monitor takes them,
814///   then join to recover buffered output for diagnostics.
815/// - Update status via the `mark_*` transitions and broadcast changes
816///   over the watch channel so tasks can `await` lifecycle
817///   transitions without polling.
818/// - Provide the foundation for higher-level APIs like `wait()`
819///   (await terminal) and, later, `terminate()` / `kill()`.
820///
821/// Notes:
822/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
823///   it SIGKILLs any still-recorded PIDs; we do not rely on
824///   `Child::kill_on_drop`.
825///
826/// Relationship to types:
827/// - [`ProcStatus`]: live status surface, updated by this handle.
828/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
829///   allocator-facing, historical event log; not directly updated by
830///   this type.
831#[derive(Clone)]
832pub struct BootstrapProcHandle {
833    /// Logical identity of the proc in the mesh.
834    proc_id: hyperactor_reference::ProcId,
835
836    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
837    /// so [`BootstrapProcHandle::status`] can return a synchronous
838    /// copy. All mutations now flow through
839    /// [`BootstrapProcHandle::transition`], which updates this field
840    /// under the lock and then broadcasts on the watch channel.
841    status: Arc<std::sync::Mutex<ProcStatus>>,
842
843    /// Launcher used to terminate/kill the proc. The launcher owns
844    /// the actual OS child handle and PID tracking.
845    ///
846    /// We hold a `Weak` reference so that when `BootstrapProcManager`
847    /// drops, the launcher's `Arc` refcount reaches zero and its `Drop`
848    /// runs, cleaning up any remaining child processes. If the manager
849    /// is gone when we try to terminate/kill, we treat it as a no-op
850    /// (the proc is being killed by the launcher's Drop anyway).
851    launcher: Weak<dyn ProcLauncher>,
852
853    /// Stdout monitor for this proc. Created with `StreamFwder::start`, it
854    /// forwards output to a log channel and keeps a bounded ring buffer.
855    /// Transferred to the exit monitor, which joins it after `wait()`
856    /// to recover buffered lines.
857    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
858
859    /// Stderr monitor for this proc. Same behavior as `stdout_fwder`
860    /// but for stderr (used for exit-reason enrichment).
861    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
862
863    /// Watch sender for status transitions. Every `mark_*` goes
864    /// through [`BootstrapProcHandle::transition`], which updates the
865    /// snapshot under the lock and then `send`s the new
866    /// [`ProcStatus`].
867    tx: tokio::sync::watch::Sender<ProcStatus>,
868
869    /// Watch receiver seed. `watch()` clones this so callers can
870    /// `borrow()` the current status and `changed().await` future
871    /// transitions independently.
872    rx: tokio::sync::watch::Receiver<ProcStatus>,
873}
874
875impl fmt::Debug for BootstrapProcHandle {
876    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877        let status = self.status.lock().expect("status mutex poisoned").clone();
878        f.debug_struct("BootstrapProcHandle")
879            .field("proc_id", &self.proc_id)
880            .field("status", &status)
881            .field("launcher", &"<dyn ProcLauncher>")
882            .field("tx", &"<watch::Sender>")
883            .field("rx", &"<watch::Receiver>")
884            // Intentionally skip stdout_tailer / stderr_tailer (not
885            // Debug).
886            .finish()
887    }
888}
889
890// See BS-1 in module doc.
891impl BootstrapProcHandle {
892    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
893    /// OS process hosting a proc.
894    ///
895    /// - Initializes the status to [`ProcStatus::Starting`] since the
896    ///   child process has been created but not yet confirmed running.
897    /// - Stores the launcher reference for terminate/kill delegation.
898    ///
899    /// This is the canonical entry point used by
900    /// `BootstrapProcManager` when it launches a proc into a new
901    /// process.
902    pub(crate) fn new(
903        proc_id: hyperactor_reference::ProcId,
904        launcher: Weak<dyn ProcLauncher>,
905    ) -> Self {
906        let (tx, rx) = watch::channel(ProcStatus::Starting);
907        Self {
908            proc_id,
909            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
910            launcher,
911            stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
912            stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
913            tx,
914            rx,
915        }
916    }
917
918    /// Return the logical proc identity in the mesh.
919    #[inline]
920    pub fn proc_id(&self) -> &hyperactor_reference::ProcId {
921        &self.proc_id
922    }
923
924    /// Create a new subscription to this proc's status stream.
925    ///
926    /// Each call returns a fresh [`watch::Receiver`] tied to this
927    /// handle's internal [`ProcStatus`] channel. The receiver can be
928    /// awaited on (`rx.changed().await`) to observe lifecycle
929    /// transitions as they occur.
930    ///
931    /// Notes:
932    /// - Multiple subscribers can exist simultaneously; each sees
933    ///   every status update in order.
934    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
935    ///   use `watch()` when you need to await changes over time.
936    #[inline]
937    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
938        self.rx.clone()
939    }
940
941    /// Wait until this proc's status changes.
942    ///
943    /// This is a convenience wrapper around
944    /// [`watch::Receiver::changed`]: it subscribes internally via
945    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
946    /// If no subscribers exist or the channel is closed, this returns
947    /// without error.
948    ///
949    /// Typical usage:
950    /// ```ignore
951    /// handle.changed().await;
952    /// match handle.status() {
953    ///     ProcStatus::Running { .. } => { /* now running */ }
954    ///     ProcStatus::Stopped { .. } => { /* exited */ }
955    ///     _ => {}
956    /// }
957    /// ```
958    #[inline]
959    pub async fn changed(&self) {
960        let _ = self.watch().changed().await;
961    }
962
963    /// Return a snapshot of the current [`ProcStatus`] for this proc.
964    ///
965    /// This is a *live view* of the lifecycle state as tracked by
966    /// [`BootstrapProcManager`]. It reflects what is currently known
967    /// about the underlying OS process (e.g., `Starting`, `Running`,
968    /// `Stopping`, etc.).
969    ///
970    /// Internally this reads the mutex-guarded status. Use this when
971    /// you just need a synchronous snapshot; use
972    /// [`BootstrapProcHandle::watch`] or
973    /// [`BootstrapProcHandle::changed`] if you want to await
974    /// transitions asynchronously.
975    #[must_use]
976    pub fn status(&self) -> ProcStatus {
977        // Source of truth for now is the mutex. We broadcast via
978        // `watch` in `transition`, but callers that want a
979        // synchronous snapshot should read the guarded value.
980        self.status.lock().expect("status mutex poisoned").clone()
981    }
982
983    /// Atomically apply a state transition while holding the status
984    /// lock, and send the updated value on the watch channel **while
985    /// still holding the lock**. This guarantees the mutex state and
986    /// the broadcast value stay in sync and avoids reordering between
987    /// concurrent transitions.
988    fn transition<F>(&self, f: F) -> bool
989    where
990        F: FnOnce(&mut ProcStatus) -> bool,
991    {
992        let mut guard = self.status.lock().expect("status mutex poisoned");
993        let _before = guard.clone();
994        let changed = f(&mut guard);
995        if changed {
996            // Publish while still holding the lock to preserve order.
997            let _ = self.tx.send(guard.clone());
998        }
999        changed
1000    }
1001
1002    /// Transition this proc into the [`ProcStatus::Running`] state.
1003    ///
1004    /// Called internally once the child OS process has been spawned.
1005    /// Records the `started_at` timestamp so that callers can query it
1006    /// later via [`BootstrapProcHandle::status`].
1007    ///
1008    /// This is a best-effort marker: it reflects that the process
1009    /// exists at the OS level, but does not guarantee that the proc
1010    /// has completed bootstrap or is fully ready.
1011    pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
1012        self.transition(|st| match *st {
1013            ProcStatus::Starting => {
1014                *st = ProcStatus::Running { started_at };
1015                true
1016            }
1017            _ => {
1018                tracing::warn!(
1019                    "illegal transition: {:?} -> Running; leaving status unchanged",
1020                    *st
1021                );
1022                false
1023            }
1024        })
1025    }
1026
1027    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
1028    /// state.
1029    ///
1030    /// This records the listening address and agent once the proc has
1031    /// successfully started and is ready to serve. The `started_at`
1032    /// timestamp is derived from the current `Running` state.
1033    ///
1034    /// Returns `true` if the transition succeeded (from `Starting` or
1035    /// `Running`), or `false` if the current state did not allow
1036    /// moving to `Ready`. In the latter case the state is left
1037    /// unchanged and a warning is logged.
1038    pub(crate) fn mark_ready(
1039        &self,
1040        addr: ChannelAddr,
1041        agent: hyperactor_reference::ActorRef<ProcAgent>,
1042    ) -> bool {
1043        tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
1044        self.transition(|st| match st {
1045            ProcStatus::Starting => {
1046                // Unexpected: we should be Running before Ready, but
1047                // handle gracefully with current time.
1048                *st = ProcStatus::Ready {
1049                    started_at: std::time::SystemTime::now(),
1050                    addr,
1051                    agent,
1052                };
1053                true
1054            }
1055            ProcStatus::Running { started_at } => {
1056                let started_at = *started_at;
1057                *st = ProcStatus::Ready {
1058                    started_at,
1059                    addr,
1060                    agent,
1061                };
1062                true
1063            }
1064            _ => {
1065                tracing::warn!(
1066                    "illegal transition: {:?} -> Ready; leaving status unchanged",
1067                    st
1068                );
1069                false
1070            }
1071        })
1072    }
1073
1074    /// Record that a stop has been requested for the proc (e.g. a
1075    /// graceful shutdown via SIGTERM), but the underlying process has
1076    /// not yet fully exited.
1077    pub(crate) fn mark_stopping(&self) -> bool {
1078        let now = std::time::SystemTime::now();
1079
1080        self.transition(|st| match *st {
1081            ProcStatus::Running { started_at } => {
1082                *st = ProcStatus::Stopping { started_at };
1083                true
1084            }
1085            ProcStatus::Ready { started_at, .. } => {
1086                *st = ProcStatus::Stopping { started_at };
1087                true
1088            }
1089            ProcStatus::Starting => {
1090                *st = ProcStatus::Stopping { started_at: now };
1091                true
1092            }
1093            _ => false,
1094        })
1095    }
1096
1097    /// Record that the process has exited normally with the given
1098    /// exit code.
1099    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1100        self.transition(|st| match *st {
1101            ProcStatus::Starting
1102            | ProcStatus::Running { .. }
1103            | ProcStatus::Ready { .. }
1104            | ProcStatus::Stopping { .. } => {
1105                *st = ProcStatus::Stopped {
1106                    exit_code,
1107                    stderr_tail,
1108                };
1109                true
1110            }
1111            _ => {
1112                tracing::warn!(
1113                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
1114                    *st
1115                );
1116                false
1117            }
1118        })
1119    }
1120
1121    /// Record that the process was killed by the given signal (e.g.
1122    /// SIGKILL, SIGTERM).
1123    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1124        self.transition(|st| match *st {
1125            ProcStatus::Starting
1126            | ProcStatus::Running { .. }
1127            | ProcStatus::Ready { .. }
1128            | ProcStatus::Stopping { .. } => {
1129                *st = ProcStatus::Killed {
1130                    signal,
1131                    core_dumped,
1132                };
1133                true
1134            }
1135            _ => {
1136                tracing::warn!(
1137                    "illegal transition: {:?} -> Killed; leaving status unchanged",
1138                    *st
1139                );
1140                false
1141            }
1142        })
1143    }
1144
1145    /// Record that the proc or its process failed for an unexpected
1146    /// reason (bootstrap error, spawn failure, etc.).
1147    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1148        self.transition(|st| match *st {
1149            ProcStatus::Starting
1150            | ProcStatus::Running { .. }
1151            | ProcStatus::Ready { .. }
1152            | ProcStatus::Stopping { .. } => {
1153                *st = ProcStatus::Failed {
1154                    reason: reason.into(),
1155                };
1156                true
1157            }
1158            _ => {
1159                tracing::warn!(
1160                    "illegal transition: {:?} -> Failed; leaving status unchanged",
1161                    *st
1162                );
1163                false
1164            }
1165        })
1166    }
1167
1168    /// Wait until the proc has reached a terminal state and return
1169    /// it.
1170    ///
1171    /// Terminal means [`ProcStatus::Stopped`],
1172    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
1173    /// current status is already terminal, returns immediately.
1174    ///
1175    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1176    /// owner of the OS process, so you can call `wait()` from
1177    /// multiple tasks concurrently.
1178    ///
1179    /// Implementation detail: listens on this handle's `watch`
1180    /// channel. It snapshots the current status, and if not terminal
1181    /// awaits the next change. If the channel closes unexpectedly,
1182    /// returns the last observed status.
1183    ///
1184    /// Mirrors `tokio::process::Child::wait()`, but yields the
1185    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
1186    #[must_use]
1187    pub async fn wait_inner(&self) -> ProcStatus {
1188        let mut rx = self.watch();
1189        loop {
1190            let st = rx.borrow().clone();
1191            if st.is_exit() {
1192                return st;
1193            }
1194            // If the channel closes, return the last observed value.
1195            if rx.changed().await.is_err() {
1196                return st;
1197            }
1198        }
1199    }
1200
1201    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
1202    ///
1203    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
1204    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
1205    /// becoming `Ready`, this returns
1206    /// `Err(ReadyError::Terminal(status))`. If the internal watch
1207    /// channel closes unexpectedly, this returns
1208    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
1209    /// `Ok(())` when `Ready` is first observed.
1210    ///
1211    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
1212    /// owner; multiple tasks may await `ready()` concurrently.
1213    /// `Stopping` is not treated as terminal here; we continue
1214    /// waiting until `Ready` or a terminal state is seen.
1215    ///
1216    /// Companion to [`BootstrapProcHandle::wait_inner`]:
1217    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
1218    /// startup.
1219    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1220        let mut rx = self.watch();
1221        loop {
1222            let st = rx.borrow().clone();
1223            match &st {
1224                ProcStatus::Ready { .. } => return Ok(()),
1225                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1226                _non_terminal => {
1227                    if rx.changed().await.is_err() {
1228                        return Err(ReadyError::ChannelClosed);
1229                    }
1230                }
1231            }
1232        }
1233    }
1234
1235    pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1236        *self
1237            .stdout_fwder
1238            .lock()
1239            .expect("stdout_tailer mutex poisoned") = out;
1240        *self
1241            .stderr_fwder
1242            .lock()
1243            .expect("stderr_tailer mutex poisoned") = err;
1244    }
1245
1246    fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1247        let out = self
1248            .stdout_fwder
1249            .lock()
1250            .expect("stdout_tailer mutex poisoned")
1251            .take();
1252        let err = self
1253            .stderr_fwder
1254            .lock()
1255            .expect("stderr_tailer mutex poisoned")
1256            .take();
1257        (out, err)
1258    }
1259
1260    /// Wait for the proc to exit, escalating to launcher terminate/kill
1261    /// if it doesn't exit within `timeout`.
1262    ///
1263    /// This is a fire-and-forget helper: it assumes a stop signal has
1264    /// already been sent. It waits for exit, then escalates through
1265    /// terminate and kill if needed.
1266    pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
1267        match tokio::time::timeout(timeout, self.wait_inner()).await {
1268            Ok(st) if st.is_exit() => return,
1269            _ => {}
1270        }
1271
1272        let _ = self.mark_stopping();
1273
1274        if let Some(launcher) = self.launcher.upgrade() {
1275            if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1276                tracing::warn!(
1277                    proc_id = %self.proc_id,
1278                    error = %e,
1279                    "wait_or_brutally_kill: launcher terminate failed, trying kill"
1280                );
1281                let _ = launcher.kill(&self.proc_id).await;
1282            }
1283        }
1284
1285        let _ = self.wait_inner().await;
1286    }
1287
1288    /// Sends a StopAll message to the ProcAgent, which should exit the process.
1289    /// Waits for the successful state change of the process. If the process
1290    /// doesn't reach a terminal state, returns Err.
1291    async fn send_stop_all(
1292        &self,
1293        cx: &impl context::Actor,
1294        agent: hyperactor_reference::ActorRef<ProcAgent>,
1295        timeout: Duration,
1296        reason: &str,
1297    ) -> anyhow::Result<ProcStatus> {
1298        // For all of the messages and replies in this function:
1299        // if the proc is already dead, then the message will be undeliverable,
1300        // which should be ignored.
1301        // If this message isn't deliverable to the agent, the process may have
1302        // stopped already. No need to produce any errors, just continue with
1303        // killing the process.
1304        let mut agent_port = agent.port();
1305        agent_port.return_undeliverable(false);
1306        agent_port.send(
1307            cx,
1308            resource::StopAll {
1309                reason: reason.to_string(),
1310            },
1311        )?;
1312        // The agent handling Stop should exit the process, if it doesn't within
1313        // the time window, we escalate to SIGTERM.
1314        match tokio::time::timeout(timeout, self.wait()).await {
1315            Ok(Ok(st)) => Ok(st),
1316            Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1317            Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1318        }
1319    }
1320}
1321
1322#[async_trait]
1323impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1324    type Agent = ProcAgent;
1325    type TerminalStatus = ProcStatus;
1326
1327    #[inline]
1328    fn proc_id(&self) -> &hyperactor_reference::ProcId {
1329        &self.proc_id
1330    }
1331
1332    #[inline]
1333    fn addr(&self) -> Option<ChannelAddr> {
1334        match &*self.status.lock().expect("status mutex poisoned") {
1335            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1336            _ => None,
1337        }
1338    }
1339
1340    #[inline]
1341    fn agent_ref(&self) -> Option<hyperactor_reference::ActorRef<Self::Agent>> {
1342        match &*self.status.lock().expect("status mutex poisoned") {
1343            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1344            _ => None,
1345        }
1346    }
1347
1348    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
1349    /// state.
1350    ///
1351    /// Returns `Ok(())` once `Ready` is observed.
1352    ///
1353    /// If the proc transitions directly to a terminal state before
1354    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
1355    ///
1356    /// If the internal status watch closes unexpectedly before
1357    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
1358    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1359        match self.ready_inner().await {
1360            Ok(()) => Ok(()),
1361            Err(ReadyError::Terminal(status)) => {
1362                Err(hyperactor::host::ReadyError::Terminal(status))
1363            }
1364            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1365        }
1366    }
1367
1368    /// Wait until this proc reaches a terminal [`ProcStatus`].
1369    ///
1370    /// Returns `Ok(status)` when a terminal state is observed
1371    /// (`Stopped`, `Killed`, or `Failed`).
1372    ///
1373    /// If the internal status watch closes before any terminal state
1374    /// is seen, returns `Err(WaitError::ChannelClosed)`.
1375    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1376        let status = self.wait_inner().await;
1377        if status.is_exit() {
1378            Ok(status)
1379        } else {
1380            Err(hyperactor::host::WaitError::ChannelClosed)
1381        }
1382    }
1383
1384    /// Attempt to terminate the underlying OS process.
1385    ///
1386    /// This drives **process-level** teardown only:
1387    /// - First attempts graceful shutdown via `ProcAgent` if available.
1388    /// - If that fails or times out, delegates to the launcher's
1389    ///   `terminate()` method, which handles SIGTERM/SIGKILL escalation.
1390    ///
1391    /// If the process was already in a terminal state when called,
1392    /// returns [`TerminateError::AlreadyTerminated`].
1393    ///
1394    /// # Parameters
1395    /// - `timeout`: Grace period to wait after graceful shutdown before
1396    ///   escalating.
1397    /// - `reason`: Human-readable reason for termination.
1398    ///
1399    /// # Returns
1400    /// - `Ok(ProcStatus)` if the process exited during the
1401    ///   termination sequence.
1402    /// - `Err(TerminateError)` if already exited, signaling failed,
1403    ///   or the channel was lost.
1404    async fn terminate(
1405        &self,
1406        cx: &impl context::Actor,
1407        timeout: Duration,
1408        reason: &str,
1409    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1410        // If already terminal, return that.
1411        let st0 = self.status();
1412        if st0.is_exit() {
1413            tracing::debug!(?st0, "terminate(): already terminal");
1414            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1415        }
1416
1417        // Before signaling, try to close actors normally. Only works if
1418        // they are in the Ready state and have an Agent we can message.
1419        let agent = self.agent_ref();
1420        if let Some(agent) = agent {
1421            match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
1422                Ok(st) => return Ok(st),
1423                Err(e) => {
1424                    // Variety of possible errors, proceed with launcher termination.
1425                    tracing::warn!(
1426                        "ProcAgent {} could not successfully stop all actors: {}",
1427                        agent.actor_id(),
1428                        e,
1429                    );
1430                }
1431            }
1432        }
1433
1434        // Mark "Stopping" (ok if state races).
1435        let _ = self.mark_stopping();
1436
1437        // Delegate to launcher for SIGTERM/SIGKILL escalation.
1438        tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
1439        if let Some(launcher) = self.launcher.upgrade() {
1440            if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1441                tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
1442                return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1443                    "launcher termination failed: {}",
1444                    e
1445                )));
1446            }
1447        } else {
1448            // Launcher dropped - its Drop is killing all procs anyway.
1449            tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
1450        }
1451
1452        // Wait for the exit monitor to observe terminal state.
1453        let st = self.wait_inner().await;
1454        if st.is_exit() {
1455            tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
1456            Ok(st)
1457        } else {
1458            Err(hyperactor::host::TerminateError::ChannelClosed)
1459        }
1460    }
1461
1462    /// Forcibly kill the underlying OS process.
1463    ///
1464    /// This bypasses any graceful shutdown semantics and immediately
1465    /// delegates to the launcher's `kill()` method. It is intended as
1466    /// a last-resort termination mechanism when `terminate()` fails or
1467    /// when no grace period is desired.
1468    ///
1469    /// # Behavior
1470    /// - If the process was already in a terminal state, returns
1471    ///   [`TerminateError::AlreadyTerminated`].
1472    /// - Otherwise delegates to the launcher's `kill()` method.
1473    /// - Then waits for the exit monitor to observe a terminal state.
1474    ///
1475    /// # Returns
1476    /// - `Ok(ProcStatus)` if the process exited after kill.
1477    /// - `Err(TerminateError)` if already exited, signaling failed,
1478    ///   or the channel was lost.
1479    async fn kill(
1480        &self,
1481    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1482        // If already terminal, return that.
1483        let st0 = self.status();
1484        if st0.is_exit() {
1485            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1486        }
1487
1488        // Delegate to launcher for kill.
1489        tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
1490        if let Some(launcher) = self.launcher.upgrade() {
1491            if let Err(e) = launcher.kill(&self.proc_id).await {
1492                tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
1493                return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1494                    "launcher kill failed: {}",
1495                    e
1496                )));
1497            }
1498        } else {
1499            // Launcher dropped - its Drop is killing all procs anyway.
1500            tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
1501        }
1502
1503        // Wait for exit monitor to record terminal status.
1504        let st = self.wait_inner().await;
1505        if st.is_exit() {
1506            Ok(st)
1507        } else {
1508            Err(hyperactor::host::TerminateError::ChannelClosed)
1509        }
1510    }
1511}
1512
/// A specification of the command used to bootstrap procs.
///
/// [`BootstrapCommand::new`] turns this into a `Command`;
/// [`BootstrapCommand::current`] builds one that mirrors the running
/// process.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default)]
pub struct BootstrapCommand {
    /// Path of the executable to run.
    pub program: PathBuf,
    /// Value to pass as `argv[0]`, if set; otherwise `arg0` is not
    /// overridden on the command.
    pub arg0: Option<String>,
    /// Arguments passed to the program, in order (not including
    /// `arg0`).
    pub args: Vec<String>,
    /// Environment variables set on the command, as name/value pairs.
    pub env: HashMap<String, String>,
}
1521wirevalue::register_type!(BootstrapCommand);
1522
1523impl std::hash::Hash for BootstrapCommand {
1524    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1525        self.program.hash(state);
1526        self.arg0.hash(state);
1527        self.args.hash(state);
1528        let mut pairs: Vec<_> = self.env.iter().collect();
1529        pairs.sort();
1530        pairs.hash(state);
1531    }
1532}
1533
1534impl PartialEq for BootstrapCommand {
1535    fn eq(&self, other: &Self) -> bool {
1536        self.program == other.program
1537            && self.arg0 == other.arg0
1538            && self.args == other.args
1539            && self.env == other.env
1540    }
1541}
1542
1543impl Eq for BootstrapCommand {}
1544
1545impl BootstrapCommand {
1546    /// Creates a bootstrap command specification to replicate the
1547    /// invocation of the currently running process.
1548    pub fn current() -> io::Result<Self> {
1549        let mut args: VecDeque<String> = std::env::args().collect();
1550        let arg0 = args.pop_front();
1551
1552        Ok(Self {
1553            program: std::env::current_exe()?,
1554            arg0,
1555            args: args.into(),
1556            env: std::env::vars().collect(),
1557        })
1558    }
1559
1560    /// Create a new `Command` reflecting this bootstrap command
1561    /// configuration.
1562    pub fn new(&self) -> Command {
1563        let mut cmd = Command::new(&self.program);
1564        if let Some(arg0) = &self.arg0 {
1565            cmd.arg0(arg0);
1566        }
1567        for arg in &self.args {
1568            cmd.arg(arg);
1569        }
1570        for (k, v) in &self.env {
1571            cmd.env(k, v);
1572        }
1573        cmd
1574    }
1575
1576    /// Bootstrap command used for testing, invoking the Buck-built
1577    /// `monarch/hyperactor_mesh/bootstrap` binary.
1578    ///
1579    /// Intended for integration tests where we need to spawn real
1580    /// bootstrap processes under proc manager control. Not available
1581    /// outside of test builds.
1582    #[cfg(test)]
1583    #[cfg(fbcode_build)]
1584    pub(crate) fn test() -> Self {
1585        Self {
1586            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1587            arg0: None,
1588            args: vec![],
1589            env: HashMap::new(),
1590        }
1591    }
1592}
1593
1594impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1595    /// Creates a bootstrap command from the provided path.
1596    fn from(s: T) -> Self {
1597        Self {
1598            program: s.into(),
1599            arg0: None,
1600            args: vec![],
1601            env: HashMap::new(),
1602        }
1603    }
1604}
1605
/// Selects which built-in process launcher backend to use for
/// spawning procs.
///
/// This is an internal "implementation choice" control for the
/// `ProcLauncher` abstraction: both variants are expected to satisfy
/// the same lifecycle contract (launch, observe exit,
/// terminate/kill), but they differ in *how* the OS process is
/// supervised.
///
/// Variants:
/// - [`LauncherKind::Native`]: spawns and supervises child processes
///   directly using `tokio::process` (traditional parent/child
///   model).
/// - [`LauncherKind::Systemd`]: delegates supervision to `systemd
///   --user` by creating transient `.service` units and observing
///   lifecycle via D-Bus. Only compiled on Linux.
///
/// Configuration/parsing (see the `FromStr` impl; matching is
/// case-insensitive and ignores surrounding whitespace):
/// - The empty string and `"native"` map to [`LauncherKind::Native`]
///   (default).
/// - `"systemd"` maps to [`LauncherKind::Systemd`] on Linux; on other
///   targets it is rejected like any unknown value.
/// - Any other value is rejected as [`io::ErrorKind::InvalidInput`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// Spawn and supervise OS children directly (tokio-based
    /// launcher).
    Native,
    /// Spawn via transient `systemd --user` units and observe via
    /// D-Bus.
    #[cfg(target_os = "linux")]
    Systemd,
}
1638
1639impl FromStr for LauncherKind {
1640    type Err = io::Error;
1641
1642    /// Parse a launcher kind from configuration text.
1643    ///
1644    /// Accepted values (case-insensitive, surrounding whitespace
1645    /// ignored):
1646    /// - `""` or `"native"` → [`LauncherKind::Native`]
1647    /// - `"systemd"` → [`LauncherKind::Systemd`] (Linux only)
1648    ///
1649    /// Returns [`io::ErrorKind::InvalidInput`] for any other string.
1650    fn from_str(s: &str) -> Result<Self, Self::Err> {
1651        match s.trim().to_ascii_lowercase().as_str() {
1652            "" | "native" => Ok(Self::Native),
1653            #[cfg(target_os = "linux")]
1654            "systemd" => Ok(Self::Systemd),
1655            other => Err(io::Error::new(
1656                io::ErrorKind::InvalidInput,
1657                format!(
1658                    "unknown proc launcher kind {other:?}; expected 'native'{}",
1659                    if cfg!(target_os = "linux") {
1660                        " or 'systemd'"
1661                    } else {
1662                        ""
1663                    }
1664                ),
1665            )),
1666        }
1667    }
1668}
1669
/// Host-side manager for launching and supervising **bootstrap
/// processes** (via the `bootstrap` entry point).
///
/// `BootstrapProcManager` is responsible for:
/// - choosing and constructing the configured [`ProcLauncher`]
///   backend,
/// - preparing the bootstrap command/environment for each proc,
/// - tracking proc lifecycle state via [`BootstrapProcHandle`] /
///   [`ProcStatus`],
/// - providing status/query APIs over the set of active procs.
///
/// It maintains an async registry mapping [`ProcId`] →
/// [`BootstrapProcHandle`] for lifecycle queries and exit
/// observation.
///
/// ## Stdio and cleanup
///
/// Stdio handling and shutdown/cleanup behavior are
/// **launcher-dependent**:
/// - The native launcher may capture/tail stdout/stderr and manages
///   OS child processes directly.
/// - The systemd launcher delegates supervision to systemd transient
///   units on the user manager and does not expose a PID; stdio is
///   managed by systemd.
///
/// On drop/shutdown, process cleanup is *best-effort* and performed
/// via the selected launcher (e.g. direct child termination for
/// native, `StopUnit` for systemd).
pub struct BootstrapProcManager {
    /// The process launcher backend. Initialized on first use via
    /// `launcher()`, or explicitly via `set_launcher()`. Once set it
    /// cannot be replaced.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// The default command specification used to bootstrap new
    /// processes.
    command: BootstrapCommand,

    /// Async registry of children spawned by this manager, keyed by
    /// [`ProcId`]. Holds [`BootstrapProcHandle`]s so callers can query
    /// or monitor status. Entries are inserted by `spawn` and removed
    /// by `terminate_proc` / `terminate_all`.
    children: Arc<tokio::sync::Mutex<HashMap<hyperactor_reference::ProcId, BootstrapProcHandle>>>,

    /// File appender that provides the stdout/stderr file-capture
    /// targets for children. `None` if file capture is disabled or
    /// the appender could not be created.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for storing proc socket files. Procs place their
    /// sockets in this directory, so that they can be looked up by
    /// other procs for direct transfer.
    socket_dir: TempDir,
}
1720
1721impl BootstrapProcManager {
1722    /// Construct a new [`BootstrapProcManager`] that will launch
1723    /// procs using the given bootstrap command specification.
1724    ///
1725    /// This is the general entry point when you want to manage procs
1726    /// backed by a specific binary path (e.g. a bootstrap
1727    /// trampoline).
1728    pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1729        let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1730            match crate::logging::FileAppender::new() {
1731                Some(fm) => {
1732                    tracing::info!("file appender created successfully");
1733                    Some(Arc::new(fm))
1734                }
1735                None => {
1736                    tracing::warn!("failed to create file appender");
1737                    None
1738                }
1739            }
1740        } else {
1741            None
1742        };
1743
1744        Ok(Self {
1745            launcher: OnceLock::new(),
1746            command,
1747            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1748            file_appender,
1749            socket_dir: runtime_dir()?,
1750        })
1751    }
1752
1753    /// Install a custom launcher.
1754    ///
1755    /// Returns error if already initialized (by prior
1756    /// `set_launcher()` OR by a spawn that triggered default init via
1757    /// `launcher()`).
1758    ///
1759    /// Must be called before any spawn operation that would
1760    /// initialize the default launcher.
1761    pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
1762        self.launcher.set(launcher).map_err(|_| {
1763            ProcLauncherError::Other(
1764                "launcher already initialized; call set_proc_launcher before first spawn".into(),
1765            )
1766        })
1767    }
1768
1769    /// Get the launcher, initializing with the default if not already
1770    /// set.
1771    ///
1772    /// Once this is called, any subsequent calls to `set_launcher()`
1773    /// will fail.
1774    pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
1775        self.launcher.get_or_init(|| {
1776            let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
1777            let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
1778            tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
1779            match kind {
1780                LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
1781                #[cfg(target_os = "linux")]
1782                LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
1783            }
1784        })
1785    }
1786
1787    /// The bootstrap command used to launch processes.
1788    pub fn command(&self) -> &BootstrapCommand {
1789        &self.command
1790    }
1791
1792    /// The socket directory, where per-proc Unix sockets are placed.
1793    pub fn socket_dir(&self) -> &Path {
1794        self.socket_dir.path()
1795    }
1796
1797    /// Return the current [`ProcStatus`] for the given [`ProcId`], if
1798    /// the proc is known to this manager.
1799    ///
1800    /// This queries the live [`BootstrapProcHandle`] stored in the
1801    /// manager's internal map. It provides an immediate snapshot of
1802    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
1803    /// etc.).
1804    ///
1805    /// Returns `None` if the manager has no record of the proc (e.g.
1806    /// never spawned here, or entry already removed).
1807    pub async fn status(&self, proc_id: &hyperactor_reference::ProcId) -> Option<ProcStatus> {
1808        self.children.lock().await.get(proc_id).map(|h| h.status())
1809    }
1810
1811    /// Return a watch receiver for the given proc's status stream,
1812    /// if the proc is known to this manager.
1813    pub async fn watch(
1814        &self,
1815        proc_id: &hyperactor_reference::ProcId,
1816    ) -> Option<tokio::sync::watch::Receiver<ProcStatus>> {
1817        self.children.lock().await.get(proc_id).map(|h| h.watch())
1818    }
1819
1820    /// Non-blocking stop: send `StopAll`, then spawn a background task
1821    /// that waits for exit and escalates if needed.
1822    ///
1823    /// The handle stays in `children` so that [`status()`] continues
1824    /// to reflect the live lifecycle (Stopping -> Stopped/Killed/Failed).
1825    /// Idempotent: no-ops if the proc is already stopping or exited.
1826    pub(crate) async fn request_stop(
1827        &self,
1828        cx: &impl context::Actor,
1829        proc: &hyperactor_reference::ProcId,
1830        timeout: Duration,
1831        reason: &str,
1832    ) {
1833        let handle = {
1834            let guard = self.children.lock().await;
1835            guard.get(proc).cloned()
1836        };
1837
1838        let Some(handle) = handle else { return };
1839
1840        let status = handle.status();
1841        if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
1842            return;
1843        }
1844
1845        if let Some(agent) = handle.agent_ref() {
1846            let mut agent_port = agent.port();
1847            agent_port.return_undeliverable(false);
1848            let _ = agent_port.send(
1849                cx,
1850                resource::StopAll {
1851                    reason: reason.to_string(),
1852                },
1853            );
1854        }
1855
1856        let _ = handle.mark_stopping();
1857        tokio::spawn(async move {
1858            handle.wait_or_brutally_kill(timeout).await;
1859        });
1860    }
1861
1862    fn spawn_exit_monitor(
1863        &self,
1864        proc_id: hyperactor_reference::ProcId,
1865        handle: BootstrapProcHandle,
1866        exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
1867    ) {
1868        tokio::spawn(async move {
1869            // Wait for the launcher to report terminal status.
1870            let exit_result = match exit_rx.await {
1871                Ok(res) => res,
1872                Err(_) => {
1873                    // exit_rx sender was dropped without sending - launcher error.
1874                    let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
1875                    tracing::error!(
1876                        name = "ProcStatus",
1877                        status = "Exited::ChannelDropped",
1878                        %proc_id,
1879                        "exit channel closed without result"
1880                    );
1881                    return;
1882                }
1883            };
1884
1885            // Collect stderr tail from StreamFwder if we captured stdio.
1886            // The launcher may also provide stderr_tail in exit_result;
1887            // prefer StreamFwder's tail if available (more complete).
1888            let mut stderr_tail: Vec<String> = Vec::new();
1889            let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1890
1891            if let Some(t) = stderr_mon {
1892                let (lines, _bytes) = t.abort().await;
1893                stderr_tail = lines;
1894            }
1895            if let Some(t) = stdout_mon {
1896                let (_lines, _bytes) = t.abort().await;
1897            }
1898
1899            // Fall back to launcher-provided tail if we didn't capture.
1900            if stderr_tail.is_empty() {
1901                if let Some(tail) = exit_result.stderr_tail {
1902                    stderr_tail = tail;
1903                }
1904            }
1905
1906            let tail_str = if stderr_tail.is_empty() {
1907                None
1908            } else {
1909                Some(stderr_tail.join("\n"))
1910            };
1911
1912            match exit_result.kind {
1913                ProcExitKind::Exited { code } => {
1914                    let _ = handle.mark_stopped(code, stderr_tail);
1915                    tracing::info!(
1916                        name = "ProcStatus",
1917                        status = "Exited::ExitWithCode",
1918                        %proc_id,
1919                        exit_code = code,
1920                        tail = tail_str,
1921                        "proc exited with code {code}"
1922                    );
1923                }
1924                ProcExitKind::Signaled {
1925                    signal,
1926                    core_dumped,
1927                } => {
1928                    let _ = handle.mark_killed(signal, core_dumped);
1929                    tracing::info!(
1930                        name = "ProcStatus",
1931                        status = "Exited::KilledBySignal",
1932                        %proc_id,
1933                        tail = tail_str,
1934                        "killed by signal {signal}"
1935                    );
1936                }
1937                ProcExitKind::Failed { reason } => {
1938                    let _ = handle.mark_failed(&reason);
1939                    tracing::info!(
1940                        name = "ProcStatus",
1941                        status = "Exited::Failed",
1942                        %proc_id,
1943                        tail = tail_str,
1944                        "proc failed: {reason}"
1945                    );
1946                }
1947            }
1948        });
1949    }
1950}
1951
1952pub use crate::proc_launcher::ProcBind;
1953
/// The configuration used for bootstrapped procs.
///
/// One value is passed to `BootstrapProcManager::spawn` per proc.
pub struct BootstrapProcConfig {
    /// The proc's create rank. Passed to the stdout/stderr stream
    /// forwarders when stdio is captured.
    pub create_rank: usize,

    /// Config values to set on the spawned proc's global config,
    /// at the `ClientOverride` layer. Also consulted by `spawn` (via
    /// `override_or_global`) for the log-forwarding, file-capture,
    /// and tail-size settings.
    pub client_config_override: Attrs,

    /// Optional per-process CPU/NUMA binding configuration.
    /// When set, the bootstrap command is wrapped with `numactl`
    /// (on NUMA systems) or `taskset` (Linux fallback) before launch.
    pub proc_bind: Option<ProcBind>,

    /// Optional bootstrap command override. When set, this command is used
    /// to spawn the proc instead of the manager's default bootstrap command.
    pub bootstrap_command: Option<BootstrapCommand>,
}
1971
1972#[async_trait]
1973impl ProcManager for BootstrapProcManager {
1974    type Handle = BootstrapProcHandle;
1975
1976    type Config = BootstrapProcConfig;
1977
1978    /// Return the [`ChannelTransport`] used by this proc manager.
1979    ///
1980    /// For `BootstrapProcManager` this is always
1981    /// [`ChannelTransport::Unix`], since all procs are spawned
1982    /// locally on the same host and communicate over Unix domain
1983    /// sockets.
1984    fn transport(&self) -> ChannelTransport {
1985        ChannelTransport::Unix
1986    }
1987
1988    /// Launch a new proc under this [`BootstrapProcManager`].
1989    ///
1990    /// Spawns the configured bootstrap binary (`self.program`) in a
1991    /// fresh child process. The environment is populated with
1992    /// variables that describe the bootstrap context — most
1993    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries a
1994    /// base64-encoded JSON [`Bootstrap::Proc`] payload (proc id,
1995    /// backend addr, callback addr, optional config snapshot).
1996    /// Additional variables like `BOOTSTRAP_LOG_CHANNEL` are also set
1997    /// up for logging and control.
1998    ///
1999    /// Responsibilities performed here:
2000    /// - Create a one-shot callback channel so the child can confirm
2001    ///   successful bootstrap and return its mailbox address plus agent
2002    ///   reference.
2003    /// - Spawn the OS process with stdout/stderr piped.
2004    /// - Stamp the new [`BootstrapProcHandle`] as
2005    ///   [`ProcStatus::Running`] once a PID is observed.
2006    /// - Wire stdout/stderr pipes into local writers and forward them
2007    ///   over the logging channel (`BOOTSTRAP_LOG_CHANNEL`).
2008    /// - Insert the handle into the manager's children map and start
2009    ///   an exit monitor to track process termination.
2010    ///
2011    /// Returns a [`BootstrapProcHandle`] that exposes the child
2012    /// process's lifecycle (status, wait/ready, termination). Errors
2013    /// are surfaced as [`HostError`].
2014    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
2015    async fn spawn(
2016        &self,
2017        proc_id: hyperactor_reference::ProcId,
2018        backend_addr: ChannelAddr,
2019        config: BootstrapProcConfig,
2020    ) -> Result<Self::Handle, HostError> {
2021        let (callback_addr, mut callback_rx) =
2022            channel::serve::<(ChannelAddr, hyperactor_reference::ActorRef<ProcAgent>)>(
2023                ChannelAddr::any(ChannelTransport::Unix),
2024            )?;
2025
2026        // Decide whether we need to capture stdio.
2027        let overrides = &config.client_config_override;
2028        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
2029        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
2030        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
2031        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
2032
2033        let mode = Bootstrap::Proc {
2034            proc_id: proc_id.clone(),
2035            backend_addr,
2036            callback_addr,
2037            socket_dir_path: self.socket_dir.path().to_owned(),
2038            config: Some(config.client_config_override.clone()),
2039        };
2040
2041        // Build LaunchOptions for the launcher.
2042        let bootstrap_payload = mode
2043            .to_env_safe_string()
2044            .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;
2045
2046        let opts = LaunchOptions {
2047            bootstrap_payload,
2048            process_name: format_process_name(&proc_id),
2049            command: config
2050                .bootstrap_command
2051                .as_ref()
2052                .unwrap_or(&self.command)
2053                .clone(),
2054            want_stdio: need_stdio,
2055            tail_lines: tail_size,
2056            log_channel: if enable_forwarding {
2057                Some(ChannelAddr::any(ChannelTransport::Unix))
2058            } else {
2059                None
2060            },
2061            proc_bind: config.proc_bind.clone(),
2062        };
2063
2064        // Launch via the configured launcher backend.
2065        let launch_result = self
2066            .launcher()
2067            .launch(&proc_id, opts.clone())
2068            .await
2069            .map_err(|e| {
2070                let io_err = match e {
2071                    ProcLauncherError::Launch(io_err) => io_err,
2072                    other => std::io::Error::other(other.to_string()),
2073                };
2074                HostError::ProcessSpawnFailure(proc_id.clone(), io_err)
2075            })?;
2076
2077        // Wire up StreamFwders if stdio was captured.
2078        let (out_fwder, err_fwder) = match launch_result.stdio {
2079            StdioHandling::Captured { stdout, stderr } => {
2080                let (file_stdout, file_stderr) = if enable_file_capture {
2081                    match self.file_appender.as_deref() {
2082                        Some(fm) => (
2083                            Some(fm.addr_for(OutputTarget::Stdout)),
2084                            Some(fm.addr_for(OutputTarget::Stderr)),
2085                        ),
2086                        None => {
2087                            tracing::warn!("enable_file_capture=true but no FileAppender");
2088                            (None, None)
2089                        }
2090                    }
2091                } else {
2092                    (None, None)
2093                };
2094
2095                let out = StreamFwder::start(
2096                    stdout,
2097                    file_stdout,
2098                    OutputTarget::Stdout,
2099                    tail_size,
2100                    opts.log_channel.clone(),
2101                    &proc_id,
2102                    config.create_rank,
2103                );
2104                let err = StreamFwder::start(
2105                    stderr,
2106                    file_stderr,
2107                    OutputTarget::Stderr,
2108                    tail_size,
2109                    opts.log_channel.clone(),
2110                    &proc_id,
2111                    config.create_rank,
2112                );
2113                (Some(out), Some(err))
2114            }
2115            StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
2116                if !need_stdio {
2117                    tracing::info!(
2118                        %proc_id, enable_forwarding, enable_file_capture, tail_size,
2119                        "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
2120                    );
2121                }
2122                (None, None)
2123            }
2124        };
2125
2126        // Create handle with launcher reference for terminate/kill delegation.
2127        let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
2128        handle.mark_running(launch_result.started_at);
2129        handle.set_stream_monitors(out_fwder, err_fwder);
2130
2131        // Retain handle for lifecycle management.
2132        {
2133            let mut children = self.children.lock().await;
2134            children.insert(proc_id.clone(), handle.clone());
2135        }
2136
2137        // Kick off an exit monitor that updates ProcStatus when the
2138        // launcher reports terminal status.
2139        self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);
2140
2141        // Handle callback from child proc when it confirms bootstrap.
2142        let h = handle.clone();
2143        tokio::spawn(async move {
2144            match callback_rx.recv().await {
2145                Ok((addr, agent)) => {
2146                    let _ = h.mark_ready(addr, agent);
2147                }
2148                Err(e) => {
2149                    // Child never called back; record failure.
2150                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
2151                }
2152            }
2153        });
2154
2155        // Callers do `handle.read().await` for mesh readiness.
2156        Ok(handle)
2157    }
2158}
2159
2160#[async_trait]
2161impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2162    /// Attempt to gracefully terminate one child procs managed by
2163    /// this `BootstrapProcManager`.
2164    ///
2165    /// Each child handle is asked to `terminate(timeout)`, which
2166    /// sends SIGTERM, waits up to the deadline, and escalates to
2167    /// SIGKILL if necessary. Termination is attempted concurrently,
2168    /// with at most `max_in_flight` tasks running at once.
2169    ///
2170    /// Logs a warning for each failure.
2171    async fn terminate_proc(
2172        &self,
2173        cx: &impl context::Actor,
2174        proc: &hyperactor_reference::ProcId,
2175        timeout: Duration,
2176        reason: &str,
2177    ) -> Result<
2178        (
2179            Vec<hyperactor_reference::ActorId>,
2180            Vec<hyperactor_reference::ActorId>,
2181        ),
2182        anyhow::Error,
2183    > {
2184        // Snapshot to avoid holding the lock across awaits.
2185        let proc_handle: Option<BootstrapProcHandle> = {
2186            let mut guard = self.children.lock().await;
2187            guard.remove(proc)
2188        };
2189
2190        if let Some(h) = proc_handle {
2191            h.terminate(cx, timeout, reason)
2192                .await
2193                .map(|_| (Vec::new(), Vec::new()))
2194                .map_err(|e| e.into())
2195        } else {
2196            Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2197        }
2198    }
2199}
2200
2201#[async_trait]
2202impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2203    /// Attempt to gracefully terminate all child procs managed by
2204    /// this `BootstrapProcManager`.
2205    ///
2206    /// Each child handle is asked to `terminate(timeout)`, which
2207    /// sends SIGTERM, waits up to the deadline, and escalates to
2208    /// SIGKILL if necessary. Termination is attempted concurrently,
2209    /// with at most `max_in_flight` tasks running at once.
2210    ///
2211    /// Returns a [`TerminateSummary`] with counts of how many procs
2212    /// were attempted, how many successfully terminated (including
2213    /// those that were already terminal), and how many failed.
2214    ///
2215    /// Logs a warning for each failure.
2216    async fn terminate_all(
2217        &self,
2218        cx: &impl context::Actor,
2219        timeout: Duration,
2220        max_in_flight: usize,
2221        reason: &str,
2222    ) -> TerminateSummary {
2223        // Drain the children list to avoid holding the lock across awaits and
2224        // avoid subsequent calls from trying to terminate again.
2225        let handles: Vec<BootstrapProcHandle> = {
2226            let mut guard = self.children.lock().await;
2227            guard.drain().map(|(_, v)| v).collect()
2228        };
2229
2230        let attempted = handles.len();
2231        let mut ok = 0usize;
2232
2233        let results = stream::iter(handles.into_iter().map(|h| async move {
2234            match h.terminate(cx, timeout, reason).await {
2235                Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2236                    // Treat "already terminal" as success.
2237                    true
2238                }
2239                Err(e) => {
2240                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2241                    false
2242                }
2243            }
2244        }))
2245        .buffer_unordered(max_in_flight.max(1))
2246        .collect::<Vec<bool>>()
2247        .await;
2248
2249        for r in results {
2250            if r {
2251                ok += 1;
2252            }
2253        }
2254
2255        TerminateSummary {
2256            attempted,
2257            ok,
2258            failed: attempted.saturating_sub(ok),
2259        }
2260    }
2261}
2262
2263/// Entry point to processes managed by hyperactor_mesh. Any process that is part
2264/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
2265/// the process according to how it is invoked.
2266///
2267/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
2268/// and the process should likely exit:
2269///
2270/// ```ignore
2271/// let err = hyperactor_mesh::bootstrap().await;
2272/// tracing::error("could not bootstrap mesh process: {}", err);
2273/// std::process::exit(1);
2274/// ```
2275///
2276/// Use [`bootstrap_or_die`] to implement this behavior directly.
2277/// Else if the bootstrap returns Ok, the process has cleaned up successfully and
2278/// should exit the "main" of the program.
2279pub async fn bootstrap() -> anyhow::Result<i32> {
2280    let boot = Bootstrap::get_from_env()?.unwrap_or_else(Bootstrap::default);
2281    boot.bootstrap().await
2282}
2283
/// Bootstrap a v0 proc mesh. This launches a control process that responds to
/// Allocator2Process messages, conveying its own state in Process2Allocator messages.
///
/// The bootstrapping process is controlled by the
/// following environment variables:
///
/// - `HYPERACTOR_MESH_BOOTSTRAP_ADDR`: the channel address to which Process2Allocator messages
///   should be sent.
/// - `HYPERACTOR_MESH_INDEX`: an index used to identify this process to the allocator.
///
/// If `config` is provided, it is installed into the global configuration
/// with source `ClientOverride` before anything else happens.
///
/// This function only returns on failure, yielding the error that ended the
/// control loop. `StopAndExit` and `Exit` requests terminate the process via
/// `std::process::exit` instead of returning.
async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
    // Apply config before entering the nested scope
    if let Some(attrs) = config {
        hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
        tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
    } else {
        tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
    }
    tracing::info!(
        "bootstrap_v0_proc_mesh config:\n{}",
        hyperactor_config::global::attrs()
    );

    // The control loop proper. It never returns `Ok`: every path out of it
    // either propagates an error or exits the process, which is why the
    // `unwrap_err` at the bottom of the enclosing function cannot panic.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started so far. Shared with the cleanup future below so that
        // both it and the `StopAndExit` arm can tear them down.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // NOTE(review): presumably the registered future runs when the process
        // receives a termination signal while `_cleanup_guard` is alive —
        // confirm against `register_signal_cleanup_scoped`.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(
                        Duration::from_millis(10),
                        None,
                        "execute cleanup callback",
                    )
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        // Both environment variables are required; a missing or unparsable
        // value aborts bootstrap with an error.
        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport that the allocator's address uses.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        // `rx` receives Allocator2Process messages; `tx` carries
        // Process2Allocator messages back to the allocator.
        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves and tell the allocator where to reach us. `rtx`
        // is the return handle given to `try_post`; the select below treats a
        // message arriving on `return_channel` as a delivery failure.
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        // NOTE(review): presumably terminates this process when heartbeats to
        // the allocator are missed — confirm against `exit_if_missed_heartbeat`.
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for whichever happens first: the first allocator message, or
        // the fate of the Hello message.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    // The Hello came back to us, so give up.
                    Ok(msg) => {
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    // The return sender went away without handing the message
                    // back; carry on and wait for the first allocator message.
                    Err(_) => {
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        loop {
            // `?` propagates a receive error, which ends the loop (and `go`).
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    // The span is entered only around the synchronous section
                    // and exited again before the next `.await`.
                    let entered = span.entered();
                    // Serve the new proc on a fresh address of the requested transport.
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    drop(handle); // linter appeasement; it is safe to drop this future
                    let span = entered.exit();
                    // Report the started proc, its agent and its address to the allocator.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    // Remember the proc so it can be torn down later.
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Best-effort teardown of every started proc before
                        // exiting: failures are logged, never propagated.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(
                                    Duration::from_millis(10),
                                    None,
                                    "stop and exit",
                                )
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                // Unlike `StopAndExit`, this exits without stopping procs first.
                Allocator2Process::Exit(code) => {
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    go().await.unwrap_err()
}
2432
2433/// A variant of [`bootstrap`] that logs the error and exits the process
2434/// if bootstrapping fails.
2435pub async fn bootstrap_or_die() -> ! {
2436    match bootstrap().await {
2437        Ok(exit_code) => std::process::exit(exit_code),
2438        Err(err) => {
2439            let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2440            tracing::error!("failed to bootstrap mesh process: {}", err);
2441            std::process::exit(1);
2442        }
2443    }
2444}
2445
/// Destination for bootstrap debug output.
///
/// The `EnumAsInner` derive supplies the `is_file()` predicate that
/// `Debug::is_active` relies on.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Output is written to this file.
    File(std::fs::File),
    /// Output is discarded.
    Sink,
}
2451
2452impl DebugSink {
2453    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2454        match self {
2455            DebugSink::File(f) => f.write(buf),
2456            DebugSink::Sink => Ok(buf.len()),
2457        }
2458    }
2459    fn flush(&mut self) -> io::Result<()> {
2460        match self {
2461            DebugSink::File(f) => f.flush(),
2462            DebugSink::Sink => Ok(()),
2463        }
2464    }
2465}
2466
2467fn debug_sink() -> &'static Mutex<DebugSink> {
2468    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2469    DEBUG_SINK.get_or_init(|| {
2470        let debug_path = {
2471            let mut p = std::env::temp_dir();
2472            if let Ok(user) = std::env::var("USER") {
2473                p.push(user);
2474            }
2475            std::fs::create_dir_all(&p).ok();
2476            p.push("monarch-bootstrap-debug.log");
2477            p
2478        };
2479        let sink = if debug_path.exists() {
2480            match OpenOptions::new()
2481                .append(true)
2482                .create(true)
2483                .open(debug_path.clone())
2484            {
2485                Ok(f) => DebugSink::File(f),
2486                Err(_e) => {
2487                    eprintln!(
2488                        "failed to open {} for bootstrap debug logging",
2489                        debug_path.display()
2490                    );
2491                    DebugSink::Sink
2492                }
2493            }
2494        } else {
2495            DebugSink::Sink
2496        };
2497        Mutex::new(sink)
2498    })
2499}
2500
/// If true, `Debug` output is also copied to stderr (in addition to the debug
/// sink), and `Debug::is_active` reports true regardless of the sink's state.
const DEBUG_TO_STDERR: bool = false;
2503
2504/// A bootstrap specific debug writer. If the file /tmp/monarch-bootstrap-debug.log
2505/// exists, then the writer's destination is that file; otherwise it discards all writes.
2506struct Debug;
2507
2508impl Debug {
2509    fn is_active() -> bool {
2510        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2511    }
2512}
2513
2514impl Write for Debug {
2515    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2516        let res = debug_sink().lock().unwrap().write(buf);
2517        if DEBUG_TO_STDERR {
2518            let n = match res {
2519                Ok(n) => n,
2520                Err(_) => buf.len(),
2521            };
2522            let _ = io::stderr().write_all(&buf[..n]);
2523        }
2524
2525        res
2526    }
2527    fn flush(&mut self) -> io::Result<()> {
2528        let res = debug_sink().lock().unwrap().flush();
2529        if DEBUG_TO_STDERR {
2530            let _ = io::stderr().flush();
2531        }
2532        res
2533    }
2534}
2535
2536/// Create a new runtime [`TempDir`]. The directory is created in
2537/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2538fn runtime_dir() -> io::Result<TempDir> {
2539    match std::env::var_os("XDG_RUNTIME_DIR") {
2540        Some(runtime_dir) => {
2541            let path = PathBuf::from(runtime_dir);
2542            tempfile::tempdir_in(path)
2543        }
2544        None => tempfile::tempdir(),
2545    }
2546}
2547
2548#[cfg(test)]
2549mod tests {
2550    use std::path::PathBuf;
2551
2552    use hyperactor::RemoteSpawn;
2553    use hyperactor::channel::ChannelAddr;
2554    use hyperactor::channel::ChannelTransport;
2555    use hyperactor::channel::TcpMode;
2556    use hyperactor::context::Mailbox as _;
2557    use hyperactor::host::ProcHandle;
2558    use hyperactor::reference as hyperactor_reference;
2559    use hyperactor::testing::ids::test_proc_id;
2560    use hyperactor::testing::ids::test_proc_id_with_addr;
2561    use hyperactor_config::Flattrs;
2562    use ndslice::Extent;
2563    use ndslice::ViewExt;
2564    use ndslice::extent;
2565    use tokio::process::Command;
2566
2567    use super::*;
2568    use crate::ActorMesh;
2569    use crate::alloc::AllocSpec;
2570    use crate::alloc::Allocator;
2571    use crate::alloc::ProcessAllocator;
2572    use crate::host_mesh::HostMesh;
2573    use crate::testactor;
2574    use crate::testing;
2575
2576    #[test]
2577    fn test_bootstrap_mode_env_string_none_config_proc() {
2578        let values = [
2579            Bootstrap::default(),
2580            Bootstrap::Proc {
2581                proc_id: test_proc_id("foo_0"),
2582                backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2583                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2584                socket_dir_path: PathBuf::from("notexist"),
2585                config: None,
2586            },
2587        ];
2588
2589        for value in values {
2590            let safe = value.to_env_safe_string().unwrap();
2591            let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2592
2593            // Re-encode and compare: deterministic round-trip of the
2594            // wire format.
2595            let safe2 = round.to_env_safe_string().unwrap();
2596            assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2597
2598            // Sanity: the decoded variant is what we expect.
2599            match (&value, &round) {
2600                (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2601                (
2602                    Bootstrap::V0ProcMesh { config: None },
2603                    Bootstrap::V0ProcMesh { config: None },
2604                ) => {}
2605                _ => panic!("decoded variant mismatch: got {:?}", round),
2606            }
2607        }
2608    }
2609
2610    #[test]
2611    fn test_bootstrap_mode_env_string_none_config_host() {
2612        let value = Bootstrap::Host {
2613            addr: ChannelAddr::any(ChannelTransport::Unix),
2614            command: None,
2615            config: None,
2616            exit_on_shutdown: false,
2617        };
2618
2619        let safe = value.to_env_safe_string().unwrap();
2620        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2621
2622        // Wire-format round-trip should be identical.
2623        let safe2 = round.to_env_safe_string().unwrap();
2624        assert_eq!(safe, safe2);
2625
2626        // Sanity: decoded variant is Host with None config.
2627        match round {
2628            Bootstrap::Host { config: None, .. } => {}
2629            other => panic!("expected Host with None config, got {:?}", other),
2630        }
2631    }
2632
2633    #[test]
2634    fn test_bootstrap_mode_env_string_invalid() {
2635        // Not valid base64
2636        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2637    }
2638
2639    #[test]
2640    fn test_bootstrap_config_snapshot_roundtrip() {
2641        // Build a small, distinctive Attrs snapshot.
2642        let mut attrs = Attrs::new();
2643        attrs[MESH_TAIL_LOG_LINES] = 123;
2644        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2645
2646        let socket_dir = runtime_dir().unwrap();
2647
2648        // Proc case
2649        {
2650            let original = Bootstrap::Proc {
2651                proc_id: test_proc_id("foo_42"),
2652                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2653                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2654                config: Some(attrs.clone()),
2655                socket_dir_path: socket_dir.path().to_owned(),
2656            };
2657            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2658            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2659            match &decoded {
2660                Bootstrap::Proc { config, .. } => {
2661                    let cfg = config.as_ref().expect("expected Some(attrs)");
2662                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2663                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2664                }
2665                other => panic!("unexpected variant after roundtrip: {:?}", other),
2666            }
2667        }
2668
2669        // Host case
2670        {
2671            let original = Bootstrap::Host {
2672                addr: ChannelAddr::any(ChannelTransport::Unix),
2673                command: None,
2674                config: Some(attrs.clone()),
2675                exit_on_shutdown: false,
2676            };
2677            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2678            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2679            match &decoded {
2680                Bootstrap::Host { config, .. } => {
2681                    let cfg = config.as_ref().expect("expected Some(attrs)");
2682                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2683                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2684                }
2685                other => panic!("unexpected variant after roundtrip: {:?}", other),
2686            }
2687        }
2688
2689        // V0ProcMesh case
2690        {
2691            let original = Bootstrap::V0ProcMesh {
2692                config: Some(attrs.clone()),
2693            };
2694            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2695            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2696            match &decoded {
2697                Bootstrap::V0ProcMesh { config } => {
2698                    let cfg = config.as_ref().expect("expected Some(attrs)");
2699                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2700                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2701                }
2702                other => panic!("unexpected variant after roundtrip: {:?}", other),
2703            }
2704        }
2705    }
2706
    /// End-to-end check of the child logging path inside a single proc: a
    /// `LogMessage` posted on the channel named by `BOOTSTRAP_LOG_CHANNEL`
    /// must surface as a line on the test tap.
    #[tokio::test]
    async fn test_v1_child_logging() {
        use hyperactor::channel;
        use hyperactor::mailbox::BoxedMailboxSender;
        use hyperactor::mailbox::DialMailboxRouter;
        use hyperactor::mailbox::MailboxServer;
        use hyperactor::proc::Proc;

        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
        use crate::logging::LogClientActor;
        use crate::logging::LogClientMessageClient;
        use crate::logging::LogForwardActor;
        use crate::logging::LogMessage;
        use crate::logging::OutputTarget;
        use crate::logging::test_tap;

        // Set up a routed client proc, served on a Unix channel.
        let router = DialMailboxRouter::new();
        let (proc_addr, proc_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::configured(
            test_proc_id("client_0"),
            BoxedMailboxSender::new(router.clone()),
        );
        proc.clone().serve(proc_rx);
        router.bind(test_proc_id("client_0").into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // Install the tap that the assertion at the end reads from.
        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        test_tap::install(tap_tx);

        // Publish the log channel address through the environment before the
        // forwarder is created.
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: unit-test scoped env var
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }

        // Spawn the log client and disable aggregation (immediate
        // print + tap push).
        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
        let log_client: hyperactor_reference::ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        log_client.set_aggregate(&client, None).await.unwrap();

        // Spawn the forwarder in this proc (it will serve
        // BOOTSTRAP_LOG_CHANNEL).
        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
            .await
            .unwrap();
        let _log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Dial the log channel. Note that the message below is posted right
        // away; nothing here waits for the forwarder to start receiving.
        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

        // Send a fake log message as if it came from the proc
        // manager's writer.
        tx.post(LogMessage::Log {
            hostname: "testhost".into(),
            proc_id: "testproc[0]".into(),
            output_target: OutputTarget::Stdout,
            payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
        });

        // Assert we see it via the tap.
        let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
            .await
            .expect("timed out waiting for log line")
            .expect("tap channel closed unexpectedly");
        assert!(
            line.contains("hello from child"),
            "log line did not appear via LogClientActor; got: {line}"
        );
    }
2783
2784    mod proc_handle {
2785
2786        use std::sync::Arc;
2787        use std::time::Duration;
2788
2789        use async_trait::async_trait;
2790        use hyperactor::host::ProcHandle;
2791        use hyperactor::reference as hyperactor_reference;
2792        use hyperactor::testing::ids::test_proc_id;
2793
2794        use super::super::*;
2795        use crate::proc_launcher::LaunchOptions;
2796        use crate::proc_launcher::LaunchResult;
2797        use crate::proc_launcher::ProcLauncher;
2798        use crate::proc_launcher::ProcLauncherError;
2799
2800        /// A test launcher that panics on any method call.
2801        ///
2802        /// This is used by unit tests that only exercise status
2803        /// transitions on `BootstrapProcHandle` without actually
2804        /// launching or terminating processes. If any launcher method
2805        /// is called, the test will panic—indicating an unexpected
2806        /// code path.
2807        struct TestProcLauncher;
2808
2809        #[async_trait]
2810        impl ProcLauncher for TestProcLauncher {
2811            async fn launch(
2812                &self,
2813                _proc_id: &hyperactor_reference::ProcId,
2814                _opts: LaunchOptions,
2815            ) -> Result<LaunchResult, ProcLauncherError> {
2816                panic!("TestProcLauncher::launch should not be called in unit tests");
2817            }
2818
2819            async fn terminate(
2820                &self,
2821                _proc_id: &hyperactor_reference::ProcId,
2822                _timeout: Duration,
2823            ) -> Result<(), ProcLauncherError> {
2824                panic!("TestProcLauncher::terminate should not be called in unit tests");
2825            }
2826
2827            async fn kill(
2828                &self,
2829                _proc_id: &hyperactor_reference::ProcId,
2830            ) -> Result<(), ProcLauncherError> {
2831                panic!("TestProcLauncher::kill should not be called in unit tests");
2832            }
2833        }
2834
2835        // Helper: build a ProcHandle for state-transition unit tests.
2836        //
2837        // This creates a handle with a no-op test launcher. The
2838        // launcher will panic if any of its methods are called,
2839        // ensuring tests only exercise status transitions and not
2840        // actual process lifecycle.
2841        fn handle_for_test() -> BootstrapProcHandle {
2842            let proc_id = test_proc_id("0");
2843            let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
2844            BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
2845        }
2846
2847        #[tokio::test]
2848        async fn starting_to_running_ok() {
2849            let h = handle_for_test();
2850            assert!(matches!(h.status(), ProcStatus::Starting));
2851            let child_started_at = std::time::SystemTime::now();
2852            assert!(h.mark_running(child_started_at));
2853            match h.status() {
2854                ProcStatus::Running { started_at } => {
2855                    assert_eq!(started_at, child_started_at);
2856                }
2857                other => panic!("expected Running, got {other:?}"),
2858            }
2859        }
2860
2861        #[tokio::test]
2862        async fn running_to_stopping_to_stopped_ok() {
2863            let h = handle_for_test();
2864            let child_started_at = std::time::SystemTime::now();
2865            assert!(h.mark_running(child_started_at));
2866            assert!(h.mark_stopping());
2867            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2868            assert!(h.mark_stopped(0, Vec::new()));
2869            assert!(matches!(
2870                h.status(),
2871                ProcStatus::Stopped { exit_code: 0, .. }
2872            ));
2873        }
2874
2875        #[tokio::test]
2876        async fn running_to_killed_ok() {
2877            let h = handle_for_test();
2878            let child_started_at = std::time::SystemTime::now();
2879            assert!(h.mark_running(child_started_at));
2880            assert!(h.mark_killed(9, true));
2881            assert!(matches!(
2882                h.status(),
2883                ProcStatus::Killed {
2884                    signal: 9,
2885                    core_dumped: true
2886                }
2887            ));
2888        }
2889
2890        #[tokio::test]
2891        async fn running_to_failed_ok() {
2892            let h = handle_for_test();
2893            let child_started_at = std::time::SystemTime::now();
2894            assert!(h.mark_running(child_started_at));
2895            assert!(h.mark_failed("bootstrap error"));
2896            match h.status() {
2897                ProcStatus::Failed { reason } => {
2898                    assert_eq!(reason, "bootstrap error");
2899                }
2900                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2901            }
2902        }
2903
2904        #[tokio::test]
2905        async fn illegal_transitions_are_rejected() {
2906            let h = handle_for_test();
2907            let child_started_at = std::time::SystemTime::now();
2908            // Starting -> Running is fine; second Running should be rejected.
2909            assert!(h.mark_running(child_started_at));
2910            assert!(!h.mark_running(std::time::SystemTime::now()));
2911            assert!(matches!(h.status(), ProcStatus::Running { .. }));
2912            // Once Stopped, we can't go to Running/Killed/Failed/etc.
2913            assert!(h.mark_stopping());
2914            assert!(h.mark_stopped(0, Vec::new()));
2915            assert!(!h.mark_running(child_started_at));
2916            assert!(!h.mark_killed(9, false));
2917            assert!(!h.mark_failed("nope"));
2918
2919            assert!(matches!(
2920                h.status(),
2921                ProcStatus::Stopped { exit_code: 0, .. }
2922            ));
2923        }
2924
2925        #[tokio::test]
2926        async fn transitions_from_ready_are_legal() {
2927            let h = handle_for_test();
2928            let addr = ChannelAddr::any(ChannelTransport::Unix);
2929            // Mark Running.
2930            let t0 = std::time::SystemTime::now();
2931            assert!(h.mark_running(t0));
2932            // Build a consistent AgentRef for Ready using the
2933            // handle's ProcId.
2934            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2935            let actor_id = proc_id.actor_id("proc_agent", 0);
2936            let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
2937                hyperactor_reference::ActorRef::attest(actor_id);
2938            // Ready -> Stopping -> Stopped should be legal.
2939            assert!(h.mark_ready(addr, agent_ref));
2940            assert!(h.mark_stopping());
2941            assert!(h.mark_stopped(0, Vec::new()));
2942        }
2943
2944        #[tokio::test]
2945        async fn ready_to_killed_is_legal() {
2946            let h = handle_for_test();
2947            let addr = ChannelAddr::any(ChannelTransport::Unix);
2948            // Starting -> Running
2949            let t0 = std::time::SystemTime::now();
2950            assert!(h.mark_running(t0));
2951            // Build a consistent AgentRef for Ready using the
2952            // handle's ProcId.
2953            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2954            let actor_id = proc_id.actor_id("proc_agent", 0);
2955            let agent: hyperactor_reference::ActorRef<ProcAgent> =
2956                hyperactor_reference::ActorRef::attest(actor_id);
2957            // Running -> Ready
2958            assert!(h.mark_ready(addr, agent));
2959            // Ready -> Killed
2960            assert!(h.mark_killed(9, false));
2961        }
2962
2963        #[tokio::test]
2964        async fn mark_failed_from_stopping_is_allowed() {
2965            let h = handle_for_test();
2966
2967            // Drive Starting -> Stopping.
2968            assert!(h.mark_stopping(), "precondition: to Stopping");
2969
2970            // Now allow Stopping -> Failed.
2971            assert!(
2972                h.mark_failed("boom"),
2973                "mark_failed() should succeed from Stopping"
2974            );
2975            match h.status() {
2976                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2977                other => panic!("expected Failed(\"boom\"), got {other:?}"),
2978            }
2979        }
2980    }
2981
    /// A test launcher that panics on any method call.
    ///
    /// This is used by unit tests that only exercise status
    /// transitions on `BootstrapProcHandle` without actually
    /// launching or terminating processes. A panic from any of these
    /// methods therefore signals that a test took an unexpected code
    /// path.
    struct TestLauncher;

    #[async_trait::async_trait]
    impl crate::proc_launcher::ProcLauncher for TestLauncher {
        /// Always panics: launching is outside the scope of these tests.
        async fn launch(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
            _opts: crate::proc_launcher::LaunchOptions,
        ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
        {
            panic!("TestLauncher::launch should not be called in unit tests");
        }

        /// Always panics: terminating is outside the scope of these tests.
        async fn terminate(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
            _timeout: std::time::Duration,
        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
            panic!("TestLauncher::terminate should not be called in unit tests");
        }

        /// Always panics: killing is outside the scope of these tests.
        async fn kill(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
            panic!("TestLauncher::kill should not be called in unit tests");
        }
    }
3015
3016    fn test_handle(proc_id: hyperactor_reference::ProcId) -> BootstrapProcHandle {
3017        let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
3018            std::sync::Arc::new(TestLauncher);
3019        BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
3020    }
3021
3022    #[tokio::test]
3023    async fn watch_notifies_on_status_changes() {
3024        let proc_id = test_proc_id("1");
3025        let handle = test_handle(proc_id);
3026        let mut rx = handle.watch();
3027
3028        // Starting -> Running
3029        let now = std::time::SystemTime::now();
3030        assert!(handle.mark_running(now));
3031        rx.changed().await.ok(); // Observe the transition.
3032        match &*rx.borrow() {
3033            ProcStatus::Running { started_at } => {
3034                assert_eq!(*started_at, now);
3035            }
3036            s => panic!("expected Running, got {s:?}"),
3037        }
3038
3039        // Running -> Stopped
3040        assert!(handle.mark_stopped(0, Vec::new()));
3041        rx.changed().await.ok(); // Observe the transition.
3042        assert!(matches!(
3043            &*rx.borrow(),
3044            ProcStatus::Stopped { exit_code: 0, .. }
3045        ));
3046    }
3047
3048    #[tokio::test]
3049    async fn ready_errs_if_process_exits_before_running() {
3050        let proc_id =
3051            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
3052        let handle = test_handle(proc_id);
3053
3054        // Simulate the exit monitor doing its job directly here.
3055        // (Equivalent outcome: terminal state before Running.)
3056        assert!(handle.mark_stopped(7, Vec::new()));
3057
3058        // `ready()` should return Err with the terminal status.
3059        match handle.ready_inner().await {
3060            Ok(()) => panic!("ready() unexpectedly succeeded"),
3061            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3062                assert_eq!(exit_code, 7)
3063            }
3064            Err(other) => panic!("expected Stopped(7), got {other:?}"),
3065        }
3066    }
3067
3068    #[tokio::test]
3069    async fn status_unknown_proc_is_none() {
3070        let manager = BootstrapProcManager::new(BootstrapCommand {
3071            program: PathBuf::from("/bin/true"),
3072            ..Default::default()
3073        })
3074        .unwrap();
3075        let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
3076        assert!(manager.status(&unknown).await.is_none());
3077    }
3078
3079    #[tokio::test]
3080    async fn handle_ready_allows_waiters() {
3081        let proc_id = test_proc_id("42");
3082        let handle = test_handle(proc_id.clone());
3083
3084        let started_at = std::time::SystemTime::now();
3085        assert!(handle.mark_running(started_at));
3086
3087        let actor_id = proc_id.actor_id("proc_agent", 0);
3088        let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
3089            hyperactor_reference::ActorRef::attest(actor_id);
3090
3091        // Pick any addr to carry in Ready (what the child would have
3092        // called back with).
3093        let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
3094
3095        // Stamp Ready and assert ready().await unblocks.
3096        assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
3097        handle
3098            .ready_inner()
3099            .await
3100            .expect("ready_inner() should complete after Ready");
3101
3102        // Sanity-check the Ready fields we control
3103        // (started_at/addr).
3104        match handle.status() {
3105            ProcStatus::Ready {
3106                started_at: t,
3107                addr: a,
3108                ..
3109            } => {
3110                assert_eq!(t, started_at);
3111                assert_eq!(a, ready_addr);
3112            }
3113            other => panic!("expected Ready, got {other:?}"),
3114        }
3115    }
3116
3117    #[test]
3118    fn display_running_includes_uptime() {
3119        let started_at = std::time::SystemTime::now() - Duration::from_secs(42);
3120        let st = ProcStatus::Running { started_at };
3121
3122        let s = format!("{}", st);
3123        assert!(s.contains("Running"));
3124        assert!(s.contains("42s"));
3125    }
3126
3127    #[test]
3128    fn display_ready_includes_addr() {
3129        let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
3130        let addr = ChannelAddr::any(ChannelTransport::Unix);
3131        let agent = hyperactor_reference::ActorRef::attest(
3132            test_proc_id_with_addr(addr.clone(), "proc")
3133                .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
3134        );
3135
3136        let st = ProcStatus::Ready {
3137            started_at,
3138            addr: addr.clone(),
3139            agent,
3140        };
3141
3142        let s = format!("{}", st);
3143        assert!(s.contains(&addr.to_string())); // addr
3144        assert!(s.contains("Ready"));
3145    }
3146
3147    #[test]
3148    fn display_stopped_includes_exit_code() {
3149        let st = ProcStatus::Stopped {
3150            exit_code: 7,
3151            stderr_tail: Vec::new(),
3152        };
3153        let s = format!("{}", st);
3154        assert!(s.contains("Stopped"));
3155        assert!(s.contains("7"));
3156    }
3157
3158    #[test]
3159    fn display_other_variants_does_not_panic() {
3160        let samples = vec![
3161            ProcStatus::Starting,
3162            ProcStatus::Stopping {
3163                started_at: std::time::SystemTime::now(),
3164            },
3165            ProcStatus::Ready {
3166                started_at: std::time::SystemTime::now(),
3167                addr: ChannelAddr::any(ChannelTransport::Unix),
3168                agent: hyperactor_reference::ActorRef::attest(
3169                    test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
3170                        .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
3171                ),
3172            },
3173            ProcStatus::Killed {
3174                signal: 9,
3175                core_dumped: false,
3176            },
3177            ProcStatus::Failed {
3178                reason: "boom".into(),
3179            },
3180        ];
3181
3182        for st in samples {
3183            let _ = format!("{}", st); // Just make sure it doesn't panic.
3184        }
3185    }
3186
3187    #[tokio::test]
3188    async fn proc_handle_ready_ok_through_trait() {
3189        let proc_id =
3190            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
3191        let handle = test_handle(proc_id.clone());
3192
3193        // Starting -> Running
3194        let t0 = std::time::SystemTime::now();
3195        assert!(handle.mark_running(t0));
3196
3197        // Synthesize Ready data
3198        let addr = ChannelAddr::any(ChannelTransport::Unix);
3199        let agent: hyperactor_reference::ActorRef<ProcAgent> =
3200            hyperactor_reference::ActorRef::attest(proc_id.actor_id("proc_agent", 0));
3201        assert!(handle.mark_ready(addr, agent));
3202
3203        // Call the trait method (not ready_inner).
3204        let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3205        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3206    }
3207
3208    #[tokio::test]
3209    async fn proc_handle_wait_returns_terminal_status() {
3210        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
3211        let handle = test_handle(proc_id);
3212
3213        // Drive directly to a terminal state before calling wait()
3214        assert!(handle.mark_stopped(0, Vec::new()));
3215
3216        // Call the trait method (not wait_inner)
3217        let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3218            .await
3219            .expect("wait should return Ok(terminal)");
3220
3221        match st {
3222            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3223            other => panic!("expected Stopped(0), got {other:?}"),
3224        }
3225    }
3226
3227    #[tokio::test]
3228    async fn ready_wrapper_maps_terminal_to_trait_error() {
3229        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
3230        let handle = test_handle(proc_id);
3231
3232        assert!(handle.mark_stopped(7, Vec::new()));
3233
3234        match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3235            Ok(()) => panic!("expected Err"),
3236            Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3237                exit_code, ..
3238            })) => {
3239                assert_eq!(exit_code, 7);
3240            }
3241            Err(e) => panic!("unexpected error: {e:?}"),
3242        }
3243    }
3244
3245    /// Create a ProcId and a host **backend_addr** channel that the
3246    /// bootstrap child proc will dial to attach its mailbox to the
3247    /// host.
3248    ///
3249    /// - `proc_id`: logical identity for the child proc (pure name;
3250    ///   not an OS pid).
3251    /// - `backend_addr`: a mailbox address served by the **parent
3252    ///   (host) proc** here; the spawned bootstrap process dials this
3253    ///   so its messages route via the host.
3254    async fn make_proc_id_and_backend_addr(
3255        instance: &hyperactor::Instance<()>,
3256        _tag: &str,
3257    ) -> (hyperactor_reference::ProcId, ChannelAddr) {
3258        // Serve a Unix channel as the "backend_addr" and hook it into
3259        // this test proc.
3260        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3261
3262        // Route messages arriving on backend_addr into this test
3263        // proc's mailbox so the bootstrap child can reach the host
3264        // router.
3265        instance.proc().clone().serve(rx);
3266
3267        // We return an arbitrary (but unbound!) unix direct proc id here;
3268        // it is okay, as we're not testing connectivity.
3269        let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3270        (proc_id, backend_addr)
3271    }
3272
3273    #[tokio::test]
3274    #[cfg(fbcode_build)]
3275    async fn bootstrap_handle_terminate_graceful() {
3276        // Create a root direct-addressed proc + client instance.
3277        let root =
3278            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3279        let (instance, _handle) = root.instance("client").unwrap();
3280
3281        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3282        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3283        let handle = mgr
3284            .spawn(
3285                proc_id.clone(),
3286                backend_addr.clone(),
3287                BootstrapProcConfig {
3288                    create_rank: 0,
3289                    client_config_override: Attrs::new(),
3290                    proc_bind: None,
3291                    bootstrap_command: None,
3292                },
3293            )
3294            .await
3295            .expect("spawn bootstrap child");
3296
3297        handle.ready().await.expect("ready");
3298
3299        let deadline = Duration::from_secs(2);
3300        match tokio::time::timeout(
3301            deadline * 2,
3302            handle.terminate(&instance, deadline, "test terminate"),
3303        )
3304        .await
3305        {
3306            Err(_) => panic!("terminate() future hung"),
3307            Ok(Ok(st)) => {
3308                match st {
3309                    ProcStatus::Stopped { exit_code, .. } => {
3310                        // child called exit(0) on SIGTERM
3311                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3312                    }
3313                    ProcStatus::Killed { signal, .. } => {
3314                        // If the child didn't trap SIGTERM, we'd see
3315                        // SIGTERM (15) here and indeed, this is what
3316                        // we see. Since we call
3317                        // `hyperactor::initialize_with_current_runtime();`
3318                        // we seem unable to trap `SIGTERM` and
3319                        // instead folly intercepts:
3320                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
3321                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
3322                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
3323                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
3324                        // It gets worse. When run with
3325                        // '@fbcode//mode/dev-nosan' it terminates
3326                        // with a SEGFAULT (metamate says this is a
3327                        // well known issue at Meta). So, TL;DR I
3328                        // restore default `SIGTERM` handling after
3329                        // the test exe has called
3330                        // `initialize_with_runtime`.
3331                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3332                    }
3333                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3334                }
3335            }
3336            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3337        }
3338    }
3339
3340    #[tokio::test]
3341    #[cfg(fbcode_build)]
3342    async fn bootstrap_handle_kill_forced() {
3343        // Root proc + client instance (so the child can dial back).
3344        let root =
3345            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3346        let (instance, _handle) = root.instance("client").unwrap();
3347
3348        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3349
3350        // Proc identity + host backend channel the child will dial.
3351        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3352
3353        // Launch the child bootstrap process.
3354        let handle = mgr
3355            .spawn(
3356                proc_id.clone(),
3357                backend_addr.clone(),
3358                BootstrapProcConfig {
3359                    create_rank: 0,
3360                    client_config_override: Attrs::new(),
3361                    proc_bind: None,
3362                    bootstrap_command: None,
3363                },
3364            )
3365            .await
3366            .expect("spawn bootstrap child");
3367
3368        // Wait until the child reports Ready (addr+agent returned via
3369        // callback).
3370        handle.ready().await.expect("ready");
3371
3372        // Force-kill the child and assert we observe a Killed
3373        // terminal status.
3374        let deadline = Duration::from_secs(5);
3375        match tokio::time::timeout(deadline, handle.kill()).await {
3376            Err(_) => panic!("kill() future hung"),
3377            Ok(Ok(st)) => {
3378                // We expect a KILLED terminal state.
3379                match st {
3380                    ProcStatus::Killed { signal, .. } => {
3381                        // On Linux this should be SIGKILL (9).
3382                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3383                    }
3384                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3385                }
3386            }
3387            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3388        }
3389    }
3390
3391    #[tokio::test]
3392    #[cfg(fbcode_build)]
3393    async fn bootstrap_canonical_simple() {
3394        // SAFETY: unit-test scoped
3395        unsafe {
3396            std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
3397        }
3398        // Create an actor instance we'll use to send and receive
3399        // messages.
3400        let instance = testing::instance();
3401
3402        // Configure a ProcessAllocator with the bootstrap binary.
3403        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
3404            "monarch/hyperactor_mesh/bootstrap",
3405        )));
3406        // Request a new allocation of procs from the ProcessAllocator.
3407        let alloc = allocator
3408            .allocate(AllocSpec {
3409                extent: extent!(replicas = 1),
3410                constraints: Default::default(),
3411                proc_name: None,
3412                transport: ChannelTransport::Unix,
3413                proc_allocation_mode: Default::default(),
3414            })
3415            .await
3416            .unwrap();
3417
3418        // Build a HostMesh with explicit OS-process boundaries (per
3419        // rank):
3420        //
3421        // (1) Allocator → bootstrap proc [OS process #1]
3422        //     `ProcMesh::allocate(..)` starts one OS process per
3423        //     rank; each runs our runtime and the trampoline actor.
3424        //
3425        // (2) Host::serve(..) sets up a Host in the same OS process
3426        //     (no new process). It binds front/back channels, creates
3427        //     an in-process service proc (`Proc::configured(..)`), and
3428        //     stores the `BootstrapProcManager` for later spawns.
3429        //
3430        // (3) Install HostAgent (still no new OS process).
3431        //     `host.system_proc().spawn::<HostAgent>("host_agent",
3432        //     host).await?` creates the HostAgent actor in that
3433        //     service proc.
3434        //
3435        // (4) Collect & assemble. The trampoline returns a
3436        //     direct-addressed `ActorRef<HostAgent>`; we collect
3437        //     one per rank and assemble a `HostMesh`.
3438        //
3439        // Note: When the Host is later asked to start a proc
3440        // (`host.spawn(name)`), it calls `ProcManager::spawn` on the
3441        // stored `BootstrapProcManager`, which does a
3442        // `Command::spawn()` to launch a new OS child process for
3443        // that proc.
3444        let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3445            .await
3446            .unwrap();
3447
3448        // Spawn a ProcMesh named "p0" on the host mesh:
3449        //
3450        // (1) Each HostAgent (running inside its host's service
3451        // proc) receives the request.
3452        //
3453        // (2) The Host calls into its `BootstrapProcManager::spawn`,
3454        // which does `Command::spawn()` to launch a brand-new OS
3455        // process for the proc.
3456        //
3457        // (3) Inside that new process, bootstrap runs and a
3458        // `ProcAgent` is started to manage it.
3459        //
3460        // (4) We collect the per-host procs into a `ProcMesh` and
3461        // return it.
3462        let proc_mesh = host_mesh
3463            .spawn(&instance, "p0", Extent::unity(), None)
3464            .await
3465            .unwrap();
3466
3467        // Note: There is no support for status() in v1.
3468        // assert!(proc_mesh.status(&instance).await.is_err());
3469        // let proc_ref = proc_mesh.values().next().expect("one proc");
3470        // assert!(proc_ref.status(&instance).await.unwrap(), "proc should be alive");
3471
3472        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc mesh:
3473        //
3474        // (1) For each proc (already running in its own OS process),
3475        // the `ProcAgent` receives the request.
3476        //
3477        // (2) It spawns a `TestActor` inside that existing proc (no
3478        // new OS process).
3479        //
3480        // (3) The per-proc actors are collected into an
3481        // `ActorMesh<TestActor>` and returned.
3482        let actor_mesh: ActorMesh<testactor::TestActor> =
3483            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();
3484
3485        // Open a fresh port on the client instance and send a
3486        // GetActorId message to the actor mesh. Each TestActor will
3487        // reply with its actor ID to the bound port. Receive one
3488        // reply and assert it matches the ID of the (single) actor in
3489        // the mesh.
3490        let (port, mut rx) = instance.mailbox().open_port();
3491        actor_mesh
3492            .cast(&instance, testactor::GetActorId(port.bind()))
3493            .unwrap();
3494        let (got_id, _seq) = rx.recv().await.unwrap();
3495        assert_eq!(
3496            got_id,
3497            actor_mesh.values().next().unwrap().actor_id().clone()
3498        );
3499
3500        // **Important**: If we don't shutdown the hosts, the
3501        // BootstrapProcManager's won't send SIGKILLs to their spawned
3502        // children and there will be orphans left behind.
3503        host_mesh.shutdown(&instance).await.expect("host shutdown");
3504    }
3505
3506    /// Same as `bootstrap_canonical_simple` but using the systemd
3507    /// launcher backend.
3508    #[tokio::test]
3509    #[cfg(all(fbcode_build, target_os = "linux"))]
3510    async fn bootstrap_canonical_simple_systemd_launcher() {
3511        // Create an actor instance we'll use to send and receive
3512        // messages.
3513        let instance = testing::instance();
3514
3515        // Set an absurdly high flush timeout to prove the flush
3516        // completes naturally (host networking stays alive during
3517        // worker teardown) and never relies on the timeout.
3518        let config = hyperactor_config::global::lock();
3519        let _flush_guard = config.override_key(
3520            hyperactor::config::FORWARDER_FLUSH_TIMEOUT,
3521            std::time::Duration::from_secs(600),
3522        );
3523
3524        // Configure a ProcessAllocator with the bootstrap binary.
3525        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
3526            "monarch/hyperactor_mesh/bootstrap",
3527        )));
3528        // Request a new allocation of procs from the
3529        // ProcessAllocator.
3530        let alloc = allocator
3531            .allocate(AllocSpec {
3532                extent: extent!(replicas = 1),
3533                constraints: Default::default(),
3534                proc_name: None,
3535                transport: ChannelTransport::Unix,
3536                proc_allocation_mode: Default::default(),
3537            })
3538            .await
3539            .unwrap();
3540
3541        // Build a HostMesh (see bootstrap_canonical_simple for
3542        // detailed comments).
3543        let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3544            .await
3545            .unwrap();
3546
3547        // Spawn a ProcMesh named "p0" on the host mesh.
3548        let proc_mesh = host_mesh
3549            .spawn(&instance, "p0", Extent::unity(), None)
3550            .await
3551            .unwrap();
3552
3553        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc
3554        // mesh.
3555        let actor_mesh: ActorMesh<testactor::TestActor> =
3556            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();
3557
3558        // Send a GetActorId message and verify we get a response.
3559        let (port, mut rx) = instance.mailbox().open_port();
3560        actor_mesh
3561            .cast(&instance, testactor::GetActorId(port.bind()))
3562            .unwrap();
3563        let (got_id, _) = rx.recv().await.unwrap();
3564        assert_eq!(
3565            got_id,
3566            actor_mesh.values().next().unwrap().actor_id().clone()
3567        );
3568
3569        // Capture the proc_id and expected unit name before shutdown.
3570        use crate::proc_launcher::SystemdProcLauncher;
3571        use crate::systemd::SystemdManagerProxy;
3572        use crate::systemd::SystemdUnitProxy;
3573
3574        let proc_id = proc_mesh.proc_ids().next().expect("one proc");
3575        let expected_unit = SystemdProcLauncher::unit_name(&proc_id);
3576
3577        // Shutdown cleanly.
3578        //
3579        // Contract: after shutdown, procs are no longer running and
3580        // the mesh is quiescent. We intentionally do NOT assert that
3581        // the systemd transient unit disappears. With
3582        // RemainAfterExit=true, systemd may keep the unit around for
3583        // observation and GC it later according to its own policy;
3584        // that persistence is not considered a leak.
3585        host_mesh.shutdown(&instance).await.expect("host shutdown");
3586
3587        // Liveness check (best-effort): unit may persist, but should
3588        // not remain running. With RemainAfterExit=true, systemd may
3589        // keep the unit around in active/exited after the process has
3590        // terminated. That is not a leak. We only require that the
3591        // unit is not running after shutdown.
3592        let conn = zbus::Connection::session().await.expect("D-Bus session");
3593        let manager = SystemdManagerProxy::new(&conn)
3594            .await
3595            .expect("manager proxy");
3596
3597        let mut ok = false;
3598        for _ in 0..100 {
3599            match manager.get_unit(&expected_unit).await {
3600                Err(_) => {
3601                    // Unit already gone: fine.
3602                    ok = true;
3603                    break;
3604                }
3605                Ok(path) => {
3606                    if let Ok(unit) = SystemdUnitProxy::builder(&conn)
3607                        .path(path)
3608                        .unwrap()
3609                        .build()
3610                        .await
3611                    {
3612                        let active = unit.active_state().await.unwrap_or_default();
3613                        let sub = unit.sub_state().await.unwrap_or_default();
3614                        // Fail only if still running; any other state
3615                        // is acceptable.
3616                        if !(active == "active" && sub == "running") && active != "activating" {
3617                            ok = true;
3618                            break;
3619                        }
3620                    }
3621                }
3622            }
3623            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3624        }
3625        assert!(
3626            ok,
3627            "unit should be gone or quiescent (not running) after shutdown"
3628        );
3629    }
3630
3631    #[tokio::test]
3632    #[cfg(fbcode_build)]
3633    async fn test_host_bootstrap() {
3634        use crate::host_mesh::host_agent::GetLocalProcClient;
3635        use crate::proc_agent::NewClientInstanceClient;
3636
3637        // Create a local instance just to call the local bootstrap actor.
3638        // We should find a way to avoid this for local handles.
3639        let temp_proc = Proc::local();
3640        let (temp_instance, _) = temp_proc.instance("temp").unwrap();
3641
3642        let handle = host(
3643            ChannelAddr::any(ChannelTransport::Unix),
3644            Some(BootstrapCommand::test()),
3645            None,
3646            false,
3647        )
3648        .await
3649        .unwrap();
3650
3651        let local_proc = handle.0.get_local_proc(&temp_instance).await.unwrap();
3652        let _local_instance = local_proc
3653            .new_client_instance(&temp_instance)
3654            .await
3655            .unwrap();
3656    }
3657
3658    // BootstrapProcManager OnceLock Semantics Tests
3659    //
3660    // These tests verify the "install exactly once / default locks
3661    // in" behavior of the proc launcher OnceLock.
3662
3663    use std::time::Duration;
3664
3665    use crate::proc_launcher::LaunchOptions;
3666    use crate::proc_launcher::LaunchResult;
3667    use crate::proc_launcher::ProcExitKind;
3668    use crate::proc_launcher::ProcExitResult;
3669    use crate::proc_launcher::ProcLauncher;
3670    use crate::proc_launcher::ProcLauncherError;
3671    use crate::proc_launcher::StdioHandling;
3672
3673    /// A dummy proc launcher for testing. Does not actually launch
3674    /// anything.
3675    #[allow(dead_code)]
3676    struct DummyLauncher {
3677        /// Marker value to identify this instance.
3678        marker: u64,
3679    }
3680
3681    impl DummyLauncher {
3682        fn new(marker: u64) -> Self {
3683            Self { marker }
3684        }
3685
3686        #[allow(dead_code)]
3687        fn marker(&self) -> u64 {
3688            self.marker
3689        }
3690    }
3691
3692    #[async_trait::async_trait]
3693    impl ProcLauncher for DummyLauncher {
3694        async fn launch(
3695            &self,
3696            _proc_id: &hyperactor_reference::ProcId,
3697            _opts: LaunchOptions,
3698        ) -> Result<LaunchResult, ProcLauncherError> {
3699            let (tx, rx) = tokio::sync::oneshot::channel();
3700            // Immediately send exit result
3701            let _ = tx.send(ProcExitResult {
3702                kind: ProcExitKind::Exited { code: 0 },
3703                stderr_tail: Some(vec![]),
3704            });
3705            Ok(LaunchResult {
3706                pid: None,
3707                started_at: std::time::SystemTime::now(),
3708                stdio: StdioHandling::ManagedByLauncher,
3709                exit_rx: rx,
3710            })
3711        }
3712
3713        async fn terminate(
3714            &self,
3715            _proc_id: &hyperactor_reference::ProcId,
3716            _timeout: Duration,
3717        ) -> Result<(), ProcLauncherError> {
3718            Ok(())
3719        }
3720
3721        async fn kill(
3722            &self,
3723            _proc_id: &hyperactor_reference::ProcId,
3724        ) -> Result<(), ProcLauncherError> {
3725            Ok(())
3726        }
3727    }
3728
3729    // set_launcher() then launcher() returns the same Arc.
3730    #[test]
3731    #[cfg(fbcode_build)]
3732    fn test_set_launcher_then_get() {
3733        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3734
3735        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
3736        let custom_ptr = Arc::as_ptr(&custom);
3737
3738        // Install the custom launcher
3739        manager.set_launcher(custom).unwrap();
3740
3741        // Get the launcher and verify it's the same Arc
3742        let got = manager.launcher();
3743        let got_ptr = Arc::as_ptr(got);
3744
3745        assert_eq!(
3746            custom_ptr, got_ptr,
3747            "launcher() should return the same Arc that was set"
3748        );
3749    }
3750
3751    // launcher() first (forces default init), then set_launcher()
3752    // fails.
3753    #[test]
3754    #[cfg(fbcode_build)]
3755    fn test_get_launcher_then_set_fails() {
3756        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3757
3758        // Force default initialization by calling launcher()
3759        let _ = manager.launcher();
3760
3761        // Now try to set a custom launcher - should fail
3762        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
3763        let result = manager.set_launcher(custom);
3764
3765        assert!(
3766            result.is_err(),
3767            "set_launcher should fail after launcher() was called"
3768        );
3769
3770        // Verify error message mentions the cause
3771        let err = result.unwrap_err();
3772        let err_msg = err.to_string();
3773        assert!(
3774            err_msg.contains("already initialized"),
3775            "error should mention 'already initialized', got: {}",
3776            err_msg
3777        );
3778    }
3779
3780    // set_launcher() twice fails.
3781    #[test]
3782    #[cfg(fbcode_build)]
3783    fn test_set_launcher_twice_fails() {
3784        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3785
3786        let first: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(1));
3787        let second: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(2));
3788
3789        // First set should succeed
3790        manager.set_launcher(first).unwrap();
3791
3792        // Second set should fail
3793        let result = manager.set_launcher(second);
3794        assert!(result.is_err(), "second set_launcher should fail");
3795
3796        // Verify error message
3797        let err = result.unwrap_err();
3798        let err_msg = err.to_string();
3799        assert!(
3800            err_msg.contains("already initialized"),
3801            "error should mention 'already initialized', got: {}",
3802            err_msg
3803        );
3804    }
3805
3806    /// OnceLock is empty before any call.
3807    #[test]
3808    #[cfg(fbcode_build)]
3809    fn test_launcher_initially_empty() {
3810        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3811
3812        // At this point, the OnceLock should be empty (not yet
3813        // initialized) We can verify this by successfully calling
3814        // set_launcher
3815        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));
3816        let result = manager.set_launcher(custom);
3817
3818        assert!(
3819            result.is_ok(),
3820            "set_launcher should succeed on fresh manager"
3821        );
3822    }
3823
3824    /// launcher() returns the same Arc on repeated calls.
3825    #[test]
3826    #[cfg(fbcode_build)]
3827    fn test_launcher_idempotent() {
3828        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3829
3830        // Call launcher() twice
3831        let first = manager.launcher();
3832        let second = manager.launcher();
3833
3834        // Should return the same Arc (same pointer)
3835        assert!(
3836            Arc::ptr_eq(first, second),
3837            "launcher() should return the same Arc on repeated calls"
3838        );
3839    }
3840}