hyperactor_mesh/proc_launcher.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc launching abstraction.
10//!
11//! This module defines a small strategy interface, [`ProcLauncher`],
12//! used by
13//! [`BootstrapProcManager`](crate::bootstrap::BootstrapProcManager)
14//! to start and stop procs while keeping lifecycle tracking
15//! centralized in the manager.
16//!
17//! A launcher is responsible for the *mechanics* of running a proc
18//! (native OS process, container, VM, etc.) and for reporting
19//! *terminal status* back to the manager.
20//!
21//! ## Key properties
22//!
//! - **Readiness is callback-driven.** A proc is considered *ready*
//!   only when it signals readiness via the existing bootstrap
//!   callback (`callback_addr`) in the
//!   [`Bootstrap`](crate::bootstrap::Bootstrap) payload. Launchers do
//!   not determine readiness.
27//! - **Terminal status is channel-driven.** `launch` returns a
28//! [`LaunchResult`] whose `exit_rx` resolves exactly once with a
29//! [`ProcExitResult`]. This channel is the single source of truth
30//! for how a proc ended.
31//! - **Termination is initiation-only.** [`ProcLauncher::terminate`]
32//! and [`ProcLauncher::kill`] initiate shutdown and return without
33//! waiting for the proc to exit; callers observe completion by
34//! awaiting `exit_rx` (or a higher level handle built on it).
35//!
36//! ## Stdio handling
37//!
38//! [`StdioHandling`] describes whether stdout/stderr are made
39//! available to the manager for forwarding and tail collection
40//! (`Captured`), inherited (`Inherited`), or handled entirely by the
41//! launcher (`ManagedByLauncher`).
42#![allow(dead_code, unused_imports)] // Temporary
43
44use std::collections::HashMap;
45use std::fmt;
46use std::time::Duration;
47
48use async_trait::async_trait;
49use hyperactor::channel::ChannelAddr;
50use hyperactor::reference as hyperactor_reference;
51use serde::Deserialize;
52use serde::Serialize;
53use tokio::process::ChildStderr;
54use tokio::process::ChildStdout;
55use tokio::sync::oneshot;
56use typeuri::Named;
57
58use crate::bootstrap;
59use crate::bootstrap::BootstrapCommand;
60
61mod native;
62pub(crate) use native::NativeProcLauncher;
63
64#[cfg(target_os = "linux")]
65mod systemd;
66#[cfg(target_os = "linux")]
67pub(crate) use systemd::SystemdProcLauncher;
68
69/// Result of launching a proc.
70///
71/// The launcher arranges for terminal status to be delivered on
72/// `exit_rx`. `exit_rx` is the single source of truth for terminal
73/// status.
74#[derive(Debug)]
75pub struct LaunchResult {
76 /// OS process ID if known (`None` for containers/VMs without
77 /// visible PID).
78 pub pid: Option<u32>,
79 /// Captured immediately after spawn succeeds.
80 pub started_at: std::time::SystemTime,
81 /// How stdio is handled.
82 pub stdio: StdioHandling,
83 /// Fires exactly once with the proc terminal result.
84 pub exit_rx: oneshot::Receiver<ProcExitResult>,
85}
86
87/// How proc stdio is handled.
88pub enum StdioHandling {
89 /// Pipes provided; manager can attach StreamFwder / tail
90 /// collection.
91 Captured {
92 stdout: ChildStdout,
93 stderr: ChildStderr,
94 },
95 /// Inherited from parent process (no capture).
96 Inherited,
97 /// Launcher manages logs internally (e.g. Docker log streaming).
98 /// Spawner provides stderr_tail in ProcExitResult. Manager should
99 /// not attach StreamFwder.
100 ManagedByLauncher,
101}
102
103impl fmt::Debug for StdioHandling {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 match self {
106 StdioHandling::Captured { .. } => f.write_str("Captured"),
107 StdioHandling::Inherited => f.write_str("Inherited"),
108 StdioHandling::ManagedByLauncher => f.write_str("ManagedByLauncher"),
109 }
110 }
111}
112
/// How a proc terminated.
///
/// Derives `PartialEq`/`Eq` so terminal outcomes can be compared by
/// value (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcExitKind {
    /// Normal exit with the given exit code.
    Exited { code: i32 },
    /// Killed by `signal`; `core_dumped` records whether a core dump
    /// was produced.
    Signaled { signal: i32, core_dumped: bool },
    /// Launcher-level failure (spawn/wait/plumbing failed), described
    /// by `reason`.
    Failed { reason: String },
}
123
124/// Terminal status of a proc.
125#[derive(Debug, Clone)]
126pub struct ProcExitResult {
127 /// How the proc terminated.
128 pub kind: ProcExitKind,
129 /// Tail of stderr output if available.
130 pub stderr_tail: Option<Vec<String>>,
131}
132
133/// Errors produced by proc launching / termination backends.
134///
135/// For now, this is intentionally lightweight:
136/// - `Launch` preserves an `io::Error` source chain (useful
137/// immediately).
138/// - `Terminate` / `Kill` carry a string for now, since we haven't
139/// committed to a concrete error taxonomy (e.g. nix vs actor RPC vs
140/// container APIs).
141/// - `Other` is a catch-all for plumbing / unexpected failures.
142///
143/// As additional launchers are introduced (e.g. actor-based
144/// spawning), we can refine this into more structured variants
145/// without changing the trait shape.
146#[derive(Debug, thiserror::Error)]
147pub enum ProcLauncherError {
148 /// Failure while launching a proc (e.g. command spawn failure).
149 #[error("launch failed: {0}")]
150 Launch(#[source] std::io::Error),
151 /// Failure while initiating graceful termination.
152 #[error("terminate failed: {0}")]
153 Terminate(String),
154 /// Failure while initiating an immediate kill.
155 #[error("kill failed: {0}")]
156 Kill(String),
157 /// Miscellaneous launcher failure not captured by the other
158 /// variants.
159 ///
160 /// Useful for "shouldn't happen" plumbing errors or
161 /// backend-specific cases we haven't modeled yet.
162 #[error("launcher error: {0}")]
163 Other(String),
164}
165
166/// Per-process CPU/NUMA binding configuration.
167///
168/// When attached to a proc spec, the bootstrap command is wrapped with
169/// `numactl` (on NUMA systems) or `taskset` (Linux fallback) before launch.
170#[derive(Clone, Debug, Default, Serialize, Deserialize, Named)]
171pub struct ProcBind {
172 /// NUMA node for CPU binding (`numactl --cpunodebind`).
173 pub cpunodebind: Option<String>,
174 /// NUMA node for memory binding (`numactl --membind`).
175 pub membind: Option<String>,
176 /// Physical CPU list (`numactl --physcpubind`).
177 pub physcpubind: Option<String>,
178 /// CPU set for taskset fallback (`taskset -c`).
179 pub cpus: Option<String>,
180}
181wirevalue::register_type!(ProcBind);
182
183impl From<HashMap<String, String>> for ProcBind {
184 fn from(map: HashMap<String, String>) -> Self {
185 Self {
186 cpunodebind: map.get("cpunodebind").cloned(),
187 membind: map.get("membind").cloned(),
188 physcpubind: map.get("physcpubind").cloned(),
189 cpus: map.get("cpus").cloned(),
190 }
191 }
192}
193
194/// Per-launch policy computed by the manager and handed to the
195/// launcher.
196///
197/// Motivation: different launch backends (native OS process, docker,
198/// python/actor protocol) need different *mechanics*, but they should
199/// all receive the same *intent* from the manager:
200/// - do we need a stdio stream we can forward / tail?
201/// - if so, how many tail lines should be retained for diagnostics?
202/// - if forwarding over the mesh is enabled, what channel address
203/// should be used (if applicable)?
204
205#[derive(Debug, Clone)]
206pub struct LaunchOptions {
207 /// Serialized Bootstrap payload for
208 /// HYPERACTOR_MESH_BOOTSTRAP_MODE env var.
209 pub bootstrap_payload: String,
210
211 /// Human-readable process name for HYPERACTOR_PROCESS_NAME env
212 /// var.
213 pub process_name: String,
214
215 /// The bootstrap command describing what should be executed for a
216 /// proc.
217 ///
218 /// This is the *payload* of what to run; the launcher decides
219 /// *how* to run it:
220 /// - Native launcher: execs it directly (spawns the command).
221 /// - systemd/container/VM/actor launchers: may treat it as the
222 /// command to run *inside* the launched environment.
223 ///
224 /// Used by backends to construct the actual invocation.
225 pub command: BootstrapCommand,
226
227 /// If true, the manager wants access to a stream (pipes or an
228 /// equivalent).
229 ///
230 /// Native: pipe stdout/stderr (`Stdio::piped()`), return
231 /// `StdioHandling::Captured`. Docker/Python: may implement as log
232 /// streaming, RPC, etc., or ignore and return
233 /// `StdioHandling::ManagedByLauncher` / `Inherited` depending on
234 /// capability.
235 pub want_stdio: bool,
236
237 /// Max number of stderr (and/or stdout) lines retained for
238 /// diagnostics/tailing.
239 ///
240 /// Manager uses this when attaching `StreamFwder` for native
241 /// `Captured` pipes. Backends that manage logs internally may use
242 /// it to decide how much to retain before reporting `stderr_tail`
243 /// in `ProcExitResult`.
244 ///
245 /// `0` means "no tail buffering requested".
246 pub tail_lines: usize,
247
248 /// Optional "forward logs over mesh" address.
249 ///
250 /// This is **provisioned by the manager** when log-forwarding is
251 /// enabled.
252 /// - `None` means "no mesh log forwarding requested".
253 /// - `Some(addr)` means "arrange for child logs to be forwarded
254 /// to `addr`" (if the backend supports it).
255 ///
256 /// Native backend: sets `BOOTSTRAP_LOG_CHANNEL=addr` in the child
257 /// env when `Some`. Other backends may ignore it or use it to
258 /// wire their own forwarding mechanism.
259 pub log_channel: Option<ChannelAddr>,
260
261 /// Optional CPU/NUMA binding for this proc.
262 ///
263 /// Launchers that support binding should apply it using
264 /// backend-appropriate mechanisms (e.g., `numactl` for native,
265 /// unit properties for systemd). Launchers that do not support
266 /// it may ignore this field.
267 pub proc_bind: Option<ProcBind>,
268}
269
270/// Format a human-readable process name for diagnostics and logs.
271///
272/// Used by launchers to set `HYPERACTOR_PROCESS_NAME` environment
273/// variable.
274///
275/// This string is intended for operators (not for stable identity).
276/// We populate [`PROCESS_NAME_ENV`] with it so the launched proc can
277/// include a friendly identifier in logs, crash reports, etc.
278///
279/// Format:
280/// - `ProcId(_, name)` → `<name> @ <host_process_name>`
281///
282/// The host identity is taken from the current process's
283/// `HYPERACTOR_PROCESS_NAME`, falling back to the machine hostname.
284/// This groups procs under their host process in traces and logs.
285pub fn format_process_name(proc_id: &hyperactor_reference::ProcId) -> String {
286 let who = proc_id.name();
287
288 let host = std::env::var(bootstrap::PROCESS_NAME_ENV).unwrap_or_else(|_| {
289 hostname::get()
290 .unwrap_or_else(|_| "unknown_host".into())
291 .into_string()
292 .unwrap_or("unknown_host".to_string())
293 });
294
295 format!("{} @ {}", who, host)
296}
297
298/// Strategy interface for launching and stopping a proc.
299///
300/// This trait is internal to `hyperactor_mesh`:
301/// `BootstrapProcManager` uses it to delegate the mechanics of
302/// starting and stopping a proc while keeping lifecycle tracking
303/// (readiness and terminal status) centralized in the manager.
304///
305/// Contract:
306/// - **Readiness is determined by the bootstrap callback mechanism
307/// (`callback_addr`)**, not by this trait. A proc is considered
308/// ready only when it invokes the callback, regardless of its
309/// underlying execution state.
310/// - **Terminal status is sourced from `LaunchResult::exit_rx`**:
311/// `launch` must return an `exit_rx` that resolves exactly once
312/// with the proc's terminal outcome. Implementations must ensure
313/// `exit_rx` resolves even if setup fails after partial work (for
314/// example, if spawning succeeds but exit monitoring cannot be
315/// established).
316/// - `terminate` and `kill` initiate shutdown and return without
317/// waiting for exit. Callers observe completion by awaiting
318/// `exit_rx` (or a higher-level handle built on it).
319///
320/// **Process tree semantics:** A launched proc may involve wrapper
321/// processes (shell, runtime shims, sanitizers) and/or spawn
322/// descendants. Launchers should treat termination as applying to the
323/// entire launched process tree (e.g., by placing the child in its
324/// own process group and signaling the group). Callers must not
325/// assume the returned PID is the only process that needs
326/// terminating.
327#[async_trait]
328pub trait ProcLauncher: Send + Sync + 'static {
329 /// Launch a proc using the provided bootstrap payload and config.
330 ///
331 /// Implementations must:
332 /// - Start the underlying proc/container/VM.
333 /// - Arrange for `LaunchResult::exit_rx` to resolve exactly once
334 /// with the terminal outcome.
335 /// - Return quickly after starting the proc (readiness is handled
336 /// elsewhere).
337 ///
338 /// `opts` communicates *policy/intent* computed by the manager.
339 /// The launcher uses it to choose backend-specific mechanics
340 /// (pipes vs inherit, log streaming, etc.).
341 async fn launch(
342 &self,
343 proc_id: &hyperactor_reference::ProcId,
344 opts: LaunchOptions,
345 ) -> Result<LaunchResult, ProcLauncherError>;
346
347 /// Initiate graceful termination.
348 ///
349 /// Semantics:
350 /// - Send a graceful termination request (SIGTERM / RPC / API
351 /// call).
352 /// - Schedule escalation to `kill` after `timeout` if
353 /// appropriate.
354 /// - Return immediately; final status is delivered through
355 /// `exit_rx`.
356 ///
357 /// This is a fallback mechanism used when higher-level
358 /// (agent-first) termination cannot be applied or fails.
359 async fn terminate(
360 &self,
361 proc_id: &hyperactor_reference::ProcId,
362 timeout: Duration,
363 ) -> Result<(), ProcLauncherError>;
364
365 /// Initiate a force-kill.
366 ///
367 /// Semantics:
368 /// - Request termination as forcefully as the backend allows.
369 /// - Return immediately; final status is delivered through
370 /// `exit_rx`.
371 ///
372 /// The exact mechanism is backend-specific:
373 /// - **Native**: sends SIGKILL directly to the PID.
374 /// - **Systemd**: calls `StopUnit` (same as `terminate`), which
375 /// sends SIGTERM and escalates to SIGKILL after the unit's
376 /// configured timeout. There is no separate "immediate SIGKILL"
377 /// API in the systemd D-Bus interface without adding `KillUnit`.
378 ///
379 /// **Note**: For backends like systemd, `kill()` currently behaves
380 /// identically to `terminate()`. Callers who need a stronger
381 /// guarantee should await `exit_rx` with a timeout rather than
382 /// assuming immediate termination.
383 ///
384 /// Idempotent behavior is preferred: killing an already-dead proc
385 /// should not be treated as an error unless the backend cannot
386 /// determine state.
387 async fn kill(&self, proc_id: &hyperactor_reference::ProcId) -> Result<(), ProcLauncherError>;
388}