1use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Context;
16use hyperactor::HandleClient;
17use hyperactor::Handler;
18use hyperactor::RefClient;
19use hyperactor::reference as hyperactor_reference;
20use serde::Deserialize;
21use serde::Serialize;
22use typeuri::Named;
23
24use crate::config::MESH_ADMIN_PYSPY_TIMEOUT;
25use crate::config::PYSPY_BIN;
26
/// Outcome of a py-spy dump request.
///
/// The type is serializable and registered with `wirevalue` so it can be
/// sent back to the requester through a reply port.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    schemars::JsonSchema
)]
pub enum PySpyResult {
    /// py-spy exited successfully and its JSON output was parsed.
    Ok {
        /// PID that was handed to py-spy via `--pid`.
        pid: u32,
        /// Binary that produced the dump.
        binary: String,
        /// Stack traces parsed from py-spy's JSON output.
        stack_traces: Vec<PySpyStackTrace>,
        /// Non-fatal notes about how the dump was obtained, e.g. that
        /// `--native-all` was dropped because py-spy rejected it.
        warnings: Vec<String>,
    },
    /// No candidate binary could be launched.
    BinaryNotFound {
        /// Labels of the candidates that were tried, in the order tried.
        searched: Vec<String>,
    },
    /// A binary was found but no usable dump was produced.
    Failed {
        /// PID that was (or would have been) handed to py-spy.
        pid: u32,
        /// Binary that was invoked; may be empty when no binary was run.
        binary: String,
        /// Exit code of the subprocess, or `None` when none is available
        /// (spawn failure, timeout, unparsable output, ...).
        exit_code: Option<i32>,
        /// Captured stderr of the subprocess, or a locally generated
        /// description of the failure.
        stderr: String,
    },
}
wirevalue::register_type!(PySpyResult);
71
/// One element of the JSON array printed by `py-spy dump --json`.
///
/// `map_output` deserializes py-spy's stdout directly into this type, so
/// field names and types must match the keys py-spy emits.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyStackTrace {
    /// Process ID as reported by py-spy.
    pub pid: i32,
    /// Thread identifier as reported by py-spy.
    pub thread_id: u64,
    /// Thread name, when py-spy reports one.
    pub thread_name: Option<String>,
    /// OS-level thread ID, when py-spy reports one.
    pub os_thread_id: Option<u64>,
    /// py-spy's `active` flag (presumably: the thread was running when
    /// sampled -- confirm against py-spy's documentation).
    pub active: bool,
    /// py-spy's `owns_gil` flag (presumably: the thread held the GIL when
    /// sampled -- confirm against py-spy's documentation).
    pub owns_gil: bool,
    /// Stack frames of this thread, in the order py-spy emitted them.
    pub frames: Vec<PySpyFrame>,
}
90
/// A single stack frame inside a [`PySpyStackTrace`], deserialized from
/// py-spy's JSON output.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyFrame {
    /// Name of the function for this frame.
    pub name: String,
    /// Source file of the frame as reported by py-spy.
    pub filename: String,
    /// Module name, when py-spy reports one.
    pub module: Option<String>,
    /// Shortened form of `filename`, when py-spy reports one.
    pub short_filename: Option<String>,
    /// Line number within `filename`.
    pub line: i32,
    /// Local variables of the frame, when py-spy includes them.
    pub locals: Option<Vec<PySpyLocalVariable>>,
    /// py-spy's `is_entry` flag (NOTE(review): exact meaning is defined by
    /// py-spy; not derivable from this file).
    pub is_entry: bool,
}
109
/// A local variable attached to a [`PySpyFrame`], deserialized from
/// py-spy's JSON output.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PySpyLocalVariable {
    /// Variable name.
    pub name: String,
    /// py-spy's `addr` value (presumably the address of the object in the
    /// target process -- confirm against py-spy's documentation).
    pub addr: usize,
    /// py-spy's `arg` flag (presumably: the variable is a function
    /// argument -- confirm against py-spy's documentation).
    pub arg: bool,
    /// String representation of the value, when py-spy provides one.
    pub repr: Option<String>,
}
122
/// Options controlling how `py-spy dump` is invoked.
///
/// Each flag, when `true`, adds the corresponding command-line switch in
/// `build_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PySpyOpts {
    /// Pass `--threads` to py-spy.
    pub threads: bool,
    /// Pass `--native` to py-spy.
    pub native: bool,
    /// Pass `--native-all` to py-spy. `try_exec` drops this flag and retries
    /// when py-spy rejects it as unsupported.
    pub native_all: bool,
    /// Pass `--nonblocking` to py-spy. `try_exec` also allows up to three
    /// attempts instead of one when this is set.
    pub nonblocking: bool,
}
137
/// Message requesting a py-spy dump with the given options.
///
/// The outcome is delivered through `result`, which is marked `#[reply]`
/// for the `Handler`/`HandleClient`/`RefClient` derives.
#[derive(Debug, Serialize, Deserialize, Named, Handler, HandleClient, RefClient)]
pub struct PySpyDump {
    /// Options forwarded to the py-spy invocation.
    pub opts: PySpyOpts,
    /// One-shot port on which the [`PySpyResult`] is returned.
    #[reply]
    pub result: hyperactor_reference::OncePortRef<PySpyResult>,
}
wirevalue::register_type!(PySpyDump);
154
155pub struct PySpyRunner;
159
160impl PySpyRunner {
161 pub async fn dump_self(&self, opts: &PySpyOpts) -> PySpyResult {
171 let pid = std::process::id();
172 let pyspy_bin: String = hyperactor_config::global::get_cloned(PYSPY_BIN);
173 let candidates = resolve_candidates(if pyspy_bin.is_empty() {
174 None
175 } else {
176 Some(pyspy_bin)
177 });
178 let mut searched = vec![];
179
180 for (binary, label) in &candidates {
181 searched.push(label.clone());
182 if let Some(result) = try_exec(
183 binary,
184 pid,
185 opts,
186 hyperactor_config::global::get(MESH_ADMIN_PYSPY_TIMEOUT),
187 )
188 .await
189 {
190 return result;
191 }
192 }
193
194 PySpyResult::BinaryNotFound { searched }
195 }
196}
197
/// Message handled by [`PySpyWorker`]: run a py-spy dump with `opts` and
/// send the outcome to `reply_port`.
#[derive(Debug, Serialize, Deserialize, Named)]
pub struct RunPySpyDump {
    /// Options forwarded to the py-spy invocation.
    pub opts: PySpyOpts,
    /// One-shot port on which the [`PySpyResult`] is returned.
    pub reply_port: hyperactor::reference::OncePortRef<PySpyResult>,
}
wirevalue::register_type!(RunPySpyDump);
208
/// Actor that performs a single py-spy dump.
///
/// It handles [`RunPySpyDump`] and asks to be stopped once the reply has
/// been sent (see the `Handler<RunPySpyDump>` implementation).
#[hyperactor::export(handlers = [RunPySpyDump])]
pub struct PySpyWorker;

// No custom lifecycle behaviour: the trait's default methods are used.
impl Actor for PySpyWorker {}
217
218impl PySpyWorker {
219 pub(crate) fn spawn_and_forward(
223 cx: &impl hyperactor::context::Actor,
224 opts: PySpyOpts,
225 reply_port: hyperactor::reference::OncePortRef<PySpyResult>,
226 ) -> Result<(), anyhow::Error> {
227 let worker = match Self.spawn(cx) {
228 Ok(handle) => handle,
229 Err(e) => {
230 let fail = PySpyResult::Failed {
231 pid: std::process::id(),
232 binary: String::new(),
233 exit_code: None,
234 stderr: format!("failed to spawn pyspy worker: {}", e),
235 };
236 reply_port.send(cx, fail)?;
237 return Ok(());
238 }
239 };
240 if let Err(e) = worker.send(cx, RunPySpyDump { opts, reply_port }) {
245 tracing::error!("failed to send to pyspy worker: {}", e);
246 }
247 Ok(())
248 }
249}
250
251#[async_trait]
252impl Handler<RunPySpyDump> for PySpyWorker {
253 async fn handle(
254 &mut self,
255 cx: &Context<Self>,
256 message: RunPySpyDump,
257 ) -> Result<(), anyhow::Error> {
258 let result = PySpyRunner.dump_self(&message.opts).await;
259 message.reply_port.send(cx, result)?;
260 cx.stop("pyspy dump complete")?;
261 Ok(())
262 }
263}
264
/// Build the ordered list of `(binary, label)` candidates to try.
///
/// A non-empty `pyspy_bin_env` comes first, labelled `PYSPY_BIN=<path>`;
/// `py-spy` resolved via `PATH` is always the last entry.
fn resolve_candidates(pyspy_bin_env: Option<String>) -> Vec<(String, String)> {
    let configured = pyspy_bin_env
        .filter(|path| !path.is_empty())
        .map(|path| {
            let label = format!("PYSPY_BIN={path}");
            (path, label)
        });
    let path_fallback = ("py-spy".to_string(), "py-spy on PATH".to_string());

    configured
        .into_iter()
        .chain(std::iter::once(path_fallback))
        .collect()
}
278
279fn build_command(binary: &str, pid: u32, opts: &PySpyOpts) -> tokio::process::Command {
281 let mut cmd = tokio::process::Command::new(binary);
282 cmd.arg("dump")
283 .arg("--pid")
284 .arg(pid.to_string())
285 .arg("--json");
286 if opts.threads {
287 cmd.arg("--threads");
288 }
289 if opts.native {
290 cmd.arg("--native");
291 }
292 if opts.native_all {
293 cmd.arg("--native-all");
294 }
295 if opts.nonblocking {
296 cmd.arg("--nonblocking");
297 }
298 cmd.stdout(std::process::Stdio::piped());
299 cmd.stderr(std::process::Stdio::piped());
300 cmd
301}
302
303fn map_output(output: std::process::Output, pid: u32, binary: &str) -> PySpyResult {
307 if output.status.success() {
308 match serde_json::from_slice::<Vec<PySpyStackTrace>>(&output.stdout) {
309 Ok(stack_traces) => PySpyResult::Ok {
310 pid,
311 binary: binary.to_string(),
312 stack_traces,
313 warnings: vec![],
314 },
315 Err(e) => PySpyResult::Failed {
316 pid,
317 binary: binary.to_string(),
318 exit_code: None,
319 stderr: format!("failed to parse py-spy JSON output: {}", e),
320 },
321 }
322 } else {
323 let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
324 PySpyResult::Failed {
325 pid,
326 binary: binary.to_string(),
327 exit_code: output.status.code(),
328 stderr,
329 }
330 }
331}
332
333fn is_unsupported_native_all(result: &PySpyResult) -> bool {
337 matches!(
338 result,
339 PySpyResult::Failed {
340 exit_code: Some(2),
341 stderr,
342 ..
343 } if stderr.contains("--native-all")
344 )
345}
346
/// Outcome of a single attempt to run one candidate binary.
enum ExecOnce {
    /// The attempt produced a definitive result (success or failure).
    Result(PySpyResult),
    /// The binary does not exist (spawn failed with `ErrorKind::NotFound`),
    /// so the caller should move on to the next candidate.
    NotFound,
}
354
355async fn exec_once(
360 binary: &str,
361 pid: u32,
362 opts: &PySpyOpts,
363 deadline: tokio::time::Instant,
364 timeout: std::time::Duration,
365) -> ExecOnce {
366 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
367 if remaining.is_zero() {
368 return ExecOnce::Result(PySpyResult::Failed {
369 pid,
370 binary: binary.to_string(),
371 exit_code: None,
372 stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
373 });
374 }
375 let child = match build_command(binary, pid, opts).spawn() {
376 Ok(child) => child,
377 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return ExecOnce::NotFound,
378 Err(e) => {
379 return ExecOnce::Result(PySpyResult::Failed {
380 pid,
381 binary: binary.to_string(),
382 exit_code: None,
383 stderr: format!("failed to execute: {}", e),
384 });
385 }
386 };
387 ExecOnce::Result(collect_with_timeout(child, pid, binary, remaining).await)
388}
389
390async fn try_exec(
404 binary: &str,
405 pid: u32,
406 opts: &PySpyOpts,
407 timeout: std::time::Duration,
408) -> Option<PySpyResult> {
409 let deadline = tokio::time::Instant::now() + timeout;
410 let retries = if opts.nonblocking { 3 } else { 1 };
411 let mut last_result = None;
412 let mut effective_opts = opts.clone();
413
414 for attempt in 0..retries {
415 if attempt > 0 {
416 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
417 }
418 let mut result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
419 ExecOnce::NotFound => return None,
420 ExecOnce::Result(r) => r,
421 };
422 if is_unsupported_native_all(&result) && effective_opts.native_all {
426 effective_opts.native_all = false;
429 result = match exec_once(binary, pid, &effective_opts, deadline, timeout).await {
430 ExecOnce::NotFound => return None,
431 ExecOnce::Result(r) => r,
432 };
433 if let PySpyResult::Ok { warnings, .. } = &mut result {
435 warnings.push(
436 "--native-all unsupported by this py-spy; fell back to --native".to_string(),
437 );
438 }
439 }
442 match &result {
443 PySpyResult::Ok { .. } => return Some(result),
444 _ => {
445 last_result = Some(result);
446 }
447 }
448 }
449
450 last_result
451}
452
453async fn collect_with_timeout(
462 mut child: tokio::process::Child,
463 pid: u32,
464 binary: &str,
465 timeout: std::time::Duration,
466) -> PySpyResult {
467 let mut stdout_handle = child.stdout.take();
468 let mut stderr_handle = child.stderr.take();
469
470 let collect = async {
471 let stdout_fut = async {
472 let mut buf = Vec::new();
473 if let Some(ref mut r) = stdout_handle {
474 let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
475 }
476 buf
477 };
478 let stderr_fut = async {
479 let mut buf = Vec::new();
480 if let Some(ref mut r) = stderr_handle {
481 let _ = tokio::io::AsyncReadExt::read_to_end(r, &mut buf).await;
482 }
483 buf
484 };
485 let (stdout_bytes, stderr_bytes, status) =
486 tokio::join!(stdout_fut, stderr_fut, child.wait());
487 (stdout_bytes, stderr_bytes, status)
488 };
489
490 match tokio::time::timeout(timeout, collect).await {
491 Ok((stdout_bytes, stderr_bytes, Ok(status))) => {
492 let output = std::process::Output {
493 status,
494 stdout: stdout_bytes,
495 stderr: stderr_bytes,
496 };
497 map_output(output, pid, binary)
498 }
499 Ok((_, _, Err(e))) => PySpyResult::Failed {
500 pid,
501 binary: binary.to_string(),
502 exit_code: None,
503 stderr: format!("failed to wait for child: {}", e),
504 },
505 Err(_) => {
506 let _ = child.start_kill();
508 let _ = child.wait().await;
509 PySpyResult::Failed {
510 pid,
511 binary: binary.to_string(),
512 exit_code: None,
513 stderr: format!("py-spy subprocess timed out after {}s", timeout.as_secs()),
514 }
515 }
516 }
517}
518
// Unit tests for candidate resolution, output mapping, subprocess execution
// (using ordinary system binaries and generated shell scripts as stand-ins
// for py-spy), timeouts, and the `--native-all` fallback.
#[cfg(test)]
mod tests {
    use super::*;

    // A populated `PySpyResult::Ok` must survive a `wirevalue::Any`
    // serialize/deserialize round trip unchanged.
    #[test]
    fn pyspy_result_wirevalue_roundtrip() {
        let original = PySpyResult::Ok {
            pid: 42,
            binary: "py-spy".to_string(),
            stack_traces: vec![PySpyStackTrace {
                pid: 42,
                thread_id: 1,
                thread_name: Some("main".to_string()),
                os_thread_id: Some(100),
                active: true,
                owns_gil: true,
                frames: vec![PySpyFrame {
                    name: "do_work".to_string(),
                    filename: "test.py".to_string(),
                    module: None,
                    short_filename: None,
                    line: 10,
                    locals: None,
                    is_entry: false,
                }],
            }],
            warnings: vec![],
        };
        let any = wirevalue::Any::serialize(&original).expect("serialize");
        let restored: PySpyResult = any.deserialized().expect("deserialize");
        assert_eq!(original, restored);
    }

    // Without an override only the PATH fallback is offered.
    #[test]
    fn candidates_no_env() {
        let candidates = resolve_candidates(None);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].0, "py-spy");
        assert_eq!(candidates[0].1, "py-spy on PATH");
    }

    // An override is tried first; the PATH fallback follows.
    #[test]
    fn candidates_env_first_then_path() {
        let candidates = resolve_candidates(Some("/custom/py-spy".to_string()));
        assert_eq!(candidates.len(), 2);
        assert_eq!(candidates[0].0, "/custom/py-spy");
        assert!(candidates[0].1.contains("PYSPY_BIN=/custom/py-spy"));
        assert_eq!(candidates[1].0, "py-spy");
    }

    // An empty override is treated the same as no override.
    #[test]
    fn candidates_empty_env_ignored() {
        let candidates = resolve_candidates(Some(String::new()));
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].0, "py-spy");
    }

    // Successful exit plus valid JSON on stdout maps to `Ok` with the parsed
    // stack traces.
    #[test]
    fn output_success_parses_json() {
        let json = serde_json::json!([{
            "pid": 42,
            "thread_id": 1234,
            "thread_name": "MainThread",
            "os_thread_id": 5678,
            "active": true,
            "owns_gil": true,
            "frames": [{
                "name": "do_work",
                "filename": "foo.py",
                "module": null,
                "short_filename": null,
                "line": 10,
                "locals": null,
                "is_entry": false
            }]
        }]);
        let output = std::process::Output {
            status: std::process::ExitStatus::default(),
            stdout: serde_json::to_vec(&json).unwrap(),
            stderr: vec![],
        };
        let result = map_output(output, 42, "/usr/bin/py-spy");
        match result {
            PySpyResult::Ok {
                pid,
                binary,
                stack_traces,
                ..
            } => {
                assert_eq!(pid, 42);
                assert_eq!(binary, "/usr/bin/py-spy");
                assert_eq!(stack_traces.len(), 1);
                assert_eq!(stack_traces[0].thread_id, 1234);
                assert_eq!(stack_traces[0].thread_name.as_deref(), Some("MainThread"));
                assert!(stack_traces[0].owns_gil);
                assert_eq!(stack_traces[0].frames.len(), 1);
                assert_eq!(stack_traces[0].frames[0].name, "do_work");
                assert_eq!(stack_traces[0].frames[0].filename, "foo.py");
                assert_eq!(stack_traces[0].frames[0].line, 10);
            }
            other => panic!("expected Ok, got {:?}", other),
        }
    }

    // Successful exit with stdout that is not JSON maps to `Failed` with a
    // parse-error message.
    #[test]
    fn output_invalid_json_maps_to_failed() {
        let output = std::process::Output {
            status: std::process::ExitStatus::default(),
            stdout: b"not valid json".to_vec(),
            stderr: vec![],
        };
        let result = map_output(output, 42, "py-spy");
        match result {
            PySpyResult::Failed { pid, stderr, .. } => {
                assert_eq!(pid, 42);
                assert!(
                    stderr.contains("failed to parse py-spy JSON output"),
                    "unexpected stderr: {stderr}"
                );
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    // A non-zero exit maps to `Failed` carrying the exit code and stderr.
    #[test]
    fn output_nonzero_exit_maps_to_failed() {
        use std::os::unix::process::ExitStatusExt;
        // Raw wait status 256 is reported as exit code 1 (asserted below).
        let status = std::process::ExitStatus::from_raw(256);
        let output = std::process::Output {
            status,
            stdout: vec![],
            stderr: b"Permission denied".to_vec(),
        };
        let result = map_output(output, 99, "py-spy");
        match result {
            PySpyResult::Failed {
                pid,
                binary,
                exit_code,
                stderr,
            } => {
                assert_eq!(pid, 99);
                assert_eq!(binary, "py-spy");
                assert_eq!(exit_code, Some(1));
                assert_eq!(stderr, "Permission denied");
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    // The PID in the result is the one supplied by the caller, not anything
    // taken from py-spy's output.
    #[test]
    fn output_preserves_caller_pid() {
        let json = serde_json::json!([]);
        let output = std::process::Output {
            status: std::process::ExitStatus::default(),
            stdout: serde_json::to_vec(&json).unwrap(),
            stderr: vec![],
        };
        let result = map_output(output, 12345, "bin");
        match result {
            PySpyResult::Ok { pid, .. } => assert_eq!(pid, 12345),
            other => panic!("expected Ok, got {:?}", other),
        }
    }

    // Options with every flag switched off.
    fn default_opts() -> PySpyOpts {
        PySpyOpts {
            threads: false,
            native: false,
            native_all: false,
            nonblocking: false,
        }
    }

    // A missing binary yields `None` so the caller can try the next
    // candidate.
    #[tokio::test]
    async fn exec_missing_binary_returns_none() {
        let result = try_exec(
            "/definitely/not/a/real/binary",
            1,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        assert!(result.is_none());
    }

    // `true` exits successfully but prints nothing, so the result is
    // `Some(Failed)` with a JSON parse error rather than `None`.
    #[tokio::test]
    async fn exec_present_binary_returns_some() {
        let result = try_exec(
            "true",
            1,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        match result {
            Some(PySpyResult::Failed { stderr, .. }) => {
                assert!(
                    stderr.contains("parse"),
                    "expected JSON parse error, got: {stderr}"
                );
            }
            other => panic!("expected Some(Failed{{parse..}}), got: {other:?}"),
        }
    }

    // A child that outlives the timeout produces a `Failed` result with a
    // timeout message.
    #[tokio::test]
    async fn collect_timeout_kills_child_and_returns_failed() {
        use tokio::process::Command;

        let child = Command::new("sleep")
            .arg("100")
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .spawn()
            .expect("sleep must be available");

        let result = collect_with_timeout(
            child,
            std::process::id(),
            "sleep",
            std::time::Duration::from_millis(100),
        )
        .await;

        match result {
            PySpyResult::Failed { stderr, .. } => {
                assert!(
                    stderr.contains("timed out"),
                    "expected timeout message, got: {stderr}"
                );
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    // `false` exits non-zero, so the result is `Failed` with an exit code
    // and the caller's pid/binary echoed back.
    #[tokio::test]
    async fn exec_failing_binary_returns_failed() {
        let result = try_exec(
            "false",
            42,
            &default_opts(),
            std::time::Duration::from_secs(5),
        )
        .await;
        assert!(result.is_some());
        match result.unwrap() {
            PySpyResult::Failed {
                pid,
                binary,
                exit_code,
                ..
            } => {
                assert_eq!(pid, 42);
                assert_eq!(binary, "false");
                assert!(exit_code.is_some());
            }
            other => panic!("expected Failed, got {:?}", other),
        }
    }

    // Write an executable `/bin/sh` script containing `script_body` to a
    // temporary file and return its path. The file handle is closed before
    // returning (via `into_temp_path`), leaving only the path guard.
    fn write_fake_pyspy(script_body: &str) -> tempfile::TempPath {
        use std::io::Write;
        use std::os::unix::fs::PermissionsExt;

        let mut f = tempfile::NamedTempFile::new().expect("create temp file");
        write!(f, "#!/bin/sh\n{script_body}").expect("write script");
        f.as_file().sync_all().expect("sync");
        std::fs::set_permissions(f.path(), std::fs::Permissions::from_mode(0o755))
            .expect("chmod +x");
        f.into_temp_path()
    }

    // Read the `<script>.log` file that the fake scripts append their
    // arguments to, one line per invocation. A missing or unreadable log
    // yields an empty list.
    fn read_log(script_path: &std::path::Path) -> Vec<String> {
        let log_path = format!("{}.log", script_path.display());
        match std::fs::read_to_string(&log_path) {
            Ok(contents) => contents.lines().map(String::from).collect(),
            Err(_) => vec![],
        }
    }

    // The fake py-spy rejects `--native-all` with exit code 2 and otherwise
    // prints an empty JSON array. Expect exactly two invocations (with, then
    // without, the flag) and an `Ok` result carrying the fallback warning.
    #[tokio::test]
    async fn native_all_downgrade_succeeds() {
        let script = write_fake_pyspy(
            r#"
echo "$@" >> "$0.log"
for arg in "$@"; do
 if [ "$arg" = "--native-all" ]; then
 echo "unrecognized option --native-all" >&2
 exit 2
 fi
done
echo "[]"
exit 0
"#,
        );
        let opts = PySpyOpts {
            threads: false,
            native: true,
            native_all: true,
            nonblocking: false,
        };
        let result = try_exec(
            script.to_str().unwrap(),
            1,
            &opts,
            std::time::Duration::from_secs(5),
        )
        .await;
        let result = result.expect("expected Some");
        match &result {
            PySpyResult::Ok { warnings, .. } => {
                assert!(
                    warnings.iter().any(|w| w.contains("fell back to --native")),
                    "PS-11c: expected fallback warning, got: {warnings:?}"
                );
            }
            other => panic!("expected Ok, got: {other:?}"),
        }
        let log = read_log(&script);
        assert_eq!(
            log.len(),
            2,
            "PS-11b: expected exactly 2 invocations, got {}",
            log.len()
        );
        assert!(
            log[0].contains("--native-all"),
            "PS-11a: first invocation must include --native-all, got: {}",
            log[0]
        );
        assert!(
            !log[1].contains("--native-all"),
            "PS-11a: second invocation must NOT include --native-all, got: {}",
            log[1]
        );
    }

    // The fake py-spy rejects `--native-all` and fails with exit code 1 for
    // every other invocation. With `nonblocking` set, three attempts are
    // allowed: the first attempt runs twice (with the flag, then the
    // fallback without it) and the two retries run once each, giving four
    // invocations. Only the very first may contain `--native-all`.
    #[tokio::test]
    async fn native_all_downgrade_fails_retries_continue() {
        let script = write_fake_pyspy(
            r#"
echo "$@" >> "$0.log"
for arg in "$@"; do
 if [ "$arg" = "--native-all" ]; then
 echo "unrecognized option --native-all" >&2
 exit 2
 fi
done
echo "Permission denied" >&2
exit 1
"#,
        );
        let opts = PySpyOpts {
            threads: false,
            native: true,
            native_all: true,
            nonblocking: true,
        };
        let result = try_exec(
            script.to_str().unwrap(),
            1,
            &opts,
            std::time::Duration::from_secs(10),
        )
        .await;
        let result = result.expect("expected Some");
        match &result {
            PySpyResult::Failed {
                stderr, exit_code, ..
            } => {
                assert!(
                    stderr.contains("Permission denied"),
                    "PS-11d: expected generic failure, got: {stderr}"
                );
                assert_eq!(*exit_code, Some(1));
            }
            other => panic!("expected Failed, got: {other:?}"),
        }
        let log = read_log(&script);
        assert_eq!(log.len(), 4, "expected 4 invocations, got {}", log.len());
        assert!(
            log[0].contains("--native-all"),
            "PS-11a: first invocation must include --native-all, got: {}",
            log[0]
        );
        for (i, line) in log[1..].iter().enumerate() {
            assert!(
                !line.contains("--native-all"),
                "PS-11e: invocation {} must NOT include --native-all, got: {}",
                i + 1,
                line
            );
        }
    }
}