1use hyperactor_config::Attrs;
82use hyperactor_config::INTROSPECT;
83use hyperactor_config::IntrospectAttr;
84use hyperactor_config::declare_attrs;
85
// Attribute keys published by mesh-level nodes (root, host, proc). Each key
// carries an `INTROSPECT` meta-attribute holding its short name and a
// human-readable description.
declare_attrs! {
    // Node discriminator. The root/host/proc views write "root", "host" and
    // "proc" here in `to_attrs`, and `derive_properties` dispatches on it.
    @meta(INTROSPECT = IntrospectAttr {
        name: "node_type".into(),
        desc: "Topology role: root, host, proc, error".into(),
    })
    pub attr NODE_TYPE: String;

    // Required by `HostAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "addr".into(),
        desc: "Host network address".into(),
    })
    pub attr ADDR: String;

    // Read by `HostAttrsView::from_attrs`, which falls back to 0 when absent.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_procs".into(),
        desc: "Number of procs on a host".into(),
    })
    pub attr NUM_PROCS: usize = 0;

    // Required by `ProcAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "proc_name".into(),
        desc: "Human-readable proc name".into(),
    })
    pub attr PROC_NAME: String;

    // Read by `ProcAttrsView::from_attrs`, which falls back to 0 when absent.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_actors".into(),
        desc: "Number of actors in a proc".into(),
    })
    pub attr NUM_ACTORS: usize = 0;

    // Shared by the root, host and proc views; each treats a missing value
    // as an empty list.
    @meta(INTROSPECT = IntrospectAttr {
        name: "system_children".into(),
        desc: "References of system/infrastructure children".into(),
    })
    pub attr SYSTEM_CHILDREN: Vec<String>;

    // Proc-only; a missing value is read as an empty list.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_children".into(),
        desc: "References of stopped children".into(),
    })
    pub attr STOPPED_CHILDREN: Vec<String>;

    // Proc-only; read with a fallback of 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_retention_cap".into(),
        desc: "Maximum number of stopped children retained".into(),
    })
    pub attr STOPPED_RETENTION_CAP: usize = 0;

    // Proc-only. `ProcAttrsView::from_attrs` rejects attrs where this flag
    // disagrees with `FAILED_ACTOR_COUNT > 0` (invariant label "FI-5").
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_poisoned".into(),
        desc: "Whether this proc is poisoned (refusing new spawns)".into(),
    })
    pub attr IS_POISONED: bool = false;

    // Proc-only; the other half of the "FI-5" check on `IS_POISONED`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failed_actor_count".into(),
        desc: "Number of failed actors in this proc".into(),
    })
    pub attr FAILED_ACTOR_COUNT: usize = 0;

    // Required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_at".into(),
        desc: "Timestamp when the mesh was started".into(),
    })
    pub attr STARTED_AT: std::time::SystemTime;

    // Required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_by".into(),
        desc: "Username who started the mesh".into(),
    })
    pub attr STARTED_BY: String;

    // Read by `RootAttrsView::from_attrs`, which falls back to 0 when absent.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_hosts".into(),
        desc: "Number of hosts in the mesh".into(),
    })
    pub attr NUM_HOSTS: usize = 0;

}
181
182use hyperactor::introspect::AttrsViewError;
183
184#[derive(Debug, Clone, PartialEq)]
186pub struct RootAttrsView {
187 pub num_hosts: usize,
188 pub started_at: std::time::SystemTime,
189 pub started_by: String,
190 pub system_children: Vec<String>,
191}
192
193impl RootAttrsView {
194 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
198 let num_hosts = *attrs.get(NUM_HOSTS).unwrap_or(&0);
199 let started_at = *attrs
200 .get(STARTED_AT)
201 .ok_or_else(|| AttrsViewError::missing("started_at"))?;
202 let started_by = attrs
203 .get(STARTED_BY)
204 .ok_or_else(|| AttrsViewError::missing("started_by"))?
205 .clone();
206 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
207 Ok(Self {
208 num_hosts,
209 started_at,
210 started_by,
211 system_children,
212 })
213 }
214
215 pub fn to_attrs(&self) -> Attrs {
217 let mut attrs = Attrs::new();
218 attrs.set(NODE_TYPE, "root".to_string());
219 attrs.set(NUM_HOSTS, self.num_hosts);
220 attrs.set(STARTED_AT, self.started_at);
221 attrs.set(STARTED_BY, self.started_by.clone());
222 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
223 attrs
224 }
225}
226
227#[derive(Debug, Clone, PartialEq)]
229pub struct HostAttrsView {
230 pub addr: String,
231 pub num_procs: usize,
232 pub system_children: Vec<String>,
233}
234
235impl HostAttrsView {
236 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
240 let addr = attrs
241 .get(ADDR)
242 .ok_or_else(|| AttrsViewError::missing("addr"))?
243 .clone();
244 let num_procs = *attrs.get(NUM_PROCS).unwrap_or(&0);
245 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
246 Ok(Self {
247 addr,
248 num_procs,
249 system_children,
250 })
251 }
252
253 pub fn to_attrs(&self) -> Attrs {
255 let mut attrs = Attrs::new();
256 attrs.set(NODE_TYPE, "host".to_string());
257 attrs.set(ADDR, self.addr.clone());
258 attrs.set(NUM_PROCS, self.num_procs);
259 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
260 attrs
261 }
262}
263
264#[derive(Debug, Clone, PartialEq)]
266pub struct ProcAttrsView {
267 pub proc_name: String,
268 pub num_actors: usize,
269 pub system_children: Vec<String>,
270 pub stopped_children: Vec<String>,
271 pub stopped_retention_cap: usize,
272 pub is_poisoned: bool,
273 pub failed_actor_count: usize,
274}
275
276impl ProcAttrsView {
277 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
281 let proc_name = attrs
282 .get(PROC_NAME)
283 .ok_or_else(|| AttrsViewError::missing("proc_name"))?
284 .clone();
285 let num_actors = *attrs.get(NUM_ACTORS).unwrap_or(&0);
286 let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
287 let stopped_children = attrs.get(STOPPED_CHILDREN).cloned().unwrap_or_default();
288 let stopped_retention_cap = *attrs.get(STOPPED_RETENTION_CAP).unwrap_or(&0);
289 let is_poisoned = *attrs.get(IS_POISONED).unwrap_or(&false);
290 let failed_actor_count = *attrs.get(FAILED_ACTOR_COUNT).unwrap_or(&0);
291
292 if is_poisoned != (failed_actor_count > 0) {
294 return Err(AttrsViewError::invariant(
295 "FI-5",
296 format!("is_poisoned={is_poisoned} but failed_actor_count={failed_actor_count}"),
297 ));
298 }
299
300 Ok(Self {
301 proc_name,
302 num_actors,
303 system_children,
304 stopped_children,
305 stopped_retention_cap,
306 is_poisoned,
307 failed_actor_count,
308 })
309 }
310
311 pub fn to_attrs(&self) -> Attrs {
313 let mut attrs = Attrs::new();
314 attrs.set(NODE_TYPE, "proc".to_string());
315 attrs.set(PROC_NAME, self.proc_name.clone());
316 attrs.set(NUM_ACTORS, self.num_actors);
317 attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
318 attrs.set(STOPPED_CHILDREN, self.stopped_children.clone());
319 attrs.set(STOPPED_RETENTION_CAP, self.stopped_retention_cap);
320 attrs.set(IS_POISONED, self.is_poisoned);
321 attrs.set(FAILED_ACTOR_COUNT, self.failed_actor_count);
322 attrs
323 }
324}
325
326#[derive(Debug, Clone, PartialEq)]
328pub struct ErrorAttrsView {
329 pub code: String,
330 pub message: String,
331}
332
333impl ErrorAttrsView {
334 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
337 use hyperactor::introspect::ERROR_CODE;
338 use hyperactor::introspect::ERROR_MESSAGE;
339
340 let code = attrs
341 .get(ERROR_CODE)
342 .ok_or_else(|| AttrsViewError::missing("error_code"))?
343 .clone();
344 let message = attrs.get(ERROR_MESSAGE).cloned().unwrap_or_default();
345 Ok(Self { code, message })
346 }
347
348 pub fn to_attrs(&self) -> Attrs {
350 use hyperactor::introspect::ERROR_CODE;
351 use hyperactor::introspect::ERROR_MESSAGE;
352
353 let mut attrs = Attrs::new();
354 attrs.set(ERROR_CODE, self.code.clone());
355 attrs.set(ERROR_MESSAGE, self.message.clone());
356 attrs
357 }
358}
359
360use schemars::JsonSchema;
367use serde::Deserialize;
368use serde::Serialize;
369use typeuri::Named;
370
// Serializable description of a single node, built by `to_node_payload`
// and `to_node_payload_with` from an `IntrospectResult`.
//
// NOTE(review): comments on this type are plain `//` on purpose. schemars
// copies `///` doc comments into the generated schema as `description`
// entries, and the schema of this type is compared against a snapshot in
// the tests — confirm against the snapshot before converting these to `///`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, JsonSchema)]
pub struct NodePayload {
    // Identifier of this node.
    pub identity: String,
    // Typed properties derived from the node's attributes
    // (see `derive_properties`).
    pub properties: NodeProperties,
    // Children of this node, copied from the introspection result.
    pub children: Vec<String>,
    // Parent of this node, if any.
    pub parent: Option<String>,
    // Copied verbatim from `IntrospectResult::as_of`; presumably a
    // timestamp string — confirm the format with the producer.
    pub as_of: String,
}
wirevalue::register_type!(NodePayload);
392
// Typed, serializable properties of a node; one variant per kind of node
// that `derive_properties` can produce.
//
// NOTE(review): comments on this type are plain `//` on purpose. schemars
// copies `///` doc comments into the generated schema as `description`
// entries, and this type is part of the `NodePayload` schema that the tests
// compare against a snapshot.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, JsonSchema)]
pub enum NodeProperties {
    // Produced from `RootAttrsView`.
    Root {
        num_hosts: usize,
        // RFC 3339 timestamp with millisecond precision.
        started_at: String,
        started_by: String,
        system_children: Vec<String>,
    },
    // Produced from `HostAttrsView`.
    Host {
        addr: String,
        num_procs: usize,
        system_children: Vec<String>,
    },
    // Produced from `ProcAttrsView`.
    Proc {
        proc_name: String,
        num_actors: usize,
        system_children: Vec<String>,
        stopped_children: Vec<String>,
        stopped_retention_cap: usize,
        is_poisoned: bool,
        failed_actor_count: usize,
    },
    // Produced from `hyperactor::introspect::ActorAttrsView`.
    Actor {
        // The actor's status, followed by ": <reason>" when a status
        // reason is present.
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        // RFC 3339 timestamp with millisecond precision, or an empty
        // string when the view has no creation time.
        created_at: String,
        last_message_handler: Option<String>,
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        failure_info: Option<FailureInfo>,
    },
    // Either an error node (from `ErrorAttrsView`) or a failure reported by
    // `derive_properties` itself, e.g. "parse_error" or "malformed_root".
    Error { code: String, message: String },
}
wirevalue::register_type!(NodeProperties);
436
// Serializable failure details attached to `NodeProperties::Actor`, copied
// field by field from the `failure` value of an actor attrs view.
//
// NOTE(review): comments on this type are plain `//` on purpose. schemars
// copies `///` doc comments into the generated schema as `description`
// entries, and this type is part of the `NodePayload` schema that the tests
// compare against a snapshot.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, JsonSchema)]
pub struct FailureInfo {
    pub error_message: String,
    pub root_cause_actor: String,
    pub root_cause_name: Option<String>,
    // RFC 3339 timestamp with millisecond precision.
    pub occurred_at: String,
    pub is_propagated: bool,
}
wirevalue::register_type!(FailureInfo);
447
/// Lowers a typed attrs view into the serializable [`NodeProperties`].
///
/// Private to this module; `derive_properties` calls it on whichever view it
/// managed to decode.
trait IntoNodeProperties {
    /// Consumes the view and returns the matching `NodeProperties` variant.
    fn into_node_properties(self) -> NodeProperties;
}
455
456impl IntoNodeProperties for RootAttrsView {
457 fn into_node_properties(self) -> NodeProperties {
458 NodeProperties::Root {
459 num_hosts: self.num_hosts,
460 started_at: humantime::format_rfc3339_millis(self.started_at).to_string(),
461 started_by: self.started_by,
462 system_children: self.system_children,
463 }
464 }
465}
466
467impl IntoNodeProperties for HostAttrsView {
468 fn into_node_properties(self) -> NodeProperties {
469 NodeProperties::Host {
470 addr: self.addr,
471 num_procs: self.num_procs,
472 system_children: self.system_children,
473 }
474 }
475}
476
477impl IntoNodeProperties for ProcAttrsView {
478 fn into_node_properties(self) -> NodeProperties {
479 NodeProperties::Proc {
480 proc_name: self.proc_name,
481 num_actors: self.num_actors,
482 system_children: self.system_children,
483 stopped_children: self.stopped_children,
484 stopped_retention_cap: self.stopped_retention_cap,
485 is_poisoned: self.is_poisoned,
486 failed_actor_count: self.failed_actor_count,
487 }
488 }
489}
490
491impl IntoNodeProperties for ErrorAttrsView {
492 fn into_node_properties(self) -> NodeProperties {
493 NodeProperties::Error {
494 code: self.code,
495 message: self.message,
496 }
497 }
498}
499
500impl IntoNodeProperties for hyperactor::introspect::ActorAttrsView {
501 fn into_node_properties(self) -> NodeProperties {
502 let actor_status = match &self.status_reason {
504 Some(reason) => format!("{}: {}", self.status, reason),
505 None => self.status.clone(),
506 };
507
508 let failure_info = self.failure.map(|fi| FailureInfo {
509 error_message: fi.error_message,
510 root_cause_actor: fi.root_cause_actor,
511 root_cause_name: fi.root_cause_name,
512 occurred_at: humantime::format_rfc3339_millis(fi.occurred_at).to_string(),
513 is_propagated: fi.is_propagated,
514 });
515
516 NodeProperties::Actor {
517 actor_status,
518 actor_type: self.actor_type,
519 messages_processed: self.messages_processed,
520 created_at: self
521 .created_at
522 .map(|t| humantime::format_rfc3339_millis(t).to_string())
523 .unwrap_or_default(),
524 last_message_handler: self.last_handler,
525 total_processing_time_us: self.total_processing_time_us,
526 flight_recorder: self.flight_recorder,
527 is_system: self.is_system,
528 failure_info,
529 }
530 }
531}
532
533pub fn derive_properties(attrs_json: &str) -> NodeProperties {
546 let attrs: Attrs = match serde_json::from_str(attrs_json) {
547 Ok(a) => a,
548 Err(_) => {
549 return NodeProperties::Error {
550 code: "parse_error".into(),
551 message: "failed to parse attrs JSON".into(),
552 };
553 }
554 };
555
556 let node_type = attrs.get(NODE_TYPE).cloned().unwrap_or_default();
557
558 match node_type.as_str() {
559 "root" => match RootAttrsView::from_attrs(&attrs) {
560 Ok(v) => v.into_node_properties(),
561 Err(e) => NodeProperties::Error {
562 code: "malformed_root".into(),
563 message: e.to_string(),
564 },
565 },
566 "host" => match HostAttrsView::from_attrs(&attrs) {
567 Ok(v) => v.into_node_properties(),
568 Err(e) => NodeProperties::Error {
569 code: "malformed_host".into(),
570 message: e.to_string(),
571 },
572 },
573 "proc" => match ProcAttrsView::from_attrs(&attrs) {
574 Ok(v) => v.into_node_properties(),
575 Err(e) => NodeProperties::Error {
576 code: "malformed_proc".into(),
577 message: e.to_string(),
578 },
579 },
580 _ => {
581 use hyperactor::introspect::ERROR_CODE;
584 use hyperactor::introspect::STATUS;
585
586 if attrs.get(ERROR_CODE).is_some() {
587 return match ErrorAttrsView::from_attrs(&attrs) {
588 Ok(v) => v.into_node_properties(),
589 Err(e) => NodeProperties::Error {
590 code: "malformed_error".into(),
591 message: e.to_string(),
592 },
593 };
594 }
595
596 if attrs.get(STATUS).is_none() {
597 return NodeProperties::Error {
598 code: "unknown_node_type".into(),
599 message: format!("unrecognized node_type: {:?}", node_type),
600 };
601 }
602
603 match hyperactor::introspect::ActorAttrsView::from_attrs(&attrs) {
604 Ok(v) => v.into_node_properties(),
605 Err(e) => NodeProperties::Error {
606 code: "malformed_actor".into(),
607 message: e.to_string(),
608 },
609 }
610 }
611 }
612}
613
614pub fn to_node_payload(result: hyperactor::introspect::IntrospectResult) -> NodePayload {
619 NodePayload {
620 identity: result.identity,
621 properties: derive_properties(&result.attrs),
622 children: result.children,
623 parent: result.parent,
624 as_of: result.as_of,
625 }
626}
627
628pub fn to_node_payload_with(
631 result: hyperactor::introspect::IntrospectResult,
632 identity: String,
633 parent: Option<String>,
634) -> NodePayload {
635 NodePayload {
636 identity,
637 properties: derive_properties(&result.attrs),
638 children: result.children,
639 parent,
640 as_of: result.as_of,
641 }
642}
643
#[cfg(test)]
mod tests {
    use super::*;

    /// Every key declared in this module must carry an `INTROSPECT`
    /// meta-attribute with the expected short name and a non-empty
    /// description, and the case list must cover every such key found in
    /// the attribute-key registry.
    #[test]
    fn test_mesh_introspect_keys_are_tagged() {
        let cases = vec![
            ("node_type", NODE_TYPE.attrs()),
            ("addr", ADDR.attrs()),
            ("num_procs", NUM_PROCS.attrs()),
            ("proc_name", PROC_NAME.attrs()),
            ("num_actors", NUM_ACTORS.attrs()),
            ("system_children", SYSTEM_CHILDREN.attrs()),
            ("stopped_children", STOPPED_CHILDREN.attrs()),
            ("stopped_retention_cap", STOPPED_RETENTION_CAP.attrs()),
            ("is_poisoned", IS_POISONED.attrs()),
            ("failed_actor_count", FAILED_ACTOR_COUNT.attrs()),
            ("started_at", STARTED_AT.attrs()),
            ("started_by", STARTED_BY.attrs()),
            ("num_hosts", NUM_HOSTS.attrs()),
        ];

        for (expected_name, meta) in &cases {
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        // Exhaustiveness: count the INTROSPECT-tagged keys registered under
        // this module's path and compare with the number of cases above.
        use hyperactor_config::attrs::AttrKeyInfo;
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor_mesh::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    /// Sample root view shared by the tests below.
    fn root_view() -> RootAttrsView {
        RootAttrsView {
            num_hosts: 3,
            started_at: std::time::UNIX_EPOCH,
            started_by: "testuser".into(),
            system_children: vec!["child1".into()],
        }
    }

    /// Sample host view shared by the tests below.
    fn host_view() -> HostAttrsView {
        HostAttrsView {
            addr: "10.0.0.1:8080".into(),
            num_procs: 2,
            system_children: vec!["sys".into()],
        }
    }

    /// Sample proc view shared by the tests below; satisfies FI-5
    /// (not poisoned, zero failures).
    fn proc_view() -> ProcAttrsView {
        ProcAttrsView {
            proc_name: "worker".into(),
            num_actors: 5,
            system_children: vec![],
            stopped_children: vec!["old".into()],
            stopped_retention_cap: 10,
            is_poisoned: false,
            failed_actor_count: 0,
        }
    }

    /// Sample error view shared by the tests below.
    fn error_view() -> ErrorAttrsView {
        ErrorAttrsView {
            code: "not_found".into(),
            message: "child not found".into(),
        }
    }

    /// `to_attrs` followed by `from_attrs` reproduces the root view.
    #[test]
    fn test_root_view_round_trip() {
        let view = root_view();
        let rt = RootAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// `to_attrs` followed by `from_attrs` reproduces the host view.
    #[test]
    fn test_host_view_round_trip() {
        let view = host_view();
        let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// `to_attrs` followed by `from_attrs` reproduces the proc view.
    #[test]
    fn test_proc_view_round_trip() {
        let view = proc_view();
        let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// `to_attrs` followed by `from_attrs` reproduces the error view.
    #[test]
    fn test_error_view_round_trip() {
        let view = error_view();
        let rt = ErrorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// A root without `STARTED_AT` is rejected with a missing-key error.
    #[test]
    fn test_root_view_missing_started_at() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_BY, "user".into());
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_at" });
    }

    /// A root without `STARTED_BY` is rejected with a missing-key error.
    #[test]
    fn test_root_view_missing_started_by() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_AT, std::time::UNIX_EPOCH);
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_by" });
    }

    /// A host without `ADDR` is rejected with a missing-key error.
    #[test]
    fn test_host_view_missing_addr() {
        let attrs = Attrs::new();
        let err = HostAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "addr" });
    }

    /// A proc without `PROC_NAME` is rejected with a missing-key error.
    #[test]
    fn test_proc_view_missing_proc_name() {
        let attrs = Attrs::new();
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "proc_name" });
    }

    /// FI-5, first direction: poisoned with zero failures is rejected.
    #[test]
    fn test_proc_view_fi5_poisoned_but_no_failures() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    /// FI-5, second direction: failures without the poisoned flag are
    /// rejected.
    #[test]
    fn test_proc_view_fi5_failures_but_not_poisoned() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, false);
        attrs.set(FAILED_ACTOR_COUNT, 2usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    /// Input that is not JSON yields a `parse_error`.
    #[test]
    fn test_derive_properties_unparseable_json() {
        let props = derive_properties("not json");
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "parse_error"));
    }

    /// Empty attrs (no node type, error code or status) yield
    /// `unknown_node_type`.
    #[test]
    fn test_derive_properties_unknown_node_type() {
        let attrs = Attrs::new();
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "unknown_node_type"));
    }

    /// A root tag without the required root keys yields `malformed_root`.
    #[test]
    fn test_derive_properties_malformed_root() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_root"));
    }

    /// A proc that violates FI-5 yields `malformed_proc`.
    #[test]
    fn test_derive_properties_malformed_proc_fi5() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "proc".into());
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_proc"));
    }

    /// Serialized root attrs decode to `NodeProperties::Root`.
    #[test]
    fn test_derive_properties_valid_root() {
        let view = root_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    /// Serialized host attrs decode to `NodeProperties::Host`.
    #[test]
    fn test_derive_properties_valid_host() {
        let view = host_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    /// Serialized proc attrs decode to `NodeProperties::Proc`.
    #[test]
    fn test_derive_properties_valid_proc() {
        let view = proc_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    /// Serialized error attrs decode to `NodeProperties::Error` carrying
    /// the original code and message.
    #[test]
    fn test_derive_properties_valid_error() {
        let view = error_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert_eq!(
            props,
            NodeProperties::Error {
                code: "not_found".into(),
                message: "child not found".into(),
            }
        );
    }

    /// Attrs with `STATUS` and no node type decode to
    /// `NodeProperties::Actor`.
    #[test]
    fn test_derive_properties_valid_actor() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::MESSAGES_PROCESSED;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        attrs.set(MESSAGES_PROCESSED, 7u64);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(
            props,
            NodeProperties::Actor {
                messages_processed: 7,
                ..
            }
        ));
    }

    /// Serializes `attrs` to JSON and adds one extra top-level key,
    /// `"unknown_future_key"`, that no attribute key in this module uses.
    fn inject_unknown_key(attrs: &Attrs) -> String {
        let mut map: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(&serde_json::to_string(attrs).unwrap()).unwrap();
        map.insert(
            "unknown_future_key".into(),
            serde_json::Value::String("surprise".into()),
        );
        serde_json::to_string(&map).unwrap()
    }

    /// IA-6: an unknown key does not prevent decoding a root.
    #[test]
    fn test_ia6_root_ignores_unknown_keys() {
        let json = inject_unknown_key(&root_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    /// IA-6: an unknown key does not prevent decoding a host.
    #[test]
    fn test_ia6_host_ignores_unknown_keys() {
        let json = inject_unknown_key(&host_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    /// IA-6: an unknown key does not prevent decoding a proc.
    #[test]
    fn test_ia6_proc_ignores_unknown_keys() {
        let json = inject_unknown_key(&proc_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    /// IA-6: an unknown key does not prevent decoding an error node.
    #[test]
    fn test_ia6_error_ignores_unknown_keys() {
        let json = inject_unknown_key(&error_view().to_attrs());
        let props = derive_properties(&json);
        // Matching on `Error { .. }` alone would also accept the
        // "parse_error" fallback that `derive_properties` returns when the
        // JSON is rejected, so pin the code and message from the view.
        assert_eq!(
            props,
            NodeProperties::Error {
                code: "not_found".into(),
                message: "child not found".into(),
            }
        );
    }

    /// IA-6: an unknown key does not prevent decoding an actor.
    #[test]
    fn test_ia6_actor_ignores_unknown_keys() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        let json = inject_unknown_key(&attrs);
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Actor { .. }));
    }

    /// Removes a top-level `"$comment"` entry, if any, so a snapshot file
    /// may carry one without affecting the comparison.
    fn strip_comment(mut value: serde_json::Value) -> serde_json::Value {
        if let Some(obj) = value.as_object_mut() {
            obj.remove("$comment");
        }
        value
    }

    /// The generated `NodePayload` schema equals the checked-in snapshot.
    #[test]
    fn test_node_payload_schema_snapshot() {
        let schema = schemars::schema_for!(NodePayload);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/node_payload_schema.json"))
                .expect("snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "schema changed — review and update snapshot if intentional"
        );
    }

    /// One sample payload per `NodeProperties` variant validates against
    /// the generated `NodePayload` schema.
    #[test]
    fn test_payloads_validate_against_schema() {
        let schema = schemars::schema_for!(NodePayload);
        let schema_value = serde_json::to_value(&schema).unwrap();
        let compiled = jsonschema::JSONSchema::compile(&schema_value).expect("schema must compile");

        let samples = [
            NodePayload {
                identity: "root".into(),
                properties: NodeProperties::Root {
                    num_hosts: 2,
                    started_at: "2024-01-01T00:00:00.000Z".into(),
                    started_by: "testuser".into(),
                    system_children: vec![],
                },
                children: vec!["host1".into()],
                parent: None,
                as_of: "2024-01-01T00:00:00.000Z".into(),
            },
            NodePayload {
                identity: "host1".into(),
                properties: NodeProperties::Host {
                    addr: "10.0.0.1:8080".into(),
                    num_procs: 2,
                    system_children: vec!["sys".into()],
                },
                children: vec!["proc1".into()],
                parent: Some("root".into()),
                as_of: "2024-01-01T00:00:00.000Z".into(),
            },
            NodePayload {
                identity: "proc1".into(),
                properties: NodeProperties::Proc {
                    proc_name: "worker".into(),
                    num_actors: 5,
                    system_children: vec![],
                    stopped_children: vec![],
                    stopped_retention_cap: 10,
                    is_poisoned: false,
                    failed_actor_count: 0,
                },
                children: vec!["actor[0]".into()],
                parent: Some("host1".into()),
                as_of: "2024-01-01T00:00:00.000Z".into(),
            },
            NodePayload {
                identity: "actor[0]".into(),
                properties: NodeProperties::Actor {
                    actor_status: "running".into(),
                    actor_type: "MyActor".into(),
                    messages_processed: 42,
                    created_at: "2024-01-01T00:00:00.000Z".into(),
                    last_message_handler: Some("handle_ping".into()),
                    total_processing_time_us: 1000,
                    flight_recorder: None,
                    is_system: false,
                    failure_info: None,
                },
                children: vec![],
                parent: Some("proc1".into()),
                as_of: "2024-01-01T00:00:00.000Z".into(),
            },
            NodePayload {
                identity: "err".into(),
                properties: NodeProperties::Error {
                    code: "not_found".into(),
                    message: "child not found".into(),
                },
                children: vec![],
                parent: None,
                as_of: "2024-01-01T00:00:00.000Z".into(),
            },
        ];

        for (i, payload) in samples.iter().enumerate() {
            let value = serde_json::to_value(payload).unwrap();
            assert!(
                compiled.is_valid(&value),
                "sample {i} failed schema validation"
            );
        }
    }

    /// Adding a top-level `$id` to the raw schema and removing it again
    /// gives back the raw schema.
    ///
    /// Note that `served` is assembled inside this test, not obtained from
    /// whatever code serves the schema.
    #[test]
    fn test_served_schema_is_raw_plus_id() {
        let raw: serde_json::Value =
            serde_json::to_value(schemars::schema_for!(NodePayload)).unwrap();

        let mut served = raw.clone();
        served.as_object_mut().unwrap().insert(
            "$id".into(),
            serde_json::Value::String("https://monarch.meta.com/schemas/v1/node_payload".into()),
        );

        let mut stripped = served;
        stripped.as_object_mut().unwrap().remove("$id");
        assert_eq!(raw, stripped, "served schema differs from raw beyond $id");
    }

    /// The generated `ApiErrorEnvelope` schema equals the checked-in
    /// snapshot.
    #[test]
    fn test_error_schema_snapshot() {
        use crate::mesh_admin::ApiErrorEnvelope;

        let schema = schemars::schema_for!(ApiErrorEnvelope);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/error_schema.json"))
                .expect("error snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "error schema changed — review and update snapshot if intentional"
        );
    }

    /// The OpenAPI document from `build_openapi_spec` equals the checked-in
    /// snapshot.
    #[test]
    fn test_openapi_spec_snapshot() {
        let actual = crate::mesh_admin::build_openapi_spec();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/openapi.json"))
                .expect("OpenAPI snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "OpenAPI spec changed — review and update snapshot if intentional"
        );
    }
}