hyperactor/introspect.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Introspection protocol for hyperactor actors.
10//!
11//! Every actor has a dedicated introspect task that handles
12//! [`IntrospectMessage`] by reading [`InstanceCell`] state directly,
13//! without going through the actor's message loop. This means:
14//!
15//! - Stuck actors can be introspected (the task runs independently).
16//! - Introspection does not perturb observed state (no Heisenberg).
17//! - Live status is reported accurately.
18//!
19//! Infrastructure actors publish domain-specific metadata via
20//! `publish_attrs()`, which the introspect task reads for
21//! Entity-view queries. Non-addressable children (e.g., system procs)
22//! are resolved via a callback registered on [`InstanceCell`].
23//!
24//! Callers navigate topology by fetching an [`IntrospectResult`]
25//! and following its `children` references.
26//!
27//! # Design Invariants
28//!
//! The introspection subsystem maintains twelve invariants (S1--S12).
30//! Each is documented at the code site that enforces it.
31//!
32//! - **S1.** Introspection must not depend on actor responsiveness --
33//! a wedged actor can still be introspected (runtime task, not
34//! actor loop).
35//! - **S2.** Introspection must not perturb observed state -- reading
36//! `InstanceCell` never sets `last_message_handler` to
37//! `IntrospectMessage`.
38//! - **S3.** Sender routing is unchanged -- senders target the same
39//! `PortId` (`IntrospectMessage::port()`) across processes.
40//! - **S4.** `IntrospectMessage` never produces a `WorkCell` --
41//! pre-registration via `open_message_port` gives the introspect
42//! port its own channel, independent of the actor's work queue.
43//! - **S5.** Replies never use `PanickingMailboxSender` -- the
44//! introspect task replies via `Mailbox::serialize_and_send_once`.
45//! - **S6.** View semantics are stable -- Actor view uses live
46//! structural state + supervision children; Entity view uses
47//! published properties + domain children.
48//! - **S7.** `QueryChild` must work without actor handlers -- system
49//! procs are resolved via a per-actor callback on `InstanceCell`.
50//! - **S8.** Published properties are constrained -- actors cannot
51//! publish `Root` or `Error` payloads (only `Host` and `Proc`
52//! variants).
53//! - **S9.** Port binding is single source of truth -- the introspect
54//! port is bound exactly once via `bind_actor_port()` in
55//! `Instance::new()`.
56//! - **S10.** Introspect receiver lifecycle -- created in
57//! `Instance::new()`, spawned in `start()`, dropped in
58//! `child_instance()`.
59//! - **S11.** Terminated snapshots do not keep actors resolvable --
60//! `store_terminated_snapshot` writes to the proc's snapshot map,
61//! not the instances map. `resolve_actor_ref` checks terminal
62//! status independently and is unaffected by snapshot storage.
63//! - **S12.** Introspection must not impair actor liveness --
64//! introspection queries (including DashMap reads for actor
65//! enumeration) must not cause convoy starvation or scheduling
66//! delays that stall concurrent actor spawn/stop operations.
67//!
68//! ## Introspection key invariants (IK-*)
69//!
70//! - **IK-1 (metadata completeness):** Every actor-runtime
71//! introspection key must carry `@meta(INTROSPECT = ...)` with
72//! non-empty `name` and `desc`.
73//! - **IK-2 (short-name uniqueness):** No two introspection keys
74//! may share the same `IntrospectAttr.name`. Duplicates would break
75//! the FQ-to-short HTTP remap and schema output.
76//!
77//! ## Failure introspection invariants (FI-*)
78//!
79//! The FailureInfo presentation type lives in
80//! `hyperactor_mesh::introspect`; these invariants are documented
81//! here because the enforcement sites are in hyperactor
82//! (`proc.rs` `serve()`, `live_actor_payload`).
83//!
84//! - **FI-1 (event-before-status):** All `InstanceCell` state that
85//! `live_actor_payload` reads must be written BEFORE
86//! `change_status()` transitions to terminal.
87//! - **FI-2 (write-once):** `InstanceCellState::supervision_event`
88//! is written at most once per actor lifetime.
89//! - **FI-3 (failure attrs <-> status):** Failure attrs are present
90//! iff status is `"failed"`.
91//! - **FI-4 (is_propagated <-> root_cause_actor):**
92//! `failure_is_propagated == true` iff
93//! `failure_root_cause_actor != this_actor_id`.
94//! - **FI-5 (is_poisoned <-> failed_actor_count):**
95//! `is_poisoned == true` iff `failed_actor_count > 0`.
96//! - **FI-6 (clean stop = no artifacts):** When an actor stops
97//! cleanly, `supervision_event` is `None`, failure attrs are
98//! absent, and the actor does not contribute to
99//! `failed_actor_count`.
100//!
101//! ## Attrs view invariants (AV-*)
102//!
103//! These govern the typed view layer (`ActorAttrsView`). The
104//! full AV-* / DP-* family is documented in
105//! `hyperactor_mesh::introspect`; the subset relevant to this
106//! crate:
107//!
108//! - **AV-1 (view-roundtrip):** For each view V,
109//! `V::from_attrs(&v.to_attrs()) == Ok(v)`.
110//! - **AV-2 (required-key-strictness):** `from_attrs` fails iff
111//! required keys for that view are missing.
112//! - **AV-3 (unknown-key-tolerance):** Unknown attrs keys must
113//! not affect successful decode outcome.
114
115use std::fmt;
116use std::time::SystemTime;
117
118use hyperactor_config::Attrs;
119use hyperactor_config::INTROSPECT;
120use hyperactor_config::IntrospectAttr;
121use hyperactor_config::declare_attrs;
122use serde::Deserialize;
123use serde::Serialize;
124use typeuri::Named;
125
126use crate::InstanceCell;
127use crate::reference;
128
129// Introspection attr keys — actor-runtime concepts.
130//
131// These keys are populated by the introspect handler from
132// InstanceCell data. Mesh-topology keys (node_type, addr, num_procs,
133// etc.) are declared in hyperactor_mesh::introspect.
134//
135// Naming convention:
136//
137// - Attr names are node-type-agnostic. The `node_type` attr (from the
138// mesh layer) identifies what kind of node it is; individual attr
139// names don't repeat that. So `status`, not `actor_status`.
140// - Related attrs share a prefix to form a group. The `failure_*`
141// keys decompose failure info into flat attrs — the `failure_`
142// prefix groups them semantically.
143// - `actor_type` is an exception: the `actor_` prefix disambiguates
144// it from `node_type` (mesh-layer concept). `actor_type` is the
145// Rust actor type name; `node_type` is the topology role.
146// - Use real types where possible (e.g. SystemTime for timestamps),
147// not String. Serialization format is a presentation concern.
148// - Internal key names are fully-qualified by `declare_attrs!`
149// (module_path + attr constant), e.g.
150// `hyperactor::introspect::status`.
151// - HTTP/schema public key names come from `@meta(INTROSPECT =
152// IntrospectAttr { name, desc })`. Keep `name` explicit so API
153// stability is decoupled from internal refactors.
154//
155// See IK-1 (metadata completeness) and IK-2 (short-name uniqueness)
156// in module doc.
declare_attrs! {
    /// Actor lifecycle status: "running", "stopped", "failed".
    ///
    /// Together with `STATUS_REASON`, these two attrs replace the
    /// former `actor_status` prefix protocol (`"stopped:reason"`,
    /// `"failed:reason"`) with structured fields, eliminating string
    /// prefix parsing in consumers.
    ///
    /// Required by `ActorAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "status".into(),
        desc: "Actor lifecycle status: running, stopped, failed".into(),
    })
    pub attr STATUS: String;

    /// Reason for stop/failure (absent when running).
    ///
    /// IA-3: `ActorAttrsView::from_attrs` rejects a bag that carries
    /// this key while `STATUS` is neither "stopped" nor "failed".
    @meta(INTROSPECT = IntrospectAttr {
        name: "status_reason".into(),
        desc: "Reason for stop/failure (absent when running)".into(),
    })
    pub attr STATUS_REASON: String;

    /// Fully-qualified actor type name.
    ///
    /// Required by `ActorAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_type".into(),
        desc: "Fully-qualified actor type name".into(),
    })
    pub attr ACTOR_TYPE: String;

    /// Number of messages processed by this actor.
    ///
    /// Declared with a default of 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "messages_processed".into(),
        desc: "Number of messages processed by this actor".into(),
    })
    pub attr MESSAGES_PROCESSED: u64 = 0;

    /// Timestamp when this actor was created.
    @meta(INTROSPECT = IntrospectAttr {
        name: "created_at".into(),
        desc: "Timestamp when this actor was created".into(),
    })
    pub attr CREATED_AT: SystemTime;

    /// Name of the last message handler invoked.
    @meta(INTROSPECT = IntrospectAttr {
        name: "last_handler".into(),
        desc: "Name of the last message handler invoked".into(),
    })
    pub attr LAST_HANDLER: String;

    /// Total CPU time in message handlers (microseconds).
    ///
    /// Declared with a default of 0.
    @meta(INTROSPECT = IntrospectAttr {
        name: "total_processing_time_us".into(),
        desc: "Total CPU time in message handlers (microseconds)".into(),
    })
    pub attr TOTAL_PROCESSING_TIME_US: u64 = 0;

    /// Flight recorder JSON (recent trace events).
    ///
    /// The string is a JSON array of `RecordedEvent`s; the key is
    /// left unset when there are no events to report.
    @meta(INTROSPECT = IntrospectAttr {
        name: "flight_recorder".into(),
        desc: "Flight recorder JSON (recent trace events)".into(),
    })
    pub attr FLIGHT_RECORDER: String;

    /// Whether this actor is infrastructure/system.
    ///
    /// Declared with a default of false.
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_system".into(),
        desc: "Whether this actor is infrastructure/system".into(),
    })
    pub attr IS_SYSTEM: bool = false;

    /// Child reference strings for tree navigation. Published by
    /// infrastructure actors (HostMeshAgent, ProcAgent) so the
    /// Entity view can return children without parsing mesh-layer keys.
    ///
    /// The Entity view in `serve_introspect` copies this list into
    /// `IntrospectResult::children` (empty when the key is absent).
    @meta(INTROSPECT = IntrospectAttr {
        name: "children".into(),
        desc: "Child reference strings for tree navigation".into(),
    })
    pub attr CHILDREN: Vec<String>;

    /// Machine-readable error code for error nodes.
    ///
    /// `serve_introspect` sets this to "not_found" when a
    /// `QueryChild` request cannot be resolved.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_code".into(),
        desc: "Machine-readable error code (e.g. not_found)".into(),
    })
    pub attr ERROR_CODE: String;

    /// Human-readable error message for error nodes.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_message".into(),
        desc: "Human-readable error message".into(),
    })
    pub attr ERROR_MESSAGE: String;

    // Failure attrs — decomposition of FailureInfo into flat attrs.
    //
    // - **FI-A1 (presence):** failure_* attrs are present iff
    //   status == "failed"; absent otherwise. (Attr-level restatement
    //   of FI-3.)
    // - **FI-A2 (propagation):** failure_is_propagated == true iff
    //   failure_root_cause_actor != this actor's id. (Attr-level
    //   restatement of FI-4.)
    // FI-1, FI-2 (write ordering) are enforced in proc.rs serve()
    // and are unaffected by the representation change.
    // FI-5, FI-6 are proc/mesh-level and unaffected.
    //
    // Of the group, FAILURE_ERROR_MESSAGE, FAILURE_ROOT_CAUSE_ACTOR and
    // FAILURE_OCCURRED_AT are the keys `ActorAttrsView::from_attrs`
    // requires together; FAILURE_ROOT_CAUSE_NAME is optional.

    /// Failure error message.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_error_message".into(),
        desc: "Failure error message".into(),
    })
    pub attr FAILURE_ERROR_MESSAGE: String;

    /// Actor that caused the failure (root cause).
    ///
    /// Stored as the actor id rendered to a string.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_actor".into(),
        desc: "Actor that caused the failure (root cause)".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_ACTOR: String;

    /// Name of root cause actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_name".into(),
        desc: "Name of root cause actor".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_NAME: String;

    /// Timestamp when failure occurred.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_occurred_at".into(),
        desc: "Timestamp when failure occurred".into(),
    })
    pub attr FAILURE_OCCURRED_AT: SystemTime;

    /// Whether the failure was propagated from a child.
    ///
    /// Declared with a default of false, i.e. the failure originated
    /// at this actor (see FI-A2 above).
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_is_propagated".into(),
        desc: "Whether the failure was propagated from a child".into(),
    })
    pub attr FAILURE_IS_PROPAGATED: bool = false;
}
296
297// See FI-1 through FI-6 in module doc.
298
/// Reason an `Attrs` bag could not be decoded into a typed view.
#[derive(Debug, Clone, PartialEq)]
pub enum AttrsViewError {
    /// A key the view requires is absent and has no declared default.
    MissingKey {
        /// Name of the absent attr key.
        key: &'static str,
    },
    /// Every key decoded on its own, but the combination breaks a
    /// cross-field coherence rule.
    InvariantViolation {
        /// Label of the broken invariant (e.g. "IA-4").
        label: &'static str,
        /// Human-readable explanation of what was inconsistent.
        detail: String,
    },
}

impl fmt::Display for AttrsViewError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AttrsViewError::MissingKey { key } => {
                f.write_str("missing required key: ")?;
                f.write_str(key)
            }
            AttrsViewError::InvariantViolation { label, detail } => {
                write!(f, "invariant {} violated: {}", label, detail)
            }
        }
    }
}

impl std::error::Error for AttrsViewError {}

impl AttrsViewError {
    /// Shorthand for [`AttrsViewError::MissingKey`].
    pub fn missing(key: &'static str) -> Self {
        AttrsViewError::MissingKey { key }
    }

    /// Shorthand for [`AttrsViewError::InvariantViolation`].
    pub fn invariant(label: &'static str, detail: String) -> Self {
        AttrsViewError::InvariantViolation { detail, label }
    }
}
340
/// Structured failure fields decoded from `FAILURE_*` attrs.
///
/// Built by `ActorAttrsView::from_attrs` (as its `failure` field) and
/// written back out, key by key, by `ActorAttrsView::to_attrs`.
#[derive(Debug, Clone, PartialEq)]
pub struct FailureAttrs {
    /// Error message describing the failure (`FAILURE_ERROR_MESSAGE`).
    pub error_message: String,
    /// Actor ID of the root-cause actor (`FAILURE_ROOT_CAUSE_ACTOR`).
    pub root_cause_actor: String,
    /// Display name of the root-cause actor, if available
    /// (`FAILURE_ROOT_CAUSE_NAME`, the only optional failure key).
    pub root_cause_name: Option<String>,
    /// When the failure occurred (`FAILURE_OCCURRED_AT`).
    pub occurred_at: SystemTime,
    /// Whether this failure was propagated from a child
    /// (`FAILURE_IS_PROPAGATED`; decodes as false when absent).
    pub is_propagated: bool,
}
355
/// Typed view over attrs for an actor node.
///
/// Decoded with `ActorAttrsView::from_attrs` and encoded with
/// `ActorAttrsView::to_attrs`. Each field mirrors one key declared in
/// this module's `declare_attrs!` block; `failure` groups the
/// `FAILURE_*` keys.
#[derive(Debug, Clone, PartialEq)]
pub struct ActorAttrsView {
    /// Lifecycle status: "running", "stopped", "failed" (`STATUS`, required).
    pub status: String,
    /// Reason for stop/failure, if any (`STATUS_REASON`). Only allowed
    /// with a terminal status (IA-3).
    pub status_reason: Option<String>,
    /// Fully-qualified actor type name (`ACTOR_TYPE`, required).
    pub actor_type: String,
    /// Number of messages processed (`MESSAGES_PROCESSED`; 0 when absent).
    pub messages_processed: u64,
    /// When this actor was created (`CREATED_AT`).
    pub created_at: Option<SystemTime>,
    /// Name of the last message handler invoked (`LAST_HANDLER`).
    pub last_handler: Option<String>,
    /// Total CPU time in message handlers, in microseconds
    /// (`TOTAL_PROCESSING_TIME_US`; 0 when absent).
    pub total_processing_time_us: u64,
    /// Flight recorder JSON, if available (`FLIGHT_RECORDER`).
    pub flight_recorder: Option<String>,
    /// Whether this is a system/infrastructure actor (`IS_SYSTEM`;
    /// false when absent).
    pub is_system: bool,
    /// Failure details, present iff status == "failed" (IA-4).
    pub failure: Option<FailureAttrs>,
}
380
381impl ActorAttrsView {
382 /// Decode from an `Attrs` bag (AV-2, AV-3). Requires `STATUS`
383 /// and `ACTOR_TYPE`. Enforces IA-3 (status_reason must not be
384 /// present for non-terminal status), IA-4 (failure attrs iff
385 /// failed), and failure completeness (if any required failure
386 /// key is present, all three required keys must be).
387 pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
388 let status = attrs
389 .get(STATUS)
390 .ok_or_else(|| AttrsViewError::missing("status"))?
391 .clone();
392 let status_reason = attrs.get(STATUS_REASON).cloned();
393 let actor_type = attrs
394 .get(ACTOR_TYPE)
395 .ok_or_else(|| AttrsViewError::missing("actor_type"))?
396 .clone();
397 let messages_processed = *attrs.get(MESSAGES_PROCESSED).unwrap_or(&0);
398 let created_at = attrs.get(CREATED_AT).copied();
399 let last_handler = attrs.get(LAST_HANDLER).cloned();
400 let total_processing_time_us = *attrs.get(TOTAL_PROCESSING_TIME_US).unwrap_or(&0);
401 let flight_recorder = attrs.get(FLIGHT_RECORDER).cloned();
402 let is_system = *attrs.get(IS_SYSTEM).unwrap_or(&false);
403
404 // IA-3 (one-sided): status_reason must not be present for
405 // non-terminal status. The converse is not enforced —
406 // terminal status without a reason is valid (clean shutdown).
407 let is_terminal = status == "stopped" || status == "failed";
408 if status_reason.is_some() && !is_terminal {
409 return Err(AttrsViewError::invariant(
410 "IA-3",
411 format!(
412 "status_reason present but status is '{status}' (expected stopped or failed)"
413 ),
414 ));
415 }
416
417 // Decode failure attrs. If any of the three required
418 // failure keys is present, require all three.
419 // FAILURE_IS_PROPAGATED has a declare_attrs! default of
420 // false, so it always resolves via attrs.get() and needs
421 // no explicit presence check. FAILURE_ROOT_CAUSE_NAME is
422 // genuinely optional.
423 let has_any_failure = attrs.get(FAILURE_ERROR_MESSAGE).is_some()
424 || attrs.get(FAILURE_ROOT_CAUSE_ACTOR).is_some()
425 || attrs.get(FAILURE_OCCURRED_AT).is_some();
426
427 let failure = if has_any_failure {
428 let error_message = attrs
429 .get(FAILURE_ERROR_MESSAGE)
430 .ok_or_else(|| AttrsViewError::missing("failure_error_message"))?
431 .clone();
432 let root_cause_actor = attrs
433 .get(FAILURE_ROOT_CAUSE_ACTOR)
434 .ok_or_else(|| AttrsViewError::missing("failure_root_cause_actor"))?
435 .clone();
436 let root_cause_name = attrs.get(FAILURE_ROOT_CAUSE_NAME).cloned();
437 let occurred_at = *attrs
438 .get(FAILURE_OCCURRED_AT)
439 .ok_or_else(|| AttrsViewError::missing("failure_occurred_at"))?;
440 // Default false: failure originated at this actor.
441 let is_propagated = *attrs.get(FAILURE_IS_PROPAGATED).unwrap_or(&false);
442 Some(FailureAttrs {
443 error_message,
444 root_cause_actor,
445 root_cause_name,
446 occurred_at,
447 is_propagated,
448 })
449 } else {
450 None
451 };
452
453 // IA-4: failure attrs present iff status == "failed".
454 if status == "failed" && failure.is_none() {
455 return Err(AttrsViewError::invariant(
456 "IA-4",
457 "status is 'failed' but no failure_* attrs present".to_string(),
458 ));
459 }
460 if status != "failed" && failure.is_some() {
461 return Err(AttrsViewError::invariant(
462 "IA-4",
463 format!("status is '{status}' but failure_* attrs are present"),
464 ));
465 }
466
467 Ok(Self {
468 status,
469 status_reason,
470 actor_type,
471 messages_processed,
472 created_at,
473 last_handler,
474 total_processing_time_us,
475 flight_recorder,
476 is_system,
477 failure,
478 })
479 }
480
481 /// Encode into an `Attrs` bag (AV-1 round-trip producer).
482 pub fn to_attrs(&self) -> Attrs {
483 let mut attrs = Attrs::new();
484 attrs.set(STATUS, self.status.clone());
485 if let Some(reason) = &self.status_reason {
486 attrs.set(STATUS_REASON, reason.clone());
487 }
488 attrs.set(ACTOR_TYPE, self.actor_type.clone());
489 attrs.set(MESSAGES_PROCESSED, self.messages_processed);
490 if let Some(t) = self.created_at {
491 attrs.set(CREATED_AT, t);
492 }
493 if let Some(handler) = &self.last_handler {
494 attrs.set(LAST_HANDLER, handler.clone());
495 }
496 attrs.set(TOTAL_PROCESSING_TIME_US, self.total_processing_time_us);
497 if let Some(fr) = &self.flight_recorder {
498 attrs.set(FLIGHT_RECORDER, fr.clone());
499 }
500 attrs.set(IS_SYSTEM, self.is_system);
501 if let Some(fi) = &self.failure {
502 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
503 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
504 if let Some(name) = &fi.root_cause_name {
505 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
506 }
507 attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
508 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
509 }
510 attrs
511 }
512}
513
/// Internal introspection result. Carries attrs as a JSON string.
/// The mesh layer constructs the API-facing `NodePayload` (with
/// `properties`) from this via `derive_properties`.
///
/// This is the internal wire type — it travels over actor ports
/// via `IntrospectMessage`. The presentation-layer `NodePayload`
/// (with `NodeProperties`) lives in `hyperactor_mesh::introspect`.
///
/// Producers in this module:
/// - `live_actor_payload` (the Actor view).
/// - The Entity view answered by `serve_introspect`.
/// - The `QueryChild` not-found reply, which has an empty `identity`
///   and carries `error_code` / `error_message` attrs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct IntrospectResult {
    /// Canonical reference string for this node. For actor payloads
    /// this is the actor id rendered to a string.
    pub identity: String,
    /// JSON-serialized `Attrs` bag containing introspection attributes.
    /// Producers in this module substitute `"{}"` if serialization fails.
    pub attrs: String,
    /// Reference strings the client can GET next to descend the tree.
    pub children: Vec<String>,
    /// Parent node reference for upward navigation.
    pub parent: Option<String>,
    /// ISO 8601 timestamp indicating when this data was captured
    /// (RFC 3339 with millisecond precision).
    pub as_of: String,
}
wirevalue::register_type!(IntrospectResult);
535
/// Context for introspection query - what aspect of the actor to
/// describe.
///
/// Infrastructure actors (e.g., ProcAgent, HostAgent)
/// have dual nature: they manage entities (Proc, Host) while also
/// being actors themselves. IntrospectView allows callers to
/// specify which aspect to query.
///
/// If `Entity` is requested from an actor that has published no
/// attrs, the introspect task answers with the Actor view instead.
// TODO(monarch-introspection): IntrospectView currently uses
// Entity/Actor naming. Consider renaming to runtime-neutral query
// modes (e.g. Published/Runtime) to avoid mesh-domain wording in
// hyperactor while preserving behavior and wire compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum IntrospectView {
    /// Return managed-entity properties (Proc, Host, etc.) for
    /// infrastructure actors, i.e. the actor's published attrs.
    Entity,
    /// Return standard actor properties (status, messages_processed,
    /// flight_recorder), read live from the actor's `InstanceCell`.
    Actor,
}
wirevalue::register_type!(IntrospectView);
557
/// Introspection query sent to any actor.
///
/// `Query` asks the actor to describe itself. `QueryChild` asks the
/// actor to describe one of its non-addressable children — an entity
/// that appears in the navigation tree but has no mailbox of its own
/// (e.g. a system proc owned by a host). The parent actor answers on
/// the child's behalf.
///
/// Both variants are handled by the per-actor introspect task
/// (`serve_introspect`), not by the actor's own message loop.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum IntrospectMessage {
    /// "Describe yourself."
    Query {
        /// View context - Entity or Actor.
        view: IntrospectView,
        /// Reply port receiving the actor's self-description.
        reply: reference::OncePortRef<IntrospectResult>,
    },
    /// "Describe one of your children."
    ///
    /// Resolved via the callback registered on `InstanceCell` (S7). If
    /// that yields nothing, the reply carries `error_code = "not_found"`.
    QueryChild {
        /// Reference identifying the child to describe.
        child_ref: reference::Reference,
        /// Reply port receiving the child's description.
        reply: reference::OncePortRef<IntrospectResult>,
    },
}
wirevalue::register_type!(IntrospectMessage);
583
/// Structured tracing event from the actor-local flight recorder.
///
/// Deserialization target for the `FLIGHT_RECORDER` attrs JSON string.
/// `live_actor_payload` builds these from the cell's recording tail and
/// serializes them as a JSON array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordedEvent {
    /// ISO 8601 timestamp of the event.
    pub timestamp: String,
    /// Monotonic sequence number for ordering. Defaults to 0 when the
    /// field is missing from the JSON.
    #[serde(default)]
    pub seq: usize,
    /// Event level (INFO, DEBUG, etc.).
    pub level: String,
    /// Event target (module path). Defaults to an empty string when
    /// the field is missing from the JSON.
    #[serde(default)]
    pub target: String,
    /// Event name.
    pub name: String,
    /// Event fields as JSON.
    pub fields: serde_json::Value,
}
604
605/// Format a [`SystemTime`] as an ISO 8601 timestamp with millisecond
606/// precision.
607pub fn format_timestamp(time: SystemTime) -> String {
608 humantime::format_rfc3339_millis(time).to_string()
609}
610
611/// Build a JSON-serialized `Attrs` string from values already
612/// computed by `live_actor_payload`. Reuses the same data — no
613/// redundant reads from `InstanceCell`.
614///
615/// Populates actor-runtime keys (STATUS, ACTOR_TYPE, etc.),
616/// decomposes the status prefix protocol into STATUS + STATUS_REASON,
617/// and decomposes failure fields into individual FAILURE_* attrs.
618///
619/// Starts from a fresh `Attrs` bag — published attrs (node_type,
620/// addr, etc.) are NOT included. This ensures the Actor view
621/// produces actor-only data; the Entity view handles published
622/// attrs separately.
623/// Failure fields extracted from a supervision event.
624struct FailureSnapshot {
625 error_message: String,
626 root_cause_actor: String,
627 root_cause_name: Option<String>,
628 occurred_at: String,
629 is_propagated: bool,
630}
631
632/// Pre-computed actor state for building the attrs JSON string.
633/// Avoids redundant InstanceCell reads — `live_actor_payload`
634/// computes these once and passes them in.
635struct ActorSnapshot {
636 status_str: String,
637 is_system: bool,
638 last_handler: Option<String>,
639 flight_recorder: Option<String>,
640 failure: Option<FailureSnapshot>,
641}
642
643fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String {
644 // Actor view builds a clean attrs bag with only actor-runtime
645 // keys. Published attrs (node_type, addr, etc.) belong to the
646 // Entity view — they are NOT merged here. This ensures that
647 // e.g. a HostMeshAgent resolved via Actor view produces Actor
648 // properties, not Host properties.
649 let mut attrs = hyperactor_config::Attrs::new();
650
651 // IA-3: status_reason present iff status carries a reason.
652 if let Some(reason) = snap.status_str.strip_prefix("stopped:") {
653 attrs.set(STATUS, "stopped".to_string());
654 attrs.set(STATUS_REASON, reason.trim().to_string());
655 } else if let Some(reason) = snap.status_str.strip_prefix("failed:") {
656 attrs.set(STATUS, "failed".to_string());
657 attrs.set(STATUS_REASON, reason.trim().to_string());
658 } else {
659 attrs.set(STATUS, snap.status_str.clone());
660 // IA-3: no status_reason for non-terminal states —
661 // guaranteed by fresh Attrs bag.
662 }
663
664 attrs.set(ACTOR_TYPE, cell.actor_type_name().to_string());
665 attrs.set(MESSAGES_PROCESSED, cell.num_processed_messages());
666 attrs.set(CREATED_AT, cell.created_at());
667 attrs.set(TOTAL_PROCESSING_TIME_US, cell.total_processing_time_us());
668 attrs.set(IS_SYSTEM, snap.is_system);
669
670 if let Some(handler) = &snap.last_handler {
671 attrs.set(LAST_HANDLER, handler.clone());
672 }
673 if let Some(fr) = &snap.flight_recorder {
674 attrs.set(FLIGHT_RECORDER, fr.clone());
675 }
676
677 // IA-4 / FI-A1: failure attrs present iff status == "failed".
678 if let Some(fi) = &snap.failure {
679 attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
680 attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
681 if let Some(name) = &fi.root_cause_name {
682 attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
683 }
684 if let Ok(t) = humantime::parse_rfc3339(&fi.occurred_at) {
685 attrs.set(FAILURE_OCCURRED_AT, t);
686 }
687 attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
688 }
689 // IA-4: failure attrs absent when not failed — guaranteed by
690 // starting from a fresh Attrs bag (no stale keys possible).
691
692 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string())
693}
694
695/// Build an [`IntrospectResult`] from live [`InstanceCell`] state.
696///
697/// Reads the current live status and last handler directly from
698/// the cell. Used by the introspect task (which runs outside
699/// the actor's message loop) and by `Instance::introspect_payload`.
700pub fn live_actor_payload(cell: &InstanceCell) -> IntrospectResult {
701 let actor_id = cell.actor_id();
702 let status = cell.status().borrow().clone();
703 let last_handler = cell.last_message_handler();
704
705 let children: Vec<String> = cell
706 .child_actor_ids()
707 .into_iter()
708 .map(|id| id.to_string())
709 .collect();
710
711 let events = cell.recording().tail();
712 let flight_recorder_events: Vec<RecordedEvent> = events
713 .into_iter()
714 .map(|event| RecordedEvent {
715 timestamp: format_timestamp(event.time),
716 seq: event.seq,
717 level: event.metadata.level().to_string(),
718 target: event.metadata.target().to_string(),
719 name: event.metadata.name().to_string(),
720 fields: event.json_value(),
721 })
722 .collect();
723
724 let flight_recorder = if flight_recorder_events.is_empty() {
725 None
726 } else {
727 serde_json::to_string(&flight_recorder_events).ok()
728 };
729
730 let supervisor = cell.parent().map(|p| p.actor_id().to_string());
731
732 // FI-3: failure_info is computed from the same status value as
733 // actor_status, ensuring they agree on whether the actor failed.
734 let failure = if status.is_failed() {
735 cell.supervision_event().map(|event| {
736 let root = event.actually_failing_actor();
737 FailureSnapshot {
738 error_message: event.actor_status.to_string(),
739 root_cause_actor: root.actor_id.to_string(),
740 root_cause_name: root.display_name.clone(),
741 occurred_at: format_timestamp(event.occurred_at),
742 is_propagated: root.actor_id != *actor_id,
743 }
744 })
745 } else {
746 None
747 };
748
749 let snap = ActorSnapshot {
750 status_str: status.to_string(),
751 is_system: cell.is_system(),
752 last_handler: last_handler.map(|info| info.to_string()),
753 flight_recorder,
754 failure,
755 };
756
757 let attrs = build_actor_attrs(cell, &snap);
758
759 IntrospectResult {
760 identity: actor_id.to_string(),
761 attrs,
762 children,
763 parent: supervisor,
764 as_of: format_timestamp(std::time::SystemTime::now()),
765 }
766}
767
768/// Introspect task: runs on a dedicated tokio task per actor,
769/// handling [`IntrospectMessage`] by reading [`InstanceCell`]
770/// directly and replying via the actor's [`Mailbox`].
771///
772/// The actor's message loop never sees these messages.
773///
774/// # Invariants exercised
775///
776/// Exercises S1, S2, S4, S5, S6, S11 (see module doc).
777pub async fn serve_introspect(
778 cell: InstanceCell,
779 mailbox: crate::mailbox::Mailbox,
780 mut receiver: crate::mailbox::PortReceiver<IntrospectMessage>,
781) {
782 use crate::actor::ActorStatus;
783 use crate::mailbox::PortSender as _;
784
785 // Watch for terminal status so we can break the reference cycle:
786 // InstanceCellState → Ports → introspect sender → keeps receiver
787 // open → this task holds InstanceCell → InstanceCellState.
788 // Without this, a stopped actor's InstanceCellState is never
789 // dropped and the actor lingers in the proc's instances map.
790 let mut status = cell.status().clone();
791
792 loop {
793 let msg = tokio::select! {
794 msg = receiver.recv() => {
795 match msg {
796 Ok(msg) => msg,
797 Err(_) => {
798 // Channel closed. If the actor reached a
799 // terminal state, snapshot it before exiting
800 // so it remains queryable post-mortem.
801 if cell.status().borrow().is_terminal() {
802 let snapshot = live_actor_payload(&cell);
803 cell.store_terminated_snapshot(snapshot);
804 }
805 break;
806 }
807 }
808 }
809 _ = status.wait_for(ActorStatus::is_terminal) => {
810 // Snapshot for post-mortem introspection before
811 // dropping our InstanceCell reference.
812 let snapshot = live_actor_payload(&cell);
813 cell.store_terminated_snapshot(snapshot);
814 break;
815 }
816 };
817
818 let result = match msg {
819 IntrospectMessage::Query { view, reply } => {
820 let payload = match view {
821 IntrospectView::Entity => match cell.published_attrs() {
822 Some(published) => {
823 let attrs_json =
824 serde_json::to_string(&published).unwrap_or_else(|_| "{}".into());
825 let children: Vec<String> =
826 published.get(CHILDREN).cloned().unwrap_or_default();
827 IntrospectResult {
828 identity: cell.actor_id().to_string(),
829 attrs: attrs_json,
830 children,
831 parent: cell.parent().map(|p| p.actor_id().to_string()),
832 as_of: format_timestamp(std::time::SystemTime::now()),
833 }
834 }
835 None => live_actor_payload(&cell),
836 },
837 IntrospectView::Actor => live_actor_payload(&cell),
838 };
839 mailbox.serialize_and_send_once(
840 reply,
841 payload,
842 crate::mailbox::monitored_return_handle(),
843 )
844 }
845 IntrospectMessage::QueryChild { child_ref, reply } => {
846 let payload = cell.query_child(&child_ref).unwrap_or_else(|| {
847 let mut error_attrs = hyperactor_config::Attrs::new();
848 error_attrs.set(ERROR_CODE, "not_found".to_string());
849 error_attrs.set(
850 ERROR_MESSAGE,
851 format!("child {} not found (no callback registered)", child_ref),
852 );
853 IntrospectResult {
854 identity: String::new(),
855 attrs: serde_json::to_string(&error_attrs)
856 .unwrap_or_else(|_| "{}".to_string()),
857 children: Vec::new(),
858 parent: None,
859 as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
860 .to_string(),
861 }
862 });
863 mailbox.serialize_and_send_once(
864 reply,
865 payload,
866 crate::mailbox::monitored_return_handle(),
867 )
868 }
869 };
870 if let Err(e) = result {
871 tracing::debug!("introspect reply failed: {e}");
872 }
873 }
874 tracing::debug!(
875 actor_id = %cell.actor_id(),
876 "introspect task exiting"
877 );
878}
879
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises IK-1 (see module doc).
    #[test]
    fn test_introspect_keys_are_tagged() {
        // A fixed-size array suffices: the table is only iterated by
        // reference and counted, so no heap allocation is needed
        // (clippy::useless_vec).
        let cases = [
            ("status", STATUS.attrs()),
            ("status_reason", STATUS_REASON.attrs()),
            ("actor_type", ACTOR_TYPE.attrs()),
            ("messages_processed", MESSAGES_PROCESSED.attrs()),
            ("created_at", CREATED_AT.attrs()),
            ("last_handler", LAST_HANDLER.attrs()),
            ("total_processing_time_us", TOTAL_PROCESSING_TIME_US.attrs()),
            ("flight_recorder", FLIGHT_RECORDER.attrs()),
            ("is_system", IS_SYSTEM.attrs()),
            ("children", CHILDREN.attrs()),
            ("error_code", ERROR_CODE.attrs()),
            ("error_message", ERROR_MESSAGE.attrs()),
            ("failure_error_message", FAILURE_ERROR_MESSAGE.attrs()),
            ("failure_root_cause_actor", FAILURE_ROOT_CAUSE_ACTOR.attrs()),
            ("failure_root_cause_name", FAILURE_ROOT_CAUSE_NAME.attrs()),
            ("failure_occurred_at", FAILURE_OCCURRED_AT.attrs()),
            ("failure_is_propagated", FAILURE_IS_PROPAGATED.attrs()),
        ];

        for (expected_name, meta) in &cases {
            // IK-1: see module doc.
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        // Exhaustiveness: verify cases covers all INTROSPECT-tagged
        // keys declared in this module.
        use hyperactor_config::attrs::AttrKeyInfo;
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    /// Exercises IK-2 (see module doc).
    #[test]
    fn test_introspect_short_names_are_globally_unique() {
        use hyperactor_config::attrs::AttrKeyInfo;

        // Maps each INTROSPECT short name to the fully-qualified key
        // name that first declared it.
        let mut seen = std::collections::HashMap::new();
        for info in inventory::iter::<AttrKeyInfo>() {
            let Some(introspect) = info.meta.get(INTROSPECT) else {
                continue;
            };
            // Metadata quality: every tagged key must have
            // non-empty name and desc.
            assert!(
                !introspect.name.is_empty(),
                "INTROSPECT key {:?} has empty name",
                info.name
            );
            assert!(
                !introspect.desc.is_empty(),
                "INTROSPECT key {:?} has empty desc",
                info.name
            );
            // `insert` returns the previous value when the short name
            // was already present, i.e. a duplicate.
            if let Some(prev_fq) = seen.insert(introspect.name.clone(), info.name) {
                panic!(
                    "IK-2 violation: duplicate short name {:?} declared by both {:?} and {:?}",
                    introspect.name, prev_fq, info.name
                );
            }
        }
    }

    // IA-1 tests require spawning actors and live in actor.rs
    // where #[hyperactor::export] and test infrastructure are
    // available. IA-3 and IA-4 are tested below at the view level.

    /// Attrs for a running actor that `ActorAttrsView::from_attrs`
    /// accepts: status, actor type, message count, creation time and
    /// system flag set; no status reason and no failure keys.
    fn running_actor_attrs() -> Attrs {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        attrs.set(ACTOR_TYPE, "MyActor".to_string());
        attrs.set(MESSAGES_PROCESSED, 42u64);
        attrs.set(CREATED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(IS_SYSTEM, false);
        attrs
    }

    /// Attrs for a failed actor: the running baseline with status
    /// switched to "failed", a status reason, and every failure_* key
    /// populated.
    fn failed_actor_attrs() -> Attrs {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(STATUS_REASON, "something broke".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, "other[0]".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_NAME, "OtherActor".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(FAILURE_IS_PROPAGATED, true);
        attrs
    }

    /// AV-1: from_attrs(to_attrs(v)) == v.
    #[test]
    fn test_actor_view_round_trip_running() {
        let view = ActorAttrsView::from_attrs(&running_actor_attrs()).unwrap();
        assert_eq!(view.status, "running");
        assert_eq!(view.actor_type, "MyActor");
        assert_eq!(view.messages_processed, 42);
        assert!(view.failure.is_none());

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    /// AV-1.
    #[test]
    fn test_actor_view_round_trip_failed() {
        let view = ActorAttrsView::from_attrs(&failed_actor_attrs()).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().unwrap();
        assert_eq!(fi.error_message, "boom");
        assert!(fi.is_propagated);

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    /// AV-2: missing required key rejected.
    #[test]
    fn test_actor_view_missing_status() {
        let mut attrs = Attrs::new();
        attrs.set(ACTOR_TYPE, "X".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "status" });
    }

    /// AV-2.
    #[test]
    fn test_actor_view_missing_actor_type() {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "actor_type" });
    }

    /// IA-3: a status reason on a running actor is rejected.
    #[test]
    fn test_actor_view_ia3_rejects_reason_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS_REASON, "should not be here".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-3", .. }
        ));
    }

    /// IA-3: a terminal status without a status reason is accepted.
    #[test]
    fn test_actor_view_ia3_allows_terminal_without_reason() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "stopped".to_string());
        // No status_reason — should be fine.
        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "stopped");
        assert!(view.status_reason.is_none());
    }

    /// IA-4: a failed status with no failure_* keys is rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failed_without_failure_attrs() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        // No failure_* keys.
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    /// IA-4: failure_* keys on a running actor are rejected.
    #[test]
    fn test_actor_view_ia4_rejects_failure_attrs_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, "x[0]".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    /// AV-2: partial failure set → missing key.
    #[test]
    fn test_actor_view_partial_failure_attrs_rejected() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        // Only one of the three required failure keys.
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(
            err,
            AttrsViewError::MissingKey {
                key: "failure_root_cause_actor"
            }
        );
    }
}