1pub mod dto;
276
277use hyperactor_config::Attrs;
278use hyperactor_config::INTROSPECT;
279use hyperactor_config::IntrospectAttr;
280use hyperactor_config::declare_attrs;
281
// Attribute keys published by mesh topology nodes (root, host, proc).
// Each key carries an `INTROSPECT` meta-attribute giving the short name and
// description under which the key is exposed.
declare_attrs! {
    // Node kind discriminator: the `to_attrs` methods in this file write
    // "root", "host" or "proc", and `derive_properties` dispatches on it.
    @meta(INTROSPECT = IntrospectAttr {
        name: "node_type".into(),
        desc: "Topology role: root, host, proc, error".into(),
    })
    pub attr NODE_TYPE: String;

    // Host address; required by `HostAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "addr".into(),
        desc: "Host network address".into(),
    })
    pub attr ADDR: String;

    // Proc count of a host; declared default 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_procs".into(),
        desc: "Number of procs on a host".into(),
    })
    pub attr NUM_PROCS: usize = 0;

    // Proc name; required by `ProcAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "proc_name".into(),
        desc: "Human-readable proc name".into(),
    })
    pub attr PROC_NAME: String;

    // Actor count of a proc; declared default 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_actors".into(),
        desc: "Number of actors in a proc".into(),
    })
    pub attr NUM_ACTORS: usize = 0;

    // System/infrastructure children; read by the root, host and proc views,
    // each of which treats a missing value as an empty list.
    @meta(INTROSPECT = IntrospectAttr {
        name: "system_children".into(),
        desc: "References of system/infrastructure children".into(),
    })
    pub attr SYSTEM_CHILDREN: Vec<NodeRef>;

    // Stopped children of a proc; a missing value is read as an empty list.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_children".into(),
        desc: "References of stopped children".into(),
    })
    pub attr STOPPED_CHILDREN: Vec<NodeRef>;

    // Retention cap for stopped children; declared default 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_retention_cap".into(),
        desc: "Maximum number of stopped children retained".into(),
    })
    pub attr STOPPED_RETENTION_CAP: usize = 0;

    // Poison flag of a proc; declared default false. `ProcAttrsView::from_attrs`
    // requires it to be true exactly when FAILED_ACTOR_COUNT is non-zero (FI-5).
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_poisoned".into(),
        desc: "Whether this proc is poisoned (refusing new spawns)".into(),
    })
    pub attr IS_POISONED: bool = false;

    // Failed-actor count of a proc; declared default 0. See IS_POISONED.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failed_actor_count".into(),
        desc: "Number of failed actors in this proc".into(),
    })
    pub attr FAILED_ACTOR_COUNT: usize = 0;

    // Mesh start time; required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_at".into(),
        desc: "Timestamp when the mesh was started".into(),
    })
    pub attr STARTED_AT: std::time::SystemTime;

    // User who started the mesh; required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_by".into(),
        desc: "Username who started the mesh".into(),
    })
    pub attr STARTED_BY: String;

    // Host count of the mesh; declared default 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_hosts".into(),
        desc: "Number of hosts in the mesh".into(),
    })
    pub attr NUM_HOSTS: usize = 0;

    // Resident set size in bytes; the value itself is optional and is read
    // and written through `ProcessMemoryStats`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "process_rss_bytes".into(),
        desc: "RSS of the hosting OS process (bytes)".into(),
    })
    pub attr PROCESS_RSS_BYTES: Option<u64>;

    // Virtual memory size in bytes; the value itself is optional and is read
    // and written through `ProcessMemoryStats`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "process_vm_size_bytes".into(),
        desc: "Virtual memory size of the hosting OS process (bytes)".into(),
    })
    pub attr PROCESS_VM_SIZE_BYTES: Option<u64>;

    // Summed queue depth; declared default 0. Read through `ProcDebugStats`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_work_queue_depth_total".into(),
        desc: "Sum of per-actor message queue depths (live actors only)".into(),
    })
    pub attr ACTOR_WORK_QUEUE_DEPTH_TOTAL: u64 = 0;

    // Largest single queue depth; declared default 0. `ProcDebugStats::from_attrs`
    // logs a "PD-1 violation" warning when this exceeds the total above.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_work_queue_depth_max".into(),
        desc: "Maximum per-actor message queue depth (live actors only)".into(),
    })
    pub attr ACTOR_WORK_QUEUE_DEPTH_MAX: u64 = 0;

    // Queue depth high-water mark; declared default 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_work_queue_depth_high_water_mark".into(),
        desc: "Maximum proc-wide queue depth since startup (eventually consistent)".into(),
    })
    pub attr ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK: u64 = 0;

    // Age in milliseconds of the last non-zero queue depth observation; the
    // value itself is optional.
    @meta(INTROSPECT = IntrospectAttr {
        name: "last_nonzero_queue_depth_age_ms".into(),
        desc: "Milliseconds since proc-wide queue depth was last observed non-zero (wall clock)".into(),
    })
    pub attr LAST_NONZERO_QUEUE_DEPTH_AGE_MS: Option<u64>;

}
435
436use hyperactor::introspect::AttrsViewError;
437
/// Typed view over the attributes of a root node.
///
/// Decoded from an [`Attrs`] bag by [`RootAttrsView::from_attrs`] and encoded
/// back by [`RootAttrsView::to_attrs`].
#[derive(Debug, Clone, PartialEq)]
pub struct RootAttrsView {
    /// Value of `NUM_HOSTS` ("Number of hosts in the mesh").
    pub num_hosts: usize,
    /// Value of `STARTED_AT` ("Timestamp when the mesh was started").
    pub started_at: SystemTime,
    /// Value of `STARTED_BY` ("Username who started the mesh").
    pub started_by: String,
    /// Value of `SYSTEM_CHILDREN` (system/infrastructure children).
    pub system_children: Vec<NodeRef>,
}
446
447impl RootAttrsView {
448 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
452 let num_hosts = *attrs.get(NUM_HOSTS).unwrap_or(&0);
453 let started_at = *attrs
454 .get(STARTED_AT)
455 .ok_or_else(|| AttrsViewError::missing("started_at"))?;
456 let started_by = attrs
457 .get(STARTED_BY)
458 .ok_or_else(|| AttrsViewError::missing("started_by"))?
459 .clone();
460 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
461 Ok(Self {
462 num_hosts,
463 started_at,
464 started_by,
465 system_children,
466 })
467 }
468
469 pub fn to_attrs(&self) -> Attrs {
471 let mut attrs = Attrs::new();
472 attrs.set(NODE_TYPE, "root".to_string());
473 attrs.set(NUM_HOSTS, self.num_hosts);
474 attrs.set(STARTED_AT, self.started_at);
475 attrs.set(STARTED_BY, self.started_by.clone());
476 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
477 attrs
478 }
479}
480
/// Memory figures for the OS process hosting a node.
///
/// Each field is `None` when the figure is unavailable;
/// [`ProcessMemoryStats::read_from_procfs`] fills them from
/// `read_procfs_memory`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Default,
    Serialize,
    Deserialize,
    Named
)]
pub struct ProcessMemoryStats {
    /// Value of `PROCESS_RSS_BYTES`: resident set size in bytes.
    pub process_rss_bytes: Option<u64>,
    /// Value of `PROCESS_VM_SIZE_BYTES`: virtual memory size in bytes.
    pub process_vm_size_bytes: Option<u64>,
}
507
508impl ProcessMemoryStats {
509 pub fn read_from_procfs() -> Self {
513 let (rss, vm) = read_procfs_memory();
514 Self {
515 process_rss_bytes: rss,
516 process_vm_size_bytes: vm,
517 }
518 }
519
520 pub fn from_attrs(attrs: &Attrs) -> Self {
521 Self {
522 process_rss_bytes: attrs.get(PROCESS_RSS_BYTES).copied().flatten(),
523 process_vm_size_bytes: attrs.get(PROCESS_VM_SIZE_BYTES).copied().flatten(),
524 }
525 }
526
527 pub fn to_attrs(&self, attrs: &mut Attrs) {
528 attrs.set(PROCESS_RSS_BYTES, self.process_rss_bytes);
529 attrs.set(PROCESS_VM_SIZE_BYTES, self.process_vm_size_bytes);
530 }
531}
532
533#[cfg(target_os = "linux")]
541fn read_procfs_memory() -> (Option<u64>, Option<u64>) {
542 let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
545 if page_size <= 0 {
546 return (None, None);
547 }
548 let page_size = page_size as u64;
549 match std::fs::read_to_string("/proc/self/statm") {
557 Ok(contents) => {
558 let mut fields = contents.split_whitespace();
559 let vm_pages: Option<u64> = fields.next().and_then(|s| s.parse().ok());
560 let rss_pages: Option<u64> = fields.next().and_then(|s| s.parse().ok());
561 (
562 rss_pages.map(|p| p * page_size),
563 vm_pages.map(|p| p * page_size),
564 )
565 }
566 Err(_) => (None, None),
567 }
568}
569
/// Fallback for non-Linux targets: reports both memory figures as
/// unavailable.
#[cfg(not(target_os = "linux"))]
fn read_procfs_memory() -> (Option<u64>, Option<u64>) {
    (None, None)
}
574
/// Debugging statistics attached to a proc node: process memory plus
/// actor work-queue depth figures.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Default,
    Serialize,
    Deserialize,
    Named
)]
pub struct ProcDebugStats {
    /// Memory figures of the hosting OS process.
    pub memory: ProcessMemoryStats,
    /// Value of `ACTOR_WORK_QUEUE_DEPTH_TOTAL`: sum of per-actor queue depths.
    pub actor_work_queue_depth_total: u64,
    /// Value of `ACTOR_WORK_QUEUE_DEPTH_MAX`: largest per-actor queue depth.
    pub actor_work_queue_depth_max: u64,
    /// Value of `ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK`: maximum proc-wide
    /// queue depth since startup.
    pub actor_work_queue_depth_high_water_mark: u64,
    /// Value of `LAST_NONZERO_QUEUE_DEPTH_AGE_MS`: milliseconds since the
    /// proc-wide queue depth was last observed non-zero, if known.
    pub last_nonzero_queue_depth_age_ms: Option<u64>,
}
618
619impl ProcDebugStats {
620 pub fn from_attrs(attrs: &Attrs) -> Self {
621 let total = attrs
622 .get(ACTOR_WORK_QUEUE_DEPTH_TOTAL)
623 .copied()
624 .unwrap_or(0);
625 let max = attrs.get(ACTOR_WORK_QUEUE_DEPTH_MAX).copied().unwrap_or(0);
626 if max > total {
628 tracing::warn!(
629 "PD-1 violation: actor_work_queue_depth_max ({}) > total ({})",
630 max,
631 total,
632 );
633 }
634 let high_water = attrs
635 .get(ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK)
636 .copied()
637 .unwrap_or(0);
638 let last_nonzero = attrs
642 .get(LAST_NONZERO_QUEUE_DEPTH_AGE_MS)
643 .copied()
644 .flatten();
645 Self {
646 memory: ProcessMemoryStats::from_attrs(attrs),
647 actor_work_queue_depth_total: total,
648 actor_work_queue_depth_max: max,
649 actor_work_queue_depth_high_water_mark: high_water,
650 last_nonzero_queue_depth_age_ms: last_nonzero,
651 }
652 }
653
654 pub fn to_attrs(&self, attrs: &mut Attrs) {
655 self.memory.to_attrs(attrs);
656 attrs.set(
657 ACTOR_WORK_QUEUE_DEPTH_TOTAL,
658 self.actor_work_queue_depth_total,
659 );
660 attrs.set(ACTOR_WORK_QUEUE_DEPTH_MAX, self.actor_work_queue_depth_max);
661 attrs.set(
662 ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
663 self.actor_work_queue_depth_high_water_mark,
664 );
665 attrs.set(
666 LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
667 self.last_nonzero_queue_depth_age_ms,
668 );
669 }
670}
671
/// Typed view over the attributes of a host node.
#[derive(Debug, Clone, PartialEq)]
pub struct HostAttrsView {
    /// Value of `ADDR` ("Host network address").
    pub addr: String,
    /// Value of `NUM_PROCS` ("Number of procs on a host").
    pub num_procs: usize,
    /// Value of `SYSTEM_CHILDREN` (system/infrastructure children).
    pub system_children: Vec<NodeRef>,
    /// Memory figures of the hosting OS process.
    pub memory: ProcessMemoryStats,
}
681
682impl HostAttrsView {
683 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
687 let addr = attrs
688 .get(ADDR)
689 .ok_or_else(|| AttrsViewError::missing("addr"))?
690 .clone();
691 let num_procs = *attrs.get(NUM_PROCS).unwrap_or(&0);
692 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
693 let memory = ProcessMemoryStats::from_attrs(attrs);
694 Ok(Self {
695 addr,
696 num_procs,
697 system_children,
698 memory,
699 })
700 }
701
702 pub fn to_attrs(&self) -> Attrs {
704 let mut attrs = Attrs::new();
705 attrs.set(NODE_TYPE, "host".to_string());
706 attrs.set(ADDR, self.addr.clone());
707 attrs.set(NUM_PROCS, self.num_procs);
708 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
709 self.memory.to_attrs(&mut attrs);
710 attrs
711 }
712}
713
/// Typed view over the attributes of a proc node.
///
/// [`ProcAttrsView::from_attrs`] only yields views in which `is_poisoned`
/// is true exactly when `failed_actor_count` is non-zero (invariant FI-5).
#[derive(Debug, Clone, PartialEq)]
pub struct ProcAttrsView {
    /// Value of `PROC_NAME` ("Human-readable proc name").
    pub proc_name: String,
    /// Value of `NUM_ACTORS` ("Number of actors in a proc").
    pub num_actors: usize,
    /// Value of `SYSTEM_CHILDREN` (system/infrastructure children).
    pub system_children: Vec<NodeRef>,
    /// Value of `STOPPED_CHILDREN` (stopped children).
    pub stopped_children: Vec<NodeRef>,
    /// Value of `STOPPED_RETENTION_CAP` (maximum stopped children retained).
    pub stopped_retention_cap: usize,
    /// Value of `IS_POISONED` (proc refuses new spawns).
    pub is_poisoned: bool,
    /// Value of `FAILED_ACTOR_COUNT` (failed actors in this proc).
    pub failed_actor_count: usize,
    /// Memory and queue-depth statistics.
    pub debug: ProcDebugStats,
}
727
728impl ProcAttrsView {
729 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
733 let proc_name = attrs
734 .get(PROC_NAME)
735 .ok_or_else(|| AttrsViewError::missing("proc_name"))?
736 .clone();
737 let num_actors = *attrs.get(NUM_ACTORS).unwrap_or(&0);
738 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
739 let stopped_children = attrs.get(STOPPED_CHILDREN).cloned().unwrap_or_default();
740 let stopped_retention_cap = *attrs.get(STOPPED_RETENTION_CAP).unwrap_or(&0);
741 let is_poisoned = *attrs.get(IS_POISONED).unwrap_or(&false);
742 let failed_actor_count = *attrs.get(FAILED_ACTOR_COUNT).unwrap_or(&0);
743
744 if is_poisoned != (failed_actor_count > 0) {
746 return Err(AttrsViewError::invariant(
747 "FI-5",
748 format!("is_poisoned={is_poisoned} but failed_actor_count={failed_actor_count}"),
749 ));
750 }
751
752 let debug = ProcDebugStats::from_attrs(attrs);
753
754 Ok(Self {
755 proc_name,
756 num_actors,
757 system_children,
758 stopped_children,
759 stopped_retention_cap,
760 is_poisoned,
761 failed_actor_count,
762 debug,
763 })
764 }
765
766 pub fn to_attrs(&self) -> Attrs {
768 let mut attrs = Attrs::new();
769 attrs.set(NODE_TYPE, "proc".to_string());
770 attrs.set(PROC_NAME, self.proc_name.clone());
771 attrs.set(NUM_ACTORS, self.num_actors);
772 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
773 attrs.set(STOPPED_CHILDREN, self.stopped_children.clone());
774 attrs.set(STOPPED_RETENTION_CAP, self.stopped_retention_cap);
775 attrs.set(IS_POISONED, self.is_poisoned);
776 attrs.set(FAILED_ACTOR_COUNT, self.failed_actor_count);
777 self.debug.to_attrs(&mut attrs);
778 attrs
779 }
780}
781
/// Typed view over the attributes of an error node.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorAttrsView {
    /// Value of `hyperactor::introspect::ERROR_CODE`.
    pub code: String,
    /// Value of `hyperactor::introspect::ERROR_MESSAGE`; empty when absent.
    pub message: String,
}
788
789impl ErrorAttrsView {
790 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
793 use hyperactor::introspect::ERROR_CODE;
794 use hyperactor::introspect::ERROR_MESSAGE;
795
796 let code = attrs
797 .get(ERROR_CODE)
798 .ok_or_else(|| AttrsViewError::missing("error_code"))?
799 .clone();
800 let message = attrs.get(ERROR_MESSAGE).cloned().unwrap_or_default();
801 Ok(Self { code, message })
802 }
803
804 pub fn to_attrs(&self) -> Attrs {
806 use hyperactor::introspect::ERROR_CODE;
807 use hyperactor::introspect::ERROR_MESSAGE;
808
809 let mut attrs = Attrs::new();
810 attrs.set(ERROR_CODE, self.code.clone());
811 attrs.set(ERROR_MESSAGE, self.message.clone());
812 attrs
813 }
814}
815
816use std::fmt;
819use std::str::FromStr;
820use std::time::SystemTime;
821
822use serde::Deserialize;
823use serde::Serialize;
824use typeuri::Named;
825
/// Reference to a node in the introspection topology.
///
/// The textual form produced by `Display` and accepted by `FromStr` is
/// `root` for the root, `host:<actor address>` for a host, and the
/// address's own rendering for procs and actors.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum NodeRef {
    /// The root of the topology; serde uses the variant name `"root"`.
    #[serde(rename = "root")]
    Root,
    /// A host, identified by an actor address.
    Host(hyperactor::ActorAddr),
    /// A proc, identified by its proc address.
    Proc(hyperactor::ProcAddr),
    /// An actor, identified by its actor address.
    Actor(hyperactor::ActorAddr),
}

// NOTE(review): presumably what allows `NodeRef` values (e.g. the
// `Vec<NodeRef>` keys declared in this file) to be stored in `Attrs` —
// confirm against the macro definition.
hyperactor_config::impl_attrvalue!(NodeRef);
845
846impl fmt::Display for NodeRef {
847 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
848 match self {
849 Self::Root => write!(f, "root"),
850 Self::Host(id) => write!(f, "host:{}", id),
851 Self::Proc(id) => fmt::Display::fmt(id, f),
852 Self::Actor(id) => fmt::Display::fmt(id, f),
853 }
854 }
855}
856
/// Errors produced when parsing a [`NodeRef`] from a string.
#[derive(Debug, thiserror::Error)]
pub enum NodeRefParseError {
    /// The input string was empty.
    #[error("empty reference string")]
    Empty,
    /// The text after the `host:` prefix was not a valid actor address.
    #[error("invalid host reference: {0}")]
    InvalidHost(hyperactor::AddrParseError),
    /// The input parsed as a port address, which is not a node.
    #[error("port references are not valid node references")]
    PortNotAllowed,
    /// The input was not a valid address at all.
    #[error(transparent)]
    Reference(#[from] hyperactor::AddrParseError),
}
869
870impl FromStr for NodeRef {
871 type Err = NodeRefParseError;
872
873 fn from_str(s: &str) -> Result<Self, Self::Err> {
874 if s.is_empty() {
875 return Err(NodeRefParseError::Empty);
876 }
877 if s == "root" {
878 return Ok(Self::Root);
879 }
880 if let Some(rest) = s.strip_prefix("host:") {
881 let actor_id: hyperactor::ActorAddr =
882 rest.parse().map_err(NodeRefParseError::InvalidHost)?;
883 return Ok(Self::Host(actor_id));
884 }
885 let r: hyperactor::Addr = s.parse()?;
886 match r {
887 hyperactor::Addr::Proc(id) => Ok(Self::Proc(id)),
888 hyperactor::Addr::Actor(id) => Ok(Self::Actor(id)),
889 hyperactor::Addr::Port(_) => Err(NodeRefParseError::PortNotAllowed),
890 }
891 }
892}
893
894impl From<hyperactor::introspect::IntrospectRef> for NodeRef {
895 fn from(r: hyperactor::introspect::IntrospectRef) -> Self {
896 match r {
897 hyperactor::introspect::IntrospectRef::Proc(id) => Self::Proc(id),
898 hyperactor::introspect::IntrospectRef::Actor(id) => Self::Actor(id),
899 }
900 }
901}
902
/// One node of the introspection topology as sent to clients.
///
/// Built from a `hyperactor::introspect::IntrospectResult` by
/// [`to_node_payload`] or [`to_node_payload_with`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct NodePayload {
    /// Reference identifying this node.
    pub identity: NodeRef,
    /// Typed properties derived from the node's attribute JSON.
    pub properties: NodeProperties,
    /// References to this node's children.
    pub children: Vec<NodeRef>,
    /// Reference to this node's parent, if any.
    pub parent: Option<NodeRef>,
    /// Copied from the introspection result's `as_of` field.
    /// NOTE(review): presumably the time the snapshot was taken — confirm.
    pub as_of: SystemTime,
}
wirevalue::register_type!(NodePayload);
927
/// Typed, per-kind properties of a topology node.
///
/// Produced by [`derive_properties`]; every decoding problem is reported as
/// the [`NodeProperties::Error`] variant rather than as a Rust error.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum NodeProperties {
    /// Properties of the root node; mirrors [`RootAttrsView`].
    Root {
        num_hosts: usize,
        started_at: SystemTime,
        started_by: String,
        system_children: Vec<NodeRef>,
    },
    /// Properties of a host node; mirrors [`HostAttrsView`].
    Host {
        addr: String,
        num_procs: usize,
        system_children: Vec<NodeRef>,
        memory: ProcessMemoryStats,
    },
    /// Properties of a proc node; mirrors [`ProcAttrsView`].
    Proc {
        proc_name: String,
        num_actors: usize,
        system_children: Vec<NodeRef>,
        stopped_children: Vec<NodeRef>,
        stopped_retention_cap: usize,
        is_poisoned: bool,
        failed_actor_count: usize,
        debug: ProcDebugStats,
    },
    /// Properties of an actor node, lowered from
    /// `hyperactor::introspect::ActorAttrsView`. `actor_status` is the
    /// status, followed by `": <reason>"` when a status reason is present.
    Actor {
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        created_at: Option<SystemTime>,
        last_message_handler: Option<String>,
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        failure_info: Option<FailureInfo>,
    },
    /// An error node, or a node whose attributes could not be decoded.
    Error { code: String, message: String },
}
wirevalue::register_type!(NodeProperties);
976
/// Failure details attached to an actor node.
///
/// Copied field by field from the `failure` value of
/// `hyperactor::introspect::ActorAttrsView`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct FailureInfo {
    /// Message describing the failure.
    pub error_message: String,
    /// Address of the actor recorded as the root cause.
    pub root_cause_actor: hyperactor::ActorAddr,
    /// Name of the root-cause actor, when one is recorded.
    pub root_cause_name: Option<String>,
    /// Time at which the failure occurred.
    pub occurred_at: SystemTime,
    /// NOTE(review): presumably true when the failure originated in another
    /// actor and was propagated to this one — confirm against hyperactor.
    pub is_propagated: bool,
}
wirevalue::register_type!(FailureInfo);
995
/// Private conversion from a typed attribute view into the wire-level
/// [`NodeProperties`] enum.
trait IntoNodeProperties {
    /// Consumes the view and returns the matching `NodeProperties` variant.
    fn into_node_properties(self) -> NodeProperties;
}
1003
1004impl IntoNodeProperties for RootAttrsView {
1005 fn into_node_properties(self) -> NodeProperties {
1006 NodeProperties::Root {
1007 num_hosts: self.num_hosts,
1008 started_at: self.started_at,
1009 started_by: self.started_by,
1010 system_children: self.system_children,
1011 }
1012 }
1013}
1014
1015impl IntoNodeProperties for HostAttrsView {
1016 fn into_node_properties(self) -> NodeProperties {
1017 NodeProperties::Host {
1018 addr: self.addr,
1019 num_procs: self.num_procs,
1020 system_children: self.system_children,
1021 memory: self.memory,
1022 }
1023 }
1024}
1025
1026impl IntoNodeProperties for ProcAttrsView {
1027 fn into_node_properties(self) -> NodeProperties {
1028 NodeProperties::Proc {
1029 proc_name: self.proc_name,
1030 num_actors: self.num_actors,
1031 system_children: self.system_children,
1032 stopped_children: self.stopped_children,
1033 stopped_retention_cap: self.stopped_retention_cap,
1034 is_poisoned: self.is_poisoned,
1035 failed_actor_count: self.failed_actor_count,
1036 debug: self.debug,
1037 }
1038 }
1039}
1040
1041impl IntoNodeProperties for ErrorAttrsView {
1042 fn into_node_properties(self) -> NodeProperties {
1043 NodeProperties::Error {
1044 code: self.code,
1045 message: self.message,
1046 }
1047 }
1048}
1049
1050impl IntoNodeProperties for hyperactor::introspect::ActorAttrsView {
1051 fn into_node_properties(self) -> NodeProperties {
1052 let actor_status = match &self.status_reason {
1053 Some(reason) => format!("{}: {}", self.status, reason),
1054 None => self.status.clone(),
1055 };
1056
1057 let failure_info = self.failure.map(|fi| FailureInfo {
1058 error_message: fi.error_message,
1059 root_cause_actor: fi.root_cause_actor,
1060 root_cause_name: fi.root_cause_name,
1061 occurred_at: fi.occurred_at,
1062 is_propagated: fi.is_propagated,
1063 });
1064
1065 NodeProperties::Actor {
1066 actor_status,
1067 actor_type: self.actor_type,
1068 messages_processed: self.messages_processed,
1069 created_at: self.created_at,
1070 last_message_handler: self.last_handler,
1071 total_processing_time_us: self.total_processing_time_us,
1072 flight_recorder: self.flight_recorder,
1073 is_system: self.is_system,
1074 failure_info,
1075 }
1076 }
1077}
1078
1079pub fn derive_properties(attrs_json: &str) -> NodeProperties {
1092 let attrs: Attrs = match serde_json::from_str(attrs_json) {
1093 Ok(a) => a,
1094 Err(_) => {
1095 return NodeProperties::Error {
1096 code: "parse_error".into(),
1097 message: "failed to parse attrs JSON".into(),
1098 };
1099 }
1100 };
1101
1102 let node_type = attrs.get(NODE_TYPE).cloned().unwrap_or_default();
1103
1104 match node_type.as_str() {
1105 "root" => match RootAttrsView::from_attrs(&attrs) {
1106 Ok(v) => v.into_node_properties(),
1107 Err(e) => NodeProperties::Error {
1108 code: "malformed_root".into(),
1109 message: e.to_string(),
1110 },
1111 },
1112 "host" => match HostAttrsView::from_attrs(&attrs) {
1113 Ok(v) => v.into_node_properties(),
1114 Err(e) => NodeProperties::Error {
1115 code: "malformed_host".into(),
1116 message: e.to_string(),
1117 },
1118 },
1119 "proc" => match ProcAttrsView::from_attrs(&attrs) {
1120 Ok(v) => v.into_node_properties(),
1121 Err(e) => NodeProperties::Error {
1122 code: "malformed_proc".into(),
1123 message: e.to_string(),
1124 },
1125 },
1126 _ => {
1127 use hyperactor::introspect::ERROR_CODE;
1130 use hyperactor::introspect::STATUS;
1131
1132 if attrs.get(ERROR_CODE).is_some() {
1133 return match ErrorAttrsView::from_attrs(&attrs) {
1134 Ok(v) => v.into_node_properties(),
1135 Err(e) => NodeProperties::Error {
1136 code: "malformed_error".into(),
1137 message: e.to_string(),
1138 },
1139 };
1140 }
1141
1142 if attrs.get(STATUS).is_none() {
1143 return NodeProperties::Error {
1144 code: "unknown_node_type".into(),
1145 message: format!("unrecognized node_type: {:?}", node_type),
1146 };
1147 }
1148
1149 match hyperactor::introspect::ActorAttrsView::from_attrs(&attrs) {
1150 Ok(v) => v.into_node_properties(),
1151 Err(e) => NodeProperties::Error {
1152 code: "malformed_actor".into(),
1153 message: e.to_string(),
1154 },
1155 }
1156 }
1157 }
1158}
1159
1160pub fn to_node_payload(result: hyperactor::introspect::IntrospectResult) -> NodePayload {
1163 NodePayload {
1164 identity: result.identity.into(),
1165 properties: derive_properties(&result.attrs),
1166 children: result.children.into_iter().map(NodeRef::from).collect(),
1167 parent: result.parent.map(NodeRef::from),
1168 as_of: result.as_of,
1169 }
1170}
1171
1172pub fn to_node_payload_with(
1175 result: hyperactor::introspect::IntrospectResult,
1176 identity: NodeRef,
1177 parent: Option<NodeRef>,
1178) -> NodePayload {
1179 NodePayload {
1180 identity,
1181 properties: derive_properties(&result.attrs),
1182 children: result.children.into_iter().map(NodeRef::from).collect(),
1183 parent,
1184 as_of: result.as_of,
1185 }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::*;
1191 use crate::mesh_id::ResourceId;
1192
1193 #[test]
1196 fn test_mesh_introspect_keys_are_tagged() {
1197 let cases = vec![
1198 ("node_type", NODE_TYPE.attrs()),
1199 ("addr", ADDR.attrs()),
1200 ("num_procs", NUM_PROCS.attrs()),
1201 ("proc_name", PROC_NAME.attrs()),
1202 ("num_actors", NUM_ACTORS.attrs()),
1203 ("system_children", SYSTEM_CHILDREN.attrs()),
1204 ("stopped_children", STOPPED_CHILDREN.attrs()),
1205 ("stopped_retention_cap", STOPPED_RETENTION_CAP.attrs()),
1206 ("is_poisoned", IS_POISONED.attrs()),
1207 ("failed_actor_count", FAILED_ACTOR_COUNT.attrs()),
1208 ("started_at", STARTED_AT.attrs()),
1209 ("started_by", STARTED_BY.attrs()),
1210 ("num_hosts", NUM_HOSTS.attrs()),
1211 ("process_rss_bytes", PROCESS_RSS_BYTES.attrs()),
1213 ("process_vm_size_bytes", PROCESS_VM_SIZE_BYTES.attrs()),
1214 (
1215 "actor_work_queue_depth_total",
1216 ACTOR_WORK_QUEUE_DEPTH_TOTAL.attrs(),
1217 ),
1218 (
1219 "actor_work_queue_depth_max",
1220 ACTOR_WORK_QUEUE_DEPTH_MAX.attrs(),
1221 ),
1222 (
1223 "actor_work_queue_depth_high_water_mark",
1224 ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK.attrs(),
1225 ),
1226 (
1227 "last_nonzero_queue_depth_age_ms",
1228 LAST_NONZERO_QUEUE_DEPTH_AGE_MS.attrs(),
1229 ),
1230 ];
1231
1232 for (expected_name, meta) in &cases {
1233 let introspect = meta
1236 .get(INTROSPECT)
1237 .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
1238 assert_eq!(
1239 introspect.name, *expected_name,
1240 "short name mismatch for {expected_name}"
1241 );
1242 assert!(
1243 !introspect.desc.is_empty(),
1244 "{expected_name}: desc should not be empty"
1245 );
1246 }
1247
1248 use hyperactor_config::attrs::AttrKeyInfo;
1251 let registry_count = inventory::iter::<AttrKeyInfo>()
1252 .filter(|info| {
1253 info.name.starts_with("hyperactor_mesh::introspect::")
1254 && info.meta.get(INTROSPECT).is_some()
1255 })
1256 .count();
1257 assert_eq!(
1258 cases.len(),
1259 registry_count,
1260 "test must cover all INTROSPECT-tagged keys in this module"
1261 );
1262 }
1263
1264 fn test_actor_ref(proc_name: &str, actor_name: &str) -> NodeRef {
1265 use hyperactor::channel::ChannelAddr;
1266
1267 NodeRef::Actor(
1268 ResourceId::proc_addr_from_name(ChannelAddr::Local(0), proc_name)
1269 .actor_addr(actor_name),
1270 )
1271 }
1272
1273 fn root_view() -> RootAttrsView {
1274 RootAttrsView {
1275 num_hosts: 3,
1276 started_at: std::time::UNIX_EPOCH,
1277 started_by: "testuser".into(),
1278 system_children: vec![test_actor_ref("proc", "child1")],
1279 }
1280 }
1281
1282 fn host_view() -> HostAttrsView {
1283 HostAttrsView {
1284 addr: "10.0.0.1:8080".into(),
1285 num_procs: 2,
1286 system_children: vec![test_actor_ref("proc", "sys")],
1287 memory: Default::default(),
1288 }
1289 }
1290
1291 fn proc_view() -> ProcAttrsView {
1292 ProcAttrsView {
1293 proc_name: "worker".into(),
1294 num_actors: 5,
1295 system_children: vec![],
1296 stopped_children: vec![test_actor_ref("proc", "old")],
1297 stopped_retention_cap: 10,
1298 is_poisoned: false,
1299 failed_actor_count: 0,
1300 debug: Default::default(),
1301 }
1302 }
1303
1304 fn error_view() -> ErrorAttrsView {
1305 ErrorAttrsView {
1306 code: "not_found".into(),
1307 message: "child not found".into(),
1308 }
1309 }
1310
1311 #[test]
1313 fn test_root_view_round_trip() {
1314 let view = root_view();
1315 let rt = RootAttrsView::from_attrs(&view.to_attrs()).unwrap();
1316 assert_eq!(rt, view);
1317 }
1318
1319 #[test]
1321 fn test_host_view_round_trip() {
1322 let view = host_view();
1323 let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
1324 assert_eq!(rt, view);
1325 }
1326
1327 #[test]
1329 fn test_proc_view_round_trip() {
1330 let view = proc_view();
1331 let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
1332 assert_eq!(rt, view);
1333 }
1334
1335 #[test]
1337 fn test_host_view_round_trip_with_memory() {
1338 let view = HostAttrsView {
1339 addr: "10.0.0.1:8080".into(),
1340 num_procs: 2,
1341 system_children: vec![],
1342 memory: ProcessMemoryStats {
1343 process_rss_bytes: Some(512 * 1024 * 1024),
1344 process_vm_size_bytes: Some(2 * 1024 * 1024 * 1024),
1345 },
1346 };
1347 let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
1348 assert_eq!(rt, view);
1349 }
1350
1351 #[test]
1353 fn test_proc_view_round_trip_with_debug() {
1354 let view = ProcAttrsView {
1355 proc_name: "worker".into(),
1356 num_actors: 5,
1357 system_children: vec![],
1358 stopped_children: vec![],
1359 stopped_retention_cap: 10,
1360 is_poisoned: false,
1361 failed_actor_count: 0,
1362 debug: ProcDebugStats {
1363 memory: ProcessMemoryStats {
1364 process_rss_bytes: Some(256 * 1024 * 1024),
1365 process_vm_size_bytes: Some(1024 * 1024 * 1024),
1366 },
1367 actor_work_queue_depth_total: 42,
1368 actor_work_queue_depth_max: 7,
1369 actor_work_queue_depth_high_water_mark: 100,
1370 last_nonzero_queue_depth_age_ms: Some(5000),
1371 },
1372 };
1373 let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
1374 assert_eq!(rt, view);
1375 }
1376
1377 #[test]
1379 fn test_proc_debug_stats_pd1_warning_on_violation() {
1380 let mut attrs = Attrs::new();
1381 attrs.set(PROC_NAME, "test".to_string());
1382 attrs.set(ACTOR_WORK_QUEUE_DEPTH_TOTAL, 5u64);
1383 attrs.set(ACTOR_WORK_QUEUE_DEPTH_MAX, 10u64); let view = ProcAttrsView::from_attrs(&attrs).unwrap();
1386 assert_eq!(view.debug.actor_work_queue_depth_total, 5);
1387 assert_eq!(view.debug.actor_work_queue_depth_max, 10);
1388 }
1389
1390 #[test]
1392 fn test_proc_debug_stats_defaults_on_missing_attrs() {
1393 let mut attrs = Attrs::new();
1394 attrs.set(PROC_NAME, "old_proc".to_string());
1395 let view = ProcAttrsView::from_attrs(&attrs).unwrap();
1396 assert_eq!(view.debug, ProcDebugStats::default());
1397 }
1398
1399 #[test]
1401 fn test_error_view_round_trip() {
1402 let view = error_view();
1403 let rt = ErrorAttrsView::from_attrs(&view.to_attrs()).unwrap();
1404 assert_eq!(rt, view);
1405 }
1406
1407 #[test]
1409 fn test_root_view_missing_started_at() {
1410 let mut attrs = Attrs::new();
1411 attrs.set(NODE_TYPE, "root".into());
1412 attrs.set(STARTED_BY, "user".into());
1413 let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
1414 assert_eq!(err, AttrsViewError::MissingKey { key: "started_at" });
1415 }
1416
1417 #[test]
1419 fn test_root_view_missing_started_by() {
1420 let mut attrs = Attrs::new();
1421 attrs.set(NODE_TYPE, "root".into());
1422 attrs.set(STARTED_AT, std::time::UNIX_EPOCH);
1423 let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
1424 assert_eq!(err, AttrsViewError::MissingKey { key: "started_by" });
1425 }
1426
1427 #[test]
1429 fn test_host_view_missing_addr() {
1430 let attrs = Attrs::new();
1431 let err = HostAttrsView::from_attrs(&attrs).unwrap_err();
1432 assert_eq!(err, AttrsViewError::MissingKey { key: "addr" });
1433 }
1434
1435 #[test]
1437 fn test_proc_view_missing_proc_name() {
1438 let attrs = Attrs::new();
1439 let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1440 assert_eq!(err, AttrsViewError::MissingKey { key: "proc_name" });
1441 }
1442
1443 #[test]
1445 fn test_proc_view_fi5_poisoned_but_no_failures() {
1446 let mut attrs = Attrs::new();
1447 attrs.set(PROC_NAME, "bad".into());
1448 attrs.set(IS_POISONED, true);
1449 attrs.set(FAILED_ACTOR_COUNT, 0usize);
1450 let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1451 assert!(matches!(
1452 err,
1453 AttrsViewError::InvariantViolation { label: "FI-5", .. }
1454 ));
1455 }
1456
1457 #[test]
1459 fn test_proc_view_fi5_failures_but_not_poisoned() {
1460 let mut attrs = Attrs::new();
1461 attrs.set(PROC_NAME, "bad".into());
1462 attrs.set(IS_POISONED, false);
1463 attrs.set(FAILED_ACTOR_COUNT, 2usize);
1464 let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1465 assert!(matches!(
1466 err,
1467 AttrsViewError::InvariantViolation { label: "FI-5", .. }
1468 ));
1469 }
1470
1471 #[test]
1473 fn test_derive_properties_unparseable_json() {
1474 let props = derive_properties("not json");
1475 assert!(matches!(props, NodeProperties::Error { code, .. } if code == "parse_error"));
1476 }
1477
1478 #[test]
1480 fn test_derive_properties_unknown_node_type() {
1481 let attrs = Attrs::new();
1482 let json = serde_json::to_string(&attrs).unwrap();
1483 let props = derive_properties(&json);
1484 assert!(matches!(props, NodeProperties::Error { code, .. } if code == "unknown_node_type"));
1485 }
1486
1487 #[test]
1489 fn test_derive_properties_malformed_root() {
1490 let mut attrs = Attrs::new();
1491 attrs.set(NODE_TYPE, "root".into());
1492 let json = serde_json::to_string(&attrs).unwrap();
1493 let props = derive_properties(&json);
1494 assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_root"));
1495 }
1496
1497 #[test]
1499 fn test_derive_properties_malformed_proc_fi5() {
1500 let mut attrs = Attrs::new();
1501 attrs.set(NODE_TYPE, "proc".into());
1502 attrs.set(PROC_NAME, "bad".into());
1503 attrs.set(IS_POISONED, true);
1504 attrs.set(FAILED_ACTOR_COUNT, 0usize);
1505 let json = serde_json::to_string(&attrs).unwrap();
1506 let props = derive_properties(&json);
1507 assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_proc"));
1508 }
1509
1510 #[test]
1512 fn test_derive_properties_valid_root() {
1513 let view = root_view();
1514 let json = serde_json::to_string(&view.to_attrs()).unwrap();
1515 let props = derive_properties(&json);
1516 assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
1517 }
1518
1519 #[test]
1521 fn test_derive_properties_valid_host() {
1522 let view = host_view();
1523 let json = serde_json::to_string(&view.to_attrs()).unwrap();
1524 let props = derive_properties(&json);
1525 assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
1526 }
1527
1528 #[test]
1530 fn test_derive_properties_valid_proc() {
1531 let view = proc_view();
1532 let json = serde_json::to_string(&view.to_attrs()).unwrap();
1533 let props = derive_properties(&json);
1534 assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
1535 }
1536
1537 #[test]
1539 fn test_derive_properties_valid_error() {
1540 let view = error_view();
1541 let json = serde_json::to_string(&view.to_attrs()).unwrap();
1542 let props = derive_properties(&json);
1543 assert!(matches!(props, NodeProperties::Error { .. }));
1544 if let NodeProperties::Error { code, message } = props {
1545 assert_eq!(code, "not_found");
1546 assert_eq!(message, "child not found");
1547 }
1548 }
1549
#[test]
// Builds actor attrs by hand (status, actor type, processed-message count),
// serializes them to JSON and checks that `derive_properties` decodes them as
// an Actor node with `messages_processed == 7`.
fn test_derive_properties_valid_actor() {
    use hyperactor::introspect::ACTOR_TYPE;
    use hyperactor::introspect::MESSAGES_PROCESSED;
    use hyperactor::introspect::STATUS;

    let mut actor_attrs = Attrs::new();
    actor_attrs.set(STATUS, "running".into());
    actor_attrs.set(ACTOR_TYPE, "TestActor".into());
    actor_attrs.set(MESSAGES_PROCESSED, 7u64);

    let encoded = serde_json::to_string(&actor_attrs).unwrap();
    let decoded = derive_properties(&encoded);
    assert!(matches!(
        decoded,
        NodeProperties::Actor {
            messages_processed: 7,
            ..
        }
    ));
}
1571
1572 fn inject_unknown_key(attrs: &Attrs) -> String {
1576 let mut map: serde_json::Map<String, serde_json::Value> =
1577 serde_json::from_str(&serde_json::to_string(attrs).unwrap()).unwrap();
1578 map.insert(
1579 "unknown_future_key".into(),
1580 serde_json::Value::String("surprise".into()),
1581 );
1582 serde_json::to_string(&map).unwrap()
1583 }
1584
#[test]
// Root attrs with an extra key added by `inject_unknown_key` must still
// decode as a Root node reporting 3 hosts.
fn test_ia6_root_ignores_unknown_keys() {
    let root_attrs = root_view().to_attrs();
    let decoded = derive_properties(&inject_unknown_key(&root_attrs));
    assert!(matches!(decoded, NodeProperties::Root { num_hosts: 3, .. }));
}
1591
#[test]
// Host attrs with an extra key added by `inject_unknown_key` must still
// decode as a Host node reporting 2 procs.
fn test_ia6_host_ignores_unknown_keys() {
    let host_attrs = host_view().to_attrs();
    let decoded = derive_properties(&inject_unknown_key(&host_attrs));
    assert!(matches!(decoded, NodeProperties::Host { num_procs: 2, .. }));
}
1598
#[test]
// Proc attrs with an extra key added by `inject_unknown_key` must still
// decode as a Proc node reporting 5 actors.
fn test_ia6_proc_ignores_unknown_keys() {
    let proc_attrs = proc_view().to_attrs();
    let decoded = derive_properties(&inject_unknown_key(&proc_attrs));
    assert!(matches!(decoded, NodeProperties::Proc { num_actors: 5, .. }));
}
1605
#[test]
// Error attrs with an extra key added by `inject_unknown_key` must still
// decode as an Error node.
fn test_ia6_error_ignores_unknown_keys() {
    let error_attrs = error_view().to_attrs();
    let decoded = derive_properties(&inject_unknown_key(&error_attrs));
    assert!(matches!(decoded, NodeProperties::Error { .. }));
}
1612
#[test]
// Hand-built actor attrs (status and actor type only) with an extra key added
// by `inject_unknown_key` must still decode as an Actor node.
fn test_ia6_actor_ignores_unknown_keys() {
    use hyperactor::introspect::ACTOR_TYPE;
    use hyperactor::introspect::STATUS;

    let mut actor_attrs = Attrs::new();
    actor_attrs.set(STATUS, "running".into());
    actor_attrs.set(ACTOR_TYPE, "TestActor".into());

    let decoded = derive_properties(&inject_unknown_key(&actor_attrs));
    assert!(matches!(decoded, NodeProperties::Actor { .. }));
}
1625
1626 fn strip_comment(mut value: serde_json::Value) -> serde_json::Value {
1638 if let Some(obj) = value.as_object_mut() {
1639 obj.remove("$comment");
1640 }
1641 value
1642 }
1643
#[test]
// Compares the schema generated for `dto::NodePayloadDto` with the checked-in
// snapshot file, after removing the snapshot's top-level "$comment" entry.
fn test_node_payload_schema_snapshot() {
    let generated = serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();
    let snapshot: serde_json::Value =
        serde_json::from_str(include_str!("testdata/node_payload_schema.json"))
            .expect("snapshot must be valid JSON");
    let snapshot = strip_comment(snapshot);
    assert_eq!(
        generated, snapshot,
        "schema changed — review and update snapshot if intentional"
    );
}
1657
#[test]
// Builds one sample `NodePayload` per properties variant (root, host, proc,
// actor, error), converts each to `dto::NodePayloadDto`, and checks that its
// JSON form validates against the schema generated for that DTO.
fn test_payloads_validate_against_schema() {
    use hyperactor::channel::ChannelAddr;

    let schema_json = serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();
    let validator = jsonschema::JSONSchema::compile(&schema_json).expect("schema must compile");

    // Shared fixtures: one timestamp and one proc/actor address pair reused
    // by every sample below.
    let ts = std::time::UNIX_EPOCH;
    let worker_proc = ResourceId::proc_addr_from_name(ChannelAddr::Local(0), "worker");
    let worker_actor = worker_proc.actor_addr("actor");

    let root_sample = NodePayload {
        identity: NodeRef::Root,
        properties: NodeProperties::Root {
            num_hosts: 2,
            started_at: ts,
            started_by: "testuser".into(),
            system_children: vec![],
        },
        children: vec![NodeRef::Host(worker_actor.clone())],
        parent: None,
        as_of: ts,
    };

    let host_sample = NodePayload {
        identity: NodeRef::Host(worker_actor.clone()),
        properties: NodeProperties::Host {
            addr: "10.0.0.1:8080".into(),
            num_procs: 2,
            system_children: vec![test_actor_ref("proc", "sys")],
            memory: Default::default(),
        },
        children: vec![NodeRef::Proc(worker_proc.clone())],
        parent: Some(NodeRef::Root),
        as_of: ts,
    };

    let proc_sample = NodePayload {
        identity: NodeRef::Proc(worker_proc.clone()),
        properties: NodeProperties::Proc {
            proc_name: "worker".into(),
            num_actors: 5,
            system_children: vec![],
            stopped_children: vec![],
            stopped_retention_cap: 10,
            is_poisoned: false,
            failed_actor_count: 0,
            debug: Default::default(),
        },
        children: vec![NodeRef::Actor(worker_actor.clone())],
        parent: Some(NodeRef::Host(worker_actor.clone())),
        as_of: ts,
    };

    let actor_sample = NodePayload {
        identity: NodeRef::Actor(worker_actor.clone()),
        properties: NodeProperties::Actor {
            actor_status: "running".into(),
            actor_type: "MyActor".into(),
            messages_processed: 42,
            created_at: Some(ts),
            last_message_handler: Some("handle_ping".into()),
            total_processing_time_us: 1000,
            flight_recorder: None,
            is_system: false,
            failure_info: None,
        },
        children: vec![],
        parent: Some(NodeRef::Proc(worker_proc.clone())),
        as_of: ts,
    };

    let error_sample = NodePayload {
        identity: NodeRef::Actor(worker_actor.clone()),
        properties: NodeProperties::Error {
            code: "not_found".into(),
            message: "child not found".into(),
        },
        children: vec![],
        parent: None,
        as_of: ts,
    };

    let samples = [
        root_sample,
        host_sample,
        proc_sample,
        actor_sample,
        error_sample,
    ];

    // The index in the failure message identifies the offending sample by
    // its position in `samples`.
    for (i, payload) in samples.iter().enumerate() {
        let wire: dto::NodePayloadDto = payload.clone().into();
        let encoded = serde_json::to_value(&wire).unwrap();
        assert!(
            validator.is_valid(&encoded),
            "sample {i} failed schema validation"
        );
    }
}
1750
#[test]
// Takes the schema generated for `dto::NodePayloadDto`, adds a top-level
// "$id" to a copy, removes it again, and checks that the result equals the
// raw schema.
// NOTE(review): the "$id" insertion presumably mirrors what the serving code
// does — confirm against the endpoint implementation.
fn test_served_schema_is_raw_plus_id() {
    let raw: serde_json::Value =
        serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();

    // Build the "served" form: the raw schema plus a "$id" entry.
    let schema_id = serde_json::Value::from("https://monarch.meta.com/schemas/v1/node_payload");
    let mut served = raw.clone();
    served
        .as_object_mut()
        .unwrap()
        .insert(String::from("$id"), schema_id);

    // Remove "$id" again; nothing else may differ from the raw schema.
    let mut stripped = served;
    stripped.as_object_mut().unwrap().remove("$id");
    assert_eq!(raw, stripped, "served schema differs from raw beyond $id");
}
1771
#[test]
// Compares the schema generated for `ApiErrorEnvelope` with the checked-in
// snapshot file, after removing the snapshot's top-level "$comment" entry.
fn test_error_schema_snapshot() {
    use crate::mesh_admin::ApiErrorEnvelope;

    let generated = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope)).unwrap();
    let snapshot: serde_json::Value =
        serde_json::from_str(include_str!("testdata/error_schema.json"))
            .expect("error snapshot must be valid JSON");
    let snapshot = strip_comment(snapshot);
    assert_eq!(
        generated, snapshot,
        "error schema changed — review and update snapshot if intentional"
    );
}
1788
#[test]
// Compares the schema generated for `AdminInfo` with the checked-in snapshot
// file, after removing the snapshot's top-level "$comment" entry.
fn test_admin_info_schema_snapshot() {
    use crate::mesh_admin::AdminInfo;

    let generated = serde_json::to_value(schemars::schema_for!(AdminInfo)).unwrap();
    let snapshot: serde_json::Value =
        serde_json::from_str(include_str!("testdata/admin_info_schema.json"))
            .expect("admin info snapshot must be valid JSON");
    let snapshot = strip_comment(snapshot);
    assert_eq!(
        generated, snapshot,
        "AdminInfo schema changed — review and update snapshot if intentional"
    );
}
1805
#[test]
// Compares the value returned by `build_openapi_spec()` with the checked-in
// snapshot file, after removing the snapshot's top-level "$comment" entry.
fn test_openapi_spec_snapshot() {
    let generated = crate::mesh_admin::build_openapi_spec();
    let snapshot: serde_json::Value = serde_json::from_str(include_str!("testdata/openapi.json"))
        .expect("OpenAPI snapshot must be valid JSON");
    let snapshot = strip_comment(snapshot);
    assert_eq!(
        generated, snapshot,
        "OpenAPI spec changed — review and update snapshot if intentional"
    );
}
1819}