hyperactor/
introspect.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Introspection protocol for hyperactor actors.
10//!
11//! Every actor has a dedicated introspect task that handles
12//! [`IntrospectMessage`] by reading [`InstanceCell`] state directly,
13//! without going through the actor's message loop. This means:
14//!
15//! - Stuck actors can be introspected (the task runs independently).
16//! - Introspection does not perturb observed state (no Heisenberg).
17//! - Live status is reported accurately.
18//!
19//! Infrastructure actors publish domain-specific metadata via
20//! `publish_attrs()`, which the introspect task reads for
21//! Entity-view queries. Non-addressable children (e.g., system procs)
22//! are resolved via a callback registered on [`InstanceCell`].
23//!
24//! Callers navigate topology by fetching an [`IntrospectResult`]
25//! and following its `children` references.
26//!
27//! # Design Invariants
28//!
//! The introspection subsystem maintains twelve invariants (S1--S12).
30//! Each is documented at the code site that enforces it.
31//!
32//! - **S1.** Introspection must not depend on actor responsiveness --
33//!   a wedged actor can still be introspected (runtime task, not
34//!   actor loop).
35//! - **S2.** Introspection must not perturb observed state -- reading
36//!   `InstanceCell` never sets `last_message_handler` to
37//!   `IntrospectMessage`.
38//! - **S3.** Sender routing is unchanged -- senders target the same
39//!   `PortId` (`IntrospectMessage::port()`) across processes.
40//! - **S4.** `IntrospectMessage` never produces a `WorkCell` --
41//!   pre-registration via `open_message_port` gives the introspect
42//!   port its own channel, independent of the actor's work queue.
43//! - **S5.** Replies never use `PanickingMailboxSender` -- the
44//!   introspect task replies via `Mailbox::serialize_and_send_once`.
45//! - **S6.** View semantics are stable -- Actor view uses live
46//!   structural state + supervision children; Entity view uses
47//!   published properties + domain children.
48//! - **S7.** `QueryChild` must work without actor handlers -- system
49//!   procs are resolved via a per-actor callback on `InstanceCell`.
50//! - **S8.** Published properties are constrained -- actors cannot
51//!   publish `Root` or `Error` payloads (only `Host` and `Proc`
52//!   variants).
53//! - **S9.** Port binding is single source of truth -- the introspect
54//!   port is bound exactly once via `bind_actor_port()` in
55//!   `Instance::new()`.
56//! - **S10.** Introspect receiver lifecycle -- created in
57//!   `Instance::new()`, spawned in `start()`, dropped in
58//!   `child_instance()`.
59//! - **S11.** Terminated snapshots do not keep actors resolvable --
60//!   `store_terminated_snapshot` writes to the proc's snapshot map,
61//!   not the instances map. `resolve_actor_ref` checks terminal
62//!   status independently and is unaffected by snapshot storage.
63//! - **S12.** Introspection must not impair actor liveness --
64//!   introspection queries (including DashMap reads for actor
65//!   enumeration) must not cause convoy starvation or scheduling
66//!   delays that stall concurrent actor spawn/stop operations.
67//!
68//! ## Introspection key invariants (IK-*)
69//!
70//! - **IK-1 (metadata completeness):** Every actor-runtime
71//!   introspection key must carry `@meta(INTROSPECT = ...)` with
72//!   non-empty `name` and `desc`.
73//! - **IK-2 (short-name uniqueness):** No two introspection keys
74//!   may share the same `IntrospectAttr.name`. Duplicates would break
75//!   the FQ-to-short HTTP remap and schema output.
76//!
77//! ## Failure introspection invariants (FI-*)
78//!
79//! The FailureInfo presentation type lives in
80//! `hyperactor_mesh::introspect`; these invariants are documented
81//! here because the enforcement sites are in hyperactor
82//! (`proc.rs` `serve()`, `live_actor_payload`).
83//!
84//! - **FI-1 (event-before-status):** All `InstanceCell` state that
85//!   `live_actor_payload` reads must be written BEFORE
86//!   `change_status()` transitions to terminal.
87//! - **FI-2 (write-once):** `InstanceCellState::supervision_event`
88//!   is written at most once per actor lifetime.
89//! - **FI-3 (failure attrs <-> status):** Failure attrs are present
90//!   iff status is `"failed"`.
91//! - **FI-4 (is_propagated <-> root_cause_actor):**
92//!   `failure_is_propagated == true` iff
93//!   `failure_root_cause_actor != this_actor_id`.
94//! - **FI-5 (is_poisoned <-> failed_actor_count):**
95//!   `is_poisoned == true` iff `failed_actor_count > 0`.
96//! - **FI-6 (clean stop = no artifacts):** When an actor stops
97//!   cleanly, `supervision_event` is `None`, failure attrs are
98//!   absent, and the actor does not contribute to
99//!   `failed_actor_count`.
100//!
101//! ## Attrs view invariants (AV-*)
102//!
103//! These govern the typed view layer (`ActorAttrsView`). The
104//! full AV-* / DP-* family is documented in
105//! `hyperactor_mesh::introspect`; the subset relevant to this
106//! crate:
107//!
108//! - **AV-1 (view-roundtrip):** For each view V,
109//!   `V::from_attrs(&v.to_attrs()) == Ok(v)`.
110//! - **AV-2 (required-key-strictness):** `from_attrs` fails iff
111//!   required keys for that view are missing.
112//! - **AV-3 (unknown-key-tolerance):** Unknown attrs keys must
113//!   not affect successful decode outcome.
114
115use std::fmt;
116use std::time::SystemTime;
117
118use hyperactor_config::Attrs;
119use hyperactor_config::INTROSPECT;
120use hyperactor_config::IntrospectAttr;
121use hyperactor_config::declare_attrs;
122use serde::Deserialize;
123use serde::Serialize;
124use typeuri::Named;
125
126use crate::InstanceCell;
127use crate::reference;
128
// Introspection attr keys — actor-runtime concepts.
//
// These keys are populated by the introspect handler from
// InstanceCell data. Mesh-topology keys (node_type, addr, num_procs,
// etc.) are declared in hyperactor_mesh::introspect.
//
// Naming convention:
//
// - Attr names are node-type-agnostic. The `node_type` attr (from the
//   mesh layer) identifies what kind of node it is; individual attr
//   names don't repeat that. So `status`, not `actor_status`.
// - Related attrs share a prefix to form a group. The `failure_*`
//   keys decompose failure info into flat attrs — the `failure_`
//   prefix groups them semantically.
// - `actor_type` is an exception: the `actor_` prefix disambiguates
//   it from `node_type` (mesh-layer concept). `actor_type` is the
//   Rust actor type name; `node_type` is the topology role.
// - Use real types where possible (e.g. SystemTime for timestamps),
//   not String. Serialization format is a presentation concern.
// - Internal key names are fully-qualified by `declare_attrs!`
//   (module_path + attr constant), e.g.
//   `hyperactor::introspect::status`.
// - HTTP/schema public key names come from `@meta(INTROSPECT =
//   IntrospectAttr { name, desc })`. Keep `name` explicit so API
//   stability is decoupled from internal refactors.
// - Keys declared with a default (`= 0`, `= false`) resolve via
//   `attrs.get()` even when never set, so a `get()` cannot tell
//   "absent" apart from "set to the default".
//
// See IK-1 (metadata completeness) and IK-2 (short-name uniqueness)
// in module doc.
declare_attrs! {
    /// Actor lifecycle status: "running", "stopped", "failed".
    ///
    /// Together with `STATUS_REASON`, these two attrs replace the
    /// former `actor_status` prefix protocol (`"stopped:reason"`,
    /// `"failed:reason"`) with structured fields, eliminating string
    /// prefix parsing in consumers.
    @meta(INTROSPECT = IntrospectAttr {
        name: "status".into(),
        desc: "Actor lifecycle status: running, stopped, failed".into(),
    })
    pub attr STATUS: String;

    /// Reason for stop/failure (absent when running).
    @meta(INTROSPECT = IntrospectAttr {
        name: "status_reason".into(),
        desc: "Reason for stop/failure (absent when running)".into(),
    })
    pub attr STATUS_REASON: String;

    /// Fully-qualified actor type name.
    @meta(INTROSPECT = IntrospectAttr {
        name: "actor_type".into(),
        desc: "Fully-qualified actor type name".into(),
    })
    pub attr ACTOR_TYPE: String;

    /// Number of messages processed by this actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "messages_processed".into(),
        desc: "Number of messages processed by this actor".into(),
    })
    pub attr MESSAGES_PROCESSED: u64 = 0;

    /// Timestamp when this actor was created.
    @meta(INTROSPECT = IntrospectAttr {
        name: "created_at".into(),
        desc: "Timestamp when this actor was created".into(),
    })
    pub attr CREATED_AT: SystemTime;

    /// Name of the last message handler invoked.
    @meta(INTROSPECT = IntrospectAttr {
        name: "last_handler".into(),
        desc: "Name of the last message handler invoked".into(),
    })
    pub attr LAST_HANDLER: String;

    /// Total CPU time in message handlers (microseconds).
    @meta(INTROSPECT = IntrospectAttr {
        name: "total_processing_time_us".into(),
        desc: "Total CPU time in message handlers (microseconds)".into(),
    })
    pub attr TOTAL_PROCESSING_TIME_US: u64 = 0;

    /// Flight recorder JSON (recent trace events).
    @meta(INTROSPECT = IntrospectAttr {
        name: "flight_recorder".into(),
        desc: "Flight recorder JSON (recent trace events)".into(),
    })
    pub attr FLIGHT_RECORDER: String;

    /// Whether this actor is infrastructure/system.
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_system".into(),
        desc: "Whether this actor is infrastructure/system".into(),
    })
    pub attr IS_SYSTEM: bool = false;

    /// Child reference strings for tree navigation. Published by
    /// infrastructure actors (HostMeshAgent, ProcAgent) so the
    /// Entity view can return children without parsing mesh-layer keys.
    @meta(INTROSPECT = IntrospectAttr {
        name: "children".into(),
        desc: "Child reference strings for tree navigation".into(),
    })
    pub attr CHILDREN: Vec<String>;

    /// Machine-readable error code for error nodes.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_code".into(),
        desc: "Machine-readable error code (e.g. not_found)".into(),
    })
    pub attr ERROR_CODE: String;

    /// Human-readable error message for error nodes.
    @meta(INTROSPECT = IntrospectAttr {
        name: "error_message".into(),
        desc: "Human-readable error message".into(),
    })
    pub attr ERROR_MESSAGE: String;

    // Failure attrs — decomposition of FailureInfo into flat attrs.
    //
    // - **FI-A1 (presence):** failure_* attrs are present iff
    //   status == "failed"; absent otherwise. (Attr-level restatement
    //   of FI-3.)
    // - **FI-A2 (propagation):** failure_is_propagated == true iff
    //   failure_root_cause_actor != this actor's id. (Attr-level
    //   restatement of FI-4.)
    // FI-1 (event-before-status) and FI-2 (write-once) are enforced
    // in proc.rs serve() and are unaffected by the representation
    // change. FI-5, FI-6 are proc/mesh-level and unaffected.
    //
    // When the group is present, FAILURE_ERROR_MESSAGE,
    // FAILURE_ROOT_CAUSE_ACTOR and FAILURE_OCCURRED_AT are all
    // required by ActorAttrsView::from_attrs.

    /// Failure error message.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_error_message".into(),
        desc: "Failure error message".into(),
    })
    pub attr FAILURE_ERROR_MESSAGE: String;

    /// Actor that caused the failure (root cause).
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_actor".into(),
        desc: "Actor that caused the failure (root cause)".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_ACTOR: String;

    // Optional even when the rest of the failure group is present.
    /// Name of root cause actor.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_root_cause_name".into(),
        desc: "Name of root cause actor".into(),
    })
    pub attr FAILURE_ROOT_CAUSE_NAME: String;

    /// Timestamp when failure occurred.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_occurred_at".into(),
        desc: "Timestamp when failure occurred".into(),
    })
    pub attr FAILURE_OCCURRED_AT: SystemTime;

    // Defaults to `false` (failure originated at this actor), so an
    // unset value is indistinguishable from an explicit `false`.
    /// Whether the failure was propagated from a child.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failure_is_propagated".into(),
        desc: "Whether the failure was propagated from a child".into(),
    })
    pub attr FAILURE_IS_PROPAGATED: bool = false;
}
296
297// See FI-1 through FI-6 in module doc.
298
/// Failure modes when decoding an `Attrs` bag into a typed view
/// (for example `ActorAttrsView`).
#[derive(Debug, Clone, PartialEq)]
pub enum AttrsViewError {
    /// A required key was absent (and has no default).
    MissingKey {
        /// Short name of the attr key that was absent.
        key: &'static str,
    },
    /// A cross-field coherence check failed.
    InvariantViolation {
        /// Invariant label (e.g. "IA-4").
        label: &'static str,
        /// Human-readable description of the violation.
        detail: String,
    },
}

impl fmt::Display for AttrsViewError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingKey { key } => {
                f.write_str("missing required key: ")?;
                f.write_str(key)
            }
            Self::InvariantViolation { label, detail } => {
                write!(f, "invariant {} violated: {}", label, detail)
            }
        }
    }
}

// No underlying cause to chain: both variants are leaf errors.
impl std::error::Error for AttrsViewError {}

impl AttrsViewError {
    /// Shorthand for building [`AttrsViewError::MissingKey`].
    pub fn missing(key: &'static str) -> Self {
        AttrsViewError::MissingKey { key }
    }

    /// Shorthand for building [`AttrsViewError::InvariantViolation`].
    pub fn invariant(label: &'static str, detail: String) -> Self {
        AttrsViewError::InvariantViolation { detail, label }
    }
}
340
/// Failure details decoded from the `FAILURE_*` attr group.
#[derive(Debug, Clone, PartialEq)]
pub struct FailureAttrs {
    /// Message describing what went wrong.
    pub error_message: String,
    /// ID of the actor where the failure originated.
    pub root_cause_actor: String,
    /// Display name of that root-cause actor, when one was recorded.
    pub root_cause_name: Option<String>,
    /// Time at which the failure occurred.
    pub occurred_at: SystemTime,
    /// `true` when the failure was propagated rather than originating
    /// at this actor.
    pub is_propagated: bool,
}

/// Typed, decoded view of the actor-runtime attrs for one actor node.
#[derive(Debug, Clone, PartialEq)]
pub struct ActorAttrsView {
    /// Lifecycle status string: "running", "stopped" or "failed".
    pub status: String,
    /// Why the actor stopped or failed, when a reason was recorded.
    pub status_reason: Option<String>,
    /// Fully-qualified Rust type name of the actor.
    pub actor_type: String,
    /// Count of messages the actor has handled.
    pub messages_processed: u64,
    /// Creation time, when known.
    pub created_at: Option<SystemTime>,
    /// Most recently invoked message handler, when known.
    pub last_handler: Option<String>,
    /// Accumulated time spent in message handlers, in microseconds.
    pub total_processing_time_us: u64,
    /// Flight-recorder events serialized as JSON, when available.
    pub flight_recorder: Option<String>,
    /// `true` for system/infrastructure actors.
    pub is_system: bool,
    /// Failure details; expected to be `Some` exactly when
    /// `status == "failed"`.
    pub failure: Option<FailureAttrs>,
}
380
381impl ActorAttrsView {
382    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires `STATUS`
383    /// and `ACTOR_TYPE`. Enforces IA-3 (status_reason must not be
384    /// present for non-terminal status), IA-4 (failure attrs iff
385    /// failed), and failure completeness (if any required failure
386    /// key is present, all three required keys must be).
387    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
388        let status = attrs
389            .get(STATUS)
390            .ok_or_else(|| AttrsViewError::missing("status"))?
391            .clone();
392        let status_reason = attrs.get(STATUS_REASON).cloned();
393        let actor_type = attrs
394            .get(ACTOR_TYPE)
395            .ok_or_else(|| AttrsViewError::missing("actor_type"))?
396            .clone();
397        let messages_processed = *attrs.get(MESSAGES_PROCESSED).unwrap_or(&0);
398        let created_at = attrs.get(CREATED_AT).copied();
399        let last_handler = attrs.get(LAST_HANDLER).cloned();
400        let total_processing_time_us = *attrs.get(TOTAL_PROCESSING_TIME_US).unwrap_or(&0);
401        let flight_recorder = attrs.get(FLIGHT_RECORDER).cloned();
402        let is_system = *attrs.get(IS_SYSTEM).unwrap_or(&false);
403
404        // IA-3 (one-sided): status_reason must not be present for
405        // non-terminal status. The converse is not enforced —
406        // terminal status without a reason is valid (clean shutdown).
407        let is_terminal = status == "stopped" || status == "failed";
408        if status_reason.is_some() && !is_terminal {
409            return Err(AttrsViewError::invariant(
410                "IA-3",
411                format!(
412                    "status_reason present but status is '{status}' (expected stopped or failed)"
413                ),
414            ));
415        }
416
417        // Decode failure attrs. If any of the three required
418        // failure keys is present, require all three.
419        // FAILURE_IS_PROPAGATED has a declare_attrs! default of
420        // false, so it always resolves via attrs.get() and needs
421        // no explicit presence check. FAILURE_ROOT_CAUSE_NAME is
422        // genuinely optional.
423        let has_any_failure = attrs.get(FAILURE_ERROR_MESSAGE).is_some()
424            || attrs.get(FAILURE_ROOT_CAUSE_ACTOR).is_some()
425            || attrs.get(FAILURE_OCCURRED_AT).is_some();
426
427        let failure = if has_any_failure {
428            let error_message = attrs
429                .get(FAILURE_ERROR_MESSAGE)
430                .ok_or_else(|| AttrsViewError::missing("failure_error_message"))?
431                .clone();
432            let root_cause_actor = attrs
433                .get(FAILURE_ROOT_CAUSE_ACTOR)
434                .ok_or_else(|| AttrsViewError::missing("failure_root_cause_actor"))?
435                .clone();
436            let root_cause_name = attrs.get(FAILURE_ROOT_CAUSE_NAME).cloned();
437            let occurred_at = *attrs
438                .get(FAILURE_OCCURRED_AT)
439                .ok_or_else(|| AttrsViewError::missing("failure_occurred_at"))?;
440            // Default false: failure originated at this actor.
441            let is_propagated = *attrs.get(FAILURE_IS_PROPAGATED).unwrap_or(&false);
442            Some(FailureAttrs {
443                error_message,
444                root_cause_actor,
445                root_cause_name,
446                occurred_at,
447                is_propagated,
448            })
449        } else {
450            None
451        };
452
453        // IA-4: failure attrs present iff status == "failed".
454        if status == "failed" && failure.is_none() {
455            return Err(AttrsViewError::invariant(
456                "IA-4",
457                "status is 'failed' but no failure_* attrs present".to_string(),
458            ));
459        }
460        if status != "failed" && failure.is_some() {
461            return Err(AttrsViewError::invariant(
462                "IA-4",
463                format!("status is '{status}' but failure_* attrs are present"),
464            ));
465        }
466
467        Ok(Self {
468            status,
469            status_reason,
470            actor_type,
471            messages_processed,
472            created_at,
473            last_handler,
474            total_processing_time_us,
475            flight_recorder,
476            is_system,
477            failure,
478        })
479    }
480
481    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
482    pub fn to_attrs(&self) -> Attrs {
483        let mut attrs = Attrs::new();
484        attrs.set(STATUS, self.status.clone());
485        if let Some(reason) = &self.status_reason {
486            attrs.set(STATUS_REASON, reason.clone());
487        }
488        attrs.set(ACTOR_TYPE, self.actor_type.clone());
489        attrs.set(MESSAGES_PROCESSED, self.messages_processed);
490        if let Some(t) = self.created_at {
491            attrs.set(CREATED_AT, t);
492        }
493        if let Some(handler) = &self.last_handler {
494            attrs.set(LAST_HANDLER, handler.clone());
495        }
496        attrs.set(TOTAL_PROCESSING_TIME_US, self.total_processing_time_us);
497        if let Some(fr) = &self.flight_recorder {
498            attrs.set(FLIGHT_RECORDER, fr.clone());
499        }
500        attrs.set(IS_SYSTEM, self.is_system);
501        if let Some(fi) = &self.failure {
502            attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
503            attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
504            if let Some(name) = &fi.root_cause_name {
505                attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
506            }
507            attrs.set(FAILURE_OCCURRED_AT, fi.occurred_at);
508            attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
509        }
510        attrs
511    }
512}
513
/// Internal introspection result. Carries attrs as a JSON string.
/// The mesh layer constructs the API-facing `NodePayload` (with
/// `properties`) from this via `derive_properties`.
///
/// This is the internal wire type — it travels over actor ports
/// via `IntrospectMessage`. The presentation-layer `NodePayload`
/// (with `NodeProperties`) lives in `hyperactor_mesh::introspect`.
///
/// NOTE(review): because this type is serialized across processes,
/// field names/order are presumably part of the wire format. Treat
/// changes here as wire-compatibility changes.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct IntrospectResult {
    /// Canonical reference string for this node.
    pub identity: String,
    /// JSON-serialized `Attrs` bag containing introspection attributes.
    /// When built by `live_actor_payload` (Actor view), this holds only
    /// actor-runtime keys; published attrs are not merged in.
    pub attrs: String,
    /// Reference strings the client can GET next to descend the tree.
    pub children: Vec<String>,
    /// Parent node reference for upward navigation.
    pub parent: Option<String>,
    /// ISO 8601 timestamp indicating when this data was captured.
    pub as_of: String,
}
// Register with the wire-value type registry so this type can be
// sent over actor ports.
wirevalue::register_type!(IntrospectResult);
535
/// Context for introspection query - what aspect of the actor to
/// describe.
///
/// Infrastructure actors (e.g., ProcAgent, HostAgent)
/// have dual nature: they manage entities (Proc, Host) while also
/// being actors themselves. IntrospectView allows callers to
/// specify which aspect to query.
///
/// The semantics of each view are pinned by invariant S6 in the
/// module doc: the Actor view uses live structural state plus
/// supervision children, and the Entity view uses published
/// properties plus domain children.
// TODO(monarch-introspection): IntrospectView currently uses
// Entity/Actor naming. Consider renaming to runtime-neutral query
// modes (e.g. Published/Runtime) to avoid mesh-domain wording in
// hyperactor while preserving behavior and wire compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum IntrospectView {
    /// Return managed-entity properties (Proc, Host, etc.) for
    /// infrastructure actors.
    Entity,
    /// Return standard actor properties (status, messages_processed,
    /// flight_recorder).
    Actor,
}
// Register with the wire-value type registry; variant order is
// presumably significant to the wire encoding — do not reorder.
wirevalue::register_type!(IntrospectView);
557
/// Introspection query sent to any actor.
///
/// `Query` asks the actor to describe itself. `QueryChild` asks the
/// actor to describe one of its non-addressable children — an entity
/// that appears in the navigation tree but has no mailbox of its own
/// (e.g. a system proc owned by a host). The parent actor answers on
/// the child's behalf.
///
/// These messages are handled by the per-actor introspect task, not
/// by the actor's message loop (invariants S1 and S4 in the module
/// doc). Per S7, `QueryChild` is answered via a callback registered
/// on `InstanceCell` rather than via actor handlers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum IntrospectMessage {
    /// "Describe yourself."
    Query {
        /// View context - Entity or Actor.
        view: IntrospectView,
        /// Reply port receiving the actor's self-description.
        reply: reference::OncePortRef<IntrospectResult>,
    },
    /// "Describe one of your children."
    QueryChild {
        /// Reference identifying the child to describe.
        child_ref: reference::Reference,
        /// Reply port receiving the child's description.
        reply: reference::OncePortRef<IntrospectResult>,
    },
}
// Register with the wire-value type registry so the message can be
// routed to the introspect port.
wirevalue::register_type!(IntrospectMessage);
583
/// Structured tracing event from the actor-local flight recorder.
///
/// Deserialization target for the `FLIGHT_RECORDER` attrs JSON string.
/// `live_actor_payload` builds these from the cell's recording tail
/// and serializes the list as a JSON array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordedEvent {
    /// ISO 8601 timestamp of the event (produced by `format_timestamp`
    /// in `live_actor_payload`).
    pub timestamp: String,
    /// Monotonic sequence number for ordering.
    /// Defaults to 0 when absent from the JSON.
    #[serde(default)]
    pub seq: usize,
    /// Event level (INFO, DEBUG, etc.).
    pub level: String,
    /// Event target (module path).
    /// Defaults to an empty string when absent from the JSON.
    #[serde(default)]
    pub target: String,
    /// Event name.
    pub name: String,
    /// Event fields as JSON.
    pub fields: serde_json::Value,
}
604
605/// Format a [`SystemTime`] as an ISO 8601 timestamp with millisecond
606/// precision.
607pub fn format_timestamp(time: SystemTime) -> String {
608    humantime::format_rfc3339_millis(time).to_string()
609}
610
611/// Build a JSON-serialized `Attrs` string from values already
612/// computed by `live_actor_payload`. Reuses the same data — no
613/// redundant reads from `InstanceCell`.
614///
615/// Populates actor-runtime keys (STATUS, ACTOR_TYPE, etc.),
616/// decomposes the status prefix protocol into STATUS + STATUS_REASON,
617/// and decomposes failure fields into individual FAILURE_* attrs.
618///
619/// Starts from a fresh `Attrs` bag — published attrs (node_type,
620/// addr, etc.) are NOT included. This ensures the Actor view
621/// produces actor-only data; the Entity view handles published
622/// attrs separately.
623/// Failure fields extracted from a supervision event.
624struct FailureSnapshot {
625    error_message: String,
626    root_cause_actor: String,
627    root_cause_name: Option<String>,
628    occurred_at: String,
629    is_propagated: bool,
630}
631
632/// Pre-computed actor state for building the attrs JSON string.
633/// Avoids redundant InstanceCell reads — `live_actor_payload`
634/// computes these once and passes them in.
635struct ActorSnapshot {
636    status_str: String,
637    is_system: bool,
638    last_handler: Option<String>,
639    flight_recorder: Option<String>,
640    failure: Option<FailureSnapshot>,
641}
642
643fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String {
644    // Actor view builds a clean attrs bag with only actor-runtime
645    // keys. Published attrs (node_type, addr, etc.) belong to the
646    // Entity view — they are NOT merged here. This ensures that
647    // e.g. a HostMeshAgent resolved via Actor view produces Actor
648    // properties, not Host properties.
649    let mut attrs = hyperactor_config::Attrs::new();
650
651    // IA-3: status_reason present iff status carries a reason.
652    if let Some(reason) = snap.status_str.strip_prefix("stopped:") {
653        attrs.set(STATUS, "stopped".to_string());
654        attrs.set(STATUS_REASON, reason.trim().to_string());
655    } else if let Some(reason) = snap.status_str.strip_prefix("failed:") {
656        attrs.set(STATUS, "failed".to_string());
657        attrs.set(STATUS_REASON, reason.trim().to_string());
658    } else {
659        attrs.set(STATUS, snap.status_str.clone());
660        // IA-3: no status_reason for non-terminal states —
661        // guaranteed by fresh Attrs bag.
662    }
663
664    attrs.set(ACTOR_TYPE, cell.actor_type_name().to_string());
665    attrs.set(MESSAGES_PROCESSED, cell.num_processed_messages());
666    attrs.set(CREATED_AT, cell.created_at());
667    attrs.set(TOTAL_PROCESSING_TIME_US, cell.total_processing_time_us());
668    attrs.set(IS_SYSTEM, snap.is_system);
669
670    if let Some(handler) = &snap.last_handler {
671        attrs.set(LAST_HANDLER, handler.clone());
672    }
673    if let Some(fr) = &snap.flight_recorder {
674        attrs.set(FLIGHT_RECORDER, fr.clone());
675    }
676
677    // IA-4 / FI-A1: failure attrs present iff status == "failed".
678    if let Some(fi) = &snap.failure {
679        attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
680        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
681        if let Some(name) = &fi.root_cause_name {
682            attrs.set(FAILURE_ROOT_CAUSE_NAME, name.clone());
683        }
684        if let Ok(t) = humantime::parse_rfc3339(&fi.occurred_at) {
685            attrs.set(FAILURE_OCCURRED_AT, t);
686        }
687        attrs.set(FAILURE_IS_PROPAGATED, fi.is_propagated);
688    }
689    // IA-4: failure attrs absent when not failed — guaranteed by
690    // starting from a fresh Attrs bag (no stale keys possible).
691
692    serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string())
693}
694
695/// Build an [`IntrospectResult`] from live [`InstanceCell`] state.
696///
697/// Reads the current live status and last handler directly from
698/// the cell. Used by the introspect task (which runs outside
699/// the actor's message loop) and by `Instance::introspect_payload`.
700pub fn live_actor_payload(cell: &InstanceCell) -> IntrospectResult {
701    let actor_id = cell.actor_id();
702    let status = cell.status().borrow().clone();
703    let last_handler = cell.last_message_handler();
704
705    let children: Vec<String> = cell
706        .child_actor_ids()
707        .into_iter()
708        .map(|id| id.to_string())
709        .collect();
710
711    let events = cell.recording().tail();
712    let flight_recorder_events: Vec<RecordedEvent> = events
713        .into_iter()
714        .map(|event| RecordedEvent {
715            timestamp: format_timestamp(event.time),
716            seq: event.seq,
717            level: event.metadata.level().to_string(),
718            target: event.metadata.target().to_string(),
719            name: event.metadata.name().to_string(),
720            fields: event.json_value(),
721        })
722        .collect();
723
724    let flight_recorder = if flight_recorder_events.is_empty() {
725        None
726    } else {
727        serde_json::to_string(&flight_recorder_events).ok()
728    };
729
730    let supervisor = cell.parent().map(|p| p.actor_id().to_string());
731
732    // FI-3: failure_info is computed from the same status value as
733    // actor_status, ensuring they agree on whether the actor failed.
734    let failure = if status.is_failed() {
735        cell.supervision_event().map(|event| {
736            let root = event.actually_failing_actor();
737            FailureSnapshot {
738                error_message: event.actor_status.to_string(),
739                root_cause_actor: root.actor_id.to_string(),
740                root_cause_name: root.display_name.clone(),
741                occurred_at: format_timestamp(event.occurred_at),
742                is_propagated: root.actor_id != *actor_id,
743            }
744        })
745    } else {
746        None
747    };
748
749    let snap = ActorSnapshot {
750        status_str: status.to_string(),
751        is_system: cell.is_system(),
752        last_handler: last_handler.map(|info| info.to_string()),
753        flight_recorder,
754        failure,
755    };
756
757    let attrs = build_actor_attrs(cell, &snap);
758
759    IntrospectResult {
760        identity: actor_id.to_string(),
761        attrs,
762        children,
763        parent: supervisor,
764        as_of: format_timestamp(std::time::SystemTime::now()),
765    }
766}
767
768/// Introspect task: runs on a dedicated tokio task per actor,
769/// handling [`IntrospectMessage`] by reading [`InstanceCell`]
770/// directly and replying via the actor's [`Mailbox`].
771///
772/// The actor's message loop never sees these messages.
773///
774/// # Invariants exercised
775///
776/// Exercises S1, S2, S4, S5, S6, S11 (see module doc).
777pub async fn serve_introspect(
778    cell: InstanceCell,
779    mailbox: crate::mailbox::Mailbox,
780    mut receiver: crate::mailbox::PortReceiver<IntrospectMessage>,
781) {
782    use crate::actor::ActorStatus;
783    use crate::mailbox::PortSender as _;
784
785    // Watch for terminal status so we can break the reference cycle:
786    // InstanceCellState → Ports → introspect sender → keeps receiver
787    // open → this task holds InstanceCell → InstanceCellState.
788    // Without this, a stopped actor's InstanceCellState is never
789    // dropped and the actor lingers in the proc's instances map.
790    let mut status = cell.status().clone();
791
792    loop {
793        let msg = tokio::select! {
794            msg = receiver.recv() => {
795                match msg {
796                    Ok(msg) => msg,
797                    Err(_) => {
798                        // Channel closed. If the actor reached a
799                        // terminal state, snapshot it before exiting
800                        // so it remains queryable post-mortem.
801                        if cell.status().borrow().is_terminal() {
802                            let snapshot = live_actor_payload(&cell);
803                            cell.store_terminated_snapshot(snapshot);
804                        }
805                        break;
806                    }
807                }
808            }
809            _ = status.wait_for(ActorStatus::is_terminal) => {
810                // Snapshot for post-mortem introspection before
811                // dropping our InstanceCell reference.
812                let snapshot = live_actor_payload(&cell);
813                cell.store_terminated_snapshot(snapshot);
814                break;
815            }
816        };
817
818        let result = match msg {
819            IntrospectMessage::Query { view, reply } => {
820                let payload = match view {
821                    IntrospectView::Entity => match cell.published_attrs() {
822                        Some(published) => {
823                            let attrs_json =
824                                serde_json::to_string(&published).unwrap_or_else(|_| "{}".into());
825                            let children: Vec<String> =
826                                published.get(CHILDREN).cloned().unwrap_or_default();
827                            IntrospectResult {
828                                identity: cell.actor_id().to_string(),
829                                attrs: attrs_json,
830                                children,
831                                parent: cell.parent().map(|p| p.actor_id().to_string()),
832                                as_of: format_timestamp(std::time::SystemTime::now()),
833                            }
834                        }
835                        None => live_actor_payload(&cell),
836                    },
837                    IntrospectView::Actor => live_actor_payload(&cell),
838                };
839                mailbox.serialize_and_send_once(
840                    reply,
841                    payload,
842                    crate::mailbox::monitored_return_handle(),
843                )
844            }
845            IntrospectMessage::QueryChild { child_ref, reply } => {
846                let payload = cell.query_child(&child_ref).unwrap_or_else(|| {
847                    let mut error_attrs = hyperactor_config::Attrs::new();
848                    error_attrs.set(ERROR_CODE, "not_found".to_string());
849                    error_attrs.set(
850                        ERROR_MESSAGE,
851                        format!("child {} not found (no callback registered)", child_ref),
852                    );
853                    IntrospectResult {
854                        identity: String::new(),
855                        attrs: serde_json::to_string(&error_attrs)
856                            .unwrap_or_else(|_| "{}".to_string()),
857                        children: Vec::new(),
858                        parent: None,
859                        as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
860                            .to_string(),
861                    }
862                });
863                mailbox.serialize_and_send_once(
864                    reply,
865                    payload,
866                    crate::mailbox::monitored_return_handle(),
867                )
868            }
869        };
870        if let Err(e) = result {
871            tracing::debug!("introspect reply failed: {e}");
872        }
873    }
874    tracing::debug!(
875        actor_id = %cell.actor_id(),
876        "introspect task exiting"
877    );
878}
879
// Unit tests for introspection key metadata (IK-*) and for
// `ActorAttrsView` parsing and validation (AV-*, IA-3, IA-4).
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises IK-1 (see module doc).
    ///
    /// For each key listed in `cases`, this test checks that:
    ///
    /// - the key carries `INTROSPECT` meta;
    /// - the meta's short name matches the expected name;
    /// - the meta's description is non-empty.
    ///
    /// It then checks that `cases` covers every `INTROSPECT`-tagged key
    /// whose fully-qualified name starts with `hyperactor::introspect::`.
    #[test]
    fn test_introspect_keys_are_tagged() {
        let cases = vec![
            ("status", STATUS.attrs()),
            ("status_reason", STATUS_REASON.attrs()),
            ("actor_type", ACTOR_TYPE.attrs()),
            ("messages_processed", MESSAGES_PROCESSED.attrs()),
            ("created_at", CREATED_AT.attrs()),
            ("last_handler", LAST_HANDLER.attrs()),
            ("total_processing_time_us", TOTAL_PROCESSING_TIME_US.attrs()),
            ("flight_recorder", FLIGHT_RECORDER.attrs()),
            ("is_system", IS_SYSTEM.attrs()),
            ("children", CHILDREN.attrs()),
            ("error_code", ERROR_CODE.attrs()),
            ("error_message", ERROR_MESSAGE.attrs()),
            ("failure_error_message", FAILURE_ERROR_MESSAGE.attrs()),
            ("failure_root_cause_actor", FAILURE_ROOT_CAUSE_ACTOR.attrs()),
            ("failure_root_cause_name", FAILURE_ROOT_CAUSE_NAME.attrs()),
            ("failure_occurred_at", FAILURE_OCCURRED_AT.attrs()),
            ("failure_is_propagated", FAILURE_IS_PROPAGATED.attrs()),
        ];

        for (expected_name, meta) in &cases {
            // IK-1: see module doc.
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        // Exhaustiveness: verify cases covers all INTROSPECT-tagged
        // keys declared in this module.
        use hyperactor_config::attrs::AttrKeyInfo;
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    /// Exercises IK-2 (see module doc).
    ///
    /// Scans every registered `AttrKeyInfo`, not just this module's
    /// keys. Every `INTROSPECT`-tagged key must have a non-empty name and
    /// desc, and no two tagged keys may share a short name.
    #[test]
    fn test_introspect_short_names_are_globally_unique() {
        use hyperactor_config::attrs::AttrKeyInfo;

        // short name -> fully-qualified key name of its first declarer.
        let mut seen = std::collections::HashMap::new();
        for info in inventory::iter::<AttrKeyInfo>() {
            let Some(introspect) = info.meta.get(INTROSPECT) else {
                continue;
            };
            // Metadata quality: every tagged key must have
            // non-empty name and desc.
            assert!(
                !introspect.name.is_empty(),
                "INTROSPECT key {:?} has empty name",
                info.name
            );
            assert!(
                !introspect.desc.is_empty(),
                "INTROSPECT key {:?} has empty desc",
                info.name
            );
            if let Some(prev_fq) = seen.insert(introspect.name.clone(), info.name) {
                panic!(
                    "IK-2 violation: duplicate short name {:?} declared by both {:?} and {:?}",
                    introspect.name, prev_fq, info.name
                );
            }
        }
    }

    // IA-1 tests require spawning actors and live in actor.rs
    // where #[hyperactor::export] and test infrastructure are
    // available. IA-3 and IA-4 are tested below at the view level.

    /// Attrs for a running, non-system actor: status, type, message
    /// count, creation time and system flag. No status reason and no
    /// failure keys are set.
    fn running_actor_attrs() -> Attrs {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        attrs.set(ACTOR_TYPE, "MyActor".to_string());
        attrs.set(MESSAGES_PROCESSED, 42u64);
        attrs.set(CREATED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(IS_SYSTEM, false);
        attrs
    }

    /// Attrs for a failed actor, built from `running_actor_attrs`
    /// with the following changes:
    ///
    /// - status overridden to "failed";
    /// - a status reason added;
    /// - every `failure_*` key populated, describing a propagated
    ///   failure rooted in another actor.
    fn failed_actor_attrs() -> Attrs {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        attrs.set(STATUS_REASON, "something broke".to_string());
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, "other[0]".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_NAME, "OtherActor".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        attrs.set(FAILURE_IS_PROPAGATED, true);
        attrs
    }

    /// AV-1: from_attrs(to_attrs(v)) == v.
    #[test]
    fn test_actor_view_round_trip_running() {
        let view = ActorAttrsView::from_attrs(&running_actor_attrs()).unwrap();
        assert_eq!(view.status, "running");
        assert_eq!(view.actor_type, "MyActor");
        assert_eq!(view.messages_processed, 42);
        assert!(view.failure.is_none());

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    /// AV-1, exercised on a failed actor with populated failure info.
    #[test]
    fn test_actor_view_round_trip_failed() {
        let view = ActorAttrsView::from_attrs(&failed_actor_attrs()).unwrap();
        assert_eq!(view.status, "failed");
        let fi = view.failure.as_ref().unwrap();
        assert_eq!(fi.error_message, "boom");
        assert!(fi.is_propagated);

        let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(round_tripped, view);
    }

    /// AV-2: missing required key rejected.
    #[test]
    fn test_actor_view_missing_status() {
        let mut attrs = Attrs::new();
        attrs.set(ACTOR_TYPE, "X".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "status" });
    }

    /// AV-2: `actor_type` is required as well as `status`.
    #[test]
    fn test_actor_view_missing_actor_type() {
        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "actor_type" });
    }

    /// IA-3: a `status_reason` on a running actor is rejected as an
    /// IA-3 invariant violation.
    #[test]
    fn test_actor_view_ia3_rejects_reason_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS_REASON, "should not be here".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-3", .. }
        ));
    }

    /// IA-3: a terminal status ("stopped") with no `status_reason` is
    /// accepted, i.e. the reason is optional for terminal statuses.
    #[test]
    fn test_actor_view_ia3_allows_terminal_without_reason() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "stopped".to_string());
        // No status_reason — should be fine.
        let view = ActorAttrsView::from_attrs(&attrs).unwrap();
        assert_eq!(view.status, "stopped");
        assert!(view.status_reason.is_none());
    }

    /// IA-4: a "failed" status with no `failure_*` keys at all is
    /// rejected as an IA-4 invariant violation.
    #[test]
    fn test_actor_view_ia4_rejects_failed_without_failure_attrs() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        // No failure_* keys.
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    /// IA-4: `failure_*` keys on a running (non-failed) actor are
    /// rejected as an IA-4 invariant violation.
    #[test]
    fn test_actor_view_ia4_rejects_failure_attrs_on_running() {
        let mut attrs = running_actor_attrs();
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        attrs.set(FAILURE_ROOT_CAUSE_ACTOR, "x[0]".to_string());
        attrs.set(FAILURE_OCCURRED_AT, SystemTime::UNIX_EPOCH);
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "IA-4", .. }
        ));
    }

    /// AV-2: partial failure set → missing key.
    #[test]
    fn test_actor_view_partial_failure_attrs_rejected() {
        let mut attrs = running_actor_attrs();
        attrs.set(STATUS, "failed".to_string());
        // Only one of the three required failure keys.
        attrs.set(FAILURE_ERROR_MESSAGE, "boom".to_string());
        let err = ActorAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(
            err,
            AttrsViewError::MissingKey {
                key: "failure_root_cause_actor"
            }
        );
    }
}