monarch_hyperactor/proc_launcher.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Actor-based proc launcher implementation.
10//!
11//! This module provides [`ActorProcLauncher`], a [`ProcLauncher`]
12//! that delegates proc spawning to a Python actor implementing the
13//! `ProcLauncher` ABC from `monarch._src.actor.proc_launcher`.
14//!
15//! This enables custom spawning strategies (Docker, VMs, etc.) while
16//! reusing the existing lifecycle management in
17//! `BootstrapProcManager`.
18
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use hyperactor::ActorHandle;
25use hyperactor::Instance;
26use hyperactor::Mailbox;
27use hyperactor::reference;
28use hyperactor_mesh::proc_launcher::LaunchOptions;
29use hyperactor_mesh::proc_launcher::LaunchResult;
30use hyperactor_mesh::proc_launcher::ProcExitKind;
31use hyperactor_mesh::proc_launcher::ProcExitResult;
32use hyperactor_mesh::proc_launcher::ProcLauncher;
33use hyperactor_mesh::proc_launcher::ProcLauncherError;
34use hyperactor_mesh::proc_launcher::StdioHandling;
35use pyo3::prelude::*;
36use tokio::sync::Mutex;
37use tokio::sync::oneshot;
38
39use crate::actor::MethodSpecifier;
40use crate::actor::PythonActor;
41use crate::actor::PythonMessage;
42use crate::actor::PythonMessageKind;
43use crate::mailbox::EitherPortRef;
44use crate::mailbox::PythonOncePortRef;
45
46/// Python / PyO3 helpers used by the actor-based proc launcher.
47///
48/// This module contains small utilities that:
49/// - format Python objects for diagnostics, and
50/// - perform common imports under the GIL with consistent error
51/// mapping.
52mod py {
53 use hyperactor_mesh::proc_launcher::ProcLauncherError;
54 use pyo3::prelude::*;
55
56 /// Format a Python object for inclusion in error messages.
57 ///
58 /// Prefers `str(obj)` (more user-friendly for exceptions), and
59 /// falls back to `repr(obj)` if `str()` fails. If both fail,
60 /// returns a fixed placeholder.
61 pub(super) fn pyany_to_error_string(obj: &Bound<'_, PyAny>) -> String {
62 obj.str()
63 .map(|s| s.to_string())
64 .or_else(|_| obj.repr().map(|r| r.to_string()))
65 .unwrap_or_else(|_| "<error formatting exception>".into())
66 }
67
68 /// Import the `cloudpickle` module under the GIL.
69 ///
70 /// Errors are mapped into `ProcLauncherError` with context so
71 /// callers can report import failures as protocol/interop errors.
72 pub(super) fn import_cloudpickle(
73 py: Python<'_>,
74 ) -> Result<Bound<'_, pyo3::types::PyModule>, ProcLauncherError> {
75 py.import("cloudpickle")
76 .map_err(|e| ProcLauncherError::Other(format!("import cloudpickle: {e}")))
77 }
78}
79
80/// Decoding of exit results returned by the Python proc spawner.
81///
82/// The spawner replies on an explicit exit port with a pickled
83/// payload. That payload must be a `ProcExitResult` dataclass
84/// (from `monarch._src.actor.proc_launcher`) with the standard
85/// exit-reporting attributes.
86///
87/// Decoding is *strict*: all required attributes must be present
88/// with correct types.
89mod decode {
90 use hyperactor_mesh::proc_launcher::ProcExitKind;
91 use hyperactor_mesh::proc_launcher::ProcExitResult;
92 use hyperactor_mesh::proc_launcher::ProcLauncherError;
93 use pyo3::prelude::*;
94
95 /// Field names for the `ProcExitResult` dataclass attributes.
96 const K_EXIT_CODE: &str = "exit_code";
97 const K_SIGNAL: &str = "signal";
98 const K_CORE_DUMPED: &str = "core_dumped";
99 const K_FAILED_REASON: &str = "failed_reason";
100 const K_STDERR_TAIL: &str = "stderr_tail";
101
102 /// Required attributes for a valid `ProcExitResult` dataclass.
103 const REQUIRED_ATTRS: [&str; 5] = [
104 K_EXIT_CODE,
105 K_SIGNAL,
106 K_CORE_DUMPED,
107 K_FAILED_REASON,
108 K_STDERR_TAIL,
109 ];
110
111 /// Intermediate representation of exit information decoded from
112 /// the Python spawner.
113 ///
114 /// This struct separates:
115 /// (1) parsing/validation of a Python dataclass payload, from
116 /// (2) policy decisions when mapping into [`ProcExitResult`] /
117 /// [`ProcExitKind`].
118 #[derive(Debug)]
119 pub(super) struct DecodedExit {
120 /// Process exit code, if known.
121 pub exit_code: Option<i32>,
122 /// Terminating signal number, if the process was killed by a signal.
123 pub signal: Option<i32>,
124 /// Whether a core dump was produced for a signaled termination.
125 pub core_dumped: bool,
126 /// Failure reason reported by the spawner.
127 pub failed_reason: Option<String>,
128 /// Trailing stderr lines captured by the spawner.
129 pub stderr_tail: Vec<String>,
130 }
131
132 /// Validate that the object has all required attributes.
133 fn validate_shape(obj: &Bound<'_, PyAny>) -> Result<(), ProcLauncherError> {
134 for k in REQUIRED_ATTRS {
135 if !obj
136 .hasattr(k)
137 .map_err(|e| ProcLauncherError::Other(format!("hasattr {k}: {e}")))?
138 {
139 return Err(ProcLauncherError::Other(format!(
140 "ProcExitResult must be a ProcExitResult dataclass; missing attribute {k}"
141 )));
142 }
143 }
144 Ok(())
145 }
146
147 /// Extract fields from a validated `ProcExitResult` dataclass.
148 fn extract_fields(obj: &Bound<'_, PyAny>) -> Result<DecodedExit, ProcLauncherError> {
149 let exit_code = obj
150 .getattr(K_EXIT_CODE)
151 .map_err(|e| ProcLauncherError::Other(format!("getattr {K_EXIT_CODE}: {e}")))?
152 .extract::<Option<i32>>()
153 .map_err(|e| ProcLauncherError::Other(format!("extract {K_EXIT_CODE}: {e}")))?;
154
155 let signal = obj
156 .getattr(K_SIGNAL)
157 .map_err(|e| ProcLauncherError::Other(format!("getattr {K_SIGNAL}: {e}")))?
158 .extract::<Option<i32>>()
159 .map_err(|e| ProcLauncherError::Other(format!("extract {K_SIGNAL}: {e}")))?;
160
161 let core_dumped = obj
162 .getattr(K_CORE_DUMPED)
163 .map_err(|e| ProcLauncherError::Other(format!("getattr {K_CORE_DUMPED}: {e}")))?
164 .extract::<bool>()
165 .map_err(|e| ProcLauncherError::Other(format!("extract {K_CORE_DUMPED}: {e}")))?;
166
167 let failed_reason = obj
168 .getattr(K_FAILED_REASON)
169 .map_err(|e| ProcLauncherError::Other(format!("getattr {K_FAILED_REASON}: {e}")))?
170 .extract::<Option<String>>()
171 .map_err(|e| ProcLauncherError::Other(format!("extract {K_FAILED_REASON}: {e}")))?;
172
173 let stderr_tail = obj
174 .getattr(K_STDERR_TAIL)
175 .map_err(|e| ProcLauncherError::Other(format!("getattr {K_STDERR_TAIL}: {e}")))?
176 .extract::<Vec<String>>()
177 .map_err(|e| ProcLauncherError::Other(format!("extract {K_STDERR_TAIL}: {e}")))?;
178
179 Ok(DecodedExit {
180 exit_code,
181 signal,
182 core_dumped,
183 failed_reason,
184 stderr_tail,
185 })
186 }
187
188 /// Decode exit data from a `ProcExitResult` dataclass.
189 ///
190 /// Decoding is *strict*:
191 /// - All required attributes must exist (missing attributes are protocol errors).
192 /// - Attribute values must have the expected types (or be `None`
193 /// for optional fields).
194 pub(super) fn decode_exit_obj(
195 obj: &Bound<'_, PyAny>,
196 ) -> Result<DecodedExit, ProcLauncherError> {
197 validate_shape(obj)?;
198 extract_fields(obj)
199 }
200
201 /// Convert a decoded Python exit payload into a
202 /// [`ProcExitResult`].
203 ///
204 /// Mapping rules:
205 /// - If `failed_reason` is set, the proc is treated as
206 /// [`ProcExitKind::Failed`].
207 /// - Else if `signal` is set, the proc is treated as
208 /// [`ProcExitKind::Signaled`] (propagating `core_dumped`).
209 /// - Else the proc is treated as [`ProcExitKind::Exited`]; if
210 /// `exit_code` is missing, `-1` is used as a sentinel for
211 /// "unknown".
212 ///
213 /// `stderr_tail` is always populated from the decoded payload
214 /// (possibly empty).
215 fn decoded_to_exit_result(d: DecodedExit) -> ProcExitResult {
216 let kind = if let Some(reason) = d.failed_reason {
217 ProcExitKind::Failed { reason }
218 } else if let Some(sig) = d.signal {
219 ProcExitKind::Signaled {
220 signal: sig,
221 core_dumped: d.core_dumped,
222 }
223 } else {
224 // -1 is a sentinel for "exit_code missing/unknown"
225 ProcExitKind::Exited {
226 code: d.exit_code.unwrap_or(-1),
227 }
228 };
229
230 ProcExitResult {
231 kind,
232 stderr_tail: Some(d.stderr_tail),
233 }
234 }
235
236 /// Map a Python exception value into a failed [`ProcExitResult`].
237 ///
238 /// This is used when the spawner reports an exception (rather than a
239 /// normal exit payload). The exception is formatted using
240 /// [`super::py::pyany_to_error_string`] and embedded in
241 /// [`ProcExitKind::Failed`]. No stderr tail is attached because the
242 /// failure originated in the spawner logic rather than the launched
243 /// process.
244 fn exception_to_exit_result(err_obj: &Bound<'_, PyAny>) -> ProcExitResult {
245 let reason = format!(
246 "spawner raised: {}",
247 super::py::pyany_to_error_string(err_obj)
248 );
249 ProcExitResult {
250 kind: ProcExitKind::Failed { reason },
251 stderr_tail: None,
252 }
253 }
254
255 /// Convert a spawner response [`PythonMessage`] into a
256 /// [`ProcExitResult`].
257 ///
258 /// The spawner replies on the exit port with a pickled payload:
259 /// - [`PythonMessageKind::Result`]: a pickled `ProcExitResult`-shaped
260 /// object (dataclass), which is decoded via [`decode_exit_obj`]
261 /// and mapped with [`decoded_to_exit_result`].
262 /// - [`PythonMessageKind::Exception`]: a pickled exception object,
263 /// which is mapped to [`ProcExitKind::Failed`] via
264 /// [`exception_to_exit_result`].
265 ///
266 /// Messages carrying a pending pickle state are rejected (exit
267 /// results must be fully materialized), and any unexpected message
268 /// kind is treated as a protocol error.
269 pub(super) fn convert_py_exit_result(
270 msg: crate::actor::PythonMessage,
271 ) -> Result<ProcExitResult, ProcLauncherError> {
272 use crate::actor::PythonMessageKind;
273
274 Python::attach(|py| {
275 let cloudpickle = super::py::import_cloudpickle(py)?;
276
277 match msg.kind {
278 PythonMessageKind::Result { .. } => {
279 let obj = cloudpickle
280 .call_method1("loads", (msg.message.to_bytes().as_ref(),))
281 .map_err(|e| ProcLauncherError::Other(format!("cloudpickle.loads: {e}")))?;
282 let decoded = decode_exit_obj(&obj)?;
283 Ok(decoded_to_exit_result(decoded))
284 }
285 PythonMessageKind::Exception { .. } => {
286 let err_obj = cloudpickle
287 .call_method1("loads", (msg.message.to_bytes().as_ref(),))
288 .map_err(|e| {
289 ProcLauncherError::Other(format!("cloudpickle.loads exception: {e}"))
290 })?;
291 Ok(exception_to_exit_result(&err_obj))
292 }
293 _ => Err(ProcLauncherError::Other(
294 "unexpected message kind in exit result".into(),
295 )),
296 }
297 })
298 }
299
300 #[cfg(test)]
301 mod tests {
302 use std::ffi::CStr;
303
304 use super::*;
305
306 // --
307 // Pure Rust tests for decoded_to_exit_result
308
309 // If `failed_reason` is present, it takes priority over
310 // signal/exit_code and the stderr tail is preserved.
311 #[test]
312 fn test_decoded_to_exit_result_failed_reason() {
313 let decoded = DecodedExit {
314 exit_code: Some(1),
315 signal: Some(9),
316 core_dumped: true,
317 failed_reason: Some("spawn failed".into()),
318 stderr_tail: vec!["error line".into()],
319 };
320 let result = decoded_to_exit_result(decoded);
321 // failed_reason takes priority over everything else
322 assert!(matches!(
323 result.kind,
324 ProcExitKind::Failed { reason } if reason == "spawn failed"
325 ));
326 assert_eq!(result.stderr_tail, Some(vec!["error line".into()]));
327 }
328
329 // If `failed_reason` is absent but a signal is present, we
330 // produce a `Signaled` exit result (including the
331 // core_dumped bit).
332 #[test]
333 fn test_decoded_to_exit_result_signal() {
334 let decoded = DecodedExit {
335 exit_code: Some(128 + 9),
336 signal: Some(9),
337 core_dumped: true,
338 failed_reason: None,
339 stderr_tail: vec![],
340 };
341 let result = decoded_to_exit_result(decoded);
342 assert!(matches!(
343 result.kind,
344 ProcExitKind::Signaled {
345 signal: 9,
346 core_dumped: true
347 }
348 ));
349 }
350
351 // If neither `failed_reason` nor `signal` is present, we
352 // produce an `Exited` result using the provided exit code
353 // and preserve stderr tail.
354 #[test]
355 fn test_decoded_to_exit_result_exit_code() {
356 let decoded = DecodedExit {
357 exit_code: Some(42),
358 signal: None,
359 core_dumped: false,
360 failed_reason: None,
361 stderr_tail: vec!["line1".into(), "line2".into()],
362 };
363 let result = decoded_to_exit_result(decoded);
364 assert!(matches!(result.kind, ProcExitKind::Exited { code: 42 }));
365 assert_eq!(
366 result.stderr_tail,
367 Some(vec!["line1".into(), "line2".into()])
368 );
369 }
370
371 // If no `failed_reason`, no `signal`, and `exit_code` is
372 // missing, we use the sentinel `-1` to mean "unknown exit
373 // code".
374 #[test]
375 fn test_decoded_to_exit_result_missing_exit_code_sentinel() {
376 // When no failed_reason, no signal, and no exit_code, we
377 // use -1 sentinel
378 let decoded = DecodedExit {
379 exit_code: None,
380 signal: None,
381 core_dumped: false,
382 failed_reason: None,
383 stderr_tail: vec![],
384 };
385 let result = decoded_to_exit_result(decoded);
386 assert!(matches!(result.kind, ProcExitKind::Exited { code: -1 }));
387 }
388
389 // --
390 // GIL-based tests for validate_shape and decode_exit_obj
391
392 // Helper: Run a small Python snippet and return its locals
393 // dict.
394 //
395 // The snippet should assign any values it wants to assert on
396 // into `locals`, e.g. `obj = FakeExit()`, so Rust can pull
397 // them out by name.
398 fn run_py_code<'py>(py: Python<'py>, code: &CStr) -> Bound<'py, pyo3::types::PyDict> {
399 let locals = pyo3::types::PyDict::new(py);
400 py.run(code, None, Some(&locals)).unwrap();
401 locals
402 }
403
404 // A Python object with all required attributes should pass
405 // shape validation.
406 #[test]
407 fn test_validate_shape_valid_dataclass() {
408 Python::initialize();
409 Python::attach(|py| {
410 // Create a simple class with all required attributes
411 let locals = run_py_code(
412 py,
413 c"
414class FakeExit:
415 exit_code = 0
416 signal = None
417 core_dumped = False
418 failed_reason = None
419 stderr_tail = []
420obj = FakeExit()
421",
422 );
423 let obj = locals.get_item("obj").unwrap().unwrap();
424 assert!(validate_shape(&obj).is_ok());
425 });
426 }
427
428 // Missing required attributes should be rejected, and the
429 // error should mention which attribute is missing to aid
430 // debugging.
431 #[test]
432 fn test_validate_shape_missing_attribute() {
433 Python::initialize();
434 Python::attach(|py| {
435 // Missing stderr_tail
436 let locals = run_py_code(
437 py,
438 c"
439class IncompleteExit:
440 exit_code = 0
441 signal = None
442 core_dumped = False
443 failed_reason = None
444obj = IncompleteExit()
445",
446 );
447 let obj = locals.get_item("obj").unwrap().unwrap();
448 let err = validate_shape(&obj).unwrap_err();
449 assert!(
450 err.to_string().contains("stderr_tail"),
451 "error should mention missing attribute: {err}"
452 );
453 });
454 }
455
456 // A well-formed Python exit object should decode into a
457 // `DecodedExit` with the expected field values.
458 #[test]
459 fn test_decode_exit_obj_valid() {
460 Python::initialize();
461 Python::attach(|py| {
462 let locals = run_py_code(
463 py,
464 c"
465class FakeExit:
466 exit_code = 42
467 signal = None
468 core_dumped = False
469 failed_reason = None
470 stderr_tail = ['line1', 'line2']
471obj = FakeExit()
472",
473 );
474 let obj = locals.get_item("obj").unwrap().unwrap();
475 let decoded = decode_exit_obj(&obj).unwrap();
476 assert_eq!(decoded.exit_code, Some(42));
477 assert_eq!(decoded.signal, None);
478 assert!(!decoded.core_dumped);
479 assert_eq!(decoded.failed_reason, None);
480 assert_eq!(decoded.stderr_tail, vec!["line1", "line2"]);
481 });
482 }
483
484 // Type mismatches in the Python payload should fail
485 // decoding, and the error should mention the field that
486 // could not be extracted.
487 #[test]
488 fn test_decode_exit_obj_wrong_type() {
489 Python::initialize();
490 Python::attach(|py| {
491 // exit_code is a string instead of int
492 let locals = run_py_code(
493 py,
494 c"
495class BadExit:
496 exit_code = 'not an int'
497 signal = None
498 core_dumped = False
499 failed_reason = None
500 stderr_tail = []
501obj = BadExit()
502",
503 );
504 let obj = locals.get_item("obj").unwrap().unwrap();
505 let err = decode_exit_obj(&obj).unwrap_err();
506 assert!(
507 err.to_string().contains("exit_code"),
508 "error should mention field: {err}"
509 );
510 });
511 }
512 }
513}
514
515use decode::convert_py_exit_result;
516use py::import_cloudpickle;
517
518/// A [`ProcLauncher`] implemented by delegating proc lifecycle
519/// operations to a Python actor.
520///
521/// The `spawner` actor must implement the `ProcLauncher` ABC from
522/// `monarch._src.actor.proc_launcher`, and is responsible for
523/// actually spawning and controlling OS processes (Docker, VMs,
524/// etc.). Rust retains the *lifecycle wiring* expected by
525/// [`BootstrapProcManager`]: it initiates launch/terminate/kill
526/// requests and exposes an [`oneshot::Receiver`] (`exit_rx`) that
527/// resolves when the spawner reports exit.
528///
529/// ## Semantics
530///
531/// - **PID is optional**: the Python spawner may not expose a real
532/// PID, so [`LaunchResult::pid`] is `None`.
533/// - **Exit reporting is required**: the spawner must send exactly
534/// one exit result on the provided exit port. If the port is closed
535/// or the payload cannot be decoded, the receiver resolves to a
536/// [`ProcExitKind::Failed`] result.
537/// - **Termination is best-effort**: `terminate` and `kill` are
538/// forwarded to the spawner; success only means the request was
539/// delivered.
540///
541/// ## Context requirement
542///
543/// [`ProcLauncher`] methods don't take a context parameter, but
544/// sending actor messages does. This launcher stores an
545/// [`Instance<()>`] ("client-only" actor) to use as the send context.
546/// The instance is created via [`Proc::instance()`] and must remain
547/// valid for the lifetime of the launcher.
548#[derive(Debug)]
549pub struct ActorProcLauncher {
550 /// Handle to the Python spawner actor that implements the
551 /// ProcLauncher ABC.
552 spawner: ActorHandle<PythonActor>,
553
554 /// Mailbox used to allocate the one-shot exit port per launched
555 /// proc.
556 mailbox: Mailbox,
557
558 /// Client-only actor instance used as the send context for all
559 /// messages to `spawner`.
560 ///
561 /// Created via `Proc::instance()`. The `()` type indicates this
562 /// is not a real actor—just a sending context. Must outlive the
563 /// launcher.
564 instance: Instance<()>,
565
566 /// Debug-only tracking of procs launched via this instance.
567 ///
568 /// Not used for correctness; used for diagnostics and sanity
569 /// checks.
570 active_procs: Arc<Mutex<HashSet<reference::ProcId>>>,
571}
572
573impl ActorProcLauncher {
574 /// Create a new actor-based proc launcher.
575 ///
576 /// # Arguments
577 ///
578 /// * `spawner` - Handle to the Python actor implementing the
579 /// `ProcLauncher` ABC.
580 /// * `mailbox` - Mailbox used to create one-shot exit ports.
581 /// * `instance` - Send context for `ActorHandle::send` (typically
582 /// from `Proc::instance()`). Any valid instance granting send
583 /// capability is sufficient; it need not be
584 /// `Instance<PythonActor>`. Must remain valid for the
585 /// launcher's lifetime.
586 pub fn new(
587 spawner: ActorHandle<PythonActor>,
588 mailbox: Mailbox,
589 instance: Instance<()>,
590 ) -> Self {
591 Self {
592 spawner,
593 mailbox,
594 instance,
595 active_procs: Arc::new(Mutex::new(HashSet::new())),
596 }
597 }
598}
599
600#[async_trait]
601impl ProcLauncher for ActorProcLauncher {
602 /// Spawn a proc by delegating to the Python spawner actor.
603 ///
604 /// This method:
605 /// 1) opens a one-shot mailbox port used for the spawner's exit
606 /// notification,
607 /// 2) serializes `(proc_id, LaunchOptions)` with `cloudpickle`,
608 /// 3) sends a `CallMethod { launch, ExplicitPort(..) }` message
609 /// to the spawner,
610 /// 4) returns immediately with a [`LaunchResult`] whose `exit_rx`
611 /// completes once the spawner reports process termination (or the
612 /// port closes).
613 ///
614 /// ## Notes
615 /// - `pid` is always `None`: the Rust side does not assume an OS
616 /// PID exists.
617 /// - Exit is observed asynchronously via `exit_rx`;
618 /// termination/kill are best-effort requests to the spawner actor
619 /// rather than direct OS signals.
620 /// - If decoding the exit payload fails, the returned `exit_rx`
621 /// resolves to `ProcExitKind::Failed` with a decode error reason.
622 async fn launch(
623 &self,
624 proc_id: &reference::ProcId,
625 opts: LaunchOptions,
626 ) -> Result<LaunchResult, ProcLauncherError> {
627 let (exit_port, exit_port_rx) = self.mailbox.open_once_port::<PythonMessage>();
628
629 let pickled_args = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
630 let cloudpickle = import_cloudpickle(py)?;
631
632 let mod_ = py
633 .import("monarch._src.actor.proc_launcher")
634 .map_err(|e| ProcLauncherError::Other(format!("import proc_launcher: {e}")))?;
635 let launch_opts_cls = mod_
636 .getattr("LaunchOptions")
637 .map_err(|e| ProcLauncherError::Other(format!("getattr LaunchOptions: {e}")))?;
638
639 let program = opts.command.program.to_str().ok_or_else(|| {
640 ProcLauncherError::Other("program path is not valid UTF-8".into())
641 })?;
642
643 let env = pyo3::types::PyDict::new(py);
644 for (k, v) in &opts.command.env {
645 env.set_item(k, v)
646 .map_err(|e| ProcLauncherError::Other(format!("set env item: {e}")))?;
647 }
648
649 let py_opts = launch_opts_cls
650 .call1((
651 &opts.bootstrap_payload,
652 &opts.process_name,
653 program,
654 opts.command.arg0.as_deref(),
655 &opts.command.args,
656 env,
657 opts.want_stdio,
658 opts.tail_lines,
659 opts.log_channel.as_ref().map(|a| a.to_string()),
660 ))
661 .map_err(|e| ProcLauncherError::Other(format!("construct LaunchOptions: {e}")))?;
662
663 let args = (proc_id.to_string(), py_opts);
664 let kwargs = pyo3::types::PyDict::new(py);
665 let pickled = cloudpickle
666 .call_method1("dumps", ((args, kwargs),))
667 .map_err(|e| ProcLauncherError::Other(format!("cloudpickle: {e}")))?;
668
669 pickled
670 .extract::<Vec<u8>>()
671 .map_err(|e| ProcLauncherError::Other(format!("extract bytes: {e}")))
672 })?;
673
674 let bound_port = exit_port.bind();
675 let message = PythonMessage {
676 kind: PythonMessageKind::CallMethod {
677 name: MethodSpecifier::ExplicitPort {
678 name: "launch".into(),
679 },
680 response_port: Some(EitherPortRef::Once(PythonOncePortRef::from(bound_port))),
681 },
682 message: pickled_args.into(),
683 };
684
685 self.spawner
686 .send(&self.instance, message)
687 .map_err(|e| ProcLauncherError::Other(format!("send to spawner failed: {e}")))?;
688
689 self.active_procs.lock().await.insert(proc_id.clone());
690
691 let (exit_tx, exit_rx) = oneshot::channel();
692 let active_procs = Arc::clone(&self.active_procs);
693 let proc_id_clone = proc_id.clone();
694
695 tokio::spawn(async move {
696 let result = match exit_port_rx.recv().await {
697 Ok(py_message) => {
698 convert_py_exit_result(py_message).unwrap_or_else(|e| ProcExitResult {
699 kind: ProcExitKind::Failed {
700 reason: format!("failed to decode exit result: {e}"),
701 },
702 stderr_tail: None,
703 })
704 }
705 Err(_) => ProcExitResult {
706 kind: ProcExitKind::Failed {
707 reason: "exit port closed (spawner crashed or forgot to send)".into(),
708 },
709 stderr_tail: None,
710 },
711 };
712 active_procs.lock().await.remove(&proc_id_clone);
713 let _ = exit_tx.send(result);
714 });
715
716 Ok(LaunchResult {
717 pid: None,
718 started_at: std::time::SystemTime::now(),
719 stdio: StdioHandling::ManagedByLauncher,
720 exit_rx,
721 })
722 }
723
724 /// Request graceful termination of a proc, with a best-effort
725 /// timeout.
726 ///
727 /// This delegates to the Python spawner actor's
728 /// `terminate(proc_id, timeout_secs)` method. The request is sent
729 /// fire-and-forget: we do not wait for an acknowledgment, and
730 /// there is no guarantee the proc will actually exit within
731 /// `timeout`.
732 ///
733 /// ## Errors
734 ///
735 /// Returns `ProcLauncherError::Terminate` if we fail to:
736 /// - import/serialize the request via `cloudpickle`, or
737 /// - send the message to the spawner actor.
738 async fn terminate(
739 &self,
740 proc_id: &reference::ProcId,
741 timeout: Duration,
742 ) -> Result<(), ProcLauncherError> {
743 let pickled = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
744 let cloudpickle =
745 import_cloudpickle(py).map_err(|e| ProcLauncherError::Terminate(format!("{e}")))?;
746 let args = (proc_id.to_string(), timeout.as_secs_f64());
747 let kwargs = pyo3::types::PyDict::new(py);
748 cloudpickle
749 .call_method1("dumps", ((args, kwargs),))
750 .map_err(|e| ProcLauncherError::Terminate(format!("cloudpickle: {e}")))?
751 .extract()
752 .map_err(|e| ProcLauncherError::Terminate(format!("extract: {e}")))
753 })?;
754
755 let message = PythonMessage {
756 kind: PythonMessageKind::CallMethod {
757 name: MethodSpecifier::ReturnsResponse {
758 name: "terminate".into(),
759 },
760 response_port: None,
761 },
762 message: pickled.into(),
763 };
764
765 self.spawner
766 .send(&self.instance, message)
767 .map_err(|e| ProcLauncherError::Terminate(format!("send failed: {e}")))
768 }
769
770 /// Forcefully kill a proc.
771 ///
772 /// This delegates to the Python spawner actor's `kill(proc_id)`
773 /// method. Like `terminate`, this is best-effort and
774 /// fire-and-forget: success here means the request was serialized
775 /// and delivered to the spawner actor, not that the process is
776 /// already dead.
777 ///
778 /// ## Errors
779 ///
780 /// Returns `ProcLauncherError::Kill` if we fail to:
781 /// - import/serialize the request via `cloudpickle`, or
782 /// - send the message to the spawner actor.
783 async fn kill(&self, proc_id: &reference::ProcId) -> Result<(), ProcLauncherError> {
784 let pickled = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
785 let cloudpickle =
786 import_cloudpickle(py).map_err(|e| ProcLauncherError::Kill(format!("{e}")))?;
787 let args = (proc_id.to_string(),);
788 let kwargs = pyo3::types::PyDict::new(py);
789 cloudpickle
790 .call_method1("dumps", ((args, kwargs),))
791 .map_err(|e| ProcLauncherError::Kill(format!("cloudpickle: {e}")))?
792 .extract()
793 .map_err(|e| ProcLauncherError::Kill(format!("extract: {e}")))
794 })?;
795
796 let message = PythonMessage {
797 kind: PythonMessageKind::CallMethod {
798 name: MethodSpecifier::ReturnsResponse {
799 name: "kill".into(),
800 },
801 response_port: None,
802 },
803 message: pickled.into(),
804 };
805
806 self.spawner
807 .send(&self.instance, message)
808 .map_err(|e| ProcLauncherError::Kill(format!("send failed: {e}")))
809 }
810}
811
#[cfg(test)]
mod tests {
    use super::*;

    // Runs `code` with a fresh locals dict and returns the object the
    // snippet binds to `obj`.
    fn py_obj_from<'py>(py: Python<'py>, code: &std::ffi::CStr) -> Bound<'py, PyAny> {
        let locals = pyo3::types::PyDict::new(py);
        py.run(code, None, Some(&locals)).unwrap();
        locals.get_item("obj").unwrap().unwrap()
    }

    // Verifies that `pyany_to_error_string` formats Python objects
    // using `str()` when it succeeds.
    #[test]
    fn test_pyany_to_error_string() {
        Python::initialize();
        Python::attach(|py| {
            // A Python string should round-trip through `str()`
            // unchanged.
            let s = pyo3::types::PyString::new(py, "hello");
            assert_eq!(py::pyany_to_error_string(s.as_any()), "hello");

            // Non-strings should format via `str()` when possible.
            let i = 42i32.into_pyobject(py).unwrap();
            assert_eq!(py::pyany_to_error_string(i.as_any()), "42");
        });
    }

    // When `str()` raises, the formatter falls back to `repr()`.
    #[test]
    fn test_pyany_to_error_string_falls_back_to_repr() {
        Python::initialize();
        Python::attach(|py| {
            let obj = py_obj_from(
                py,
                c"
class StrFails:
    def __str__(self):
        raise ValueError('no str')
    def __repr__(self):
        return 'StrFails()'
obj = StrFails()
",
            );
            assert_eq!(py::pyany_to_error_string(&obj), "StrFails()");
        });
    }

    // When both `str()` and `repr()` raise, a fixed placeholder is
    // returned instead of propagating the error.
    #[test]
    fn test_pyany_to_error_string_placeholder() {
        Python::initialize();
        Python::attach(|py| {
            let obj = py_obj_from(
                py,
                c"
class Unprintable:
    def __str__(self):
        raise ValueError('no str')
    def __repr__(self):
        raise ValueError('no repr')
obj = Unprintable()
",
            );
            assert_eq!(
                py::pyany_to_error_string(&obj),
                "<error formatting exception>"
            );
        });
    }
}