1use std::fmt;
123use std::str::FromStr;
124use std::time::SystemTime;
125
126use hyperactor_config::Attrs;
127use hyperactor_config::INTROSPECT;
128use hyperactor_config::IntrospectAttr;
129use hyperactor_config::declare_attrs;
130use serde::Deserialize;
131use serde::Serialize;
132use typeuri::Named;
133
134use crate::ActorAddr;
135use crate::Addr;
136use crate::AddrParseError;
137use crate::InstanceCell;
138use crate::OncePortRef;
139use crate::ProcAddr;
/// A reference to an introspectable entity: either a proc or an actor.
///
/// Port addresses are deliberately not representable here; parsing a port
/// address through [`FromStr`] yields
/// [`IntrospectRefParseError::PortNotAllowed`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum IntrospectRef {
    /// A proc, identified by its address.
    Proc(ProcAddr),
    /// An actor, identified by its address.
    Actor(ActorAddr),
}
// Lets `IntrospectRef` be stored as an attribute value (e.g. the `CHILDREN`
// attr declared in this module). NOTE(review): presumably relies on the
// Display/FromStr impls defined in this module — confirm against
// `impl_attrvalue!`.
hyperactor_config::impl_attrvalue!(IntrospectRef);
155
/// Error returned when parsing an [`IntrospectRef`] from a string.
#[derive(Debug, thiserror::Error)]
pub enum IntrospectRefParseError {
    /// The input is not a valid address; the underlying parse error is
    /// forwarded unchanged (both `Display` and `source`).
    #[error(transparent)]
    Addr(#[from] AddrParseError),
    /// The input parsed as a port address, which cannot be introspected.
    #[error("port references are not valid introspection references")]
    PortNotAllowed,
}
166
167impl fmt::Display for IntrospectRef {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 match self {
170 Self::Proc(id) => fmt::Display::fmt(id, f),
171 Self::Actor(id) => fmt::Display::fmt(id, f),
172 }
173 }
174}
175
176impl FromStr for IntrospectRef {
177 type Err = IntrospectRefParseError;
178
179 fn from_str(s: &str) -> Result<Self, Self::Err> {
180 let r: Addr = s.parse()?;
181 match r {
182 Addr::Proc(id) => Ok(Self::Proc(id)),
183 Addr::Actor(id) => Ok(Self::Actor(id)),
184 Addr::Port(_) => Err(IntrospectRefParseError::PortNotAllowed),
185 }
186 }
187}
188
189impl From<ProcAddr> for IntrospectRef {
190 fn from(id: ProcAddr) -> Self {
191 Self::Proc(id)
192 }
193}
194
195impl From<ActorAddr> for IntrospectRef {
196 fn from(id: ActorAddr) -> Self {
197 Self::Actor(id)
198 }
199}
200
declare_attrs! {
    // ---- Actor status & identity ------------------------------------------
    //
    // Every key below carries INTROSPECT meta: a short `name` (unique across
    // all INTROSPECT keys, see the tests) and a human-readable `desc`.

    // Lifecycle status. `build_actor_attrs` writes "stopped" / "failed" when
    // the status text starts with "stopped:" / "failed:", and otherwise the
    // status's Display text verbatim. `ActorAttrsView` requires this key.
    @meta(INTROSPECT = IntrospectAttr {
        name: "status".into(),
        desc: "Actor lifecycle status: running, stopped, failed".into(),
    })
    pub attr STATUS: String;

    // Text after the "stopped:" / "failed:" prefix, trimmed. `ActorAttrsView`
    // rejects it unless STATUS is "stopped" or "failed" (invariant IA-3).
    @meta(INTROSPECT = IntrospectAttr {
        name: "status_reason".into(),
        desc: "Reason for stop/failure (absent when running)".into(),
    })
    pub attr STATUS_REASON: String;

    // Taken from `InstanceCell::actor_type_name`; required by `ActorAttrsView`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_type".into(),
        desc: "Fully-qualified actor type name".into(),
    })
    pub attr ACTOR_TYPE: String;

    // Taken from `InstanceCell::num_processed_messages`; defaults to 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "messages_processed".into(),
        desc: "Number of messages processed by this actor".into(),
    })
    pub attr MESSAGES_PROCESSED: u64 = 0;

    // Taken from `InstanceCell::created_at`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "created_at".into(),
        desc: "Timestamp when this actor was created".into(),
    })
    pub attr CREATED_AT: SystemTime;

    // Display text of the cell's last message handler; omitted when unknown.
    @meta(INTROSPECT = IntrospectAttr {
        name: "last_handler".into(),
        desc: "Name of the last message handler invoked".into(),
    })
    pub attr LAST_HANDLER: String;

    // Taken from `InstanceCell::total_processing_time_us`; defaults to 0.
    // NOTE(review): desc says "CPU time" — confirm it is not wall-clock time.
    @meta(INTROSPECT = IntrospectAttr {
        name: "total_processing_time_us".into(),
        desc: "Total CPU time in message handlers (microseconds)".into(),
    })
    pub attr TOTAL_PROCESSING_TIME_US: u64 = 0;

    // JSON array of `RecordedEvent`s; omitted when the recorder tail is empty.
    @meta(INTROSPECT = IntrospectAttr {
        name: "flight_recorder".into(),
        desc: "Flight recorder JSON (recent trace events)".into(),
    })
    pub attr FLIGHT_RECORDER: String;

    // Taken from `InstanceCell::is_system`; defaults to false.
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_system".into(),
        desc: "Whether this actor is infrastructure/system".into(),
    })
    pub attr IS_SYSTEM: bool = false;

    // ---- Navigation & errors ----------------------------------------------

    // Read from an actor's published attrs to fill `IntrospectResult::children`
    // for the Entity view.
    @meta(INTROSPECT = IntrospectAttr {
        name: "children".into(),
        desc: "Child references for tree navigation".into(),
    })
    pub attr CHILDREN: Vec<IntrospectRef>;

    // Set to "not_found" by `serve_introspect` when a QueryChild cannot be
    // resolved.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_code".into(),
        desc: "Machine-readable error code (e.g. not_found)".into(),
    })
    pub attr ERROR_CODE: String;

    // Accompanies ERROR_CODE in error payloads.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_message".into(),
        desc: "Human-readable error message".into(),
    })
    pub attr ERROR_MESSAGE: String;

    // ---- Failure details (present only when STATUS is "failed") -----------
    //
    // `ActorAttrsView` treats error_message, root_cause_actor and occurred_at
    // as a group that must be all present or all absent. root_cause_name is
    // optional and is_propagated defaults to false.

    // Display text of the failing supervision event's actor status.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_error_message".into(),
        desc: "Failure error message".into(),
    })
    pub attr FAILURE_ERROR_MESSAGE: String;

    // The actor the supervision event identifies as actually failing.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_actor".into(),
        desc: "Actor that caused the failure (root cause)".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_ACTOR: ActorAddr;

    // Optional display name of the root-cause actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_name".into(),
        desc: "Name of root cause actor".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_NAME: String;

    // `occurred_at` of the failing supervision event.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_occurred_at".into(),
        desc: "Timestamp when failure occurred".into(),
    })
    pub attr FAILURE_OCCURRED_AT: SystemTime;

    // `live_actor_payload` sets this to true when the root-cause actor differs
    // from the reporting actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_is_propagated".into(),
        desc: "Whether the failure was propagated from a child".into(),
    })
    pub attr FAILURE_IS_PROPAGATED: bool = false;
}
368
/// Error produced when an attribute bag cannot be decoded into a typed view
/// such as `ActorAttrsView`.
#[derive(Debug, Clone, PartialEq)]
pub enum AttrsViewError {
    /// A key the view requires was absent.
    MissingKey {
        /// Short name of the missing key.
        key: &'static str,
    },
    /// The keys were present, but their combination broke an invariant.
    InvariantViolation {
        /// Invariant label, e.g. `"IA-3"`.
        label: &'static str,
        /// Human-readable explanation of the violation.
        detail: String,
    },
}

impl fmt::Display for AttrsViewError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AttrsViewError::MissingKey { key } => {
                f.write_str("missing required key: ")?;
                f.write_str(key)
            }
            AttrsViewError::InvariantViolation { label, detail } => {
                write!(f, "invariant {label} violated: {detail}")
            }
        }
    }
}

impl std::error::Error for AttrsViewError {}

impl AttrsViewError {
    /// Shorthand for [`AttrsViewError::MissingKey`].
    pub fn missing(key: &'static str) -> Self {
        AttrsViewError::MissingKey { key }
    }

    /// Shorthand for [`AttrsViewError::InvariantViolation`].
    pub fn invariant(label: &'static str, detail: String) -> Self {
        AttrsViewError::InvariantViolation { label, detail }
    }
}
412
/// Decoded `failure_*` attributes of a failed actor, as held in
/// `ActorAttrsView::failure`.
#[derive(Debug, Clone, PartialEq)]
pub struct FailureAttrs {
    /// Failure error message (`failure_error_message`, required).
    pub error_message: String,
    /// Actor identified as the root cause (`failure_root_cause_actor`, required).
    pub root_cause_actor: ActorAddr,
    /// Display name of the root-cause actor (`failure_root_cause_name`, optional).
    pub root_cause_name: Option<String>,
    /// When the failure occurred (`failure_occurred_at`, required).
    pub occurred_at: SystemTime,
    /// Whether the failure was propagated (`failure_is_propagated`); `false`
    /// when the attr is absent.
    pub is_propagated: bool,
}
427
/// Typed, validated view over an actor's introspection attributes.
///
/// Build one with `ActorAttrsView::from_attrs`, which enforces the IA-3 and
/// IA-4 invariants, and convert back with `ActorAttrsView::to_attrs`.
#[derive(Debug, Clone, PartialEq)]
pub struct ActorAttrsView {
    /// Lifecycle status (`status`, required).
    pub status: String,
    /// Stop/failure reason (`status_reason`); only allowed for a terminal status.
    pub status_reason: Option<String>,
    /// Actor type name (`actor_type`, required).
    pub actor_type: String,
    /// Messages processed (`messages_processed`); 0 when absent.
    pub messages_processed: u64,
    /// Creation time (`created_at`), if recorded.
    pub created_at: Option<SystemTime>,
    /// Last message handler (`last_handler`), if recorded.
    pub last_handler: Option<String>,
    /// Accumulated handler time in microseconds (`total_processing_time_us`);
    /// 0 when absent.
    pub total_processing_time_us: u64,
    /// Flight recorder JSON (`flight_recorder`), if recorded.
    pub flight_recorder: Option<String>,
    /// Whether the actor is a system actor (`is_system`); `false` when absent.
    pub is_system: bool,
    /// Failure details; `Some` if and only if `status == "failed"` (IA-4).
    pub failure: Option<FailureAttrs>,
}
452
453impl ActorAttrsView {
454 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
460 let status = attrs
461 .get(STATUS)
462 .ok_or_else(|| AttrsViewError::missing("status"))?
463 .clone();
464 let status_reason = attrs.get(STATUS_REASON).cloned();
465 let actor_type = attrs
466 .get(ACTOR_TYPE)
467 .ok_or_else(|| AttrsViewError::missing("actor_type"))?
468 .clone();
469 let messages_processed = *attrs.get(MESSAGES_PROCESSED).unwrap_or(&0);
470 let created_at = attrs.get(CREATED_AT).copied();
471 let last_handler = attrs.get(LAST_HANDLER).cloned();
472 let total_processing_time_us = *attrs.get(TOTAL_PROCESSING_TIME_US).unwrap_or(&0);
473 let flight_recorder = attrs.get(FLIGHT_RECORDER).cloned();
474 let is_system = *attrs.get(IS_SYSTEM).unwrap_or(&false);
475
476 let is_terminal = status == "stopped" || status == "failed";
480 if status_reason.is_some() && !is_terminal {
481 return Err(AttrsViewError::invariant(
482 "IA-3",
483 format!(
484 "status_reason present but status is '{status}' (expected stopped or failed)"
485 ),
486 ));
487 }
488
489 let has_any_failure = attrs.get(FAILURE_ERROR_MESSAGE).is_some()
496 || attrs.get(FAILURE_ROOT_CAUSE_ACTOR).is_some()
497 || attrs.get(FAILURE_OCCURRED_AT).is_some();
498
499 let failure = if has_any_failure {
500 let error_message = attrs
501 .get(FAILURE_ERROR_MESSAGE)
502 .ok_or_else(|| AttrsViewError::missing("failure_error_message"))?
503 .clone();
504 let root_cause_actor = attrs
505 .get(FAILURE_ROOT_CAUSE_ACTOR)
506 .ok_or_else(|| AttrsViewError::missing("failure_root_cause_actor"))?
507 .clone();
508 let root_cause_name = attrs.get(FAILURE_ROOT_CAUSE_NAME).cloned();
509 let occurred_at = *attrs
510 .get(FAILURE_OCCURRED_AT)
511 .ok_or_else(|| AttrsViewError::missing("failure_occurred_at"))?;
512 let is_propagated = *attrs.get(FAILURE_IS_PROPAGATED).unwrap_or(&false);
514 Some(FailureAttrs {
515 error_message,
516 root_cause_actor,
517 root_cause_name,
518 occurred_at,
519 is_propagated,
520 })
521 } else {
522 None
523 };
524
525 if status == "failed" && failure.is_none() {
527 return Err(AttrsViewError::invariant(
528 "IA-4",
529 "status is 'failed' but no failure_* attrs present".to_string(),
530 ));
531 }
532 if status != "failed" && failure.is_some() {
533 return Err(AttrsViewError::invariant(
534 "IA-4",
535 format!("status is '{status}' but failure_* attrs are present"),
536 ));
537 }
538
539 Ok(Self {
540 status,
541 status_reason,
542 actor_type,
543 messages_processed,
544 created_at,
545 last_handler,
546 total_processing_time_us,
547 flight_recorder,
548 is_system,
549 failure,
550 })
551 }
552
553 pub fn to_attrs(&self) -> Attrs {
555 let mut attrs = Attrs::new();
556 attrs.set(STATUS, self.status.clone());
557 if let Some(reason) = &self.status_reason {
558 attrs.set(STATUS_REASON, reason.clone());
559 }
560 attrs.set(ACTOR_TYPE, self.actor_type.clone());
561 attrs.set(MESSAGES_PROCESSED, self.messages_processed);
562 if let Some(t) = self.created_at {
563 attrs.set(CREATED_AT, t);
564 }
565 if let Some(handler) = &self.last_handler {
566 attrs.set(LAST_HANDLER, handler.clone());
567 }
568 attrs.set(TOTAL_PROCESSING_TIME_US, self.total_processing_time_us);
569 if let Some(fr) = &self.flight_recorder {
570 attrs.set(FLIGHT_RECORDER, fr.clone());
571 }
572 attrs.set(IS_SYSTEM, self.is_system);
573 if let Some(fi) = &self.failure {
574 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
575 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
576 if let Some(name) = &fi.root_cause_name {
577 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
578 }
579 attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
580 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
581 }
582 attrs
583 }
584}
585
/// Reply to an introspection query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct IntrospectResult {
    /// The entity this result describes.
    pub identity: IntrospectRef,
    /// JSON-serialized attribute bag for the entity. The producers in this
    /// module fall back to `"{}"` if serialization fails.
    pub attrs: String,
    /// Child entities, for tree navigation.
    pub children: Vec<IntrospectRef>,
    /// Parent entity, if any (for live actors, the parent actor).
    pub parent: Option<IntrospectRef>,
    /// Wall-clock time at which this result was produced.
    pub as_of: SystemTime,
}
wirevalue::register_type!(IntrospectResult);
607
/// Which view of an actor an `IntrospectMessage::Query` asks for.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum IntrospectView {
    /// Entity view: the actor's published attrs (with `children` taken from
    /// the `CHILDREN` attr) when any have been published; otherwise the same
    /// as the `Actor` view.
    Entity,
    /// Actor view: always the live actor payload built from the instance cell.
    Actor,
}
wirevalue::register_type!(IntrospectView);
629
/// Request served by an actor's introspection task.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum IntrospectMessage {
    /// Describe the actor itself.
    Query {
        /// Which view to return.
        view: IntrospectView,
        /// Port that receives the result.
        reply: OncePortRef<IntrospectResult>,
    },
    /// Describe a child entity of the actor.
    QueryChild {
        /// Address of the child. When the actor cannot resolve it, the task
        /// replies with an error payload whose `error_code` is "not_found".
        child_ref: Addr,
        /// Port that receives the result.
        reply: OncePortRef<IntrospectResult>,
    },
}
wirevalue::register_type!(IntrospectMessage);
655
/// One flight-recorder event, in the JSON shape stored under the
/// `flight_recorder` attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordedEvent {
    /// Event time as produced by `format_timestamp` (RFC 3339, milliseconds).
    pub timestamp: String,
    /// Recorder sequence number; 0 when missing during deserialization.
    #[serde(default)]
    pub seq: usize,
    /// Tracing level name.
    pub level: String,
    /// Tracing target; empty when missing during deserialization.
    #[serde(default)]
    pub target: String,
    /// Event name from the tracing metadata.
    pub name: String,
    /// Event fields as a JSON value.
    pub fields: serde_json::Value,
}
676
677pub fn format_timestamp(time: SystemTime) -> String {
680 humantime::format_rfc3339_millis(time).to_string()
681}
682
/// Failure details captured from a failed actor's supervision event, before
/// they are written as `failure_*` attrs by `build_actor_attrs`.
struct FailureSnapshot {
    // Display text of the supervision event's actor status.
    error_message: String,
    // The actor the event identifies as actually failing.
    root_cause_actor: ActorAddr,
    // Display name of that actor, if it has one.
    root_cause_name: Option<String>,
    // Timestamp of the supervision event.
    occurred_at: SystemTime,
    // True when the root cause is a different actor than the one reporting.
    is_propagated: bool,
}
703
/// Point-in-time actor state captured by `live_actor_payload` and rendered
/// into attrs by `build_actor_attrs`.
struct ActorSnapshot {
    // Display text of the actor status, possibly "stopped:<reason>" or
    // "failed:<reason>".
    status_str: String,
    is_system: bool,
    last_handler: Option<String>,
    // Pre-serialized flight recorder JSON; None when there were no events.
    flight_recorder: Option<String>,
    // Present only for failed actors with an identifiable failing actor.
    failure: Option<FailureSnapshot>,
}
714
715fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String {
716 let mut attrs = hyperactor_config::Attrs::new();
722
723 if let Some(reason) = snap.status_str.strip_prefix("stopped:") {
725 attrs.set(STATUS, "stopped".to_string());
726 attrs.set(STATUS_REASON, reason.trim().to_string());
727 } else if let Some(reason) = snap.status_str.strip_prefix("failed:") {
728 attrs.set(STATUS, "failed".to_string());
729 attrs.set(STATUS_REASON, reason.trim().to_string());
730 } else {
731 attrs.set(STATUS, snap.status_str.clone());
732 }
735
736 attrs.set(ACTOR_TYPE, cell.actor_type_name().to_string());
737 attrs.set(MESSAGES_PROCESSED, cell.num_processed_messages());
738 attrs.set(CREATED_AT, cell.created_at());
739 attrs.set(TOTAL_PROCESSING_TIME_US, cell.total_processing_time_us());
740 attrs.set(IS_SYSTEM, snap.is_system);
741
742 if let Some(handler) = &snap.last_handler {
743 attrs.set(LAST_HANDLER, handler.clone());
744 }
745 if let Some(fr) = &snap.flight_recorder {
746 attrs.set(FLIGHT_RECORDER, fr.clone());
747 }
748
749 if let Some(fi) = &snap.failure {
751 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
752 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
753 if let Some(name) = &fi.root_cause_name {
754 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
755 }
756 attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
757 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
758 }
759 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string())
763}
764
765pub fn live_actor_payload(cell: &InstanceCell) -> IntrospectResult {
771 let actor_id = cell.actor_addr();
772 let status = cell.status().borrow().clone();
773 let last_handler = cell.last_message_handler();
774
775 let children: Vec<IntrospectRef> = cell
776 .child_actor_ids()
777 .into_iter()
778 .map(IntrospectRef::Actor)
779 .collect();
780
781 let events = cell.recording().tail();
782 let flight_recorder_events: Vec<RecordedEvent> = events
783 .into_iter()
784 .map(|event| RecordedEvent {
785 timestamp: format_timestamp(event.time),
786 seq: event.seq,
787 level: event.metadata.level().to_string(),
788 target: event.metadata.target().to_string(),
789 name: event.metadata.name().to_string(),
790 fields: event.json_value(),
791 })
792 .collect();
793
794 let flight_recorder = if flight_recorder_events.is_empty() {
795 None
796 } else {
797 serde_json::to_string(&flight_recorder_events).ok()
798 };
799
800 let supervisor = cell
801 .parent()
802 .map(|p| IntrospectRef::Actor(p.actor_addr().clone()));
803
804 let failure = if status.is_failed() {
807 cell.supervision_event().and_then(|event| {
808 let root = event.actually_failing_actor()?;
809 Some(FailureSnapshot {
810 error_message: event.actor_status.to_string(),
811 root_cause_actor: root.actor_id.clone(),
812 root_cause_name: root.display_name.clone(),
813 occurred_at: event.occurred_at,
814 is_propagated: root.actor_id != actor_id.clone(),
815 })
816 })
817 } else {
818 None
819 };
820
821 let snap = ActorSnapshot {
822 status_str: status.to_string(),
823 is_system: cell.is_system(),
824 last_handler: last_handler.map(|info| info.to_string()),
825 flight_recorder,
826 failure,
827 };
828
829 let attrs = build_actor_attrs(cell, &snap);
830
831 IntrospectResult {
832 identity: IntrospectRef::Actor(actor_id.clone()),
833 attrs,
834 children,
835 parent: supervisor,
836 as_of: SystemTime::now(),
837 }
838}
839
840pub(crate) async fn serve_introspect(
850 cell: InstanceCell,
851 mut receiver: crate::mailbox::PortReceiver<IntrospectMessage>,
852) {
853 use crate::actor::ActorStatus;
854 use crate::mailbox::PortSender as _;
855
856 let mut status = cell.status().clone();
862
863 loop {
864 let msg = tokio::select! {
865 msg = receiver.recv() => {
866 match msg {
867 Ok(msg) => msg,
868 Err(_) => {
869 if cell.status().borrow().is_terminal() {
873 let snapshot = live_actor_payload(&cell);
874 cell.store_terminated_snapshot(snapshot);
875 }
876 break;
877 }
878 }
879 }
880 status_ref = status.wait_for(ActorStatus::is_terminal) => {
881 drop(status_ref);
890 let snapshot = live_actor_payload(&cell);
891 cell.store_terminated_snapshot(snapshot);
892 break;
893 }
894 };
895
896 let result = match msg {
897 IntrospectMessage::Query { view, reply } => {
898 let payload = match view {
899 IntrospectView::Entity => match cell.published_attrs() {
900 Some(published) => {
901 let attrs_json =
902 serde_json::to_string(&published).unwrap_or_else(|_| "{}".into());
903 let children: Vec<IntrospectRef> =
904 published.get(CHILDREN).cloned().unwrap_or_default();
905 IntrospectResult {
906 identity: IntrospectRef::Actor(cell.actor_addr().clone()),
907 attrs: attrs_json,
908 children,
909 parent: cell
910 .parent()
911 .map(|p| IntrospectRef::Actor(p.actor_addr().clone())),
912 as_of: SystemTime::now(),
913 }
914 }
915 None => live_actor_payload(&cell),
916 },
917 IntrospectView::Actor => live_actor_payload(&cell),
918 };
919 cell.proc().serialize_and_send_once(
920 reply,
921 payload,
922 crate::mailbox::monitored_return_handle(),
923 )
924 }
925 IntrospectMessage::QueryChild { child_ref, reply } => {
926 let child_ref_: Addr = child_ref.clone();
927 let payload = cell.query_child(&child_ref_).unwrap_or_else(|| {
928 let mut error_attrs = hyperactor_config::Attrs::new();
929 error_attrs.set(ERROR_CODE, "not_found".to_string());
930 error_attrs.set(
931 ERROR_MESSAGE,
932 format!("child {} not found (no callback registered)", child_ref),
933 );
934 let identity = match &child_ref {
936 Addr::Proc(id) => IntrospectRef::Proc(id.clone()),
937 Addr::Actor(id) => IntrospectRef::Actor(id.clone()),
938 Addr::Port(id) => IntrospectRef::Actor(id.actor_addr()),
939 };
940 IntrospectResult {
941 identity,
942 attrs: serde_json::to_string(&error_attrs)
943 .unwrap_or_else(|_| "{}".to_string()),
944 children: Vec::new(),
945 parent: None,
946 as_of: SystemTime::now(),
947 }
948 });
949 cell.proc().serialize_and_send_once(
950 reply,
951 payload,
952 crate::mailbox::monitored_return_handle(),
953 )
954 }
955 };
956 if let Err(e) = result {
957 tracing::debug!("introspect reply failed: {e}");
958 }
959 }
960 tracing::debug!(
961 actor_id = %cell.actor_addr(),
962 "introspect task exiting"
963 );
964}
965
// Unit tests for the INTROSPECT key declarations and for
// `ActorAttrsView` decoding/validation (IA-3, IA-4, partial failure groups).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ActorAddr;
    use crate::ProcAddr;
    use crate::actor::ActorErrorKind;
    use crate::actor::ActorStatus;
    use crate::channel::ChannelAddr;
    use crate::supervision::ActorSupervisionEvent;

    // Each key declared here carries INTROSPECT meta with the expected short
    // name and a non-empty desc, and `cases` covers every INTROSPECT-tagged
    // key registered under `hyperactor::introspect::`.
    #[test]
    fn test_introspect_keys_are_tagged() {
        let cases = vec![
            ("status", STATUS.attrs()),
            ("status_reason", STATUS_REASON.attrs()),
            ("actor_type", ACTOR_TYPE.attrs()),
            ("messages_processed", MESSAGES_PROCESSED.attrs()),
            ("created_at", CREATED_AT.attrs()),
            ("last_handler", LAST_HANDLER.attrs()),
            ("total_processing_time_us", TOTAL_PROCESSING_TIME_US.attrs()),
            ("flight_recorder", FLIGHT_RECORDER.attrs()),
            ("is_system", IS_SYSTEM.attrs()),
            ("children", CHILDREN.attrs()),
            ("error_code", ERROR_CODE.attrs()),
            ("error_message", ERROR_MESSAGE.attrs()),
            ("failure_error_message", FAILURE_ERROR_MESSAGE.attrs()),
            ("failure_root_cause_actor", FAILURE_ROOT_CAUSE_ACTOR.attrs()),
            ("failure_root_cause_name", FAILURE_ROOT_CAUSE_NAME.attrs()),
            ("failure_occurred_at", FAILURE_OCCURRED_AT.attrs()),
            ("failure_is_propagated", FAILURE_IS_PROPAGATED.attrs()),
        ];

        for (expected_name, meta) in &cases {
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        use hyperactor_config::attrs::AttrKeyInfo;
        // Cross-check against the global key registry so a newly declared
        // INTROSPECT key in this module cannot be left out of `cases`.
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    // Across every registered key (any module), INTROSPECT short names are
    // non-empty, descs are non-empty, and no short name is declared twice.
    #[test]
    fn test_introspect_short_names_are_globally_unique() {
        use hyperactor_config::attrs::AttrKeyInfo;

        let mut seen = std::collections::HashMap::new();
        for info in inventory::iter::<AttrKeyInfo>() {
            let Some(introspect) = info.meta.get(INTROSPECT) else {
                continue;
            };
            assert!(
                !introspect.name.is_empty(),
                "INTROSPECT key {:?} has empty name",
                info.name
            );
            assert!(
                !introspect.desc.is_empty(),
                "INTROSPECT key {:?} has empty desc",
                info.name
            );
            // `insert` returns the previous fully-qualified key name, if any.
            if let Some(prev_fq) = seen.insert(introspect.name.clone(), info.name) {
                panic!(
                    "IK-2 violation: duplicate short name {:?} declared by both {:?} and {:?}",
                    introspect.name, prev_fq, info.name
                );
            }
        }
    }

    // Minimal valid attrs for a running, non-system actor.
    fn running_actor_attrs() -> Attrs {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        attrs.set(ACTOR_TYPE, "MyActor".to_string());
        attrs.set(MESSAGES_PROCESSED, 42u64);
        attrs.set(CREATED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(IS_SYSTEM, false);
        attrs
    }

    // Actor address `actor_name` on a singleton proc over a local channel.
    fn test_actor_id(proc_name: &str, actor_name: &str) -> ActorAddr {
        ProcAddr::singleton(ChannelAddr::Local(0), proc_name).actor_addr(actor_name)
    }

    // Valid attrs for a failed actor with a complete, propagated failure group.
    fn failed_actor_attrs() -> Attrs {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(STATUS_REASON, "something broke".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, test_actor_id("proc", "other"));
        attrs.set(FAILURE_ROOT_CAUSE_NAME, "OtherActor".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(FAILURE_IS_PROPAGATED, true);
        attrs
    }

    // A running view decodes and survives to_attrs -> from_attrs unchanged.
    #[test]
    fn test_actor_view_round_trip_running() {
        let view = ActorAttrsView::from_attrs(&running_actor_attrs()).unwrap();
        assert_eq!(view.status, "running");
        assert_eq!(view.actor_type, "MyActor");
        assert_eq!(view.messages_processed, 42);
        assert!(view.failure.is_none());

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    // A failed view decodes its failure group and round-trips unchanged.
    #[test]
    fn test_actor_view_round_trip_failed() {
        let view = ActorAttrsView::from_attrs(&failed_actor_attrs()).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().unwrap();
        assert_eq!(fi.error_message, "boom");
        assert!(fi.is_propagated);

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    // Missing `status` is reported before anything else.
    #[test]
    fn test_actor_view_missing_status() {
        let mut attrs = Attrs::new();
        attrs.set(ACTOR_TYPE, "X".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "status" });
    }

    // Missing `actor_type` is reported when `status` is present.
    #[test]
    fn test_actor_view_missing_actor_type() {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "actor_type" });
    }

    // IA-3: a status_reason on a non-terminal status is rejected.
    #[test]
    fn test_actor_view_ia3_rejects_reason_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS_REASON, "should not be here".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-3", .. }
        ));
    }

    // IA-3 is one-directional: a terminal status may omit the reason.
    #[test]
    fn test_actor_view_ia3_allows_terminal_without_reason() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "stopped".to_string());
        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "stopped");
        assert!(view.status_reason.is_none());
    }

    // IA-4: "failed" without any failure_* attrs is rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failed_without_failure_attrs() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    // IA-4: a complete failure group on a non-failed status is rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failure_attrs_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, test_actor_id("proc", "x"));
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    // A partial failure group reports the first missing required key
    // (checked before IA-4).
    #[test]
    fn test_actor_view_partial_failure_attrs_rejected() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(
            err,
            AttrsViewError::MissingKey {
                key: "failure_root_cause_actor"
            }
        );
    }

    // A parent fails because of an unhandled supervision event from a child
    // that merely stopped. The root cause must resolve to the child, be marked
    // propagated, keep the child's display name, and decode through
    // ActorAttrsView intact.
    #[test]
    fn test_fi7_fi8_propagated_stopped_child() {
        let proc_id = ProcAddr::singleton(ChannelAddr::Local(0), "test_proc");
        let child_id = proc_id.actor_addr("proc_agent");
        let parent_id = proc_id.actor_addr("mesh_actor");

        let child_event = ActorSupervisionEvent::new(
            child_id.clone(),
            Some("proc_agent".into()),
            ActorStatus::Stopped("host died".into()),
            None,
        );
        let parent_event = ActorSupervisionEvent::new(
            parent_id.clone(),
            Some("mesh_actor".into()),
            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(
                child_event,
            ))),
            None,
        );

        // Mirror how `live_actor_payload` builds a FailureSnapshot.
        let root = parent_event
            .actually_failing_actor()
            .expect("parent_event is a failure");
        let snap = FailureSnapshot {
            error_message: parent_event.actor_status.to_string(),
            root_cause_actor: root.actor_id.clone(),
            root_cause_name: root.display_name.clone(),
            occurred_at: parent_event.occurred_at,
            is_propagated: root.actor_id != parent_id,
        };

        assert_eq!(snap.root_cause_actor, child_id);
        assert!(snap.is_propagated);
        assert_eq!(snap.root_cause_name.as_deref(), Some("proc_agent"));

        // Overwrite the fixture's failure group with the snapshot's values.
        let mut attrs = failed_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, snap.error_message);
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, snap.root_cause_actor.clone());
        if let Some(name) = &snap.root_cause_name {
            attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
        }
        attrs.set(FAILURE_OCCURRED_AT, snap.occurred_at);
        attrs.set(FAILURE_IS_PROPAGATED, snap.is_propagated);

        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().expect("failure_info must be present");
        assert_eq!(fi.root_cause_actor, child_id);
        assert!(fi.is_propagated);
        assert_eq!(fi.root_cause_name.as_deref(), Some("proc_agent"));
    }
}