hyperactor_mesh/proc_launcher.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Proc launching abstraction.
10//!
11//! This module defines a small strategy interface, [`ProcLauncher`],
12//! used by
13//! [`BootstrapProcManager`](crate::bootstrap::BootstrapProcManager)
14//! to start and stop procs while keeping lifecycle tracking
15//! centralized in the manager.
16//!
17//! A launcher is responsible for the *mechanics* of running a proc
18//! (native OS process, container, VM, etc.) and for reporting
19//! *terminal status* back to the manager.
20//!
21//! ## Key properties
22//!
23//! - **Readiness is callback-driven.** A proc is considered *ready*
24//! only when it signals readiness via the existing bootstrap
25//! callback (`callback_addr`) in the [`Bootstrap`] payload.
26//! Launchers do not determine readiness.
27//! - **Terminal status is channel-driven.** `launch` returns a
28//! [`LaunchResult`] whose `exit_rx` resolves exactly once with a
29//! [`ProcExitResult`]. This channel is the single source of truth
30//! for how a proc ended.
31//! - **Termination is initiation-only.** [`ProcLauncher::terminate`]
32//! and [`ProcLauncher::kill`] initiate shutdown and return without
33//! waiting for the proc to exit; callers observe completion by
34//! awaiting `exit_rx` (or a higher level handle built on it).
35//!
36//! ## Stdio handling
37//!
38//! [`StdioHandling`] describes whether stdout/stderr are made
39//! available to the manager for forwarding and tail collection
40//! (`Captured`), inherited (`Inherited`), or handled entirely by the
41//! launcher (`ManagedByLauncher`).
42#![allow(dead_code, unused_imports)] // Temporary
43
44use std::collections::HashMap;
45use std::fmt;
46use std::time::Duration;
47
48use async_trait::async_trait;
49use hyperactor::channel::ChannelAddr;
50use serde::Deserialize;
51use serde::Serialize;
52use tokio::process::ChildStderr;
53use tokio::process::ChildStdout;
54use tokio::sync::oneshot;
55use typeuri::Named;
56
57use crate::bootstrap;
58use crate::bootstrap::BootstrapCommand;
59
60mod native;
61pub(crate) use native::NativeProcLauncher;
62
63#[cfg(target_os = "linux")]
64mod systemd;
65#[cfg(target_os = "linux")]
66pub(crate) use systemd::SystemdProcLauncher;
67
68/// Result of launching a proc.
69///
70/// The launcher arranges for terminal status to be delivered on
71/// `exit_rx`. `exit_rx` is the single source of truth for terminal
72/// status.
73#[derive(Debug)]
74pub struct LaunchResult {
75 /// OS process ID if known (`None` for containers/VMs without
76 /// visible PID).
77 pub pid: Option<u32>,
78 /// Captured immediately after spawn succeeds.
79 pub started_at: std::time::SystemTime,
80 /// How stdio is handled.
81 pub stdio: StdioHandling,
82 /// Fires exactly once with the proc terminal result.
83 pub exit_rx: oneshot::Receiver<ProcExitResult>,
84}
85
86/// How proc stdio is handled.
87pub enum StdioHandling {
88 /// Pipes provided; manager can attach StreamFwder / tail
89 /// collection.
90 Captured {
91 stdout: ChildStdout,
92 stderr: ChildStderr,
93 },
94 /// Inherited from parent process (no capture).
95 Inherited,
96 /// Launcher manages logs internally (e.g. Docker log streaming).
97 /// Spawner provides stderr_tail in ProcExitResult. Manager should
98 /// not attach StreamFwder.
99 ManagedByLauncher,
100}
101
102impl fmt::Debug for StdioHandling {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 StdioHandling::Captured { .. } => f.write_str("Captured"),
106 StdioHandling::Inherited => f.write_str("Inherited"),
107 StdioHandling::ManagedByLauncher => f.write_str("ManagedByLauncher"),
108 }
109 }
110}
111
112/// How a proc terminated.
113#[derive(Debug, Clone)]
114pub enum ProcExitKind {
115 /// Normal exit with code.
116 Exited { code: i32 },
117 /// Killed by signal.
118 Signaled { signal: i32, core_dumped: bool },
119 /// Launcher level failure (spawn/wait/plumbing failed).
120 Failed { reason: String },
121}
122
123/// Terminal status of a proc.
124#[derive(Debug, Clone)]
125pub struct ProcExitResult {
126 /// How the proc terminated.
127 pub kind: ProcExitKind,
128 /// Tail of stderr output if available.
129 pub stderr_tail: Option<Vec<String>>,
130}
131
132/// Errors produced by proc launching / termination backends.
133///
134/// For now, this is intentionally lightweight:
135/// - `Launch` preserves an `io::Error` source chain (useful
136/// immediately).
137/// - `Terminate` / `Kill` carry a string for now, since we haven't
138/// committed to a concrete error taxonomy (e.g. nix vs actor RPC vs
139/// container APIs).
140/// - `Other` is a catch-all for plumbing / unexpected failures.
141///
142/// As additional launchers are introduced (e.g. actor-based
143/// spawning), we can refine this into more structured variants
144/// without changing the trait shape.
145#[derive(Debug, thiserror::Error)]
146pub enum ProcLauncherError {
147 /// Failure while launching a proc (e.g. command spawn failure).
148 #[error("launch failed: {0}")]
149 Launch(#[source] std::io::Error),
150 /// Failure while initiating graceful termination.
151 #[error("terminate failed: {0}")]
152 Terminate(String),
153 /// Failure while initiating an immediate kill.
154 #[error("kill failed: {0}")]
155 Kill(String),
156 /// Miscellaneous launcher failure not captured by the other
157 /// variants.
158 ///
159 /// Useful for "shouldn't happen" plumbing errors or
160 /// backend-specific cases we haven't modeled yet.
161 #[error("launcher error: {0}")]
162 Other(String),
163}
164
165/// Per-process CPU/NUMA binding configuration.
166///
167/// When attached to a proc spec, the bootstrap command is wrapped with
168/// `numactl` (on NUMA systems) or `taskset` (Linux fallback) before launch.
169#[derive(Clone, Debug, Default, Serialize, Deserialize, Named)]
170pub struct ProcBind {
171 /// NUMA node for CPU binding (`numactl --cpunodebind`).
172 pub cpunodebind: Option<String>,
173 /// NUMA node for memory binding (`numactl --membind`).
174 pub membind: Option<String>,
175 /// Physical CPU list (`numactl --physcpubind`).
176 pub physcpubind: Option<String>,
177 /// CPU set for taskset fallback (`taskset -c`).
178 pub cpus: Option<String>,
179}
180wirevalue::register_type!(ProcBind);
181
182impl From<HashMap<String, String>> for ProcBind {
183 fn from(map: HashMap<String, String>) -> Self {
184 Self {
185 cpunodebind: map.get("cpunodebind").cloned(),
186 membind: map.get("membind").cloned(),
187 physcpubind: map.get("physcpubind").cloned(),
188 cpus: map.get("cpus").cloned(),
189 }
190 }
191}
192
193/// Per-launch policy computed by the manager and handed to the
194/// launcher.
195///
196/// Motivation: different launch backends (native OS process, docker,
197/// python/actor protocol) need different *mechanics*, but they should
198/// all receive the same *intent* from the manager:
199/// - do we need a stdio stream we can forward / tail?
200/// - if so, how many tail lines should be retained for diagnostics?
201/// - if forwarding over the mesh is enabled, what channel address
202/// should be used (if applicable)?
203
204#[derive(Debug, Clone)]
205pub struct LaunchOptions {
206 /// Serialized Bootstrap payload for
207 /// HYPERACTOR_MESH_BOOTSTRAP_MODE env var.
208 pub bootstrap_payload: String,
209
210 /// Human-readable process name for HYPERACTOR_PROCESS_NAME env
211 /// var.
212 pub process_name: String,
213
214 /// The bootstrap command describing what should be executed for a
215 /// proc.
216 ///
217 /// This is the *payload* of what to run; the launcher decides
218 /// *how* to run it:
219 /// - Native launcher: execs it directly (spawns the command).
220 /// - systemd/container/VM/actor launchers: may treat it as the
221 /// command to run *inside* the launched environment.
222 ///
223 /// Used by backends to construct the actual invocation.
224 pub command: BootstrapCommand,
225
226 /// If true, the manager wants access to a stream (pipes or an
227 /// equivalent).
228 ///
229 /// Native: pipe stdout/stderr (`Stdio::piped()`), return
230 /// `StdioHandling::Captured`. Docker/Python: may implement as log
231 /// streaming, RPC, etc., or ignore and return
232 /// `StdioHandling::ManagedByLauncher` / `Inherited` depending on
233 /// capability.
234 pub want_stdio: bool,
235
236 /// Max number of stderr (and/or stdout) lines retained for
237 /// diagnostics/tailing.
238 ///
239 /// Manager uses this when attaching `StreamFwder` for native
240 /// `Captured` pipes. Backends that manage logs internally may use
241 /// it to decide how much to retain before reporting `stderr_tail`
242 /// in `ProcExitResult`.
243 ///
244 /// `0` means "no tail buffering requested".
245 pub tail_lines: usize,
246
247 /// Optional "forward logs over mesh" address.
248 ///
249 /// This is **provisioned by the manager** when log-forwarding is
250 /// enabled.
251 /// - `None` means "no mesh log forwarding requested".
252 /// - `Some(addr)` means "arrange for child logs to be forwarded
253 /// to `addr`" (if the backend supports it).
254 ///
255 /// Native backend: sets `BOOTSTRAP_LOG_CHANNEL=addr` in the child
256 /// env when `Some`. Other backends may ignore it or use it to
257 /// wire their own forwarding mechanism.
258 pub log_channel: Option<ChannelAddr>,
259
260 /// Optional CPU/NUMA binding for this proc.
261 ///
262 /// Launchers that support binding should apply it using
263 /// backend-appropriate mechanisms (e.g., `numactl` for native,
264 /// unit properties for systemd). Launchers that do not support
265 /// it may ignore this field.
266 pub proc_bind: Option<ProcBind>,
267}
268
269/// Format a human-readable process name for diagnostics and logs.
270///
271/// Used by launchers to set `HYPERACTOR_PROCESS_NAME` environment
272/// variable.
273///
274/// This string is intended for operators (not for stable identity).
275/// We populate [`PROCESS_NAME_ENV`] with it so the launched proc can
276/// include a friendly identifier in logs, crash reports, etc.
277///
278/// Format:
279/// - `ProcAddr(_, name)` → `<name> @ <host_process_name>`
280///
281/// The host identity is taken from the current process's
282/// `HYPERACTOR_PROCESS_NAME`, falling back to the machine hostname.
283/// This groups procs under their host process in traces and logs.
284pub fn format_process_name(proc_id: &hyperactor::ProcAddr) -> String {
285 let who = proc_id
286 .label()
287 .map(|l| l.as_str().to_string())
288 .unwrap_or_else(|| proc_id.id().to_string());
289
290 let host = std::env::var(bootstrap::PROCESS_NAME_ENV).unwrap_or_else(|_| {
291 hostname::get()
292 .unwrap_or_else(|_| "unknown_host".into())
293 .into_string()
294 .unwrap_or("unknown_host".to_string())
295 });
296
297 format!("{} @ {}", who, host)
298}
299
300/// Strategy interface for launching and stopping a proc.
301///
302/// This trait is internal to `hyperactor_mesh`:
303/// `BootstrapProcManager` uses it to delegate the mechanics of
304/// starting and stopping a proc while keeping lifecycle tracking
305/// (readiness and terminal status) centralized in the manager.
306///
307/// Contract:
308/// - **Readiness is determined by the bootstrap callback mechanism
309/// (`callback_addr`)**, not by this trait. A proc is considered
310/// ready only when it invokes the callback, regardless of its
311/// underlying execution state.
312/// - **Terminal status is sourced from `LaunchResult::exit_rx`**:
313/// `launch` must return an `exit_rx` that resolves exactly once
314/// with the proc's terminal outcome. Implementations must ensure
315/// `exit_rx` resolves even if setup fails after partial work (for
316/// example, if spawning succeeds but exit monitoring cannot be
317/// established).
318/// - `terminate` and `kill` initiate shutdown and return without
319/// waiting for exit. Callers observe completion by awaiting
320/// `exit_rx` (or a higher-level handle built on it).
321///
322/// **Process tree semantics:** A launched proc may involve wrapper
323/// processes (shell, runtime shims, sanitizers) and/or spawn
324/// descendants. Launchers should treat termination as applying to the
325/// entire launched process tree (e.g., by placing the child in its
326/// own process group and signaling the group). Callers must not
327/// assume the returned PID is the only process that needs
328/// terminating.
329#[async_trait]
330pub trait ProcLauncher: Send + Sync + 'static {
331 /// Launch a proc using the provided bootstrap payload and config.
332 ///
333 /// Implementations must:
334 /// - Start the underlying proc/container/VM.
335 /// - Arrange for `LaunchResult::exit_rx` to resolve exactly once
336 /// with the terminal outcome.
337 /// - Return quickly after starting the proc (readiness is handled
338 /// elsewhere).
339 ///
340 /// `opts` communicates *policy/intent* computed by the manager.
341 /// The launcher uses it to choose backend-specific mechanics
342 /// (pipes vs inherit, log streaming, etc.).
343 async fn launch(
344 &self,
345 proc_id: &hyperactor::ProcAddr,
346 opts: LaunchOptions,
347 ) -> Result<LaunchResult, ProcLauncherError>;
348
349 /// Initiate graceful termination.
350 ///
351 /// Semantics:
352 /// - Send a graceful termination request (SIGTERM / RPC / API
353 /// call).
354 /// - Schedule escalation to `kill` after `timeout` if
355 /// appropriate.
356 /// - Return immediately; final status is delivered through
357 /// `exit_rx`.
358 ///
359 /// This is a fallback mechanism used when higher-level
360 /// (agent-first) termination cannot be applied or fails.
361 async fn terminate(
362 &self,
363 proc_id: &hyperactor::ProcAddr,
364 timeout: Duration,
365 ) -> Result<(), ProcLauncherError>;
366
367 /// Initiate a force-kill.
368 ///
369 /// Semantics:
370 /// - Request termination as forcefully as the backend allows.
371 /// - Return immediately; final status is delivered through
372 /// `exit_rx`.
373 ///
374 /// The exact mechanism is backend-specific:
375 /// - **Native**: sends SIGKILL directly to the PID.
376 /// - **Systemd**: calls `StopUnit` (same as `terminate`), which
377 /// sends SIGTERM and escalates to SIGKILL after the unit's
378 /// configured timeout. There is no separate "immediate SIGKILL"
379 /// API in the systemd D-Bus interface without adding `KillUnit`.
380 ///
381 /// **Note**: For backends like systemd, `kill()` currently behaves
382 /// identically to `terminate()`. Callers who need a stronger
383 /// guarantee should await `exit_rx` with a timeout rather than
384 /// assuming immediate termination.
385 ///
386 /// Idempotent behavior is preferred: killing an already-dead proc
387 /// should not be treated as an error unless the backend cannot
388 /// determine state.
389 async fn kill(&self, proc_id: &hyperactor::ProcAddr) -> Result<(), ProcLauncherError>;
390}