hyperactor_multiprocess/
pyspy.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::process::Stdio;
10use std::str;
11
12use anyhow::Context;
13use hyperactor::serde_json;
14use py_spy::stack_trace::Frame;
15use py_spy::stack_trace::LocalVariable;
16use py_spy::stack_trace::StackTrace;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::process::Command;
20
21/// A full stack trace from PySpy.
22#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
23pub struct PySpyTrace {
24    /// The process id than generated this stack trace.
25    pub pid: i32,
26    /// The command line used to start the process.
27    pub command_line: String,
28    /// The stack traces.
29    pub stack_traces: Option<Vec<PySpyStackTrace>>,
30    /// The error, if any.
31    pub error: Option<String>,
32}
33
34/// A stack trace from PySpy.
35/// Wrapper is needed to have our own derives.
36#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
37pub struct PySpyStackTrace {
38    /// The process id than generated this stack trace
39    pub pid: i32,
40    /// The python thread id for this stack trace
41    pub thread_id: u64,
42    /// The python thread name for this stack trace
43    pub thread_name: Option<String>,
44    /// The OS thread id for this stack tracee
45    pub os_thread_id: Option<u64>,
46    /// Whether or not the thread was active
47    pub active: bool,
48    /// Whether or not the thread held the GIL
49    pub owns_gil: bool,
50    /// The frames
51    pub frames: Vec<PySpyFrame>,
52}
53
54/// A frame from PySpy.
55/// Wrapper is needed to have our own derives.
56#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
57pub struct PySpyFrame {
58    /// The function name
59    pub name: String,
60    /// The full filename of the file
61    pub filename: String,
62    /// The module/shared library the
63    pub module: Option<String>,
64    /// A short, more readable, representation of the filename
65    pub short_filename: Option<String>,
66    /// The line number inside the file (or 0 for native frames without line information)
67    pub line: i32,
68    /// Local Variables associated with the frame
69    pub locals: Option<Vec<PySpyLocalVariable>>,
70    /// If this is an entry frame. Each entry frame corresponds to one native frame.
71    pub is_entry: bool,
72}
73
74/// A frame local variable.
75#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
76pub struct PySpyLocalVariable {
77    /// Variable name.
78    pub name: String,
79    /// Variable address.
80    pub addr: usize,
81    /// Whether or not the variable is an argument.
82    pub arg: bool,
83    /// Variable representation.
84    pub repr: Option<String>,
85}
86
87impl From<StackTrace> for PySpyStackTrace {
88    fn from(stack_trace: StackTrace) -> Self {
89        Self {
90            pid: stack_trace.pid,
91            thread_id: stack_trace.thread_id,
92            thread_name: stack_trace.thread_name,
93            os_thread_id: stack_trace.os_thread_id,
94            active: stack_trace.active,
95            owns_gil: stack_trace.owns_gil,
96            frames: stack_trace
97                .frames
98                .into_iter()
99                .map(|frame| frame.into())
100                .collect(),
101        }
102    }
103}
104
105impl From<Frame> for PySpyFrame {
106    fn from(frame: Frame) -> Self {
107        Self {
108            name: frame.name,
109            filename: frame.filename,
110            module: frame.module,
111            short_filename: frame.short_filename,
112            line: frame.line,
113            locals: frame
114                .locals
115                .map(|locals| locals.into_iter().map(|local| local.into()).collect()),
116            is_entry: frame.is_entry,
117        }
118    }
119}
120
121impl From<LocalVariable> for PySpyLocalVariable {
122    fn from(local_variable: LocalVariable) -> Self {
123        Self {
124            name: local_variable.name,
125            addr: local_variable.addr,
126            arg: local_variable.arg,
127            repr: local_variable.repr,
128        }
129    }
130}
131
132/// Run py-spy and return the stack trace. Py-spy is run has a subprocess
133/// to avoid any bad side effects specially when running in non-blocking mode
134/// which risks segfaulting py-spy process.
135pub async fn py_spy(
136    pid: i32,
137    native: bool,
138    native_all: bool,
139    blocking: bool,
140) -> Result<PySpyTrace, anyhow::Error> {
141    // Unfortunately py-spy exec doesn't produce process information when output is json.
142    // We need to collect them ourselves.
143    let process =
144        remoteprocess::Process::new(pid).context(format!("failed to open process {}", pid))?;
145
146    let command_line = process.cmdline()?.join(" ");
147    match run_py_spy(pid, native, native_all, blocking).await {
148        Ok(stack_traces) => Ok(PySpyTrace {
149            pid,
150            command_line,
151            stack_traces: Some(stack_traces),
152            error: None,
153        }),
154        Err(e) => Ok(PySpyTrace {
155            pid,
156            command_line,
157            stack_traces: None,
158            error: Some(e.to_string()),
159        }),
160    }
161}
162
163async fn run_py_spy(
164    pid: i32,
165    native: bool,
166    native_all: bool,
167    blocking: bool,
168) -> Result<Vec<PySpyStackTrace>, anyhow::Error> {
169    let pid_str = pid.to_string();
170    let mut args = vec!["dump", "--pid", &pid_str, "--json"];
171    if native {
172        args.push("--native");
173    }
174    if native_all {
175        args.push("--native-all");
176    }
177    if !blocking {
178        args.push("--nonblocking");
179    }
180
181    let pyspy_bin = std::env::var("PYSPY_BIN").unwrap_or("py-spy".to_string());
182    tracing::info!("running {} {}", pyspy_bin, args.join(" "));
183
184    // In some situations when py-spy is run in non-blocking mode, it can segfault due
185    // to race condition accessing the process memory which can mutate in the process.
186    // Nothing much to do but retry a few times.
187    async fn spy_call(pyspy_bin: String, args: Vec<&str>) -> Result<String, anyhow::Error> {
188        let retries = 3;
189        for _x in 0..retries {
190            let child = Command::new(pyspy_bin.clone())
191                .stdout(Stdio::piped())
192                .stderr(Stdio::piped())
193                .stdin(Stdio::null())
194                .args(args.clone())
195                .spawn()
196                .context("failed to spawn py-spy process")?;
197            let result = child
198                .wait_with_output()
199                .await
200                .context("failed to run py-spy process");
201            match result {
202                Ok(output) if output.status.success() => {
203                    let stdout = str::from_utf8(&output.stdout)
204                        .context("failed to get py-spy output as utf8")?;
205                    return Ok(stdout.to_string());
206                }
207                _ => {}
208            }
209
210            #[allow(clippy::disallowed_methods)]
211            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
212        }
213        anyhow::bail!("failed to run py-spy after {} retries", retries);
214    }
215
216    let stdout = spy_call(pyspy_bin, args).await?;
217    let stack_trace: Vec<PySpyStackTrace> =
218        serde_json::from_str(&stdout).context("failed to parse py-spy json output")?;
219
220    Ok(stack_trace)
221}