Skip to main content

hyperactor_mesh/
pyspy.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! py-spy integration for remote Python stack dumps and profiles.
10//!
11//! See PS-* and PP-* invariants in `introspect` module doc.
12
13use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Context;
16use hyperactor::Endpoint as _;
17use hyperactor::HandleClient;
18use hyperactor::Handler;
19use hyperactor::OncePortRef;
20use hyperactor::RefClient;
21use serde::Deserialize;
22use serde::Serialize;
23use typeuri::Named;
24
25use crate::config::MESH_ADMIN_PYSPY_TIMEOUT;
26use crate::config::PYSPY_BIN;
27
/// Result of a py-spy stack dump request.
///
/// This is the payload type of the `PySpyDump` reply port.
///
/// See PS-2, PS-4 in `introspect` module doc.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    schemars::JsonSchema
)]
pub enum PySpyResult {
    /// Successful stack dump with structured traces.
    Ok {
        /// OS process ID that was dumped.
        pid: u32,
        /// Path or name of the py-spy binary that produced the dump.
        binary: String,
        /// Per-thread stack traces from py-spy.
        stack_traces: Vec<PySpyStackTrace>,
        /// Non-fatal warnings from the capture (e.g., flag
        /// fallbacks). Empty when the capture completed without
        /// caveats.
        warnings: Vec<String>,
    },
    /// py-spy binary not found in environment.
    BinaryNotFound {
        /// Candidate paths that were tried before giving up.
        searched: Vec<String>,
    },
    /// The dump did not produce usable output.
    ///
    /// Covers a non-zero py-spy exit, and is also the variant this
    /// file uses for worker/subprocess spawn errors, timeouts, wait
    /// errors, and JSON parse failures. In those synthesized cases
    /// `exit_code` is `None` and `stderr` holds a locally generated
    /// message rather than py-spy's own stderr.
    Failed {
        /// OS process ID that was targeted.
        pid: u32,
        /// Path or name of the py-spy binary that failed. Empty when
        /// the failure happened before any binary was selected
        /// (worker spawn failure).
        binary: String,
        /// Exit code from the py-spy process, if available.
        exit_code: Option<i32>,
        /// Captured stderr output, or a synthesized error message.
        stderr: String,
    },
}
71wirevalue::register_type!(PySpyResult);
72
/// A single thread's stack trace from py-spy `--json` output.
///
/// `map_output` deserializes py-spy's stdout directly into a
/// `Vec<PySpyStackTrace>`, so the field names here are the JSON keys
/// expected in that output.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyStackTrace {
    /// OS process ID that owns this thread.
    pub pid: i32,
    /// Python-level thread identifier (`threading.get_ident()`).
    pub thread_id: u64,
    /// Python thread name, if set.
    pub thread_name: Option<String>,
    /// OS-level thread ID (e.g., `gettid()` on Linux).
    pub os_thread_id: Option<u64>,
    /// Whether the thread is actively running (not idle/waiting).
    pub active: bool,
    /// Whether the thread currently holds the GIL.
    pub owns_gil: bool,
    /// Stack frames, innermost first.
    pub frames: Vec<PySpyFrame>,
}
91
/// A single frame in a py-spy stack trace.
///
/// Element type of `PySpyStackTrace::frames`; parsed from py-spy's
/// JSON output together with the enclosing trace.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyFrame {
    /// Function or method name.
    pub name: String,
    /// Absolute path to the source file.
    pub filename: String,
    /// Python module name, if known.
    pub module: Option<String>,
    /// Basename or abbreviated path.
    pub short_filename: Option<String>,
    /// Source line number.
    pub line: i32,
    /// Local variables captured in this frame, if available.
    pub locals: Option<Vec<PySpyLocalVariable>>,
    /// Whether this frame is an entry point (e.g., module `__main__`).
    pub is_entry: bool,
}
110
/// A local variable captured in a py-spy frame.
///
/// Element type of `PySpyFrame::locals`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyLocalVariable {
    /// Variable name.
    pub name: String,
    /// Memory address of the Python object.
    pub addr: usize,
    /// Whether this variable is a function argument.
    pub arg: bool,
    /// `repr()` of the value, if captured.
    pub repr: Option<String>,
}
123
/// Options controlling py-spy capture behavior.
///
/// Each `true` field adds the corresponding flag to the
/// `py-spy dump` command line assembled by `build_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PySpyOpts {
    /// Include per-thread stacks (`--threads`).
    pub threads: bool,
    /// Include native C/C++ frames (`--native`).
    pub native: bool,
    /// Include native frames for all threads, not just those with
    /// Python frames (`--native-all`). `try_exec` drops this flag
    /// and retries when the binary rejects it (PS-11).
    pub native_all: bool,
    /// Use nonblocking mode — py-spy reads without pausing the
    /// target process (`--nonblocking`). Enables retry logic (PS-10).
    pub nonblocking: bool,
}
138
/// Public JSON-facing options for a py-spy profile capture.
///
/// Deserialized from the HTTP POST body. Validated and converted to
/// `ValidatedProfileRequest` before any actor messaging.
///
/// The `schemars` range attributes only annotate the generated JSON
/// schema; the actual range checks happen in
/// `ValidatedProfileRequest::try_new`.
///
/// See PP-1 in `introspect` module doc.
#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyProfileOpts {
    /// Sampling duration in whole seconds. py-spy `--duration`
    /// accepts integers only. Must be >= 1; upper bound enforced
    /// at runtime by `MESH_ADMIN_PYSPY_MAX_PROFILE_DURATION`.
    #[schemars(range(min = 1))]
    pub duration_s: u32,
    /// Sampling rate in Hz. Must be 1..=1000.
    #[schemars(range(min = 1, max = 1000))]
    pub rate_hz: u32,
    /// Include native C/C++ frames.
    pub native: bool,
    /// Include per-thread stacks.
    pub threads: bool,
    /// Use nonblocking mode.
    pub nonblocking: bool,
}
162
/// Validated profile duration. Guaranteed non-zero.
///
/// The non-zero guarantee comes from the wrapped `NonZeroU32`. The
/// configured maximum is checked in
/// `ValidatedProfileRequest::try_new`, not by this type.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub(crate) struct ProfileDurationSecs(std::num::NonZeroU32);

impl ProfileDurationSecs {
    /// Returns the duration in whole seconds.
    pub fn get(self) -> u32 {
        self.0.get()
    }
}
172
/// Validated sample rate. Guaranteed 1..=1000.
///
/// NOTE(review): the type itself only enforces non-zero. The
/// `<= 1000` upper bound is applied in
/// `ValidatedProfileRequest::try_new`; the derived `Deserialize`
/// does not re-check it.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub(crate) struct SampleRateHz(std::num::NonZeroU32);

impl SampleRateHz {
    /// Returns the sample rate in Hz.
    pub fn get(self) -> u32 {
        self.0.get()
    }
}
182
/// Validated profile request. If this exists, it is valid.
/// Construct only via `try_new`. See PP-1, PP-2.
///
/// Fields are private and exposed through read-only accessors.
///
/// NOTE(review): the derived `Deserialize` builds a value without
/// going through `try_new`, so the "valid by construction" guarantee
/// relies on only receiving values that were serialized from an
/// already-validated request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ValidatedProfileRequest {
    /// Sampling duration (guaranteed non-zero, within max).
    duration: ProfileDurationSecs,
    /// Sampling rate (guaranteed 1..=1000).
    rate: SampleRateHz,
    /// Include native C/C++ frames.
    native: bool,
    /// Include per-thread stacks.
    threads: bool,
    /// Use nonblocking mode.
    nonblocking: bool,
    /// Kill deadline for the py-spy subprocess. Set by `try_new` to
    /// the sampling duration plus 15 seconds.
    subprocess_timeout: std::time::Duration,
    /// Bridge reply wait deadline (subprocess + margin). Set by
    /// `try_new` to `subprocess_timeout` plus 5 seconds.
    bridge_timeout: std::time::Duration,
}
202
203impl ValidatedProfileRequest {
204    pub fn duration(&self) -> ProfileDurationSecs {
205        self.duration
206    }
207    pub fn rate(&self) -> SampleRateHz {
208        self.rate
209    }
210    pub fn native(&self) -> bool {
211        self.native
212    }
213    pub fn threads(&self) -> bool {
214        self.threads
215    }
216    pub fn nonblocking(&self) -> bool {
217        self.nonblocking
218    }
219    pub fn subprocess_timeout(&self) -> std::time::Duration {
220        self.subprocess_timeout
221    }
222    pub fn bridge_timeout(&self) -> std::time::Duration {
223        self.bridge_timeout
224    }
225
226    pub fn try_new(
227        opts: &PySpyProfileOpts,
228        max_duration: std::time::Duration,
229    ) -> Result<Self, String> {
230        let duration = std::num::NonZeroU32::new(opts.duration_s)
231            .map(ProfileDurationSecs)
232            .ok_or_else(|| "duration_s must be positive".to_string())?;
233        if std::time::Duration::from_secs(u64::from(duration.get())) > max_duration {
234            return Err(format!(
235                "duration_s {}s exceeds max {}s",
236                duration.get(),
237                max_duration.as_secs()
238            ));
239        }
240        let rate = std::num::NonZeroU32::new(opts.rate_hz)
241            .filter(|n| n.get() <= 1000)
242            .map(SampleRateHz)
243            .ok_or_else(|| format!("rate_hz must be 1..=1000, got {}", opts.rate_hz))?;
244        let subprocess_timeout = std::time::Duration::from_secs(u64::from(duration.get()) + 15);
245        let bridge_timeout = subprocess_timeout + std::time::Duration::from_secs(5);
246        Ok(Self {
247            duration,
248            rate,
249            native: opts.native,
250            threads: opts.threads,
251            nonblocking: opts.nonblocking,
252            subprocess_timeout,
253            bridge_timeout,
254        })
255    }
256}
257
/// Wire result of a py-spy profile capture. The HTTP handler
/// unwraps this to produce `image/svg+xml` or `ApiError`.
/// Not a public JSON contract. See PP-2, PP-3.
///
/// Variants mirror `ProfileExecOutcome` one-to-one; see the `From`
/// impl. Where present, `pid` is the targeted process and `binary`
/// the py-spy binary that was used.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub enum PySpyProfileResult {
    /// Capture succeeded; `svg` holds the captured output bytes.
    Ok {
        pid: u32,
        binary: String,
        svg: Vec<u8>,
    },
    /// No py-spy binary candidate could be found; `searched` lists
    /// the candidate labels that were tried.
    BinaryNotFound {
        searched: Vec<String>,
    },
    /// The subprocess exceeded its deadline. `timeout_s` is the
    /// deadline in whole seconds.
    TimedOut {
        pid: u32,
        binary: String,
        timeout_s: u64,
        stderr: String,
    },
    /// py-spy exited unsuccessfully (presumably non-zero status; the
    /// producing code is outside this view).
    ExitFailure {
        pid: u32,
        binary: String,
        exit_code: Option<i32>,
        stderr: String,
    },
    /// Presumably: py-spy exited but its output file was absent.
    OutputMissing {
        pid: u32,
        binary: String,
    },
    /// Presumably: the output file existed but was empty.
    OutputEmpty {
        pid: u32,
        binary: String,
    },
    /// Presumably: reading the output file failed; `error` is the
    /// stringified cause.
    OutputReadFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// The `PySpyProfileWorker` actor could not be spawned.
    WorkerSpawnFailure {
        error: String,
    },
    /// Presumably: spawning the py-spy subprocess failed for a
    /// reason other than "binary not found".
    SubprocessSpawnFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// Presumably: waiting on the subprocess returned an error.
    WaitFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// Presumably: the temporary directory for the output file
    /// could not be created.
    TempDirFailure {
        pid: u32,
        binary: String,
        error: String,
    },
}
315wirevalue::register_type!(PySpyProfileResult);
316
/// Internal profile execution outcome. Converted to
/// `PySpyProfileResult` at the actor reply boundary.
///
/// Same variants as `PySpyProfileResult`, except that `TimedOut`
/// carries a `Duration` instead of whole seconds. Unlike the wire
/// type, this enum is not serializable.
#[derive(Debug)]
pub(crate) enum ProfileExecOutcome {
    /// Capture succeeded; `svg` holds the captured output bytes.
    Ok {
        pid: u32,
        binary: String,
        svg: Vec<u8>,
    },
    /// Every binary candidate was reported as not found; `searched`
    /// lists the candidate labels that were tried.
    BinaryNotFound {
        searched: Vec<String>,
    },
    /// The subprocess exceeded `timeout`.
    TimedOut {
        pid: u32,
        binary: String,
        timeout: std::time::Duration,
        stderr: String,
    },
    /// py-spy exited unsuccessfully (presumably non-zero status; the
    /// producing code is outside this view).
    ExitFailure {
        pid: u32,
        binary: String,
        exit_code: Option<i32>,
        stderr: String,
    },
    /// Presumably: py-spy exited but its output file was absent.
    OutputMissing {
        pid: u32,
        binary: String,
    },
    /// Presumably: the output file existed but was empty.
    OutputEmpty {
        pid: u32,
        binary: String,
    },
    /// Presumably: reading the output file failed.
    OutputReadFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// The `PySpyProfileWorker` actor could not be spawned.
    WorkerSpawnFailure {
        error: String,
    },
    /// Presumably: spawning the py-spy subprocess failed for a
    /// reason other than "binary not found".
    SubprocessSpawnFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// Presumably: waiting on the subprocess returned an error.
    WaitFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// Presumably: the temporary directory for the output file
    /// could not be created.
    TempDirFailure {
        pid: u32,
        binary: String,
        error: String,
    },
}
373
374impl From<ProfileExecOutcome> for PySpyProfileResult {
375    fn from(outcome: ProfileExecOutcome) -> Self {
376        match outcome {
377            ProfileExecOutcome::Ok { pid, binary, svg } => {
378                PySpyProfileResult::Ok { pid, binary, svg }
379            }
380            ProfileExecOutcome::BinaryNotFound { searched } => {
381                PySpyProfileResult::BinaryNotFound { searched }
382            }
383            ProfileExecOutcome::TimedOut {
384                pid,
385                binary,
386                timeout,
387                stderr,
388            } => PySpyProfileResult::TimedOut {
389                pid,
390                binary,
391                timeout_s: timeout.as_secs(),
392                stderr,
393            },
394            ProfileExecOutcome::ExitFailure {
395                pid,
396                binary,
397                exit_code,
398                stderr,
399            } => PySpyProfileResult::ExitFailure {
400                pid,
401                binary,
402                exit_code,
403                stderr,
404            },
405            ProfileExecOutcome::OutputMissing { pid, binary } => {
406                PySpyProfileResult::OutputMissing { pid, binary }
407            }
408            ProfileExecOutcome::OutputEmpty { pid, binary } => {
409                PySpyProfileResult::OutputEmpty { pid, binary }
410            }
411            ProfileExecOutcome::OutputReadFailure { pid, binary, error } => {
412                PySpyProfileResult::OutputReadFailure { pid, binary, error }
413            }
414            ProfileExecOutcome::WorkerSpawnFailure { error } => {
415                PySpyProfileResult::WorkerSpawnFailure { error }
416            }
417            ProfileExecOutcome::SubprocessSpawnFailure { pid, binary, error } => {
418                PySpyProfileResult::SubprocessSpawnFailure { pid, binary, error }
419            }
420            ProfileExecOutcome::WaitFailure { pid, binary, error } => {
421                PySpyProfileResult::WaitFailure { pid, binary, error }
422            }
423            ProfileExecOutcome::TempDirFailure { pid, binary, error } => {
424                PySpyProfileResult::TempDirFailure { pid, binary, error }
425            }
426        }
427    }
428}
429
/// Request a py-spy stack dump from this process.
///
/// Both ProcAgent and HostAgent handle this message. The handler
/// delegates to [`PySpyWorker::spawn_and_forward`] which runs py-spy
/// against `std::process::id()`.
///
/// The message deliberately carries no PID field: the target is
/// always the process that handles it.
///
/// See PS-1 in `introspect` module doc.
#[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyDump {
    /// Capture options (threads, native frames, nonblocking mode).
    pub opts: PySpyOpts,
    /// Reply port for the result.
    #[reply]
    pub result: OncePortRef<PySpyResult>,
}
445wirevalue::register_type!(PySpyDump);
446
/// Request a py-spy profile capture from this process.
///
/// Runs `py-spy record` for the requested duration. Separate contract
/// from `PySpyDump` — does not affect the existing dump pipeline.
///
/// The request is carried as an already-validated
/// `ValidatedProfileRequest`, so the receiver does not repeat the
/// range checks.
///
/// See PP-4, PP-5 in `introspect` module doc.
#[allow(private_interfaces)] // pub required by hyperactor macros; actual use is crate-internal
#[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyProfile {
    /// Validated profile request (opts + derived timeouts).
    pub request: ValidatedProfileRequest,
    /// Reply port for the result.
    #[reply]
    pub result: OncePortRef<PySpyProfileResult>,
}
462wirevalue::register_type!(PySpyProfile);
463
464/// Runs py-spy against the current process.
465///
466/// See PS-1, PS-3 in `introspect` module doc.
467pub struct PySpyRunner;
468
469impl PySpyRunner {
470    /// Dump Python stacks for this process.
471    ///
472    /// Resolves the py-spy binary (PS-3), attaches to
473    /// `std::process::id()` (PS-1), and returns structured JSON
474    /// output (PS-4).
475    ///
476    /// PS-1 is structurally enforced: `PySpyDump` carries no PID
477    /// field, and this method hardcodes `std::process::id()`. There
478    /// is no code path that could substitute a different PID.
479    pub async fn dump_self(&self, opts: &PySpyOpts) -> PySpyResult {
480        let pid = std::process::id();
481        let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
482        let candidates = resolve_candidates(if pyspy_bin.is_empty() {
483            None
484        } else {
485            Some(pyspy_bin)
486        });
487        let mut searched = vec![];
488
489        for (binary, label) in &candidates {
490            searched.push(label.clone());
491            if let Some(result) = try_exec(
492                binary,
493                pid,
494                opts,
495                hyperactor_config::global::get(MESH_ADMIN_PYSPY_TIMEOUT),
496            )
497            .await
498            {
499                return result;
500            }
501        }
502
503        PySpyResult::BinaryNotFound { searched }
504    }
505
506    /// Profile Python stacks for this process over a duration.
507    /// See PP-3, PP-4.
508    pub(crate) async fn profile_self(
509        &self,
510        request: &ValidatedProfileRequest,
511    ) -> ProfileExecOutcome {
512        let pid = std::process::id();
513        let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
514        let candidates = resolve_candidates(if pyspy_bin.is_empty() {
515            None
516        } else {
517            Some(pyspy_bin)
518        });
519        let mut searched = vec![];
520
521        for (binary, label) in &candidates {
522            searched.push(label.clone());
523            if let Some(result) = try_profile(binary, pid, request).await {
524                return result;
525            }
526        }
527
528        ProfileExecOutcome::BinaryNotFound { searched }
529    }
530}
531
/// Internal message from ProcAgent to a spawned PySpyWorker.
/// Carries the original caller's reply port so the worker
/// responds directly without routing back through ProcAgent.
///
/// Constructed in `PySpyWorker::spawn_and_forward` and consumed by
/// the worker's `Handler<RunPySpyDump>` impl.
#[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyDump {
    /// Capture options, forwarded unchanged from `PySpyDump`.
    pub opts: PySpyOpts,
    /// The original caller's reply port, forwarded from PySpyDump.
    pub reply_port: hyperactor::OncePortRef<PySpyResult>,
}
541wirevalue::register_type!(RunPySpyDump);
542
543/// Short-lived child actor that runs py-spy off the ProcAgent
544/// handler path. Spawned per-request; self-terminates after reply.
545/// Concurrent instances are permitted — py-spy attaches read-only
546/// via `process_vm_readv` and multiple concurrent dumps are safe.
547#[hyperactor::export(handlers = [RunPySpyDump])]
548pub struct PySpyWorker;
549
550impl Actor for PySpyWorker {}
551
552impl PySpyWorker {
553    /// Spawn a PySpyWorker, forward the py-spy request, and let
554    /// the worker reply directly to the caller. On spawn failure,
555    /// sends a `Failed` result back via `reply_port`.
556    pub(crate) fn spawn_and_forward(
557        cx: &impl hyperactor::context::Actor,
558        opts: PySpyOpts,
559        reply_port: hyperactor::OncePortRef<PySpyResult>,
560    ) -> Result<(), anyhow::Error> {
561        let worker = match Self.spawn(cx) {
562            Ok(handle) => handle,
563            Err(e) => {
564                let fail = PySpyResult::Failed {
565                    pid: std::process::id(),
566                    binary: String::new(),
567                    exit_code: None,
568                    stderr: format!("failed to spawn pyspy worker: {}", e),
569                };
570                reply_port.post(cx, fail);
571                return Ok(());
572            }
573        };
574        worker.post(cx, RunPySpyDump { opts, reply_port });
575        Ok(())
576    }
577}
578
579#[async_trait]
580impl Handler<RunPySpyDump> for PySpyWorker {
581    async fn handle(
582        &mut self,
583        cx: &Context<Self>,
584        message: RunPySpyDump,
585    ) -> Result<(), anyhow::Error> {
586        let result = PySpyRunner.dump_self(&message.opts).await;
587        message.reply_port.post(cx, result);
588        cx.stop("pyspy dump complete")?;
589        Ok(())
590    }
591}
592
/// Internal forwarded message for profile capture.
///
/// Constructed in `PySpyProfileWorker::spawn_and_forward` and
/// consumed by the worker's `Handler<RunPySpyProfile>` impl.
#[allow(private_interfaces)] // pub required by hyperactor macros; actual use is crate-internal
#[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyProfile {
    /// Validated profile request to execute.
    pub request: ValidatedProfileRequest,
    /// The original caller's reply port; the worker replies on it
    /// directly.
    pub reply_port: hyperactor::OncePortRef<PySpyProfileResult>,
}
600wirevalue::register_type!(RunPySpyProfile);
601
602/// Short-lived child actor for profile capture. Separate from
603/// `PySpyWorker` (PP-5).
604#[hyperactor::export(handlers = [RunPySpyProfile])]
605pub struct PySpyProfileWorker;
606
607impl Actor for PySpyProfileWorker {}
608
609impl PySpyProfileWorker {
610    /// Spawn a profile worker and forward the request. On spawn
611    /// failure, sends `WorkerSpawnFailure` back via `reply_port`.
612    pub(crate) fn spawn_and_forward(
613        cx: &impl hyperactor::context::Actor,
614        request: ValidatedProfileRequest,
615        reply_port: hyperactor::OncePortRef<PySpyProfileResult>,
616    ) -> Result<(), anyhow::Error> {
617        let worker = match Self.spawn(cx) {
618            Ok(handle) => handle,
619            Err(e) => {
620                let fail = ProfileExecOutcome::WorkerSpawnFailure {
621                    error: e.to_string(),
622                };
623                reply_port.post(cx, PySpyProfileResult::from(fail));
624                return Ok(());
625            }
626        };
627        worker.post(
628            cx,
629            RunPySpyProfile {
630                request,
631                reply_port,
632            },
633        );
634        Ok(())
635    }
636}
637
638#[async_trait]
639impl Handler<RunPySpyProfile> for PySpyProfileWorker {
640    async fn handle(
641        &mut self,
642        cx: &Context<Self>,
643        message: RunPySpyProfile,
644    ) -> Result<(), anyhow::Error> {
645        let outcome = PySpyRunner.profile_self(&message.request).await;
646        message
647            .reply_port
648            .post(cx, PySpyProfileResult::from(outcome));
649        cx.stop("pyspy profile complete")?;
650        Ok(())
651    }
652}
653
/// Build the ordered `(binary, label)` list of py-spy candidates.
///
/// A non-empty `pyspy_bin_env` comes first, labelled
/// `PYSPY_BIN=<path>`. The bare `py-spy` name (looked up on `PATH`
/// by the OS at spawn time) is always appended as the last entry.
/// `None` and the empty string are both treated as "not configured".
///
/// See PS-3 in `introspect` module doc.
fn resolve_candidates(pyspy_bin_env: Option<String>) -> Vec<(String, String)> {
    let configured = pyspy_bin_env
        .filter(|path| !path.is_empty())
        .map(|path| {
            let label = format!("PYSPY_BIN={}", path);
            (path, label)
        });
    let path_fallback = ("py-spy".to_string(), "py-spy on PATH".to_string());

    configured
        .into_iter()
        .chain(std::iter::once(path_fallback))
        .collect()
}
667
668/// Build the py-spy command for a given binary path.
669fn build_command(binary: &str, pid: u32, opts: &PySpyOpts) -> tokio::process::Command {
670    let mut cmd = tokio::process::Command::new(binary);
671    cmd.arg("dump")
672        .arg("--pid")
673        .arg(pid.to_string())
674        .arg("--json");
675    if opts.threads {
676        cmd.arg("--threads");
677    }
678    if opts.native {
679        cmd.arg("--native");
680    }
681    if opts.native_all {
682        cmd.arg("--native-all");
683    }
684    if opts.nonblocking {
685        cmd.arg("--nonblocking");
686    }
687    cmd.stdout(std::process::Stdio::piped());
688    cmd.stderr(std::process::Stdio::piped());
689    cmd
690}
691
692/// Map a process::Output to a PySpyResult, parsing the `--json`
693/// output into structured `PySpyStackTrace` values.
694/// See PS-2, PS-4 in `introspect` module doc.
695fn map_output(output: std::process::Output, pid: u32, binary: &str) -> PySpyResult {
696    if output.status.success() {
697        match serde_json::from_slice::<Vec<PySpyStackTrace>>(&output.stdout) {
698            Ok(stack_traces) => PySpyResult::Ok {
699                pid,
700                binary: binary.to_string(),
701                stack_traces,
702                warnings: vec![],
703            },
704            Err(e) => PySpyResult::Failed {
705                pid,
706                binary: binary.to_string(),
707                exit_code: None,
708                stderr: format!("failed to parse py-spy JSON output: {}", e),
709            },
710        }
711    } else {
712        let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
713        PySpyResult::Failed {
714            pid,
715            binary: binary.to_string(),
716            exit_code: output.status.code(),
717            stderr,
718        }
719    }
720}
721
722/// Returns true if the failure indicates the py-spy binary does not
723/// support `--native-all` (exit code 2, stderr mentions the flag).
724/// Used by `try_exec` to downgrade and retry (PS-11).
725fn is_unsupported_native_all(result: &PySpyResult) -> bool {
726    matches!(
727        result,
728        PySpyResult::Failed {
729            exit_code: Some(2),
730            stderr,
731            ..
732        } if stderr.contains("--native-all")
733    )
734}
735
/// Result of a single spawn → collect execution step.
///
/// Returned by `exec_once`. `try_exec` maps `NotFound` to `None` so
/// its caller can move on to the next binary candidate.
enum ExecOnce {
    /// py-spy produced a result (success or failure). Also used for
    /// failures synthesized locally, such as an expired deadline or
    /// a spawn error other than "not found".
    Result(PySpyResult),
    /// The binary was not found (NotFound from spawn).
    NotFound,
}
743
744/// Spawn the py-spy binary once, collect output, and return the
745/// result. Factored out of `try_exec` so both the normal attempt
746/// path and the PS-11 native-all downgrade path share one
747/// implementation of deadline check → spawn → collect.
748async fn exec_once(
749    binary: &str,
750    pid: u32,
751    opts: &PySpyOpts,
752    deadline: tokio::time::Instant,
753    timeout: std::time::Duration,
754) -> ExecOnce {
755    let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
756    if remaining.is_zero() {
757        return ExecOnce::Result(PySpyResult::Failed {
758            pid,
759            binary: binary.to_string(),
760            exit_code: None,
761            stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
762        });
763    }
764    let child = match build_command(binary, pid, opts).spawn() {
765        Ok(child) => child,
766        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return ExecOnce::NotFound,
767        Err(e) => {
768            return ExecOnce::Result(PySpyResult::Failed {
769                pid,
770                binary: binary.to_string(),
771                exit_code: None,
772                stderr: format!("failed to execute: {}", e),
773            });
774        }
775    };
776    ExecOnce::Result(collect_with_timeout(child, pid, binary, remaining).await)
777}
778
779/// Try to execute py-spy with the given binary path. Returns `None`
780/// if the binary was not found (NotFound error), allowing the caller
781/// to try the next candidate.
782///
783/// In nonblocking mode, retries up to 3 times with 100ms backoff
784/// because py-spy can segfault reading mutating process memory
785/// (PS-10). All attempts share a single deadline so total wall time
786/// never exceeds the caller's timeout budget (PS-5).
787///
788/// If `native_all` is requested but the py-spy binary does not
789/// support `--native-all` (exit code 2), the flag is dropped and the
790/// command is retried immediately within the same attempt (PS-11a
791/// through PS-11e).
792async fn try_exec(
793    binary: &str,
794    pid: u32,
795    opts: &PySpyOpts,
796    timeout: std::time::Duration,
797) -> Option<PySpyResult> {
798    let deadline = tokio::time::Instant::now() + timeout;
799    let retries = if opts.nonblocking { 3 } else { 1 };
800    let mut last_result = None;
801    let mut effective_opts = opts.clone();
802
803    for attempt in 0..retries {
804        if attempt > 0 {
805            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
806        }
807        let mut result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
808            ExecOnce::NotFound => return None,
809            ExecOnce::Result(r) => r,
810        };
811        // PS-11a: py-spy too old for --native-all; downgrade and
812        // retry immediately within the same attempt (PS-11b: no
813        // backoff, no retry slot consumed).
814        if is_unsupported_native_all(&result) && effective_opts.native_all {
815            // PS-11e: sticky downgrade — later outer retries keep
816            // native_all = false.
817            effective_opts.native_all = false;
818            result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
819                ExecOnce::NotFound => return None,
820                ExecOnce::Result(r) => r,
821            };
822            // PS-11c: inject warning on successful downgraded result.
823            if let PySpyResult::Ok { warnings, .. } = &mut result {
824                warnings.push(
825                    "--native-all unsupported by this py-spy; fell back to --native".to_string(),
826                );
827            }
828            // PS-11d: if the downgraded retry also failed, fall
829            // through to the normal last_result path below.
830        }
831        match &result {
832            PySpyResult::Ok { .. } => return Some(result),
833            _ => {
834                last_result = Some(result);
835            }
836        }
837    }
838
839    last_result
840}
841
842/// Collect stdout/stderr from a spawned child concurrently with wait,
843/// bounded by `timeout`. On expiry the child is killed and reaped.
844///
845/// Reads stdout/stderr concurrently with wait to avoid pipe-buffer
846/// deadlock. Keeps the `Child` handle alive so we can `start_kill`
847/// and `wait` on timeout for deterministic termination and reaping.
848///
849/// See PS-5 in `introspect` module doc.
850async fn collect_with_timeout(
851    mut child: tokio::process::Child,
852    pid: u32,
853    binary: &str,
854    timeout: std::time::Duration,
855) -> PySpyResult {
856    let mut stdout_handle = child.stdout.take();
857    let mut stderr_handle = child.stderr.take();
858
859    let collect = async {
860        let stdout_fut = async {
861            let mut buf = Vec::new();
862            if let Some(ref mut r) = stdout_handle {
863                let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
864            }
865            buf
866        };
867        let stderr_fut = async {
868            let mut buf = Vec::new();
869            if let Some(ref mut r) = stderr_handle {
870                let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
871            }
872            buf
873        };
874        let (stdout_bytes, stderr_bytes, status) =
875            tokio::join!(stdout_fut, stderr_fut, child.wait());
876        (stdout_bytes, stderr_bytes, status)
877    };
878
879    match tokio::time::timeout(timeout, collect).await {
880        Ok((stdout_bytes, stderr_bytes, Ok(status))) => {
881            let output = std::process::Output {
882                status,
883                stdout: stdout_bytes,
884                stderr: stderr_bytes,
885            };
886            map_output(output, pid, binary)
887        }
888        Ok((_, _, Err(e))) => PySpyResult::Failed {
889            pid,
890            binary: binary.to_string(),
891            exit_code: None,
892            stderr: format!("failed to wait for child: {}", e),
893        },
894        Err(_) => {
895            // Timeout — kill and reap deterministically.
896            let _ = child.start_kill();
897            let _ = child.wait().await;
898            PySpyResult::Failed {
899                pid,
900                binary: binary.to_string(),
901                exit_code: None,
902                stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
903            }
904        }
905    }
906}
907
908/// Build a `py-spy record --format flamegraph` command.
909fn build_record_command(
910    binary: &str,
911    pid: u32,
912    request: &ValidatedProfileRequest,
913    output_path: &std::path::Path,
914) -> tokio::process::Command {
915    let mut cmd = tokio::process::Command::new(binary);
916    cmd.arg("record")
917        .arg("--pid")
918        .arg(pid.to_string())
919        .arg("--duration")
920        .arg(request.duration().get().to_string())
921        .arg("--rate")
922        .arg(request.rate().get().to_string())
923        .arg("--format")
924        .arg("flamegraph")
925        .arg("--output")
926        .arg(output_path);
927    if request.native() {
928        cmd.arg("--native");
929    }
930    if request.threads() {
931        cmd.arg("--threads");
932    }
933    if request.nonblocking() {
934        cmd.arg("--nonblocking");
935    }
936    // py-spy record writes output to a file, not stdout. Do NOT
937    // pipe stdout — an undrained pipe can deadlock the child.
938    cmd.stdout(std::process::Stdio::null());
939    cmd.stderr(std::process::Stdio::piped());
940    cmd
941}
942
943/// Collect stderr and wait for exit, bounded by `timeout`. On
944/// expiry the child is explicitly killed and reaped. See PP-2, PP-3.
945async fn collect_profile_with_timeout(
946    mut child: tokio::process::Child,
947    pid: u32,
948    binary: &str,
949    timeout: std::time::Duration,
950) -> Result<(std::process::ExitStatus, String), ProfileExecOutcome> {
951    // Drain stderr on a separate task so it does not block the
952    // child.wait() path and so `child` stays in this scope for
953    // explicit kill/reap on timeout.
954    let stderr_handle = child.stderr.take();
955    let stderr_task = tokio::spawn(async move {
956        let mut buf = Vec::new();
957        if let Some(mut r) = stderr_handle {
958            let _ = tokio::io::AsyncReadExt::read_to_end(&mut r, &mut buf).await;
959        }
960        buf
961    });
962
963    match tokio::time::timeout(timeout, child.wait()).await {
964        Ok(Ok(status)) => {
965            let stderr_bytes = stderr_task.await.unwrap_or_default();
966            let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
967            Ok((status, stderr))
968        }
969        Ok(Err(e)) => {
970            stderr_task.abort();
971            Err(ProfileExecOutcome::WaitFailure {
972                pid,
973                binary: binary.to_string(),
974                error: e.to_string(),
975            })
976        }
977        Err(_) => {
978            // Timeout — explicit kill and reap.
979            let _ = child.start_kill();
980            let _ = child.wait().await;
981            let stderr_bytes = stderr_task.await.unwrap_or_default();
982            let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
983            Err(ProfileExecOutcome::TimedOut {
984                pid,
985                binary: binary.to_string(),
986                timeout,
987                stderr,
988            })
989        }
990    }
991}
992
993/// Try to run a profile capture with the given binary. Returns `None`
994/// if the binary was not found (caller tries next candidate).
995async fn try_profile(
996    binary: &str,
997    pid: u32,
998    request: &ValidatedProfileRequest,
999) -> Option<ProfileExecOutcome> {
1000    let timeout = request.subprocess_timeout();
1001    let tmp_dir = match tempfile::tempdir() {
1002        Ok(d) => d,
1003        Err(e) => {
1004            return Some(ProfileExecOutcome::TempDirFailure {
1005                pid,
1006                binary: binary.to_string(),
1007                error: e.to_string(),
1008            });
1009        }
1010    };
1011    let svg_path = tmp_dir.path().join("profile.svg");
1012
1013    let child = match build_record_command(binary, pid, request, &svg_path).spawn() {
1014        Ok(c) => c,
1015        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
1016        Err(e) => {
1017            return Some(ProfileExecOutcome::SubprocessSpawnFailure {
1018                pid,
1019                binary: binary.to_string(),
1020                error: e.to_string(),
1021            });
1022        }
1023    };
1024
1025    let (status, stderr) = match collect_profile_with_timeout(child, pid, binary, timeout).await {
1026        Ok(pair) => pair,
1027        Err(outcome) => return Some(outcome),
1028    };
1029
1030    if !status.success() {
1031        return Some(ProfileExecOutcome::ExitFailure {
1032            pid,
1033            binary: binary.to_string(),
1034            exit_code: status.code(),
1035            stderr,
1036        });
1037    }
1038
1039    match std::fs::read(&svg_path) {
1040        Ok(bytes) if bytes.is_empty() => Some(ProfileExecOutcome::OutputEmpty {
1041            pid,
1042            binary: binary.to_string(),
1043        }),
1044        Ok(svg) => Some(ProfileExecOutcome::Ok {
1045            pid,
1046            binary: binary.to_string(),
1047            svg,
1048        }),
1049        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1050            Some(ProfileExecOutcome::OutputMissing {
1051                pid,
1052                binary: binary.to_string(),
1053            })
1054        }
1055        Err(e) => Some(ProfileExecOutcome::OutputReadFailure {
1056            pid,
1057            binary: binary.to_string(),
1058            error: e.to_string(),
1059        }),
1060    }
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065    use std::io::Write;
1066    use std::os::unix::fs::PermissionsExt;
1067    use std::os::unix::process::ExitStatusExt;
1068    use std::time::Duration;
1069
1070    use tokio::process::Command;
1071
1072    use super::*;
1073
1074    #[test]
1075    fn pyspy_result_wirevalue_roundtrip() {
1076        // Regression test: #[serde(skip_serializing_if)] is
1077        // incompatible with bincode (positional format). Empty
1078        // warnings must still round-trip correctly through wirevalue
1079        // Multipart encoding.
1080        let original = PySpyResult::Ok {
1081            pid: 42,
1082            binary: "py-spy".to_string(),
1083            stack_traces: vec![PySpyStackTrace {
1084                pid: 42,
1085                thread_id: 1,
1086                thread_name: Some("main".to_string()),
1087                os_thread_id: Some(100),
1088                active: true,
1089                owns_gil: true,
1090                frames: vec![PySpyFrame {
1091                    name: "do_work".to_string(),
1092                    filename: "test.py".to_string(),
1093                    module: None,
1094                    short_filename: None,
1095                    line: 10,
1096                    locals: None,
1097                    is_entry: false,
1098                }],
1099            }],
1100            warnings: vec![],
1101        };
1102        let any = wirevalue::Any::serialize(&original).expect("serialize");
1103        let restored: PySpyResult = any.deserialized().expect("deserialize");
1104        assert_eq!(original, restored);
1105    }
1106
1107    #[test]
1108    fn candidates_no_env() {
1109        // PS-3: no PYSPY_BIN → only PATH candidate.
1110        let candidates = resolve_candidates(None);
1111        assert_eq!(candidates.len(), 1);
1112        assert_eq!(candidates[0].0, "py-spy");
1113        assert_eq!(candidates[0].1, "py-spy on PATH");
1114    }
1115
1116    #[test]
1117    fn candidates_env_first_then_path() {
1118        // PS-3: PYSPY_BIN first, then PATH.
1119        let candidates = resolve_candidates(Some("/custom/py-spy".to_string()));
1120        assert_eq!(candidates.len(), 2);
1121        assert_eq!(candidates[0].0, "/custom/py-spy");
1122        assert!(candidates[0].1.contains("PYSPY_BIN=/custom/py-spy"));
1123        assert_eq!(candidates[1].0, "py-spy");
1124    }
1125
1126    #[test]
1127    fn candidates_empty_env_ignored() {
1128        // PS-3: empty PYSPY_BIN is treated as unset.
1129        let candidates = resolve_candidates(Some(String::new()));
1130        assert_eq!(candidates.len(), 1);
1131        assert_eq!(candidates[0].0, "py-spy");
1132    }
1133
1134    #[test]
1135    fn output_success_parses_json() {
1136        // PS-4: py-spy --json stdout is parsed into structured traces.
1137        let json = serde_json::json!([{
1138            "pid": 42,
1139            "thread_id": 1234,
1140            "thread_name": "MainThread",
1141            "os_thread_id": 5678,
1142            "active": true,
1143            "owns_gil": true,
1144            "frames": [{
1145                "name": "do_work",
1146                "filename": "foo.py",
1147                "module": null,
1148                "short_filename": null,
1149                "line": 10,
1150                "locals": null,
1151                "is_entry": false
1152            }]
1153        }]);
1154        let output = std::process::Output {
1155            status: std::process::ExitStatus::default(),
1156            stdout: serde_json::to_vec(&json).unwrap(),
1157            stderr: vec![],
1158        };
1159        let result = map_output(output, 42, "/usr/bin/py-spy");
1160        match result {
1161            PySpyResult::Ok {
1162                pid,
1163                binary,
1164                stack_traces,
1165                ..
1166            } => {
1167                assert_eq!(pid, 42);
1168                assert_eq!(binary, "/usr/bin/py-spy");
1169                assert_eq!(stack_traces.len(), 1);
1170                assert_eq!(stack_traces[0].thread_id, 1234);
1171                assert_eq!(stack_traces[0].thread_name.as_deref(), Some("MainThread"));
1172                assert!(stack_traces[0].owns_gil);
1173                assert_eq!(stack_traces[0].frames.len(), 1);
1174                assert_eq!(stack_traces[0].frames[0].name, "do_work");
1175                assert_eq!(stack_traces[0].frames[0].filename, "foo.py");
1176                assert_eq!(stack_traces[0].frames[0].line, 10);
1177            }
1178            other => panic!("expected Ok, got {:?}", other),
1179        }
1180    }
1181
1182    #[test]
1183    fn output_invalid_json_maps_to_failed() {
1184        // PS-4: unparseable JSON maps to Failed.
1185        let output = std::process::Output {
1186            status: std::process::ExitStatus::default(),
1187            stdout: b"not valid json".to_vec(),
1188            stderr: vec![],
1189        };
1190        let result = map_output(output, 42, "py-spy");
1191        match result {
1192            PySpyResult::Failed { pid, stderr, .. } => {
1193                assert_eq!(pid, 42);
1194                assert!(
1195                    stderr.contains("failed to parse py-spy JSON output"),
1196                    "unexpected stderr: {stderr}"
1197                );
1198            }
1199            other => panic!("expected Failed, got {:?}", other),
1200        }
1201    }
1202
1203    #[test]
1204    fn output_nonzero_exit_maps_to_failed() {
1205        // PS-2: nonzero exit → Failed with stderr.
1206        let status = std::process::ExitStatus::from_raw(256); // exit code 1
1207        let output = std::process::Output {
1208            status,
1209            stdout: vec![],
1210            stderr: b"Permission denied".to_vec(),
1211        };
1212        let result = map_output(output, 99, "py-spy");
1213        match result {
1214            PySpyResult::Failed {
1215                pid,
1216                binary,
1217                exit_code,
1218                stderr,
1219            } => {
1220                assert_eq!(pid, 99);
1221                assert_eq!(binary, "py-spy");
1222                assert_eq!(exit_code, Some(1));
1223                assert_eq!(stderr, "Permission denied");
1224            }
1225            other => panic!("expected Failed, got {:?}", other),
1226        }
1227    }
1228
1229    #[test]
1230    fn output_preserves_caller_pid() {
1231        // PS-1: pid in result is exactly what the caller passes.
1232        let json = serde_json::json!([]);
1233        let output = std::process::Output {
1234            status: std::process::ExitStatus::default(),
1235            stdout: serde_json::to_vec(&json).unwrap(),
1236            stderr: vec![],
1237        };
1238        let result = map_output(output, 12345, "bin");
1239        match result {
1240            PySpyResult::Ok { pid, .. } => assert_eq!(pid, 12345),
1241            other => panic!("expected Ok, got {:?}", other),
1242        }
1243    }
1244
1245    fn default_opts() -> PySpyOpts {
1246        PySpyOpts {
1247            threads: false,
1248            native: false,
1249            native_all: false,
1250            nonblocking: false,
1251        }
1252    }
1253
1254    #[tokio::test]
1255    async fn exec_missing_binary_returns_none() {
1256        // PS-3: NotFound from exec → None (triggers fallback).
1257        let result = try_exec(
1258            "/definitely/not/a/real/binary",
1259            1,
1260            &default_opts(),
1261            std::time::Duration::from_secs(5),
1262        )
1263        .await;
1264        assert!(result.is_none());
1265    }
1266
1267    #[tokio::test]
1268    async fn exec_present_binary_returns_some() {
1269        // "true" exits 0 with empty stdout, which is not valid
1270        // py-spy JSON. We expect a Failed result from parse error.
1271        let result = try_exec(
1272            "true",
1273            1,
1274            &default_opts(),
1275            std::time::Duration::from_secs(5),
1276        )
1277        .await;
1278        match result {
1279            Some(PySpyResult::Failed { stderr, .. }) => {
1280                assert!(
1281                    stderr.contains("parse"),
1282                    "expected JSON parse error, got: {stderr}"
1283                );
1284            }
1285            other => panic!("expected Some(Failed{{parse..}}), got: {other:?}"),
1286        }
1287    }
1288
1289    #[tokio::test]
1290    async fn collect_timeout_kills_child_and_returns_failed() {
1291        // PS-5: subprocess that hangs past timeout → Failed with
1292        // "timed out" message; child is killed and reaped.
1293        let child = Command::new("sleep")
1294            .arg("100")
1295            .stdout(std::process::Stdio::piped())
1296            .stderr(std::process::Stdio::piped())
1297            .spawn()
1298            .expect("sleep must be available");
1299
1300        let result = collect_with_timeout(
1301            child,
1302            std::process::id(),
1303            "sleep",
1304            std::time::Duration::from_millis(100),
1305        )
1306        .await;
1307
1308        match result {
1309            PySpyResult::Failed { stderr, .. } => {
1310                assert!(
1311                    stderr.contains("timed out"),
1312                    "expected timeout message, got: {stderr}"
1313                );
1314            }
1315            other => panic!("expected Failed, got {:?}", other),
1316        }
1317    }
1318
1319    #[tokio::test]
1320    async fn exec_failing_binary_returns_failed() {
1321        // "false" exists on all unix systems and exits 1.
1322        let result = try_exec(
1323            "false",
1324            42,
1325            &default_opts(),
1326            std::time::Duration::from_secs(5),
1327        )
1328        .await;
1329        assert!(result.is_some());
1330        match result.unwrap() {
1331            PySpyResult::Failed {
1332                pid,
1333                binary,
1334                exit_code,
1335                ..
1336            } => {
1337                assert_eq!(pid, 42);
1338                assert_eq!(binary, "false");
1339                assert!(exit_code.is_some());
1340            }
1341            other => panic!("expected Failed, got {:?}", other),
1342        }
1343    }
1344
1345    /// Write a fake py-spy shell script to a temp file, make it
1346    /// executable, and return the path. The script logs each
1347    /// invocation's argv to `<script>.log`.
1348    ///
1349    /// Returns a `TempPath` (not `NamedTempFile`) so the write fd is
1350    /// closed before exec — Linux returns ETXTBSY if a file with an
1351    /// open write fd is executed.
1352    fn write_fake_pyspy(script_body: &str) -> tempfile::TempPath {
1353        let mut f = tempfile::NamedTempFile::new().expect("create temp file");
1354        write!(f, "#!/bin/sh\n{script_body}").expect("write script");
1355        f.as_file().sync_all().expect("sync");
1356        std::fs::set_permissions(f.path(), std::fs::Permissions::from_mode(0o755))
1357            .expect("chmod +x");
1358        f.into_temp_path()
1359    }
1360
1361    /// Read the argv log written by the fake script. Each line is one
1362    /// invocation's `$@`.
1363    fn read_log(script_path: &std::path::Path) -> Vec<String> {
1364        let log_path = format!("{}.log", script_path.display());
1365        match std::fs::read_to_string(&log_path) {
1366            Ok(contents) => contents.lines().map(String::from).collect(),
1367            Err(_) => vec![],
1368        }
1369    }
1370
1371    #[tokio::test]
1372    async fn native_all_downgrade_succeeds() {
1373        // PS-11a, PS-11b, PS-11c: unsupported --native-all triggers
1374        // immediate downgrade in the same attempt, and the successful
1375        // result carries the fallback warning.
1376        let script = write_fake_pyspy(
1377            r#"
1378echo "$@" >> "$0.log"
1379for arg in "$@"; do
1380    if [ "$arg" = "--native-all" ]; then
1381        echo "unrecognized option --native-all" >&2
1382        exit 2
1383    fi
1384done
1385echo "[]"
1386exit 0
1387"#,
1388        );
1389        let opts = PySpyOpts {
1390            threads: false,
1391            native: true,
1392            native_all: true,
1393            nonblocking: false,
1394        };
1395        let result = try_exec(
1396            script.to_str().unwrap(),
1397            1,
1398            &opts,
1399            std::time::Duration::from_secs(5),
1400        )
1401        .await;
1402        // Must succeed with the downgraded result.
1403        let result = result.expect("expected Some");
1404        match &result {
1405            PySpyResult::Ok { warnings, .. } => {
1406                assert!(
1407                    warnings.iter().any(|w| w.contains("fell back to --native")),
1408                    "PS-11c: expected fallback warning, got: {warnings:?}"
1409                );
1410            }
1411            other => panic!("expected Ok, got: {other:?}"),
1412        }
1413        // Check invocation log.
1414        let log = read_log(&script);
1415        assert_eq!(
1416            log.len(),
1417            2,
1418            "PS-11b: expected exactly 2 invocations, got {}",
1419            log.len()
1420        );
1421        assert!(
1422            log[0].contains("--native-all"),
1423            "PS-11a: first invocation must include --native-all, got: {}",
1424            log[0]
1425        );
1426        assert!(
1427            !log[1].contains("--native-all"),
1428            "PS-11a: second invocation must NOT include --native-all, got: {}",
1429            log[1]
1430        );
1431    }
1432
1433    #[tokio::test]
1434    async fn native_all_downgrade_fails_retries_continue() {
1435        // PS-11d, PS-11e: downgraded retry fails, outer nonblocking
1436        // retries continue with native_all = false.
1437        let script = write_fake_pyspy(
1438            r#"
1439echo "$@" >> "$0.log"
1440for arg in "$@"; do
1441    if [ "$arg" = "--native-all" ]; then
1442        echo "unrecognized option --native-all" >&2
1443        exit 2
1444    fi
1445done
1446echo "Permission denied" >&2
1447exit 1
1448"#,
1449        );
1450        let opts = PySpyOpts {
1451            threads: false,
1452            native: true,
1453            native_all: true,
1454            nonblocking: true, // 3 outer retries
1455        };
1456        let result = try_exec(
1457            script.to_str().unwrap(),
1458            1,
1459            &opts,
1460            std::time::Duration::from_secs(10),
1461        )
1462        .await;
1463        // Must be a generic failure, not the native-all error.
1464        let result = result.expect("expected Some");
1465        match &result {
1466            PySpyResult::Failed {
1467                stderr, exit_code, ..
1468            } => {
1469                assert!(
1470                    stderr.contains("Permission denied"),
1471                    "PS-11d: expected generic failure, got: {stderr}"
1472                );
1473                assert_eq!(*exit_code, Some(1));
1474            }
1475            other => panic!("expected Failed, got: {other:?}"),
1476        }
1477        // Check invocation log: 4 calls total.
1478        //   Attempt 0: --native-all (fail) → downgrade (fail)
1479        //   Attempt 1: without --native-all (fail)
1480        //   Attempt 2: without --native-all (fail)
1481        let log = read_log(&script);
1482        assert_eq!(log.len(), 4, "expected 4 invocations, got {}", log.len());
1483        assert!(
1484            log[0].contains("--native-all"),
1485            "PS-11a: first invocation must include --native-all, got: {}",
1486            log[0]
1487        );
1488        for (i, line) in log[1..].iter().enumerate() {
1489            assert!(
1490                !line.contains("--native-all"),
1491                "PS-11e: invocation {} must NOT include --native-all, got: {}",
1492                i + 1,
1493                line
1494            );
1495        }
1496    }
1497
1498    /// PP-2: subprocess timeout yields `TimedOut` with partial stderr.
1499    #[tokio::test]
1500    async fn profile_collect_timeout_returns_timed_out() {
1501        let child = Command::new("sh")
1502            .arg("-c")
1503            .arg("echo diag >&2; sleep 60")
1504            .stdout(std::process::Stdio::null())
1505            .stderr(std::process::Stdio::piped())
1506            .spawn()
1507            .expect("sh must be available");
1508
1509        let result = collect_profile_with_timeout(
1510            child,
1511            std::process::id(),
1512            "sh",
1513            std::time::Duration::from_millis(200),
1514        )
1515        .await;
1516
1517        match result {
1518            Err(ProfileExecOutcome::TimedOut { stderr, .. }) => {
1519                assert!(
1520                    stderr.contains("diag"),
1521                    "expected partial stderr captured after kill, got: {stderr}"
1522                );
1523            }
1524            other => panic!("expected TimedOut, got: {other:?}"),
1525        }
1526    }
1527
1528    fn test_request() -> ValidatedProfileRequest {
1529        ValidatedProfileRequest::try_new(
1530            &PySpyProfileOpts {
1531                duration_s: 1,
1532                rate_hz: 100,
1533                native: false,
1534                threads: false,
1535                nonblocking: false,
1536            },
1537            std::time::Duration::from_secs(300),
1538        )
1539        .unwrap()
1540    }
1541
1542    /// PP-4, PS-3: missing binary yields `None` (try next candidate).
1543    #[tokio::test]
1544    async fn profile_try_missing_binary_returns_none() {
1545        let result = try_profile("/definitely/not/a/real/binary", 1, &test_request()).await;
1546        assert!(result.is_none(), "missing binary must return None");
1547    }
1548
1549    /// PP-3: successful exit with empty output yields `OutputEmpty`.
1550    #[tokio::test]
1551    async fn profile_success_exit_empty_file_returns_output_empty() {
1552        let script = write_fake_pyspy(
1553            r#"
1554output=""
1555while [ $# -gt 0 ]; do
1556    case "$1" in
1557        --output) shift; output="$1" ;;
1558    esac
1559    shift
1560done
1561touch "$output"
1562exit 0
1563"#,
1564        );
1565        let result = try_profile(script.to_str().unwrap(), 1, &test_request()).await;
1566        assert!(
1567            matches!(result, Some(ProfileExecOutcome::OutputEmpty { .. })),
1568            "PP-3: expected OutputEmpty, got: {result:?}"
1569        );
1570    }
1571
1572    /// PP-3: successful exit with missing output yields `OutputMissing`.
1573    #[tokio::test]
1574    async fn profile_success_exit_missing_file_returns_output_missing() {
1575        let script = write_fake_pyspy("exit 0\n");
1576        let result = try_profile(script.to_str().unwrap(), 1, &test_request()).await;
1577        assert!(
1578            matches!(result, Some(ProfileExecOutcome::OutputMissing { .. })),
1579            "PP-3: expected OutputMissing, got: {result:?}"
1580        );
1581    }
1582
1583    /// PP-1: zero duration rejected.
1584    #[test]
1585    fn validated_request_rejects_zero_duration() {
1586        let opts = PySpyProfileOpts {
1587            duration_s: 0,
1588            rate_hz: 100,
1589            native: false,
1590            threads: false,
1591            nonblocking: false,
1592        };
1593        let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300));
1594        assert!(err.is_err());
1595        assert!(err.unwrap_err().contains("positive"));
1596    }
1597
1598    /// PP-1: over-max duration rejected.
1599    #[test]
1600    fn validated_request_rejects_over_max_duration() {
1601        let opts = PySpyProfileOpts {
1602            duration_s: 999,
1603            rate_hz: 100,
1604            native: false,
1605            threads: false,
1606            nonblocking: false,
1607        };
1608        let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300));
1609        assert!(err.is_err());
1610        assert!(err.unwrap_err().contains("exceeds max"));
1611    }
1612
1613    /// PP-1: zero rate rejected.
1614    #[test]
1615    fn validated_request_rejects_zero_rate() {
1616        let opts = PySpyProfileOpts {
1617            duration_s: 5,
1618            rate_hz: 0,
1619            native: false,
1620            threads: false,
1621            nonblocking: false,
1622        };
1623        let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300));
1624        assert!(err.is_err());
1625        assert!(err.unwrap_err().contains("rate_hz"));
1626    }
1627
1628    /// PP-1: excessive rate rejected.
1629    #[test]
1630    fn validated_request_rejects_excessive_rate() {
1631        let opts = PySpyProfileOpts {
1632            duration_s: 5,
1633            rate_hz: 9999,
1634            native: false,
1635            threads: false,
1636            nonblocking: false,
1637        };
1638        let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300));
1639        assert!(err.is_err());
1640        assert!(err.unwrap_err().contains("rate_hz"));
1641    }
1642
1643    /// PP-2: timeout arithmetic is correct and deterministic.
1644    #[test]
1645    fn validated_request_computes_exact_timeouts() {
1646        let opts = PySpyProfileOpts {
1647            duration_s: 30,
1648            rate_hz: 100,
1649            native: true,
1650            threads: false,
1651            nonblocking: false,
1652        };
1653        let req =
1654            ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300)).unwrap();
1655        assert_eq!(req.duration().get(), 30);
1656        assert_eq!(req.rate().get(), 100);
1657        assert!(req.native());
1658        assert_eq!(req.subprocess_timeout(), std::time::Duration::from_secs(45));
1659        assert_eq!(req.bridge_timeout(), std::time::Duration::from_secs(50));
1660    }
1661
1662    /// PP-6: internal-to-wire conversion is near-identity.
1663    #[test]
1664    fn profile_exec_outcome_conversion_is_identity() {
1665        // Each internal outcome maps to the identically-named wire variant.
1666        let r = PySpyProfileResult::from(ProfileExecOutcome::Ok {
1667            pid: 1,
1668            binary: "b".into(),
1669            svg: vec![1],
1670        });
1671        assert!(matches!(r, PySpyProfileResult::Ok { pid: 1, .. }));
1672
1673        let r = PySpyProfileResult::from(ProfileExecOutcome::BinaryNotFound {
1674            searched: vec!["x".into()],
1675        });
1676        assert!(matches!(r, PySpyProfileResult::BinaryNotFound { .. }));
1677
1678        let r = PySpyProfileResult::from(ProfileExecOutcome::TimedOut {
1679            pid: 1,
1680            binary: "b".into(),
1681            timeout: Duration::from_secs(10),
1682            stderr: "s".into(),
1683        });
1684        assert!(matches!(
1685            r,
1686            PySpyProfileResult::TimedOut { timeout_s: 10, .. }
1687        ));
1688
1689        let r = PySpyProfileResult::from(ProfileExecOutcome::ExitFailure {
1690            pid: 1,
1691            binary: "b".into(),
1692            exit_code: Some(2),
1693            stderr: "e".into(),
1694        });
1695        assert!(matches!(
1696            r,
1697            PySpyProfileResult::ExitFailure {
1698                exit_code: Some(2),
1699                ..
1700            }
1701        ));
1702
1703        let r = PySpyProfileResult::from(ProfileExecOutcome::OutputMissing {
1704            pid: 1,
1705            binary: "b".into(),
1706        });
1707        assert!(matches!(
1708            r,
1709            PySpyProfileResult::OutputMissing { pid: 1, .. }
1710        ));
1711
1712        let r = PySpyProfileResult::from(ProfileExecOutcome::OutputEmpty {
1713            pid: 1,
1714            binary: "b".into(),
1715        });
1716        assert!(matches!(r, PySpyProfileResult::OutputEmpty { pid: 1, .. }));
1717
1718        let r = PySpyProfileResult::from(ProfileExecOutcome::OutputReadFailure {
1719            pid: 1,
1720            binary: "b".into(),
1721            error: "permission denied".into(),
1722        });
1723        assert!(matches!(r, PySpyProfileResult::OutputReadFailure { .. }));
1724
1725        let r =
1726            PySpyProfileResult::from(ProfileExecOutcome::WorkerSpawnFailure { error: "w".into() });
1727        assert!(matches!(r, PySpyProfileResult::WorkerSpawnFailure { .. }));
1728
1729        let r = PySpyProfileResult::from(ProfileExecOutcome::SubprocessSpawnFailure {
1730            pid: 1,
1731            binary: "b".into(),
1732            error: "s".into(),
1733        });
1734        assert!(matches!(
1735            r,
1736            PySpyProfileResult::SubprocessSpawnFailure { .. }
1737        ));
1738
1739        let r = PySpyProfileResult::from(ProfileExecOutcome::WaitFailure {
1740            pid: 1,
1741            binary: "b".into(),
1742            error: "w".into(),
1743        });
1744        assert!(matches!(r, PySpyProfileResult::WaitFailure { .. }));
1745
1746        let r = PySpyProfileResult::from(ProfileExecOutcome::TempDirFailure {
1747            pid: 1,
1748            binary: "b".into(),
1749            error: "t".into(),
1750        });
1751        assert!(matches!(r, PySpyProfileResult::TempDirFailure { .. }));
1752    }
1753}