1pub mod dto;
217
218use hyperactor_config::Attrs;
219use hyperactor_config::INTROSPECT;
220use hyperactor_config::IntrospectAttr;
221use hyperactor_config::declare_attrs;
222
// Introspection attribute keys published by mesh-level nodes (root, host,
// proc). Every key carries an `INTROSPECT` meta-attr whose `name` is the
// short wire name and whose `desc` is a human-readable description;
// `test_mesh_introspect_keys_are_tagged` checks that all of them are tagged.
// Keys written with `= <value>` declare a default value.
declare_attrs! {
    // Selects which typed view `derive_properties` decodes: "root", "host"
    // and "proc" are dispatched explicitly; anything else falls through to
    // error/actor detection.
    // NOTE(review): the desc lists "error", but `ErrorAttrsView::to_attrs`
    // does not set this key — error nodes are recognized via `ERROR_CODE`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "node_type".into(),
        desc: "Topology role: root, host, proc, error".into(),
    })
    pub attr NODE_TYPE: String;

    // Host-only: network address of the host (required by `HostAttrsView`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "addr".into(),
        desc: "Host network address".into(),
    })
    pub attr ADDR: String;

    // Host-only: proc count.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_procs".into(),
        desc: "Number of procs on a host".into(),
    })
    pub attr NUM_PROCS: usize = 0;

    // Proc-only: display name (required by `ProcAttrsView`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "proc_name".into(),
        desc: "Human-readable proc name".into(),
    })
    pub attr PROC_NAME: String;

    // Proc-only: actor count.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_actors".into(),
        desc: "Number of actors in a proc".into(),
    })
    pub attr NUM_ACTORS: usize = 0;

    // Shared by root, host and proc views.
    @meta(INTROSPECT = IntrospectAttr {
        name: "system_children".into(),
        desc: "References of system/infrastructure children".into(),
    })
    pub attr SYSTEM_CHILDREN: Vec<NodeRef>;

    // Proc-only.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_children".into(),
        desc: "References of stopped children".into(),
    })
    pub attr STOPPED_CHILDREN: Vec<NodeRef>;

    // Proc-only.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_retention_cap".into(),
        desc: "Maximum number of stopped children retained".into(),
    })
    pub attr STOPPED_RETENTION_CAP: usize = 0;

    // Proc-only. `ProcAttrsView::from_attrs` enforces invariant FI-5:
    // this must be true exactly when FAILED_ACTOR_COUNT > 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_poisoned".into(),
        desc: "Whether this proc is poisoned (refusing new spawns)".into(),
    })
    pub attr IS_POISONED: bool = false;

    // Proc-only; see FI-5 note on IS_POISONED.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failed_actor_count".into(),
        desc: "Number of failed actors in this proc".into(),
    })
    pub attr FAILED_ACTOR_COUNT: usize = 0;

    // Root-only (required by `RootAttrsView`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_at".into(),
        desc: "Timestamp when the mesh was started".into(),
    })
    pub attr STARTED_AT: std::time::SystemTime;

    // Root-only (required by `RootAttrsView`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_by".into(),
        desc: "Username who started the mesh".into(),
    })
    pub attr STARTED_BY: String;

    // Root-only.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_hosts".into(),
        desc: "Number of hosts in the mesh".into(),
    })
    pub attr NUM_HOSTS: usize = 0;

}
318
319use hyperactor::introspect::AttrsViewError;
320
/// Typed view over the attrs published by the root node
/// (`NODE_TYPE == "root"`).
///
/// Decode with [`RootAttrsView::from_attrs`]; encode with
/// [`RootAttrsView::to_attrs`].
#[derive(Debug, Clone, PartialEq)]
pub struct RootAttrsView {
    /// Number of hosts in the mesh; 0 when the key is absent.
    pub num_hosts: usize,
    /// When the mesh was started (required).
    pub started_at: SystemTime,
    /// Username that started the mesh (required).
    pub started_by: String,
    /// System/infrastructure children; empty when the key is absent.
    pub system_children: Vec<NodeRef>,
}
329
330impl RootAttrsView {
331 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
335 let num_hosts = *attrs.get(NUM_HOSTS).unwrap_or(&0);
336 let started_at = *attrs
337 .get(STARTED_AT)
338 .ok_or_else(|| AttrsViewError::missing("started_at"))?;
339 let started_by = attrs
340 .get(STARTED_BY)
341 .ok_or_else(|| AttrsViewError::missing("started_by"))?
342 .clone();
343 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
344 Ok(Self {
345 num_hosts,
346 started_at,
347 started_by,
348 system_children,
349 })
350 }
351
352 pub fn to_attrs(&self) -> Attrs {
354 let mut attrs = Attrs::new();
355 attrs.set(NODE_TYPE, "root".to_string());
356 attrs.set(NUM_HOSTS, self.num_hosts);
357 attrs.set(STARTED_AT, self.started_at);
358 attrs.set(STARTED_BY, self.started_by.clone());
359 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
360 attrs
361 }
362}
363
/// Typed view over the attrs published by a host node
/// (`NODE_TYPE == "host"`).
#[derive(Debug, Clone, PartialEq)]
pub struct HostAttrsView {
    /// Host network address (required).
    pub addr: String,
    /// Number of procs on the host; 0 when the key is absent.
    pub num_procs: usize,
    /// System/infrastructure children; empty when the key is absent.
    pub system_children: Vec<NodeRef>,
}
371
372impl HostAttrsView {
373 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
377 let addr = attrs
378 .get(ADDR)
379 .ok_or_else(|| AttrsViewError::missing("addr"))?
380 .clone();
381 let num_procs = *attrs.get(NUM_PROCS).unwrap_or(&0);
382 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
383 Ok(Self {
384 addr,
385 num_procs,
386 system_children,
387 })
388 }
389
390 pub fn to_attrs(&self) -> Attrs {
392 let mut attrs = Attrs::new();
393 attrs.set(NODE_TYPE, "host".to_string());
394 attrs.set(ADDR, self.addr.clone());
395 attrs.set(NUM_PROCS, self.num_procs);
396 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
397 attrs
398 }
399}
400
/// Typed view over the attrs published by a proc node
/// (`NODE_TYPE == "proc"`).
///
/// [`ProcAttrsView::from_attrs`] enforces invariant FI-5:
/// `is_poisoned == (failed_actor_count > 0)`.
#[derive(Debug, Clone, PartialEq)]
pub struct ProcAttrsView {
    /// Human-readable proc name (required).
    pub proc_name: String,
    /// Number of actors in the proc; 0 when absent.
    pub num_actors: usize,
    /// System/infrastructure children; empty when absent.
    pub system_children: Vec<NodeRef>,
    /// Stopped children retained for inspection; empty when absent.
    pub stopped_children: Vec<NodeRef>,
    /// Maximum number of stopped children retained; 0 when absent.
    pub stopped_retention_cap: usize,
    /// Whether the proc is poisoned (refusing new spawns); false when absent.
    pub is_poisoned: bool,
    /// Number of failed actors in this proc; 0 when absent.
    pub failed_actor_count: usize,
}
412
413impl ProcAttrsView {
414 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
418 let proc_name = attrs
419 .get(PROC_NAME)
420 .ok_or_else(|| AttrsViewError::missing("proc_name"))?
421 .clone();
422 let num_actors = *attrs.get(NUM_ACTORS).unwrap_or(&0);
423 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
424 let stopped_children = attrs.get(STOPPED_CHILDREN).cloned().unwrap_or_default();
425 let stopped_retention_cap = *attrs.get(STOPPED_RETENTION_CAP).unwrap_or(&0);
426 let is_poisoned = *attrs.get(IS_POISONED).unwrap_or(&false);
427 let failed_actor_count = *attrs.get(FAILED_ACTOR_COUNT).unwrap_or(&0);
428
429 if is_poisoned != (failed_actor_count > 0) {
431 return Err(AttrsViewError::invariant(
432 "FI-5",
433 format!("is_poisoned={is_poisoned} but failed_actor_count={failed_actor_count}"),
434 ));
435 }
436
437 Ok(Self {
438 proc_name,
439 num_actors,
440 system_children,
441 stopped_children,
442 stopped_retention_cap,
443 is_poisoned,
444 failed_actor_count,
445 })
446 }
447
448 pub fn to_attrs(&self) -> Attrs {
450 let mut attrs = Attrs::new();
451 attrs.set(NODE_TYPE, "proc".to_string());
452 attrs.set(PROC_NAME, self.proc_name.clone());
453 attrs.set(NUM_ACTORS, self.num_actors);
454 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
455 attrs.set(STOPPED_CHILDREN, self.stopped_children.clone());
456 attrs.set(STOPPED_RETENTION_CAP, self.stopped_retention_cap);
457 attrs.set(IS_POISONED, self.is_poisoned);
458 attrs.set(FAILED_ACTOR_COUNT, self.failed_actor_count);
459 attrs
460 }
461}
462
/// Typed view over an error node, keyed by the core `ERROR_CODE` /
/// `ERROR_MESSAGE` attrs from `hyperactor::introspect`.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorAttrsView {
    /// Machine-readable error code (required).
    pub code: String,
    /// Human-readable message; empty when absent.
    pub message: String,
}
469
470impl ErrorAttrsView {
471 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
474 use hyperactor::introspect::ERROR_CODE;
475 use hyperactor::introspect::ERROR_MESSAGE;
476
477 let code = attrs
478 .get(ERROR_CODE)
479 .ok_or_else(|| AttrsViewError::missing("error_code"))?
480 .clone();
481 let message = attrs.get(ERROR_MESSAGE).cloned().unwrap_or_default();
482 Ok(Self { code, message })
483 }
484
485 pub fn to_attrs(&self) -> Attrs {
487 use hyperactor::introspect::ERROR_CODE;
488 use hyperactor::introspect::ERROR_MESSAGE;
489
490 let mut attrs = Attrs::new();
491 attrs.set(ERROR_CODE, self.code.clone());
492 attrs.set(ERROR_MESSAGE, self.message.clone());
493 attrs
494 }
495}
496
497use std::fmt;
500use std::str::FromStr;
501use std::time::SystemTime;
502
503use serde::Deserialize;
504use serde::Serialize;
505use typeuri::Named;
506
/// Reference to a node in the mesh introspection tree.
///
/// `Display` and `FromStr` are implemented below: `Root` renders as `root`,
/// `Host` as `host:<actor id>`, and `Proc`/`Actor` as the underlying id.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum NodeRef {
    /// The single root of the tree; serde name is `root`.
    #[serde(rename = "root")]
    Root,
    /// A host, identified by an `ActorId`. The `host:` prefix in `Display`
    /// keeps it distinct from the `Actor` variant, which holds the same type.
    Host(hyperactor::reference::ActorId),
    /// A proc.
    Proc(hyperactor::reference::ProcId),
    /// An actor.
    Actor(hyperactor::reference::ActorId),
}

// Lets `NodeRef` (and `Vec<NodeRef>`, e.g. SYSTEM_CHILDREN) be stored as an
// attr value.
hyperactor_config::impl_attrvalue!(NodeRef);
526
527impl fmt::Display for NodeRef {
528 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529 match self {
530 Self::Root => write!(f, "root"),
531 Self::Host(id) => write!(f, "host:{}", id),
532 Self::Proc(id) => fmt::Display::fmt(id, f),
533 Self::Actor(id) => fmt::Display::fmt(id, f),
534 }
535 }
536}
537
/// Error returned by `NodeRef::from_str`.
#[derive(Debug, thiserror::Error)]
pub enum NodeRefParseError {
    /// The input was the empty string.
    #[error("empty reference string")]
    Empty,
    /// The input had a `host:` prefix but the remainder was not a valid
    /// actor id.
    #[error("invalid host reference: {0}")]
    InvalidHost(hyperactor::reference::ReferenceParsingError),
    /// The input parsed as a port reference, which has no node.
    #[error("port references are not valid node references")]
    PortNotAllowed,
    /// The input failed to parse as a generic reference.
    #[error(transparent)]
    Reference(#[from] hyperactor::reference::ReferenceParsingError),
}
550
551impl FromStr for NodeRef {
552 type Err = NodeRefParseError;
553
554 fn from_str(s: &str) -> Result<Self, Self::Err> {
555 if s.is_empty() {
556 return Err(NodeRefParseError::Empty);
557 }
558 if s == "root" {
559 return Ok(Self::Root);
560 }
561 if let Some(rest) = s.strip_prefix("host:") {
562 let actor_id: hyperactor::reference::ActorId =
563 rest.parse().map_err(NodeRefParseError::InvalidHost)?;
564 return Ok(Self::Host(actor_id));
565 }
566 let r: hyperactor::reference::Reference = s.parse()?;
567 match r {
568 hyperactor::reference::Reference::Proc(id) => Ok(Self::Proc(id)),
569 hyperactor::reference::Reference::Actor(id) => Ok(Self::Actor(id)),
570 hyperactor::reference::Reference::Port(_) => Err(NodeRefParseError::PortNotAllowed),
571 }
572 }
573}
574
575impl From<hyperactor::introspect::IntrospectRef> for NodeRef {
576 fn from(r: hyperactor::introspect::IntrospectRef) -> Self {
577 match r {
578 hyperactor::introspect::IntrospectRef::Proc(id) => Self::Proc(id),
579 hyperactor::introspect::IntrospectRef::Actor(id) => Self::Actor(id),
580 }
581 }
582}
583
/// One node of the introspection tree as returned to clients.
///
/// Built from a raw `IntrospectResult` by `to_node_payload` /
/// `to_node_payload_with`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct NodePayload {
    /// Reference to this node.
    pub identity: NodeRef,
    /// Typed properties decoded from the node's attrs (see
    /// `derive_properties`); decoding failures appear as
    /// `NodeProperties::Error`.
    pub properties: NodeProperties,
    /// References to this node's children.
    pub children: Vec<NodeRef>,
    /// Reference to this node's parent, if any.
    pub parent: Option<NodeRef>,
    /// Timestamp carried over from the introspection result.
    pub as_of: SystemTime,
}
wirevalue::register_type!(NodePayload);
608
/// Typed properties of a node, one variant per node kind.
///
/// The `Root`/`Host`/`Proc`/`Error` variants mirror the corresponding
/// `*AttrsView` structs field-for-field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum NodeProperties {
    /// Properties of the root node.
    Root {
        num_hosts: usize,
        started_at: SystemTime,
        started_by: String,
        system_children: Vec<NodeRef>,
    },
    /// Properties of a host node.
    Host {
        addr: String,
        num_procs: usize,
        system_children: Vec<NodeRef>,
    },
    /// Properties of a proc node.
    Proc {
        proc_name: String,
        num_actors: usize,
        system_children: Vec<NodeRef>,
        stopped_children: Vec<NodeRef>,
        stopped_retention_cap: usize,
        is_poisoned: bool,
        failed_actor_count: usize,
    },
    /// Properties of an actor node.
    Actor {
        // Status, rendered as "<status>: <reason>" when a reason is present.
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        created_at: Option<SystemTime>,
        last_message_handler: Option<String>,
        // presumably microseconds, per the `_us` suffix — confirm upstream.
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        failure_info: Option<FailureInfo>,
    },
    /// An error node, or a node whose attrs could not be decoded
    /// (`code` is then e.g. `parse_error` or `malformed_proc`).
    Error { code: String, message: String },
}
wirevalue::register_type!(NodeProperties);
655
/// Failure details attached to an actor node, copied field-for-field from
/// the core `ActorAttrsView` failure record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct FailureInfo {
    /// Error message describing the failure.
    pub error_message: String,
    /// Actor identified as the root cause.
    pub root_cause_actor: hyperactor::reference::ActorId,
    /// Optional display name of the root-cause actor.
    pub root_cause_name: Option<String>,
    /// When the failure occurred.
    pub occurred_at: SystemTime,
    /// Whether this failure was propagated from another actor rather than
    /// originating here (inferred from the name — confirm upstream).
    pub is_propagated: bool,
}
wirevalue::register_type!(FailureInfo);
674
/// Conversion from a decoded attrs view into the wire-level
/// [`NodeProperties`] enum. Used by `derive_properties` after a view's
/// `from_attrs` succeeds.
trait IntoNodeProperties {
    /// Consumes the view and produces the matching `NodeProperties` variant.
    fn into_node_properties(self) -> NodeProperties;
}
682
683impl IntoNodeProperties for RootAttrsView {
684 fn into_node_properties(self) -> NodeProperties {
685 NodeProperties::Root {
686 num_hosts: self.num_hosts,
687 started_at: self.started_at,
688 started_by: self.started_by,
689 system_children: self.system_children,
690 }
691 }
692}
693
694impl IntoNodeProperties for HostAttrsView {
695 fn into_node_properties(self) -> NodeProperties {
696 NodeProperties::Host {
697 addr: self.addr,
698 num_procs: self.num_procs,
699 system_children: self.system_children,
700 }
701 }
702}
703
704impl IntoNodeProperties for ProcAttrsView {
705 fn into_node_properties(self) -> NodeProperties {
706 NodeProperties::Proc {
707 proc_name: self.proc_name,
708 num_actors: self.num_actors,
709 system_children: self.system_children,
710 stopped_children: self.stopped_children,
711 stopped_retention_cap: self.stopped_retention_cap,
712 is_poisoned: self.is_poisoned,
713 failed_actor_count: self.failed_actor_count,
714 }
715 }
716}
717
718impl IntoNodeProperties for ErrorAttrsView {
719 fn into_node_properties(self) -> NodeProperties {
720 NodeProperties::Error {
721 code: self.code,
722 message: self.message,
723 }
724 }
725}
726
727impl IntoNodeProperties for hyperactor::introspect::ActorAttrsView {
728 fn into_node_properties(self) -> NodeProperties {
729 let actor_status = match &self.status_reason {
730 Some(reason) => format!("{}: {}", self.status, reason),
731 None => self.status.clone(),
732 };
733
734 let failure_info = self.failure.map(|fi| FailureInfo {
735 error_message: fi.error_message,
736 root_cause_actor: fi.root_cause_actor,
737 root_cause_name: fi.root_cause_name,
738 occurred_at: fi.occurred_at,
739 is_propagated: fi.is_propagated,
740 });
741
742 NodeProperties::Actor {
743 actor_status,
744 actor_type: self.actor_type,
745 messages_processed: self.messages_processed,
746 created_at: self.created_at,
747 last_message_handler: self.last_handler,
748 total_processing_time_us: self.total_processing_time_us,
749 flight_recorder: self.flight_recorder,
750 is_system: self.is_system,
751 failure_info,
752 }
753 }
754}
755
756pub fn derive_properties(attrs_json: &str) -> NodeProperties {
769 let attrs: Attrs = match serde_json::from_str(attrs_json) {
770 Ok(a) => a,
771 Err(_) => {
772 return NodeProperties::Error {
773 code: "parse_error".into(),
774 message: "failed to parse attrs JSON".into(),
775 };
776 }
777 };
778
779 let node_type = attrs.get(NODE_TYPE).cloned().unwrap_or_default();
780
781 match node_type.as_str() {
782 "root" => match RootAttrsView::from_attrs(&attrs) {
783 Ok(v) => v.into_node_properties(),
784 Err(e) => NodeProperties::Error {
785 code: "malformed_root".into(),
786 message: e.to_string(),
787 },
788 },
789 "host" => match HostAttrsView::from_attrs(&attrs) {
790 Ok(v) => v.into_node_properties(),
791 Err(e) => NodeProperties::Error {
792 code: "malformed_host".into(),
793 message: e.to_string(),
794 },
795 },
796 "proc" => match ProcAttrsView::from_attrs(&attrs) {
797 Ok(v) => v.into_node_properties(),
798 Err(e) => NodeProperties::Error {
799 code: "malformed_proc".into(),
800 message: e.to_string(),
801 },
802 },
803 _ => {
804 use hyperactor::introspect::ERROR_CODE;
807 use hyperactor::introspect::STATUS;
808
809 if attrs.get(ERROR_CODE).is_some() {
810 return match ErrorAttrsView::from_attrs(&attrs) {
811 Ok(v) => v.into_node_properties(),
812 Err(e) => NodeProperties::Error {
813 code: "malformed_error".into(),
814 message: e.to_string(),
815 },
816 };
817 }
818
819 if attrs.get(STATUS).is_none() {
820 return NodeProperties::Error {
821 code: "unknown_node_type".into(),
822 message: format!("unrecognized node_type: {:?}", node_type),
823 };
824 }
825
826 match hyperactor::introspect::ActorAttrsView::from_attrs(&attrs) {
827 Ok(v) => v.into_node_properties(),
828 Err(e) => NodeProperties::Error {
829 code: "malformed_actor".into(),
830 message: e.to_string(),
831 },
832 }
833 }
834 }
835}
836
837pub fn to_node_payload(result: hyperactor::introspect::IntrospectResult) -> NodePayload {
840 NodePayload {
841 identity: result.identity.into(),
842 properties: derive_properties(&result.attrs),
843 children: result.children.into_iter().map(NodeRef::from).collect(),
844 parent: result.parent.map(NodeRef::from),
845 as_of: result.as_of,
846 }
847}
848
849pub fn to_node_payload_with(
852 result: hyperactor::introspect::IntrospectResult,
853 identity: NodeRef,
854 parent: Option<NodeRef>,
855) -> NodePayload {
856 NodePayload {
857 identity,
858 properties: derive_properties(&result.attrs),
859 children: result.children.into_iter().map(NodeRef::from).collect(),
860 parent,
861 as_of: result.as_of,
862 }
863}
864
#[cfg(test)]
mod tests {
    use super::*;

    // Every key declared in this module must carry an INTROSPECT meta-attr
    // with the expected short name and a non-empty description.
    #[test]
    fn test_mesh_introspect_keys_are_tagged() {
        let cases = vec![
            ("node_type", NODE_TYPE.attrs()),
            ("addr", ADDR.attrs()),
            ("num_procs", NUM_PROCS.attrs()),
            ("proc_name", PROC_NAME.attrs()),
            ("num_actors", NUM_ACTORS.attrs()),
            ("system_children", SYSTEM_CHILDREN.attrs()),
            ("stopped_children", STOPPED_CHILDREN.attrs()),
            ("stopped_retention_cap", STOPPED_RETENTION_CAP.attrs()),
            ("is_poisoned", IS_POISONED.attrs()),
            ("failed_actor_count", FAILED_ACTOR_COUNT.attrs()),
            ("started_at", STARTED_AT.attrs()),
            ("started_by", STARTED_BY.attrs()),
            ("num_hosts", NUM_HOSTS.attrs()),
        ];

        for (expected_name, meta) in &cases {
            // Each key's metadata must include the INTROSPECT tag.
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        // Completeness check: count INTROSPECT-tagged keys registered under
        // this module's path, so a newly added key without a case above
        // makes this test fail.
        use hyperactor_config::attrs::AttrKeyInfo;
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor_mesh::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    // Builds an actor NodeRef on a local-channel proc for fixtures.
    fn test_actor_ref(proc_name: &str, actor_name: &str, pid: usize) -> NodeRef {
        use hyperactor::channel::ChannelAddr;
        use hyperactor::reference::ProcId;
        NodeRef::Actor(
            ProcId::with_name(ChannelAddr::Local(0), proc_name).actor_id(actor_name, pid),
        )
    }

    fn root_view() -> RootAttrsView {
        RootAttrsView {
            num_hosts: 3,
            started_at: std::time::UNIX_EPOCH,
            started_by: "testuser".into(),
            system_children: vec![test_actor_ref("proc", "child1", 0)],
        }
    }

    fn host_view() -> HostAttrsView {
        HostAttrsView {
            addr: "10.0.0.1:8080".into(),
            num_procs: 2,
            system_children: vec![test_actor_ref("proc", "sys", 0)],
        }
    }

    // FI-5 consistent: not poisoned and zero failed actors.
    fn proc_view() -> ProcAttrsView {
        ProcAttrsView {
            proc_name: "worker".into(),
            num_actors: 5,
            system_children: vec![],
            stopped_children: vec![test_actor_ref("proc", "old", 0)],
            stopped_retention_cap: 10,
            is_poisoned: false,
            failed_actor_count: 0,
        }
    }

    fn error_view() -> ErrorAttrsView {
        ErrorAttrsView {
            code: "not_found".into(),
            message: "child not found".into(),
        }
    }

    // --- to_attrs / from_attrs round trips ---

    #[test]
    fn test_root_view_round_trip() {
        let view = root_view();
        let rt = RootAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    #[test]
    fn test_host_view_round_trip() {
        let view = host_view();
        let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    #[test]
    fn test_proc_view_round_trip() {
        let view = proc_view();
        let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    #[test]
    fn test_error_view_round_trip() {
        let view = error_view();
        let rt = ErrorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    // --- missing required keys ---

    #[test]
    fn test_root_view_missing_started_at() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_BY, "user".into());
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_at" });
    }

    #[test]
    fn test_root_view_missing_started_by() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_AT, std::time::UNIX_EPOCH);
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_by" });
    }

    #[test]
    fn test_host_view_missing_addr() {
        let attrs = Attrs::new();
        let err = HostAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "addr" });
    }

    #[test]
    fn test_proc_view_missing_proc_name() {
        let attrs = Attrs::new();
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "proc_name" });
    }

    // --- FI-5: is_poisoned must equal (failed_actor_count > 0) ---

    #[test]
    fn test_proc_view_fi5_poisoned_but_no_failures() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    #[test]
    fn test_proc_view_fi5_failures_but_not_poisoned() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, false);
        attrs.set(FAILED_ACTOR_COUNT, 2usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    // --- derive_properties error paths ---

    #[test]
    fn test_derive_properties_unparseable_json() {
        let props = derive_properties("not json");
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "parse_error"));
    }

    // No NODE_TYPE, no ERROR_CODE, no STATUS.
    #[test]
    fn test_derive_properties_unknown_node_type() {
        let attrs = Attrs::new();
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "unknown_node_type"));
    }

    #[test]
    fn test_derive_properties_malformed_root() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_root"));
    }

    // An FI-5 violation surfaces as malformed_proc, not a panic.
    #[test]
    fn test_derive_properties_malformed_proc_fi5() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "proc".into());
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_proc"));
    }

    // --- derive_properties happy paths ---

    #[test]
    fn test_derive_properties_valid_root() {
        let view = root_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    #[test]
    fn test_derive_properties_valid_host() {
        let view = host_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    #[test]
    fn test_derive_properties_valid_proc() {
        let view = proc_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    // Error nodes carry no NODE_TYPE; they are detected via ERROR_CODE.
    #[test]
    fn test_derive_properties_valid_error() {
        let view = error_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { .. }));
        if let NodeProperties::Error { code, message } = props {
            assert_eq!(code, "not_found");
            assert_eq!(message, "child not found");
        }
    }

    // Actor nodes carry no NODE_TYPE; they are detected via STATUS.
    #[test]
    fn test_derive_properties_valid_actor() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::MESSAGES_PROCESSED;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        attrs.set(MESSAGES_PROCESSED, 7u64);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(
            props,
            NodeProperties::Actor {
                messages_processed: 7,
                ..
            }
        ));
    }

    // Re-serializes `attrs` as a JSON object with one extra key that no
    // declared attr uses, to exercise forward compatibility (IA-6).
    fn inject_unknown_key(attrs: &Attrs) -> String {
        let mut map: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(&serde_json::to_string(attrs).unwrap()).unwrap();
        map.insert(
            "unknown_future_key".into(),
            serde_json::Value::String("surprise".into()),
        );
        serde_json::to_string(&map).unwrap()
    }

    // --- IA-6: unknown keys are ignored for every node kind ---

    #[test]
    fn test_ia6_root_ignores_unknown_keys() {
        let json = inject_unknown_key(&root_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    #[test]
    fn test_ia6_host_ignores_unknown_keys() {
        let json = inject_unknown_key(&host_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    #[test]
    fn test_ia6_proc_ignores_unknown_keys() {
        let json = inject_unknown_key(&proc_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    #[test]
    fn test_ia6_error_ignores_unknown_keys() {
        let json = inject_unknown_key(&error_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { .. }));
    }

    #[test]
    fn test_ia6_actor_ignores_unknown_keys() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        let json = inject_unknown_key(&attrs);
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Actor { .. }));
    }

    // Snapshot files may carry a top-level "$comment" for human readers;
    // drop it before comparing against the generated schema.
    fn strip_comment(mut value: serde_json::Value) -> serde_json::Value {
        if let Some(obj) = value.as_object_mut() {
            obj.remove("$comment");
        }
        value
    }

    // --- schema snapshots (update testdata/*.json when a change is intended) ---

    #[test]
    fn test_node_payload_schema_snapshot() {
        let schema = schemars::schema_for!(dto::NodePayloadDto);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/node_payload_schema.json"))
                .expect("snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "schema changed — review and update snapshot if intentional"
        );
    }

    // One sample payload per NodeProperties variant, converted to the DTO
    // and validated against the generated JSON schema.
    #[test]
    fn test_payloads_validate_against_schema() {
        use hyperactor::channel::ChannelAddr;
        use hyperactor::reference::ProcId;

        let schema = schemars::schema_for!(dto::NodePayloadDto);
        let schema_value = serde_json::to_value(&schema).unwrap();
        let compiled = jsonschema::JSONSchema::compile(&schema_value).expect("schema must compile");

        let epoch = std::time::UNIX_EPOCH;
        let proc_id = ProcId::with_name(ChannelAddr::Local(0), "worker");
        let actor_id = proc_id.actor_id("actor", 0);

        let samples = [
            NodePayload {
                identity: NodeRef::Root,
                properties: NodeProperties::Root {
                    num_hosts: 2,
                    started_at: epoch,
                    started_by: "testuser".into(),
                    system_children: vec![],
                },
                children: vec![NodeRef::Host(actor_id.clone())],
                parent: None,
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Host(actor_id.clone()),
                properties: NodeProperties::Host {
                    addr: "10.0.0.1:8080".into(),
                    num_procs: 2,
                    system_children: vec![test_actor_ref("proc", "sys", 0)],
                },
                children: vec![NodeRef::Proc(proc_id.clone())],
                parent: Some(NodeRef::Root),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Proc(proc_id.clone()),
                properties: NodeProperties::Proc {
                    proc_name: "worker".into(),
                    num_actors: 5,
                    system_children: vec![],
                    stopped_children: vec![],
                    stopped_retention_cap: 10,
                    is_poisoned: false,
                    failed_actor_count: 0,
                },
                children: vec![NodeRef::Actor(actor_id.clone())],
                parent: Some(NodeRef::Host(actor_id.clone())),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Actor(actor_id.clone()),
                properties: NodeProperties::Actor {
                    actor_status: "running".into(),
                    actor_type: "MyActor".into(),
                    messages_processed: 42,
                    created_at: Some(epoch),
                    last_message_handler: Some("handle_ping".into()),
                    total_processing_time_us: 1000,
                    flight_recorder: None,
                    is_system: false,
                    failure_info: None,
                },
                children: vec![],
                parent: Some(NodeRef::Proc(proc_id.clone())),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Actor(actor_id.clone()),
                properties: NodeProperties::Error {
                    code: "not_found".into(),
                    message: "child not found".into(),
                },
                children: vec![],
                parent: None,
                as_of: epoch,
            },
        ];

        for (i, payload) in samples.iter().enumerate() {
            let dto = dto::NodePayloadDto::from(payload.clone());
            let value = serde_json::to_value(&dto).unwrap();
            assert!(
                compiled.is_valid(&value),
                "sample {i} failed schema validation"
            );
        }
    }

    // Checks that adding "$id" is the only difference between the raw and
    // served schema.
    // NOTE(review): `served` is built locally here rather than taken from
    // the admin server, so this cannot detect drift in the schema the server
    // actually serves; consider deriving `served` from the real handler.
    #[test]
    fn test_served_schema_is_raw_plus_id() {
        let raw: serde_json::Value =
            serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();

        let mut served = raw.clone();
        // Attach the canonical schema URL.
        served.as_object_mut().unwrap().insert(
            "$id".into(),
            serde_json::Value::String("https://monarch.meta.com/schemas/v1/node_payload".into()),
        );

        let mut stripped = served;
        // Removing $id again must give back the raw schema exactly.
        stripped.as_object_mut().unwrap().remove("$id");
        assert_eq!(raw, stripped, "served schema differs from raw beyond $id");
    }

    #[test]
    fn test_error_schema_snapshot() {
        use crate::mesh_admin::ApiErrorEnvelope;

        let schema = schemars::schema_for!(ApiErrorEnvelope);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/error_schema.json"))
                .expect("error snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "error schema changed — review and update snapshot if intentional"
        );
    }

    #[test]
    fn test_admin_info_schema_snapshot() {
        use crate::mesh_admin::AdminInfo;

        let schema = schemars::schema_for!(AdminInfo);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/admin_info_schema.json"))
                .expect("admin info snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "AdminInfo schema changed — review and update snapshot if intentional"
        );
    }

    #[test]
    fn test_openapi_spec_snapshot() {
        let actual = crate::mesh_admin::build_openapi_spec();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/openapi.json"))
                .expect("OpenAPI snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "OpenAPI spec changed — review and update snapshot if intentional"
        );
    }
}