1use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Context;
16use hyperactor::Endpoint as _;
17use hyperactor::HandleClient;
18use hyperactor::Handler;
19use hyperactor::OncePortRef;
20use hyperactor::RefClient;
21use serde::Deserialize;
22use serde::Serialize;
23use typeuri::Named;
24
25use crate::config::MESH_ADMIN_PYSPY_TIMEOUT;
26use crate::config::PYSPY_BIN;
27
/// Result of running `py-spy dump` against the current process.
///
/// This type derives `Serialize`/`Deserialize` and `schemars::JsonSchema`, so
/// variant names, field names and field order are part of its external
/// contract.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    schemars::JsonSchema
)]
pub enum PySpyResult {
    /// py-spy exited successfully and its `--json` output parsed.
    Ok {
        /// PID that was dumped.
        pid: u32,
        /// The py-spy binary that produced the output, as it was invoked.
        binary: String,
        /// Per-thread stack traces parsed from py-spy's JSON output.
        stack_traces: Vec<PySpyStackTrace>,
        /// Non-fatal notes about the run (e.g. a `--native-all` fallback).
        warnings: Vec<String>,
    },
    /// None of the candidate binaries could be launched.
    BinaryNotFound {
        /// Human-readable labels of every candidate tried, in search order.
        searched: Vec<String>,
    },
    /// A dump was attempted but did not succeed.
    Failed {
        /// PID that py-spy was pointed at.
        pid: u32,
        /// The py-spy binary that was invoked; empty when no binary was
        /// reached (e.g. the worker actor failed to spawn).
        binary: String,
        /// Process exit code. `None` when there is no meaningful exit code:
        /// spawn/wait errors, timeouts, or unparseable output.
        exit_code: Option<i32>,
        /// py-spy's stderr, or a synthesized diagnostic message.
        stderr: String,
    },
}
// Registration with wirevalue; presumably required so the type can be
// carried inside `wirevalue::Any`.
wirevalue::register_type!(PySpyResult);
72
/// One thread's stack as reported by `py-spy dump --json`.
///
/// Field names match the keys in py-spy's JSON output, which `map_output`
/// deserializes directly into `Vec<PySpyStackTrace>`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyStackTrace {
    /// Process id reported by py-spy for this trace.
    pub pid: i32,
    /// Thread identifier reported by py-spy.
    pub thread_id: u64,
    /// Thread name, when py-spy reports one.
    pub thread_name: Option<String>,
    /// OS-level thread id, when py-spy reports one.
    pub os_thread_id: Option<u64>,
    /// py-spy's `active` flag for the thread (presumably: not idle when
    /// sampled — see py-spy docs).
    pub active: bool,
    /// Whether the thread held the GIL when sampled, per py-spy.
    pub owns_gil: bool,
    /// The thread's frames, in the order py-spy emitted them.
    pub frames: Vec<PySpyFrame>,
}
91
/// A single stack frame within a [`PySpyStackTrace`], mirroring py-spy's
/// JSON frame object.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyFrame {
    /// Function name for the frame.
    pub name: String,
    /// Source file path as reported by py-spy.
    pub filename: String,
    /// Module name, when py-spy reports one.
    pub module: Option<String>,
    /// Shortened form of `filename`, when py-spy reports one.
    pub short_filename: Option<String>,
    /// Line number within `filename`.
    pub line: i32,
    /// Local variables, when py-spy captured them. `build_command` never
    /// passes a locals flag, so this is presumably always `None` here.
    pub locals: Option<Vec<PySpyLocalVariable>>,
    /// py-spy's `is_entry` flag for the frame (see py-spy docs).
    pub is_entry: bool,
}
110
/// A local variable captured in a [`PySpyFrame`], mirroring py-spy's JSON.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyLocalVariable {
    /// Variable name.
    pub name: String,
    /// Address reported by py-spy (presumably of the Python object — TODO
    /// confirm against py-spy docs).
    pub addr: usize,
    /// Whether py-spy flagged the variable as a function argument.
    pub arg: bool,
    /// `repr` of the value, when py-spy captured one.
    pub repr: Option<String>,
}
123
/// Flags controlling a `py-spy dump` invocation (see `build_command`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PySpyOpts {
    /// Pass `--threads`.
    pub threads: bool,
    /// Pass `--native`.
    pub native: bool,
    /// Pass `--native-all`. If the installed py-spy rejects this flag,
    /// `try_exec` retries without it and records a warning.
    pub native_all: bool,
    /// Pass `--nonblocking`. This also makes `try_exec` retry up to three
    /// times.
    pub nonblocking: bool,
}
138
/// Caller-supplied options for a `py-spy record` flamegraph profile.
///
/// These options are unvalidated. Convert them with
/// `ValidatedProfileRequest::try_new` before use; the `schemars` ranges below
/// only document the constraints in the JSON schema.
#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyProfileOpts {
    /// Sampling duration in seconds (`--duration`). Must be at least 1 and no
    /// more than the caller's configured maximum.
    #[schemars(range(min = 1))]
    pub duration_s: u32,
    /// Sampling rate in Hz (`--rate`). Must be in `1..=1000`.
    #[schemars(range(min = 1, max = 1000))]
    pub rate_hz: u32,
    /// Pass `--native`.
    pub native: bool,
    /// Pass `--threads`.
    pub threads: bool,
    /// Pass `--nonblocking`.
    pub nonblocking: bool,
}
162
163#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
165pub(crate) struct ProfileDurationSecs(std::num::NonZeroU32);
166
167impl ProfileDurationSecs {
168 pub fn get(self) -> u32 {
169 self.0.get()
170 }
171}
172
173#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
175pub(crate) struct SampleRateHz(std::num::NonZeroU32);
176
177impl SampleRateHz {
178 pub fn get(self) -> u32 {
179 self.0.get()
180 }
181}
182
/// A profile request whose parameters have passed validation, together with
/// the timeouts derived from its duration.
///
/// Build it with [`ValidatedProfileRequest::try_new`]. NOTE(review): the
/// derived `Deserialize` does not re-run the `try_new` checks (rate upper
/// bound, maximum duration), so a value received over the wire is only as
/// trustworthy as its sender.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ValidatedProfileRequest {
    /// Sampling duration (non-zero, <= the configured maximum).
    duration: ProfileDurationSecs,
    /// Sampling rate (1..=1000 Hz).
    rate: SampleRateHz,
    /// Pass `--native` to `py-spy record`.
    native: bool,
    /// Pass `--threads` to `py-spy record`.
    threads: bool,
    /// Pass `--nonblocking` to `py-spy record`.
    nonblocking: bool,
    /// How long the py-spy subprocess may run before it is killed.
    subprocess_timeout: std::time::Duration,
    /// A slightly longer timeout derived from `subprocess_timeout`,
    /// presumably for the caller waiting on the worker's reply.
    bridge_timeout: std::time::Duration,
}
202
203impl ValidatedProfileRequest {
204 pub fn duration(&self) -> ProfileDurationSecs {
205 self.duration
206 }
207 pub fn rate(&self) -> SampleRateHz {
208 self.rate
209 }
210 pub fn native(&self) -> bool {
211 self.native
212 }
213 pub fn threads(&self) -> bool {
214 self.threads
215 }
216 pub fn nonblocking(&self) -> bool {
217 self.nonblocking
218 }
219 pub fn subprocess_timeout(&self) -> std::time::Duration {
220 self.subprocess_timeout
221 }
222 pub fn bridge_timeout(&self) -> std::time::Duration {
223 self.bridge_timeout
224 }
225
226 pub fn try_new(
227 opts: &PySpyProfileOpts,
228 max_duration: std::time::Duration,
229 ) -> Result<Self, String> {
230 let duration = std::num::NonZeroU32::new(opts.duration_s)
231 .map(ProfileDurationSecs)
232 .ok_or_else(|| "duration_s must be positive".to_string())?;
233 if std::time::Duration::from_secs(u64::from(duration.get())) > max_duration {
234 return Err(format!(
235 "duration_s {}s exceeds max {}s",
236 duration.get(),
237 max_duration.as_secs()
238 ));
239 }
240 let rate = std::num::NonZeroU32::new(opts.rate_hz)
241 .filter(|n| n.get() <= 1000)
242 .map(SampleRateHz)
243 .ok_or_else(|| format!("rate_hz must be 1..=1000, got {}", opts.rate_hz))?;
244 let subprocess_timeout = std::time::Duration::from_secs(u64::from(duration.get()) + 15);
245 let bridge_timeout = subprocess_timeout + std::time::Duration::from_secs(5);
246 Ok(Self {
247 duration,
248 rate,
249 native: opts.native,
250 threads: opts.threads,
251 nonblocking: opts.nonblocking,
252 subprocess_timeout,
253 bridge_timeout,
254 })
255 }
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize, Named)]
262pub enum PySpyProfileResult {
263 Ok {
264 pid: u32,
265 binary: String,
266 svg: Vec<u8>,
267 },
268 BinaryNotFound {
269 searched: Vec<String>,
270 },
271 TimedOut {
272 pid: u32,
273 binary: String,
274 timeout_s: u64,
275 stderr: String,
276 },
277 ExitFailure {
278 pid: u32,
279 binary: String,
280 exit_code: Option<i32>,
281 stderr: String,
282 },
283 OutputMissing {
284 pid: u32,
285 binary: String,
286 },
287 OutputEmpty {
288 pid: u32,
289 binary: String,
290 },
291 OutputReadFailure {
292 pid: u32,
293 binary: String,
294 error: String,
295 },
296 WorkerSpawnFailure {
297 error: String,
298 },
299 SubprocessSpawnFailure {
300 pid: u32,
301 binary: String,
302 error: String,
303 },
304 WaitFailure {
305 pid: u32,
306 binary: String,
307 error: String,
308 },
309 TempDirFailure {
310 pid: u32,
311 binary: String,
312 error: String,
313 },
314}
315wirevalue::register_type!(PySpyProfileResult);
316
/// Crate-internal outcome of a `py-spy record` run.
///
/// Mirrors [`PySpyProfileResult`] variant-for-variant and converts into it via
/// `From`. The only difference is that `TimedOut` keeps the full `Duration`
/// here, whereas the wire form reports whole seconds.
#[derive(Debug)]
pub(crate) enum ProfileExecOutcome {
    /// Output file read successfully and non-empty.
    Ok {
        pid: u32,
        binary: String,
        svg: Vec<u8>,
    },
    /// No candidate binary could be spawned.
    BinaryNotFound {
        searched: Vec<String>,
    },
    /// Subprocess exceeded `timeout` and was killed; `stderr` is what was
    /// captured.
    TimedOut {
        pid: u32,
        binary: String,
        timeout: std::time::Duration,
        stderr: String,
    },
    /// Subprocess exited with a non-success status.
    ExitFailure {
        pid: u32,
        binary: String,
        exit_code: Option<i32>,
        stderr: String,
    },
    /// Successful exit, but the output file was not found.
    OutputMissing {
        pid: u32,
        binary: String,
    },
    /// Successful exit, but the output file was empty.
    OutputEmpty {
        pid: u32,
        binary: String,
    },
    /// Reading the output file failed (other than "not found").
    OutputReadFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// The worker actor could not be spawned.
    WorkerSpawnFailure {
        error: String,
    },
    /// Spawning the subprocess failed (other than "not found").
    SubprocessSpawnFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// `wait()` on the subprocess returned an error.
    WaitFailure {
        pid: u32,
        binary: String,
        error: String,
    },
    /// Creating the temporary output directory failed.
    TempDirFailure {
        pid: u32,
        binary: String,
        error: String,
    },
}
373
374impl From<ProfileExecOutcome> for PySpyProfileResult {
375 fn from(outcome: ProfileExecOutcome) -> Self {
376 match outcome {
377 ProfileExecOutcome::Ok { pid, binary, svg } => {
378 PySpyProfileResult::Ok { pid, binary, svg }
379 }
380 ProfileExecOutcome::BinaryNotFound { searched } => {
381 PySpyProfileResult::BinaryNotFound { searched }
382 }
383 ProfileExecOutcome::TimedOut {
384 pid,
385 binary,
386 timeout,
387 stderr,
388 } => PySpyProfileResult::TimedOut {
389 pid,
390 binary,
391 timeout_s: timeout.as_secs(),
392 stderr,
393 },
394 ProfileExecOutcome::ExitFailure {
395 pid,
396 binary,
397 exit_code,
398 stderr,
399 } => PySpyProfileResult::ExitFailure {
400 pid,
401 binary,
402 exit_code,
403 stderr,
404 },
405 ProfileExecOutcome::OutputMissing { pid, binary } => {
406 PySpyProfileResult::OutputMissing { pid, binary }
407 }
408 ProfileExecOutcome::OutputEmpty { pid, binary } => {
409 PySpyProfileResult::OutputEmpty { pid, binary }
410 }
411 ProfileExecOutcome::OutputReadFailure { pid, binary, error } => {
412 PySpyProfileResult::OutputReadFailure { pid, binary, error }
413 }
414 ProfileExecOutcome::WorkerSpawnFailure { error } => {
415 PySpyProfileResult::WorkerSpawnFailure { error }
416 }
417 ProfileExecOutcome::SubprocessSpawnFailure { pid, binary, error } => {
418 PySpyProfileResult::SubprocessSpawnFailure { pid, binary, error }
419 }
420 ProfileExecOutcome::WaitFailure { pid, binary, error } => {
421 PySpyProfileResult::WaitFailure { pid, binary, error }
422 }
423 ProfileExecOutcome::TempDirFailure { pid, binary, error } => {
424 PySpyProfileResult::TempDirFailure { pid, binary, error }
425 }
426 }
427 }
428}
429
/// Request message asking an actor to run `py-spy dump` on its own process.
///
/// The reply is delivered once on `result`. The `Handler`/`HandleClient`/
/// `RefClient` derives presumably generate the handler trait and client
/// helpers for this message (see hyperactor docs).
#[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyDump {
    /// Flags for the dump.
    pub opts: PySpyOpts,
    /// Port on which the single [`PySpyResult`] is sent back.
    #[reply]
    pub result: OncePortRef<PySpyResult>,
}
wirevalue::register_type!(PySpyDump);
446
/// Request message asking an actor to record a flamegraph profile of its own
/// process.
// `ValidatedProfileRequest` is `pub(crate)` but appears in this `pub` field;
// the lint is allowed so validation stays crate-controlled while the message
// type itself remains public.
#[allow(private_interfaces)] #[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyProfile {
    /// Already-validated profile parameters and derived timeouts.
    pub request: ValidatedProfileRequest,
    /// Port on which the single [`PySpyProfileResult`] is sent back.
    #[reply]
    pub result: OncePortRef<PySpyProfileResult>,
}
wirevalue::register_type!(PySpyProfile);
463
/// Stateless entry point for running py-spy against the current process
/// (`std::process::id()`).
pub struct PySpyRunner;
468
469impl PySpyRunner {
470 pub async fn dump_self(&self, opts: &PySpyOpts) -> PySpyResult {
480 let pid = std::process::id();
481 let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
482 let candidates = resolve_candidates(if pyspy_bin.is_empty() {
483 None
484 } else {
485 Some(pyspy_bin)
486 });
487 let mut searched = vec![];
488
489 for (binary, label) in &candidates {
490 searched.push(label.clone());
491 if let Some(result) = try_exec(
492 binary,
493 pid,
494 opts,
495 hyperactor_config::global::get(MESH_ADMIN_PYSPY_TIMEOUT),
496 )
497 .await
498 {
499 return result;
500 }
501 }
502
503 PySpyResult::BinaryNotFound { searched }
504 }
505
506 pub(crate) async fn profile_self(
509 &self,
510 request: &ValidatedProfileRequest,
511 ) -> ProfileExecOutcome {
512 let pid = std::process::id();
513 let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
514 let candidates = resolve_candidates(if pyspy_bin.is_empty() {
515 None
516 } else {
517 Some(pyspy_bin)
518 });
519 let mut searched = vec![];
520
521 for (binary, label) in &candidates {
522 searched.push(label.clone());
523 if let Some(result) = try_profile(binary, pid, request).await {
524 return result;
525 }
526 }
527
528 ProfileExecOutcome::BinaryNotFound { searched }
529 }
530}
531
/// Internal message handed to a one-shot [`PySpyWorker`]: run a dump and send
/// the result to `reply_port`.
#[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyDump {
    /// Flags for the dump.
    pub opts: PySpyOpts,
    /// Where the worker posts the [`PySpyResult`].
    pub reply_port: hyperactor::OncePortRef<PySpyResult>,
}
wirevalue::register_type!(RunPySpyDump);
542
/// Short-lived actor that performs a single py-spy dump, replies, and then
/// stops itself (see its `Handler<RunPySpyDump>` impl).
#[hyperactor::export(handlers = [RunPySpyDump])]
pub struct PySpyWorker;

impl Actor for PySpyWorker {}
551
552impl PySpyWorker {
553 pub(crate) fn spawn_and_forward(
557 cx: &impl hyperactor::context::Actor,
558 opts: PySpyOpts,
559 reply_port: hyperactor::OncePortRef<PySpyResult>,
560 ) -> Result<(), anyhow::Error> {
561 let worker = match Self.spawn(cx) {
562 Ok(handle) => handle,
563 Err(e) => {
564 let fail = PySpyResult::Failed {
565 pid: std::process::id(),
566 binary: String::new(),
567 exit_code: None,
568 stderr: format!("failed to spawn pyspy worker: {}", e),
569 };
570 reply_port.post(cx, fail);
571 return Ok(());
572 }
573 };
574 worker.post(cx, RunPySpyDump { opts, reply_port });
575 Ok(())
576 }
577}
578
579#[async_trait]
580impl Handler<RunPySpyDump> for PySpyWorker {
581 async fn handle(
582 &mut self,
583 cx: &Context<Self>,
584 message: RunPySpyDump,
585 ) -> Result<(), anyhow::Error> {
586 let result = PySpyRunner.dump_self(&message.opts).await;
587 message.reply_port.post(cx, result);
588 cx.stop("pyspy dump complete")?;
589 Ok(())
590 }
591}
592
/// Internal message handed to a one-shot [`PySpyProfileWorker`]: record a
/// profile and send the result to `reply_port`.
// `ValidatedProfileRequest` is `pub(crate)` but appears in a `pub` field,
// which is why the `private_interfaces` lint is allowed here.
#[allow(private_interfaces)] #[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyProfile {
    /// Validated profile parameters and derived timeouts.
    pub request: ValidatedProfileRequest,
    /// Where the worker posts the [`PySpyProfileResult`].
    pub reply_port: hyperactor::OncePortRef<PySpyProfileResult>,
}
wirevalue::register_type!(RunPySpyProfile);
601
/// Short-lived actor that records a single py-spy profile, replies, and then
/// stops itself (see its `Handler<RunPySpyProfile>` impl).
#[hyperactor::export(handlers = [RunPySpyProfile])]
pub struct PySpyProfileWorker;

impl Actor for PySpyProfileWorker {}
608
609impl PySpyProfileWorker {
610 pub(crate) fn spawn_and_forward(
613 cx: &impl hyperactor::context::Actor,
614 request: ValidatedProfileRequest,
615 reply_port: hyperactor::OncePortRef<PySpyProfileResult>,
616 ) -> Result<(), anyhow::Error> {
617 let worker = match Self.spawn(cx) {
618 Ok(handle) => handle,
619 Err(e) => {
620 let fail = ProfileExecOutcome::WorkerSpawnFailure {
621 error: e.to_string(),
622 };
623 reply_port.post(cx, PySpyProfileResult::from(fail));
624 return Ok(());
625 }
626 };
627 worker.post(
628 cx,
629 RunPySpyProfile {
630 request,
631 reply_port,
632 },
633 );
634 Ok(())
635 }
636}
637
638#[async_trait]
639impl Handler<RunPySpyProfile> for PySpyProfileWorker {
640 async fn handle(
641 &mut self,
642 cx: &Context<Self>,
643 message: RunPySpyProfile,
644 ) -> Result<(), anyhow::Error> {
645 let outcome = PySpyRunner.profile_self(&message.request).await;
646 message
647 .reply_port
648 .post(cx, PySpyProfileResult::from(outcome));
649 cx.stop("pyspy profile complete")?;
650 Ok(())
651 }
652}
653
/// Builds the ordered list of py-spy binaries to try, as `(binary, label)`
/// pairs.
///
/// A non-empty `pyspy_bin_env` override comes first, labelled
/// `PYSPY_BIN=<path>`. It is always followed by `py-spy` looked up on `PATH`.
/// An empty override is treated as unset.
fn resolve_candidates(pyspy_bin_env: Option<String>) -> Vec<(String, String)> {
    let override_candidate = pyspy_bin_env
        .filter(|path| !path.is_empty())
        .map(|path| {
            let label = format!("PYSPY_BIN={}", path);
            (path, label)
        });
    let path_candidate = ("py-spy".to_string(), "py-spy on PATH".to_string());

    override_candidate
        .into_iter()
        .chain(std::iter::once(path_candidate))
        .collect()
}
667
668fn build_command(binary: &str, pid: u32, opts: &PySpyOpts) -> tokio::process::Command {
670 let mut cmd = tokio::process::Command::new(binary);
671 cmd.arg("dump")
672 .arg("--pid")
673 .arg(pid.to_string())
674 .arg("--json");
675 if opts.threads {
676 cmd.arg("--threads");
677 }
678 if opts.native {
679 cmd.arg("--native");
680 }
681 if opts.native_all {
682 cmd.arg("--native-all");
683 }
684 if opts.nonblocking {
685 cmd.arg("--nonblocking");
686 }
687 cmd.stdout(std::process::Stdio::piped());
688 cmd.stderr(std::process::Stdio::piped());
689 cmd
690}
691
692fn map_output(output: std::process::Output, pid: u32, binary: &str) -> PySpyResult {
696 if output.status.success() {
697 match serde_json::from_slice::<Vec<PySpyStackTrace>>(&output.stdout) {
698 Ok(stack_traces) => PySpyResult::Ok {
699 pid,
700 binary: binary.to_string(),
701 stack_traces,
702 warnings: vec![],
703 },
704 Err(e) => PySpyResult::Failed {
705 pid,
706 binary: binary.to_string(),
707 exit_code: None,
708 stderr: format!("failed to parse py-spy JSON output: {}", e),
709 },
710 }
711 } else {
712 let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
713 PySpyResult::Failed {
714 pid,
715 binary: binary.to_string(),
716 exit_code: output.status.code(),
717 stderr,
718 }
719 }
720}
721
722fn is_unsupported_native_all(result: &PySpyResult) -> bool {
726 matches!(
727 result,
728 PySpyResult::Failed {
729 exit_code: Some(2),
730 stderr,
731 ..
732 } if stderr.contains("--native-all")
733 )
734}
735
/// Outcome of a single py-spy invocation attempt in `exec_once`.
enum ExecOnce {
    /// An attempt produced a result. This includes a synthetic timeout
    /// failure when the deadline had already passed.
    Result(PySpyResult),
    /// Spawning failed with `ErrorKind::NotFound`: the binary does not exist,
    /// so the caller should move on to the next candidate.
    NotFound,
}
743
744async fn exec_once(
749 binary: &str,
750 pid: u32,
751 opts: &PySpyOpts,
752 deadline: tokio::time::Instant,
753 timeout: std::time::Duration,
754) -> ExecOnce {
755 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
756 if remaining.is_zero() {
757 return ExecOnce::Result(PySpyResult::Failed {
758 pid,
759 binary: binary.to_string(),
760 exit_code: None,
761 stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
762 });
763 }
764 let child = match build_command(binary, pid, opts).spawn() {
765 Ok(child) => child,
766 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return ExecOnce::NotFound,
767 Err(e) => {
768 return ExecOnce::Result(PySpyResult::Failed {
769 pid,
770 binary: binary.to_string(),
771 exit_code: None,
772 stderr: format!("failed to execute: {}", e),
773 });
774 }
775 };
776 ExecOnce::Result(collect_with_timeout(child, pid, binary, remaining).await)
777}
778
779async fn try_exec(
793 binary: &str,
794 pid: u32,
795 opts: &PySpyOpts,
796 timeout: std::time::Duration,
797) -> Option<PySpyResult> {
798 let deadline = tokio::time::Instant::now() + timeout;
799 let retries = if opts.nonblocking { 3 } else { 1 };
800 let mut last_result = None;
801 let mut effective_opts = opts.clone();
802
803 for attempt in 0..retries {
804 if attempt > 0 {
805 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
806 }
807 let mut result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
808 ExecOnce::NotFound => return None,
809 ExecOnce::Result(r) => r,
810 };
811 if is_unsupported_native_all(&result) && effective_opts.native_all {
815 effective_opts.native_all = false;
818 result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
819 ExecOnce::NotFound => return None,
820 ExecOnce::Result(r) => r,
821 };
822 if let PySpyResult::Ok { warnings, .. } = &mut result {
824 warnings.push(
825 "--native-all unsupported by this py-spy; fell back to --native".to_string(),
826 );
827 }
828 }
831 match &result {
832 PySpyResult::Ok { .. } => return Some(result),
833 _ => {
834 last_result = Some(result);
835 }
836 }
837 }
838
839 last_result
840}
841
842async fn collect_with_timeout(
851 mut child: tokio::process::Child,
852 pid: u32,
853 binary: &str,
854 timeout: std::time::Duration,
855) -> PySpyResult {
856 let mut stdout_handle = child.stdout.take();
857 let mut stderr_handle = child.stderr.take();
858
859 let collect = async {
860 let stdout_fut = async {
861 let mut buf = Vec::new();
862 if let Some(ref mut r) = stdout_handle {
863 let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
864 }
865 buf
866 };
867 let stderr_fut = async {
868 let mut buf = Vec::new();
869 if let Some(ref mut r) = stderr_handle {
870 let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
871 }
872 buf
873 };
874 let (stdout_bytes, stderr_bytes, status) =
875 tokio::join!(stdout_fut, stderr_fut, child.wait());
876 (stdout_bytes, stderr_bytes, status)
877 };
878
879 match tokio::time::timeout(timeout, collect).await {
880 Ok((stdout_bytes, stderr_bytes, Ok(status))) => {
881 let output = std::process::Output {
882 status,
883 stdout: stdout_bytes,
884 stderr: stderr_bytes,
885 };
886 map_output(output, pid, binary)
887 }
888 Ok((_, _, Err(e))) => PySpyResult::Failed {
889 pid,
890 binary: binary.to_string(),
891 exit_code: None,
892 stderr: format!("failed to wait for child: {}", e),
893 },
894 Err(_) => {
895 let _ = child.start_kill();
897 let _ = child.wait().await;
898 PySpyResult::Failed {
899 pid,
900 binary: binary.to_string(),
901 exit_code: None,
902 stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
903 }
904 }
905 }
906}
907
908fn build_record_command(
910 binary: &str,
911 pid: u32,
912 request: &ValidatedProfileRequest,
913 output_path: &std::path::Path,
914) -> tokio::process::Command {
915 let mut cmd = tokio::process::Command::new(binary);
916 cmd.arg("record")
917 .arg("--pid")
918 .arg(pid.to_string())
919 .arg("--duration")
920 .arg(request.duration().get().to_string())
921 .arg("--rate")
922 .arg(request.rate().get().to_string())
923 .arg("--format")
924 .arg("flamegraph")
925 .arg("--output")
926 .arg(output_path);
927 if request.native() {
928 cmd.arg("--native");
929 }
930 if request.threads() {
931 cmd.arg("--threads");
932 }
933 if request.nonblocking() {
934 cmd.arg("--nonblocking");
935 }
936 cmd.stdout(std::process::Stdio::null());
939 cmd.stderr(std::process::Stdio::piped());
940 cmd
941}
942
943async fn collect_profile_with_timeout(
946 mut child: tokio::process::Child,
947 pid: u32,
948 binary: &str,
949 timeout: std::time::Duration,
950) -> Result<(std::process::ExitStatus, String), ProfileExecOutcome> {
951 let stderr_handle = child.stderr.take();
955 let stderr_task = tokio::spawn(async move {
956 let mut buf = Vec::new();
957 if let Some(mut r) = stderr_handle {
958 let _ = tokio::io::AsyncReadExt::read_to_end(&mut r, &mut buf).await;
959 }
960 buf
961 });
962
963 match tokio::time::timeout(timeout, child.wait()).await {
964 Ok(Ok(status)) => {
965 let stderr_bytes = stderr_task.await.unwrap_or_default();
966 let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
967 Ok((status, stderr))
968 }
969 Ok(Err(e)) => {
970 stderr_task.abort();
971 Err(ProfileExecOutcome::WaitFailure {
972 pid,
973 binary: binary.to_string(),
974 error: e.to_string(),
975 })
976 }
977 Err(_) => {
978 let _ = child.start_kill();
980 let _ = child.wait().await;
981 let stderr_bytes = stderr_task.await.unwrap_or_default();
982 let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
983 Err(ProfileExecOutcome::TimedOut {
984 pid,
985 binary: binary.to_string(),
986 timeout,
987 stderr,
988 })
989 }
990 }
991}
992
993async fn try_profile(
996 binary: &str,
997 pid: u32,
998 request: &ValidatedProfileRequest,
999) -> Option<ProfileExecOutcome> {
1000 let timeout = request.subprocess_timeout();
1001 let tmp_dir = match tempfile::tempdir() {
1002 Ok(d) => d,
1003 Err(e) => {
1004 return Some(ProfileExecOutcome::TempDirFailure {
1005 pid,
1006 binary: binary.to_string(),
1007 error: e.to_string(),
1008 });
1009 }
1010 };
1011 let svg_path = tmp_dir.path().join("profile.svg");
1012
1013 let child = match build_record_command(binary, pid, request, &svg_path).spawn() {
1014 Ok(c) => c,
1015 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
1016 Err(e) => {
1017 return Some(ProfileExecOutcome::SubprocessSpawnFailure {
1018 pid,
1019 binary: binary.to_string(),
1020 error: e.to_string(),
1021 });
1022 }
1023 };
1024
1025 let (status, stderr) = match collect_profile_with_timeout(child, pid, binary, timeout).await {
1026 Ok(pair) => pair,
1027 Err(outcome) => return Some(outcome),
1028 };
1029
1030 if !status.success() {
1031 return Some(ProfileExecOutcome::ExitFailure {
1032 pid,
1033 binary: binary.to_string(),
1034 exit_code: status.code(),
1035 stderr,
1036 });
1037 }
1038
1039 match std::fs::read(&svg_path) {
1040 Ok(bytes) if bytes.is_empty() => Some(ProfileExecOutcome::OutputEmpty {
1041 pid,
1042 binary: binary.to_string(),
1043 }),
1044 Ok(svg) => Some(ProfileExecOutcome::Ok {
1045 pid,
1046 binary: binary.to_string(),
1047 svg,
1048 }),
1049 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1050 Some(ProfileExecOutcome::OutputMissing {
1051 pid,
1052 binary: binary.to_string(),
1053 })
1054 }
1055 Err(e) => Some(ProfileExecOutcome::OutputReadFailure {
1056 pid,
1057 binary: binary.to_string(),
1058 error: e.to_string(),
1059 }),
1060 }
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065 use std::io::Write;
1066 use std::os::unix::fs::PermissionsExt;
1067 use std::os::unix::process::ExitStatusExt;
1068 use std::time::Duration;
1069
1070 use tokio::process::Command;
1071
1072 use super::*;
1073
1074 #[test]
1075 fn pyspy_result_wirevalue_roundtrip() {
1076 let original = PySpyResult::Ok {
1081 pid: 42,
1082 binary: "py-spy".to_string(),
1083 stack_traces: vec![PySpyStackTrace {
1084 pid: 42,
1085 thread_id: 1,
1086 thread_name: Some("main".to_string()),
1087 os_thread_id: Some(100),
1088 active: true,
1089 owns_gil: true,
1090 frames: vec![PySpyFrame {
1091 name: "do_work".to_string(),
1092 filename: "test.py".to_string(),
1093 module: None,
1094 short_filename: None,
1095 line: 10,
1096 locals: None,
1097 is_entry: false,
1098 }],
1099 }],
1100 warnings: vec![],
1101 };
1102 let any = wirevalue::Any::serialize(&original).expect("serialize");
1103 let restored: PySpyResult = any.deserialized().expect("deserialize");
1104 assert_eq!(original, restored);
1105 }
1106
1107 #[test]
1108 fn candidates_no_env() {
1109 let candidates = resolve_candidates(None);
1111 assert_eq!(candidates.len(), 1);
1112 assert_eq!(candidates[0].0, "py-spy");
1113 assert_eq!(candidates[0].1, "py-spy on PATH");
1114 }
1115
1116 #[test]
1117 fn candidates_env_first_then_path() {
1118 let candidates = resolve_candidates(Some("/custom/py-spy".to_string()));
1120 assert_eq!(candidates.len(), 2);
1121 assert_eq!(candidates[0].0, "/custom/py-spy");
1122 assert!(candidates[0].1.contains("PYSPY_BIN=/custom/py-spy"));
1123 assert_eq!(candidates[1].0, "py-spy");
1124 }
1125
1126 #[test]
1127 fn candidates_empty_env_ignored() {
1128 let candidates = resolve_candidates(Some(String::new()));
1130 assert_eq!(candidates.len(), 1);
1131 assert_eq!(candidates[0].0, "py-spy");
1132 }
1133
1134 #[test]
1135 fn output_success_parses_json() {
1136 let json = serde_json::json!([{
1138 "pid": 42,
1139 "thread_id": 1234,
1140 "thread_name": "MainThread",
1141 "os_thread_id": 5678,
1142 "active": true,
1143 "owns_gil": true,
1144 "frames": [{
1145 "name": "do_work",
1146 "filename": "foo.py",
1147 "module": null,
1148 "short_filename": null,
1149 "line": 10,
1150 "locals": null,
1151 "is_entry": false
1152 }]
1153 }]);
1154 let output = std::process::Output {
1155 status: std::process::ExitStatus::default(),
1156 stdout: serde_json::to_vec(&json).unwrap(),
1157 stderr: vec![],
1158 };
1159 let result = map_output(output, 42, "/usr/bin/py-spy");
1160 match result {
1161 PySpyResult::Ok {
1162 pid,
1163 binary,
1164 stack_traces,
1165 ..
1166 } => {
1167 assert_eq!(pid, 42);
1168 assert_eq!(binary, "/usr/bin/py-spy");
1169 assert_eq!(stack_traces.len(), 1);
1170 assert_eq!(stack_traces[0].thread_id, 1234);
1171 assert_eq!(stack_traces[0].thread_name.as_deref(), Some("MainThread"));
1172 assert!(stack_traces[0].owns_gil);
1173 assert_eq!(stack_traces[0].frames.len(), 1);
1174 assert_eq!(stack_traces[0].frames[0].name, "do_work");
1175 assert_eq!(stack_traces[0].frames[0].filename, "foo.py");
1176 assert_eq!(stack_traces[0].frames[0].line, 10);
1177 }
1178 other => panic!("expected Ok, got {:?}", other),
1179 }
1180 }
1181
1182 #[test]
1183 fn output_invalid_json_maps_to_failed() {
1184 let output = std::process::Output {
1186 status: std::process::ExitStatus::default(),
1187 stdout: b"not valid json".to_vec(),
1188 stderr: vec![],
1189 };
1190 let result = map_output(output, 42, "py-spy");
1191 match result {
1192 PySpyResult::Failed { pid, stderr, .. } => {
1193 assert_eq!(pid, 42);
1194 assert!(
1195 stderr.contains("failed to parse py-spy JSON output"),
1196 "unexpected stderr: {stderr}"
1197 );
1198 }
1199 other => panic!("expected Failed, got {:?}", other),
1200 }
1201 }
1202
1203 #[test]
1204 fn output_nonzero_exit_maps_to_failed() {
1205 let status = std::process::ExitStatus::from_raw(256); let output = std::process::Output {
1208 status,
1209 stdout: vec![],
1210 stderr: b"Permission denied".to_vec(),
1211 };
1212 let result = map_output(output, 99, "py-spy");
1213 match result {
1214 PySpyResult::Failed {
1215 pid,
1216 binary,
1217 exit_code,
1218 stderr,
1219 } => {
1220 assert_eq!(pid, 99);
1221 assert_eq!(binary, "py-spy");
1222 assert_eq!(exit_code, Some(1));
1223 assert_eq!(stderr, "Permission denied");
1224 }
1225 other => panic!("expected Failed, got {:?}", other),
1226 }
1227 }
1228
1229 #[test]
1230 fn output_preserves_caller_pid() {
1231 let json = serde_json::json!([]);
1233 let output = std::process::Output {
1234 status: std::process::ExitStatus::default(),
1235 stdout: serde_json::to_vec(&json).unwrap(),
1236 stderr: vec![],
1237 };
1238 let result = map_output(output, 12345, "bin");
1239 match result {
1240 PySpyResult::Ok { pid, .. } => assert_eq!(pid, 12345),
1241 other => panic!("expected Ok, got {:?}", other),
1242 }
1243 }
1244
1245 fn default_opts() -> PySpyOpts {
1246 PySpyOpts {
1247 threads: false,
1248 native: false,
1249 native_all: false,
1250 nonblocking: false,
1251 }
1252 }
1253
1254 #[tokio::test]
1255 async fn exec_missing_binary_returns_none() {
1256 let result = try_exec(
1258 "/definitely/not/a/real/binary",
1259 1,
1260 &default_opts(),
1261 std::time::Duration::from_secs(5),
1262 )
1263 .await;
1264 assert!(result.is_none());
1265 }
1266
1267 #[tokio::test]
1268 async fn exec_present_binary_returns_some() {
1269 let result = try_exec(
1272 "true",
1273 1,
1274 &default_opts(),
1275 std::time::Duration::from_secs(5),
1276 )
1277 .await;
1278 match result {
1279 Some(PySpyResult::Failed { stderr, .. }) => {
1280 assert!(
1281 stderr.contains("parse"),
1282 "expected JSON parse error, got: {stderr}"
1283 );
1284 }
1285 other => panic!("expected Some(Failed{{parse..}}), got: {other:?}"),
1286 }
1287 }
1288
1289 #[tokio::test]
1290 async fn collect_timeout_kills_child_and_returns_failed() {
1291 let child = Command::new("sleep")
1294 .arg("100")
1295 .stdout(std::process::Stdio::piped())
1296 .stderr(std::process::Stdio::piped())
1297 .spawn()
1298 .expect("sleep must be available");
1299
1300 let result = collect_with_timeout(
1301 child,
1302 std::process::id(),
1303 "sleep",
1304 std::time::Duration::from_millis(100),
1305 )
1306 .await;
1307
1308 match result {
1309 PySpyResult::Failed { stderr, .. } => {
1310 assert!(
1311 stderr.contains("timed out"),
1312 "expected timeout message, got: {stderr}"
1313 );
1314 }
1315 other => panic!("expected Failed, got {:?}", other),
1316 }
1317 }
1318
1319 #[tokio::test]
1320 async fn exec_failing_binary_returns_failed() {
1321 let result = try_exec(
1323 "false",
1324 42,
1325 &default_opts(),
1326 std::time::Duration::from_secs(5),
1327 )
1328 .await;
1329 assert!(result.is_some());
1330 match result.unwrap() {
1331 PySpyResult::Failed {
1332 pid,
1333 binary,
1334 exit_code,
1335 ..
1336 } => {
1337 assert_eq!(pid, 42);
1338 assert_eq!(binary, "false");
1339 assert!(exit_code.is_some());
1340 }
1341 other => panic!("expected Failed, got {:?}", other),
1342 }
1343 }
1344
1345 fn write_fake_pyspy(script_body: &str) -> tempfile::TempPath {
1353 let mut f = tempfile::NamedTempFile::new().expect("create temp file");
1354 write!(f, "#!/bin/sh\n{script_body}").expect("write script");
1355 f.as_file().sync_all().expect("sync");
1356 std::fs::set_permissions(f.path(), std::fs::Permissions::from_mode(0o755))
1357 .expect("chmod +x");
1358 f.into_temp_path()
1359 }
1360
1361 fn read_log(script_path: &std::path::Path) -> Vec<String> {
1364 let log_path = format!("{}.log", script_path.display());
1365 match std::fs::read_to_string(&log_path) {
1366 Ok(contents) => contents.lines().map(String::from).collect(),
1367 Err(_) => vec![],
1368 }
1369 }
1370
1371 #[tokio::test]
1372 async fn native_all_downgrade_succeeds() {
1373 let script = write_fake_pyspy(
1377 r#"
1378echo "$@" >> "$0.log"
1379for arg in "$@"; do
1380 if [ "$arg" = "--native-all" ]; then
1381 echo "unrecognized option --native-all" >&2
1382 exit 2
1383 fi
1384done
1385echo "[]"
1386exit 0
1387"#,
1388 );
1389 let opts = PySpyOpts {
1390 threads: false,
1391 native: true,
1392 native_all: true,
1393 nonblocking: false,
1394 };
1395 let result = try_exec(
1396 script.to_str().unwrap(),
1397 1,
1398 &opts,
1399 std::time::Duration::from_secs(5),
1400 )
1401 .await;
1402 let result = result.expect("expected Some");
1404 match &result {
1405 PySpyResult::Ok { warnings, .. } => {
1406 assert!(
1407 warnings.iter().any(|w| w.contains("fell back to --native")),
1408 "PS-11c: expected fallback warning, got: {warnings:?}"
1409 );
1410 }
1411 other => panic!("expected Ok, got: {other:?}"),
1412 }
1413 let log = read_log(&script);
1415 assert_eq!(
1416 log.len(),
1417 2,
1418 "PS-11b: expected exactly 2 invocations, got {}",
1419 log.len()
1420 );
1421 assert!(
1422 log[0].contains("--native-all"),
1423 "PS-11a: first invocation must include --native-all, got: {}",
1424 log[0]
1425 );
1426 assert!(
1427 !log[1].contains("--native-all"),
1428 "PS-11a: second invocation must NOT include --native-all, got: {}",
1429 log[1]
1430 );
1431 }
1432
1433 #[tokio::test]
1434 async fn native_all_downgrade_fails_retries_continue() {
1435 let script = write_fake_pyspy(
1438 r#"
1439echo "$@" >> "$0.log"
1440for arg in "$@"; do
1441 if [ "$arg" = "--native-all" ]; then
1442 echo "unrecognized option --native-all" >&2
1443 exit 2
1444 fi
1445done
1446echo "Permission denied" >&2
1447exit 1
1448"#,
1449 );
1450 let opts = PySpyOpts {
1451 threads: false,
1452 native: true,
1453 native_all: true,
1454 nonblocking: true, };
1456 let result = try_exec(
1457 script.to_str().unwrap(),
1458 1,
1459 &opts,
1460 std::time::Duration::from_secs(10),
1461 )
1462 .await;
1463 let result = result.expect("expected Some");
1465 match &result {
1466 PySpyResult::Failed {
1467 stderr, exit_code, ..
1468 } => {
1469 assert!(
1470 stderr.contains("Permission denied"),
1471 "PS-11d: expected generic failure, got: {stderr}"
1472 );
1473 assert_eq!(*exit_code, Some(1));
1474 }
1475 other => panic!("expected Failed, got: {other:?}"),
1476 }
1477 let log = read_log(&script);
1482 assert_eq!(log.len(), 4, "expected 4 invocations, got {}", log.len());
1483 assert!(
1484 log[0].contains("--native-all"),
1485 "PS-11a: first invocation must include --native-all, got: {}",
1486 log[0]
1487 );
1488 for (i, line) in log[1..].iter().enumerate() {
1489 assert!(
1490 !line.contains("--native-all"),
1491 "PS-11e: invocation {} must NOT include --native-all, got: {}",
1492 i + 1,
1493 line
1494 );
1495 }
1496 }
1497
1498 #[tokio::test]
1500 async fn profile_collect_timeout_returns_timed_out() {
1501 let child = Command::new("sh")
1502 .arg("-c")
1503 .arg("echo diag >&2; sleep 60")
1504 .stdout(std::process::Stdio::null())
1505 .stderr(std::process::Stdio::piped())
1506 .spawn()
1507 .expect("sh must be available");
1508
1509 let result = collect_profile_with_timeout(
1510 child,
1511 std::process::id(),
1512 "sh",
1513 std::time::Duration::from_millis(200),
1514 )
1515 .await;
1516
1517 match result {
1518 Err(ProfileExecOutcome::TimedOut { stderr, .. }) => {
1519 assert!(
1520 stderr.contains("diag"),
1521 "expected partial stderr captured after kill, got: {stderr}"
1522 );
1523 }
1524 other => panic!("expected TimedOut, got: {other:?}"),
1525 }
1526 }
1527
1528 fn test_request() -> ValidatedProfileRequest {
1529 ValidatedProfileRequest::try_new(
1530 &PySpyProfileOpts {
1531 duration_s: 1,
1532 rate_hz: 100,
1533 native: false,
1534 threads: false,
1535 nonblocking: false,
1536 },
1537 std::time::Duration::from_secs(300),
1538 )
1539 .unwrap()
1540 }
1541
1542 #[tokio::test]
1544 async fn profile_try_missing_binary_returns_none() {
1545 let result = try_profile("/definitely/not/a/real/binary", 1, &test_request()).await;
1546 assert!(result.is_none(), "missing binary must return None");
1547 }
1548
1549 #[tokio::test]
1551 async fn profile_success_exit_empty_file_returns_output_empty() {
1552 let script = write_fake_pyspy(
1553 r#"
1554output=""
1555while [ $# -gt 0 ]; do
1556 case "$1" in
1557 --output) shift; output="$1" ;;
1558 esac
1559 shift
1560done
1561touch "$output"
1562exit 0
1563"#,
1564 );
1565 let result = try_profile(script.to_str().unwrap(), 1, &test_request()).await;
1566 assert!(
1567 matches!(result, Some(ProfileExecOutcome::OutputEmpty { .. })),
1568 "PP-3: expected OutputEmpty, got: {result:?}"
1569 );
1570 }
1571
1572 #[tokio::test]
1574 async fn profile_success_exit_missing_file_returns_output_missing() {
1575 let script = write_fake_pyspy("exit 0\n");
1576 let result = try_profile(script.to_str().unwrap(), 1, &test_request()).await;
1577 assert!(
1578 matches!(result, Some(ProfileExecOutcome::OutputMissing { .. })),
1579 "PP-3: expected OutputMissing, got: {result:?}"
1580 );
1581 }
1582
/// A zero `duration_s` is rejected with an error mentioning "positive".
#[test]
fn validated_request_rejects_zero_duration() {
    let opts = PySpyProfileOpts {
        duration_s: 0,
        rate_hz: 100,
        native: false,
        threads: false,
        nonblocking: false,
    };
    let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300))
        .expect_err("zero duration must be rejected");
    assert!(err.contains("positive"), "unexpected error: {err}");
}
1597
/// A `duration_s` above the supplied maximum (300 s) is rejected with an
/// error mentioning "exceeds max".
#[test]
fn validated_request_rejects_over_max_duration() {
    let opts = PySpyProfileOpts {
        duration_s: 999,
        rate_hz: 100,
        native: false,
        threads: false,
        nonblocking: false,
    };
    let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300))
        .expect_err("duration above the maximum must be rejected");
    assert!(err.contains("exceeds max"), "unexpected error: {err}");
}
1612
/// A zero `rate_hz` is rejected with an error naming `rate_hz`.
#[test]
fn validated_request_rejects_zero_rate() {
    let opts = PySpyProfileOpts {
        duration_s: 5,
        rate_hz: 0,
        native: false,
        threads: false,
        nonblocking: false,
    };
    let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300))
        .expect_err("zero rate must be rejected");
    assert!(err.contains("rate_hz"), "unexpected error: {err}");
}
1627
/// An excessive `rate_hz` (9999) is rejected with an error naming `rate_hz`.
#[test]
fn validated_request_rejects_excessive_rate() {
    let opts = PySpyProfileOpts {
        duration_s: 5,
        rate_hz: 9999,
        native: false,
        threads: false,
        nonblocking: false,
    };
    let err = ValidatedProfileRequest::try_new(&opts, std::time::Duration::from_secs(300))
        .expect_err("excessive rate must be rejected");
    assert!(err.contains("rate_hz"), "unexpected error: {err}");
}
1642
/// A valid 30 s / 100 Hz native request echoes its inputs and derives a 45 s
/// subprocess timeout and a 50 s bridge timeout.
#[test]
fn validated_request_computes_exact_timeouts() {
    let max_duration = std::time::Duration::from_secs(300);
    let req = ValidatedProfileRequest::try_new(
        &PySpyProfileOpts {
            duration_s: 30,
            rate_hz: 100,
            native: true,
            threads: false,
            nonblocking: false,
        },
        max_duration,
    )
    .unwrap();

    // Inputs are carried through unchanged.
    assert_eq!(req.duration().get(), 30);
    assert_eq!(req.rate().get(), 100);
    assert!(req.native());

    // Derived deadlines.
    let secs = std::time::Duration::from_secs;
    assert_eq!(req.subprocess_timeout(), secs(45));
    assert_eq!(req.bridge_timeout(), secs(50));
}
1661
/// Each `ProfileExecOutcome` variant converts into the identically named
/// `PySpyProfileResult` variant. Where checked, pid, timeout (as seconds),
/// and exit code carry across.
#[test]
fn profile_exec_outcome_conversion_is_identity() {
    // Pins the conversion under test to `From<ProfileExecOutcome>`.
    fn convert(outcome: ProfileExecOutcome) -> PySpyProfileResult {
        PySpyProfileResult::from(outcome)
    }

    assert!(matches!(
        convert(ProfileExecOutcome::Ok {
            pid: 1,
            binary: "b".into(),
            svg: vec![1],
        }),
        PySpyProfileResult::Ok { pid: 1, .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::BinaryNotFound {
            searched: vec!["x".into()],
        }),
        PySpyProfileResult::BinaryNotFound { .. }
    ));

    // A 10 s `Duration` timeout surfaces as `timeout_s: 10`.
    assert!(matches!(
        convert(ProfileExecOutcome::TimedOut {
            pid: 1,
            binary: "b".into(),
            timeout: Duration::from_secs(10),
            stderr: "s".into(),
        }),
        PySpyProfileResult::TimedOut { timeout_s: 10, .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::ExitFailure {
            pid: 1,
            binary: "b".into(),
            exit_code: Some(2),
            stderr: "e".into(),
        }),
        PySpyProfileResult::ExitFailure {
            exit_code: Some(2),
            ..
        }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::OutputMissing {
            pid: 1,
            binary: "b".into(),
        }),
        PySpyProfileResult::OutputMissing { pid: 1, .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::OutputEmpty {
            pid: 1,
            binary: "b".into(),
        }),
        PySpyProfileResult::OutputEmpty { pid: 1, .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::OutputReadFailure {
            pid: 1,
            binary: "b".into(),
            error: "permission denied".into(),
        }),
        PySpyProfileResult::OutputReadFailure { .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::WorkerSpawnFailure { error: "w".into() }),
        PySpyProfileResult::WorkerSpawnFailure { .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::SubprocessSpawnFailure {
            pid: 1,
            binary: "b".into(),
            error: "s".into(),
        }),
        PySpyProfileResult::SubprocessSpawnFailure { .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::WaitFailure {
            pid: 1,
            binary: "b".into(),
            error: "w".into(),
        }),
        PySpyProfileResult::WaitFailure { .. }
    ));

    assert!(matches!(
        convert(ProfileExecOutcome::TempDirFailure {
            pid: 1,
            binary: "b".into(),
            error: "t".into(),
        }),
        PySpyProfileResult::TempDirFailure { .. }
    ));
}
1753}