hyperactor_mesh/
pyspy.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! py-spy integration for remote Python stack dumps.
10//!
11//! See PS-* invariants in `introspect` module doc.
12
13use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Context;
16use hyperactor::HandleClient;
17use hyperactor::Handler;
18use hyperactor::RefClient;
19use hyperactor::reference as hyperactor_reference;
20use serde::Deserialize;
21use serde::Serialize;
22use typeuri::Named;
23
24use crate::config::MESH_ADMIN_PYSPY_TIMEOUT;
25use crate::config::PYSPY_BIN;
26
/// Result of a py-spy stack dump request.
///
/// This is the value delivered on the reply port of [`PySpyDump`].
/// Each request yields exactly one variant:
///
/// - [`PySpyResult::Ok`]: py-spy exited successfully and its JSON
///   output parsed into structured traces.
/// - [`PySpyResult::BinaryNotFound`]: none of the candidate binaries
///   could be found.
/// - [`PySpyResult::Failed`]: the dump was attempted but produced no
///   usable traces.
///
/// See PS-2, PS-4 in `introspect` module doc.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    schemars::JsonSchema
)]
pub enum PySpyResult {
    /// Successful stack dump with structured traces.
    Ok {
        /// OS process ID that was dumped.
        pid: u32,
        /// Path or name of the py-spy binary that produced the dump.
        binary: String,
        /// Per-thread stack traces from py-spy.
        stack_traces: Vec<PySpyStackTrace>,
        /// Non-fatal warnings from the capture (e.g., flag
        /// fallbacks). Empty when the capture completed without
        /// caveats.
        warnings: Vec<String>,
    },
    /// py-spy binary not found in environment.
    BinaryNotFound {
        /// Human-readable labels of the candidates that were tried
        /// before giving up (e.g. `PYSPY_BIN=<path>`,
        /// `py-spy on PATH`), in the order they were tried.
        searched: Vec<String>,
    },
    /// The dump did not produce usable traces: py-spy exited with a
    /// non-zero status, could not be executed or waited on, timed
    /// out, or printed output that failed to parse as JSON.
    Failed {
        /// OS process ID that was targeted.
        pid: u32,
        /// Path or name of the py-spy binary that failed. Empty when
        /// the failure happened before any binary was chosen (the
        /// worker actor could not be spawned).
        binary: String,
        /// Exit code from the py-spy process, if available. `None`
        /// for timeouts, spawn/wait errors and JSON parse failures.
        exit_code: Option<i32>,
        /// Captured stderr output when py-spy exited non-zero;
        /// otherwise a diagnostic message synthesized by this module.
        stderr: String,
    },
}
wirevalue::register_type!(PySpyResult);
71
/// A single thread's stack trace from py-spy `--json` output.
///
/// Deserialized directly from the JSON array that py-spy prints on
/// stdout, so the field names here must match the JSON keys.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyStackTrace {
    /// OS process ID that owns this thread. Note: signed (`i32`),
    /// unlike the `u32` pid carried by [`PySpyResult`].
    pub pid: i32,
    /// Python-level thread identifier (`threading.get_ident()`).
    pub thread_id: u64,
    /// Python thread name, if set.
    pub thread_name: Option<String>,
    /// OS-level thread ID (e.g., `gettid()` on Linux).
    pub os_thread_id: Option<u64>,
    /// Whether the thread is actively running (not idle/waiting).
    pub active: bool,
    /// Whether the thread currently holds the GIL.
    pub owns_gil: bool,
    /// Stack frames, innermost first.
    pub frames: Vec<PySpyFrame>,
}
90
/// A single frame in a py-spy stack trace.
///
/// Element type of [`PySpyStackTrace::frames`]; deserialized from the
/// per-frame objects in py-spy's JSON output.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyFrame {
    /// Function or method name.
    pub name: String,
    /// Path to the source file, as reported by py-spy.
    pub filename: String,
    /// Python module name, if known.
    pub module: Option<String>,
    /// Basename or abbreviated path, if py-spy provided one.
    pub short_filename: Option<String>,
    /// Source line number.
    pub line: i32,
    /// Local variables captured in this frame, if available.
    pub locals: Option<Vec<PySpyLocalVariable>>,
    /// Whether this frame is an entry point (e.g., module `__main__`).
    pub is_entry: bool,
}
109
/// A local variable captured in a py-spy frame.
///
/// Element type of [`PySpyFrame::locals`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyLocalVariable {
    /// Variable name.
    pub name: String,
    /// Memory address of the Python object.
    pub addr: usize,
    /// Whether this variable is a function argument.
    pub arg: bool,
    /// `repr()` of the value, if captured.
    pub repr: Option<String>,
}
122
/// Options controlling py-spy capture behavior.
///
/// Carried by [`PySpyDump`] and [`RunPySpyDump`]. Each flag, when
/// `true`, adds the corresponding switch to the `py-spy dump`
/// command line; when `false` the switch is omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PySpyOpts {
    /// Include per-thread stacks (`--threads`).
    pub threads: bool,
    /// Include native C/C++ frames (`--native`).
    pub native: bool,
    /// Include native frames for all threads, not just those with
    /// Python frames (`--native-all`). If the py-spy binary rejects
    /// this flag, the dump is retried without it (PS-11).
    pub native_all: bool,
    /// Use nonblocking mode — py-spy reads without pausing the
    /// target process (`--nonblocking`). Enables retry logic (PS-10).
    pub nonblocking: bool,
}
137
/// Request a py-spy stack dump from this process.
///
/// Both ProcAgent and HostAgent handle this message. The handler
/// delegates to [`PySpyWorker::spawn_and_forward`] which runs py-spy
/// against `std::process::id()`.
///
/// The message deliberately has no PID field: the receiving process
/// can only ever be asked to dump itself.
///
/// See PS-1 in `introspect` module doc.
#[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyDump {
    /// Capture options (threads, native frames, nonblocking mode).
    pub opts: PySpyOpts,
    /// Reply port for the result. Receives exactly one
    /// [`PySpyResult`].
    #[reply]
    pub result: hyperactor_reference::OncePortRef<PySpyResult>,
}
wirevalue::register_type!(PySpyDump);
154
155/// Runs py-spy against the current process.
156///
157/// See PS-1, PS-3 in `introspect` module doc.
158pub struct PySpyRunner;
159
160impl PySpyRunner {
161    /// Dump Python stacks for this process.
162    ///
163    /// Resolves the py-spy binary (PS-3), attaches to
164    /// `std::process::id()` (PS-1), and returns structured JSON
165    /// output (PS-4).
166    ///
167    /// PS-1 is structurally enforced: `PySpyDump` carries no PID
168    /// field, and this method hardcodes `std::process::id()`. There
169    /// is no code path that could substitute a different PID.
170    pub async fn dump_self(&self, opts: &PySpyOpts) -> PySpyResult {
171        let pid = std::process::id();
172        let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
173        let candidates = resolve_candidates(if pyspy_bin.is_empty() {
174            None
175        } else {
176            Some(pyspy_bin)
177        });
178        let mut searched = vec![];
179
180        for (binary, label) in &candidates {
181            searched.push(label.clone());
182            if let Some(result) = try_exec(
183                binary,
184                pid,
185                opts,
186                hyperactor_config::global::get(MESH_ADMIN_PYSPY_TIMEOUT),
187            )
188            .await
189            {
190                return result;
191            }
192        }
193
194        PySpyResult::BinaryNotFound { searched }
195    }
196}
197
/// Internal message from ProcAgent to a spawned PySpyWorker.
/// Carries the original caller's reply port so the worker
/// responds directly without routing back through ProcAgent.
///
/// Built by [`PySpyWorker::spawn_and_forward`] and consumed by the
/// worker's `Handler<RunPySpyDump>` implementation.
#[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyDump {
    /// Capture options passed through to [`PySpyRunner::dump_self`].
    pub opts: PySpyOpts,
    /// The original caller's reply port, forwarded from PySpyDump.
    pub reply_port: hyperactor::reference::OncePortRef<PySpyResult>,
}
wirevalue::register_type!(RunPySpyDump);
208
/// Short-lived child actor that runs py-spy off the ProcAgent
/// handler path. Spawned per-request; self-terminates after reply.
/// Concurrent instances are permitted — py-spy attaches read-only
/// via `process_vm_readv` and multiple concurrent dumps are safe.
///
/// The actor holds no state; its only handled message is
/// [`RunPySpyDump`].
#[hyperactor::export(handlers = [RunPySpyDump])]
pub struct PySpyWorker;

// No lifecycle hooks are overridden: the trait's defaults are used.
impl Actor for PySpyWorker {}
217
218impl PySpyWorker {
219    /// Spawn a PySpyWorker, forward the py-spy request, and let
220    /// the worker reply directly to the caller. On spawn failure,
221    /// sends a `Failed` result back via `reply_port`.
222    pub(crate) fn spawn_and_forward(
223        cx: &impl hyperactor::context::Actor,
224        opts: PySpyOpts,
225        reply_port: hyperactor::reference::OncePortRef<PySpyResult>,
226    ) -> Result<(), anyhow::Error> {
227        let worker = match Self.spawn(cx) {
228            Ok(handle) => handle,
229            Err(e) => {
230                let fail = PySpyResult::Failed {
231                    pid: std::process::id(),
232                    binary: String::new(),
233                    exit_code: None,
234                    stderr: format!("failed to spawn pyspy worker: {}", e),
235                };
236                reply_port.send(cx, fail)?;
237                return Ok(());
238            }
239        };
240        // Once reply_port moves into RunPySpyDump, we lose it.
241        // MailboxSenderError does not carry the unsent message, so
242        // on send failure the caller will observe a timeout rather
243        // than an explicit Failed reply.
244        if let Err(e) = worker.send(cx, RunPySpyDump { opts, reply_port }) {
245            tracing::error!("failed to send to pyspy worker: {}", e);
246        }
247        Ok(())
248    }
249}
250
251#[async_trait]
252impl Handler<RunPySpyDump> for PySpyWorker {
253    async fn handle(
254        &mut self,
255        cx: &Context<Self>,
256        message: RunPySpyDump,
257    ) -> Result<(), anyhow::Error> {
258        let result = PySpyRunner.dump_self(&message.opts).await;
259        message.reply_port.send(cx, result)?;
260        cx.stop("pyspy dump complete")?;
261        Ok(())
262    }
263}
264
/// Build the ordered list of py-spy binaries to try, as
/// `(binary, label)` pairs.
///
/// A non-empty `pyspy_bin_env` comes first, labelled
/// `PYSPY_BIN=<path>`; `None` or an empty string contributes nothing.
/// The bare name `py-spy` (resolved via `PATH`) is always the last
/// entry. See PS-3 in `introspect` module doc.
fn resolve_candidates(pyspy_bin_env: Option<String>) -> Vec<(String, String)> {
    let configured = pyspy_bin_env
        .filter(|path| !path.is_empty())
        .map(|path| {
            let label = format!("PYSPY_BIN={}", path);
            (path, label)
        });
    let path_fallback = ("py-spy".to_string(), "py-spy on PATH".to_string());

    configured
        .into_iter()
        .chain(std::iter::once(path_fallback))
        .collect()
}
278
279/// Build the py-spy command for a given binary path.
280fn build_command(binary: &str, pid: u32, opts: &PySpyOpts) -> tokio::process::Command {
281    let mut cmd = tokio::process::Command::new(binary);
282    cmd.arg("dump")
283        .arg("--pid")
284        .arg(pid.to_string())
285        .arg("--json");
286    if opts.threads {
287        cmd.arg("--threads");
288    }
289    if opts.native {
290        cmd.arg("--native");
291    }
292    if opts.native_all {
293        cmd.arg("--native-all");
294    }
295    if opts.nonblocking {
296        cmd.arg("--nonblocking");
297    }
298    cmd.stdout(std::process::Stdio::piped());
299    cmd.stderr(std::process::Stdio::piped());
300    cmd
301}
302
303/// Map a process::Output to a PySpyResult, parsing the `--json`
304/// output into structured `PySpyStackTrace` values.
305/// See PS-2, PS-4 in `introspect` module doc.
306fn map_output(output: std::process::Output, pid: u32, binary: &str) -> PySpyResult {
307    if output.status.success() {
308        match serde_json::from_slice::<Vec<PySpyStackTrace>>(&output.stdout) {
309            Ok(stack_traces) => PySpyResult::Ok {
310                pid,
311                binary: binary.to_string(),
312                stack_traces,
313                warnings: vec![],
314            },
315            Err(e) => PySpyResult::Failed {
316                pid,
317                binary: binary.to_string(),
318                exit_code: None,
319                stderr: format!("failed to parse py-spy JSON output: {}", e),
320            },
321        }
322    } else {
323        let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
324        PySpyResult::Failed {
325            pid,
326            binary: binary.to_string(),
327            exit_code: output.status.code(),
328            stderr,
329        }
330    }
331}
332
333/// Returns true if the failure indicates the py-spy binary does not
334/// support `--native-all` (exit code 2, stderr mentions the flag).
335/// Used by `try_exec` to downgrade and retry (PS-11).
336fn is_unsupported_native_all(result: &PySpyResult) -> bool {
337    matches!(
338        result,
339        PySpyResult::Failed {
340            exit_code: Some(2),
341            stderr,
342            ..
343        } if stderr.contains("--native-all")
344    )
345}
346
/// Result of a single spawn → collect execution step.
///
/// Returned by `exec_once`. Keeping "binary missing" separate from
/// ordinary failures lets `try_exec` report it as `None` so the
/// caller can move on to the next candidate binary.
enum ExecOnce {
    /// py-spy produced a result (success or failure). Also used for
    /// failures synthesized locally, such as an expired deadline or
    /// a spawn error other than "not found".
    Result(PySpyResult),
    /// The binary was not found (NotFound from spawn).
    NotFound,
}
354
355/// Spawn the py-spy binary once, collect output, and return the
356/// result. Factored out of `try_exec` so both the normal attempt
357/// path and the PS-11 native-all downgrade path share one
358/// implementation of deadline check → spawn → collect.
359async fn exec_once(
360    binary: &str,
361    pid: u32,
362    opts: &PySpyOpts,
363    deadline: tokio::time::Instant,
364    timeout: std::time::Duration,
365) -> ExecOnce {
366    let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
367    if remaining.is_zero() {
368        return ExecOnce::Result(PySpyResult::Failed {
369            pid,
370            binary: binary.to_string(),
371            exit_code: None,
372            stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
373        });
374    }
375    let child = match build_command(binary, pid, opts).spawn() {
376        Ok(child) => child,
377        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return ExecOnce::NotFound,
378        Err(e) => {
379            return ExecOnce::Result(PySpyResult::Failed {
380                pid,
381                binary: binary.to_string(),
382                exit_code: None,
383                stderr: format!("failed to execute: {}", e),
384            });
385        }
386    };
387    ExecOnce::Result(collect_with_timeout(child, pid, binary, remaining).await)
388}
389
390/// Try to execute py-spy with the given binary path. Returns `None`
391/// if the binary was not found (NotFound error), allowing the caller
392/// to try the next candidate.
393///
394/// In nonblocking mode, retries up to 3 times with 100ms backoff
395/// because py-spy can segfault reading mutating process memory
396/// (PS-10). All attempts share a single deadline so total wall time
397/// never exceeds the caller's timeout budget (PS-5).
398///
399/// If `native_all` is requested but the py-spy binary does not
400/// support `--native-all` (exit code 2), the flag is dropped and the
401/// command is retried immediately within the same attempt (PS-11a
402/// through PS-11e).
403async fn try_exec(
404    binary: &str,
405    pid: u32,
406    opts: &PySpyOpts,
407    timeout: std::time::Duration,
408) -> Option<PySpyResult> {
409    let deadline = tokio::time::Instant::now() + timeout;
410    let retries = if opts.nonblocking { 3 } else { 1 };
411    let mut last_result = None;
412    let mut effective_opts = opts.clone();
413
414    for attempt in 0..retries {
415        if attempt > 0 {
416            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
417        }
418        let mut result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
419            ExecOnce::NotFound => return None,
420            ExecOnce::Result(r) => r,
421        };
422        // PS-11a: py-spy too old for --native-all; downgrade and
423        // retry immediately within the same attempt (PS-11b: no
424        // backoff, no retry slot consumed).
425        if is_unsupported_native_all(&result) && effective_opts.native_all {
426            // PS-11e: sticky downgrade — later outer retries keep
427            // native_all = false.
428            effective_opts.native_all = false;
429            result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
430                ExecOnce::NotFound => return None,
431                ExecOnce::Result(r) => r,
432            };
433            // PS-11c: inject warning on successful downgraded result.
434            if let PySpyResult::Ok { warnings, .. } = &mut result {
435                warnings.push(
436                    "--native-all unsupported by this py-spy; fell back to --native".to_string(),
437                );
438            }
439            // PS-11d: if the downgraded retry also failed, fall
440            // through to the normal last_result path below.
441        }
442        match &result {
443            PySpyResult::Ok { .. } => return Some(result),
444            _ => {
445                last_result = Some(result);
446            }
447        }
448    }
449
450    last_result
451}
452
453/// Collect stdout/stderr from a spawned child concurrently with wait,
454/// bounded by `timeout`. On expiry the child is killed and reaped.
455///
456/// Reads stdout/stderr concurrently with wait to avoid pipe-buffer
457/// deadlock. Keeps the `Child` handle alive so we can `start_kill`
458/// and `wait` on timeout for deterministic termination and reaping.
459///
460/// See PS-5 in `introspect` module doc.
461async fn collect_with_timeout(
462    mut child: tokio::process::Child,
463    pid: u32,
464    binary: &str,
465    timeout: std::time::Duration,
466) -> PySpyResult {
467    let mut stdout_handle = child.stdout.take();
468    let mut stderr_handle = child.stderr.take();
469
470    let collect = async {
471        let stdout_fut = async {
472            let mut buf = Vec::new();
473            if let Some(ref mut r) = stdout_handle {
474                let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
475            }
476            buf
477        };
478        let stderr_fut = async {
479            let mut buf = Vec::new();
480            if let Some(ref mut r) = stderr_handle {
481                let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
482            }
483            buf
484        };
485        let (stdout_bytes, stderr_bytes, status) =
486            tokio::join!(stdout_fut, stderr_fut, child.wait());
487        (stdout_bytes, stderr_bytes, status)
488    };
489
490    match tokio::time::timeout(timeout, collect).await {
491        Ok((stdout_bytes, stderr_bytes, Ok(status))) => {
492            let output = std::process::Output {
493                status,
494                stdout: stdout_bytes,
495                stderr: stderr_bytes,
496            };
497            map_output(output, pid, binary)
498        }
499        Ok((_, _, Err(e))) => PySpyResult::Failed {
500            pid,
501            binary: binary.to_string(),
502            exit_code: None,
503            stderr: format!("failed to wait for child: {}", e),
504        },
505        Err(_) => {
506            // Timeout — kill and reap deterministically.
507            let _ = child.start_kill();
508            let _ = child.wait().await;
509            PySpyResult::Failed {
510                pid,
511                binary: binary.to_string(),
512                exit_code: None,
513                stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
514            }
515        }
516    }
517}
518
#[cfg(test)]
mod tests {
    // Unit tests for this module. The process-spawning tests rely on
    // `true`, `false`, `sleep` and `/bin/sh` being available, and on
    // unix-only APIs (`ExitStatusExt`, `PermissionsExt`).
    use super::*;

    #[test]
    fn pyspy_result_wirevalue_roundtrip() {
        // Regression test: #[serde(skip_serializing_if)] is
        // incompatible with bincode (positional format). Empty
        // warnings must still round-trip correctly through wirevalue
        // Multipart encoding.
        let original = PySpyResult::Ok {
            pid: 42,
            binary: "py-spy".to_string(),
            stack_traces: vec![PySpyStackTrace {
                pid: 42,
                thread_id: 1,
                thread_name: Some("main".to_string()),
                os_thread_id: Some(100),
                active: true,
                owns_gil: true,
                frames: vec![PySpyFrame {
                    name: "do_work".to_string(),
                    filename: "test.py".to_string(),
                    module: None,
                    short_filename: None,
                    line: 10,
                    locals: None,
                    is_entry: false,
                }],
            }],
            warnings: vec![],
        };
        let any = wirevalue::Any::serialize(&original).expect("serialize");
        let restored: PySpyResult = any.deserialized().expect("deserialize");
        assert_eq!(original, restored);
    }

    #[test]
    fn candidates_no_env() {
        // PS-3: no PYSPY_BIN → only PATH candidate.
        let candidates = resolve_candidates(None);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].0, "py-spy");
        assert_eq!(candidates[0].1, "py-spy on PATH");
    }

    #[test]
    fn candidates_env_first_then_path() {
        // PS-3: PYSPY_BIN first, then PATH.
        let candidates = resolve_candidates(Some("/custom/py-spy".to_string()));
        assert_eq!(candidates.len(), 2);
        assert_eq!(candidates[0].0, "/custom/py-spy");
        assert!(candidates[0].1.contains("PYSPY_BIN=/custom/py-spy"));
        assert_eq!(candidates[1].0, "py-spy");
    }

    #[test]
    fn candidates_empty_env_ignored() {
        // PS-3: empty PYSPY_BIN is treated as unset.
        let candidates = resolve_candidates(Some(String::new()));
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].0, "py-spy");
    }

    #[test]
    fn output_success_parses_json() {
        // PS-4: py-spy --json stdout is parsed into structured traces.
        let json = serde_json::json!([{
            "pid": 42,
            "thread_id": 1234,
            "thread_name": "MainThread",
            "os_thread_id": 5678,
            "active": true,
            "owns_gil": true,
            "frames": [{
                "name": "do_work",
                "filename": "foo.py",
                "module": null,
                "short_filename": null,
                "line": 10,
                "locals": null,
                "is_entry": false
            }]
        }]);
        let output = std::process::Output {
            // The default ExitStatus represents a successful exit.
            status: std::process::ExitStatus::default(),
            stdout: serde_json::to_vec(&json).unwrap(),
            stderr: vec![],
        };
        let result = map_output(output, 42, "/usr/bin/py-spy");
        match result {
            PySpyResult::Ok {
                pid,
                binary,
                stack_traces,
                ..
            } => {
                assert_eq!(pid, 42);
                assert_eq!(binary, "/usr/bin/py-spy");
                assert_eq!(stack_traces.len(), 1);
                assert_eq!(stack_traces[0].thread_id, 1234);
                assert_eq!(stack_traces[0].thread_name.as_deref(), Some("MainThread"));
                assert!(stack_traces[0].owns_gil);
                assert_eq!(stack_traces[0].frames.len(), 1);
                assert_eq!(stack_traces[0].frames[0].name, "do_work");
                assert_eq!(stack_traces[0].frames[0].filename, "foo.py");
                assert_eq!(stack_traces[0].frames[0].line, 10);
            }
            other => panic!("expected Ok, got {:?}", other),
        }
    }

    #[test]
    fn output_invalid_json_maps_to_failed() {
        // PS-4: unparseable JSON maps to Failed.
        let output = std::process::Output {
            status: std::process::ExitStatus::default(),
            stdout: b"not valid json".to_vec(),
            stderr: vec![],
        };
        let result = map_output(output, 42, "py-spy");
        match result {
            PySpyResult::Failed { pid, stderr, .. } => {
                assert_eq!(pid, 42);
                assert!(
                    stderr.contains("failed to parse py-spy JSON output"),
                    "unexpected stderr: {stderr}"
                );
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    #[test]
    fn output_nonzero_exit_maps_to_failed() {
        // PS-2: nonzero exit → Failed with stderr.
        use std::os::unix::process::ExitStatusExt;
        // Raw wait status: the exit code lives in the high byte.
        let status = std::process::ExitStatus::from_raw(256); // exit code 1
        let output = std::process::Output {
            status,
            stdout: vec![],
            stderr: b"Permission denied".to_vec(),
        };
        let result = map_output(output, 99, "py-spy");
        match result {
            PySpyResult::Failed {
                pid,
                binary,
                exit_code,
                stderr,
            } => {
                assert_eq!(pid, 99);
                assert_eq!(binary, "py-spy");
                assert_eq!(exit_code, Some(1));
                assert_eq!(stderr, "Permission denied");
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    #[test]
    fn output_preserves_caller_pid() {
        // PS-1: pid in result is exactly what the caller passes.
        let json = serde_json::json!([]);
        let output = std::process::Output {
            status: std::process::ExitStatus::default(),
            stdout: serde_json::to_vec(&json).unwrap(),
            stderr: vec![],
        };
        let result = map_output(output, 12345, "bin");
        match result {
            PySpyResult::Ok { pid, .. } => assert_eq!(pid, 12345),
            other => panic!("expected Ok, got {:?}", other),
        }
    }

    /// Options with every flag off: one attempt, no extra switches.
    fn default_opts() -> PySpyOpts {
        PySpyOpts {
            threads: false,
            native: false,
            native_all: false,
            nonblocking: false,
        }
    }

    #[tokio::test]
    async fn exec_missing_binary_returns_none() {
        // PS-3: NotFound from exec → None (triggers fallback).
        let result = try_exec(
            "/definitely/not/a/real/binary",
            1,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn exec_present_binary_returns_some() {
        // "true" exits 0 with empty stdout, which is not valid
        // py-spy JSON. We expect a Failed result from parse error.
        let result = try_exec(
            "true",
            1,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        match result {
            Some(PySpyResult::Failed { stderr, .. }) => {
                assert!(
                    stderr.contains("parse"),
                    "expected JSON parse error, got: {stderr}"
                );
            }
            other => panic!("expected Some(Failed{{parse..}}), got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn collect_timeout_kills_child_and_returns_failed() {
        // PS-5: subprocess that hangs past timeout → Failed with
        // "timed out" message; child is killed and reaped.
        use tokio::process::Command;

        let child = Command::new("sleep")
            .arg("100")
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .spawn()
            .expect("sleep must be available");

        let result = collect_with_timeout(
            child,
            std::process::id(),
            "sleep",
            std::time::Duration::from_millis(100),
        )
        .await;

        match result {
            PySpyResult::Failed { stderr, .. } => {
                assert!(
                    stderr.contains("timed out"),
                    "expected timeout message, got: {stderr}"
                );
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn exec_failing_binary_returns_failed() {
        // "false" exists on all unix systems and exits 1.
        let result = try_exec(
            "false",
            42,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        assert!(result.is_some());
        match result.unwrap() {
            PySpyResult::Failed {
                pid,
                binary,
                exit_code,
                ..
            } => {
                assert_eq!(pid, 42);
                assert_eq!(binary, "false");
                assert!(exit_code.is_some());
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    /// Write a fake py-spy shell script to a temp file, make it
    /// executable, and return the path. The script logs each
    /// invocation's argv to `<script>.log`.
    ///
    /// Returns a `TempPath` (not `NamedTempFile`) so the write fd is
    /// closed before exec — Linux returns ETXTBSY if a file with an
    /// open write fd is executed.
    ///
    /// The script file is deleted when the returned `TempPath` is
    /// dropped. The `.log` file is not covered by that; `read_log`
    /// removes it.
    fn write_fake_pyspy(script_body: &str) -> tempfile::TempPath {
        use std::io::Write;
        use std::os::unix::fs::PermissionsExt;

        let mut f = tempfile::NamedTempFile::new().expect("create temp file");
        write!(f, "#!/bin/sh\n{script_body}").expect("write script");
        f.as_file().sync_all().expect("sync");
        std::fs::set_permissions(f.path(), std::fs::Permissions::from_mode(0o755))
            .expect("chmod +x");
        f.into_temp_path()
    }

    /// Read the argv log written by the fake script. Each line is one
    /// invocation's `$@`. A missing or unreadable log yields an empty
    /// list.
    ///
    /// The log file is deleted after it has been read (best effort),
    /// so test runs do not leave `<script>.log` files behind in the
    /// temp directory. Call this at most once per script, after all
    /// invocations have finished.
    fn read_log(script_path: &std::path::Path) -> Vec<String> {
        let log_path = format!("{}.log", script_path.display());
        let lines: Vec<String> = match std::fs::read_to_string(&log_path) {
            Ok(contents) => contents.lines().map(String::from).collect(),
            Err(_) => vec![],
        };
        // Nothing else cleans this file up; ignore errors such as the
        // log never having been created.
        let _ = std::fs::remove_file(&log_path);
        lines
    }

    #[tokio::test]
    async fn native_all_downgrade_succeeds() {
        // PS-11a, PS-11b, PS-11c: unsupported --native-all triggers
        // immediate downgrade in the same attempt, and the successful
        // result carries the fallback warning.
        //
        // The fake binary rejects --native-all with exit code 2 and
        // otherwise prints an empty JSON array.
        let script = write_fake_pyspy(
            r#"
echo "$@" >> "$0.log"
for arg in "$@"; do
    if [ "$arg" = "--native-all" ]; then
        echo "unrecognized option --native-all" >&2
        exit 2
    fi
done
echo "[]"
exit 0
"#,
        );
        let opts = PySpyOpts {
            threads: false,
            native: true,
            native_all: true,
            nonblocking: false,
        };
        let result = try_exec(
            script.to_str().unwrap(),
            1,
            &opts,
            std::time::Duration::from_secs(5),
        )
        .await;
        // Must succeed with the downgraded result.
        let result = result.expect("expected Some");
        match &result {
            PySpyResult::Ok { warnings, .. } => {
                assert!(
                    warnings.iter().any(|w| w.contains("fell back to --native")),
                    "PS-11c: expected fallback warning, got: {warnings:?}"
                );
            }
            other => panic!("expected Ok, got: {other:?}"),
        }
        // Check invocation log.
        let log = read_log(&script);
        assert_eq!(
            log.len(),
            2,
            "PS-11b: expected exactly 2 invocations, got {}",
            log.len()
        );
        assert!(
            log[0].contains("--native-all"),
            "PS-11a: first invocation must include --native-all, got: {}",
            log[0]
        );
        assert!(
            !log[1].contains("--native-all"),
            "PS-11a: second invocation must NOT include --native-all, got: {}",
            log[1]
        );
    }

    #[tokio::test]
    async fn native_all_downgrade_fails_retries_continue() {
        // PS-11d, PS-11e: downgraded retry fails, outer nonblocking
        // retries continue with native_all = false.
        //
        // The fake binary rejects --native-all with exit code 2 and
        // fails every other invocation with exit code 1.
        let script = write_fake_pyspy(
            r#"
echo "$@" >> "$0.log"
for arg in "$@"; do
    if [ "$arg" = "--native-all" ]; then
        echo "unrecognized option --native-all" >&2
        exit 2
    fi
done
echo "Permission denied" >&2
exit 1
"#,
        );
        let opts = PySpyOpts {
            threads: false,
            native: true,
            native_all: true,
            nonblocking: true, // 3 outer retries
        };
        let result = try_exec(
            script.to_str().unwrap(),
            1,
            &opts,
            std::time::Duration::from_secs(10),
        )
        .await;
        // Must be a generic failure, not the native-all error.
        let result = result.expect("expected Some");
        match &result {
            PySpyResult::Failed {
                stderr, exit_code, ..
            } => {
                assert!(
                    stderr.contains("Permission denied"),
                    "PS-11d: expected generic failure, got: {stderr}"
                );
                assert_eq!(*exit_code, Some(1));
            }
            other => panic!("expected Failed, got: {other:?}"),
        }
        // Check invocation log: 4 calls total.
        //   Attempt 0: --native-all (fail) → downgrade (fail)
        //   Attempt 1: without --native-all (fail)
        //   Attempt 2: without --native-all (fail)
        let log = read_log(&script);
        assert_eq!(log.len(), 4, "expected 4 invocations, got {}", log.len());
        assert!(
            log[0].contains("--native-all"),
            "PS-11a: first invocation must include --native-all, got: {}",
            log[0]
        );
        for (i, line) in log[1..].iter().enumerate() {
            assert!(
                !line.contains("--native-all"),
                "PS-11e: invocation {} must NOT include --native-all, got: {}",
                i + 1,
                line
            );
        }
    }
}