hyperactor_mesh/
proc_launcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc launching abstraction.
10//!
11//! This module defines a small strategy interface, [`ProcLauncher`],
12//! used by
13//! [`BootstrapProcManager`](crate::bootstrap::BootstrapProcManager)
14//! to start and stop procs while keeping lifecycle tracking
15//! centralized in the manager.
16//!
17//! A launcher is responsible for the *mechanics* of running a proc
18//! (native OS process, container, VM, etc.) and for reporting
19//! *terminal status* back to the manager.
20//!
21//! ## Key properties
22//!
23//! - **Readiness is callback-driven.** A proc is considered *ready*
24//!   only when it signals readiness via the existing bootstrap
//!   callback (`callback_addr`) in the
//!   [`Bootstrap`](crate::bootstrap::Bootstrap) payload. Launchers
//!   do not determine readiness.
27//! - **Terminal status is channel-driven.** `launch` returns a
28//!   [`LaunchResult`] whose `exit_rx` resolves exactly once with a
29//!   [`ProcExitResult`]. This channel is the single source of truth
30//!   for how a proc ended.
31//! - **Termination is initiation-only.** [`ProcLauncher::terminate`]
32//!   and [`ProcLauncher::kill`] initiate shutdown and return without
33//!   waiting for the proc to exit; callers observe completion by
//!   awaiting `exit_rx` (or a higher-level handle built on it).
35//!
36//! ## Stdio handling
37//!
38//! [`StdioHandling`] describes whether stdout/stderr are made
39//! available to the manager for forwarding and tail collection
40//! (`Captured`), inherited (`Inherited`), or handled entirely by the
41//! launcher (`ManagedByLauncher`).
42#![allow(dead_code, unused_imports)] // Temporary
43
44use std::fmt;
45use std::time::Duration;
46
47use async_trait::async_trait;
48use hyperactor::channel::ChannelAddr;
49use hyperactor::reference as hyperactor_reference;
50use tokio::process::ChildStderr;
51use tokio::process::ChildStdout;
52use tokio::sync::oneshot;
53
54use crate::bootstrap::BootstrapCommand;
55
56mod native;
57pub(crate) use native::NativeProcLauncher;
58
59#[cfg(target_os = "linux")]
60mod systemd;
61#[cfg(target_os = "linux")]
62pub(crate) use systemd::SystemdProcLauncher;
63
/// Result of launching a proc.
///
/// Returned by [`ProcLauncher::launch`]. The launcher arranges for
/// terminal status to be delivered on `exit_rx`; `exit_rx` is the
/// single source of truth for how the proc ended.
#[derive(Debug)]
pub struct LaunchResult {
    /// OS process ID if known (`None` for containers/VMs without a
    /// visible PID).
    pub pid: Option<u32>,
    /// Wall-clock time captured immediately after spawn succeeds.
    pub started_at: std::time::SystemTime,
    /// How stdio is handled for this proc (see [`StdioHandling`]).
    pub stdio: StdioHandling,
    /// Fires exactly once with the proc's terminal result.
    pub exit_rx: oneshot::Receiver<ProcExitResult>,
}
81
/// How proc stdio is handled.
///
/// Reported back to the manager in `LaunchResult::stdio`. Which
/// variant a launcher returns depends on `LaunchOptions::want_stdio`
/// and on what the backend is capable of.
pub enum StdioHandling {
    /// Pipes provided; the manager can attach `StreamFwder` / tail
    /// collection to them.
    Captured {
        /// Handle to the child's stdout pipe.
        stdout: ChildStdout,
        /// Handle to the child's stderr pipe.
        stderr: ChildStderr,
    },
    /// Inherited from the parent process (no capture).
    Inherited,
    /// The launcher manages logs internally (e.g. Docker log
    /// streaming). The launcher provides `stderr_tail` in
    /// `ProcExitResult`; the manager should not attach `StreamFwder`.
    ManagedByLauncher,
}
97
98impl fmt::Debug for StdioHandling {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        match self {
101            StdioHandling::Captured { .. } => f.write_str("Captured"),
102            StdioHandling::Inherited => f.write_str("Inherited"),
103            StdioHandling::ManagedByLauncher => f.write_str("ManagedByLauncher"),
104        }
105    }
106}
107
/// How a proc terminated.
///
/// Implements `PartialEq`/`Eq` so terminal outcomes can be compared
/// directly (e.g. `assert_eq!` in tests, or checking for an expected
/// exit).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcExitKind {
    /// Normal exit with the given exit code.
    Exited { code: i32 },
    /// Killed by signal `signal`.
    Signaled { signal: i32, core_dumped: bool },
    /// Launcher-level failure (spawn/wait/plumbing failed); `reason`
    /// carries a human-readable description.
    Failed { reason: String },
}

/// Terminal status of a proc, as delivered on `LaunchResult::exit_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcExitResult {
    /// How the proc terminated.
    pub kind: ProcExitKind,
    /// Tail of stderr output (lines), if available.
    pub stderr_tail: Option<Vec<String>>,
}
127
/// Errors produced by proc launching / termination backends.
///
/// For now, this is intentionally lightweight:
/// - `Launch` preserves an `io::Error` source chain (useful
///   immediately).
/// - `Terminate` / `Kill` carry a string for now, since we haven't
///   committed to a concrete error taxonomy (e.g. nix vs actor RPC vs
///   container APIs).
/// - `Other` is a catch-all for plumbing / unexpected failures.
///
/// As additional launchers are introduced (e.g. actor-based
/// spawning), we can refine this into more structured variants
/// without changing the trait shape.
#[derive(Debug, thiserror::Error)]
pub enum ProcLauncherError {
    /// Failure while launching a proc (e.g. command spawn failure).
    ///
    /// The wrapped `io::Error` is both interpolated into the Display
    /// message and exposed via `Error::source`.
    // NOTE(review): reporters that walk the source chain (e.g. anyhow's
    // alternate `{:#}` format) will likely print the io error twice.
    // Consider dropping `{0}` from the message, but that changes the
    // Display text, so check callers / log consumers first.
    #[error("launch failed: {0}")]
    Launch(#[source] std::io::Error),
    /// Failure while initiating graceful termination.
    #[error("terminate failed: {0}")]
    Terminate(String),
    /// Failure while initiating an immediate kill.
    #[error("kill failed: {0}")]
    Kill(String),
    /// Miscellaneous launcher failure not captured by the other
    /// variants.
    ///
    /// Useful for "shouldn't happen" plumbing errors or
    /// backend-specific cases we haven't modeled yet.
    #[error("launcher error: {0}")]
    Other(String),
}
160
161/// Per-launch policy computed by the manager and handed to the
162/// launcher.
163///
164/// Motivation: different launch backends (native OS process, docker,
165/// python/actor protocol) need different *mechanics*, but they should
166/// all receive the same *intent* from the manager:
167/// - do we need a stdio stream we can forward / tail?
168/// - if so, how many tail lines should be retained for diagnostics?
169/// - if forwarding over the mesh is enabled, what channel address
170///   should be used (if applicable)?
171
172#[derive(Debug, Clone)]
173pub struct LaunchOptions {
174    /// Serialized Bootstrap payload for
175    /// HYPERACTOR_MESH_BOOTSTRAP_MODE env var.
176    pub bootstrap_payload: String,
177
178    /// Human-readable process name for HYPERACTOR_PROCESS_NAME env
179    /// var.
180    pub process_name: String,
181
182    /// The bootstrap command describing what should be executed for a
183    /// proc.
184    ///
185    /// This is the *payload* of what to run; the launcher decides
186    /// *how* to run it:
187    /// - Native launcher: execs it directly (spawns the command).
188    /// - systemd/container/VM/actor launchers: may treat it as the
189    ///   command to run *inside* the launched environment.
190    ///
191    /// Used by backends to construct the actual invocation.
192    pub command: BootstrapCommand,
193
194    /// If true, the manager wants access to a stream (pipes or an
195    /// equivalent).
196    ///
197    /// Native: pipe stdout/stderr (`Stdio::piped()`), return
198    /// `StdioHandling::Captured`. Docker/Python: may implement as log
199    /// streaming, RPC, etc., or ignore and return
200    /// `StdioHandling::ManagedByLauncher` / `Inherited` depending on
201    /// capability.
202    pub want_stdio: bool,
203
204    /// Max number of stderr (and/or stdout) lines retained for
205    /// diagnostics/tailing.
206    ///
207    /// Manager uses this when attaching `StreamFwder` for native
208    /// `Captured` pipes. Backends that manage logs internally may use
209    /// it to decide how much to retain before reporting `stderr_tail`
210    /// in `ProcExitResult`.
211    ///
212    /// `0` means "no tail buffering requested".
213    pub tail_lines: usize,
214
215    /// Optional "forward logs over mesh" address.
216    ///
217    /// This is **provisioned by the manager** when log-forwarding is
218    /// enabled.
219    /// - `None` means "no mesh log forwarding requested".
220    /// - `Some(addr)` means "arrange for child logs to be forwarded
221    ///   to `addr`" (if the backend supports it).
222    ///
223    /// Native backend: sets `BOOTSTRAP_LOG_CHANNEL=addr` in the child
224    /// env when `Some`. Other backends may ignore it or use it to
225    /// wire their own forwarding mechanism.
226    pub log_channel: Option<ChannelAddr>,
227}
228
229/// Format a human-readable process name for diagnostics and logs.
230///
231/// Used by launchers to set `HYPERACTOR_PROCESS_NAME` environment
232/// variable.
233///
234/// This string is intended for operators (not for stable identity).
235/// We populate [`PROCESS_NAME_ENV`] with it so the launched proc can
236/// include a friendly identifier in logs, crash reports, etc.
237///
238/// Format:
239/// - `ProcId(_, name)` → `proc <name> @ <hostname>`
240///
241/// Notes:
242/// - We best-effort resolve the local hostname; on failure or
243///   non-UTF8 we fall back to `"unknown_host"`.
244/// - This is **not** guaranteed to be unique and should not be parsed
245///   for program logic.
246pub fn format_process_name(proc_id: &hyperactor_reference::ProcId) -> String {
247    let who = proc_id.name();
248
249    let host = hostname::get()
250        .unwrap_or_else(|_| "unknown_host".into())
251        .into_string()
252        .unwrap_or("unknown_host".to_string());
253
254    format!("proc {} @ {}", who, host)
255}
256
/// Strategy interface for launching and stopping a proc.
///
/// This trait is internal to `hyperactor_mesh`:
/// `BootstrapProcManager` uses it to delegate the mechanics of
/// starting and stopping a proc while keeping lifecycle tracking
/// (readiness and terminal status) centralized in the manager.
///
/// Contract:
/// - **Readiness is determined by the bootstrap callback mechanism
///   (`callback_addr`)**, not by this trait. A proc is considered
///   ready only when it invokes the callback, regardless of its
///   underlying execution state.
/// - **Terminal status is sourced from `LaunchResult::exit_rx`**:
///   `launch` must return an `exit_rx` that resolves exactly once
///   with the proc's terminal outcome. Implementations must ensure
///   `exit_rx` resolves even if setup fails after partial work (for
///   example, if spawning succeeds but exit monitoring cannot be
///   established).
/// - `terminate` and `kill` initiate shutdown and return without
///   waiting for exit. Callers observe completion by awaiting
///   `exit_rx` (or a higher-level handle built on it).
///
/// **Process tree semantics:** A launched proc may involve wrapper
/// processes (shell, runtime shims, sanitizers) and/or spawn
/// descendants. Launchers should treat termination as applying to the
/// entire launched process tree (e.g., by placing the child in its
/// own process group and signaling the group). Callers must not
/// assume the returned PID is the only process that needs
/// terminating.
///
/// Implementations must be `Send + Sync + 'static` (presumably so the
/// manager can hold one as a shared trait object across tasks).
#[async_trait]
pub trait ProcLauncher: Send + Sync + 'static {
    /// Launch the proc identified by `proc_id`.
    ///
    /// Implementations must:
    /// - Start the underlying proc/container/VM.
    /// - Arrange for `LaunchResult::exit_rx` to resolve exactly once
    ///   with the terminal outcome.
    /// - Return quickly after starting the proc (readiness is handled
    ///   elsewhere).
    ///
    /// `opts` communicates *policy/intent* computed by the manager
    /// (bootstrap payload, command, stdio and log-forwarding wishes).
    /// The launcher uses it to choose backend-specific mechanics
    /// (pipes vs inherit, log streaming, etc.).
    ///
    /// # Errors
    ///
    /// Returns a [`ProcLauncherError`] if the proc could not be
    /// started (e.g. [`ProcLauncherError::Launch`] for a command spawn
    /// failure).
    async fn launch(
        &self,
        proc_id: &hyperactor_reference::ProcId,
        opts: LaunchOptions,
    ) -> Result<LaunchResult, ProcLauncherError>;

    /// Initiate graceful termination of the proc identified by
    /// `proc_id`.
    ///
    /// Semantics:
    /// - Send a graceful termination request (SIGTERM / RPC / API
    ///   call).
    /// - Schedule escalation to `kill` after `timeout` if
    ///   appropriate.
    /// - Return immediately; final status is delivered through
    ///   `exit_rx`.
    ///
    /// This is a fallback mechanism used when higher-level
    /// (agent-first) termination cannot be applied or fails.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcLauncherError`] (typically
    /// [`ProcLauncherError::Terminate`]) if the termination request
    /// could not be initiated.
    async fn terminate(
        &self,
        proc_id: &hyperactor_reference::ProcId,
        timeout: Duration,
    ) -> Result<(), ProcLauncherError>;

    /// Initiate a force-kill of the proc identified by `proc_id`.
    ///
    /// Semantics:
    /// - Request termination as forcefully as the backend allows.
    /// - Return immediately; final status is delivered through
    ///   `exit_rx`.
    ///
    /// The exact mechanism is backend-specific:
    /// - **Native**: sends SIGKILL directly to the PID.
    /// - **Systemd**: calls `StopUnit` (same as `terminate`), which
    ///   sends SIGTERM and escalates to SIGKILL after the unit's
    ///   configured timeout. There is no separate "immediate SIGKILL"
    ///   API in the systemd D-Bus interface without adding `KillUnit`.
    ///
    /// **Note**: For backends like systemd, `kill()` currently behaves
    /// identically to `terminate()`. Callers who need a stronger
    /// guarantee should await `exit_rx` with a timeout rather than
    /// assuming immediate termination.
    ///
    /// Idempotent behavior is preferred: killing an already-dead proc
    /// should not be treated as an error unless the backend cannot
    /// determine state.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcLauncherError`] (typically
    /// [`ProcLauncherError::Kill`]) if the kill could not be
    /// initiated.
    // NOTE(review): "SIGKILL directly to the PID" conflicts with the
    // trait-level guidance to signal the whole process group. Confirm
    // what `NativeProcLauncher` actually does and align these docs.
    async fn kill(&self, proc_id: &hyperactor_reference::ProcId) -> Result<(), ProcLauncherError>;
}