1use std::fmt;
123use std::str::FromStr;
124use std::time::SystemTime;
125
126use hyperactor_config::Attrs;
127use hyperactor_config::INTROSPECT;
128use hyperactor_config::IntrospectAttr;
129use hyperactor_config::declare_attrs;
130use serde::Deserialize;
131use serde::Serialize;
132use typeuri::Named;
133
134use crate::InstanceCell;
135use crate::reference;
136
/// Reference to an entity that can appear in an introspection result.
///
/// Only procs and actors are representable. The `FromStr` impl for this
/// type rejects port references.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum IntrospectRef {
    /// A proc, identified by its `ProcId`.
    Proc(reference::ProcId),
    /// An actor, identified by its `ActorId`.
    Actor(reference::ActorId),
}
// `IntrospectRef` is used as the element type of the `CHILDREN` attr
// declared further down in this file.
hyperactor_config::impl_attrvalue!(IntrospectRef);
152
153impl fmt::Display for IntrospectRef {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 match self {
156 Self::Proc(id) => fmt::Display::fmt(id, f),
157 Self::Actor(id) => fmt::Display::fmt(id, f),
158 }
159 }
160}
161
162impl FromStr for IntrospectRef {
163 type Err = reference::ReferenceParsingError;
164
165 fn from_str(s: &str) -> Result<Self, Self::Err> {
166 let r: reference::Reference = s.parse()?;
167 match r {
168 reference::Reference::Proc(id) => Ok(Self::Proc(id)),
169 reference::Reference::Actor(id) => Ok(Self::Actor(id)),
170 reference::Reference::Port(_) => Err(reference::ReferenceParsingError::WrongType(
171 "port references are not valid introspection references".to_string(),
172 )),
173 }
174 }
175}
176
177impl From<reference::ProcId> for IntrospectRef {
178 fn from(id: reference::ProcId) -> Self {
179 Self::Proc(id)
180 }
181}
182
183impl From<reference::ActorId> for IntrospectRef {
184 fn from(id: reference::ActorId) -> Self {
185 Self::Actor(id)
186 }
187}
188
// Introspection attr keys.
//
// Every key below carries an `INTROSPECT` meta-attr holding its short `name`
// and a human-readable `desc`. The tests in this file check that each key
// declared here is tagged and that short names are unique across the
// registry.
declare_attrs! {
    // Lifecycle status string. Required by `ActorAttrsView::from_attrs`,
    // which treats "stopped" and "failed" as the terminal values.
    @meta(INTROSPECT = IntrospectAttr {
        name: "status".into(),
        desc: "Actor lifecycle status: running, stopped, failed".into(),
    })
    pub attr STATUS: String;

    // Reason text accompanying a terminal status. `from_attrs` rejects it
    // when the status is neither "stopped" nor "failed".
    @meta(INTROSPECT = IntrospectAttr {
        name: "status_reason".into(),
        desc: "Reason for stop/failure (absent when running)".into(),
    })
    pub attr STATUS_REASON: String;

    // Actor type name. Required by `ActorAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_type".into(),
        desc: "Fully-qualified actor type name".into(),
    })
    pub attr ACTOR_TYPE: String;

    // Message counter; declared with a default of 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "messages_processed".into(),
        desc: "Number of messages processed by this actor".into(),
    })
    pub attr MESSAGES_PROCESSED: u64 = 0;

    // Creation time of the actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "created_at".into(),
        desc: "Timestamp when this actor was created".into(),
    })
    pub attr CREATED_AT: SystemTime;

    // Only set when a last handler is known (see `build_actor_attrs`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "last_handler".into(),
        desc: "Name of the last message handler invoked".into(),
    })
    pub attr LAST_HANDLER: String;

    // Cumulative handler time in microseconds; declared with a default of 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "total_processing_time_us".into(),
        desc: "Total CPU time in message handlers (microseconds)".into(),
    })
    pub attr TOTAL_PROCESSING_TIME_US: u64 = 0;

    // JSON array of `RecordedEvent`s; omitted when there are no events
    // (see `live_actor_payload`).
    @meta(INTROSPECT = IntrospectAttr {
        name: "flight_recorder".into(),
        desc: "Flight recorder JSON (recent trace events)".into(),
    })
    pub attr FLIGHT_RECORDER: String;

    // Declared with a default of false.
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_system".into(),
        desc: "Whether this actor is infrastructure/system".into(),
    })
    pub attr IS_SYSTEM: bool = false;

    // Read from published attrs by `serve_introspect` to fill
    // `IntrospectResult::children` for the `Entity` view.
    @meta(INTROSPECT = IntrospectAttr {
        name: "children".into(),
        desc: "Child references for tree navigation".into(),
    })
    pub attr CHILDREN: Vec<IntrospectRef>;

    // Set to "not_found" by `serve_introspect` when a child query has no
    // answer.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_code".into(),
        desc: "Machine-readable error code (e.g. not_found)".into(),
    })
    pub attr ERROR_CODE: String;

    // Companion to `ERROR_CODE`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_message".into(),
        desc: "Human-readable error message".into(),
    })
    pub attr ERROR_MESSAGE: String;

    // The failure_* keys below are decoded together into `FailureAttrs` by
    // `ActorAttrsView::from_attrs`. The error message, root-cause actor and
    // occurrence time must be present together or not at all.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_error_message".into(),
        desc: "Failure error message".into(),
    })
    pub attr FAILURE_ERROR_MESSAGE: String;

    // Required whenever any of the required failure keys is present.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_actor".into(),
        desc: "Actor that caused the failure (root cause)".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_ACTOR: reference::ActorId;

    // Optional display name of the root-cause actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_name".into(),
        desc: "Name of root cause actor".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_NAME: String;

    // Required whenever any of the required failure keys is present.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_occurred_at".into(),
        desc: "Timestamp when failure occurred".into(),
    })
    pub attr FAILURE_OCCURRED_AT: SystemTime;

    // Declared with a default of false. `live_actor_payload` sets it to
    // true when the root-cause actor differs from the reporting actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_is_propagated".into(),
        desc: "Whether the failure was propagated from a child".into(),
    })
    pub attr FAILURE_IS_PROPAGATED: bool = false;
}
356
/// Error produced when an attrs bag cannot be decoded into a typed view.
#[derive(Debug, Clone, PartialEq)]
pub enum AttrsViewError {
    /// A key the view requires was not present.
    MissingKey {
        /// Short name of the absent key.
        key: &'static str,
    },
    /// The attrs were readable but contradict one another.
    InvariantViolation {
        /// Label of the violated invariant (for example `"IA-3"`).
        label: &'static str,
        /// Human-readable description of the contradiction.
        detail: String,
    },
}

impl AttrsViewError {
    /// Builds a [`AttrsViewError::MissingKey`] for `key`.
    pub fn missing(key: &'static str) -> Self {
        AttrsViewError::MissingKey { key }
    }

    /// Builds a [`AttrsViewError::InvariantViolation`] for the invariant
    /// named `label`, with `detail` describing what went wrong.
    pub fn invariant(label: &'static str, detail: String) -> Self {
        AttrsViewError::InvariantViolation { label, detail }
    }
}

impl fmt::Display for AttrsViewError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AttrsViewError::MissingKey { key } => {
                f.write_fmt(format_args!("missing required key: {key}"))
            }
            AttrsViewError::InvariantViolation { label, detail } => {
                f.write_fmt(format_args!("invariant {label} violated: {detail}"))
            }
        }
    }
}

impl std::error::Error for AttrsViewError {}
400
/// Typed form of the `failure_*` attrs, as decoded by
/// `ActorAttrsView::from_attrs`.
#[derive(Debug, Clone, PartialEq)]
pub struct FailureAttrs {
    /// Value of `FAILURE_ERROR_MESSAGE`.
    pub error_message: String,
    /// Value of `FAILURE_ROOT_CAUSE_ACTOR`.
    pub root_cause_actor: reference::ActorId,
    /// Value of `FAILURE_ROOT_CAUSE_NAME`, when present.
    pub root_cause_name: Option<String>,
    /// Value of `FAILURE_OCCURRED_AT`.
    pub occurred_at: SystemTime,
    /// Value of `FAILURE_IS_PROPAGATED`; decoded as `false` when absent.
    pub is_propagated: bool,
}
415
/// Typed view over the actor introspection attrs.
///
/// Built with `from_attrs` and converted back with `to_attrs`.
#[derive(Debug, Clone, PartialEq)]
pub struct ActorAttrsView {
    /// Lifecycle status string (required key).
    pub status: String,
    /// Reason text; `from_attrs` only accepts it alongside a "stopped" or
    /// "failed" status.
    pub status_reason: Option<String>,
    /// Actor type name (required key).
    pub actor_type: String,
    /// Message counter; decoded as `0` when absent.
    pub messages_processed: u64,
    /// Creation time, when present.
    pub created_at: Option<SystemTime>,
    /// Last handler name, when present.
    pub last_handler: Option<String>,
    /// Cumulative handler time in microseconds; decoded as `0` when absent.
    pub total_processing_time_us: u64,
    /// Flight recorder JSON, when present.
    pub flight_recorder: Option<String>,
    /// System-actor flag; decoded as `false` when absent.
    pub is_system: bool,
    /// Failure details; `from_attrs` yields `Some` exactly when `status`
    /// is "failed".
    pub failure: Option<FailureAttrs>,
}
440
441impl ActorAttrsView {
442 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
448 let status = attrs
449 .get(STATUS)
450 .ok_or_else(|| AttrsViewError::missing("status"))?
451 .clone();
452 let status_reason = attrs.get(STATUS_REASON).cloned();
453 let actor_type = attrs
454 .get(ACTOR_TYPE)
455 .ok_or_else(|| AttrsViewError::missing("actor_type"))?
456 .clone();
457 let messages_processed = *attrs.get(MESSAGES_PROCESSED).unwrap_or(&0);
458 let created_at = attrs.get(CREATED_AT).copied();
459 let last_handler = attrs.get(LAST_HANDLER).cloned();
460 let total_processing_time_us = *attrs.get(TOTAL_PROCESSING_TIME_US).unwrap_or(&0);
461 let flight_recorder = attrs.get(FLIGHT_RECORDER).cloned();
462 let is_system = *attrs.get(IS_SYSTEM).unwrap_or(&false);
463
464 let is_terminal = status == "stopped" || status == "failed";
468 if status_reason.is_some() && !is_terminal {
469 return Err(AttrsViewError::invariant(
470 "IA-3",
471 format!(
472 "status_reason present but status is '{status}' (expected stopped or failed)"
473 ),
474 ));
475 }
476
477 let has_any_failure = attrs.get(FAILURE_ERROR_MESSAGE).is_some()
484 || attrs.get(FAILURE_ROOT_CAUSE_ACTOR).is_some()
485 || attrs.get(FAILURE_OCCURRED_AT).is_some();
486
487 let failure = if has_any_failure {
488 let error_message = attrs
489 .get(FAILURE_ERROR_MESSAGE)
490 .ok_or_else(|| AttrsViewError::missing("failure_error_message"))?
491 .clone();
492 let root_cause_actor = attrs
493 .get(FAILURE_ROOT_CAUSE_ACTOR)
494 .ok_or_else(|| AttrsViewError::missing("failure_root_cause_actor"))?
495 .clone();
496 let root_cause_name = attrs.get(FAILURE_ROOT_CAUSE_NAME).cloned();
497 let occurred_at = *attrs
498 .get(FAILURE_OCCURRED_AT)
499 .ok_or_else(|| AttrsViewError::missing("failure_occurred_at"))?;
500 let is_propagated = *attrs.get(FAILURE_IS_PROPAGATED).unwrap_or(&false);
502 Some(FailureAttrs {
503 error_message,
504 root_cause_actor,
505 root_cause_name,
506 occurred_at,
507 is_propagated,
508 })
509 } else {
510 None
511 };
512
513 if status == "failed" && failure.is_none() {
515 return Err(AttrsViewError::invariant(
516 "IA-4",
517 "status is 'failed' but no failure_* attrs present".to_string(),
518 ));
519 }
520 if status != "failed" && failure.is_some() {
521 return Err(AttrsViewError::invariant(
522 "IA-4",
523 format!("status is '{status}' but failure_* attrs are present"),
524 ));
525 }
526
527 Ok(Self {
528 status,
529 status_reason,
530 actor_type,
531 messages_processed,
532 created_at,
533 last_handler,
534 total_processing_time_us,
535 flight_recorder,
536 is_system,
537 failure,
538 })
539 }
540
541 pub fn to_attrs(&self) -> Attrs {
543 let mut attrs = Attrs::new();
544 attrs.set(STATUS, self.status.clone());
545 if let Some(reason) = &self.status_reason {
546 attrs.set(STATUS_REASON, reason.clone());
547 }
548 attrs.set(ACTOR_TYPE, self.actor_type.clone());
549 attrs.set(MESSAGES_PROCESSED, self.messages_processed);
550 if let Some(t) = self.created_at {
551 attrs.set(CREATED_AT, t);
552 }
553 if let Some(handler) = &self.last_handler {
554 attrs.set(LAST_HANDLER, handler.clone());
555 }
556 attrs.set(TOTAL_PROCESSING_TIME_US, self.total_processing_time_us);
557 if let Some(fr) = &self.flight_recorder {
558 attrs.set(FLIGHT_RECORDER, fr.clone());
559 }
560 attrs.set(IS_SYSTEM, self.is_system);
561 if let Some(fi) = &self.failure {
562 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
563 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
564 if let Some(name) = &fi.root_cause_name {
565 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
566 }
567 attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
568 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
569 }
570 attrs
571 }
572}
573
/// Reply payload for an introspection query.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct IntrospectResult {
    /// The entity this result describes.
    pub identity: IntrospectRef,
    /// JSON-serialized attrs bag (produced with `serde_json::to_string`).
    pub attrs: String,
    /// References to the entity's children.
    pub children: Vec<IntrospectRef>,
    /// Reference to the entity's parent, when it has one.
    pub parent: Option<IntrospectRef>,
    /// Time at which this result was assembled (`SystemTime::now()` at
    /// every construction site in this file).
    pub as_of: SystemTime,
}
wirevalue::register_type!(IntrospectResult);
595
/// Selects which payload an `IntrospectMessage::Query` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum IntrospectView {
    /// `serve_introspect` answers with the cell's published attrs when it
    /// has any, and otherwise falls back to the live actor payload.
    Entity,
    /// `serve_introspect` always answers with the payload built by
    /// `live_actor_payload`.
    Actor,
}
wirevalue::register_type!(IntrospectView);
617
/// Messages handled by the `serve_introspect` task.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum IntrospectMessage {
    /// Asks the receiving actor to describe itself.
    Query {
        /// Which payload to return.
        view: IntrospectView,
        /// Once-port the `IntrospectResult` is sent to.
        reply: reference::OncePortRef<IntrospectResult>,
    },
    /// Asks the receiving actor to describe one of its children.
    QueryChild {
        /// Reference handed to the cell's `query_child`. When that yields
        /// nothing, a result carrying `error_code = "not_found"` is sent.
        child_ref: reference::Reference,
        /// Once-port the `IntrospectResult` is sent to.
        reply: reference::OncePortRef<IntrospectResult>,
    },
}
wirevalue::register_type!(IntrospectMessage);
643
/// One flight-recorder entry, in the form serialized into the
/// `FLIGHT_RECORDER` attr.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordedEvent {
    /// Event time; `live_actor_payload` fills this via `format_timestamp`.
    pub timestamp: String,
    /// Event sequence number; defaults when missing from the input.
    #[serde(default)]
    pub seq: usize,
    /// Level taken from the event's metadata, rendered as a string.
    pub level: String,
    /// Target taken from the event's metadata; defaults when missing from
    /// the input.
    #[serde(default)]
    pub target: String,
    /// Name taken from the event's metadata.
    pub name: String,
    /// The event's fields as a JSON value.
    pub fields: serde_json::Value,
}
664
665pub fn format_timestamp(time: SystemTime) -> String {
668 humantime::format_rfc3339_millis(time).to_string()
669}
670
/// Failure details captured by `live_actor_payload` from the cell's
/// supervision event; written to the `failure_*` attrs by
/// `build_actor_attrs`.
struct FailureSnapshot {
    /// Rendered actor status of the supervision event.
    error_message: String,
    /// Id of the actor reported as the root cause.
    root_cause_actor: reference::ActorId,
    /// Display name of the root-cause actor, when it has one.
    root_cause_name: Option<String>,
    /// Occurrence time of the supervision event.
    occurred_at: SystemTime,
    /// True when the root-cause actor is not the reporting actor.
    is_propagated: bool,
}
691
/// Values captured by `live_actor_payload` and passed to
/// `build_actor_attrs`, alongside the counters that function reads straight
/// from the cell.
struct ActorSnapshot {
    /// Rendered actor status. `build_actor_attrs` splits a leading
    /// "stopped:" or "failed:" prefix off into status and reason.
    status_str: String,
    /// Whether the cell reports itself as a system actor.
    is_system: bool,
    /// Rendered last-handler info, when any.
    last_handler: Option<String>,
    /// Flight recorder JSON, when there are recorded events.
    flight_recorder: Option<String>,
    /// Failure details, when available for a failed actor.
    failure: Option<FailureSnapshot>,
}
702
703fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String {
704 let mut attrs = hyperactor_config::Attrs::new();
710
711 if let Some(reason) = snap.status_str.strip_prefix("stopped:") {
713 attrs.set(STATUS, "stopped".to_string());
714 attrs.set(STATUS_REASON, reason.trim().to_string());
715 } else if let Some(reason) = snap.status_str.strip_prefix("failed:") {
716 attrs.set(STATUS, "failed".to_string());
717 attrs.set(STATUS_REASON, reason.trim().to_string());
718 } else {
719 attrs.set(STATUS, snap.status_str.clone());
720 }
723
724 attrs.set(ACTOR_TYPE, cell.actor_type_name().to_string());
725 attrs.set(MESSAGES_PROCESSED, cell.num_processed_messages());
726 attrs.set(CREATED_AT, cell.created_at());
727 attrs.set(TOTAL_PROCESSING_TIME_US, cell.total_processing_time_us());
728 attrs.set(IS_SYSTEM, snap.is_system);
729
730 if let Some(handler) = &snap.last_handler {
731 attrs.set(LAST_HANDLER, handler.clone());
732 }
733 if let Some(fr) = &snap.flight_recorder {
734 attrs.set(FLIGHT_RECORDER, fr.clone());
735 }
736
737 if let Some(fi) = &snap.failure {
739 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
740 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
741 if let Some(name) = &fi.root_cause_name {
742 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
743 }
744 attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
745 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
746 }
747 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string())
751}
752
753pub fn live_actor_payload(cell: &InstanceCell) -> IntrospectResult {
759 let actor_id = cell.actor_id();
760 let status = cell.status().borrow().clone();
761 let last_handler = cell.last_message_handler();
762
763 let children: Vec<IntrospectRef> = cell
764 .child_actor_ids()
765 .into_iter()
766 .map(IntrospectRef::Actor)
767 .collect();
768
769 let events = cell.recording().tail();
770 let flight_recorder_events: Vec<RecordedEvent> = events
771 .into_iter()
772 .map(|event| RecordedEvent {
773 timestamp: format_timestamp(event.time),
774 seq: event.seq,
775 level: event.metadata.level().to_string(),
776 target: event.metadata.target().to_string(),
777 name: event.metadata.name().to_string(),
778 fields: event.json_value(),
779 })
780 .collect();
781
782 let flight_recorder = if flight_recorder_events.is_empty() {
783 None
784 } else {
785 serde_json::to_string(&flight_recorder_events).ok()
786 };
787
788 let supervisor = cell
789 .parent()
790 .map(|p| IntrospectRef::Actor(p.actor_id().clone()));
791
792 let failure = if status.is_failed() {
795 cell.supervision_event().and_then(|event| {
796 let root = event.actually_failing_actor()?;
797 Some(FailureSnapshot {
798 error_message: event.actor_status.to_string(),
799 root_cause_actor: root.actor_id.clone(),
800 root_cause_name: root.display_name.clone(),
801 occurred_at: event.occurred_at,
802 is_propagated: root.actor_id != *actor_id,
803 })
804 })
805 } else {
806 None
807 };
808
809 let snap = ActorSnapshot {
810 status_str: status.to_string(),
811 is_system: cell.is_system(),
812 last_handler: last_handler.map(|info| info.to_string()),
813 flight_recorder,
814 failure,
815 };
816
817 let attrs = build_actor_attrs(cell, &snap);
818
819 IntrospectResult {
820 identity: IntrospectRef::Actor(actor_id.clone()),
821 attrs,
822 children,
823 parent: supervisor,
824 as_of: SystemTime::now(),
825 }
826}
827
828pub(crate) async fn serve_introspect(
838 cell: InstanceCell,
839 mailbox: crate::mailbox::Mailbox,
840 mut receiver: crate::mailbox::PortReceiver<IntrospectMessage>,
841) {
842 use crate::actor::ActorStatus;
843 use crate::mailbox::PortSender as _;
844
845 let mut status = cell.status().clone();
851
852 loop {
853 let msg = tokio::select! {
854 msg = receiver.recv() => {
855 match msg {
856 Ok(msg) => msg,
857 Err(_) => {
858 if cell.status().borrow().is_terminal() {
862 let snapshot = live_actor_payload(&cell);
863 cell.store_terminated_snapshot(snapshot);
864 }
865 break;
866 }
867 }
868 }
869 _ = status.wait_for(ActorStatus::is_terminal) => {
870 let snapshot = live_actor_payload(&cell);
873 cell.store_terminated_snapshot(snapshot);
874 break;
875 }
876 };
877
878 let result = match msg {
879 IntrospectMessage::Query { view, reply } => {
880 let payload = match view {
881 IntrospectView::Entity => match cell.published_attrs() {
882 Some(published) => {
883 let attrs_json =
884 serde_json::to_string(&published).unwrap_or_else(|_| "{}".into());
885 let children: Vec<IntrospectRef> =
886 published.get(CHILDREN).cloned().unwrap_or_default();
887 IntrospectResult {
888 identity: IntrospectRef::Actor(cell.actor_id().clone()),
889 attrs: attrs_json,
890 children,
891 parent: cell
892 .parent()
893 .map(|p| IntrospectRef::Actor(p.actor_id().clone())),
894 as_of: SystemTime::now(),
895 }
896 }
897 None => live_actor_payload(&cell),
898 },
899 IntrospectView::Actor => live_actor_payload(&cell),
900 };
901 mailbox.serialize_and_send_once(
902 reply,
903 payload,
904 crate::mailbox::monitored_return_handle(),
905 )
906 }
907 IntrospectMessage::QueryChild { child_ref, reply } => {
908 let payload = cell.query_child(&child_ref).unwrap_or_else(|| {
909 let mut error_attrs = hyperactor_config::Attrs::new();
910 error_attrs.set(ERROR_CODE, "not_found".to_string());
911 error_attrs.set(
912 ERROR_MESSAGE,
913 format!("child {} not found (no callback registered)", child_ref),
914 );
915 let identity = match &child_ref {
917 reference::Reference::Proc(id) => IntrospectRef::Proc(id.clone()),
918 reference::Reference::Actor(id) => IntrospectRef::Actor(id.clone()),
919 reference::Reference::Port(id) => {
920 IntrospectRef::Actor(id.actor_id().clone())
921 }
922 };
923 IntrospectResult {
924 identity,
925 attrs: serde_json::to_string(&error_attrs)
926 .unwrap_or_else(|_| "{}".to_string()),
927 children: Vec::new(),
928 parent: None,
929 as_of: SystemTime::now(),
930 }
931 });
932 mailbox.serialize_and_send_once(
933 reply,
934 payload,
935 crate::mailbox::monitored_return_handle(),
936 )
937 }
938 };
939 if let Err(e) = result {
940 tracing::debug!("introspect reply failed: {e}");
941 }
942 }
943 tracing::debug!(
944 actor_id = %cell.actor_id(),
945 "introspect task exiting"
946 );
947}
948
949#[cfg(test)]
950mod tests {
951 use super::*;
952 use crate::actor::ActorErrorKind;
953 use crate::actor::ActorStatus;
954 use crate::channel::ChannelAddr;
955 use crate::reference::ProcId;
956 use crate::supervision::ActorSupervisionEvent;
957
    /// Every attr key declared in this module carries an `INTROSPECT`
    /// meta-attr with the expected short name and a non-empty description,
    /// and the case list below covers every tagged key in the registry
    /// whose name starts with `hyperactor::introspect::`.
    #[test]
    fn test_introspect_keys_are_tagged() {
        let cases = vec![
            ("status", STATUS.attrs()),
            ("status_reason", STATUS_REASON.attrs()),
            ("actor_type", ACTOR_TYPE.attrs()),
            ("messages_processed", MESSAGES_PROCESSED.attrs()),
            ("created_at", CREATED_AT.attrs()),
            ("last_handler", LAST_HANDLER.attrs()),
            ("total_processing_time_us", TOTAL_PROCESSING_TIME_US.attrs()),
            ("flight_recorder", FLIGHT_RECORDER.attrs()),
            ("is_system", IS_SYSTEM.attrs()),
            ("children", CHILDREN.attrs()),
            ("error_code", ERROR_CODE.attrs()),
            ("error_message", ERROR_MESSAGE.attrs()),
            ("failure_error_message", FAILURE_ERROR_MESSAGE.attrs()),
            ("failure_root_cause_actor", FAILURE_ROOT_CAUSE_ACTOR.attrs()),
            ("failure_root_cause_name", FAILURE_ROOT_CAUSE_NAME.attrs()),
            ("failure_occurred_at", FAILURE_OCCURRED_AT.attrs()),
            ("failure_is_propagated", FAILURE_IS_PROPAGATED.attrs()),
        ];

        for (expected_name, meta) in &cases {
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        use hyperactor_config::attrs::AttrKeyInfo;
        // Exhaustiveness: count the tagged keys registered under this
        // module's prefix and compare against the hand-written list above,
        // so that a newly declared key forces this test to be updated.
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }
1011
    /// Across every registered attr key (not just this module's), each
    /// `INTROSPECT` tag has a non-empty name and description, and no two
    /// keys share a short name.
    #[test]
    fn test_introspect_short_names_are_globally_unique() {
        use hyperactor_config::attrs::AttrKeyInfo;

        // Maps short name -> fully-qualified name of the key that claimed it.
        let mut seen = std::collections::HashMap::new();
        for info in inventory::iter::<AttrKeyInfo>() {
            // Keys without an INTROSPECT tag are out of scope.
            let Some(introspect) = info.meta.get(INTROSPECT) else {
                continue;
            };
            assert!(
                !introspect.name.is_empty(),
                "INTROSPECT key {:?} has empty name",
                info.name
            );
            assert!(
                !introspect.desc.is_empty(),
                "INTROSPECT key {:?} has empty desc",
                info.name
            );
            if let Some(prev_fq) = seen.insert(introspect.name.clone(), info.name) {
                panic!(
                    "IK-2 violation: duplicate short name {:?} declared by both {:?} and {:?}",
                    introspect.name, prev_fq, info.name
                );
            }
        }
    }
1042
    /// Fixture: attrs for a running, non-system actor with no failure keys.
    fn running_actor_attrs() -> Attrs {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        attrs.set(ACTOR_TYPE, "MyActor".to_string());
        attrs.set(MESSAGES_PROCESSED, 42u64);
        attrs.set(CREATED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(IS_SYSTEM, false);
        attrs
    }
1056
    /// Fixture: an actor id named `actor_name` with index `pid`, on a proc
    /// named `proc_name` at the local channel address 0.
    fn test_actor_id(proc_name: &str, actor_name: &str, pid: usize) -> crate::reference::ActorId {
        ProcId::with_name(ChannelAddr::Local(0), proc_name).actor_id(actor_name, pid)
    }
1060
    /// Fixture: `running_actor_attrs` turned into a failed actor, with a
    /// status reason and the full set of failure keys.
    fn failed_actor_attrs() -> Attrs {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(STATUS_REASON, "something broke".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, test_actor_id("proc", "other", 0));
        attrs.set(FAILURE_ROOT_CAUSE_NAME, "OtherActor".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(FAILURE_IS_PROPAGATED, true);
        attrs
    }
1072
    /// A running actor decodes to the expected view, and the view survives
    /// a `to_attrs` / `from_attrs` round trip unchanged.
    #[test]
    fn test_actor_view_round_trip_running() {
        let view = ActorAttrsView::from_attrs(&running_actor_attrs()).unwrap();
        assert_eq!(view.status, "running");
        assert_eq!(view.actor_type, "MyActor");
        assert_eq!(view.messages_processed, 42);
        assert!(view.failure.is_none());

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }
1085
    /// A failed actor decodes with its failure details, and the view
    /// survives a `to_attrs` / `from_attrs` round trip unchanged.
    #[test]
    fn test_actor_view_round_trip_failed() {
        let view = ActorAttrsView::from_attrs(&failed_actor_attrs()).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().unwrap();
        assert_eq!(fi.error_message, "boom");
        assert!(fi.is_propagated);

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }
1098
    /// Decoding without `status` reports that key as missing.
    #[test]
    fn test_actor_view_missing_status() {
        let mut attrs = Attrs::new();
        attrs.set(ACTOR_TYPE, "X".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "status" });
    }
1107
    /// Decoding without `actor_type` reports that key as missing.
    #[test]
    fn test_actor_view_missing_actor_type() {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "actor_type" });
    }
1116
    /// IA-3: a `status_reason` alongside a non-terminal status is rejected.
    #[test]
    fn test_actor_view_ia3_rejects_reason_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS_REASON, "should not be here".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-3", .. }
        ));
    }
1127
    /// IA-3 is one-directional: a terminal status without a reason is
    /// accepted.
    #[test]
    fn test_actor_view_ia3_allows_terminal_without_reason() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "stopped".to_string());
        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "stopped");
        assert!(view.status_reason.is_none());
    }
1137
    /// IA-4: a "failed" status with no failure keys is rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failed_without_failure_attrs() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }
1149
    /// IA-4: a complete set of required failure keys on a non-failed
    /// status is rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failure_attrs_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, test_actor_id("proc", "x", 0));
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }
1162
    /// When only some required failure keys are present, decoding reports
    /// the first missing one rather than building a partial failure.
    #[test]
    fn test_actor_view_partial_failure_attrs_rejected() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(
            err,
            AttrsViewError::MissingKey {
                key: "failure_root_cause_actor"
            }
        );
    }
1178
    /// A parent that failed on an unhandled supervision event from a
    /// stopped child attributes the failure to the child. The root cause
    /// is the child id, the failure is marked propagated, and the child's
    /// display name is carried through, both on the snapshot and after
    /// decoding the attrs with `ActorAttrsView::from_attrs`.
    #[test]
    fn test_fi7_fi8_propagated_stopped_child() {
        let proc_id = ProcId::with_name(ChannelAddr::Local(0), "test_proc");
        let child_id = proc_id.actor_id("proc_agent", 0);
        let parent_id = proc_id.actor_id("mesh_actor", 0);

        let child_event = ActorSupervisionEvent::new(
            child_id.clone(),
            Some("proc_agent".into()),
            ActorStatus::Stopped("host died".into()),
            None,
        );
        let parent_event = ActorSupervisionEvent::new(
            parent_id.clone(),
            Some("mesh_actor".into()),
            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(
                child_event,
            ))),
            None,
        );

        // Build the snapshot the same way `live_actor_payload` does.
        let root = parent_event
            .actually_failing_actor()
            .expect("parent_event is a failure");
        let snap = FailureSnapshot {
            error_message: parent_event.actor_status.to_string(),
            root_cause_actor: root.actor_id.clone(),
            root_cause_name: root.display_name.clone(),
            occurred_at: parent_event.occurred_at,
            is_propagated: root.actor_id != parent_id,
        };

        assert_eq!(snap.root_cause_actor, child_id);
        assert!(snap.is_propagated);
        assert_eq!(snap.root_cause_name.as_deref(), Some("proc_agent"));

        // Overwrite the fixture's failure keys with the snapshot values,
        // then check they decode back to the same facts.
        let mut attrs = failed_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, snap.error_message);
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, snap.root_cause_actor.clone());
        if let Some(name) = &snap.root_cause_name {
            attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
        }
        attrs.set(FAILURE_OCCURRED_AT, snap.occurred_at);
        attrs.set(FAILURE_IS_PROPAGATED, snap.is_propagated);

        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().expect("failure_info must be present");
        assert_eq!(fi.root_cause_actor, child_id);
        assert!(fi.is_propagated);
        assert_eq!(fi.root_cause_name.as_deref(), Some("proc_agent"));
    }
1253}