hyperactor_mesh/
introspect.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mesh-topology introspection types and attrs.
10//!
11//! This module owns the typed internal model used by mesh-admin and the
12//! TUI: mesh-topology attr keys, typed attrs views, `NodeRef`, and the
13//! domain `NodePayload` / `NodeProperties` / `FailureInfo` values derived
14//! from `hyperactor::introspect::IntrospectResult`.
15//!
16//! These keys are published by `HostMeshAgent`, `ProcAgent`, and
17//! `MeshAdminAgent` to describe mesh topology (hosts, procs, root).
18//! Actor-runtime keys (status, actor_type, messages_processed, etc.) are
19//! declared in `hyperactor::introspect`.
20//!
21//! The HTTP wire representations live in [`dto`]. That submodule owns the
22//! curl-friendly JSON contract, schema/OpenAPI generation, and boundary
23//! invariants for string-encoded references and timestamps. This module
24//! keeps the internal typed invariants.
25//!
26//! See `hyperactor::introspect` for naming convention, invariant
27//! labels, and the `IntrospectAttr` meta-attribute pattern.
28//!
29//! ## Mesh key invariants (MK-*)
30//!
31//! - **MK-1 (metadata completeness):** Every mesh-topology
32//!   introspection key must carry `@meta(INTROSPECT = ...)` with
33//!   non-empty `name` and `desc`.
34//! - **MK-2 (short-name uniqueness):** Covered by
35//!   `test_introspect_short_names_are_globally_unique` in
36//!   `hyperactor::introspect` (cross-crate).
37//!
38//! ## HTTP boundary invariants (HB-*)
39//!
40//! These govern the HTTP DTO layer in [`dto`].
41//!
42//! - **HB-1 (typed-internal, string-external):** `NodeRef`, `ActorId`,
43//!   `ProcId`, and `SystemTime` are typed Rust values internally. At the
44//!   HTTP JSON boundary, [`dto::NodePayloadDto`],
45//!   [`dto::NodePropertiesDto`], and [`dto::FailureInfoDto`] encode them
46//!   as canonical strings.
47//! - **HB-2 (round-trip):** The HTTP string forms round-trip through the
48//!   internal typed parsers (`NodeRef::from_str`, `ActorId::from_str`,
49//!   `humantime::parse_rfc3339`). Timestamps are formatted at
50//!   millisecond precision; sub-millisecond values are truncated at
51//!   the boundary.
52//! - **HB-3 (schema-honesty):** Schema/OpenAPI are generated from the DTO
53//!   types, so the published schema reflects the actual wire format rather
54//!   than the internal domain representation.
55//!
56//! ## Attrs invariants (IA-*)
57//!
58//! These govern how `IntrospectResult.attrs` is built in
59//! `hyperactor::introspect` and how `properties` is derived via
60//! `derive_properties`.
61//!
62//! - **IA-1 (attrs-json):** `IntrospectResult.attrs` is always a
63//!   valid JSON object string.
64//! - **IA-2 (runtime-precedence):** Runtime-owned introspection keys
65//!   override any same-named keys in published attrs.
66//! - **IA-3 (status-shape):** `status_reason` is present in attrs
67//!   iff the status string carries a reason.
68//! - **IA-4 (failure-shape):** `failure_*` attrs are present iff
69//!   effective status is `failed`.
70//! - **IA-5 (payload-totality):** Every `IntrospectResult` sets
71//!   `attrs` -- never omitted, never null.
72//! - **IA-6 (open-row-forward-compat):** View decoders ignore
73//!   unknown attrs keys; only required known keys and local
74//!   invariants affect decoding outcome. Concretized by AV-3.
75//!
76//! ## Attrs view invariants (AV-*)
77//!
78//! These govern the typed view layer (`*AttrsView` structs).
79//!
80//! - **AV-1 (view-roundtrip):** For each view V,
81//!   `V::from_attrs(&v.to_attrs()) == Ok(v)` (modulo documented
82//!   normalization/defaulting).
83//! - **AV-2 (required-key-strictness):** `from_attrs` fails iff
84//!   required keys for that view are missing.
85//! - **AV-3 (unknown-key-tolerance):** Unknown attrs keys must
86//!   not affect successful decode outcome. Concretization of
87//!   IA-6.
88//!
89//! ## Derive invariants (DP-*)
90//!
91//! - **DP-1 (derive-precedence):** `derive_properties` dispatches
92//!   on `node_type` first, then falls back to `error_code`,
93//!   then `status`, then unknown. This order is the canonical
94//!   detection chain.
95//! - **DP-2 (derive-totality-on-parse-failure):**
96//!   `derive_properties` is total; malformed or incoherent attrs
97//!   never panic and map to `NodeProperties::Error` with detail.
98//! - **DP-3 (derive-precedence-stability):**
99//!   `derive_properties` detection order is stable and explicit:
100//!   `node_type` > `error_code` > `status` > unknown.
101//! - **DP-4 (error-on-decode-failure):** Any view decode or
102//!   invariant failure maps to a deterministic
103//!   `NodeProperties::Error` with a `malformed_*` code family,
104//!   without panic.
105//!
106//! ## py-spy integration (PS-*)
107//!
108//! - **PS-1 (target locality):** `PySpyDump` always targets
109//!   `std::process::id()` of the handling ProcAgent process. No
110//!   caller-supplied PID exists in the API.
111//! - **PS-2 (deterministic failure shape):** Execution failures are
112//!   classified into `BinaryNotFound { searched }` vs
113//!   `Failed { pid, binary, exit_code, stderr }`, never collapsed.
114//! - **PS-3 (binary resolution order):** Resolution order is exactly:
115//!   `PYSPY_BIN` config attr (if non-empty) then `"py-spy"` on PATH.
116//!   The attr is read via `hyperactor_config::global::get_cloned`;
117//!   env var `PYSPY_BIN` feeds in through the config layer.
118//!   If the first attempt is not found, the fallback attempt is
119//!   required.
120//! - **PS-4 (structured JSON output):** py-spy runs with `--json`;
121//!   output is parsed into `Vec<PySpyStackTrace>`. Parse failure
122//!   maps to `PySpyResult::Failed`.
123//! - **PS-5 (subprocess timeout):** `try_exec` bounds the py-spy
124//!   subprocess inside the worker to `MESH_ADMIN_PYSPY_TIMEOUT`
125//!   (default 10s). The budget is sized for `--native --native-all`
126//!   which unwinds native stacks via libunwind — significantly
127//!   slower than Python-only capture on loaded hosts. On expiry the
128//!   child is killed and reaped, and the worker returns
129//!   `Failed { stderr: "…timed out…" }`.
130//! - **PS-6 (bridge timeout):** The HTTP bridge uses a separate
131//!   `MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT` (default 13s), which must
132//!   exceed `MESH_ADMIN_PYSPY_TIMEOUT` so the subprocess kill/reap
133//!   and reply can arrive before the bridge declares
134//!   `gateway_timeout`. Independent of
135//!   `MESH_ADMIN_SINGLE_HOST_TIMEOUT`.
136//! - **PS-7 (non-blocking delegation):** ProcAgent never awaits
137//!   py-spy execution inline. On `PySpyDump` it spawns a child
138//!   `PySpyWorker`, forwards the request, and returns immediately.
139//! - **PS-8 (worker lifecycle):** Each `PySpyWorker` handles
140//!   exactly one forwarded `RunPySpyDump`, replies directly to the
141//!   forwarded `OncePortRef`, then self-terminates via
142//!   `cx.stop()`. Clean exit, no supervision event.
143//! - **PS-9 (concurrent dumps):** py-spy is spawn-per-request, so
144//!   overlapping dumps on the same proc are allowed. Each worker
145//!   runs independently.
146//! - **PS-10 (nonblocking retry):** In nonblocking mode, `try_exec`
147//!   retries up to 3 times with 100ms backoff on failure, because
148//!   py-spy can segfault reading mutating process memory. All
149//!   attempts share a single deadline bounded by
150//!   `MESH_ADMIN_PYSPY_TIMEOUT` (PS-5).
151//! - **PS-11a (native-all-immediate-downgrade):** If py-spy rejects
152//!   `--native-all` with the recognized unsupported-flag signature
153//!   (exit code 2, stderr mentions `--native-all`), `try_exec`
154//!   retries immediately with `native_all = false` in the same outer
155//!   attempt.
156//! - **PS-11b (native-all-no-retry-consumption):** That downgrade
157//!   retry does not consume an outer nonblocking retry slot (PS-10)
158//!   and does not incur the 100ms inter-attempt backoff.
159//! - **PS-11c (native-all-downgrade-warning):** A successful
160//!   downgraded result includes the warning `"--native-all
161//!   unsupported by this py-spy; fell back to --native"`.
162//! - **PS-11d (native-all-failure-passthrough):** If the downgraded
163//!   retry also fails, the failure flows through the normal
164//!   nonblocking retry logic (PS-10) unchanged.
165//! - **PS-11e (native-all-sticky-downgrade):** Once the
166//!   unsupported-flag signature is detected,
167//!   `effective_opts.native_all` remains `false` for all subsequent
168//!   outer retries. The flag is not re-tested on later attempts.
169//! - **PS-12 (universal py-spy):** Worker procs and the service
170//!   proc can handle `PySpyDump`. Worker procs handle it via
171//!   ProcAgent; the service proc handles it via HostAgent (same
172//!   spawn-worker pattern). `pyspy_bridge` routes by proc name:
173//!   if `proc_id.base_name() == SERVICE_PROC_NAME`, the target
174//!   is `host_agent`; otherwise `proc_agent[0]`. Procs lacking
175//!   either agent (e.g. mesh-admin) fast-fail via PS-13.
176//! - **PS-13 (defensive probe):** Before sending `PySpyDump`,
177//!   `pyspy_bridge` probes the selected actor with an introspect
178//!   query bounded by `MESH_ADMIN_QUERY_CHILD_TIMEOUT` (default
179//!   100ms). Three outcomes: (a) probe reply arrives — proceed
180//!   with `PySpyDump`; (b) probe times out or recv closes —
181//!   return `not_found` (actor absent/unreachable); (c) probe
182//!   send itself fails — return `internal_error` (bridge-side
183//!   infrastructure failure). Cases (b) and (c) fast-fail
184//!   instead of waiting the full 13s
185//!   `MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT`.
186//! - **PS-14 (reachability-based capability):** A proc supports
187//!   py-spy iff its stable handler actor is reachable: the
188//!   service proc requires a reachable `host_agent`; non-service
189//!   procs require a reachable `proc_agent[0]`. `PySpyWorker` is
190//!   transient per-request machinery (spawned on `PySpyDump`,
191//!   stopped after replying) and is not part of the reachability
192//!   contract.
193//!
194//! v1 contract notes:
195//! - The current py-spy bridge expects a ProcId-form reference and
196//!   rejects other forms as `bad_request`. This may be broadened in
197//!   future versions.
198//! - If `worker.send()` fails after the reply port has moved into
199//!   `RunPySpyDump`, the caller receives no explicit
200//!   `PySpyResult::Failed` — they observe a timeout.
201//!   `MailboxSenderError` does not carry the unsent message, so the
202//!   port is irrecoverable on this path.
203//! - **Contract change (D96756537 follow-up):** `PySpyResult::Ok`
204//!   replaced `stack: String` (raw py-spy text) with
205//!   `stack_traces: Vec<PySpyStackTrace>` (structured JSON) and
206//!   added `warnings: Vec<String>`. Clients reading the old `stack`
207//!   field will see it absent; they must migrate to `stack_traces`.
208//!
209//! ## Mesh-admin config (MA-*)
210//!
211//! - **MA-C1 (timeout config centralization):** Mesh-admin timeout
212//!   budgets are read from config attrs at call-time, with defaults
213//!   in `config.rs`. No hardcoded timeout constants in
214//!   `mesh_admin.rs`.
215
216pub mod dto;
217
218use hyperactor_config::Attrs;
219use hyperactor_config::INTROSPECT;
220use hyperactor_config::IntrospectAttr;
221use hyperactor_config::declare_attrs;
222
// Mesh-topology attr keys. See MK-1, MK-2, IA-1..IA-5 in module doc.
//
// Each key carries an `@meta(INTROSPECT = ...)` entry with a non-empty
// `name` and `desc` (MK-1). Keys written as `pub attr K: T = v;` are
// declared with a default value; the `*AttrsView::from_attrs` decoders
// in this file apply the same fallback values themselves when a key is
// absent.
declare_attrs! {
    /// Topology role of this node: "root", "host", "proc", "error".
    // NOTE(review): the `to_attrs` producers in this file only write
    // "root", "host" and "proc"; `ErrorAttrsView::to_attrs` does not set
    // this key. Confirm whether "error" is emitted elsewhere.
    @meta(INTROSPECT = IntrospectAttr {
        name: "node_type".into(),
        desc: "Topology role: root, host, proc, error".into(),
    })
    pub attr NODE_TYPE: String;

    /// Host network address (e.g. "10.0.0.1:8080").
    /// Required by `HostAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "addr".into(),
        desc: "Host network address".into(),
    })
    pub attr ADDR: String;

    /// Number of procs on a host.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_procs".into(),
        desc: "Number of procs on a host".into(),
    })
    pub attr NUM_PROCS: usize = 0;

    /// Human-readable proc name.
    /// Required by `ProcAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "proc_name".into(),
        desc: "Human-readable proc name".into(),
    })
    pub attr PROC_NAME: String;

    /// Number of actors in a proc.
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_actors".into(),
        desc: "Number of actors in a proc".into(),
    })
    pub attr NUM_ACTORS: usize = 0;

    /// References of system/infrastructure children.
    /// Read by the root, host and proc views; treated as empty when
    /// absent.
    @meta(INTROSPECT = IntrospectAttr {
        name: "system_children".into(),
        desc: "References of system/infrastructure children".into(),
    })
    pub attr SYSTEM_CHILDREN: Vec<NodeRef>;

    /// References of stopped children (proc only).
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_children".into(),
        desc: "References of stopped children".into(),
    })
    pub attr STOPPED_CHILDREN: Vec<NodeRef>;

    /// Cap on stopped children retention.
    @meta(INTROSPECT = IntrospectAttr {
        name: "stopped_retention_cap".into(),
        desc: "Maximum number of stopped children retained".into(),
    })
    pub attr STOPPED_RETENTION_CAP: usize = 0;

    /// Whether this proc is refusing new spawns due to actor
    /// failures. `ProcAttrsView::from_attrs` requires this to be true
    /// exactly when `FAILED_ACTOR_COUNT` is non-zero (FI-5).
    @meta(INTROSPECT = IntrospectAttr {
        name: "is_poisoned".into(),
        desc: "Whether this proc is poisoned (refusing new spawns)".into(),
    })
    pub attr IS_POISONED: bool = false;

    /// Count of failed actors in a proc.
    @meta(INTROSPECT = IntrospectAttr {
        name: "failed_actor_count".into(),
        desc: "Number of failed actors in this proc".into(),
    })
    pub attr FAILED_ACTOR_COUNT: usize = 0;

    /// Timestamp when the mesh was started.
    /// Required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_at".into(),
        desc: "Timestamp when the mesh was started".into(),
    })
    pub attr STARTED_AT: std::time::SystemTime;

    /// Username who started the mesh.
    /// Required by `RootAttrsView::from_attrs`.
    @meta(INTROSPECT = IntrospectAttr {
        name: "started_by".into(),
        desc: "Username who started the mesh".into(),
    })
    pub attr STARTED_BY: String;

    /// Number of hosts in the mesh (root only).
    @meta(INTROSPECT = IntrospectAttr {
        name: "num_hosts".into(),
        desc: "Number of hosts in the mesh".into(),
    })
    pub attr NUM_HOSTS: usize = 0;

}
318
319use hyperactor::introspect::AttrsViewError;
320
/// Typed view over attrs for a root node.
///
/// Decoded with [`RootAttrsView::from_attrs`] and encoded with
/// [`RootAttrsView::to_attrs`]; the latter also tags the bag with
/// `NODE_TYPE = "root"`.
#[derive(Debug, Clone, PartialEq)]
pub struct RootAttrsView {
    /// Number of hosts in the mesh (`NUM_HOSTS`); 0 when the key is absent.
    pub num_hosts: usize,
    /// Timestamp when the mesh was started (`STARTED_AT`); required.
    pub started_at: SystemTime,
    /// Username who started the mesh (`STARTED_BY`); required.
    pub started_by: String,
    /// System/infrastructure children (`SYSTEM_CHILDREN`); empty when
    /// the key is absent.
    pub system_children: Vec<NodeRef>,
}
329
330impl RootAttrsView {
331    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
332    /// `STARTED_AT` and `STARTED_BY`; `NUM_HOSTS` defaults to 0,
333    /// `SYSTEM_CHILDREN` defaults to empty.
334    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
335        let num_hosts = *attrs.get(NUM_HOSTS).unwrap_or(&0);
336        let started_at = *attrs
337            .get(STARTED_AT)
338            .ok_or_else(|| AttrsViewError::missing("started_at"))?;
339        let started_by = attrs
340            .get(STARTED_BY)
341            .ok_or_else(|| AttrsViewError::missing("started_by"))?
342            .clone();
343        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
344        Ok(Self {
345            num_hosts,
346            started_at,
347            started_by,
348            system_children,
349        })
350    }
351
352    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
353    pub fn to_attrs(&self) -> Attrs {
354        let mut attrs = Attrs::new();
355        attrs.set(NODE_TYPE, "root".to_string());
356        attrs.set(NUM_HOSTS, self.num_hosts);
357        attrs.set(STARTED_AT, self.started_at);
358        attrs.set(STARTED_BY, self.started_by.clone());
359        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
360        attrs
361    }
362}
363
/// Typed view over attrs for a host node.
///
/// Decoded with [`HostAttrsView::from_attrs`] and encoded with
/// [`HostAttrsView::to_attrs`]; the latter also tags the bag with
/// `NODE_TYPE = "host"`.
#[derive(Debug, Clone, PartialEq)]
pub struct HostAttrsView {
    /// Host network address (`ADDR`); required.
    pub addr: String,
    /// Number of procs on the host (`NUM_PROCS`); 0 when the key is absent.
    pub num_procs: usize,
    /// System/infrastructure children (`SYSTEM_CHILDREN`); empty when
    /// the key is absent.
    pub system_children: Vec<NodeRef>,
}
371
372impl HostAttrsView {
373    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires `ADDR`;
374    /// `NUM_PROCS` defaults to 0, `SYSTEM_CHILDREN` defaults to
375    /// empty.
376    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
377        let addr = attrs
378            .get(ADDR)
379            .ok_or_else(|| AttrsViewError::missing("addr"))?
380            .clone();
381        let num_procs = *attrs.get(NUM_PROCS).unwrap_or(&0);
382        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
383        Ok(Self {
384            addr,
385            num_procs,
386            system_children,
387        })
388    }
389
390    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
391    pub fn to_attrs(&self) -> Attrs {
392        let mut attrs = Attrs::new();
393        attrs.set(NODE_TYPE, "host".to_string());
394        attrs.set(ADDR, self.addr.clone());
395        attrs.set(NUM_PROCS, self.num_procs);
396        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
397        attrs
398    }
399}
400
/// Typed view over attrs for a proc node.
///
/// Decoded with [`ProcAttrsView::from_attrs`], which also enforces the
/// FI-5 coherence rule between `is_poisoned` and `failed_actor_count`.
/// Encoded with [`ProcAttrsView::to_attrs`], which tags the bag with
/// `NODE_TYPE = "proc"`.
#[derive(Debug, Clone, PartialEq)]
pub struct ProcAttrsView {
    /// Human-readable proc name (`PROC_NAME`); required.
    pub proc_name: String,
    /// Number of actors in the proc (`NUM_ACTORS`); 0 when absent.
    pub num_actors: usize,
    /// System/infrastructure children (`SYSTEM_CHILDREN`); empty when absent.
    pub system_children: Vec<NodeRef>,
    /// Stopped children (`STOPPED_CHILDREN`); empty when absent.
    pub stopped_children: Vec<NodeRef>,
    /// Cap on stopped children retention (`STOPPED_RETENTION_CAP`); 0
    /// when absent.
    pub stopped_retention_cap: usize,
    /// Whether the proc is refusing new spawns due to actor failures
    /// (`IS_POISONED`); false when absent.
    pub is_poisoned: bool,
    /// Count of failed actors (`FAILED_ACTOR_COUNT`); 0 when absent.
    pub failed_actor_count: usize,
}
412
413impl ProcAttrsView {
414    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
415    /// `PROC_NAME`; remaining fields have defaults. Checks FI-5
416    /// coherence.
417    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
418        let proc_name = attrs
419            .get(PROC_NAME)
420            .ok_or_else(|| AttrsViewError::missing("proc_name"))?
421            .clone();
422        let num_actors = *attrs.get(NUM_ACTORS).unwrap_or(&0);
423        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
424        let stopped_children = attrs.get(STOPPED_CHILDREN).cloned().unwrap_or_default();
425        let stopped_retention_cap = *attrs.get(STOPPED_RETENTION_CAP).unwrap_or(&0);
426        let is_poisoned = *attrs.get(IS_POISONED).unwrap_or(&false);
427        let failed_actor_count = *attrs.get(FAILED_ACTOR_COUNT).unwrap_or(&0);
428
429        // FI-5: is_poisoned iff failed_actor_count > 0.
430        if is_poisoned != (failed_actor_count > 0) {
431            return Err(AttrsViewError::invariant(
432                "FI-5",
433                format!("is_poisoned={is_poisoned} but failed_actor_count={failed_actor_count}"),
434            ));
435        }
436
437        Ok(Self {
438            proc_name,
439            num_actors,
440            system_children,
441            stopped_children,
442            stopped_retention_cap,
443            is_poisoned,
444            failed_actor_count,
445        })
446    }
447
448    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
449    pub fn to_attrs(&self) -> Attrs {
450        let mut attrs = Attrs::new();
451        attrs.set(NODE_TYPE, "proc".to_string());
452        attrs.set(PROC_NAME, self.proc_name.clone());
453        attrs.set(NUM_ACTORS, self.num_actors);
454        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
455        attrs.set(STOPPED_CHILDREN, self.stopped_children.clone());
456        attrs.set(STOPPED_RETENTION_CAP, self.stopped_retention_cap);
457        attrs.set(IS_POISONED, self.is_poisoned);
458        attrs.set(FAILED_ACTOR_COUNT, self.failed_actor_count);
459        attrs
460    }
461}
462
/// Typed view over attrs for an error node.
///
/// Backed by the `ERROR_CODE` / `ERROR_MESSAGE` keys declared in
/// `hyperactor::introspect`. Unlike the root/host/proc views,
/// [`ErrorAttrsView::to_attrs`] does not write `NODE_TYPE`.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorAttrsView {
    /// Error code (`ERROR_CODE`); required.
    pub code: String,
    /// Error message (`ERROR_MESSAGE`); empty when the key is absent.
    pub message: String,
}
469
470impl ErrorAttrsView {
471    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
472    /// `ERROR_CODE`; `ERROR_MESSAGE` defaults to empty.
473    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
474        use hyperactor::introspect::ERROR_CODE;
475        use hyperactor::introspect::ERROR_MESSAGE;
476
477        let code = attrs
478            .get(ERROR_CODE)
479            .ok_or_else(|| AttrsViewError::missing("error_code"))?
480            .clone();
481        let message = attrs.get(ERROR_MESSAGE).cloned().unwrap_or_default();
482        Ok(Self { code, message })
483    }
484
485    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
486    pub fn to_attrs(&self) -> Attrs {
487        use hyperactor::introspect::ERROR_CODE;
488        use hyperactor::introspect::ERROR_MESSAGE;
489
490        let mut attrs = Attrs::new();
491        attrs.set(ERROR_CODE, self.code.clone());
492        attrs.set(ERROR_MESSAGE, self.message.clone());
493        attrs
494    }
495}
496
497// --- API / presentation types ---
498
499use std::fmt;
500use std::str::FromStr;
501use std::time::SystemTime;
502
503use serde::Deserialize;
504use serde::Serialize;
505use typeuri::Named;
506
/// Typed reference to a node in the mesh-admin navigation tree.
///
/// Extends `IntrospectRef` with mesh-only concepts (`Root`, `Host`).
/// hyperactor does not know about these variants.
///
/// The string form is produced by the `Display` impl and parsed back by
/// the `FromStr` impl: `"root"` for [`NodeRef::Root`], `"host:<actor id>"`
/// for [`NodeRef::Host`], and the underlying id's own `Display` output
/// for [`NodeRef::Proc`] and [`NodeRef::Actor`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum NodeRef {
    /// Synthetic mesh root node.
    /// Serializes as lowercase `"root"` to match the HTTP path convention.
    #[serde(rename = "root")]
    Root,
    /// A host in the mesh, identified by its `HostAgent` actor ID.
    /// Distinguished from `Actor` in string form by the `host:` prefix.
    Host(hyperactor::reference::ActorId),
    /// A proc running on a host.
    Proc(hyperactor::reference::ProcId),
    /// An actor instance within a proc.
    Actor(hyperactor::reference::ActorId),
}

// NOTE(review): presumably this is what lets `NodeRef` serve as an attr
// value type (see `SYSTEM_CHILDREN: Vec<NodeRef>`); confirm against the
// `impl_attrvalue!` definition in `hyperactor_config`.
hyperactor_config::impl_attrvalue!(NodeRef);
526
527impl fmt::Display for NodeRef {
528    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529        match self {
530            Self::Root => write!(f, "root"),
531            Self::Host(id) => write!(f, "host:{}", id),
532            Self::Proc(id) => fmt::Display::fmt(id, f),
533            Self::Actor(id) => fmt::Display::fmt(id, f),
534        }
535    }
536}
537
/// Error parsing a `NodeRef` from a string.
///
/// Returned by `NodeRef::from_str`.
#[derive(Debug, thiserror::Error)]
pub enum NodeRefParseError {
    /// The input string was empty.
    #[error("empty reference string")]
    Empty,
    /// The input started with `host:` but the remainder did not parse
    /// as an actor id.
    #[error("invalid host reference: {0}")]
    InvalidHost(hyperactor::reference::ReferenceParsingError),
    /// The input parsed as a port reference, which does not name a node.
    #[error("port references are not valid node references")]
    PortNotAllowed,
    /// The input (without a `host:` prefix) failed to parse as a
    /// hyperactor reference; the underlying error is surfaced as-is.
    #[error(transparent)]
    Reference(#[from] hyperactor::reference::ReferenceParsingError),
}
550
551impl FromStr for NodeRef {
552    type Err = NodeRefParseError;
553
554    fn from_str(s: &str) -> Result<Self, Self::Err> {
555        if s.is_empty() {
556            return Err(NodeRefParseError::Empty);
557        }
558        if s == "root" {
559            return Ok(Self::Root);
560        }
561        if let Some(rest) = s.strip_prefix("host:") {
562            let actor_id: hyperactor::reference::ActorId =
563                rest.parse().map_err(NodeRefParseError::InvalidHost)?;
564            return Ok(Self::Host(actor_id));
565        }
566        let r: hyperactor::reference::Reference = s.parse()?;
567        match r {
568            hyperactor::reference::Reference::Proc(id) => Ok(Self::Proc(id)),
569            hyperactor::reference::Reference::Actor(id) => Ok(Self::Actor(id)),
570            hyperactor::reference::Reference::Port(_) => Err(NodeRefParseError::PortNotAllowed),
571        }
572    }
573}
574
575impl From<hyperactor::introspect::IntrospectRef> for NodeRef {
576    fn from(r: hyperactor::introspect::IntrospectRef) -> Self {
577        match r {
578            hyperactor::introspect::IntrospectRef::Proc(id) => Self::Proc(id),
579            hyperactor::introspect::IntrospectRef::Actor(id) => Self::Actor(id),
580        }
581    }
582}
583
/// Uniform response for any node in the mesh topology.
///
/// Every addressable entity (root, host, proc, actor) is represented
/// as a `NodePayload`. The client navigates the mesh by fetching a
/// node and following its `children` references (downward) or its
/// `parent` reference (upward).
///
/// See IA-1..IA-5 in module doc.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::NodePayloadDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct NodePayload {
    /// Canonical node reference identifying this node.
    pub identity: NodeRef,
    /// Node-specific metadata (type, status, metrics, etc.).
    pub properties: NodeProperties,
    /// Child node references for downward navigation.
    pub children: Vec<NodeRef>,
    /// Parent node reference for upward navigation, if any.
    pub parent: Option<NodeRef>,
    /// When this payload was captured.
    pub as_of: SystemTime,
}
// Registers the type with `wirevalue` (needed for actor messaging, per
// the note above the derives).
wirevalue::register_type!(NodePayload);
608
/// Node-specific metadata. Externally-tagged enum — the variant
/// name is the discriminator (Root, Host, Proc, Actor, Error).
///
/// The Root, Host, Proc and Error variants mirror the fields of the
/// corresponding `*AttrsView` structs; the Actor variant is built from
/// `hyperactor::introspect::ActorAttrsView`.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::NodePropertiesDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum NodeProperties {
    /// Synthetic mesh root node (not a real actor/proc).
    Root {
        /// Number of hosts in the mesh.
        num_hosts: usize,
        /// Timestamp when the mesh was started.
        started_at: SystemTime,
        /// Username who started the mesh.
        started_by: String,
        /// References of system/infrastructure children.
        system_children: Vec<NodeRef>,
    },
    /// A host in the mesh, represented by its `HostAgent`.
    Host {
        /// Host network address.
        addr: String,
        /// Number of procs on the host.
        num_procs: usize,
        /// References of system/infrastructure children.
        system_children: Vec<NodeRef>,
    },
    /// Properties describing a proc running on a host.
    Proc {
        /// Human-readable proc name.
        proc_name: String,
        /// Number of actors in the proc.
        num_actors: usize,
        /// References of system/infrastructure children.
        system_children: Vec<NodeRef>,
        /// References of stopped children.
        stopped_children: Vec<NodeRef>,
        /// Maximum number of stopped children retained.
        stopped_retention_cap: usize,
        /// Whether the proc is poisoned (refusing new spawns).
        is_poisoned: bool,
        /// Number of failed actors in the proc.
        failed_actor_count: usize,
    },
    /// Runtime metadata for a single actor instance.
    Actor {
        /// Actor status; rendered as `"<status>: <reason>"` when the
        /// source view carries a status reason.
        actor_status: String,
        /// Actor type, copied from the source view.
        actor_type: String,
        /// Messages-processed counter, copied from the source view.
        messages_processed: u64,
        /// Creation time, if the source view provides one.
        created_at: Option<SystemTime>,
        /// Taken from the source view's `last_handler` field.
        last_message_handler: Option<String>,
        /// Total processing time, copied from the source view
        /// (NOTE(review): the `_us` suffix suggests microseconds; confirm).
        total_processing_time_us: u64,
        /// Flight-recorder data, if the source view provides any.
        flight_recorder: Option<String>,
        /// Copied from the source view's `is_system` flag.
        is_system: bool,
        /// Structured failure details, present when the source view
        /// carries a failure.
        failure_info: Option<FailureInfo>,
    },
    /// Error sentinel returned when a child reference cannot be resolved.
    Error { code: String, message: String },
}
// Registers the type with `wirevalue` (needed for actor messaging, per
// the note above the derives).
wirevalue::register_type!(NodeProperties);
655
/// Structured failure information for failed actors.
///
/// Carried in `NodeProperties::Actor::failure_info`; populated
/// field-for-field from the `failure` value of
/// `hyperactor::introspect::ActorAttrsView`.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::FailureInfoDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct FailureInfo {
    /// Error message describing the failure.
    pub error_message: String,
    /// Actor that caused the failure (root cause).
    pub root_cause_actor: hyperactor::reference::ActorId,
    /// Display name of the root-cause actor, if available.
    pub root_cause_name: Option<String>,
    /// When the failure occurred.
    pub occurred_at: SystemTime,
    /// Whether this failure was propagated from a child.
    pub is_propagated: bool,
}
// Registers the type with `wirevalue` (needed for actor messaging, per
// the note above the derives).
wirevalue::register_type!(FailureInfo);
674
/// Mesh-layer conversion from a typed attrs view to `NodeProperties`.
///
/// Defined here so that `hyperactor` views (e.g. `ActorAttrsView`) can
/// produce `NodeProperties` without depending on the mesh crate.
///
/// Implemented in this file for the root, host, proc and error views
/// as well as `hyperactor::introspect::ActorAttrsView`. The trait is
/// private to this module.
trait IntoNodeProperties {
    /// Consumes the view and returns the matching `NodeProperties`
    /// variant.
    fn into_node_properties(self) -> NodeProperties;
}
682
683impl IntoNodeProperties for RootAttrsView {
684    fn into_node_properties(self) -> NodeProperties {
685        NodeProperties::Root {
686            num_hosts: self.num_hosts,
687            started_at: self.started_at,
688            started_by: self.started_by,
689            system_children: self.system_children,
690        }
691    }
692}
693
694impl IntoNodeProperties for HostAttrsView {
695    fn into_node_properties(self) -> NodeProperties {
696        NodeProperties::Host {
697            addr: self.addr,
698            num_procs: self.num_procs,
699            system_children: self.system_children,
700        }
701    }
702}
703
704impl IntoNodeProperties for ProcAttrsView {
705    fn into_node_properties(self) -> NodeProperties {
706        NodeProperties::Proc {
707            proc_name: self.proc_name,
708            num_actors: self.num_actors,
709            system_children: self.system_children,
710            stopped_children: self.stopped_children,
711            stopped_retention_cap: self.stopped_retention_cap,
712            is_poisoned: self.is_poisoned,
713            failed_actor_count: self.failed_actor_count,
714        }
715    }
716}
717
718impl IntoNodeProperties for ErrorAttrsView {
719    fn into_node_properties(self) -> NodeProperties {
720        NodeProperties::Error {
721            code: self.code,
722            message: self.message,
723        }
724    }
725}
726
727impl IntoNodeProperties for hyperactor::introspect::ActorAttrsView {
728    fn into_node_properties(self) -> NodeProperties {
729        let actor_status = match &self.status_reason {
730            Some(reason) => format!("{}: {}", self.status, reason),
731            None => self.status.clone(),
732        };
733
734        let failure_info = self.failure.map(|fi| FailureInfo {
735            error_message: fi.error_message,
736            root_cause_actor: fi.root_cause_actor,
737            root_cause_name: fi.root_cause_name,
738            occurred_at: fi.occurred_at,
739            is_propagated: fi.is_propagated,
740        });
741
742        NodeProperties::Actor {
743            actor_status,
744            actor_type: self.actor_type,
745            messages_processed: self.messages_processed,
746            created_at: self.created_at,
747            last_message_handler: self.last_handler,
748            total_processing_time_us: self.total_processing_time_us,
749            flight_recorder: self.flight_recorder,
750            is_system: self.is_system,
751            failure_info,
752        }
753    }
754}
755
756/// Derive `NodeProperties` from a JSON-serialized attrs string.
757///
758/// Detection precedence (DP-1, DP-3):
759/// 1. `node_type` = "root" / "host" / "proc" → corresponding variant
760/// 2. `error_code` present → Error
761/// 3. `STATUS` key present → Actor
762/// 4. none of the above → Error("unknown_node_type")
763///
764/// DP-2 / DP-4: this function is total — malformed attrs never
765/// panic; view decode failures map to `NodeProperties::Error`
766/// with a `malformed_*` code.
767/// AV-3 / IA-6: view decoders ignore unknown keys.
768pub fn derive_properties(attrs_json: &str) -> NodeProperties {
769    let attrs: Attrs = match serde_json::from_str(attrs_json) {
770        Ok(a) => a,
771        Err(_) => {
772            return NodeProperties::Error {
773                code: "parse_error".into(),
774                message: "failed to parse attrs JSON".into(),
775            };
776        }
777    };
778
779    let node_type = attrs.get(NODE_TYPE).cloned().unwrap_or_default();
780
781    match node_type.as_str() {
782        "root" => match RootAttrsView::from_attrs(&attrs) {
783            Ok(v) => v.into_node_properties(),
784            Err(e) => NodeProperties::Error {
785                code: "malformed_root".into(),
786                message: e.to_string(),
787            },
788        },
789        "host" => match HostAttrsView::from_attrs(&attrs) {
790            Ok(v) => v.into_node_properties(),
791            Err(e) => NodeProperties::Error {
792                code: "malformed_host".into(),
793                message: e.to_string(),
794            },
795        },
796        "proc" => match ProcAttrsView::from_attrs(&attrs) {
797            Ok(v) => v.into_node_properties(),
798            Err(e) => NodeProperties::Error {
799                code: "malformed_proc".into(),
800                message: e.to_string(),
801            },
802        },
803        _ => {
804            // DP-1: error_code → Error, STATUS present → Actor,
805            // else → Error("unknown_node_type").
806            use hyperactor::introspect::ERROR_CODE;
807            use hyperactor::introspect::STATUS;
808
809            if attrs.get(ERROR_CODE).is_some() {
810                return match ErrorAttrsView::from_attrs(&attrs) {
811                    Ok(v) => v.into_node_properties(),
812                    Err(e) => NodeProperties::Error {
813                        code: "malformed_error".into(),
814                        message: e.to_string(),
815                    },
816                };
817            }
818
819            if attrs.get(STATUS).is_none() {
820                return NodeProperties::Error {
821                    code: "unknown_node_type".into(),
822                    message: format!("unrecognized node_type: {:?}", node_type),
823                };
824            }
825
826            match hyperactor::introspect::ActorAttrsView::from_attrs(&attrs) {
827                Ok(v) => v.into_node_properties(),
828                Err(e) => NodeProperties::Error {
829                    code: "malformed_actor".into(),
830                    message: e.to_string(),
831                },
832            }
833        }
834    }
835}
836
837/// Convert an `IntrospectResult` to a presentation `NodePayload`.
838/// Lifts `IntrospectRef` → `NodeRef` and passes through typed timestamps.
839pub fn to_node_payload(result: hyperactor::introspect::IntrospectResult) -> NodePayload {
840    NodePayload {
841        identity: result.identity.into(),
842        properties: derive_properties(&result.attrs),
843        children: result.children.into_iter().map(NodeRef::from).collect(),
844        parent: result.parent.map(NodeRef::from),
845        as_of: result.as_of,
846    }
847}
848
849/// Convert an `IntrospectResult` to a `NodePayload`, overriding
850/// identity and parent for correct tree navigation.
851pub fn to_node_payload_with(
852    result: hyperactor::introspect::IntrospectResult,
853    identity: NodeRef,
854    parent: Option<NodeRef>,
855) -> NodePayload {
856    NodePayload {
857        identity,
858        properties: derive_properties(&result.attrs),
859        children: result.children.into_iter().map(NodeRef::from).collect(),
860        parent,
861        as_of: result.as_of,
862    }
863}
864
#[cfg(test)]
mod tests {
    use super::*;

    /// Enforces MK-1 (metadata completeness) for all mesh-topology
    /// introspection keys.
    #[test]
    fn test_mesh_introspect_keys_are_tagged() {
        // Fixed table of (expected short name, key metadata).
        let cases = [
            ("node_type", NODE_TYPE.attrs()),
            ("addr", ADDR.attrs()),
            ("num_procs", NUM_PROCS.attrs()),
            ("proc_name", PROC_NAME.attrs()),
            ("num_actors", NUM_ACTORS.attrs()),
            ("system_children", SYSTEM_CHILDREN.attrs()),
            ("stopped_children", STOPPED_CHILDREN.attrs()),
            ("stopped_retention_cap", STOPPED_RETENTION_CAP.attrs()),
            ("is_poisoned", IS_POISONED.attrs()),
            ("failed_actor_count", FAILED_ACTOR_COUNT.attrs()),
            ("started_at", STARTED_AT.attrs()),
            ("started_by", STARTED_BY.attrs()),
            ("num_hosts", NUM_HOSTS.attrs()),
        ];

        for (expected_name, meta) in &cases {
            // MK-1: every key must have INTROSPECT with non-empty
            // name and desc.
            let introspect = meta
                .get(INTROSPECT)
                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
            assert_eq!(
                introspect.name, *expected_name,
                "short name mismatch for {expected_name}"
            );
            assert!(
                !introspect.desc.is_empty(),
                "{expected_name}: desc should not be empty"
            );
        }

        // Exhaustiveness: verify cases covers all INTROSPECT-tagged
        // keys declared in this module. Only the count is compared, so
        // a newly added key makes this fail until it is listed above.
        use hyperactor_config::attrs::AttrKeyInfo;
        let registry_count = inventory::iter::<AttrKeyInfo>()
            .filter(|info| {
                info.name.starts_with("hyperactor_mesh::introspect::")
                    && info.meta.get(INTROSPECT).is_some()
            })
            .count();
        assert_eq!(
            cases.len(),
            registry_count,
            "test must cover all INTROSPECT-tagged keys in this module"
        );
    }

    /// Builds a `NodeRef::Actor` on a local-channel proc named
    /// `proc_name`, for the actor `actor_name` with index `pid`.
    fn test_actor_ref(proc_name: &str, actor_name: &str, pid: usize) -> NodeRef {
        use hyperactor::channel::ChannelAddr;
        use hyperactor::reference::ProcId;
        NodeRef::Actor(
            ProcId::with_name(ChannelAddr::Local(0), proc_name).actor_id(actor_name, pid),
        )
    }

    /// Fixture: a well-formed root view (3 hosts, one system child).
    fn root_view() -> RootAttrsView {
        RootAttrsView {
            num_hosts: 3,
            started_at: std::time::UNIX_EPOCH,
            started_by: "testuser".into(),
            system_children: vec![test_actor_ref("proc", "child1", 0)],
        }
    }

    /// Fixture: a well-formed host view (2 procs, one system child).
    fn host_view() -> HostAttrsView {
        HostAttrsView {
            addr: "10.0.0.1:8080".into(),
            num_procs: 2,
            system_children: vec![test_actor_ref("proc", "sys", 0)],
        }
    }

    /// Fixture: a well-formed, non-poisoned proc view (5 actors, one
    /// stopped child).
    fn proc_view() -> ProcAttrsView {
        ProcAttrsView {
            proc_name: "worker".into(),
            num_actors: 5,
            system_children: vec![],
            stopped_children: vec![test_actor_ref("proc", "old", 0)],
            stopped_retention_cap: 10,
            is_poisoned: false,
            failed_actor_count: 0,
        }
    }

    /// Fixture: a well-formed error view.
    fn error_view() -> ErrorAttrsView {
        ErrorAttrsView {
            code: "not_found".into(),
            message: "child not found".into(),
        }
    }

    /// AV-1: from_attrs(to_attrs(v)) == v.
    #[test]
    fn test_root_view_round_trip() {
        let view = root_view();
        let rt = RootAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// AV-1.
    #[test]
    fn test_host_view_round_trip() {
        let view = host_view();
        let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// AV-1.
    #[test]
    fn test_proc_view_round_trip() {
        let view = proc_view();
        let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// AV-1.
    #[test]
    fn test_error_view_round_trip() {
        let view = error_view();
        let rt = ErrorAttrsView::from_attrs(&view.to_attrs()).unwrap();
        assert_eq!(rt, view);
    }

    /// AV-2: missing required key rejected.
    #[test]
    fn test_root_view_missing_started_at() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_BY, "user".into());
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_at" });
    }

    /// AV-2.
    #[test]
    fn test_root_view_missing_started_by() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        attrs.set(STARTED_AT, std::time::UNIX_EPOCH);
        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "started_by" });
    }

    /// AV-2.
    #[test]
    fn test_host_view_missing_addr() {
        let attrs = Attrs::new();
        let err = HostAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "addr" });
    }

    /// AV-2.
    #[test]
    fn test_proc_view_missing_proc_name() {
        let attrs = Attrs::new();
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert_eq!(err, AttrsViewError::MissingKey { key: "proc_name" });
    }

    /// FI-5: poisoned without failures rejected.
    #[test]
    fn test_proc_view_fi5_poisoned_but_no_failures() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    /// FI-5: failures without poisoned rejected.
    #[test]
    fn test_proc_view_fi5_failures_but_not_poisoned() {
        let mut attrs = Attrs::new();
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, false);
        attrs.set(FAILED_ACTOR_COUNT, 2usize);
        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
        assert!(matches!(
            err,
            AttrsViewError::InvariantViolation { label: "FI-5", .. }
        ));
    }

    /// DP-2 / DP-4: unparseable JSON → Error.
    #[test]
    fn test_derive_properties_unparseable_json() {
        let props = derive_properties("not json");
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "parse_error"));
    }

    /// DP-3: unknown node_type → Error.
    #[test]
    fn test_derive_properties_unknown_node_type() {
        let attrs = Attrs::new();
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "unknown_node_type"));
    }

    /// DP-4: view decode failure → malformed_* Error.
    #[test]
    fn test_derive_properties_malformed_root() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "root".into());
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_root"));
    }

    /// DP-4: invariant violation → malformed_* Error.
    #[test]
    fn test_derive_properties_malformed_proc_fi5() {
        let mut attrs = Attrs::new();
        attrs.set(NODE_TYPE, "proc".into());
        attrs.set(PROC_NAME, "bad".into());
        attrs.set(IS_POISONED, true);
        attrs.set(FAILED_ACTOR_COUNT, 0usize);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_proc"));
    }

    /// DP-3: node_type "root" → Root variant.
    #[test]
    fn test_derive_properties_valid_root() {
        let view = root_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    /// DP-3: node_type "host" → Host variant.
    #[test]
    fn test_derive_properties_valid_host() {
        let view = host_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    /// DP-3: node_type "proc" → Proc variant.
    #[test]
    fn test_derive_properties_valid_proc() {
        let view = proc_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    /// DP-3: error_code present → Error variant, with the view's code
    /// and message carried through unchanged.
    #[test]
    fn test_derive_properties_valid_error() {
        let view = error_view();
        let json = serde_json::to_string(&view.to_attrs()).unwrap();
        // One match checks both the variant and its contents, so a
        // wrong variant fails loudly rather than skipping the asserts.
        match derive_properties(&json) {
            NodeProperties::Error { code, message } => {
                assert_eq!(code, "not_found");
                assert_eq!(message, "child not found");
            }
            _ => panic!("expected NodeProperties::Error"),
        }
    }

    /// DP-3: STATUS present → Actor variant.
    #[test]
    fn test_derive_properties_valid_actor() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::MESSAGES_PROCESSED;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        attrs.set(MESSAGES_PROCESSED, 7u64);
        let json = serde_json::to_string(&attrs).unwrap();
        let props = derive_properties(&json);
        assert!(matches!(
            props,
            NodeProperties::Actor {
                messages_processed: 7,
                ..
            }
        ));
    }

    /// Serializes `attrs` to JSON and adds one extra top-level key,
    /// `"unknown_future_key"`, that no view declares.
    ///
    /// This helper only builds the JSON string; the IA-6
    /// (open-row-forward-compat) tests below feed it to
    /// `derive_properties` and assert that decoding still succeeds.
    fn inject_unknown_key(attrs: &Attrs) -> String {
        let mut map: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(&serde_json::to_string(attrs).unwrap()).unwrap();
        map.insert(
            "unknown_future_key".into(),
            serde_json::Value::String("surprise".into()),
        );
        serde_json::to_string(&map).unwrap()
    }

    /// IA-6: root view decoding tolerates an unknown key.
    #[test]
    fn test_ia6_root_ignores_unknown_keys() {
        let json = inject_unknown_key(&root_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
    }

    /// IA-6: host view decoding tolerates an unknown key.
    #[test]
    fn test_ia6_host_ignores_unknown_keys() {
        let json = inject_unknown_key(&host_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
    }

    /// IA-6: proc view decoding tolerates an unknown key.
    #[test]
    fn test_ia6_proc_ignores_unknown_keys() {
        let json = inject_unknown_key(&proc_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
    }

    /// IA-6: error view decoding tolerates an unknown key.
    #[test]
    fn test_ia6_error_ignores_unknown_keys() {
        let json = inject_unknown_key(&error_view().to_attrs());
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Error { .. }));
    }

    /// IA-6: actor view decoding tolerates an unknown key.
    #[test]
    fn test_ia6_actor_ignores_unknown_keys() {
        use hyperactor::introspect::ACTOR_TYPE;
        use hyperactor::introspect::STATUS;

        let mut attrs = Attrs::new();
        attrs.set(STATUS, "running".into());
        attrs.set(ACTOR_TYPE, "TestActor".into());
        let json = inject_unknown_key(&attrs);
        let props = derive_properties(&json);
        assert!(matches!(props, NodeProperties::Actor { .. }));
    }

    /// Strip the `$comment` field (containing the @\u{200B}generated marker)
    /// from a JSON value so snapshot comparisons ignore it.
    ///
    /// Only the top-level object is touched; a non-object value is
    /// returned unchanged.
    fn strip_comment(mut value: serde_json::Value) -> serde_json::Value {
        if let Some(obj) = value.as_object_mut() {
            obj.remove("$comment");
        }
        value
    }

    /// SC-1 / SC-2: schema is derived from types and matches the
    /// checked-in snapshot.
    ///
    /// To update after intentional type changes:
    /// ```sh
    /// buck run fbcode//monarch/hyperactor_mesh:generate_api_artifacts \
    ///   @fbcode//mode/dev-nosan -- \
    ///   fbcode/monarch/hyperactor_mesh/src/testdata
    /// ```
    #[test]
    fn test_node_payload_schema_snapshot() {
        let schema = schemars::schema_for!(dto::NodePayloadDto);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/node_payload_schema.json"))
                .expect("snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "schema changed — review and update snapshot if intentional"
        );
    }

    /// SC-3: real payloads validate against the generated schema.
    #[test]
    fn test_payloads_validate_against_schema() {
        use hyperactor::channel::ChannelAddr;
        use hyperactor::reference::ProcId;

        let schema = schemars::schema_for!(dto::NodePayloadDto);
        let schema_value = serde_json::to_value(&schema).unwrap();
        let compiled = jsonschema::JSONSchema::compile(&schema_value).expect("schema must compile");

        let epoch = std::time::UNIX_EPOCH;
        let proc_id = ProcId::with_name(ChannelAddr::Local(0), "worker");
        let actor_id = proc_id.actor_id("actor", 0);

        // One sample per `NodeProperties` variant.
        let samples = [
            NodePayload {
                identity: NodeRef::Root,
                properties: NodeProperties::Root {
                    num_hosts: 2,
                    started_at: epoch,
                    started_by: "testuser".into(),
                    system_children: vec![],
                },
                children: vec![NodeRef::Host(actor_id.clone())],
                parent: None,
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Host(actor_id.clone()),
                properties: NodeProperties::Host {
                    addr: "10.0.0.1:8080".into(),
                    num_procs: 2,
                    system_children: vec![test_actor_ref("proc", "sys", 0)],
                },
                children: vec![NodeRef::Proc(proc_id.clone())],
                parent: Some(NodeRef::Root),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Proc(proc_id.clone()),
                properties: NodeProperties::Proc {
                    proc_name: "worker".into(),
                    num_actors: 5,
                    system_children: vec![],
                    stopped_children: vec![],
                    stopped_retention_cap: 10,
                    is_poisoned: false,
                    failed_actor_count: 0,
                },
                children: vec![NodeRef::Actor(actor_id.clone())],
                parent: Some(NodeRef::Host(actor_id.clone())),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Actor(actor_id.clone()),
                properties: NodeProperties::Actor {
                    actor_status: "running".into(),
                    actor_type: "MyActor".into(),
                    messages_processed: 42,
                    created_at: Some(epoch),
                    last_message_handler: Some("handle_ping".into()),
                    total_processing_time_us: 1000,
                    flight_recorder: None,
                    is_system: false,
                    failure_info: None,
                },
                children: vec![],
                parent: Some(NodeRef::Proc(proc_id.clone())),
                as_of: epoch,
            },
            NodePayload {
                identity: NodeRef::Actor(actor_id.clone()),
                properties: NodeProperties::Error {
                    code: "not_found".into(),
                    message: "child not found".into(),
                },
                children: vec![],
                parent: None,
                as_of: epoch,
            },
        ];

        for (i, payload) in samples.iter().enumerate() {
            let dto = dto::NodePayloadDto::from(payload.clone());
            let value = serde_json::to_value(&dto).unwrap();
            assert!(
                compiled.is_valid(&value),
                "sample {i} failed schema validation"
            );
        }
    }

    /// SC-4: `$id` is injected only at the serve boundary.
    /// Stripping `$id` from the served schema must yield the raw
    /// schemars output.
    ///
    /// NOTE(review): the "served" value below is built locally by
    /// inserting `$id` into the raw schema; the real endpoint code is
    /// not called here. Consider driving this from the actual serving
    /// function so a divergence there would be caught.
    #[test]
    fn test_served_schema_is_raw_plus_id() {
        let raw: serde_json::Value =
            serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();

        // Simulate what the endpoint does.
        let mut served = raw.clone();
        served.as_object_mut().unwrap().insert(
            "$id".into(),
            serde_json::Value::String("https://monarch.meta.com/schemas/v1/node_payload".into()),
        );

        // Strip $id — remainder must equal raw.
        let mut stripped = served;
        stripped.as_object_mut().unwrap().remove("$id");
        assert_eq!(raw, stripped, "served schema differs from raw beyond $id");
    }

    /// SC-2: error envelope schema matches checked-in snapshot.
    #[test]
    fn test_error_schema_snapshot() {
        use crate::mesh_admin::ApiErrorEnvelope;

        let schema = schemars::schema_for!(ApiErrorEnvelope);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/error_schema.json"))
                .expect("error snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "error schema changed — review and update snapshot if intentional"
        );
    }

    /// SC-2: AdminInfo schema matches checked-in snapshot.
    #[test]
    fn test_admin_info_schema_snapshot() {
        use crate::mesh_admin::AdminInfo;

        let schema = schemars::schema_for!(AdminInfo);
        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/admin_info_schema.json"))
                .expect("admin info snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "AdminInfo schema changed — review and update snapshot if intentional"
        );
    }

    /// SC-2: OpenAPI spec matches checked-in snapshot.
    #[test]
    fn test_openapi_spec_snapshot() {
        let actual = crate::mesh_admin::build_openapi_spec();
        let expected: serde_json::Value = strip_comment(
            serde_json::from_str(include_str!("testdata/openapi.json"))
                .expect("OpenAPI snapshot must be valid JSON"),
        );
        assert_eq!(
            actual, expected,
            "OpenAPI spec changed — review and update snapshot if intentional"
        );
    }
}