// hyperactor_mesh/bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! ## Bootstrap invariants (BS-*)
10//!
11//! - **BS-1 (locking):** Do not acquire other locks from inside
12//!   `transition(...)`. The state lock is held for the duration of
13//!   the transition; acquiring another lock risks deadlock.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use async_trait::async_trait;
34use base64::prelude::*;
35use futures::StreamExt;
36use futures::stream;
37use humantime::format_duration;
38use hyperactor::ActorHandle;
39use hyperactor::channel;
40use hyperactor::channel::ChannelAddr;
41use hyperactor::channel::ChannelError;
42use hyperactor::channel::ChannelTransport;
43use hyperactor::channel::Rx;
44use hyperactor::channel::Tx;
45use hyperactor::context;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor::reference as hyperactor_reference;
57use hyperactor_config::CONFIG;
58use hyperactor_config::ConfigAttr;
59use hyperactor_config::attrs::Attrs;
60use hyperactor_config::attrs::declare_attrs;
61use hyperactor_config::global::override_or_global;
62use serde::Deserialize;
63use serde::Serialize;
64use tempfile::TempDir;
65use tokio::process::Command;
66use tokio::sync::oneshot;
67use tokio::sync::watch;
68use tracing::Instrument;
69use tracing::Level;
70use typeuri::Named;
71
72use crate::config::MESH_PROC_LAUNCHER_KIND;
73use crate::host_mesh::host_agent::HostAgent;
74use crate::host_mesh::host_agent::HostAgentMode;
75use crate::logging::OutputTarget;
76use crate::logging::StreamFwder;
77use crate::proc_agent::ProcAgent;
78use crate::proc_launcher::LaunchOptions;
79use crate::proc_launcher::NativeProcLauncher;
80use crate::proc_launcher::ProcExitKind;
81use crate::proc_launcher::ProcExitResult;
82use crate::proc_launcher::ProcLauncher;
83use crate::proc_launcher::ProcLauncherError;
84use crate::proc_launcher::StdioHandling;
85#[cfg(target_os = "linux")]
86use crate::proc_launcher::SystemdProcLauncher;
87use crate::proc_launcher::format_process_name;
88use crate::resource;
89
90mod mailbox;
91
declare_attrs! {
    /// Enable forwarding child stdout/stderr over the mesh log
    /// channel.
    ///
    /// When `true`: child stdio is piped; [`StreamFwder`]
    /// mirrors output to the parent console and forwards bytes to the
    /// log channel so a `LogForwardActor` can receive them.
    ///
    /// When `false` (the declared default below): no channel
    /// forwarding occurs. Child stdio may still be piped if
    /// [`MESH_ENABLE_FILE_CAPTURE`] is `true` or
    /// [`MESH_TAIL_LOG_LINES`] > 0; otherwise the child inherits the
    /// parent stdio (no interception).
    ///
    /// This flag does not affect console mirroring: child output
    /// always reaches the parent console—either via inheritance (no
    /// piping) or via [`StreamFwder`] when piping is active.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    /// When `true`: if stdio is piped, each child's `StreamFwder`
    /// also forwards lines to a host-scoped `FileAppender` managed by
    /// the `BootstrapProcManager`. That appender creates exactly two
    /// files per manager instance—one for stdout and one for
    /// stderr—and **all** child processes' lines are multiplexed into
    /// those two files. This can be combined with
    /// [`MESH_ENABLE_LOG_FORWARDING`] ("stream+local").
    ///
    /// Notes:
    /// - The on-disk files are *aggregate*, not per-process.
    ///   Disambiguation is via the optional rank prefix (see
    ///   `PREFIX_WITH_RANK`), which `StreamFwder` prepends to lines
    ///   before writing.
    /// - On local runs, file capture is suppressed unless
    ///   `FORCE_FILE_LOG=true`. In that case `StreamFwder` still
    ///   runs, but the `FileAppender` may be `None` and no files are
    ///   written.
    /// - `MESH_TAIL_LOG_LINES` only controls the in-memory rotating
    ///   buffer used for peeking—independent of file capture.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    /// Maximum number of log lines retained in a proc's stderr/stdout
    /// tail buffer. Used by [`StreamFwder`] when wiring child
    /// pipes. Declared default: 0 (no tail buffer).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    /// If enabled (default), bootstrap child processes install
    /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
    /// parent dies unexpectedly. This is a **production safety net**
    /// against leaked children; tests usually disable it via
    /// `std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG",
    /// "false")`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    /// Maximum number of child terminations to run concurrently
    /// during bulk shutdown. Prevents unbounded spawning of
    /// termination tasks (which could otherwise spike CPU, I/O, or
    /// file descriptor load). Declared default: 16.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    /// Per-child grace window for termination. When a shutdown is
    /// requested, the manager sends SIGTERM and waits this long for
    /// the child to exit before escalating to SIGKILL. Declared
    /// default: 10 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
179
/// Environment variable carrying the allocator's channel address.
/// Processes dial such an address to talk back to the allocator
/// (see [`exit_if_missed_heartbeat`]).
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Environment variable carrying the index assigned to this process by
/// the allocator — its "address" in [`Process2Allocator`] envelopes.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Environment variable used to propagate the client's trace id into
/// spawned processes. NOTE(review): the read/write sites are outside
/// this file chunk — confirm against the spawn path.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// A channel used by each process to receive its own stdout and stderr.
/// Because stdout and stderr can only be obtained by the parent process,
/// they need to be streamed back to the process.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Environment variable holding the base64(JSON)-serialized
/// [`Bootstrap`] mode; written by [`Bootstrap::to_env`] and read by
/// [`Bootstrap::get_from_env`].
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Environment variable carrying a human-readable process name
/// (presumably paired with [`format_process_name`] — TODO confirm at
/// the spawn site, which is outside this chunk).
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
191
/// Messages sent from the process to the allocator. This is an envelope
/// containing the index of the process (i.e., its "address" assigned by
/// the allocator, cf. [`BOOTSTRAP_INDEX_ENV`]), along with the control
/// message in question.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
198
/// Control messages sent from processes to the allocator.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Initialize a process2allocator session. The process is
    /// listening on the provided channel address, to which
    /// [`Allocator2Process`] messages are sent.
    Hello(ChannelAddr),

    /// A proc with the provided ID was started. Its mailbox is
    /// served at the provided channel address. Procs are started
    /// after instruction by the allocator through the corresponding
    /// [`Allocator2Process`] message.
    StartedProc(
        hyperactor_reference::ProcId,
        hyperactor_reference::ActorRef<ProcAgent>,
        ChannelAddr,
    ),

    /// Periodic liveness signal. Sent on an interval by
    /// [`exit_if_missed_heartbeat`]; if the send fails, the sending
    /// process exits.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
220
/// Messages sent from the allocator to a process.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Request to start a new proc with the provided ID, listening
    /// to an address on the indicated channel transport.
    StartProc(hyperactor_reference::ProcId, ChannelTransport),

    /// A request for the process to shut down its procs and then exit
    /// the process with the provided code (graceful shutdown).
    StopAndExit(i32),

    /// A request for the process to immediately exit with the provided
    /// exit code, without stopping procs first.
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
237
238async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
239    let tx = match channel::dial(bootstrap_addr.clone()) {
240        Ok(tx) => tx,
241
242        Err(err) => {
243            tracing::error!(
244                "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
245                bootstrap_addr,
246                err
247            );
248            std::process::exit(1);
249        }
250    };
251    tracing::info!(
252        "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
253    );
254    loop {
255        tokio::time::sleep(Duration::from_secs(5)).await;
256
257        let result = tx
258            .send(Process2Allocator(
259                bootstrap_index,
260                Process2AllocatorMessage::Heartbeat,
261            ))
262            .await;
263
264        if let Err(err) = result {
265            tracing::error!(
266                "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
267                bootstrap_addr,
268                err
269            );
270            std::process::exit(1);
271        }
272    }
273}
274
/// Unwrap a `Result`, or *return* the error from the enclosing
/// function.
///
/// Unlike `?`, the `Err` arm expands to
/// `return ::anyhow::Error::from(e)` — i.e. the error value itself is
/// returned, not `Err(...)` — so this macro is only usable inside
/// functions whose return type is `anyhow::Error` (error-reporting
/// entry points), not `Result`.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(value) => value,
            Err(e) => return ::anyhow::Error::from(e),
        }
    };
}
284
/// Suspend the current task forever. The pending future never
/// resolves, so a value of the (caller-chosen) return type `R` is
/// never actually produced; `R` exists only to satisfy type checking
/// at call sites such as `Bootstrap::bootstrap`.
async fn halt<R>() -> R {
    future::pending().await
}
289
/// A handle that waits for a host to finish shutting down.
///
/// Obtained from [`host`]. Awaiting [`HostShutdownHandle::join`] blocks until
/// the [`ShutdownHost`] handler sends back the mailbox server handle, drains
/// it, and (if `exit_on_shutdown`) calls `process::exit`.
pub struct HostShutdownHandle {
    // Receives the serving mailbox handle once the host shuts down;
    // the sender half is handed to the HostAgent in [`host`].
    rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
    // When true, `join` terminates the whole process (exit code 0)
    // after draining the mailbox server.
    exit_on_shutdown: bool,
}
299
300impl HostShutdownHandle {
301    /// Wait for the host to finish shutting down, drain its mailbox server,
302    /// and optionally exit the process.
303    pub async fn join(self) {
304        match self.rx.await {
305            Ok(mailbox_handle) => {
306                mailbox_handle.stop("host shutting down");
307                let _ = mailbox_handle.await;
308            }
309            Err(_) => {} // sender dropped without sending — nothing to drain
310        }
311        if self.exit_on_shutdown {
312            std::process::exit(0);
313        }
314    }
315}
316
/// Bootstrap a host in this process, returning a handle to the mesh agent.
///
/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
/// then use `GetProc` on the returned proc mesh agent.
///
/// - `addr`: the listening address of the host; this is used to bind the frontend address;
/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapCommand::current`];
/// - `config`: optional runtime config overlay.
/// - `exit_on_shutdown`: if true, [`HostShutdownHandle::join`] will call `process::exit` after draining.
pub async fn host(
    addr: ChannelAddr,
    command: Option<BootstrapCommand>,
    config: Option<Attrs>,
    exit_on_shutdown: bool,
) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
    // Install the caller-provided config snapshot (if any) as the
    // Runtime layer before anything below reads global config.
    if let Some(attrs) = config {
        hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
        tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
    } else {
        tracing::debug!("bootstrap: no config snapshot provided (Host)");
    }

    let command = match command {
        Some(command) => command,
        None => BootstrapCommand::current()?,
    };
    let manager = BootstrapProcManager::new(command)?;

    let host = Host::new(manager, addr).await?;
    // Re-read the address from the host; NOTE(review): presumably the
    // bound address can differ from the requested one — confirm in
    // `Host::new`.
    let addr = host.addr().clone();

    // The ShutdownHost handler will call host.serve() inside HostAgent::init
    // (after this.bind::<Self>(), so the actor port is bound before the
    // frontend starts routing messages), then send the resulting
    // MailboxServerHandle back here for draining.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();

    let system_proc = host.system_proc().clone();
    let host_mesh_agent = system_proc.spawn::<HostAgent>(
        "host_agent",
        HostAgent::new(HostAgentMode::Process {
            host,
            shutdown_tx: Some(shutdown_tx),
        }),
    )?;

    tracing::info!(
        "serving host at {}, agent: {}",
        addr,
        host_mesh_agent.bind::<HostAgent>()
    );

    Ok((
        host_mesh_agent,
        HostShutdownHandle {
            rx: shutdown_rx,
            exit_on_shutdown,
        },
    ))
}
377
/// Bootstrap configures how a mesh process starts up.
///
/// Both `Proc` and `Host` variants may include an optional
/// configuration snapshot (`hyperactor_config::Attrs`). This
/// snapshot is serialized into the bootstrap payload and made
/// available to the child. Interpretation and application of that
/// snapshot is up to the child process; if omitted, the child falls
/// back to environment/default values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a "v1" proc
    Proc {
        /// The ProcId of the proc to be bootstrapped.
        proc_id: hyperactor_reference::ProcId,
        /// The backend address to which messages are forwarded.
        /// See [`hyperactor::host`] for channel topology details.
        backend_addr: ChannelAddr,
        /// The callback address used to indicate successful spawning.
        callback_addr: ChannelAddr,
        /// Directory for storing proc socket files. Procs place their sockets
        /// in this directory, so that they can be looked up by other procs
        /// for direct transfer.
        socket_dir_path: PathBuf,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Defaults.
        config: Option<Attrs>,
    },

    /// Bootstrap as a "v1" host bootstrap. This sets up a new `Host`,
    /// managed by a [`crate::host_mesh::host_agent::HostAgent`].
    Host {
        /// The address on which to serve the host.
        addr: ChannelAddr,
        /// If specified, use the provided command instead of
        /// [`BootstrapCommand::current`].
        command: Option<BootstrapCommand>,
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Defaults.
        config: Option<Attrs>,
        /// If true, exit the process after handling a shutdown request.
        exit_on_shutdown: bool,
    },

    /// Bootstrap as a legacy "v0" proc. This is also the `Default`
    /// variant (with no config snapshot).
    V0ProcMesh {
        /// Optional config snapshot (`hyperactor_config::Attrs`)
        /// captured by the parent. If present, the child installs it
        /// as the `ClientOverride` layer so the parent's effective config
        /// takes precedence over Env/Defaults.
        config: Option<Attrs>,
    },
}
434
435impl Default for Bootstrap {
436    fn default() -> Self {
437        Bootstrap::V0ProcMesh { config: None }
438    }
439}
440
441impl Bootstrap {
442    /// Serialize the mode into a environment-variable-safe string by
443    /// base64-encoding its JSON representation.
444    #[allow(clippy::result_large_err)]
445    pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
446        Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
447    }
448
449    /// Deserialize the mode from the representation returned by [`to_env_safe_string`].
450    #[allow(clippy::result_large_err)]
451    pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
452        let data = BASE64_STANDARD.decode(str)?;
453        let data = std::str::from_utf8(&data)?;
454        Ok(serde_json::from_str(data)?)
455    }
456
457    /// Get a bootstrap configuration from the environment; returns `None`
458    /// if the environment does not specify a boostrap config.
459    pub fn get_from_env() -> anyhow::Result<Option<Self>> {
460        match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
461            Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
462                Ok(mode) => Ok(Some(mode)),
463                Err(e) => {
464                    Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
465                }
466            },
467            Err(VarError::NotPresent) => Ok(None),
468            Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
469        }
470    }
471
472    /// Inject this bootstrap configuration into the environment of the provided command.
473    pub fn to_env(&self, cmd: &mut Command) {
474        cmd.env(
475            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
476            self.to_env_safe_string().unwrap(),
477        );
478    }
479
480    /// Bootstrap this binary according to this configuration.
481    /// This runs until all processes are ready to exit, or returns an error.
482    /// The Ok value is the exit code that should be used.
483    pub async fn bootstrap(self) -> anyhow::Result<i32> {
484        tracing::info!(
485            "bootstrapping mesh process: {}",
486            serde_json::to_string(&self).unwrap()
487        );
488
489        if Debug::is_active() {
490            let mut buf = Vec::new();
491            writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
492            #[cfg(unix)]
493            writeln!(
494                &mut buf,
495                "\tparent pid: {}",
496                std::os::unix::process::parent_id()
497            )
498            .unwrap();
499            writeln!(
500                &mut buf,
501                "\tconfig: {}",
502                serde_json::to_string(&self).unwrap()
503            )
504            .unwrap();
505            match std::env::current_exe() {
506                Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
507                Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
508            }
509            writeln!(&mut buf, "\targs:").unwrap();
510            for arg in std::env::args() {
511                writeln!(&mut buf, "\t\t{}", arg).unwrap();
512            }
513            writeln!(&mut buf, "\tenv:").unwrap();
514            for (key, val) in std::env::vars() {
515                writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
516            }
517            let _ = Debug.write(&buf);
518            if let Ok(s) = std::str::from_utf8(&buf) {
519                tracing::info!("{}", s);
520            } else {
521                tracing::info!("{:?}", buf);
522            }
523        }
524
525        match self {
526            Bootstrap::Proc {
527                proc_id,
528                backend_addr,
529                callback_addr,
530                socket_dir_path,
531                config,
532            } => {
533                let entered = tracing::span!(
534                    Level::INFO,
535                    "proc_bootstrap",
536                    %proc_id,
537                    %backend_addr,
538                    %callback_addr,
539                    socket_dir_path = %socket_dir_path.display(),
540                )
541                .entered();
542                if let Some(attrs) = config {
543                    hyperactor_config::global::set(
544                        hyperactor_config::global::Source::ClientOverride,
545                        attrs,
546                    );
547                    tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
548                } else {
549                    tracing::debug!("bootstrap: no config snapshot provided (Proc)");
550                }
551
552                if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
553                    // Safety net: normal shutdown is via
554                    // `host_mesh.shutdown(&instance)`; PR_SET_PDEATHSIG
555                    // is a last-resort guard against leaks if that
556                    // protocol is bypassed.
557                    let _ = install_pdeathsig_kill();
558                } else {
559                    eprintln!("(bootstrap) PDEATHSIG disabled via config");
560                }
561
562                let (local_addr, name) = (proc_id.addr().clone(), proc_id.name());
563                // TODO provide a direct way to construct these
564                let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
565                let serve_addr = serve_addr.parse().unwrap();
566
567                // The following is a modified host::spawn_proc to support direct
568                // dialing between local procs: 1) we bind each proc to a deterministic
569                // address in socket_dir_path; 2) we use LocalProcDialer to dial these
570                // addresses for local procs.
571                let proc_sender = mailbox::LocalProcDialer::new(
572                    local_addr.clone(),
573                    socket_dir_path,
574                    MailboxClient::dial(backend_addr)?,
575                );
576
577                let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
578
579                let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
580                let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
581                    .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
582
583                let span = entered.exit();
584
585                // Finally serve the proc on the same transport as the backend address,
586                // and call back.
587                let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
588                let mailbox_handle = proc.clone().serve(proc_rx);
589                channel::dial(callback_addr)?
590                    .send((proc_addr, agent_handle.bind::<ProcAgent>()))
591                    .instrument(span)
592                    .await
593                    .map_err(ChannelError::from)?;
594
595                // Wait for the StopAll handler to signal the exit code, then
596                // gracefully stop the mailbox server before exiting.
597                let exit_code = shutdown_rx.await.unwrap_or(1);
598                mailbox_handle.stop("process shutting down");
599                let _ = mailbox_handle.await;
600                tracing::info!("bootstrap shutting down with exit code {}", exit_code);
601                // Don't exit the proc, return Ok so the parent function can decide
602                // how to stop.
603                Ok(exit_code)
604            }
605            Bootstrap::Host {
606                addr,
607                command,
608                config,
609                exit_on_shutdown,
610            } => {
611                let (_agent_handle, shutdown) =
612                    host(addr, command, config, exit_on_shutdown).await?;
613                shutdown.join().await;
614                halt().await
615            }
616            Bootstrap::V0ProcMesh { config } => Err(bootstrap_v0_proc_mesh(config).await),
617        }
618    }
619
620    /// A variant of [`bootstrap`] that logs the error and exits the process
621    /// if bootstrapping fails.
622    pub async fn bootstrap_or_die(self) -> ! {
623        let exit_code = match self.bootstrap().await {
624            Ok(exit_code) => exit_code,
625            Err(err) => {
626                tracing::error!("failed to bootstrap mesh process: {}", err);
627                1
628            }
629        };
630        std::process::exit(exit_code);
631    }
632}
633
/// Install "kill me if parent dies" and close the race window.
///
/// On Linux this sets `PR_SET_PDEATHSIG(SIGKILL)` so the kernel kills
/// this process when its parent dies, then re-checks the parent pid to
/// close the exec-to-prctl race (exiting with code 0 if the parent is
/// already gone). On non-Linux targets this is a no-op returning `Ok(())`.
///
/// # Errors
///
/// Returns the last OS error if the `prctl` call fails.
pub fn install_pdeathsig_kill() -> io::Result<()> {
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `getppid()` is a simple libc syscall returning the
        // parent PID; it has no side effects and does not touch memory.
        let ppid_before = unsafe { libc::getppid() };

        // SAFETY: Calling into libc; does not dereference memory, just
        // asks the kernel to deliver SIGKILL on parent death.
        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
        if rc != 0 {
            return Err(io::Error::last_os_error());
        }

        // Race-close: if the parent died between our exec and prctl(),
        // we won't get a signal, so detect that and exit now.
        //
        // If the parent PID changed, the parent has died and we've been
        // reparented. Note: We cannot assume ppid == 1 means the parent
        // died, as in container environments (e.g., Kubernetes) the parent
        // may legitimately run as PID 1.
        // SAFETY: `getppid()` is a simple libc syscall returning the
        // parent PID; it has no side effects and does not touch memory.
        let ppid_after = unsafe { libc::getppid() };
        if ppid_before != ppid_after {
            std::process::exit(0);
        }
    }
    Ok(())
}
665
/// Represents the lifecycle state of a **proc as hosted in an OS
/// process** managed by `BootstrapProcManager`.
///
/// Note: This type is deliberately distinct from [`ProcState`] and
/// [`ProcStopReason`] (see `alloc.rs`). Those types model allocator
/// *events* - e.g. "a proc was Created/Running/Stopped" - and are
/// consumed from an event stream during allocation. By contrast,
/// [`ProcStatus`] is a **live, queryable view**: it reflects the
/// current observed status of a running proc, as seen through the
/// [`BootstrapProcHandle`] API (stop, kill, status).
///
/// In short:
/// - `ProcState`/`ProcStopReason`: historical / event-driven model
/// - `ProcStatus`: immediate status surface for lifecycle control
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// The OS process has been spawned but is not yet fully running.
    /// (Process-level: child handle exists, no confirmation yet.)
    Starting,
    /// The OS process is alive and considered running.
    /// (Proc-level: bootstrap may still be running.)
    Running {
        /// Wall-clock time recorded at start; used by `Display` to
        /// render an uptime.
        started_at: SystemTime,
    },
    /// Ready means bootstrap has completed and the proc is serving.
    /// (Proc-level: bootstrap completed.)
    Ready {
        /// Wall-clock time recorded at start; used by `Display` to
        /// render an uptime.
        started_at: SystemTime,
        /// Address at which the proc's mailbox is served.
        addr: ChannelAddr,
        /// Reference to the proc's agent actor.
        agent: hyperactor_reference::ActorRef<ProcAgent>,
    },
    /// A stop has been requested (SIGTERM, graceful shutdown, etc.),
    /// but the OS process has not yet fully exited. (Proc-level:
    /// shutdown in progress; Process-level: still running.)
    Stopping {
        /// Wall-clock time recorded at start; used by `Display` to
        /// render an uptime.
        started_at: SystemTime,
    },
    /// The process exited with a normal exit code. (Process-level:
    /// exit observed.)
    Stopped {
        /// The process's exit code.
        exit_code: i32,
        /// Last lines of captured stderr, retained for diagnostics.
        stderr_tail: Vec<String>,
    },
    /// The process was killed by a signal (e.g. SIGKILL).
    /// (Process-level: abnormal termination.)
    Killed {
        /// The terminating signal number.
        signal: i32,
        /// Whether the kernel reported a core dump.
        core_dumped: bool,
    },
    /// The proc or its process failed for some other reason
    /// (bootstrap error, unexpected condition, etc.). (Both levels:
    /// catch-all failure.)
    Failed {
        /// Human-readable description of the failure.
        reason: String,
    },
}
713
714impl ProcStatus {
715    /// Returns `true` if the proc is in a terminal (exited) state:
716    /// [`ProcStatus::Stopped`], [`ProcStatus::Killed`], or
717    /// [`ProcStatus::Failed`].
718    #[inline]
719    pub fn is_exit(&self) -> bool {
720        matches!(
721            self,
722            ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
723        )
724    }
725}
726
727impl std::fmt::Display for ProcStatus {
728    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729        match self {
730            ProcStatus::Starting => write!(f, "Starting"),
731            ProcStatus::Running { started_at } => {
732                let uptime = started_at
733                    .elapsed()
734                    .map(|d| format!(" up {}", format_duration(d)))
735                    .unwrap_or_default();
736                write!(f, "Running{uptime}")
737            }
738            ProcStatus::Ready {
739                started_at, addr, ..
740            } => {
741                let uptime = started_at
742                    .elapsed()
743                    .map(|d| format!(" up {}", format_duration(d)))
744                    .unwrap_or_default();
745                write!(f, "Ready at {addr}{uptime}")
746            }
747            ProcStatus::Stopping { started_at } => {
748                let uptime = started_at
749                    .elapsed()
750                    .map(|d| format!(" up {}", format_duration(d)))
751                    .unwrap_or_default();
752                write!(f, "Stopping{uptime}")
753            }
754            ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
755            ProcStatus::Killed {
756                signal,
757                core_dumped,
758            } => {
759                if *core_dumped {
760                    write!(f, "Killed(sig={signal}, core)")
761                } else {
762                    write!(f, "Killed(sig={signal})")
763                }
764            }
765            ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
766        }
767    }
768}
769
770/// Error returned by [`BootstrapProcHandle::ready`].
771#[derive(Debug, Clone)]
772pub enum ReadyError {
773    /// The proc reached a terminal state before `Ready`.
774    Terminal(ProcStatus),
775    /// The internal watch channel closed unexpectedly.
776    ChannelClosed,
777}
778
779impl std::fmt::Display for ReadyError {
780    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781        match self {
782            ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
783            ReadyError::ChannelClosed => write!(f, "status channel closed"),
784        }
785    }
786}
787impl std::error::Error for ReadyError {}
788
/// A handle to a proc launched by [`BootstrapProcManager`].
///
/// `BootstrapProcHandle` is a lightweight supervisor for an external
/// process: it tracks and broadcasts lifecycle state, and exposes a
/// small control/observation surface. While it may temporarily hold a
/// `tokio::process::Child` (shared behind a mutex) so the exit
/// monitor can `wait()` it, it is **not** the unique owner of the OS
/// process, and dropping a `BootstrapProcHandle` does not by itself
/// terminate the process.
///
/// What it pairs together:
/// - the **logical proc identity** (`ProcId`)
/// - the **live status surface** ([`ProcStatus`]), available both as
///   a synchronous snapshot (`status()`) and as an async stream via a
///   `tokio::sync::watch` channel (`watch()` / `changed()`)
///
/// Responsibilities:
/// - Retain the child handle only until the exit monitor claims it,
///   so the OS process can be awaited and its terminal status
///   recorded.
/// - Hold stdout/stderr tailers until the exit monitor takes them,
///   then join to recover buffered output for diagnostics.
/// - Update status via the `mark_*` transitions and broadcast changes
///   over the watch channel so tasks can `await` lifecycle
///   transitions without polling.
/// - Provide the foundation for higher-level APIs like `wait()`
///   (await terminal) and, later, `terminate()` / `kill()`.
///
/// Notes:
/// - Manager-level cleanup happens in [`BootstrapProcManager::drop`]:
///   it SIGKILLs any still-recorded PIDs; we do not rely on
///   `Child::kill_on_drop`.
/// - Cloning is cheap: all mutable state is behind shared handles
///   (`Arc<Mutex<..>>`, watch channel), so every clone observes and
///   updates the same underlying proc.
///
/// Relationship to types:
/// - [`ProcStatus`]: live status surface, updated by this handle.
/// - [`ProcState`]/[`ProcStopReason`] (in `alloc.rs`):
///   allocator-facing, historical event log; not directly updated by
///   this type.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Logical identity of the proc in the mesh.
    proc_id: hyperactor_reference::ProcId,

    /// Live lifecycle snapshot (see [`ProcStatus`]). Kept in a mutex
    /// so [`BootstrapProcHandle::status`] can return a synchronous
    /// copy. All mutations now flow through
    /// [`BootstrapProcHandle::transition`], which updates this field
    /// under the lock and then broadcasts on the watch channel.
    status: Arc<std::sync::Mutex<ProcStatus>>,

    /// Launcher used to terminate/kill the proc. The launcher owns
    /// the actual OS child handle and PID tracking.
    ///
    /// We hold a `Weak` reference so that when `BootstrapProcManager`
    /// drops, the launcher's `Arc` refcount reaches zero and its `Drop`
    /// runs, cleaning up any remaining child processes. If the manager
    /// is gone when we try to terminate/kill, we treat it as a no-op
    /// (the proc is being killed by the launcher's Drop anyway).
    launcher: Weak<dyn ProcLauncher>,

    /// Stdout monitor for this proc. Created with `StreamFwder::start`, it
    /// forwards output to a log channel and keeps a bounded ring buffer.
    /// Transferred to the exit monitor, which joins it after `wait()`
    /// to recover buffered lines.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Stderr monitor for this proc. Same behavior as `stdout_fwder`
    /// but for stderr (used for exit-reason enrichment).
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Watch sender for status transitions. Every `mark_*` goes
    /// through [`BootstrapProcHandle::transition`], which updates the
    /// snapshot under the lock and then `send`s the new
    /// [`ProcStatus`].
    tx: tokio::sync::watch::Sender<ProcStatus>,

    /// Watch receiver seed. `watch()` clones this so callers can
    /// `borrow()` the current status and `changed().await` future
    /// transitions independently.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
870
871impl fmt::Debug for BootstrapProcHandle {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        let status = self.status.lock().expect("status mutex poisoned").clone();
874        f.debug_struct("BootstrapProcHandle")
875            .field("proc_id", &self.proc_id)
876            .field("status", &status)
877            .field("launcher", &"<dyn ProcLauncher>")
878            .field("tx", &"<watch::Sender>")
879            .field("rx", &"<watch::Receiver>")
880            // Intentionally skip stdout_tailer / stderr_tailer (not
881            // Debug).
882            .finish()
883    }
884}
885
// See BS-1 in module doc.
impl BootstrapProcHandle {
    /// Construct a new [`BootstrapProcHandle`] for a freshly spawned
    /// OS process hosting a proc.
    ///
    /// - Initializes the status to [`ProcStatus::Starting`] since the
    ///   child process has been created but not yet confirmed running.
    /// - Stores the launcher reference for terminate/kill delegation.
    ///
    /// This is the canonical entry point used by
    /// `BootstrapProcManager` when it launches a proc into a new
    /// process.
    pub(crate) fn new(
        proc_id: hyperactor_reference::ProcId,
        launcher: Weak<dyn ProcLauncher>,
    ) -> Self {
        let (tx, rx) = watch::channel(ProcStatus::Starting);
        Self {
            proc_id,
            status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
            launcher,
            stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
            stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
            tx,
            rx,
        }
    }

    /// Return the logical proc identity in the mesh.
    #[inline]
    pub fn proc_id(&self) -> &hyperactor_reference::ProcId {
        &self.proc_id
    }

    /// Create a new subscription to this proc's status stream.
    ///
    /// Each call returns a fresh [`watch::Receiver`] tied to this
    /// handle's internal [`ProcStatus`] channel. The receiver can be
    /// awaited on (`rx.changed().await`) to observe lifecycle
    /// transitions as they occur.
    ///
    /// Notes:
    /// - Multiple subscribers can exist simultaneously; each sees
    ///   every status update in order.
    /// - Use [`BootstrapProcHandle::status`] for a one-off snapshot;
    ///   use `watch()` when you need to await changes over time.
    #[inline]
    pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
        self.rx.clone()
    }

    /// Wait until this proc's status changes.
    ///
    /// This is a convenience wrapper around
    /// [`watch::Receiver::changed`]: it subscribes internally via
    /// [`BootstrapProcHandle::watch`] and awaits the next transition.
    /// If no subscribers exist or the channel is closed, this returns
    /// without error.
    ///
    /// Typical usage:
    /// ```ignore
    /// handle.changed().await;
    /// match handle.status() {
    ///     ProcStatus::Running { .. } => { /* now running */ }
    ///     ProcStatus::Stopped { .. } => { /* exited */ }
    ///     _ => {}
    /// }
    /// ```
    #[inline]
    pub async fn changed(&self) {
        let _ = self.watch().changed().await;
    }

    /// Return a snapshot of the current [`ProcStatus`] for this proc.
    ///
    /// This is a *live view* of the lifecycle state as tracked by
    /// [`BootstrapProcManager`]. It reflects what is currently known
    /// about the underlying OS process (e.g., `Starting`, `Running`,
    /// `Stopping`, etc.).
    ///
    /// Internally this reads the mutex-guarded status. Use this when
    /// you just need a synchronous snapshot; use
    /// [`BootstrapProcHandle::watch`] or
    /// [`BootstrapProcHandle::changed`] if you want to await
    /// transitions asynchronously.
    #[must_use]
    pub fn status(&self) -> ProcStatus {
        // Source of truth for now is the mutex. We broadcast via
        // `watch` in `transition`, but callers that want a
        // synchronous snapshot should read the guarded value.
        self.status.lock().expect("status mutex poisoned").clone()
    }

    /// Atomically apply a state transition while holding the status
    /// lock, and send the updated value on the watch channel **while
    /// still holding the lock**. This guarantees the mutex state and
    /// the broadcast value stay in sync and avoids reordering between
    /// concurrent transitions.
    fn transition<F>(&self, f: F) -> bool
    where
        F: FnOnce(&mut ProcStatus) -> bool,
    {
        let mut guard = self.status.lock().expect("status mutex poisoned");
        // NOTE(review): `_before` is cloned but never used —
        // presumably a debugging leftover. It costs one `ProcStatus`
        // clone per transition; consider removing.
        let _before = guard.clone();
        let changed = f(&mut guard);
        if changed {
            // Publish while still holding the lock to preserve order.
            let _ = self.tx.send(guard.clone());
        }
        changed
    }

    /// Transition this proc into the [`ProcStatus::Running`] state.
    ///
    /// Called internally once the child OS process has been spawned.
    /// Records the `started_at` timestamp so that callers can query it
    /// later via [`BootstrapProcHandle::status`].
    ///
    /// This is a best-effort marker: it reflects that the process
    /// exists at the OS level, but does not guarantee that the proc
    /// has completed bootstrap or is fully ready.
    pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
        self.transition(|st| match *st {
            ProcStatus::Starting => {
                *st = ProcStatus::Running { started_at };
                true
            }
            _ => {
                tracing::warn!(
                    "illegal transition: {:?} -> Running; leaving status unchanged",
                    *st
                );
                false
            }
        })
    }

    /// Attempt to transition this proc into the [`ProcStatus::Ready`]
    /// state.
    ///
    /// This records the listening address and agent once the proc has
    /// successfully started and is ready to serve. The `started_at`
    /// timestamp is derived from the current `Running` state.
    ///
    /// Returns `true` if the transition succeeded (from `Starting` or
    /// `Running`), or `false` if the current state did not allow
    /// moving to `Ready`. In the latter case the state is left
    /// unchanged and a warning is logged.
    pub(crate) fn mark_ready(
        &self,
        addr: ChannelAddr,
        agent: hyperactor_reference::ActorRef<ProcAgent>,
    ) -> bool {
        // Logged outside `transition` to respect BS-1 (no extra work
        // while the status lock is held).
        tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
        self.transition(|st| match st {
            ProcStatus::Starting => {
                // Unexpected: we should be Running before Ready, but
                // handle gracefully with current time.
                *st = ProcStatus::Ready {
                    started_at: std::time::SystemTime::now(),
                    addr,
                    agent,
                };
                true
            }
            ProcStatus::Running { started_at } => {
                let started_at = *started_at;
                *st = ProcStatus::Ready {
                    started_at,
                    addr,
                    agent,
                };
                true
            }
            _ => {
                tracing::warn!(
                    "illegal transition: {:?} -> Ready; leaving status unchanged",
                    st
                );
                false
            }
        })
    }

    /// Record that a stop has been requested for the proc (e.g. a
    /// graceful shutdown via SIGTERM), but the underlying process has
    /// not yet fully exited.
    ///
    /// NOTE(review): unlike the other `mark_*` transitions, an
    /// illegal transition here returns `false` silently (no warning
    /// is logged) — confirm whether that asymmetry is intentional.
    pub(crate) fn mark_stopping(&self) -> bool {
        // Captured outside the transition closure (BS-1: keep work
        // under the status lock minimal).
        let now = std::time::SystemTime::now();

        self.transition(|st| match *st {
            ProcStatus::Running { started_at } => {
                *st = ProcStatus::Stopping { started_at };
                true
            }
            ProcStatus::Ready { started_at, .. } => {
                *st = ProcStatus::Stopping { started_at };
                true
            }
            ProcStatus::Starting => {
                // No recorded start time yet; fall back to "now".
                *st = ProcStatus::Stopping { started_at: now };
                true
            }
            _ => false,
        })
    }

    /// Record that the process has exited normally with the given
    /// exit code.
    pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
        self.transition(|st| match *st {
            ProcStatus::Starting
            | ProcStatus::Running { .. }
            | ProcStatus::Ready { .. }
            | ProcStatus::Stopping { .. } => {
                *st = ProcStatus::Stopped {
                    exit_code,
                    stderr_tail,
                };
                true
            }
            _ => {
                tracing::warn!(
                    "illegal transition: {:?} -> Stopped; leaving status unchanged",
                    *st
                );
                false
            }
        })
    }

    /// Record that the process was killed by the given signal (e.g.
    /// SIGKILL, SIGTERM).
    pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
        self.transition(|st| match *st {
            ProcStatus::Starting
            | ProcStatus::Running { .. }
            | ProcStatus::Ready { .. }
            | ProcStatus::Stopping { .. } => {
                *st = ProcStatus::Killed {
                    signal,
                    core_dumped,
                };
                true
            }
            _ => {
                tracing::warn!(
                    "illegal transition: {:?} -> Killed; leaving status unchanged",
                    *st
                );
                false
            }
        })
    }

    /// Record that the proc or its process failed for an unexpected
    /// reason (bootstrap error, spawn failure, etc.).
    pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
        self.transition(|st| match *st {
            ProcStatus::Starting
            | ProcStatus::Running { .. }
            | ProcStatus::Ready { .. }
            | ProcStatus::Stopping { .. } => {
                *st = ProcStatus::Failed {
                    reason: reason.into(),
                };
                true
            }
            _ => {
                tracing::warn!(
                    "illegal transition: {:?} -> Failed; leaving status unchanged",
                    *st
                );
                false
            }
        })
    }

    /// Wait until the proc has reached a terminal state and return
    /// it.
    ///
    /// Terminal means [`ProcStatus::Stopped`],
    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]. If the
    /// current status is already terminal, returns immediately.
    ///
    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
    /// owner of the OS process, so you can call `wait()` from
    /// multiple tasks concurrently.
    ///
    /// Implementation detail: listens on this handle's `watch`
    /// channel. It snapshots the current status, and if not terminal
    /// awaits the next change. If the channel closes unexpectedly,
    /// returns the last observed status.
    ///
    /// Mirrors `tokio::process::Child::wait()`, but yields the
    /// higher-level [`ProcStatus`] instead of an `ExitStatus`.
    #[must_use]
    pub async fn wait_inner(&self) -> ProcStatus {
        let mut rx = self.watch();
        loop {
            // Clone out of the borrow guard before awaiting so no
            // watch borrow is held across the `.await`.
            let st = rx.borrow().clone();
            if st.is_exit() {
                return st;
            }
            // If the channel closes, return the last observed value.
            if rx.changed().await.is_err() {
                return st;
            }
        }
    }

    /// Wait until the proc reaches the [`ProcStatus::Ready`] state.
    ///
    /// If the proc hits a terminal state ([`ProcStatus::Stopped`],
    /// [`ProcStatus::Killed`], or [`ProcStatus::Failed`]) before ever
    /// becoming `Ready`, this returns
    /// `Err(ReadyError::Terminal(status))`. If the internal watch
    /// channel closes unexpectedly, this returns
    /// `Err(ReadyError::ChannelClosed)`. Otherwise it returns
    /// `Ok(())` when `Ready` is first observed.
    ///
    /// Non-consuming: `BootstrapProcHandle` is a supervisor, not the
    /// owner; multiple tasks may await `ready()` concurrently.
    /// `Stopping` is not treated as terminal here; we continue
    /// waiting until `Ready` or a terminal state is seen.
    ///
    /// Companion to [`BootstrapProcHandle::wait_inner`]:
    /// `wait_inner()` resolves on exit; `ready_inner()` resolves on
    /// startup.
    pub async fn ready_inner(&self) -> Result<(), ReadyError> {
        let mut rx = self.watch();
        loop {
            let st = rx.borrow().clone();
            match &st {
                ProcStatus::Ready { .. } => return Ok(()),
                s if s.is_exit() => return Err(ReadyError::Terminal(st)),
                _non_terminal => {
                    if rx.changed().await.is_err() {
                        return Err(ReadyError::ChannelClosed);
                    }
                }
            }
        }
    }

    /// Install (or replace) the stdout/stderr forwarders for this
    /// proc. They are held here until the exit monitor claims them
    /// via [`BootstrapProcHandle::take_stream_monitors`].
    pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
        *self
            .stdout_fwder
            .lock()
            .expect("stdout_tailer mutex poisoned") = out;
        *self
            .stderr_fwder
            .lock()
            .expect("stderr_tailer mutex poisoned") = err;
    }

    /// Remove and return both stream forwarders, leaving `None` in
    /// their place. Subsequent calls return `(None, None)`.
    fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
        let out = self
            .stdout_fwder
            .lock()
            .expect("stdout_tailer mutex poisoned")
            .take();
        let err = self
            .stderr_fwder
            .lock()
            .expect("stderr_tailer mutex poisoned")
            .take();
        (out, err)
    }

    /// Wait for the proc to exit, escalating to launcher terminate/kill
    /// if it doesn't exit within `timeout`.
    ///
    /// This is a fire-and-forget helper: it assumes a stop signal has
    /// already been sent. It waits for exit, then escalates through
    /// terminate and kill if needed.
    pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
        // First chance: the already-sent stop signal does its job
        // within `timeout`.
        match tokio::time::timeout(timeout, self.wait_inner()).await {
            Ok(st) if st.is_exit() => return,
            _ => {}
        }

        // Best-effort status update; result ignored since the state
        // may legitimately race with the exit monitor.
        let _ = self.mark_stopping();

        if let Some(launcher) = self.launcher.upgrade() {
            if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
                tracing::warn!(
                    proc_id = %self.proc_id,
                    error = %e,
                    "wait_or_brutally_kill: launcher terminate failed, trying kill"
                );
                let _ = launcher.kill(&self.proc_id).await;
            }
        }

        // Launcher gone means its Drop is reaping procs; either way,
        // wait for the terminal state to be observed.
        let _ = self.wait_inner().await;
    }

    /// Sends a StopAll message to the ProcAgent, which should exit the process.
    /// Waits for the successful state change of the process. If the process
    /// doesn't reach a terminal state, returns Err.
    async fn send_stop_all(
        &self,
        cx: &impl context::Actor,
        agent: hyperactor_reference::ActorRef<ProcAgent>,
        timeout: Duration,
        reason: &str,
    ) -> anyhow::Result<ProcStatus> {
        // For all of the messages and replies in this function:
        // if the proc is already dead, then the message will be undeliverable,
        // which should be ignored.
        // If this message isn't deliverable to the agent, the process may have
        // stopped already. No need to produce any errors, just continue with
        // killing the process.
        let mut agent_port = agent.port();
        agent_port.return_undeliverable(false);
        agent_port.send(
            cx,
            resource::StopAll {
                reason: reason.to_string(),
            },
        )?;
        // The agent handling Stop should exit the process, if it doesn't within
        // the time window, we escalate to SIGTERM.
        match tokio::time::timeout(timeout, self.wait()).await {
            Ok(Ok(st)) => Ok(st),
            Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
            Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
        }
    }
}
1317
#[async_trait]
impl hyperactor::host::ProcHandle for BootstrapProcHandle {
    type Agent = ProcAgent;
    type TerminalStatus = ProcStatus;

    /// Return the logical proc identity in the mesh.
    #[inline]
    fn proc_id(&self) -> &hyperactor_reference::ProcId {
        &self.proc_id
    }

    /// The proc's serving address. Available only while the status is
    /// [`ProcStatus::Ready`]; `None` in every other state.
    #[inline]
    fn addr(&self) -> Option<ChannelAddr> {
        match &*self.status.lock().expect("status mutex poisoned") {
            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
            _ => None,
        }
    }

    /// The proc's agent reference. Available only while the status is
    /// [`ProcStatus::Ready`]; `None` in every other state.
    #[inline]
    fn agent_ref(&self) -> Option<hyperactor_reference::ActorRef<Self::Agent>> {
        match &*self.status.lock().expect("status mutex poisoned") {
            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
            _ => None,
        }
    }

    /// Wait until this proc first reaches the [`ProcStatus::Ready`]
    /// state.
    ///
    /// Returns `Ok(())` once `Ready` is observed.
    ///
    /// If the proc transitions directly to a terminal state before
    /// becoming `Ready`, returns `Err(ReadyError::Terminal(status))`.
    ///
    /// If the internal status watch closes unexpectedly before
    /// `Ready` is observed, returns `Err(ReadyError::ChannelClosed)`.
    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
        // Translate the crate-local ReadyError into the host-level
        // equivalent, variant by variant.
        match self.ready_inner().await {
            Ok(()) => Ok(()),
            Err(ReadyError::Terminal(status)) => {
                Err(hyperactor::host::ReadyError::Terminal(status))
            }
            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
        }
    }

    /// Wait until this proc reaches a terminal [`ProcStatus`].
    ///
    /// Returns `Ok(status)` when a terminal state is observed
    /// (`Stopped`, `Killed`, or `Failed`).
    ///
    /// If the internal status watch closes before any terminal state
    /// is seen, returns `Err(WaitError::ChannelClosed)`.
    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
        // `wait_inner` returns a non-terminal status only when the
        // watch channel closed early; surface that as ChannelClosed.
        let status = self.wait_inner().await;
        if status.is_exit() {
            Ok(status)
        } else {
            Err(hyperactor::host::WaitError::ChannelClosed)
        }
    }

    /// Attempt to terminate the underlying OS process.
    ///
    /// This drives **process-level** teardown only:
    /// - First attempts graceful shutdown via `ProcAgent` if available.
    /// - If that fails or times out, delegates to the launcher's
    ///   `terminate()` method, which handles SIGTERM/SIGKILL escalation.
    ///
    /// If the process was already in a terminal state when called,
    /// returns [`TerminateError::AlreadyTerminated`].
    ///
    /// # Parameters
    /// - `timeout`: Grace period to wait after graceful shutdown before
    ///   escalating.
    /// - `reason`: Human-readable reason for termination.
    ///
    /// # Returns
    /// - `Ok(ProcStatus)` if the process exited during the
    ///   termination sequence.
    /// - `Err(TerminateError)` if already exited, signaling failed,
    ///   or the channel was lost.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
        // If already terminal, return that.
        let st0 = self.status();
        if st0.is_exit() {
            tracing::debug!(?st0, "terminate(): already terminal");
            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
        }

        // Before signaling, try to close actors normally. Only works if
        // they are in the Ready state and have an Agent we can message.
        let agent = self.agent_ref();
        if let Some(agent) = agent {
            match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
                Ok(st) => return Ok(st),
                Err(e) => {
                    // Variety of possible errors, proceed with launcher termination.
                    tracing::warn!(
                        "ProcAgent {} could not successfully stop all actors: {}",
                        agent.actor_id(),
                        e,
                    );
                }
            }
        }

        // Mark "Stopping" (ok if state races).
        let _ = self.mark_stopping();

        // Delegate to launcher for SIGTERM/SIGKILL escalation.
        tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
        if let Some(launcher) = self.launcher.upgrade() {
            if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
                tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
                return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
                    "launcher termination failed: {}",
                    e
                )));
            }
        } else {
            // Launcher dropped - its Drop is killing all procs anyway.
            tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
        }

        // Wait for the exit monitor to observe terminal state.
        let st = self.wait_inner().await;
        if st.is_exit() {
            tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
            Ok(st)
        } else {
            Err(hyperactor::host::TerminateError::ChannelClosed)
        }
    }

    /// Forcibly kill the underlying OS process.
    ///
    /// This bypasses any graceful shutdown semantics and immediately
    /// delegates to the launcher's `kill()` method. It is intended as
    /// a last-resort termination mechanism when `terminate()` fails or
    /// when no grace period is desired.
    ///
    /// # Behavior
    /// - If the process was already in a terminal state, returns
    ///   [`TerminateError::AlreadyTerminated`].
    /// - Otherwise delegates to the launcher's `kill()` method.
    /// - Then waits for the exit monitor to observe a terminal state.
    ///
    /// # Returns
    /// - `Ok(ProcStatus)` if the process exited after kill.
    /// - `Err(TerminateError)` if already exited, signaling failed,
    ///   or the channel was lost.
    async fn kill(
        &self,
    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
        // If already terminal, return that.
        let st0 = self.status();
        if st0.is_exit() {
            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
        }

        // Delegate to launcher for kill.
        tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
        if let Some(launcher) = self.launcher.upgrade() {
            if let Err(e) = launcher.kill(&self.proc_id).await {
                tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
                return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
                    "launcher kill failed: {}",
                    e
                )));
            }
        } else {
            // Launcher dropped - its Drop is killing all procs anyway.
            tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
        }

        // Wait for exit monitor to record terminal status.
        let st = self.wait_inner().await;
        if st.is_exit() {
            Ok(st)
        } else {
            Err(hyperactor::host::TerminateError::ChannelClosed)
        }
    }
}
1508
/// A specification of the command used to bootstrap procs.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path to the executable to spawn.
    pub program: PathBuf,
    /// Optional override for the spawned process's `argv[0]`; when
    /// `None`, the launcher's default (derived from `program`) is
    /// used.
    pub arg0: Option<String>,
    /// Arguments passed to the program (not including `argv[0]`).
    pub args: Vec<String>,
    /// Environment variables set for the spawned process.
    pub env: HashMap<String, String>,
}
wirevalue::register_type!(BootstrapCommand);
1518
1519impl BootstrapCommand {
1520    /// Creates a bootstrap command specification to replicate the
1521    /// invocation of the currently running process.
1522    pub fn current() -> io::Result<Self> {
1523        let mut args: VecDeque<String> = std::env::args().collect();
1524        let arg0 = args.pop_front();
1525
1526        Ok(Self {
1527            program: std::env::current_exe()?,
1528            arg0,
1529            args: args.into(),
1530            env: std::env::vars().collect(),
1531        })
1532    }
1533
1534    /// Create a new `Command` reflecting this bootstrap command
1535    /// configuration.
1536    pub fn new(&self) -> Command {
1537        let mut cmd = Command::new(&self.program);
1538        if let Some(arg0) = &self.arg0 {
1539            cmd.arg0(arg0);
1540        }
1541        for arg in &self.args {
1542            cmd.arg(arg);
1543        }
1544        for (k, v) in &self.env {
1545            cmd.env(k, v);
1546        }
1547        cmd
1548    }
1549
1550    /// Bootstrap command used for testing, invoking the Buck-built
1551    /// `monarch/hyperactor_mesh/bootstrap` binary.
1552    ///
1553    /// Intended for integration tests where we need to spawn real
1554    /// bootstrap processes under proc manager control. Not available
1555    /// outside of test builds.
1556    #[cfg(test)]
1557    #[cfg(fbcode_build)]
1558    pub(crate) fn test() -> Self {
1559        Self {
1560            program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1561            arg0: None,
1562            args: vec![],
1563            env: HashMap::new(),
1564        }
1565    }
1566}
1567
1568impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1569    /// Creates a bootstrap command from the provided path.
1570    fn from(s: T) -> Self {
1571        Self {
1572            program: s.into(),
1573            arg0: None,
1574            args: vec![],
1575            env: HashMap::new(),
1576        }
1577    }
1578}
1579
/// Selects which built-in process launcher backend to use for
/// spawning procs.
///
/// This is an internal "implementation choice" control for the
/// `ProcLauncher` abstraction: both variants are expected to satisfy
/// the same lifecycle contract (launch, observe exit,
/// terminate/kill), but they differ in *how* the OS process is
/// supervised.
///
/// Variants:
/// - [`LauncherKind::Native`]: spawns and supervises child processes
///   directly using `tokio::process` (traditional parent/child
///   model).
/// - [`LauncherKind::Systemd`]: delegates supervision to `systemd
///   --user` by creating transient `.service` units and observing
///   lifecycle via D-Bus.
///
/// Configuration/parsing:
/// - The empty string and `"native"` map to [`LauncherKind::Native`]
///   (default).
/// - `"systemd"` maps to [`LauncherKind::Systemd`].
/// - Any other value is rejected as [`io::ErrorKind::InvalidInput`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// Spawn and supervise OS children directly (tokio-based
    /// launcher).
    Native,
    /// Spawn via transient `systemd --user` units and observe via
    /// D-Bus.
    #[cfg(target_os = "linux")]
    Systemd,
}

impl FromStr for LauncherKind {
    type Err = io::Error;

    /// Parse a launcher kind from configuration text.
    ///
    /// Accepted values (case-insensitive, surrounding whitespace
    /// ignored):
    /// - `""` or `"native"` → [`LauncherKind::Native`]
    /// - `"systemd"` → [`LauncherKind::Systemd`] (Linux only)
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalize once; the canonical form is trimmed + lowercase.
        let normalized = s.trim().to_ascii_lowercase();

        if normalized.is_empty() || normalized == "native" {
            return Ok(Self::Native);
        }

        #[cfg(target_os = "linux")]
        if normalized == "systemd" {
            return Ok(Self::Systemd);
        }

        // The hint in the error depends on platform support.
        let systemd_hint = if cfg!(target_os = "linux") {
            " or 'systemd'"
        } else {
            ""
        };
        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown proc launcher kind {normalized:?}; expected 'native'{systemd_hint}"),
        ))
    }
}
1643
/// Host-side manager for launching and supervising **bootstrap
/// processes** (via the `bootstrap` entry point).
///
/// `BootstrapProcManager` is responsible for:
/// - choosing and constructing the configured [`ProcLauncher`]
///   backend,
/// - preparing the bootstrap command/environment for each proc,
/// - tracking proc lifecycle state via [`BootstrapProcHandle`] /
///   [`ProcStatus`],
/// - providing status/query APIs over the set of active procs.
///
/// It maintains an async registry mapping [`ProcId`] →
/// [`BootstrapProcHandle`] for lifecycle queries and exit
/// observation.
///
/// ## Stdio and cleanup
///
/// Stdio handling and shutdown/cleanup behavior are
/// **launcher-dependent**:
/// - The native launcher may capture/tail stdout/stderr and manages
///   OS child processes directly.
/// - The systemd launcher delegates supervision to systemd transient
///   units on the user manager and does not expose a PID; stdio is
///   managed by systemd.
///
/// On drop/shutdown, process cleanup is *best-effort* and performed
/// via the selected launcher (e.g. direct child termination for
/// native, `StopUnit` for systemd).
pub struct BootstrapProcManager {
    /// The process launcher backend. Initialized on first use via
    /// `launcher()`, or explicitly via `set_launcher()`.
    ///
    /// Once set (either way), it cannot be replaced.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// The command specification used to bootstrap new processes.
    command: BootstrapCommand,

    /// Async registry of running children, keyed by [`ProcId`]. Holds
    /// [`BootstrapProcHandle`]s so callers can query or monitor
    /// status.
    ///
    /// Uses a tokio mutex so it can be held across awaits by callers
    /// that need it, though most accessors snapshot and release.
    children: Arc<tokio::sync::Mutex<HashMap<hyperactor_reference::ProcId, BootstrapProcHandle>>>,

    /// FileMonitor that aggregates logs from all children. None if
    /// file monitor creation failed.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for storing proc socket files. Procs place their
    /// sockets in this directory, so that they can be looked up by
    /// other procs for direct transfer.
    ///
    /// NOTE(review): presumably the directory is cleaned up when the
    /// `TempDir` is dropped — confirm `runtime_dir()` semantics.
    socket_dir: TempDir,
}
1694
impl BootstrapProcManager {
    /// Construct a new [`BootstrapProcManager`] that will launch
    /// procs using the given bootstrap command specification.
    ///
    /// This is the general entry point when you want to manage procs
    /// backed by a specific binary path (e.g. a bootstrap
    /// trampoline).
    ///
    /// Returns an error only if the proc socket directory cannot be
    /// created.
    pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
        // File capture is opt-in via config; a failed appender
        // creation is downgraded to a warning so construction still
        // succeeds without it.
        let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
            match crate::logging::FileAppender::new() {
                Some(fm) => {
                    tracing::info!("file appender created successfully");
                    Some(Arc::new(fm))
                }
                None => {
                    tracing::warn!("failed to create file appender");
                    None
                }
            }
        } else {
            None
        };

        Ok(Self {
            // The launcher is lazily initialized on first use (or
            // installed explicitly via `set_launcher()`).
            launcher: OnceLock::new(),
            command,
            children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            file_appender,
            socket_dir: runtime_dir()?,
        })
    }

    /// Install a custom launcher.
    ///
    /// Returns error if already initialized (by prior
    /// `set_launcher()` OR by a spawn that triggered default init via
    /// `launcher()`).
    ///
    /// Must be called before any spawn operation that would
    /// initialize the default launcher.
    pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
        // `OnceLock::set` fails if a value is already present; map
        // that to a launcher error rather than silently ignoring.
        self.launcher.set(launcher).map_err(|_| {
            ProcLauncherError::Other(
                "launcher already initialized; call set_proc_launcher before first spawn".into(),
            )
        })
    }

    /// Get the launcher, initializing with the default if not already
    /// set.
    ///
    /// Once this is called, any subsequent calls to `set_launcher()`
    /// will fail.
    pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
        self.launcher.get_or_init(|| {
            let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
            // An unparseable config value silently falls back to
            // Native; the chosen kind and raw config value are logged
            // below so misconfiguration is observable.
            let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
            tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
            match kind {
                LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
                #[cfg(target_os = "linux")]
                LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
            }
        })
    }

    /// The bootstrap command used to launch processes.
    pub fn command(&self) -> &BootstrapCommand {
        &self.command
    }

    /// The socket directory, where per-proc Unix sockets are placed.
    pub fn socket_dir(&self) -> &Path {
        self.socket_dir.path()
    }

    /// Return the current [`ProcStatus`] for the given [`ProcId`], if
    /// the proc is known to this manager.
    ///
    /// This queries the live [`BootstrapProcHandle`] stored in the
    /// manager's internal map. It provides an immediate snapshot of
    /// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
    /// etc.).
    ///
    /// Returns `None` if the manager has no record of the proc (e.g.
    /// never spawned here, or entry already removed).
    pub async fn status(&self, proc_id: &hyperactor_reference::ProcId) -> Option<ProcStatus> {
        self.children.lock().await.get(proc_id).map(|h| h.status())
    }

    /// Non-blocking stop: send `StopAll`, then spawn a background task
    /// that waits for exit and escalates if needed.
    ///
    /// The handle stays in `children` so that [`status()`] continues
    /// to reflect the live lifecycle (Stopping -> Stopped/Killed/Failed).
    /// Idempotent: no-ops if the proc is already stopping or exited.
    pub(crate) async fn request_stop(
        &self,
        cx: &impl context::Actor,
        proc: &hyperactor_reference::ProcId,
        timeout: Duration,
        reason: &str,
    ) {
        // Clone the handle under the lock, then release it before
        // doing any further awaiting.
        let handle = {
            let guard = self.children.lock().await;
            guard.get(proc).cloned()
        };

        let Some(handle) = handle else { return };

        // Idempotence: skip procs that are already stopping or done.
        let status = handle.status();
        if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
            return;
        }

        // Best-effort graceful stop via the proc's agent; send errors
        // are deliberately ignored (escalation below still applies).
        if let Some(agent) = handle.agent_ref() {
            let mut agent_port = agent.port();
            agent_port.return_undeliverable(false);
            let _ = agent_port.send(
                cx,
                resource::StopAll {
                    reason: reason.to_string(),
                },
            );
        }

        let _ = handle.mark_stopping();
        // Escalate in the background: wait up to `timeout`, then kill.
        tokio::spawn(async move {
            handle.wait_or_brutally_kill(timeout).await;
        });
    }

    /// Spawn a background task that waits for the launcher to report
    /// the proc's terminal status on `exit_rx`, then records it on
    /// `handle` (stopped/killed/failed) along with a stderr tail.
    fn spawn_exit_monitor(
        &self,
        proc_id: hyperactor_reference::ProcId,
        handle: BootstrapProcHandle,
        exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
    ) {
        tokio::spawn(async move {
            // Wait for the launcher to report terminal status.
            let exit_result = match exit_rx.await {
                Ok(res) => res,
                Err(_) => {
                    // exit_rx sender was dropped without sending - launcher error.
                    let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
                    tracing::error!(
                        name = "ProcStatus",
                        status = "Exited::ChannelDropped",
                        %proc_id,
                        "exit channel closed without result"
                    );
                    return;
                }
            };

            // Collect stderr tail from StreamFwder if we captured stdio.
            // The launcher may also provide stderr_tail in exit_result;
            // prefer StreamFwder's tail if available (more complete).
            let mut stderr_tail: Vec<String> = Vec::new();
            let (stdout_mon, stderr_mon) = handle.take_stream_monitors();

            if let Some(t) = stderr_mon {
                let (lines, _bytes) = t.abort().await;
                stderr_tail = lines;
            }
            // The stdout monitor is aborted too, but its lines are
            // not retained for the tail.
            if let Some(t) = stdout_mon {
                let (_lines, _bytes) = t.abort().await;
            }

            // Fall back to launcher-provided tail if we didn't capture.
            if stderr_tail.is_empty() {
                if let Some(tail) = exit_result.stderr_tail {
                    stderr_tail = tail;
                }
            }

            let tail_str = if stderr_tail.is_empty() {
                None
            } else {
                Some(stderr_tail.join("\n"))
            };

            // Record the terminal state on the handle and emit a
            // structured "ProcStatus" log event for each exit kind.
            match exit_result.kind {
                ProcExitKind::Exited { code } => {
                    let _ = handle.mark_stopped(code, stderr_tail);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::ExitWithCode",
                        %proc_id,
                        exit_code = code,
                        tail = tail_str,
                        "proc exited with code {code}"
                    );
                }
                ProcExitKind::Signaled {
                    signal,
                    core_dumped,
                } => {
                    let _ = handle.mark_killed(signal, core_dumped);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::KilledBySignal",
                        %proc_id,
                        tail = tail_str,
                        "killed by signal {signal}"
                    );
                }
                ProcExitKind::Failed { reason } => {
                    let _ = handle.mark_failed(&reason);
                    tracing::info!(
                        name = "ProcStatus",
                        status = "Exited::Failed",
                        %proc_id,
                        tail = tail_str,
                        "proc failed: {reason}"
                    );
                }
            }
        });
    }
}
1916
/// The configuration used for bootstrapped procs.
pub struct BootstrapProcConfig {
    /// The proc's create rank.
    ///
    /// Also used to tag forwarded stdout/stderr lines with the
    /// originating proc's rank.
    pub create_rank: usize,

    /// Config values to set on the spawned proc's global config,
    /// at the `ClientOverride` layer.
    pub client_config_override: Attrs,
}
1926
#[async_trait]
impl ProcManager for BootstrapProcManager {
    type Handle = BootstrapProcHandle;

    type Config = BootstrapProcConfig;

    /// Return the [`ChannelTransport`] used by this proc manager.
    ///
    /// For `BootstrapProcManager` this is always
    /// [`ChannelTransport::Unix`], since all procs are spawned
    /// locally on the same host and communicate over Unix domain
    /// sockets.
    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }

    /// Launch a new proc under this [`BootstrapProcManager`].
    ///
    /// Spawns the configured bootstrap binary (`self.program`) in a
    /// fresh child process. The environment is populated with
    /// variables that describe the bootstrap context — most
    /// importantly `HYPERACTOR_MESH_BOOTSTRAP_MODE`, which carries a
    /// base64-encoded JSON [`Bootstrap::Proc`] payload (proc id,
    /// backend addr, callback addr, optional config snapshot).
    /// Additional variables like `BOOTSTRAP_LOG_CHANNEL` are also set
    /// up for logging and control.
    ///
    /// Responsibilities performed here:
    /// - Create a one-shot callback channel so the child can confirm
    ///   successful bootstrap and return its mailbox address plus agent
    ///   reference.
    /// - Spawn the OS process with stdout/stderr piped.
    /// - Stamp the new [`BootstrapProcHandle`] as
    ///   [`ProcStatus::Running`] once a PID is observed.
    /// - Wire stdout/stderr pipes into local writers and forward them
    ///   over the logging channel (`BOOTSTRAP_LOG_CHANNEL`).
    /// - Insert the handle into the manager's children map and start
    ///   an exit monitor to track process termination.
    ///
    /// Returns a [`BootstrapProcHandle`] that exposes the child
    /// process's lifecycle (status, wait/ready, termination). Errors
    /// are surfaced as [`HostError`].
    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
    async fn spawn(
        &self,
        proc_id: hyperactor_reference::ProcId,
        backend_addr: ChannelAddr,
        config: BootstrapProcConfig,
    ) -> Result<Self::Handle, HostError> {
        // One-shot callback channel: the child reports its mailbox
        // address and agent reference here once it has bootstrapped.
        let (callback_addr, mut callback_rx) =
            channel::serve::<(ChannelAddr, hyperactor_reference::ActorRef<ProcAgent>)>(
                ChannelAddr::any(ChannelTransport::Unix),
            )?;

        // Decide whether we need to capture stdio.
        let overrides = &config.client_config_override;
        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;

        // The bootstrap mode payload carried to the child via its
        // environment (see the doc comment above).
        let mode = Bootstrap::Proc {
            proc_id: proc_id.clone(),
            backend_addr,
            callback_addr,
            socket_dir_path: self.socket_dir.path().to_owned(),
            config: Some(config.client_config_override.clone()),
        };

        // Build LaunchOptions for the launcher.
        let bootstrap_payload = mode
            .to_env_safe_string()
            .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;

        let opts = LaunchOptions {
            bootstrap_payload,
            process_name: format_process_name(&proc_id),
            command: self.command.clone(),
            want_stdio: need_stdio,
            tail_lines: tail_size,
            // Only allocate a log channel when forwarding is enabled.
            log_channel: if enable_forwarding {
                Some(ChannelAddr::any(ChannelTransport::Unix))
            } else {
                None
            },
        };

        // Launch via the configured launcher backend.
        let launch_result = self
            .launcher()
            .launch(&proc_id, opts.clone())
            .await
            .map_err(|e| {
                // Preserve the underlying io::Error when available;
                // wrap other launcher errors as a generic io::Error.
                let io_err = match e {
                    ProcLauncherError::Launch(io_err) => io_err,
                    other => std::io::Error::other(other.to_string()),
                };
                HostError::ProcessSpawnFailure(proc_id.clone(), io_err)
            })?;

        // Wire up StreamFwders if stdio was captured.
        let (out_fwder, err_fwder) = match launch_result.stdio {
            StdioHandling::Captured { stdout, stderr } => {
                // File capture targets are optional and only resolved
                // when the appender was successfully created.
                let (file_stdout, file_stderr) = if enable_file_capture {
                    match self.file_appender.as_deref() {
                        Some(fm) => (
                            Some(fm.addr_for(OutputTarget::Stdout)),
                            Some(fm.addr_for(OutputTarget::Stderr)),
                        ),
                        None => {
                            tracing::warn!("enable_file_capture=true but no FileAppender");
                            (None, None)
                        }
                    }
                } else {
                    (None, None)
                };

                let out = StreamFwder::start(
                    stdout,
                    file_stdout,
                    OutputTarget::Stdout,
                    tail_size,
                    opts.log_channel.clone(),
                    &proc_id,
                    config.create_rank,
                );
                let err = StreamFwder::start(
                    stderr,
                    file_stderr,
                    OutputTarget::Stderr,
                    tail_size,
                    opts.log_channel.clone(),
                    &proc_id,
                    config.create_rank,
                );
                (Some(out), Some(err))
            }
            StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
                if !need_stdio {
                    tracing::info!(
                        %proc_id, enable_forwarding, enable_file_capture, tail_size,
                        "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
                    );
                }
                (None, None)
            }
        };

        // Create handle with launcher reference for terminate/kill delegation.
        let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
        handle.mark_running(launch_result.started_at);
        handle.set_stream_monitors(out_fwder, err_fwder);

        // Retain handle for lifecycle management.
        {
            let mut children = self.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }

        // Kick off an exit monitor that updates ProcStatus when the
        // launcher reports terminal status.
        self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);

        // Handle callback from child proc when it confirms bootstrap.
        let h = handle.clone();
        tokio::spawn(async move {
            match callback_rx.recv().await {
                Ok((addr, agent)) => {
                    let _ = h.mark_ready(addr, agent);
                }
                Err(e) => {
                    // Child never called back; record failure.
                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
                }
            }
        });

        // Callers do `handle.read().await` for mesh readiness.
        Ok(handle)
    }
}
2109
2110#[async_trait]
2111impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2112    /// Attempt to gracefully terminate one child procs managed by
2113    /// this `BootstrapProcManager`.
2114    ///
2115    /// Each child handle is asked to `terminate(timeout)`, which
2116    /// sends SIGTERM, waits up to the deadline, and escalates to
2117    /// SIGKILL if necessary. Termination is attempted concurrently,
2118    /// with at most `max_in_flight` tasks running at once.
2119    ///
2120    /// Logs a warning for each failure.
2121    async fn terminate_proc(
2122        &self,
2123        cx: &impl context::Actor,
2124        proc: &hyperactor_reference::ProcId,
2125        timeout: Duration,
2126        reason: &str,
2127    ) -> Result<
2128        (
2129            Vec<hyperactor_reference::ActorId>,
2130            Vec<hyperactor_reference::ActorId>,
2131        ),
2132        anyhow::Error,
2133    > {
2134        // Snapshot to avoid holding the lock across awaits.
2135        let proc_handle: Option<BootstrapProcHandle> = {
2136            let mut guard = self.children.lock().await;
2137            guard.remove(proc)
2138        };
2139
2140        if let Some(h) = proc_handle {
2141            h.terminate(cx, timeout, reason)
2142                .await
2143                .map(|_| (Vec::new(), Vec::new()))
2144                .map_err(|e| e.into())
2145        } else {
2146            Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2147        }
2148    }
2149}
2150
2151#[async_trait]
2152impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2153    /// Attempt to gracefully terminate all child procs managed by
2154    /// this `BootstrapProcManager`.
2155    ///
2156    /// Each child handle is asked to `terminate(timeout)`, which
2157    /// sends SIGTERM, waits up to the deadline, and escalates to
2158    /// SIGKILL if necessary. Termination is attempted concurrently,
2159    /// with at most `max_in_flight` tasks running at once.
2160    ///
2161    /// Returns a [`TerminateSummary`] with counts of how many procs
2162    /// were attempted, how many successfully terminated (including
2163    /// those that were already terminal), and how many failed.
2164    ///
2165    /// Logs a warning for each failure.
2166    async fn terminate_all(
2167        &self,
2168        cx: &impl context::Actor,
2169        timeout: Duration,
2170        max_in_flight: usize,
2171        reason: &str,
2172    ) -> TerminateSummary {
2173        // Snapshot to avoid holding the lock across awaits.
2174        let handles: Vec<BootstrapProcHandle> = {
2175            let guard = self.children.lock().await;
2176            guard.values().cloned().collect()
2177        };
2178
2179        let attempted = handles.len();
2180        let mut ok = 0usize;
2181
2182        let results = stream::iter(handles.into_iter().map(|h| async move {
2183            match h.terminate(cx, timeout, reason).await {
2184                Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2185                    // Treat "already terminal" as success.
2186                    true
2187                }
2188                Err(e) => {
2189                    tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2190                    false
2191                }
2192            }
2193        }))
2194        .buffer_unordered(max_in_flight.max(1))
2195        .collect::<Vec<bool>>()
2196        .await;
2197
2198        for r in results {
2199            if r {
2200                ok += 1;
2201            }
2202        }
2203
2204        TerminateSummary {
2205            attempted,
2206            ok,
2207            failed: attempted.saturating_sub(ok),
2208        }
2209    }
2210}
2211
2212/// Entry point to processes managed by hyperactor_mesh. Any process that is part
2213/// of a hyperactor_mesh program should call [`bootstrap`], which then configures
2214/// the process according to how it is invoked.
2215///
2216/// If bootstrap returns any error, it is defunct from the point of view of hyperactor_mesh,
2217/// and the process should likely exit:
2218///
2219/// ```ignore
2220/// let err = hyperactor_mesh::bootstrap().await;
2221/// tracing::error("could not bootstrap mesh process: {}", err);
2222/// std::process::exit(1);
2223/// ```
2224///
2225/// Use [`bootstrap_or_die`] to implement this behavior directly.
2226/// Else if the bootstrap returns Ok, the process has cleaned up successfully and
2227/// should exit the "main" of the program.
2228pub async fn bootstrap() -> anyhow::Result<i32> {
2229    let boot = Bootstrap::get_from_env()?.unwrap_or_else(Bootstrap::default);
2230    boot.bootstrap().await
2231}
2232
2233/// Bootstrap a v0 proc mesh. This launches a control process that responds to
2234/// Allocator2Process messages, conveying its own state in Process2Allocator messages.
2235///
2236/// The bootstrapping process is controlled by the
2237/// following environment variables:
2238///
2239/// - `HYPERACTOR_MESH_BOOTSTRAP_ADDR`: the channel address to which Process2Allocator messages
2240///   should be sent.
2241/// - `HYPERACTOR_MESH_INDEX`: an index used to identify this process to the allocator.
2242async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
2243    // Apply config before entering the nested scope
2244    if let Some(attrs) = config {
2245        hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
2246        tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
2247    } else {
2248        tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
2249    }
2250    tracing::info!(
2251        "bootstrap_v0_proc_mesh config:\n{}",
2252        hyperactor_config::global::attrs()
2253    );
2254
    /// Bootstrap body for the v0 proc-mesh protocol: announce ourselves
    /// to the allocator named by `BOOTSTRAP_ADDR_ENV`, then serve
    /// `Allocator2Process` messages until told to exit. Returns only on
    /// error; `StopAndExit`/`Exit` terminate the process directly.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started at the allocator's request; shared with the
        // signal-cleanup callback below so they can be stopped when a
        // termination signal arrives.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // Best-effort shutdown of all tracked procs on signal; the
        // guard unregisters the callback when dropped.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(
                        Duration::from_millis(10),
                        None,
                        "execute cleanup callback",
                    )
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        // The allocator passes its callback address and our index via
        // environment variables.
        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator is using.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves. `rtx` receives the message back if it
        // could not be delivered (see the select below).
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        // Exit this process if the allocator stops heartbeating us.
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for the first allocator message while also watching the
        // return channel: if the Hello message bounces back, delivery
        // failed and we bail out; if the return channel closes without
        // a message (presumably the Hello was delivered), keep waiting
        // for the allocator's first message.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    Ok(msg) => {
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    Err(_) => {
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        // Main allocator message loop.
        loop {
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    let entered = span.entered();
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    drop(handle); // linter appeasement; it is safe to drop this future
                    let span = entered.exit();
                    // Report the new proc (agent ref + serving address)
                    // back to the allocator.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    // Track the proc so it can be stopped later (signal
                    // cleanup or StopAndExit).
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Give each proc a short window to wind down;
                        // failures are logged but do not block exit.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(
                                    Duration::from_millis(10),
                                    None,
                                    "stop and exit",
                                )
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }
2378
2379    go().await.unwrap_err()
2380}
2381
2382/// A variant of [`bootstrap`] that logs the error and exits the process
2383/// if bootstrapping fails.
2384pub async fn bootstrap_or_die() -> ! {
2385    match bootstrap().await {
2386        Ok(exit_code) => std::process::exit(exit_code),
2387        Err(err) => {
2388            let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2389            tracing::error!("failed to bootstrap mesh process: {}", err);
2390            std::process::exit(1);
2391        }
2392    }
2393}
2394
/// Destination for bootstrap debug output: either an opened log file,
/// or a sink that silently discards everything.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    // Append debug output to this file.
    File(std::fs::File),
    // Discard all writes (debug logging disabled).
    Sink,
}
2400
2401impl DebugSink {
2402    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2403        match self {
2404            DebugSink::File(f) => f.write(buf),
2405            DebugSink::Sink => Ok(buf.len()),
2406        }
2407    }
2408    fn flush(&mut self) -> io::Result<()> {
2409        match self {
2410            DebugSink::File(f) => f.flush(),
2411            DebugSink::Sink => Ok(()),
2412        }
2413    }
2414}
2415
2416fn debug_sink() -> &'static Mutex<DebugSink> {
2417    static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2418    DEBUG_SINK.get_or_init(|| {
2419        let debug_path = {
2420            let mut p = std::env::temp_dir();
2421            if let Ok(user) = std::env::var("USER") {
2422                p.push(user);
2423            }
2424            std::fs::create_dir_all(&p).ok();
2425            p.push("monarch-bootstrap-debug.log");
2426            p
2427        };
2428        let sink = if debug_path.exists() {
2429            match OpenOptions::new()
2430                .append(true)
2431                .create(true)
2432                .open(debug_path.clone())
2433            {
2434                Ok(f) => DebugSink::File(f),
2435                Err(_e) => {
2436                    eprintln!(
2437                        "failed to open {} for bootstrap debug logging",
2438                        debug_path.display()
2439                    );
2440                    DebugSink::Sink
2441                }
2442            }
2443        } else {
2444            DebugSink::Sink
2445        };
2446        Mutex::new(sink)
2447    })
2448}
2449
/// If true, send `Debug` messages to stderr as well as to the debug
/// sink. Compile-time switch intended for local debugging only.
const DEBUG_TO_STDERR: bool = false;
2452
/// A bootstrap specific debug writer. If the file
/// `monarch-bootstrap-debug.log` under the per-user temp directory
/// (i.e. `$TMPDIR/$USER/monarch-bootstrap-debug.log`; see
/// `debug_sink`) exists, then the writer's destination is that file;
/// otherwise it discards all writes.
struct Debug;
2456
2457impl Debug {
2458    fn is_active() -> bool {
2459        DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2460    }
2461}
2462
2463impl Write for Debug {
2464    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2465        let res = debug_sink().lock().unwrap().write(buf);
2466        if DEBUG_TO_STDERR {
2467            let n = match res {
2468                Ok(n) => n,
2469                Err(_) => buf.len(),
2470            };
2471            let _ = io::stderr().write_all(&buf[..n]);
2472        }
2473
2474        res
2475    }
2476    fn flush(&mut self) -> io::Result<()> {
2477        let res = debug_sink().lock().unwrap().flush();
2478        if DEBUG_TO_STDERR {
2479            let _ = io::stderr().flush();
2480        }
2481        res
2482    }
2483}
2484
2485/// Create a new runtime [`TempDir`]. The directory is created in
2486/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2487fn runtime_dir() -> io::Result<TempDir> {
2488    match std::env::var_os("XDG_RUNTIME_DIR") {
2489        Some(runtime_dir) => {
2490            let path = PathBuf::from(runtime_dir);
2491            tempfile::tempdir_in(path)
2492        }
2493        None => tempfile::tempdir(),
2494    }
2495}
2496
2497#[cfg(test)]
2498mod tests {
2499    use std::path::PathBuf;
2500
2501    use hyperactor::RemoteSpawn;
2502    use hyperactor::channel::ChannelAddr;
2503    use hyperactor::channel::ChannelTransport;
2504    use hyperactor::channel::TcpMode;
2505    use hyperactor::context::Mailbox as _;
2506    use hyperactor::host::ProcHandle;
2507    use hyperactor::reference as hyperactor_reference;
2508    use hyperactor::testing::ids::test_proc_id;
2509    use hyperactor::testing::ids::test_proc_id_with_addr;
2510    use hyperactor_config::Flattrs;
2511    use ndslice::Extent;
2512    use ndslice::ViewExt;
2513    use ndslice::extent;
2514    use tokio::process::Command;
2515
2516    use super::*;
2517    use crate::ActorMesh;
2518    use crate::alloc::AllocSpec;
2519    use crate::alloc::Allocator;
2520    use crate::alloc::ProcessAllocator;
2521    use crate::host_mesh::HostMesh;
2522    use crate::testactor;
2523    use crate::testing;
2524
2525    #[test]
2526    fn test_bootstrap_mode_env_string_none_config_proc() {
2527        let values = [
2528            Bootstrap::default(),
2529            Bootstrap::Proc {
2530                proc_id: test_proc_id("foo_0"),
2531                backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2532                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2533                socket_dir_path: PathBuf::from("notexist"),
2534                config: None,
2535            },
2536        ];
2537
2538        for value in values {
2539            let safe = value.to_env_safe_string().unwrap();
2540            let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2541
2542            // Re-encode and compare: deterministic round-trip of the
2543            // wire format.
2544            let safe2 = round.to_env_safe_string().unwrap();
2545            assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2546
2547            // Sanity: the decoded variant is what we expect.
2548            match (&value, &round) {
2549                (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2550                (
2551                    Bootstrap::V0ProcMesh { config: None },
2552                    Bootstrap::V0ProcMesh { config: None },
2553                ) => {}
2554                _ => panic!("decoded variant mismatch: got {:?}", round),
2555            }
2556        }
2557    }
2558
2559    #[test]
2560    fn test_bootstrap_mode_env_string_none_config_host() {
2561        let value = Bootstrap::Host {
2562            addr: ChannelAddr::any(ChannelTransport::Unix),
2563            command: None,
2564            config: None,
2565            exit_on_shutdown: false,
2566        };
2567
2568        let safe = value.to_env_safe_string().unwrap();
2569        let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2570
2571        // Wire-format round-trip should be identical.
2572        let safe2 = round.to_env_safe_string().unwrap();
2573        assert_eq!(safe, safe2);
2574
2575        // Sanity: decoded variant is Host with None config.
2576        match round {
2577            Bootstrap::Host { config: None, .. } => {}
2578            other => panic!("expected Host with None config, got {:?}", other),
2579        }
2580    }
2581
2582    #[test]
2583    fn test_bootstrap_mode_env_string_invalid() {
2584        // Not valid base64
2585        assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2586    }
2587
2588    #[test]
2589    fn test_bootstrap_config_snapshot_roundtrip() {
2590        // Build a small, distinctive Attrs snapshot.
2591        let mut attrs = Attrs::new();
2592        attrs[MESH_TAIL_LOG_LINES] = 123;
2593        attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2594
2595        let socket_dir = runtime_dir().unwrap();
2596
2597        // Proc case
2598        {
2599            let original = Bootstrap::Proc {
2600                proc_id: test_proc_id("foo_42"),
2601                backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2602                callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2603                config: Some(attrs.clone()),
2604                socket_dir_path: socket_dir.path().to_owned(),
2605            };
2606            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2607            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2608            match &decoded {
2609                Bootstrap::Proc { config, .. } => {
2610                    let cfg = config.as_ref().expect("expected Some(attrs)");
2611                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2612                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2613                }
2614                other => panic!("unexpected variant after roundtrip: {:?}", other),
2615            }
2616        }
2617
2618        // Host case
2619        {
2620            let original = Bootstrap::Host {
2621                addr: ChannelAddr::any(ChannelTransport::Unix),
2622                command: None,
2623                config: Some(attrs.clone()),
2624                exit_on_shutdown: false,
2625            };
2626            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2627            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2628            match &decoded {
2629                Bootstrap::Host { config, .. } => {
2630                    let cfg = config.as_ref().expect("expected Some(attrs)");
2631                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2632                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2633                }
2634                other => panic!("unexpected variant after roundtrip: {:?}", other),
2635            }
2636        }
2637
2638        // V0ProcMesh case
2639        {
2640            let original = Bootstrap::V0ProcMesh {
2641                config: Some(attrs.clone()),
2642            };
2643            let env_str = original.to_env_safe_string().expect("encode bootstrap");
2644            let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2645            match &decoded {
2646                Bootstrap::V0ProcMesh { config } => {
2647                    let cfg = config.as_ref().expect("expected Some(attrs)");
2648                    assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2649                    assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2650                }
2651                other => panic!("unexpected variant after roundtrip: {:?}", other),
2652            }
2653        }
2654    }
2655
    // End-to-end check of the v1 child logging path: a LogMessage
    // posted on BOOTSTRAP_LOG_CHANNEL is picked up by LogForwardActor,
    // delivered to LogClientActor, and surfaces through the test tap.
    #[tokio::test]
    async fn test_v1_child_logging() {
        use hyperactor::channel;
        use hyperactor::mailbox::BoxedMailboxSender;
        use hyperactor::mailbox::DialMailboxRouter;
        use hyperactor::mailbox::MailboxServer;
        use hyperactor::proc::Proc;

        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
        use crate::logging::LogClientActor;
        use crate::logging::LogClientMessageClient;
        use crate::logging::LogForwardActor;
        use crate::logging::LogMessage;
        use crate::logging::OutputTarget;
        use crate::logging::test_tap;

        // Local proc served over a unix channel, with a router so its
        // actor refs are reachable by address.
        let router = DialMailboxRouter::new();
        let (proc_addr, proc_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::configured(
            test_proc_id("client_0"),
            BoxedMailboxSender::new(router.clone()),
        );
        proc.clone().serve(proc_rx);
        router.bind(test_proc_id("client_0").into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // The tap lets this test observe lines the log client emits.
        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        test_tap::install(tap_tx);

        // The forwarder discovers its serving address via this env var.
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: unit-test scoped env var
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }

        // Spawn the log client and disable aggregation (immediate
        // print + tap push).
        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
        let log_client: hyperactor_reference::ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        log_client.set_aggregate(&client, None).await.unwrap();

        // Spawn the forwarder in this proc (it will serve
        // BOOTSTRAP_LOG_CHANNEL).
        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
            .await
            .unwrap();
        let _log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Dial the channel but don't post until we know the forwarder
        // is receiving.
        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

        // Send a fake log message as if it came from the proc
        // manager's writer.
        tx.post(LogMessage::Log {
            hostname: "testhost".into(),
            proc_id: "testproc[0]".into(),
            output_target: OutputTarget::Stdout,
            payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
        });

        // Assert we see it via the tap.
        let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
            .await
            .expect("timed out waiting for log line")
            .expect("tap channel closed unexpectedly");
        assert!(
            line.contains("hello from child"),
            "log line did not appear via LogClientActor; got: {line}"
        );
    }
2732
    /// Unit tests for `BootstrapProcHandle` status transitions. All
    /// tests here drive the handle's `mark_*` methods directly; no
    /// process is ever launched (the launcher panics if touched).
    mod proc_handle {

        use std::sync::Arc;
        use std::time::Duration;

        use async_trait::async_trait;
        use hyperactor::host::ProcHandle;
        use hyperactor::reference as hyperactor_reference;
        use hyperactor::testing::ids::test_proc_id;

        use super::super::*;
        use crate::proc_launcher::LaunchOptions;
        use crate::proc_launcher::LaunchResult;
        use crate::proc_launcher::ProcLauncher;
        use crate::proc_launcher::ProcLauncherError;

        /// A test launcher that panics on any method call.
        ///
        /// This is used by unit tests that only exercise status
        /// transitions on `BootstrapProcHandle` without actually
        /// launching or terminating processes. If any launcher method
        /// is called, the test will panic—indicating an unexpected
        /// code path.
        struct TestProcLauncher;

        #[async_trait]
        impl ProcLauncher for TestProcLauncher {
            async fn launch(
                &self,
                _proc_id: &hyperactor_reference::ProcId,
                _opts: LaunchOptions,
            ) -> Result<LaunchResult, ProcLauncherError> {
                panic!("TestProcLauncher::launch should not be called in unit tests");
            }

            async fn terminate(
                &self,
                _proc_id: &hyperactor_reference::ProcId,
                _timeout: Duration,
            ) -> Result<(), ProcLauncherError> {
                panic!("TestProcLauncher::terminate should not be called in unit tests");
            }

            async fn kill(
                &self,
                _proc_id: &hyperactor_reference::ProcId,
            ) -> Result<(), ProcLauncherError> {
                panic!("TestProcLauncher::kill should not be called in unit tests");
            }
        }

        // Helper: build a ProcHandle for state-transition unit tests.
        //
        // This creates a handle with a no-op test launcher. The
        // launcher will panic if any of its methods are called,
        // ensuring tests only exercise status transitions and not
        // actual process lifecycle.
        fn handle_for_test() -> BootstrapProcHandle {
            let proc_id = test_proc_id("0");
            let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
            BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
        }

        // Legal transition: Starting -> Running records the start time.
        #[tokio::test]
        async fn starting_to_running_ok() {
            let h = handle_for_test();
            assert!(matches!(h.status(), ProcStatus::Starting));
            let child_started_at = std::time::SystemTime::now();
            assert!(h.mark_running(child_started_at));
            match h.status() {
                ProcStatus::Running { started_at } => {
                    assert_eq!(started_at, child_started_at);
                }
                other => panic!("expected Running, got {other:?}"),
            }
        }

        // Legal transition chain: Running -> Stopping -> Stopped.
        #[tokio::test]
        async fn running_to_stopping_to_stopped_ok() {
            let h = handle_for_test();
            let child_started_at = std::time::SystemTime::now();
            assert!(h.mark_running(child_started_at));
            assert!(h.mark_stopping());
            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
            assert!(h.mark_stopped(0, Vec::new()));
            assert!(matches!(
                h.status(),
                ProcStatus::Stopped { exit_code: 0, .. }
            ));
        }

        // Legal transition: Running -> Killed carries signal/core info.
        #[tokio::test]
        async fn running_to_killed_ok() {
            let h = handle_for_test();
            let child_started_at = std::time::SystemTime::now();
            assert!(h.mark_running(child_started_at));
            assert!(h.mark_killed(9, true));
            assert!(matches!(
                h.status(),
                ProcStatus::Killed {
                    signal: 9,
                    core_dumped: true
                }
            ));
        }

        // Legal transition: Running -> Failed carries the reason text.
        #[tokio::test]
        async fn running_to_failed_ok() {
            let h = handle_for_test();
            let child_started_at = std::time::SystemTime::now();
            assert!(h.mark_running(child_started_at));
            assert!(h.mark_failed("bootstrap error"));
            match h.status() {
                ProcStatus::Failed { reason } => {
                    assert_eq!(reason, "bootstrap error");
                }
                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
            }
        }

        // Illegal transitions return false and leave the status as-is.
        #[tokio::test]
        async fn illegal_transitions_are_rejected() {
            let h = handle_for_test();
            let child_started_at = std::time::SystemTime::now();
            // Starting -> Running is fine; second Running should be rejected.
            assert!(h.mark_running(child_started_at));
            assert!(!h.mark_running(std::time::SystemTime::now()));
            assert!(matches!(h.status(), ProcStatus::Running { .. }));
            // Once Stopped, we can't go to Running/Killed/Failed/etc.
            assert!(h.mark_stopping());
            assert!(h.mark_stopped(0, Vec::new()));
            assert!(!h.mark_running(child_started_at));
            assert!(!h.mark_killed(9, false));
            assert!(!h.mark_failed("nope"));

            assert!(matches!(
                h.status(),
                ProcStatus::Stopped { exit_code: 0, .. }
            ));
        }

        // Ready is not terminal: it may proceed to Stopping/Stopped.
        #[tokio::test]
        async fn transitions_from_ready_are_legal() {
            let h = handle_for_test();
            let addr = ChannelAddr::any(ChannelTransport::Unix);
            // Mark Running.
            let t0 = std::time::SystemTime::now();
            assert!(h.mark_running(t0));
            // Build a consistent AgentRef for Ready using the
            // handle's ProcId.
            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
            let actor_id = proc_id.actor_id("proc_agent", 0);
            let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
                hyperactor_reference::ActorRef::attest(actor_id);
            // Ready -> Stopping -> Stopped should be legal.
            assert!(h.mark_ready(addr, agent_ref));
            assert!(h.mark_stopping());
            assert!(h.mark_stopped(0, Vec::new()));
        }

        // Ready may also transition directly to Killed.
        #[tokio::test]
        async fn ready_to_killed_is_legal() {
            let h = handle_for_test();
            let addr = ChannelAddr::any(ChannelTransport::Unix);
            // Starting -> Running
            let t0 = std::time::SystemTime::now();
            assert!(h.mark_running(t0));
            // Build a consistent AgentRef for Ready using the
            // handle's ProcId.
            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
            let actor_id = proc_id.actor_id("proc_agent", 0);
            let agent: hyperactor_reference::ActorRef<ProcAgent> =
                hyperactor_reference::ActorRef::attest(actor_id);
            // Running -> Ready
            assert!(h.mark_ready(addr, agent));
            // Ready -> Killed
            assert!(h.mark_killed(9, false));
        }

        // A proc that fails mid-shutdown: Stopping -> Failed is legal.
        #[tokio::test]
        async fn mark_failed_from_stopping_is_allowed() {
            let h = handle_for_test();

            // Drive Starting -> Stopping.
            assert!(h.mark_stopping(), "precondition: to Stopping");

            // Now allow Stopping -> Failed.
            assert!(
                h.mark_failed("boom"),
                "mark_failed() should succeed from Stopping"
            );
            match h.status() {
                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
                other => panic!("expected Failed(\"boom\"), got {other:?}"),
            }
        }
    }
2930
    /// A test launcher that panics on any method call.
    ///
    /// This is used by unit tests that only exercise status
    /// transitions on `BootstrapProcHandle` without actually
    /// launching or terminating processes.
    ///
    /// NOTE(review): this duplicates `proc_handle::TestProcLauncher`
    /// above; the two could be consolidated.
    struct TestLauncher;
2937
    // Every method panics: reaching any of them means a test exercised
    // a real process-lifecycle path, which these unit tests never do.
    #[async_trait::async_trait]
    impl crate::proc_launcher::ProcLauncher for TestLauncher {
        async fn launch(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
            _opts: crate::proc_launcher::LaunchOptions,
        ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
        {
            panic!("TestLauncher::launch should not be called in unit tests");
        }

        async fn terminate(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
            _timeout: std::time::Duration,
        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
            panic!("TestLauncher::terminate should not be called in unit tests");
        }

        async fn kill(
            &self,
            _proc_id: &hyperactor_reference::ProcId,
        ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
            panic!("TestLauncher::kill should not be called in unit tests");
        }
    }
2964
    // Build a `BootstrapProcHandle` backed by the panicking test
    // launcher. Note: the `Arc` is a local dropped when this function
    // returns, so the handle's `Weak` can never upgrade — the launcher
    // is unreachable by construction.
    fn test_handle(proc_id: hyperactor_reference::ProcId) -> BootstrapProcHandle {
        let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
            std::sync::Arc::new(TestLauncher);
        BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
    }
2970
2971    #[tokio::test]
2972    async fn watch_notifies_on_status_changes() {
2973        let proc_id = test_proc_id("1");
2974        let handle = test_handle(proc_id);
2975        let mut rx = handle.watch();
2976
2977        // Starting -> Running
2978        let now = std::time::SystemTime::now();
2979        assert!(handle.mark_running(now));
2980        rx.changed().await.ok(); // Observe the transition.
2981        match &*rx.borrow() {
2982            ProcStatus::Running { started_at } => {
2983                assert_eq!(*started_at, now);
2984            }
2985            s => panic!("expected Running, got {s:?}"),
2986        }
2987
2988        // Running -> Stopped
2989        assert!(handle.mark_stopped(0, Vec::new()));
2990        rx.changed().await.ok(); // Observe the transition.
2991        assert!(matches!(
2992            &*rx.borrow(),
2993            ProcStatus::Stopped { exit_code: 0, .. }
2994        ));
2995    }
2996
2997    #[tokio::test]
2998    async fn ready_errs_if_process_exits_before_running() {
2999        let proc_id =
3000            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
3001        let handle = test_handle(proc_id);
3002
3003        // Simulate the exit monitor doing its job directly here.
3004        // (Equivalent outcome: terminal state before Running.)
3005        assert!(handle.mark_stopped(7, Vec::new()));
3006
3007        // `ready()` should return Err with the terminal status.
3008        match handle.ready_inner().await {
3009            Ok(()) => panic!("ready() unexpectedly succeeded"),
3010            Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3011                assert_eq!(exit_code, 7)
3012            }
3013            Err(other) => panic!("expected Stopped(7), got {other:?}"),
3014        }
3015    }
3016
3017    #[tokio::test]
3018    async fn status_unknown_proc_is_none() {
3019        let manager = BootstrapProcManager::new(BootstrapCommand {
3020            program: PathBuf::from("/bin/true"),
3021            ..Default::default()
3022        })
3023        .unwrap();
3024        let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
3025        assert!(manager.status(&unknown).await.is_none());
3026    }
3027
3028    #[tokio::test]
3029    async fn handle_ready_allows_waiters() {
3030        let proc_id = test_proc_id("42");
3031        let handle = test_handle(proc_id.clone());
3032
3033        let started_at = std::time::SystemTime::now();
3034        assert!(handle.mark_running(started_at));
3035
3036        let actor_id = proc_id.actor_id("proc_agent", 0);
3037        let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
3038            hyperactor_reference::ActorRef::attest(actor_id);
3039
3040        // Pick any addr to carry in Ready (what the child would have
3041        // called back with).
3042        let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
3043
3044        // Stamp Ready and assert ready().await unblocks.
3045        assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
3046        handle
3047            .ready_inner()
3048            .await
3049            .expect("ready_inner() should complete after Ready");
3050
3051        // Sanity-check the Ready fields we control
3052        // (started_at/addr).
3053        match handle.status() {
3054            ProcStatus::Ready {
3055                started_at: t,
3056                addr: a,
3057                ..
3058            } => {
3059                assert_eq!(t, started_at);
3060                assert_eq!(a, ready_addr);
3061            }
3062            other => panic!("expected Ready, got {other:?}"),
3063        }
3064    }
3065
3066    #[test]
3067    fn display_running_includes_uptime() {
3068        let started_at = std::time::SystemTime::now() - Duration::from_secs(42);
3069        let st = ProcStatus::Running { started_at };
3070
3071        let s = format!("{}", st);
3072        assert!(s.contains("Running"));
3073        assert!(s.contains("42s"));
3074    }
3075
3076    #[test]
3077    fn display_ready_includes_addr() {
3078        let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
3079        let addr = ChannelAddr::any(ChannelTransport::Unix);
3080        let agent = hyperactor_reference::ActorRef::attest(
3081            test_proc_id_with_addr(addr.clone(), "proc")
3082                .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
3083        );
3084
3085        let st = ProcStatus::Ready {
3086            started_at,
3087            addr: addr.clone(),
3088            agent,
3089        };
3090
3091        let s = format!("{}", st);
3092        assert!(s.contains(&addr.to_string())); // addr
3093        assert!(s.contains("Ready"));
3094    }
3095
3096    #[test]
3097    fn display_stopped_includes_exit_code() {
3098        let st = ProcStatus::Stopped {
3099            exit_code: 7,
3100            stderr_tail: Vec::new(),
3101        };
3102        let s = format!("{}", st);
3103        assert!(s.contains("Stopped"));
3104        assert!(s.contains("7"));
3105    }
3106
3107    #[test]
3108    fn display_other_variants_does_not_panic() {
3109        let samples = vec![
3110            ProcStatus::Starting,
3111            ProcStatus::Stopping {
3112                started_at: std::time::SystemTime::now(),
3113            },
3114            ProcStatus::Ready {
3115                started_at: std::time::SystemTime::now(),
3116                addr: ChannelAddr::any(ChannelTransport::Unix),
3117                agent: hyperactor_reference::ActorRef::attest(
3118                    test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
3119                        .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
3120                ),
3121            },
3122            ProcStatus::Killed {
3123                signal: 9,
3124                core_dumped: false,
3125            },
3126            ProcStatus::Failed {
3127                reason: "boom".into(),
3128            },
3129        ];
3130
3131        for st in samples {
3132            let _ = format!("{}", st); // Just make sure it doesn't panic.
3133        }
3134    }
3135
3136    #[tokio::test]
3137    async fn proc_handle_ready_ok_through_trait() {
3138        let proc_id =
3139            test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
3140        let handle = test_handle(proc_id.clone());
3141
3142        // Starting -> Running
3143        let t0 = std::time::SystemTime::now();
3144        assert!(handle.mark_running(t0));
3145
3146        // Synthesize Ready data
3147        let addr = ChannelAddr::any(ChannelTransport::Unix);
3148        let agent: hyperactor_reference::ActorRef<ProcAgent> =
3149            hyperactor_reference::ActorRef::attest(proc_id.actor_id("proc_agent", 0));
3150        assert!(handle.mark_ready(addr, agent));
3151
3152        // Call the trait method (not ready_inner).
3153        let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3154        assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3155    }
3156
3157    #[tokio::test]
3158    async fn proc_handle_wait_returns_terminal_status() {
3159        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
3160        let handle = test_handle(proc_id);
3161
3162        // Drive directly to a terminal state before calling wait()
3163        assert!(handle.mark_stopped(0, Vec::new()));
3164
3165        // Call the trait method (not wait_inner)
3166        let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3167            .await
3168            .expect("wait should return Ok(terminal)");
3169
3170        match st {
3171            ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3172            other => panic!("expected Stopped(0), got {other:?}"),
3173        }
3174    }
3175
3176    #[tokio::test]
3177    async fn ready_wrapper_maps_terminal_to_trait_error() {
3178        let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
3179        let handle = test_handle(proc_id);
3180
3181        assert!(handle.mark_stopped(7, Vec::new()));
3182
3183        match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3184            Ok(()) => panic!("expected Err"),
3185            Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3186                exit_code, ..
3187            })) => {
3188                assert_eq!(exit_code, 7);
3189            }
3190            Err(e) => panic!("unexpected error: {e:?}"),
3191        }
3192    }
3193
3194    /// Create a ProcId and a host **backend_addr** channel that the
3195    /// bootstrap child proc will dial to attach its mailbox to the
3196    /// host.
3197    ///
3198    /// - `proc_id`: logical identity for the child proc (pure name;
3199    ///   not an OS pid).
3200    /// - `backend_addr`: a mailbox address served by the **parent
3201    ///   (host) proc** here; the spawned bootstrap process dials this
3202    ///   so its messages route via the host.
3203    async fn make_proc_id_and_backend_addr(
3204        instance: &hyperactor::Instance<()>,
3205        _tag: &str,
3206    ) -> (hyperactor_reference::ProcId, ChannelAddr) {
3207        // Serve a Unix channel as the "backend_addr" and hook it into
3208        // this test proc.
3209        let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3210
3211        // Route messages arriving on backend_addr into this test
3212        // proc's mailbox so the bootstrap child can reach the host
3213        // router.
3214        instance.proc().clone().serve(rx);
3215
3216        // We return an arbitrary (but unbound!) unix direct proc id here;
3217        // it is okay, as we're not testing connectivity.
3218        let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3219        (proc_id, backend_addr)
3220    }
3221
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn bootstrap_handle_terminate_graceful() {
        // Graceful shutdown path: spawn a real bootstrap child, wait
        // until it reports Ready, then terminate() with a deadline
        // and check the terminal status.
        // Create a root direct-addressed proc + client instance.
        let root =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
        let (instance, _handle) = root.instance("client").unwrap();

        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
        let handle = mgr
            .spawn(
                proc_id.clone(),
                backend_addr.clone(),
                BootstrapProcConfig {
                    create_rank: 0,
                    client_config_override: Attrs::new(),
                },
            )
            .await
            .expect("spawn bootstrap child");

        // Block until the child's Ready callback lands.
        handle.ready().await.expect("ready");

        // Allow 2x the terminate deadline for the whole call, so the
        // outer timeout only fires if terminate() itself hangs.
        let deadline = Duration::from_secs(2);
        match tokio::time::timeout(
            deadline * 2,
            handle.terminate(&instance, deadline, "test terminate"),
        )
        .await
        {
            Err(_) => panic!("terminate() future hung"),
            Ok(Ok(st)) => {
                match st {
                    ProcStatus::Stopped { exit_code, .. } => {
                        // child called exit(0) on SIGTERM
                        assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
                    }
                    ProcStatus::Killed { signal, .. } => {
                        // If the child didn't trap SIGTERM, we'd see
                        // SIGTERM (15) here and indeed, this is what
                        // we see. Since we call
                        // `hyperactor::initialize_with_current_runtime();`
                        // we seem unable to trap `SIGTERM` and
                        // instead folly intercepts:
                        // [0] *** Aborted at 1758850539 (Unix time, try 'date -d @1758850539') ***
                        // [0] *** Signal 15 (SIGTERM) (0x3951c00173692) received by PID 1527420 (pthread TID 0x7f803de66cc0) (linux TID 1527420) (maybe from PID 1521298, UID 234780) (code: 0), stack trace: ***
                        // [0]     @ 000000000000e713 folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
                        // [0]                        ./fbcode/folly/debugging/symbolizer/SignalHandler.cpp:485
                        // It gets worse. When run with
                        // '@fbcode//mode/dev-nosan' it terminates
                        // with a SEGFAULT (metamate says this is a
                        // well known issue at Meta). So, TL;DR I
                        // restore default `SIGTERM` handling after
                        // the test exe has called
                        // `initialize_with_runtime`.
                        assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
                    }
                    other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
                }
            }
            Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        }
    }
3286
3287    #[tokio::test]
3288    #[cfg(fbcode_build)]
3289    async fn bootstrap_handle_kill_forced() {
3290        // Root proc + client instance (so the child can dial back).
3291        let root =
3292            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
3293        let (instance, _handle) = root.instance("client").unwrap();
3294
3295        let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3296
3297        // Proc identity + host backend channel the child will dial.
3298        let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3299
3300        // Launch the child bootstrap process.
3301        let handle = mgr
3302            .spawn(
3303                proc_id.clone(),
3304                backend_addr.clone(),
3305                BootstrapProcConfig {
3306                    create_rank: 0,
3307                    client_config_override: Attrs::new(),
3308                },
3309            )
3310            .await
3311            .expect("spawn bootstrap child");
3312
3313        // Wait until the child reports Ready (addr+agent returned via
3314        // callback).
3315        handle.ready().await.expect("ready");
3316
3317        // Force-kill the child and assert we observe a Killed
3318        // terminal status.
3319        let deadline = Duration::from_secs(5);
3320        match tokio::time::timeout(deadline, handle.kill()).await {
3321            Err(_) => panic!("kill() future hung"),
3322            Ok(Ok(st)) => {
3323                // We expect a KILLED terminal state.
3324                match st {
3325                    ProcStatus::Killed { signal, .. } => {
3326                        // On Linux this should be SIGKILL (9).
3327                        assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3328                    }
3329                    other => panic!("expected Killed status after kill(); got: {other:?}"),
3330                }
3331            }
3332            Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3333        }
3334    }
3335
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn bootstrap_canonical_simple() {
        // Disable the parent-death signal for bootstrap children.
        // NOTE(review): presumably this keeps children from being
        // torn down by pdeathsig mid-test — confirm intent.
        // SAFETY: unit-test scoped
        unsafe {
            std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
        }
        // Create an actor instance we'll use to send and receive
        // messages.
        let instance = testing::instance();

        // Configure a ProcessAllocator with the bootstrap binary.
        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
            "monarch/hyperactor_mesh/bootstrap",
        )));
        // Request a new allocation of procs from the ProcessAllocator.
        let alloc = allocator
            .allocate(AllocSpec {
                extent: extent!(replicas = 1),
                constraints: Default::default(),
                proc_name: None,
                transport: ChannelTransport::Unix,
                proc_allocation_mode: Default::default(),
            })
            .await
            .unwrap();

        // Build a HostMesh with explicit OS-process boundaries (per
        // rank):
        //
        // (1) Allocator → bootstrap proc [OS process #1]
        //     `ProcMesh::allocate(..)` starts one OS process per
        //     rank; each runs our runtime and the trampoline actor.
        //
        // (2) Host::serve(..) sets up a Host in the same OS process
        //     (no new process). It binds front/back channels, creates
        //     an in-process service proc (`Proc::configured(..)`), and
        //     stores the `BootstrapProcManager` for later spawns.
        //
        // (3) Install HostAgent (still no new OS process).
        //     `host.system_proc().spawn::<HostAgent>("host_agent",
        //     host).await?` creates the HostAgent actor in that
        //     service proc.
        //
        // (4) Collect & assemble. The trampoline returns a
        //     direct-addressed `ActorRef<HostAgent>`; we collect
        //     one per rank and assemble a `HostMesh`.
        //
        // Note: When the Host is later asked to start a proc
        // (`host.spawn(name)`), it calls `ProcManager::spawn` on the
        // stored `BootstrapProcManager`, which does a
        // `Command::spawn()` to launch a new OS child process for
        // that proc.
        let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
            .await
            .unwrap();

        // Spawn a ProcMesh named "p0" on the host mesh:
        //
        // (1) Each HostAgent (running inside its host's service
        // proc) receives the request.
        //
        // (2) The Host calls into its `BootstrapProcManager::spawn`,
        // which does `Command::spawn()` to launch a brand-new OS
        // process for the proc.
        //
        // (3) Inside that new process, bootstrap runs and a
        // `ProcAgent` is started to manage it.
        //
        // (4) We collect the per-host procs into a `ProcMesh` and
        // return it.
        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .unwrap();

        // Note: There is no support for status() in v1.
        // assert!(proc_mesh.status(&instance).await.is_err());
        // let proc_ref = proc_mesh.values().next().expect("one proc");
        // assert!(proc_ref.status(&instance).await.unwrap(), "proc should be alive");

        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc mesh:
        //
        // (1) For each proc (already running in its own OS process),
        // the `ProcAgent` receives the request.
        //
        // (2) It spawns a `TestActor` inside that existing proc (no
        // new OS process).
        //
        // (3) The per-proc actors are collected into an
        // `ActorMesh<TestActor>` and returned.
        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

        // Open a fresh port on the client instance and send a
        // GetActorId message to the actor mesh. Each TestActor will
        // reply with its actor ID to the bound port. Receive one
        // reply and assert it matches the ID of the (single) actor in
        // the mesh.
        let (port, mut rx) = instance.mailbox().open_port();
        actor_mesh
            .cast(&instance, testactor::GetActorId(port.bind()))
            .unwrap();
        let (got_id, _seq) = rx.recv().await.unwrap();
        assert_eq!(
            got_id,
            actor_mesh.values().next().unwrap().actor_id().clone()
        );

        // **Important**: If we don't shutdown the hosts, the
        // BootstrapProcManager's won't send SIGKILLs to their spawned
        // children and there will be orphans left behind.
        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
3450
    /// Same as `bootstrap_canonical_simple` but using the systemd
    /// launcher backend.
    #[tokio::test]
    #[cfg(all(fbcode_build, target_os = "linux"))]
    async fn bootstrap_canonical_simple_systemd_launcher() {
        // Acquire exclusive config lock and select systemd launcher.
        // The override guard must stay alive for the whole test so
        // every spawn below goes through the systemd backend.
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(MESH_PROC_LAUNCHER_KIND, "systemd".to_string());

        // Create an actor instance we'll use to send and receive
        // messages.
        let instance = testing::instance();

        // Configure a ProcessAllocator with the bootstrap binary.
        let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
            "monarch/hyperactor_mesh/bootstrap",
        )));
        // Request a new allocation of procs from the
        // ProcessAllocator.
        let alloc = allocator
            .allocate(AllocSpec {
                extent: extent!(replicas = 1),
                constraints: Default::default(),
                proc_name: None,
                transport: ChannelTransport::Unix,
                proc_allocation_mode: Default::default(),
            })
            .await
            .unwrap();

        // Build a HostMesh (see bootstrap_canonical_simple for
        // detailed comments).
        let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
            .await
            .unwrap();

        // Spawn a ProcMesh named "p0" on the host mesh.
        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .unwrap();

        // Spawn an `ActorMesh<TestActor>` named "a0" on the proc
        // mesh.
        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

        // Send a GetActorId message and verify we get a response.
        let (port, mut rx) = instance.mailbox().open_port();
        actor_mesh
            .cast(&instance, testactor::GetActorId(port.bind()))
            .unwrap();
        let (got_id, _) = rx.recv().await.unwrap();
        assert_eq!(
            got_id,
            actor_mesh.values().next().unwrap().actor_id().clone()
        );

        // Capture the proc_id and expected unit name before shutdown.
        use crate::proc_launcher::SystemdProcLauncher;
        use crate::systemd::SystemdManagerProxy;
        use crate::systemd::SystemdUnitProxy;

        let proc_id = proc_mesh.proc_ids().next().expect("one proc");
        let expected_unit = SystemdProcLauncher::unit_name(&proc_id);

        // Shutdown cleanly.
        //
        // Contract: after shutdown, procs are no longer running and
        // the mesh is quiescent. We intentionally do NOT assert that
        // the systemd transient unit disappears. With
        // RemainAfterExit=true, systemd may keep the unit around for
        // observation and GC it later according to its own policy;
        // that persistence is not considered a leak.
        host_mesh.shutdown(&instance).await.expect("host shutdown");

        // Liveness check (best-effort): unit may persist, but should
        // not remain running. With RemainAfterExit=true, systemd may
        // keep the unit around in active/exited after the process has
        // terminated. That is not a leak. We only require that the
        // unit is not running after shutdown.
        let conn = zbus::Connection::session().await.expect("D-Bus session");
        let manager = SystemdManagerProxy::new(&conn)
            .await
            .expect("manager proxy");

        // Poll up to ~10s (100 x 100ms) for the unit to leave the
        // running/activating states.
        let mut ok = false;
        for _ in 0..100 {
            match manager.get_unit(&expected_unit).await {
                Err(_) => {
                    // Unit already gone: fine.
                    ok = true;
                    break;
                }
                Ok(path) => {
                    if let Ok(unit) = SystemdUnitProxy::builder(&conn)
                        .path(path)
                        .unwrap()
                        .build()
                        .await
                    {
                        let active = unit.active_state().await.unwrap_or_default();
                        let sub = unit.sub_state().await.unwrap_or_default();
                        // Fail only if still running; any other state
                        // is acceptable.
                        if !(active == "active" && sub == "running") && active != "activating" {
                            ok = true;
                            break;
                        }
                    }
                }
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        assert!(
            ok,
            "unit should be gone or quiescent (not running) after shutdown"
        );
    }
3570
3571    #[tokio::test]
3572    #[cfg(fbcode_build)]
3573    async fn test_host_bootstrap() {
3574        use crate::host_mesh::host_agent::GetLocalProcClient;
3575        use crate::proc_agent::NewClientInstanceClient;
3576
3577        // Create a local instance just to call the local bootstrap actor.
3578        // We should find a way to avoid this for local handles.
3579        let temp_proc = Proc::local();
3580        let (temp_instance, _) = temp_proc.instance("temp").unwrap();
3581
3582        let handle = host(
3583            ChannelAddr::any(ChannelTransport::Unix),
3584            Some(BootstrapCommand::test()),
3585            None,
3586            false,
3587        )
3588        .await
3589        .unwrap();
3590
3591        let local_proc = handle.0.get_local_proc(&temp_instance).await.unwrap();
3592        let _local_instance = local_proc
3593            .new_client_instance(&temp_instance)
3594            .await
3595            .unwrap();
3596    }
3597
3598    // BootstrapProcManager OnceLock Semantics Tests
3599    //
3600    // These tests verify the "install exactly once / default locks
3601    // in" behavior of the proc launcher OnceLock.
3602
3603    use std::time::Duration;
3604
3605    use crate::proc_launcher::LaunchOptions;
3606    use crate::proc_launcher::LaunchResult;
3607    use crate::proc_launcher::ProcExitKind;
3608    use crate::proc_launcher::ProcExitResult;
3609    use crate::proc_launcher::ProcLauncher;
3610    use crate::proc_launcher::ProcLauncherError;
3611    use crate::proc_launcher::StdioHandling;
3612
3613    /// A dummy proc launcher for testing. Does not actually launch
3614    /// anything.
3615    #[allow(dead_code)]
3616    struct DummyLauncher {
3617        /// Marker value to identify this instance.
3618        marker: u64,
3619    }
3620
3621    impl DummyLauncher {
3622        fn new(marker: u64) -> Self {
3623            Self { marker }
3624        }
3625
3626        #[allow(dead_code)]
3627        fn marker(&self) -> u64 {
3628            self.marker
3629        }
3630    }
3631
3632    #[async_trait::async_trait]
3633    impl ProcLauncher for DummyLauncher {
3634        async fn launch(
3635            &self,
3636            _proc_id: &hyperactor_reference::ProcId,
3637            _opts: LaunchOptions,
3638        ) -> Result<LaunchResult, ProcLauncherError> {
3639            let (tx, rx) = tokio::sync::oneshot::channel();
3640            // Immediately send exit result
3641            let _ = tx.send(ProcExitResult {
3642                kind: ProcExitKind::Exited { code: 0 },
3643                stderr_tail: Some(vec![]),
3644            });
3645            Ok(LaunchResult {
3646                pid: None,
3647                started_at: std::time::SystemTime::now(),
3648                stdio: StdioHandling::ManagedByLauncher,
3649                exit_rx: rx,
3650            })
3651        }
3652
3653        async fn terminate(
3654            &self,
3655            _proc_id: &hyperactor_reference::ProcId,
3656            _timeout: Duration,
3657        ) -> Result<(), ProcLauncherError> {
3658            Ok(())
3659        }
3660
3661        async fn kill(
3662            &self,
3663            _proc_id: &hyperactor_reference::ProcId,
3664        ) -> Result<(), ProcLauncherError> {
3665            Ok(())
3666        }
3667    }
3668
3669    // set_launcher() then launcher() returns the same Arc.
3670    #[test]
3671    #[cfg(fbcode_build)]
3672    fn test_set_launcher_then_get() {
3673        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3674
3675        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
3676        let custom_ptr = Arc::as_ptr(&custom);
3677
3678        // Install the custom launcher
3679        manager.set_launcher(custom).unwrap();
3680
3681        // Get the launcher and verify it's the same Arc
3682        let got = manager.launcher();
3683        let got_ptr = Arc::as_ptr(got);
3684
3685        assert_eq!(
3686            custom_ptr, got_ptr,
3687            "launcher() should return the same Arc that was set"
3688        );
3689    }
3690
3691    // launcher() first (forces default init), then set_launcher()
3692    // fails.
3693    #[test]
3694    #[cfg(fbcode_build)]
3695    fn test_get_launcher_then_set_fails() {
3696        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3697
3698        // Force default initialization by calling launcher()
3699        let _ = manager.launcher();
3700
3701        // Now try to set a custom launcher - should fail
3702        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
3703        let result = manager.set_launcher(custom);
3704
3705        assert!(
3706            result.is_err(),
3707            "set_launcher should fail after launcher() was called"
3708        );
3709
3710        // Verify error message mentions the cause
3711        let err = result.unwrap_err();
3712        let err_msg = err.to_string();
3713        assert!(
3714            err_msg.contains("already initialized"),
3715            "error should mention 'already initialized', got: {}",
3716            err_msg
3717        );
3718    }
3719
3720    // set_launcher() twice fails.
3721    #[test]
3722    #[cfg(fbcode_build)]
3723    fn test_set_launcher_twice_fails() {
3724        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3725
3726        let first: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(1));
3727        let second: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(2));
3728
3729        // First set should succeed
3730        manager.set_launcher(first).unwrap();
3731
3732        // Second set should fail
3733        let result = manager.set_launcher(second);
3734        assert!(result.is_err(), "second set_launcher should fail");
3735
3736        // Verify error message
3737        let err = result.unwrap_err();
3738        let err_msg = err.to_string();
3739        assert!(
3740            err_msg.contains("already initialized"),
3741            "error should mention 'already initialized', got: {}",
3742            err_msg
3743        );
3744    }
3745
3746    /// OnceLock is empty before any call.
3747    #[test]
3748    #[cfg(fbcode_build)]
3749    fn test_launcher_initially_empty() {
3750        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3751
3752        // At this point, the OnceLock should be empty (not yet
3753        // initialized) We can verify this by successfully calling
3754        // set_launcher
3755        let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));
3756        let result = manager.set_launcher(custom);
3757
3758        assert!(
3759            result.is_ok(),
3760            "set_launcher should succeed on fresh manager"
3761        );
3762    }
3763
3764    /// launcher() returns the same Arc on repeated calls.
3765    #[test]
3766    #[cfg(fbcode_build)]
3767    fn test_launcher_idempotent() {
3768        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
3769
3770        // Call launcher() twice
3771        let first = manager.launcher();
3772        let second = manager.launcher();
3773
3774        // Should return the same Arc (same pointer)
3775        assert!(
3776            Arc::ptr_eq(first, second),
3777            "launcher() should return the same Arc on repeated calls"
3778        );
3779    }
3780}