monarch_hyperactor/
proc_launcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Actor-based proc launcher implementation.
10//!
11//! This module provides [`ActorProcLauncher`], a [`ProcLauncher`]
12//! that delegates proc spawning to a Python actor implementing the
13//! `ProcLauncher` ABC from `monarch._src.actor.proc_launcher`.
14//!
15//! This enables custom spawning strategies (Docker, VMs, etc.) while
16//! reusing the existing lifecycle management in
17//! `BootstrapProcManager`.
18
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use hyperactor::ActorHandle;
25use hyperactor::Instance;
26use hyperactor::Mailbox;
27use hyperactor::reference;
28use hyperactor_mesh::proc_launcher::LaunchOptions;
29use hyperactor_mesh::proc_launcher::LaunchResult;
30use hyperactor_mesh::proc_launcher::ProcExitKind;
31use hyperactor_mesh::proc_launcher::ProcExitResult;
32use hyperactor_mesh::proc_launcher::ProcLauncher;
33use hyperactor_mesh::proc_launcher::ProcLauncherError;
34use hyperactor_mesh::proc_launcher::StdioHandling;
35use pyo3::prelude::*;
36use tokio::sync::Mutex;
37use tokio::sync::oneshot;
38
39use crate::actor::MethodSpecifier;
40use crate::actor::PythonActor;
41use crate::actor::PythonMessage;
42use crate::actor::PythonMessageKind;
43use crate::mailbox::EitherPortRef;
44use crate::mailbox::PythonOncePortRef;
45
46/// Python / PyO3 helpers used by the actor-based proc launcher.
47///
48/// This module contains small utilities that:
49/// - format Python objects for diagnostics, and
50/// - perform common imports under the GIL with consistent error
51///   mapping.
52mod py {
53    use hyperactor_mesh::proc_launcher::ProcLauncherError;
54    use pyo3::prelude::*;
55
56    /// Format a Python object for inclusion in error messages.
57    ///
58    /// Prefers `str(obj)` (more user-friendly for exceptions), and
59    /// falls back to `repr(obj)` if `str()` fails. If both fail,
60    /// returns a fixed placeholder.
61    pub(super) fn pyany_to_error_string(obj: &Bound<'_, PyAny>) -> String {
62        obj.str()
63            .map(|s| s.to_string())
64            .or_else(|_| obj.repr().map(|r| r.to_string()))
65            .unwrap_or_else(|_| "<error formatting exception>".into())
66    }
67
68    /// Import the `cloudpickle` module under the GIL.
69    ///
70    /// Errors are mapped into `ProcLauncherError` with context so
71    /// callers can report import failures as protocol/interop errors.
72    pub(super) fn import_cloudpickle(
73        py: Python<'_>,
74    ) -> Result<Bound<'_, pyo3::types::PyModule>, ProcLauncherError> {
75        py.import("cloudpickle")
76            .map_err(|e| ProcLauncherError::Other(format!("import cloudpickle: {e}")))
77    }
78}
79
80/// Decoding of exit results returned by the Python proc spawner.
81///
82/// The spawner replies on an explicit exit port with a pickled
83/// payload. That payload must be a `ProcExitResult` dataclass
84/// (from `monarch._src.actor.proc_launcher`) with the standard
85/// exit-reporting attributes.
86///
87/// Decoding is *strict*: all required attributes must be present
88/// with correct types.
89mod decode {
90    use hyperactor_mesh::proc_launcher::ProcExitKind;
91    use hyperactor_mesh::proc_launcher::ProcExitResult;
92    use hyperactor_mesh::proc_launcher::ProcLauncherError;
93    use pyo3::prelude::*;
94
95    /// Field names for the `ProcExitResult` dataclass attributes.
96    const K_EXIT_CODE: &str = "exit_code";
97    const K_SIGNAL: &str = "signal";
98    const K_CORE_DUMPED: &str = "core_dumped";
99    const K_FAILED_REASON: &str = "failed_reason";
100    const K_STDERR_TAIL: &str = "stderr_tail";
101
102    /// Required attributes for a valid `ProcExitResult` dataclass.
103    const REQUIRED_ATTRS: [&str; 5] = [
104        K_EXIT_CODE,
105        K_SIGNAL,
106        K_CORE_DUMPED,
107        K_FAILED_REASON,
108        K_STDERR_TAIL,
109    ];
110
111    /// Intermediate representation of exit information decoded from
112    /// the Python spawner.
113    ///
114    /// This struct separates:
115    /// (1) parsing/validation of a Python dataclass payload, from
116    /// (2) policy decisions when mapping into [`ProcExitResult`] /
117    ///     [`ProcExitKind`].
118    #[derive(Debug)]
119    pub(super) struct DecodedExit {
120        /// Process exit code, if known.
121        pub exit_code: Option<i32>,
122        /// Terminating signal number, if the process was killed by a signal.
123        pub signal: Option<i32>,
124        /// Whether a core dump was produced for a signaled termination.
125        pub core_dumped: bool,
126        /// Failure reason reported by the spawner.
127        pub failed_reason: Option<String>,
128        /// Trailing stderr lines captured by the spawner.
129        pub stderr_tail: Vec<String>,
130    }
131
132    /// Validate that the object has all required attributes.
133    fn validate_shape(obj: &Bound<'_, PyAny>) -> Result<(), ProcLauncherError> {
134        for k in REQUIRED_ATTRS {
135            if !obj
136                .hasattr(k)
137                .map_err(|e| ProcLauncherError::Other(format!("hasattr {k}: {e}")))?
138            {
139                return Err(ProcLauncherError::Other(format!(
140                    "ProcExitResult must be a ProcExitResult dataclass; missing attribute {k}"
141                )));
142            }
143        }
144        Ok(())
145    }
146
147    /// Extract fields from a validated `ProcExitResult` dataclass.
148    fn extract_fields(obj: &Bound<'_, PyAny>) -> Result<DecodedExit, ProcLauncherError> {
149        let exit_code = obj
150            .getattr(K_EXIT_CODE)
151            .map_err(|e| ProcLauncherError::Other(format!("getattr {K_EXIT_CODE}: {e}")))?
152            .extract::<Option<i32>>()
153            .map_err(|e| ProcLauncherError::Other(format!("extract {K_EXIT_CODE}: {e}")))?;
154
155        let signal = obj
156            .getattr(K_SIGNAL)
157            .map_err(|e| ProcLauncherError::Other(format!("getattr {K_SIGNAL}: {e}")))?
158            .extract::<Option<i32>>()
159            .map_err(|e| ProcLauncherError::Other(format!("extract {K_SIGNAL}: {e}")))?;
160
161        let core_dumped = obj
162            .getattr(K_CORE_DUMPED)
163            .map_err(|e| ProcLauncherError::Other(format!("getattr {K_CORE_DUMPED}: {e}")))?
164            .extract::<bool>()
165            .map_err(|e| ProcLauncherError::Other(format!("extract {K_CORE_DUMPED}: {e}")))?;
166
167        let failed_reason = obj
168            .getattr(K_FAILED_REASON)
169            .map_err(|e| ProcLauncherError::Other(format!("getattr {K_FAILED_REASON}: {e}")))?
170            .extract::<Option<String>>()
171            .map_err(|e| ProcLauncherError::Other(format!("extract {K_FAILED_REASON}: {e}")))?;
172
173        let stderr_tail = obj
174            .getattr(K_STDERR_TAIL)
175            .map_err(|e| ProcLauncherError::Other(format!("getattr {K_STDERR_TAIL}: {e}")))?
176            .extract::<Vec<String>>()
177            .map_err(|e| ProcLauncherError::Other(format!("extract {K_STDERR_TAIL}: {e}")))?;
178
179        Ok(DecodedExit {
180            exit_code,
181            signal,
182            core_dumped,
183            failed_reason,
184            stderr_tail,
185        })
186    }
187
188    /// Decode exit data from a `ProcExitResult` dataclass.
189    ///
190    /// Decoding is *strict*:
191    /// - All required attributes must exist (missing attributes are protocol errors).
192    /// - Attribute values must have the expected types (or be `None`
193    ///   for optional fields).
194    pub(super) fn decode_exit_obj(
195        obj: &Bound<'_, PyAny>,
196    ) -> Result<DecodedExit, ProcLauncherError> {
197        validate_shape(obj)?;
198        extract_fields(obj)
199    }
200
201    /// Convert a decoded Python exit payload into a
202    /// [`ProcExitResult`].
203    ///
204    /// Mapping rules:
205    /// - If `failed_reason` is set, the proc is treated as
206    ///   [`ProcExitKind::Failed`].
207    /// - Else if `signal` is set, the proc is treated as
208    ///   [`ProcExitKind::Signaled`] (propagating `core_dumped`).
209    /// - Else the proc is treated as [`ProcExitKind::Exited`]; if
210    ///   `exit_code` is missing, `-1` is used as a sentinel for
211    ///   "unknown".
212    ///
213    /// `stderr_tail` is always populated from the decoded payload
214    /// (possibly empty).
215    fn decoded_to_exit_result(d: DecodedExit) -> ProcExitResult {
216        let kind = if let Some(reason) = d.failed_reason {
217            ProcExitKind::Failed { reason }
218        } else if let Some(sig) = d.signal {
219            ProcExitKind::Signaled {
220                signal: sig,
221                core_dumped: d.core_dumped,
222            }
223        } else {
224            // -1 is a sentinel for "exit_code missing/unknown"
225            ProcExitKind::Exited {
226                code: d.exit_code.unwrap_or(-1),
227            }
228        };
229
230        ProcExitResult {
231            kind,
232            stderr_tail: Some(d.stderr_tail),
233        }
234    }
235
236    /// Map a Python exception value into a failed [`ProcExitResult`].
237    ///
238    /// This is used when the spawner reports an exception (rather than a
239    /// normal exit payload). The exception is formatted using
240    /// [`super::py::pyany_to_error_string`] and embedded in
241    /// [`ProcExitKind::Failed`]. No stderr tail is attached because the
242    /// failure originated in the spawner logic rather than the launched
243    /// process.
244    fn exception_to_exit_result(err_obj: &Bound<'_, PyAny>) -> ProcExitResult {
245        let reason = format!(
246            "spawner raised: {}",
247            super::py::pyany_to_error_string(err_obj)
248        );
249        ProcExitResult {
250            kind: ProcExitKind::Failed { reason },
251            stderr_tail: None,
252        }
253    }
254
255    /// Convert a spawner response [`PythonMessage`] into a
256    /// [`ProcExitResult`].
257    ///
258    /// The spawner replies on the exit port with a pickled payload:
259    /// - [`PythonMessageKind::Result`]: a pickled `ProcExitResult`-shaped
260    ///   object (dataclass), which is decoded via [`decode_exit_obj`]
261    ///   and mapped with [`decoded_to_exit_result`].
262    /// - [`PythonMessageKind::Exception`]: a pickled exception object,
263    ///   which is mapped to [`ProcExitKind::Failed`] via
264    ///   [`exception_to_exit_result`].
265    ///
266    /// Messages carrying a pending pickle state are rejected (exit
267    /// results must be fully materialized), and any unexpected message
268    /// kind is treated as a protocol error.
269    pub(super) fn convert_py_exit_result(
270        msg: crate::actor::PythonMessage,
271    ) -> Result<ProcExitResult, ProcLauncherError> {
272        use crate::actor::PythonMessageKind;
273
274        Python::attach(|py| {
275            let cloudpickle = super::py::import_cloudpickle(py)?;
276
277            match msg.kind {
278                PythonMessageKind::Result { .. } => {
279                    let obj = cloudpickle
280                        .call_method1("loads", (msg.message.to_bytes().as_ref(),))
281                        .map_err(|e| ProcLauncherError::Other(format!("cloudpickle.loads: {e}")))?;
282                    let decoded = decode_exit_obj(&obj)?;
283                    Ok(decoded_to_exit_result(decoded))
284                }
285                PythonMessageKind::Exception { .. } => {
286                    let err_obj = cloudpickle
287                        .call_method1("loads", (msg.message.to_bytes().as_ref(),))
288                        .map_err(|e| {
289                            ProcLauncherError::Other(format!("cloudpickle.loads exception: {e}"))
290                        })?;
291                    Ok(exception_to_exit_result(&err_obj))
292                }
293                _ => Err(ProcLauncherError::Other(
294                    "unexpected message kind in exit result".into(),
295                )),
296            }
297        })
298    }
299
300    #[cfg(test)]
301    mod tests {
302        use std::ffi::CStr;
303
304        use super::*;
305
306        // --
307        // Pure Rust tests for decoded_to_exit_result
308
309        // If `failed_reason` is present, it takes priority over
310        // signal/exit_code and the stderr tail is preserved.
311        #[test]
312        fn test_decoded_to_exit_result_failed_reason() {
313            let decoded = DecodedExit {
314                exit_code: Some(1),
315                signal: Some(9),
316                core_dumped: true,
317                failed_reason: Some("spawn failed".into()),
318                stderr_tail: vec!["error line".into()],
319            };
320            let result = decoded_to_exit_result(decoded);
321            // failed_reason takes priority over everything else
322            assert!(matches!(
323                result.kind,
324                ProcExitKind::Failed { reason } if reason == "spawn failed"
325            ));
326            assert_eq!(result.stderr_tail, Some(vec!["error line".into()]));
327        }
328
329        // If `failed_reason` is absent but a signal is present, we
330        // produce a `Signaled` exit result (including the
331        // core_dumped bit).
332        #[test]
333        fn test_decoded_to_exit_result_signal() {
334            let decoded = DecodedExit {
335                exit_code: Some(128 + 9),
336                signal: Some(9),
337                core_dumped: true,
338                failed_reason: None,
339                stderr_tail: vec![],
340            };
341            let result = decoded_to_exit_result(decoded);
342            assert!(matches!(
343                result.kind,
344                ProcExitKind::Signaled {
345                    signal: 9,
346                    core_dumped: true
347                }
348            ));
349        }
350
351        // If neither `failed_reason` nor `signal` is present, we
352        // produce an `Exited` result using the provided exit code
353        // and preserve stderr tail.
354        #[test]
355        fn test_decoded_to_exit_result_exit_code() {
356            let decoded = DecodedExit {
357                exit_code: Some(42),
358                signal: None,
359                core_dumped: false,
360                failed_reason: None,
361                stderr_tail: vec!["line1".into(), "line2".into()],
362            };
363            let result = decoded_to_exit_result(decoded);
364            assert!(matches!(result.kind, ProcExitKind::Exited { code: 42 }));
365            assert_eq!(
366                result.stderr_tail,
367                Some(vec!["line1".into(), "line2".into()])
368            );
369        }
370
371        // If no `failed_reason`, no `signal`, and `exit_code` is
372        // missing, we use the sentinel `-1` to mean "unknown exit
373        // code".
374        #[test]
375        fn test_decoded_to_exit_result_missing_exit_code_sentinel() {
376            // When no failed_reason, no signal, and no exit_code, we
377            // use -1 sentinel
378            let decoded = DecodedExit {
379                exit_code: None,
380                signal: None,
381                core_dumped: false,
382                failed_reason: None,
383                stderr_tail: vec![],
384            };
385            let result = decoded_to_exit_result(decoded);
386            assert!(matches!(result.kind, ProcExitKind::Exited { code: -1 }));
387        }
388
389        // --
390        // GIL-based tests for validate_shape and decode_exit_obj
391
392        // Helper: Run a small Python snippet and return its locals
393        // dict.
394        //
395        // The snippet should assign any values it wants to assert on
396        // into `locals`, e.g. `obj = FakeExit()`, so Rust can pull
397        // them out by name.
398        fn run_py_code<'py>(py: Python<'py>, code: &CStr) -> Bound<'py, pyo3::types::PyDict> {
399            let locals = pyo3::types::PyDict::new(py);
400            py.run(code, None, Some(&locals)).unwrap();
401            locals
402        }
403
404        // A Python object with all required attributes should pass
405        // shape validation.
406        #[test]
407        fn test_validate_shape_valid_dataclass() {
408            Python::initialize();
409            Python::attach(|py| {
410                // Create a simple class with all required attributes
411                let locals = run_py_code(
412                    py,
413                    c"
414class FakeExit:
415    exit_code = 0
416    signal = None
417    core_dumped = False
418    failed_reason = None
419    stderr_tail = []
420obj = FakeExit()
421",
422                );
423                let obj = locals.get_item("obj").unwrap().unwrap();
424                assert!(validate_shape(&obj).is_ok());
425            });
426        }
427
428        // Missing required attributes should be rejected, and the
429        // error should mention which attribute is missing to aid
430        // debugging.
431        #[test]
432        fn test_validate_shape_missing_attribute() {
433            Python::initialize();
434            Python::attach(|py| {
435                // Missing stderr_tail
436                let locals = run_py_code(
437                    py,
438                    c"
439class IncompleteExit:
440    exit_code = 0
441    signal = None
442    core_dumped = False
443    failed_reason = None
444obj = IncompleteExit()
445",
446                );
447                let obj = locals.get_item("obj").unwrap().unwrap();
448                let err = validate_shape(&obj).unwrap_err();
449                assert!(
450                    err.to_string().contains("stderr_tail"),
451                    "error should mention missing attribute: {err}"
452                );
453            });
454        }
455
456        // A well-formed Python exit object should decode into a
457        // `DecodedExit` with the expected field values.
458        #[test]
459        fn test_decode_exit_obj_valid() {
460            Python::initialize();
461            Python::attach(|py| {
462                let locals = run_py_code(
463                    py,
464                    c"
465class FakeExit:
466    exit_code = 42
467    signal = None
468    core_dumped = False
469    failed_reason = None
470    stderr_tail = ['line1', 'line2']
471obj = FakeExit()
472",
473                );
474                let obj = locals.get_item("obj").unwrap().unwrap();
475                let decoded = decode_exit_obj(&obj).unwrap();
476                assert_eq!(decoded.exit_code, Some(42));
477                assert_eq!(decoded.signal, None);
478                assert!(!decoded.core_dumped);
479                assert_eq!(decoded.failed_reason, None);
480                assert_eq!(decoded.stderr_tail, vec!["line1", "line2"]);
481            });
482        }
483
484        // Type mismatches in the Python payload should fail
485        // decoding, and the error should mention the field that
486        // could not be extracted.
487        #[test]
488        fn test_decode_exit_obj_wrong_type() {
489            Python::initialize();
490            Python::attach(|py| {
491                // exit_code is a string instead of int
492                let locals = run_py_code(
493                    py,
494                    c"
495class BadExit:
496    exit_code = 'not an int'
497    signal = None
498    core_dumped = False
499    failed_reason = None
500    stderr_tail = []
501obj = BadExit()
502",
503                );
504                let obj = locals.get_item("obj").unwrap().unwrap();
505                let err = decode_exit_obj(&obj).unwrap_err();
506                assert!(
507                    err.to_string().contains("exit_code"),
508                    "error should mention field: {err}"
509                );
510            });
511        }
512    }
513}
514
515use decode::convert_py_exit_result;
516use py::import_cloudpickle;
517
518/// A [`ProcLauncher`] implemented by delegating proc lifecycle
519/// operations to a Python actor.
520///
521/// The `spawner` actor must implement the `ProcLauncher` ABC from
522/// `monarch._src.actor.proc_launcher`, and is responsible for
523/// actually spawning and controlling OS processes (Docker, VMs,
524/// etc.). Rust retains the *lifecycle wiring* expected by
525/// [`BootstrapProcManager`]: it initiates launch/terminate/kill
526/// requests and exposes an [`oneshot::Receiver`] (`exit_rx`) that
527/// resolves when the spawner reports exit.
528///
529/// ## Semantics
530///
531/// - **PID is optional**: the Python spawner may not expose a real
532///   PID, so [`LaunchResult::pid`] is `None`.
533/// - **Exit reporting is required**: the spawner must send exactly
534///   one exit result on the provided exit port. If the port is closed
535///   or the payload cannot be decoded, the receiver resolves to a
536///   [`ProcExitKind::Failed`] result.
537/// - **Termination is best-effort**: `terminate` and `kill` are
538///   forwarded to the spawner; success only means the request was
539///   delivered.
540///
541/// ## Context requirement
542///
543/// [`ProcLauncher`] methods don't take a context parameter, but
544/// sending actor messages does. This launcher stores an
545/// [`Instance<()>`] ("client-only" actor) to use as the send context.
546/// The instance is created via [`Proc::instance()`] and must remain
547/// valid for the lifetime of the launcher.
548#[derive(Debug)]
549pub struct ActorProcLauncher {
550    /// Handle to the Python spawner actor that implements the
551    /// ProcLauncher ABC.
552    spawner: ActorHandle<PythonActor>,
553
554    /// Mailbox used to allocate the one-shot exit port per launched
555    /// proc.
556    mailbox: Mailbox,
557
558    /// Client-only actor instance used as the send context for all
559    /// messages to `spawner`.
560    ///
561    /// Created via `Proc::instance()`. The `()` type indicates this
562    /// is not a real actor—just a sending context. Must outlive the
563    /// launcher.
564    instance: Instance<()>,
565
566    /// Debug-only tracking of procs launched via this instance.
567    ///
568    /// Not used for correctness; used for diagnostics and sanity
569    /// checks.
570    active_procs: Arc<Mutex<HashSet<reference::ProcId>>>,
571}
572
573impl ActorProcLauncher {
574    /// Create a new actor-based proc launcher.
575    ///
576    /// # Arguments
577    ///
578    /// * `spawner` - Handle to the Python actor implementing the
579    ///   `ProcLauncher` ABC.
580    /// * `mailbox` - Mailbox used to create one-shot exit ports.
581    /// * `instance` - Send context for `ActorHandle::send` (typically
582    ///   from `Proc::instance()`). Any valid instance granting send
583    ///   capability is sufficient; it need not be
584    ///   `Instance<PythonActor>`. Must remain valid for the
585    ///   launcher's lifetime.
586    pub fn new(
587        spawner: ActorHandle<PythonActor>,
588        mailbox: Mailbox,
589        instance: Instance<()>,
590    ) -> Self {
591        Self {
592            spawner,
593            mailbox,
594            instance,
595            active_procs: Arc::new(Mutex::new(HashSet::new())),
596        }
597    }
598}
599
600#[async_trait]
601impl ProcLauncher for ActorProcLauncher {
602    /// Spawn a proc by delegating to the Python spawner actor.
603    ///
604    /// This method:
605    /// 1) opens a one-shot mailbox port used for the spawner's exit
606    ///    notification,
607    /// 2) serializes `(proc_id, LaunchOptions)` with `cloudpickle`,
608    /// 3) sends a `CallMethod { launch, ExplicitPort(..) }` message
609    ///    to the spawner,
610    /// 4) returns immediately with a [`LaunchResult`] whose `exit_rx`
611    ///    completes once the spawner reports process termination (or the
612    ///    port closes).
613    ///
614    /// ## Notes
615    /// - `pid` is always `None`: the Rust side does not assume an OS
616    ///   PID exists.
617    /// - Exit is observed asynchronously via `exit_rx`;
618    ///   termination/kill are best-effort requests to the spawner actor
619    ///   rather than direct OS signals.
620    /// - If decoding the exit payload fails, the returned `exit_rx`
621    ///   resolves to `ProcExitKind::Failed` with a decode error reason.
622    async fn launch(
623        &self,
624        proc_id: &reference::ProcId,
625        opts: LaunchOptions,
626    ) -> Result<LaunchResult, ProcLauncherError> {
627        let (exit_port, exit_port_rx) = self.mailbox.open_once_port::<PythonMessage>();
628
629        let pickled_args = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
630            let cloudpickle = import_cloudpickle(py)?;
631
632            let mod_ = py
633                .import("monarch._src.actor.proc_launcher")
634                .map_err(|e| ProcLauncherError::Other(format!("import proc_launcher: {e}")))?;
635            let launch_opts_cls = mod_
636                .getattr("LaunchOptions")
637                .map_err(|e| ProcLauncherError::Other(format!("getattr LaunchOptions: {e}")))?;
638
639            let program = opts.command.program.to_str().ok_or_else(|| {
640                ProcLauncherError::Other("program path is not valid UTF-8".into())
641            })?;
642
643            let env = pyo3::types::PyDict::new(py);
644            for (k, v) in &opts.command.env {
645                env.set_item(k, v)
646                    .map_err(|e| ProcLauncherError::Other(format!("set env item: {e}")))?;
647            }
648
649            let py_opts = launch_opts_cls
650                .call1((
651                    &opts.bootstrap_payload,
652                    &opts.process_name,
653                    program,
654                    opts.command.arg0.as_deref(),
655                    &opts.command.args,
656                    env,
657                    opts.want_stdio,
658                    opts.tail_lines,
659                    opts.log_channel.as_ref().map(|a| a.to_string()),
660                ))
661                .map_err(|e| ProcLauncherError::Other(format!("construct LaunchOptions: {e}")))?;
662
663            let args = (proc_id.to_string(), py_opts);
664            let kwargs = pyo3::types::PyDict::new(py);
665            let pickled = cloudpickle
666                .call_method1("dumps", ((args, kwargs),))
667                .map_err(|e| ProcLauncherError::Other(format!("cloudpickle: {e}")))?;
668
669            pickled
670                .extract::<Vec<u8>>()
671                .map_err(|e| ProcLauncherError::Other(format!("extract bytes: {e}")))
672        })?;
673
674        let bound_port = exit_port.bind();
675        let message = PythonMessage {
676            kind: PythonMessageKind::CallMethod {
677                name: MethodSpecifier::ExplicitPort {
678                    name: "launch".into(),
679                },
680                response_port: Some(EitherPortRef::Once(PythonOncePortRef::from(bound_port))),
681            },
682            message: pickled_args.into(),
683        };
684
685        self.spawner
686            .send(&self.instance, message)
687            .map_err(|e| ProcLauncherError::Other(format!("send to spawner failed: {e}")))?;
688
689        self.active_procs.lock().await.insert(proc_id.clone());
690
691        let (exit_tx, exit_rx) = oneshot::channel();
692        let active_procs = Arc::clone(&self.active_procs);
693        let proc_id_clone = proc_id.clone();
694
695        tokio::spawn(async move {
696            let result = match exit_port_rx.recv().await {
697                Ok(py_message) => {
698                    convert_py_exit_result(py_message).unwrap_or_else(|e| ProcExitResult {
699                        kind: ProcExitKind::Failed {
700                            reason: format!("failed to decode exit result: {e}"),
701                        },
702                        stderr_tail: None,
703                    })
704                }
705                Err(_) => ProcExitResult {
706                    kind: ProcExitKind::Failed {
707                        reason: "exit port closed (spawner crashed or forgot to send)".into(),
708                    },
709                    stderr_tail: None,
710                },
711            };
712            active_procs.lock().await.remove(&proc_id_clone);
713            let _ = exit_tx.send(result);
714        });
715
716        Ok(LaunchResult {
717            pid: None,
718            started_at: std::time::SystemTime::now(),
719            stdio: StdioHandling::ManagedByLauncher,
720            exit_rx,
721        })
722    }
723
724    /// Request graceful termination of a proc, with a best-effort
725    /// timeout.
726    ///
727    /// This delegates to the Python spawner actor's
728    /// `terminate(proc_id, timeout_secs)` method. The request is sent
729    /// fire-and-forget: we do not wait for an acknowledgment, and
730    /// there is no guarantee the proc will actually exit within
731    /// `timeout`.
732    ///
733    /// ## Errors
734    ///
735    /// Returns `ProcLauncherError::Terminate` if we fail to:
736    /// - import/serialize the request via `cloudpickle`, or
737    /// - send the message to the spawner actor.
738    async fn terminate(
739        &self,
740        proc_id: &reference::ProcId,
741        timeout: Duration,
742    ) -> Result<(), ProcLauncherError> {
743        let pickled = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
744            let cloudpickle =
745                import_cloudpickle(py).map_err(|e| ProcLauncherError::Terminate(format!("{e}")))?;
746            let args = (proc_id.to_string(), timeout.as_secs_f64());
747            let kwargs = pyo3::types::PyDict::new(py);
748            cloudpickle
749                .call_method1("dumps", ((args, kwargs),))
750                .map_err(|e| ProcLauncherError::Terminate(format!("cloudpickle: {e}")))?
751                .extract()
752                .map_err(|e| ProcLauncherError::Terminate(format!("extract: {e}")))
753        })?;
754
755        let message = PythonMessage {
756            kind: PythonMessageKind::CallMethod {
757                name: MethodSpecifier::ReturnsResponse {
758                    name: "terminate".into(),
759                },
760                response_port: None,
761            },
762            message: pickled.into(),
763        };
764
765        self.spawner
766            .send(&self.instance, message)
767            .map_err(|e| ProcLauncherError::Terminate(format!("send failed: {e}")))
768    }
769
770    /// Forcefully kill a proc.
771    ///
772    /// This delegates to the Python spawner actor's `kill(proc_id)`
773    /// method. Like `terminate`, this is best-effort and
774    /// fire-and-forget: success here means the request was serialized
775    /// and delivered to the spawner actor, not that the process is
776    /// already dead.
777    ///
778    /// ## Errors
779    ///
780    /// Returns `ProcLauncherError::Kill` if we fail to:
781    /// - import/serialize the request via `cloudpickle`, or
782    /// - send the message to the spawner actor.
783    async fn kill(&self, proc_id: &reference::ProcId) -> Result<(), ProcLauncherError> {
784        let pickled = Python::attach(|py| -> Result<Vec<u8>, ProcLauncherError> {
785            let cloudpickle =
786                import_cloudpickle(py).map_err(|e| ProcLauncherError::Kill(format!("{e}")))?;
787            let args = (proc_id.to_string(),);
788            let kwargs = pyo3::types::PyDict::new(py);
789            cloudpickle
790                .call_method1("dumps", ((args, kwargs),))
791                .map_err(|e| ProcLauncherError::Kill(format!("cloudpickle: {e}")))?
792                .extract()
793                .map_err(|e| ProcLauncherError::Kill(format!("extract: {e}")))
794        })?;
795
796        let message = PythonMessage {
797            kind: PythonMessageKind::CallMethod {
798                name: MethodSpecifier::ReturnsResponse {
799                    name: "kill".into(),
800                },
801                response_port: None,
802            },
803            message: pickled.into(),
804        };
805
806        self.spawner
807            .send(&self.instance, message)
808            .map_err(|e| ProcLauncherError::Kill(format!("send failed: {e}")))
809    }
810}
811
#[cfg(test)]
mod tests {
    use super::*;

    // `pyany_to_error_string` should render ordinary Python values
    // through `str()`.
    #[test]
    fn test_pyany_to_error_string() {
        Python::initialize();
        Python::attach(|py| {
            let cases = [
                // A Python string renders exactly as written.
                (pyo3::types::PyString::new(py, "hello").into_any(), "hello"),
                // Non-string values render via `str()`.
                (42i32.into_pyobject(py).unwrap().into_any(), "42"),
            ];
            for (value, expected) in &cases {
                assert_eq!(py::pyany_to_error_string(value), *expected);
            }
        });
    }
}