hyperactor_mesh/
proc_launcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc launching abstraction.
10//!
11//! This module defines a small strategy interface, [`ProcLauncher`],
12//! used by
13//! [`BootstrapProcManager`](crate::bootstrap::BootstrapProcManager)
14//! to start and stop procs while keeping lifecycle tracking
15//! centralized in the manager.
16//!
17//! A launcher is responsible for the *mechanics* of running a proc
18//! (native OS process, container, VM, etc.) and for reporting
19//! *terminal status* back to the manager.
20//!
21//! ## Key properties
22//!
23//! - **Readiness is callback-driven.** A proc is considered *ready*
24//!   only when it signals readiness via the existing bootstrap
25//!   callback (`callback_addr`) in the [`Bootstrap`] payload.
26//!   Launchers do not determine readiness.
27//! - **Terminal status is channel-driven.** `launch` returns a
28//!   [`LaunchResult`] whose `exit_rx` resolves exactly once with a
29//!   [`ProcExitResult`]. This channel is the single source of truth
30//!   for how a proc ended.
31//! - **Termination is initiation-only.** [`ProcLauncher::terminate`]
32//!   and [`ProcLauncher::kill`] initiate shutdown and return without
33//!   waiting for the proc to exit; callers observe completion by
34//!   awaiting `exit_rx` (or a higher level handle built on it).
35//!
36//! ## Stdio handling
37//!
38//! [`StdioHandling`] describes whether stdout/stderr are made
39//! available to the manager for forwarding and tail collection
40//! (`Captured`), inherited (`Inherited`), or handled entirely by the
41//! launcher (`ManagedByLauncher`).
42#![allow(dead_code, unused_imports)] // Temporary
43
44use std::collections::HashMap;
45use std::fmt;
46use std::time::Duration;
47
48use async_trait::async_trait;
49use hyperactor::channel::ChannelAddr;
50use hyperactor::reference as hyperactor_reference;
51use serde::Deserialize;
52use serde::Serialize;
53use tokio::process::ChildStderr;
54use tokio::process::ChildStdout;
55use tokio::sync::oneshot;
56use typeuri::Named;
57
58use crate::bootstrap;
59use crate::bootstrap::BootstrapCommand;
60
61mod native;
62pub(crate) use native::NativeProcLauncher;
63
64#[cfg(target_os = "linux")]
65mod systemd;
66#[cfg(target_os = "linux")]
67pub(crate) use systemd::SystemdProcLauncher;
68
69/// Result of launching a proc.
70///
71/// The launcher arranges for terminal status to be delivered on
72/// `exit_rx`. `exit_rx` is the single source of truth for terminal
73/// status.
74#[derive(Debug)]
75pub struct LaunchResult {
76    /// OS process ID if known (`None` for containers/VMs without
77    /// visible PID).
78    pub pid: Option<u32>,
79    /// Captured immediately after spawn succeeds.
80    pub started_at: std::time::SystemTime,
81    /// How stdio is handled.
82    pub stdio: StdioHandling,
83    /// Fires exactly once with the proc terminal result.
84    pub exit_rx: oneshot::Receiver<ProcExitResult>,
85}
86
87/// How proc stdio is handled.
88pub enum StdioHandling {
89    /// Pipes provided; manager can attach StreamFwder / tail
90    /// collection.
91    Captured {
92        stdout: ChildStdout,
93        stderr: ChildStderr,
94    },
95    /// Inherited from parent process (no capture).
96    Inherited,
97    /// Launcher manages logs internally (e.g. Docker log streaming).
98    /// Spawner provides stderr_tail in ProcExitResult. Manager should
99    /// not attach StreamFwder.
100    ManagedByLauncher,
101}
102
103impl fmt::Debug for StdioHandling {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        match self {
106            StdioHandling::Captured { .. } => f.write_str("Captured"),
107            StdioHandling::Inherited => f.write_str("Inherited"),
108            StdioHandling::ManagedByLauncher => f.write_str("ManagedByLauncher"),
109        }
110    }
111}
112
113/// How a proc terminated.
114#[derive(Debug, Clone)]
115pub enum ProcExitKind {
116    /// Normal exit with code.
117    Exited { code: i32 },
118    /// Killed by signal.
119    Signaled { signal: i32, core_dumped: bool },
120    /// Launcher level failure (spawn/wait/plumbing failed).
121    Failed { reason: String },
122}
123
124/// Terminal status of a proc.
125#[derive(Debug, Clone)]
126pub struct ProcExitResult {
127    /// How the proc terminated.
128    pub kind: ProcExitKind,
129    /// Tail of stderr output if available.
130    pub stderr_tail: Option<Vec<String>>,
131}
132
133/// Errors produced by proc launching / termination backends.
134///
135/// For now, this is intentionally lightweight:
136/// - `Launch` preserves an `io::Error` source chain (useful
137///   immediately).
138/// - `Terminate` / `Kill` carry a string for now, since we haven't
139///   committed to a concrete error taxonomy (e.g. nix vs actor RPC vs
140///   container APIs).
141/// - `Other` is a catch-all for plumbing / unexpected failures.
142///
143/// As additional launchers are introduced (e.g. actor-based
144/// spawning), we can refine this into more structured variants
145/// without changing the trait shape.
146#[derive(Debug, thiserror::Error)]
147pub enum ProcLauncherError {
148    /// Failure while launching a proc (e.g. command spawn failure).
149    #[error("launch failed: {0}")]
150    Launch(#[source] std::io::Error),
151    /// Failure while initiating graceful termination.
152    #[error("terminate failed: {0}")]
153    Terminate(String),
154    /// Failure while initiating an immediate kill.
155    #[error("kill failed: {0}")]
156    Kill(String),
157    /// Miscellaneous launcher failure not captured by the other
158    /// variants.
159    ///
160    /// Useful for "shouldn't happen" plumbing errors or
161    /// backend-specific cases we haven't modeled yet.
162    #[error("launcher error: {0}")]
163    Other(String),
164}
165
166/// Per-process CPU/NUMA binding configuration.
167///
168/// When attached to a proc spec, the bootstrap command is wrapped with
169/// `numactl` (on NUMA systems) or `taskset` (Linux fallback) before launch.
170#[derive(Clone, Debug, Default, Serialize, Deserialize, Named)]
171pub struct ProcBind {
172    /// NUMA node for CPU binding (`numactl --cpunodebind`).
173    pub cpunodebind: Option<String>,
174    /// NUMA node for memory binding (`numactl --membind`).
175    pub membind: Option<String>,
176    /// Physical CPU list (`numactl --physcpubind`).
177    pub physcpubind: Option<String>,
178    /// CPU set for taskset fallback (`taskset -c`).
179    pub cpus: Option<String>,
180}
181wirevalue::register_type!(ProcBind);
182
183impl From<HashMap<String, String>> for ProcBind {
184    fn from(map: HashMap<String, String>) -> Self {
185        Self {
186            cpunodebind: map.get("cpunodebind").cloned(),
187            membind: map.get("membind").cloned(),
188            physcpubind: map.get("physcpubind").cloned(),
189            cpus: map.get("cpus").cloned(),
190        }
191    }
192}
193
194/// Per-launch policy computed by the manager and handed to the
195/// launcher.
196///
197/// Motivation: different launch backends (native OS process, docker,
198/// python/actor protocol) need different *mechanics*, but they should
199/// all receive the same *intent* from the manager:
200/// - do we need a stdio stream we can forward / tail?
201/// - if so, how many tail lines should be retained for diagnostics?
202/// - if forwarding over the mesh is enabled, what channel address
203///   should be used (if applicable)?
204
205#[derive(Debug, Clone)]
206pub struct LaunchOptions {
207    /// Serialized Bootstrap payload for
208    /// HYPERACTOR_MESH_BOOTSTRAP_MODE env var.
209    pub bootstrap_payload: String,
210
211    /// Human-readable process name for HYPERACTOR_PROCESS_NAME env
212    /// var.
213    pub process_name: String,
214
215    /// The bootstrap command describing what should be executed for a
216    /// proc.
217    ///
218    /// This is the *payload* of what to run; the launcher decides
219    /// *how* to run it:
220    /// - Native launcher: execs it directly (spawns the command).
221    /// - systemd/container/VM/actor launchers: may treat it as the
222    ///   command to run *inside* the launched environment.
223    ///
224    /// Used by backends to construct the actual invocation.
225    pub command: BootstrapCommand,
226
227    /// If true, the manager wants access to a stream (pipes or an
228    /// equivalent).
229    ///
230    /// Native: pipe stdout/stderr (`Stdio::piped()`), return
231    /// `StdioHandling::Captured`. Docker/Python: may implement as log
232    /// streaming, RPC, etc., or ignore and return
233    /// `StdioHandling::ManagedByLauncher` / `Inherited` depending on
234    /// capability.
235    pub want_stdio: bool,
236
237    /// Max number of stderr (and/or stdout) lines retained for
238    /// diagnostics/tailing.
239    ///
240    /// Manager uses this when attaching `StreamFwder` for native
241    /// `Captured` pipes. Backends that manage logs internally may use
242    /// it to decide how much to retain before reporting `stderr_tail`
243    /// in `ProcExitResult`.
244    ///
245    /// `0` means "no tail buffering requested".
246    pub tail_lines: usize,
247
248    /// Optional "forward logs over mesh" address.
249    ///
250    /// This is **provisioned by the manager** when log-forwarding is
251    /// enabled.
252    /// - `None` means "no mesh log forwarding requested".
253    /// - `Some(addr)` means "arrange for child logs to be forwarded
254    ///   to `addr`" (if the backend supports it).
255    ///
256    /// Native backend: sets `BOOTSTRAP_LOG_CHANNEL=addr` in the child
257    /// env when `Some`. Other backends may ignore it or use it to
258    /// wire their own forwarding mechanism.
259    pub log_channel: Option<ChannelAddr>,
260
261    /// Optional CPU/NUMA binding for this proc.
262    ///
263    /// Launchers that support binding should apply it using
264    /// backend-appropriate mechanisms (e.g., `numactl` for native,
265    /// unit properties for systemd). Launchers that do not support
266    /// it may ignore this field.
267    pub proc_bind: Option<ProcBind>,
268}
269
270/// Format a human-readable process name for diagnostics and logs.
271///
272/// Used by launchers to set `HYPERACTOR_PROCESS_NAME` environment
273/// variable.
274///
275/// This string is intended for operators (not for stable identity).
276/// We populate [`PROCESS_NAME_ENV`] with it so the launched proc can
277/// include a friendly identifier in logs, crash reports, etc.
278///
279/// Format:
280/// - `ProcId(_, name)` → `<name> @ <host_process_name>`
281///
282/// The host identity is taken from the current process's
283/// `HYPERACTOR_PROCESS_NAME`, falling back to the machine hostname.
284/// This groups procs under their host process in traces and logs.
285pub fn format_process_name(proc_id: &hyperactor_reference::ProcId) -> String {
286    let who = proc_id.name();
287
288    let host = std::env::var(bootstrap::PROCESS_NAME_ENV).unwrap_or_else(|_| {
289        hostname::get()
290            .unwrap_or_else(|_| "unknown_host".into())
291            .into_string()
292            .unwrap_or("unknown_host".to_string())
293    });
294
295    format!("{} @ {}", who, host)
296}
297
298/// Strategy interface for launching and stopping a proc.
299///
300/// This trait is internal to `hyperactor_mesh`:
301/// `BootstrapProcManager` uses it to delegate the mechanics of
302/// starting and stopping a proc while keeping lifecycle tracking
303/// (readiness and terminal status) centralized in the manager.
304///
305/// Contract:
306/// - **Readiness is determined by the bootstrap callback mechanism
307///   (`callback_addr`)**, not by this trait. A proc is considered
308///   ready only when it invokes the callback, regardless of its
309///   underlying execution state.
310/// - **Terminal status is sourced from `LaunchResult::exit_rx`**:
311///   `launch` must return an `exit_rx` that resolves exactly once
312///   with the proc's terminal outcome. Implementations must ensure
313///   `exit_rx` resolves even if setup fails after partial work (for
314///   example, if spawning succeeds but exit monitoring cannot be
315///   established).
316/// - `terminate` and `kill` initiate shutdown and return without
317///   waiting for exit. Callers observe completion by awaiting
318///   `exit_rx` (or a higher-level handle built on it).
319///
320/// **Process tree semantics:** A launched proc may involve wrapper
321/// processes (shell, runtime shims, sanitizers) and/or spawn
322/// descendants. Launchers should treat termination as applying to the
323/// entire launched process tree (e.g., by placing the child in its
324/// own process group and signaling the group). Callers must not
325/// assume the returned PID is the only process that needs
326/// terminating.
327#[async_trait]
328pub trait ProcLauncher: Send + Sync + 'static {
329    /// Launch a proc using the provided bootstrap payload and config.
330    ///
331    /// Implementations must:
332    /// - Start the underlying proc/container/VM.
333    /// - Arrange for `LaunchResult::exit_rx` to resolve exactly once
334    ///   with the terminal outcome.
335    /// - Return quickly after starting the proc (readiness is handled
336    ///   elsewhere).
337    ///
338    /// `opts` communicates *policy/intent* computed by the manager.
339    /// The launcher uses it to choose backend-specific mechanics
340    /// (pipes vs inherit, log streaming, etc.).
341    async fn launch(
342        &self,
343        proc_id: &hyperactor_reference::ProcId,
344        opts: LaunchOptions,
345    ) -> Result<LaunchResult, ProcLauncherError>;
346
347    /// Initiate graceful termination.
348    ///
349    /// Semantics:
350    /// - Send a graceful termination request (SIGTERM / RPC / API
351    ///   call).
352    /// - Schedule escalation to `kill` after `timeout` if
353    ///   appropriate.
354    /// - Return immediately; final status is delivered through
355    ///   `exit_rx`.
356    ///
357    /// This is a fallback mechanism used when higher-level
358    /// (agent-first) termination cannot be applied or fails.
359    async fn terminate(
360        &self,
361        proc_id: &hyperactor_reference::ProcId,
362        timeout: Duration,
363    ) -> Result<(), ProcLauncherError>;
364
365    /// Initiate a force-kill.
366    ///
367    /// Semantics:
368    /// - Request termination as forcefully as the backend allows.
369    /// - Return immediately; final status is delivered through
370    ///   `exit_rx`.
371    ///
372    /// The exact mechanism is backend-specific:
373    /// - **Native**: sends SIGKILL directly to the PID.
374    /// - **Systemd**: calls `StopUnit` (same as `terminate`), which
375    ///   sends SIGTERM and escalates to SIGKILL after the unit's
376    ///   configured timeout. There is no separate "immediate SIGKILL"
377    ///   API in the systemd D-Bus interface without adding `KillUnit`.
378    ///
379    /// **Note**: For backends like systemd, `kill()` currently behaves
380    /// identically to `terminate()`. Callers who need a stronger
381    /// guarantee should await `exit_rx` with a timeout rather than
382    /// assuming immediate termination.
383    ///
384    /// Idempotent behavior is preferred: killing an already-dead proc
385    /// should not be treated as an error unless the backend cannot
386    /// determine state.
387    async fn kill(&self, proc_id: &hyperactor_reference::ProcId) -> Result<(), ProcLauncherError>;
388}