Skip to main content

hyperactor_mesh/
introspect.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mesh-topology introspection types and attrs.
10//!
11//! This module owns the typed internal model used by mesh-admin and the
12//! TUI: mesh-topology attr keys, typed attrs views, `NodeRef`, and the
13//! domain `NodePayload` / `NodeProperties` / `FailureInfo` values derived
14//! from `hyperactor::introspect::IntrospectResult`.
15//!
16//! These keys are published by `HostMeshAgent`, `ProcAgent`, and
17//! `MeshAdminAgent` to describe mesh topology (hosts, procs, root).
18//! Actor-runtime keys (status, actor_type, messages_processed, etc.) are
19//! declared in `hyperactor::introspect`.
20//!
21//! The HTTP wire representations live in [`dto`]. That submodule owns the
22//! curl-friendly JSON contract, schema/OpenAPI generation, and boundary
23//! invariants for string-encoded references and timestamps. This module
24//! keeps the internal typed invariants.
25//!
26//! These invariants govern the introspection model and derived
27//! payloads exposed by mesh-admin; lower-level runtime accounting
28//! invariants remain owned by the runtime modules that produce those
29//! values.
30//!
31//! See `hyperactor::introspect` for naming convention, invariant
32//! labels, and the `IntrospectAttr` meta-attribute pattern.
33//!
34//! ## Mesh key invariants (MK-*)
35//!
36//! - **MK-1 (metadata completeness):** Every mesh-topology
37//!   introspection key must carry `@meta(INTROSPECT = ...)` with
38//!   non-empty `name` and `desc`.
39//! - **MK-2 (short-name uniqueness):** Covered by
40//!   `test_introspect_short_names_are_globally_unique` in
41//!   `hyperactor::introspect` (cross-crate).
42//!
43//! ## Proc debug stats invariants (PD-*)
44//!
45//! These invariants govern the proc-debug introspection surface
46//! exposed by mesh-admin: proc attrs, typed proc views, and the
47//! proc-debug portion of `NodeProperties::Proc`.
48//!
49//! They do not define proc runtime mechanics. The underlying
50//! per-actor queue-depth accounting invariants live in
51//! `hyperactor::proc`; this module owns the proc-level debug values
52//! derived from that runtime state.
53//!
54//! - **PD-1:** `actor_work_queue_depth_max <=
55//!   actor_work_queue_depth_total`.
56//! - **PD-2:** `process_rss_bytes` and `process_vm_size_bytes` are
57//!   `None` on non-Linux or read failure. Never fabricated.
58//! - **PD-3:** All debug fields default to zero/None for backward
59//!   compatibility. Old procs that haven't published yet produce a
60//!   valid `ProcDebugStats::default()`.
61//! - **PD-4:** Queue depth aggregation covers live actors only.
62//!   Stopped/retained actor snapshots are excluded.
63//! - **PD-5:** See `hyperactor::proc` module doc for the per-actor
64//!   queue depth accounting invariants (PD-5a through PD-5e).
65//!
66//! ## HTTP boundary invariants (HB-*)
67//!
68//! These govern the HTTP DTO layer in [`dto`].
69//!
70//! - **HB-1 (typed-internal, string-external):** `NodeRef`, `ActorAddr`,
71//!   `ProcAddr`, and `SystemTime` are typed Rust values internally. At the
72//!   HTTP JSON boundary, [`dto::NodePayloadDto`],
73//!   [`dto::NodePropertiesDto`], and [`dto::FailureInfoDto`] encode them
74//!   as canonical strings.
75//! - **HB-2 (round-trip):** The HTTP string forms round-trip through the
76//!   internal typed parsers (`NodeRef::from_str`, `ActorAddr::from_str`,
77//!   `humantime::parse_rfc3339`). Timestamps are formatted at
78//!   millisecond precision; sub-millisecond values are truncated at
79//!   the boundary.
80//! - **HB-3 (schema-honesty):** Schema/OpenAPI are generated from the DTO
81//!   types, so the published schema reflects the actual wire format rather
82//!   than the internal domain representation.
83//!
84//! ## Attrs invariants (IA-*)
85//!
86//! These govern how `IntrospectResult.attrs` is built in
87//! `hyperactor::introspect` and how `properties` is derived via
88//! `derive_properties`.
89//!
90//! - **IA-1 (attrs-json):** `IntrospectResult.attrs` is always a
91//!   valid JSON object string.
92//! - **IA-2 (runtime-precedence):** Runtime-owned introspection keys
93//!   override any same-named keys in published attrs.
94//! - **IA-3 (status-shape):** `status_reason` is present in attrs
95//!   iff the status string carries a reason.
96//! - **IA-4 (failure-shape):** `failure_*` attrs are present iff
97//!   effective status is `failed`.
98//! - **IA-5 (payload-totality):** Every `IntrospectResult` sets
99//!   `attrs` -- never omitted, never null.
100//! - **IA-6 (open-row-forward-compat):** View decoders ignore
101//!   unknown attrs keys; only required known keys and local
102//!   invariants affect decoding outcome. Concretized by AV-3.
103//!
104//! ## Attrs view invariants (AV-*)
105//!
106//! These govern the typed view layer (`*AttrsView` structs).
107//!
108//! - **AV-1 (view-roundtrip):** For each view V,
109//!   `V::from_attrs(&v.to_attrs()) == Ok(v)` (modulo documented
110//!   normalization/defaulting).
111//! - **AV-2 (required-key-strictness):** `from_attrs` fails iff
112//!   required keys for that view are missing.
113//! - **AV-3 (unknown-key-tolerance):** Unknown attrs keys must
114//!   not affect successful decode outcome. Concretization of
115//!   IA-6.
116//!
117//! ## Derive invariants (DP-*)
118//!
119//! - **DP-1 (derive-precedence):** `derive_properties` dispatches
120//!   on `node_type` first, then falls back to `error_code`,
121//!   then `status`, then unknown. This order is the canonical
122//!   detection chain.
123//! - **DP-2 (derive-totality-on-parse-failure):**
124//!   `derive_properties` is total; malformed or incoherent attrs
125//!   never panic and map to `NodeProperties::Error` with detail.
126//! - **DP-3 (derive-precedence-stability):**
127//!   `derive_properties` detection order is stable and explicit:
128//!   `node_type` > `error_code` > `status` > unknown.
129//! - **DP-4 (error-on-decode-failure):** Any view decode or
130//!   invariant failure maps to a deterministic
131//!   `NodeProperties::Error` with a `malformed_*` code family,
132//!   without panic.
133//!
134//! ## py-spy integration (PS-*)
135//!
136//! - **PS-1 (target locality):** `PySpyDump` always targets
137//!   `std::process::id()` of the handling ProcAgent process. No
138//!   caller-supplied PID exists in the API.
139//! - **PS-2 (deterministic failure shape):** Execution failures are
140//!   classified into `BinaryNotFound { searched }` vs
141//!   `Failed { pid, binary, exit_code, stderr }`, never collapsed.
142//! - **PS-3 (binary resolution order):** Resolution order is exactly:
143//!   `PYSPY_BIN` config attr (if non-empty) then `"py-spy"` on PATH.
144//!   The attr is read via `hyperactor_config::global::get_cloned`;
145//!   env var `PYSPY_BIN` feeds in through the config layer.
146//!   If the first attempt is not found, the fallback attempt is
147//!   required.
148//! - **PS-4 (structured JSON output):** py-spy runs with `--json`;
149//!   output is parsed into `Vec<PySpyStackTrace>`. Parse failure
150//!   maps to `PySpyResult::Failed`.
151//! - **PS-5 (subprocess timeout):** `try_exec` bounds the py-spy
152//!   subprocess inside the worker to `MESH_ADMIN_PYSPY_TIMEOUT`
153//!   (default 10s). The budget is sized for `--native --native-all`
154//!   which unwinds native stacks via libunwind — significantly
155//!   slower than Python-only capture on loaded hosts. On expiry the
156//!   child is killed and reaped, and the worker returns
157//!   `Failed { stderr: "…timed out…" }`.
158//! - **PS-6 (bridge timeout):** The HTTP bridge uses a separate
159//!   `MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT` (default 13s), which must
160//!   exceed `MESH_ADMIN_PYSPY_TIMEOUT` so the subprocess kill/reap
161//!   and reply can arrive before the bridge declares
162//!   `gateway_timeout`. Independent of
163//!   `MESH_ADMIN_SINGLE_HOST_TIMEOUT`.
164//! - **PS-7 (non-blocking delegation):** ProcAgent never awaits
165//!   py-spy execution inline. On `PySpyDump` it spawns a child
166//!   `PySpyWorker`, forwards the request, and returns immediately.
167//! - **PS-8 (worker lifecycle):** Each `PySpyWorker` handles
168//!   exactly one forwarded `RunPySpyDump`, replies directly to the
169//!   forwarded `OncePortRef`, then self-terminates via
170//!   `cx.stop()`. Clean exit, no supervision event.
171//! - **PS-9 (concurrent dumps):** py-spy is spawn-per-request, so
172//!   overlapping dumps on the same proc are allowed. Each worker
173//!   runs independently.
174//! - **PS-10 (nonblocking retry):** In nonblocking mode, `try_exec`
175//!   retries up to 3 times with 100ms backoff on failure, because
176//!   py-spy can segfault reading mutating process memory. All
177//!   attempts share a single deadline bounded by
178//!   `MESH_ADMIN_PYSPY_TIMEOUT` (PS-5).
179//! - **PS-11a (native-all-immediate-downgrade):** If py-spy rejects
180//!   `--native-all` with the recognized unsupported-flag signature
181//!   (exit code 2, stderr mentions `--native-all`), `try_exec`
182//!   retries immediately with `native_all = false` in the same outer
183//!   attempt.
184//! - **PS-11b (native-all-no-retry-consumption):** That downgrade
185//!   retry does not consume an outer nonblocking retry slot (PS-10)
186//!   and does not incur the 100ms inter-attempt backoff.
187//! - **PS-11c (native-all-downgrade-warning):** A successful
188//!   downgraded result includes the warning `"--native-all
189//!   unsupported by this py-spy; fell back to --native"`.
190//! - **PS-11d (native-all-failure-passthrough):** If the downgraded
191//!   retry also fails, the failure flows through the normal
192//!   nonblocking retry logic (PS-10) unchanged.
193//! - **PS-11e (native-all-sticky-downgrade):** Once the
194//!   unsupported-flag signature is detected,
195//!   `effective_opts.native_all` remains `false` for all subsequent
196//!   outer retries. The flag is not re-tested on later attempts.
197//! - **PS-12 (universal py-spy):** Worker procs and the service
198//!   proc can handle `PySpyDump`. Worker procs handle it via
199//!   ProcAgent; the service proc handles it via HostAgent (same
200//!   spawn-worker pattern). `pyspy_bridge` routes by proc name:
201//!   if `proc_id.base_name() == SERVICE_PROC_NAME`, the target
202//!   is `host_agent`; otherwise `proc_agent[0]`. Procs lacking
203//!   either agent (e.g. mesh-admin) fast-fail via PS-13.
204//! - **PS-13 (defensive probe):** Before sending `PySpyDump`,
205//!   `pyspy_bridge` probes the selected actor with an introspect
206//!   query bounded by `MESH_ADMIN_QUERY_CHILD_TIMEOUT` (default
207//!   100ms). Three outcomes: (a) probe reply arrives — proceed
208//!   with `PySpyDump`; (b) probe times out or recv closes —
209//!   return `not_found` (actor absent/unreachable); (c) probe
210//!   send itself fails — return `internal_error` (bridge-side
211//!   infrastructure failure). Cases (b) and (c) fast-fail
212//!   instead of waiting the full 13s
213//!   `MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT`.
214//! - **PS-14 (reachability-based capability):** A proc supports
215//!   py-spy iff its stable handler actor is reachable: the
216//!   service proc requires a reachable `host_agent`; non-service
217//!   procs require a reachable `proc_agent[0]`. `PySpyWorker` is
218//!   transient per-request machinery (spawned on `PySpyDump`,
219//!   stopped after replying) and is not part of the reachability
220//!   contract.
221//!
222//! v1 contract notes:
223//! - The current py-spy bridge expects a ProcAddr-form reference and
224//!   rejects other forms as `bad_request`. This may be broadened in
225//!   future versions.
226//! - If `worker.send()` fails after the reply port has moved into
227//!   `RunPySpyDump`, the caller receives no explicit
228//!   `PySpyResult::Failed` — they observe a timeout.
229//!   `MailboxSenderError` does not carry the unsent message, so the
230//!   port is irrecoverable on this path.
231//! - **Contract change (D96756537 follow-up):** `PySpyResult::Ok`
232//!   replaced `stack: String` (raw py-spy text) with
233//!   `stack_traces: Vec<PySpyStackTrace>` (structured JSON) and
234//!   added `warnings: Vec<String>`. Clients reading the old `stack`
235//!   field will see it absent; they must migrate to `stack_traces`.
236//!
237//! ## py-spy profiling (PP-*)
238//!
239//! Profile capture (`py-spy record`) is a separate contract from
240//! dump (`py-spy dump`). Types, messages, workers, and routes are
241//! independent — no shared state, no shared timeout budget.
242//!
243//! - **PP-1 (input validation):** `duration_s` (u32) must be
244//!   non-zero and at most `MESH_ADMIN_PYSPY_MAX_PROFILE_DURATION`.
245//!   `rate_hz` must be 1..1000. Violations → 400 before any
246//!   actor messaging.
247//! - **PP-2 (dynamic timeout cascade):** Subprocess timeout =
248//!   `duration_s + 15s`. Bridge timeout = subprocess + 5s.
249//!   Computed per-request from validated opts, not static config.
250//! - **PP-3 (temp file lifecycle):** `py-spy record` writes to a
251//!   temp file; the worker reads it after successful exit and
252//!   deletes via tempfile drop. On failure or timeout, stderr is
253//!   captured. On timeout, the child is explicitly killed and
254//!   reaped via `start_kill()` + `wait().await`. If the file is
255//!   missing, empty, or unreadable after successful exit, the
256//!   result is `OutputMissing`, `OutputEmpty`, or
257//!   `OutputReadFailure`, not `Ok`.
258//! - **PP-4 (target locality):** Inherits PS-1 — always targets
259//!   `std::process::id()`, never a caller-supplied PID.
260//! - **PP-5 (separate worker):** `PySpyProfileWorker` is a
261//!   distinct actor from `PySpyWorker`. Profile durations block
262//!   for seconds to minutes; isolation prevents starving dumps.
263//! - **PP-6 (wire projection):** `ProfileExecOutcome` maps to
264//!   `PySpyProfileResult` 1:1 via `From`. Every internal variant
265//!   has an identically-named wire variant. The only shape change
266//!   is `TimedOut.timeout: Duration` → `TimedOut.timeout_s: u64`.
267//!
268//! ## Mesh-admin config (MA-*)
269//!
270//! - **MA-C1 (timeout config centralization):** Mesh-admin timeout
271//!   budgets are read from config attrs at call-time, with defaults
272//!   in `config.rs`. No hardcoded timeout constants in
273//!   `mesh_admin.rs`.
274
275pub mod dto;
276
277use hyperactor_config::Attrs;
278use hyperactor_config::INTROSPECT;
279use hyperactor_config::IntrospectAttr;
280use hyperactor_config::declare_attrs;
281
282// See MK-1, MK-2, IA-1..IA-5 in module doc.
283declare_attrs! {
284    /// Topology role of this node: "root", "host", "proc", "error".
285    @meta(INTROSPECT = IntrospectAttr {
286        name: "node_type".into(),
287        desc: "Topology role: root, host, proc, error".into(),
288    })
289    pub attr NODE_TYPE: String;
290
291    /// Host network address (e.g. "10.0.0.1:8080").
292    @meta(INTROSPECT = IntrospectAttr {
293        name: "addr".into(),
294        desc: "Host network address".into(),
295    })
296    pub attr ADDR: String;
297
298    /// Number of procs on a host.
299    @meta(INTROSPECT = IntrospectAttr {
300        name: "num_procs".into(),
301        desc: "Number of procs on a host".into(),
302    })
303    pub attr NUM_PROCS: usize = 0;
304
305    /// Human-readable proc name.
306    @meta(INTROSPECT = IntrospectAttr {
307        name: "proc_name".into(),
308        desc: "Human-readable proc name".into(),
309    })
310    pub attr PROC_NAME: String;
311
312    /// Number of actors in a proc.
313    @meta(INTROSPECT = IntrospectAttr {
314        name: "num_actors".into(),
315        desc: "Number of actors in a proc".into(),
316    })
317    pub attr NUM_ACTORS: usize = 0;
318
319    /// References of system/infrastructure children.
320    @meta(INTROSPECT = IntrospectAttr {
321        name: "system_children".into(),
322        desc: "References of system/infrastructure children".into(),
323    })
324    pub attr SYSTEM_CHILDREN: Vec<NodeRef>;
325
326    /// References of stopped children (proc only).
327    @meta(INTROSPECT = IntrospectAttr {
328        name: "stopped_children".into(),
329        desc: "References of stopped children".into(),
330    })
331    pub attr STOPPED_CHILDREN: Vec<NodeRef>;
332
333    /// Cap on stopped children retention.
334    @meta(INTROSPECT = IntrospectAttr {
335        name: "stopped_retention_cap".into(),
336        desc: "Maximum number of stopped children retained".into(),
337    })
338    pub attr STOPPED_RETENTION_CAP: usize = 0;
339
340    /// Whether this proc is refusing new spawns due to actor
341    /// failures.
342    @meta(INTROSPECT = IntrospectAttr {
343        name: "is_poisoned".into(),
344        desc: "Whether this proc is poisoned (refusing new spawns)".into(),
345    })
346    pub attr IS_POISONED: bool = false;
347
348    /// Count of failed actors in a proc.
349    @meta(INTROSPECT = IntrospectAttr {
350        name: "failed_actor_count".into(),
351        desc: "Number of failed actors in this proc".into(),
352    })
353    pub attr FAILED_ACTOR_COUNT: usize = 0;
354
355    /// Timestamp when the mesh was started.
356    @meta(INTROSPECT = IntrospectAttr {
357        name: "started_at".into(),
358        desc: "Timestamp when the mesh was started".into(),
359    })
360    pub attr STARTED_AT: std::time::SystemTime;
361
362    /// Username who started the mesh.
363    @meta(INTROSPECT = IntrospectAttr {
364        name: "started_by".into(),
365        desc: "Username who started the mesh".into(),
366    })
367    pub attr STARTED_BY: String;
368
369    /// Number of hosts in the mesh (root only).
370    @meta(INTROSPECT = IntrospectAttr {
371        name: "num_hosts".into(),
372        desc: "Number of hosts in the mesh".into(),
373    })
374    pub attr NUM_HOSTS: usize = 0;
375
376    // ── Proc debug stats (PD-*) ──────────────────────────────
377
378    /// RSS of the hosting OS process (bytes). `None` means the
379    /// measurement was unavailable (for example non-Linux or procfs
380    /// read/parse failure); values are never fabricated (PD-2).
381    @meta(INTROSPECT = IntrospectAttr {
382        name: "process_rss_bytes".into(),
383        desc: "RSS of the hosting OS process (bytes)".into(),
384    })
385    pub attr PROCESS_RSS_BYTES: Option<u64>;
386
387    /// Virtual memory size of the hosting OS process (bytes). `None`
388    /// means the measurement was unavailable (for example non-Linux
389    /// or procfs read/parse failure); values are never fabricated
390    /// (PD-2).
391    @meta(INTROSPECT = IntrospectAttr {
392        name: "process_vm_size_bytes".into(),
393        desc: "Virtual memory size of the hosting OS process (bytes)".into(),
394    })
395    pub attr PROCESS_VM_SIZE_BYTES: Option<u64>;
396
397    /// Sum of per-actor message queue depths across live actors.
398    @meta(INTROSPECT = IntrospectAttr {
399        name: "actor_work_queue_depth_total".into(),
400        desc: "Sum of per-actor message queue depths (live actors only)".into(),
401    })
402    pub attr ACTOR_WORK_QUEUE_DEPTH_TOTAL: u64 = 0;
403
404    /// Maximum current per-actor message queue depth across live
405    /// actors at publish time. This is not a historical high-water
406    /// mark.
407    @meta(INTROSPECT = IntrospectAttr {
408        name: "actor_work_queue_depth_max".into(),
409        desc: "Maximum per-actor message queue depth (live actors only)".into(),
410    })
411    pub attr ACTOR_WORK_QUEUE_DEPTH_MAX: u64 = 0;
412
413    /// Maximum proc-wide queue depth observed since startup (PD-6).
414    /// Eventually consistent — concurrent readers may transiently
415    /// observe total > high_water_mark. Retained evidence — driven
416    /// from the runtime accounting path, not publish-time sampling.
417    @meta(INTROSPECT = IntrospectAttr {
418        name: "actor_work_queue_depth_high_water_mark".into(),
419        desc: "Maximum proc-wide queue depth since startup (eventually consistent)".into(),
420    })
421    pub attr ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK: u64 = 0;
422
423    /// How long ago proc-wide queue depth was last observed non-zero
424    /// (PD-7). `None` means no counted actor work has traversed the
425    /// queue accounting path since startup. Uses wall clock, so the
426    /// age is best-effort telemetry and may not be strictly monotonic.
427    /// Retained evidence — driven from the runtime accounting path.
428    @meta(INTROSPECT = IntrospectAttr {
429        name: "last_nonzero_queue_depth_age_ms".into(),
430        desc: "Milliseconds since proc-wide queue depth was last observed non-zero (wall clock)".into(),
431    })
432    pub attr LAST_NONZERO_QUEUE_DEPTH_AGE_MS: Option<u64>;
433
434}
435
436use hyperactor::introspect::AttrsViewError;
437
438/// Typed view over attrs for a root node.
439#[derive(Debug, Clone, PartialEq)]
440pub struct RootAttrsView {
441    pub num_hosts: usize,
442    pub started_at: SystemTime,
443    pub started_by: String,
444    pub system_children: Vec<NodeRef>,
445}
446
447impl RootAttrsView {
448    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
449    /// `STARTED_AT` and `STARTED_BY`; `NUM_HOSTS` defaults to 0,
450    /// `SYSTEM_CHILDREN` defaults to empty.
451    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
452        let num_hosts = *attrs.get(NUM_HOSTS).unwrap_or(&0);
453        let started_at = *attrs
454            .get(STARTED_AT)
455            .ok_or_else(|| AttrsViewError::missing("started_at"))?;
456        let started_by = attrs
457            .get(STARTED_BY)
458            .ok_or_else(|| AttrsViewError::missing("started_by"))?
459            .clone();
460        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
461        Ok(Self {
462            num_hosts,
463            started_at,
464            started_by,
465            system_children,
466        })
467    }
468
469    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
470    pub fn to_attrs(&self) -> Attrs {
471        let mut attrs = Attrs::new();
472        attrs.set(NODE_TYPE, "root".to_string());
473        attrs.set(NUM_HOSTS, self.num_hosts);
474        attrs.set(STARTED_AT, self.started_at);
475        attrs.set(STARTED_BY, self.started_by.clone());
476        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
477        attrs
478    }
479}
480
481/// Memory stats of the hosting OS process. Shared by host and
482/// proc introspection surfaces — both agents are authoritative
483/// for the OS process they run in.
484///
485/// In the common one-proc-per-process deployment these read like
486/// "proc memory". In multi-proc-per-process setups, co-hosted procs
487/// report the same hosting-process values.
488#[derive(
489    Debug,
490    Clone,
491    Copy,
492    PartialEq,
493    Eq,
494    Default,
495    Serialize,
496    Deserialize,
497    Named
498)]
499pub struct ProcessMemoryStats {
500    /// RSS of the hosting OS process (bytes). `None` on non-Linux
501    /// or read failure (PD-2).
502    pub process_rss_bytes: Option<u64>,
503    /// Virtual memory size of the hosting OS process (bytes).
504    /// `None` on non-Linux or read failure (PD-2).
505    pub process_vm_size_bytes: Option<u64>,
506}
507
508impl ProcessMemoryStats {
509    /// Read the hosting OS process memory stats from procfs.
510    /// Returns `ProcessMemoryStats` with `None` fields on non-Linux
511    /// or any read/parse failure (PD-2: never fabricated).
512    pub fn read_from_procfs() -> Self {
513        let (rss, vm) = read_procfs_memory();
514        Self {
515            process_rss_bytes: rss,
516            process_vm_size_bytes: vm,
517        }
518    }
519
520    pub fn from_attrs(attrs: &Attrs) -> Self {
521        Self {
522            process_rss_bytes: attrs.get(PROCESS_RSS_BYTES).copied().flatten(),
523            process_vm_size_bytes: attrs.get(PROCESS_VM_SIZE_BYTES).copied().flatten(),
524        }
525    }
526
527    pub fn to_attrs(&self, attrs: &mut Attrs) {
528        attrs.set(PROCESS_RSS_BYTES, self.process_rss_bytes);
529        attrs.set(PROCESS_VM_SIZE_BYTES, self.process_vm_size_bytes);
530    }
531}
532
533/// Read RSS and VM size from `/proc/self/statm`.
534///
535/// `statm` field 0 is total program size (virtual memory) in pages;
536/// field 1 is resident set size in pages. This is sufficient for the
537/// Stage 1 operator signal and avoids parsing a larger procfs file.
538/// Returns `(Some(rss_bytes), Some(vm_bytes))` on success and `(None,
539/// None)` on any failure.
540#[cfg(target_os = "linux")]
541fn read_procfs_memory() -> (Option<u64>, Option<u64>) {
542    // SAFETY: sysconf(_SC_PAGESIZE) is a read-only query with no
543    // preconditions. It returns the system page size or -1 on error.
544    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
545    if page_size <= 0 {
546        return (None, None);
547    }
548    let page_size = page_size as u64;
549    // Sync I/O is intentional even though callers may invoke this from
550    // async contexts. `/proc/self/statm` is in the O(1) procfs tier —
551    // the kernel formats values from `mm_struct` atomic counters
552    // maintained on the page-fault and exit paths, with no page-table
553    // walk; typical wall time is a few microseconds. Dispatching via
554    // `tokio::fs::read_to_string` would cost more than the read
555    // itself, and this call cannot block on real disk I/O.
556    match std::fs::read_to_string("/proc/self/statm") {
557        Ok(contents) => {
558            let mut fields = contents.split_whitespace();
559            let vm_pages: Option<u64> = fields.next().and_then(|s| s.parse().ok());
560            let rss_pages: Option<u64> = fields.next().and_then(|s| s.parse().ok());
561            (
562                rss_pages.map(|p| p * page_size),
563                vm_pages.map(|p| p * page_size),
564            )
565        }
566        Err(_) => (None, None),
567    }
568}
569
570#[cfg(not(target_os = "linux"))]
571fn read_procfs_memory() -> (Option<u64>, Option<u64>) {
572    (None, None)
573}
574
575/// Proc-level debug/operational stats. Groups hosting-process memory
576/// (process-scoped) and actor queue pressure (proc-scoped) into one
577/// operational summary.
578///
579/// This asymmetry is intentional: memory belongs to the hosting OS
580/// process, while queue pressure is aggregated over live actors in
581/// this Monarch proc only.
582///
583/// Queue depth is an **instantaneous snapshot** at publish time, not
584/// a historical watermark or backlog accumulator. It reflects
585/// currently queued work that has not yet been received by the
586/// actor's run loop. Transient bursts that drain between publishes
587/// will not be observed.
588#[derive(
589    Debug,
590    Clone,
591    Copy,
592    PartialEq,
593    Eq,
594    Default,
595    Serialize,
596    Deserialize,
597    Named
598)]
599pub struct ProcDebugStats {
600    /// Hosting-process memory (shared type with host surface).
601    pub memory: ProcessMemoryStats,
602    /// Sum of per-actor message queue depths across live actors in
603    /// this proc (PD-4: live actors only).
604    pub actor_work_queue_depth_total: u64,
605    /// Maximum current per-actor message queue depth across live
606    /// actors in this proc at publish time. Not a historical
607    /// high-water mark.
608    pub actor_work_queue_depth_max: u64,
609    /// Maximum proc-wide queue depth observed since startup (PD-6).
610    /// Eventually consistent — see PD-6 docs. Retained — driven
611    /// from the runtime accounting path.
612    pub actor_work_queue_depth_high_water_mark: u64,
613    /// How long ago proc-wide queue depth was last observed non-zero
614    /// (PD-7). `None` means never. Wall clock — see PD-9 docs.
615    /// Retained — driven from the runtime accounting path.
616    pub last_nonzero_queue_depth_age_ms: Option<u64>,
617}
618
619impl ProcDebugStats {
620    pub fn from_attrs(attrs: &Attrs) -> Self {
621        let total = attrs
622            .get(ACTOR_WORK_QUEUE_DEPTH_TOTAL)
623            .copied()
624            .unwrap_or(0);
625        let max = attrs.get(ACTOR_WORK_QUEUE_DEPTH_MAX).copied().unwrap_or(0);
626        // PD-1: max <= total.
627        if max > total {
628            tracing::warn!(
629                "PD-1 violation: actor_work_queue_depth_max ({}) > total ({})",
630                max,
631                total,
632            );
633        }
634        let high_water = attrs
635            .get(ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK)
636            .copied()
637            .unwrap_or(0);
638        // PD-6: high_water_mark >= total eventually, but concurrent
639        // readers may transiently see total > high_water_mark (a
640        // sampling artifact, not an accounting error).
641        let last_nonzero = attrs
642            .get(LAST_NONZERO_QUEUE_DEPTH_AGE_MS)
643            .copied()
644            .flatten();
645        Self {
646            memory: ProcessMemoryStats::from_attrs(attrs),
647            actor_work_queue_depth_total: total,
648            actor_work_queue_depth_max: max,
649            actor_work_queue_depth_high_water_mark: high_water,
650            last_nonzero_queue_depth_age_ms: last_nonzero,
651        }
652    }
653
654    pub fn to_attrs(&self, attrs: &mut Attrs) {
655        self.memory.to_attrs(attrs);
656        attrs.set(
657            ACTOR_WORK_QUEUE_DEPTH_TOTAL,
658            self.actor_work_queue_depth_total,
659        );
660        attrs.set(ACTOR_WORK_QUEUE_DEPTH_MAX, self.actor_work_queue_depth_max);
661        attrs.set(
662            ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
663            self.actor_work_queue_depth_high_water_mark,
664        );
665        attrs.set(
666            LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
667            self.last_nonzero_queue_depth_age_ms,
668        );
669    }
670}
671
672/// Typed view over attrs for a host node.
673#[derive(Debug, Clone, PartialEq)]
674pub struct HostAttrsView {
675    pub addr: String,
676    pub num_procs: usize,
677    pub system_children: Vec<NodeRef>,
678    /// Hosting-process memory stats.
679    pub memory: ProcessMemoryStats,
680}
681
682impl HostAttrsView {
683    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires `ADDR`;
684    /// `NUM_PROCS` defaults to 0, `SYSTEM_CHILDREN` defaults to
685    /// empty.
686    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
687        let addr = attrs
688            .get(ADDR)
689            .ok_or_else(|| AttrsViewError::missing("addr"))?
690            .clone();
691        let num_procs = *attrs.get(NUM_PROCS).unwrap_or(&0);
692        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
693        let memory = ProcessMemoryStats::from_attrs(attrs);
694        Ok(Self {
695            addr,
696            num_procs,
697            system_children,
698            memory,
699        })
700    }
701
702    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
703    pub fn to_attrs(&self) -> Attrs {
704        let mut attrs = Attrs::new();
705        attrs.set(NODE_TYPE, "host".to_string());
706        attrs.set(ADDR, self.addr.clone());
707        attrs.set(NUM_PROCS, self.num_procs);
708        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
709        self.memory.to_attrs(&mut attrs);
710        attrs
711    }
712}
713
714/// Typed view over attrs for a proc node.
715#[derive(Debug, Clone, PartialEq)]
716pub struct ProcAttrsView {
717    pub proc_name: String,
718    pub num_actors: usize,
719    pub system_children: Vec<NodeRef>,
720    pub stopped_children: Vec<NodeRef>,
721    pub stopped_retention_cap: usize,
722    pub is_poisoned: bool,
723    pub failed_actor_count: usize,
724    /// Runtime debug/operational stats (PD-*).
725    pub debug: ProcDebugStats,
726}
727
728impl ProcAttrsView {
729    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
730    /// `PROC_NAME`; remaining fields have defaults. Checks FI-5
731    /// coherence.
732    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
733        let proc_name = attrs
734            .get(PROC_NAME)
735            .ok_or_else(|| AttrsViewError::missing("proc_name"))?
736            .clone();
737        let num_actors = *attrs.get(NUM_ACTORS).unwrap_or(&0);
738        let system_children = attrs.get(SYSTEM_CHILDREN).cloned().unwrap_or_default();
739        let stopped_children = attrs.get(STOPPED_CHILDREN).cloned().unwrap_or_default();
740        let stopped_retention_cap = *attrs.get(STOPPED_RETENTION_CAP).unwrap_or(&0);
741        let is_poisoned = *attrs.get(IS_POISONED).unwrap_or(&false);
742        let failed_actor_count = *attrs.get(FAILED_ACTOR_COUNT).unwrap_or(&0);
743
744        // FI-5: is_poisoned iff failed_actor_count > 0.
745        if is_poisoned != (failed_actor_count > 0) {
746            return Err(AttrsViewError::invariant(
747                "FI-5",
748                format!("is_poisoned={is_poisoned} but failed_actor_count={failed_actor_count}"),
749            ));
750        }
751
752        let debug = ProcDebugStats::from_attrs(attrs);
753
754        Ok(Self {
755            proc_name,
756            num_actors,
757            system_children,
758            stopped_children,
759            stopped_retention_cap,
760            is_poisoned,
761            failed_actor_count,
762            debug,
763        })
764    }
765
766    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
767    pub fn to_attrs(&self) -> Attrs {
768        let mut attrs = Attrs::new();
769        attrs.set(NODE_TYPE, "proc".to_string());
770        attrs.set(PROC_NAME, self.proc_name.clone());
771        attrs.set(NUM_ACTORS, self.num_actors);
772        attrs.set(SYSTEM_CHILDREN, self.system_children.clone());
773        attrs.set(STOPPED_CHILDREN, self.stopped_children.clone());
774        attrs.set(STOPPED_RETENTION_CAP, self.stopped_retention_cap);
775        attrs.set(IS_POISONED, self.is_poisoned);
776        attrs.set(FAILED_ACTOR_COUNT, self.failed_actor_count);
777        self.debug.to_attrs(&mut attrs);
778        attrs
779    }
780}
781
782/// Typed view over attrs for an error node.
783#[derive(Debug, Clone, PartialEq)]
784pub struct ErrorAttrsView {
785    pub code: String,
786    pub message: String,
787}
788
789impl ErrorAttrsView {
790    /// Decode from an `Attrs` bag (AV-2, AV-3). Requires
791    /// `ERROR_CODE`; `ERROR_MESSAGE` defaults to empty.
792    pub fn from_attrs(attrs: &Attrs) -> Result<Self, AttrsViewError> {
793        use hyperactor::introspect::ERROR_CODE;
794        use hyperactor::introspect::ERROR_MESSAGE;
795
796        let code = attrs
797            .get(ERROR_CODE)
798            .ok_or_else(|| AttrsViewError::missing("error_code"))?
799            .clone();
800        let message = attrs.get(ERROR_MESSAGE).cloned().unwrap_or_default();
801        Ok(Self { code, message })
802    }
803
804    /// Encode into an `Attrs` bag (AV-1 round-trip producer).
805    pub fn to_attrs(&self) -> Attrs {
806        use hyperactor::introspect::ERROR_CODE;
807        use hyperactor::introspect::ERROR_MESSAGE;
808
809        let mut attrs = Attrs::new();
810        attrs.set(ERROR_CODE, self.code.clone());
811        attrs.set(ERROR_MESSAGE, self.message.clone());
812        attrs
813    }
814}
815
816// --- API / presentation types ---
817
818use std::fmt;
819use std::str::FromStr;
820use std::time::SystemTime;
821
822use serde::Deserialize;
823use serde::Serialize;
824use typeuri::Named;
825
/// Typed reference to a node in the mesh-admin navigation tree.
///
/// Extends `IntrospectRef` with mesh-only concepts (`Root`, `Host`).
/// hyperactor does not know about these variants.
///
/// String form (produced by `Display`, accepted by `FromStr`):
/// `"root"` for `Root`, `"host:<actor addr>"` for `Host`, and the bare
/// address for `Proc` / `Actor`.
// NOTE(review): keep variant order stable — this type is serialized for
// actor messaging, and some wire encodings are presumably order-sensitive;
// confirm before reordering or inserting variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum NodeRef {
    /// Synthetic mesh root node.
    /// Serializes as lowercase `"root"` to match the HTTP path convention.
    #[serde(rename = "root")]
    Root,
    /// A host in the mesh, identified by its `HostAgent` actor ID.
    /// Displayed with a `host:` prefix to distinguish it from `Actor`.
    Host(hyperactor::ActorAddr),
    /// A proc running on a host.
    Proc(hyperactor::ProcAddr),
    /// An actor instance within a proc.
    Actor(hyperactor::ActorAddr),
}

// Presumably implements the attrs value trait for `NodeRef` so it can be
// stored in `Attrs` (see `hyperactor_config`) — confirm.
hyperactor_config::impl_attrvalue!(NodeRef);
845
846impl fmt::Display for NodeRef {
847    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
848        match self {
849            Self::Root => write!(f, "root"),
850            Self::Host(id) => write!(f, "host:{}", id),
851            Self::Proc(id) => fmt::Display::fmt(id, f),
852            Self::Actor(id) => fmt::Display::fmt(id, f),
853        }
854    }
855}
856
/// Error parsing a `NodeRef` from a string (see `NodeRef::from_str`).
#[derive(Debug, thiserror::Error)]
pub enum NodeRefParseError {
    /// The input string was empty.
    #[error("empty reference string")]
    Empty,
    /// The input had the `host:` prefix, but the remainder did not parse
    /// as an actor address.
    #[error("invalid host reference: {0}")]
    InvalidHost(hyperactor::AddrParseError),
    /// The input parsed as a port address, which `NodeRef` cannot
    /// represent.
    #[error("port references are not valid node references")]
    PortNotAllowed,
    /// The input (without a `host:` prefix) failed to parse as an
    /// address.
    #[error(transparent)]
    Reference(#[from] hyperactor::AddrParseError),
}
869
870impl FromStr for NodeRef {
871    type Err = NodeRefParseError;
872
873    fn from_str(s: &str) -> Result<Self, Self::Err> {
874        if s.is_empty() {
875            return Err(NodeRefParseError::Empty);
876        }
877        if s == "root" {
878            return Ok(Self::Root);
879        }
880        if let Some(rest) = s.strip_prefix("host:") {
881            let actor_id: hyperactor::ActorAddr =
882                rest.parse().map_err(NodeRefParseError::InvalidHost)?;
883            return Ok(Self::Host(actor_id));
884        }
885        let r: hyperactor::Addr = s.parse()?;
886        match r {
887            hyperactor::Addr::Proc(id) => Ok(Self::Proc(id)),
888            hyperactor::Addr::Actor(id) => Ok(Self::Actor(id)),
889            hyperactor::Addr::Port(_) => Err(NodeRefParseError::PortNotAllowed),
890        }
891    }
892}
893
894impl From<hyperactor::introspect::IntrospectRef> for NodeRef {
895    fn from(r: hyperactor::introspect::IntrospectRef) -> Self {
896        match r {
897            hyperactor::introspect::IntrospectRef::Proc(id) => Self::Proc(id),
898            hyperactor::introspect::IntrospectRef::Actor(id) => Self::Actor(id),
899        }
900    }
901}
902
/// Uniform response for any node in the mesh topology.
///
/// Every addressable entity (root, host, proc, actor) is represented
/// as a `NodePayload`. The client navigates the mesh by fetching a
/// node and following its `children` references.
///
/// Built from an `IntrospectResult` by `to_node_payload` (identity and
/// parent taken from the result) or `to_node_payload_with` (identity and
/// parent supplied by the caller).
///
/// See IA-1..IA-5 in module doc.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::NodePayloadDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct NodePayload {
    /// Canonical node reference identifying this node.
    pub identity: NodeRef,
    /// Node-specific metadata (type, status, metrics, etc.). This is
    /// `NodeProperties::Error` when the attrs could not be decoded.
    pub properties: NodeProperties,
    /// Child node references for downward navigation.
    pub children: Vec<NodeRef>,
    /// Parent node reference for upward navigation.
    pub parent: Option<NodeRef>,
    /// When this payload was captured.
    pub as_of: SystemTime,
}
wirevalue::register_type!(NodePayload);
927
/// Node-specific metadata. Externally-tagged enum — the variant
/// name is the discriminator (Root, Host, Proc, Actor, Error).
///
/// Usually produced by `derive_properties` from a node's attrs JSON.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::NodePropertiesDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum NodeProperties {
    /// Synthetic mesh root node (not a real actor/proc).
    Root {
        num_hosts: usize,
        started_at: SystemTime,
        started_by: String,
        system_children: Vec<NodeRef>,
    },
    /// A host in the mesh, represented by its `HostAgent`.
    Host {
        addr: String,
        num_procs: usize,
        system_children: Vec<NodeRef>,
        /// Hosting-process memory stats.
        memory: ProcessMemoryStats,
    },
    /// Properties describing a proc running on a host.
    Proc {
        proc_name: String,
        num_actors: usize,
        system_children: Vec<NodeRef>,
        stopped_children: Vec<NodeRef>,
        stopped_retention_cap: usize,
        /// When decoded via `ProcAttrsView`, true iff
        /// `failed_actor_count > 0` (FI-5).
        is_poisoned: bool,
        failed_actor_count: usize,
        /// Runtime debug/operational stats (PD-*).
        debug: ProcDebugStats,
    },
    /// Runtime metadata for a single actor instance.
    Actor {
        /// Actor status; when a status reason is present it is appended
        /// as `"<status>: <reason>"`.
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        created_at: Option<SystemTime>,
        /// Taken from the actor view's `last_handler`.
        last_message_handler: Option<String>,
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        /// Present when the actor view carries failure information.
        failure_info: Option<FailureInfo>,
    },
    /// Error sentinel. Returned when a child reference cannot be
    /// resolved, and also produced by `derive_properties` when attrs
    /// cannot be parsed or decoded (codes `parse_error`,
    /// `unknown_node_type`, `malformed_*`).
    Error { code: String, message: String },
}
wirevalue::register_type!(NodeProperties);
976
/// Structured failure information for failed actors.
///
/// Mesh-side mirror of the failure record carried by
/// `hyperactor::introspect::ActorAttrsView`; the conversion to
/// `NodeProperties::Actor` copies it field for field.
// Serialize/Deserialize required by wirevalue::register_type! and
// ResolveReferenceResponse actor messaging. HTTP serialization uses
// dto::FailureInfoDto, not these derives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct FailureInfo {
    /// Error message describing the failure.
    pub error_message: String,
    /// Actor that caused the failure (root cause).
    pub root_cause_actor: hyperactor::ActorAddr,
    /// Display name of the root-cause actor, if available.
    pub root_cause_name: Option<String>,
    /// When the failure occurred.
    pub occurred_at: SystemTime,
    /// Whether this failure was propagated from a child.
    pub is_propagated: bool,
}
wirevalue::register_type!(FailureInfo);
995
/// Mesh-layer conversion from a typed attrs view to `NodeProperties`.
///
/// Defined in this crate, not in `hyperactor`, so that hyperactor-owned
/// views (e.g. `ActorAttrsView`) can be converted into `NodeProperties`
/// without `hyperactor` having to depend on the mesh crate.
trait IntoNodeProperties {
    /// Consumes the view and produces the matching `NodeProperties`
    /// variant.
    fn into_node_properties(self) -> NodeProperties;
}
1003
1004impl IntoNodeProperties for RootAttrsView {
1005    fn into_node_properties(self) -> NodeProperties {
1006        NodeProperties::Root {
1007            num_hosts: self.num_hosts,
1008            started_at: self.started_at,
1009            started_by: self.started_by,
1010            system_children: self.system_children,
1011        }
1012    }
1013}
1014
1015impl IntoNodeProperties for HostAttrsView {
1016    fn into_node_properties(self) -> NodeProperties {
1017        NodeProperties::Host {
1018            addr: self.addr,
1019            num_procs: self.num_procs,
1020            system_children: self.system_children,
1021            memory: self.memory,
1022        }
1023    }
1024}
1025
1026impl IntoNodeProperties for ProcAttrsView {
1027    fn into_node_properties(self) -> NodeProperties {
1028        NodeProperties::Proc {
1029            proc_name: self.proc_name,
1030            num_actors: self.num_actors,
1031            system_children: self.system_children,
1032            stopped_children: self.stopped_children,
1033            stopped_retention_cap: self.stopped_retention_cap,
1034            is_poisoned: self.is_poisoned,
1035            failed_actor_count: self.failed_actor_count,
1036            debug: self.debug,
1037        }
1038    }
1039}
1040
1041impl IntoNodeProperties for ErrorAttrsView {
1042    fn into_node_properties(self) -> NodeProperties {
1043        NodeProperties::Error {
1044            code: self.code,
1045            message: self.message,
1046        }
1047    }
1048}
1049
1050impl IntoNodeProperties for hyperactor::introspect::ActorAttrsView {
1051    fn into_node_properties(self) -> NodeProperties {
1052        let actor_status = match &self.status_reason {
1053            Some(reason) => format!("{}: {}", self.status, reason),
1054            None => self.status.clone(),
1055        };
1056
1057        let failure_info = self.failure.map(|fi| FailureInfo {
1058            error_message: fi.error_message,
1059            root_cause_actor: fi.root_cause_actor,
1060            root_cause_name: fi.root_cause_name,
1061            occurred_at: fi.occurred_at,
1062            is_propagated: fi.is_propagated,
1063        });
1064
1065        NodeProperties::Actor {
1066            actor_status,
1067            actor_type: self.actor_type,
1068            messages_processed: self.messages_processed,
1069            created_at: self.created_at,
1070            last_message_handler: self.last_handler,
1071            total_processing_time_us: self.total_processing_time_us,
1072            flight_recorder: self.flight_recorder,
1073            is_system: self.is_system,
1074            failure_info,
1075        }
1076    }
1077}
1078
1079/// Derive `NodeProperties` from a JSON-serialized attrs string.
1080///
1081/// Detection precedence (DP-1, DP-3):
1082/// 1. `node_type` = "root" / "host" / "proc" → corresponding variant
1083/// 2. `error_code` present → Error
1084/// 3. `STATUS` key present → Actor
1085/// 4. none of the above → Error("unknown_node_type")
1086///
1087/// DP-2 / DP-4: this function is total — malformed attrs never
1088/// panic; view decode failures map to `NodeProperties::Error`
1089/// with a `malformed_*` code.
1090/// AV-3 / IA-6: view decoders ignore unknown keys.
1091pub fn derive_properties(attrs_json: &str) -> NodeProperties {
1092    let attrs: Attrs = match serde_json::from_str(attrs_json) {
1093        Ok(a) => a,
1094        Err(_) => {
1095            return NodeProperties::Error {
1096                code: "parse_error".into(),
1097                message: "failed to parse attrs JSON".into(),
1098            };
1099        }
1100    };
1101
1102    let node_type = attrs.get(NODE_TYPE).cloned().unwrap_or_default();
1103
1104    match node_type.as_str() {
1105        "root" => match RootAttrsView::from_attrs(&attrs) {
1106            Ok(v) => v.into_node_properties(),
1107            Err(e) => NodeProperties::Error {
1108                code: "malformed_root".into(),
1109                message: e.to_string(),
1110            },
1111        },
1112        "host" => match HostAttrsView::from_attrs(&attrs) {
1113            Ok(v) => v.into_node_properties(),
1114            Err(e) => NodeProperties::Error {
1115                code: "malformed_host".into(),
1116                message: e.to_string(),
1117            },
1118        },
1119        "proc" => match ProcAttrsView::from_attrs(&attrs) {
1120            Ok(v) => v.into_node_properties(),
1121            Err(e) => NodeProperties::Error {
1122                code: "malformed_proc".into(),
1123                message: e.to_string(),
1124            },
1125        },
1126        _ => {
1127            // DP-1: error_code → Error, STATUS present → Actor,
1128            // else → Error("unknown_node_type").
1129            use hyperactor::introspect::ERROR_CODE;
1130            use hyperactor::introspect::STATUS;
1131
1132            if attrs.get(ERROR_CODE).is_some() {
1133                return match ErrorAttrsView::from_attrs(&attrs) {
1134                    Ok(v) => v.into_node_properties(),
1135                    Err(e) => NodeProperties::Error {
1136                        code: "malformed_error".into(),
1137                        message: e.to_string(),
1138                    },
1139                };
1140            }
1141
1142            if attrs.get(STATUS).is_none() {
1143                return NodeProperties::Error {
1144                    code: "unknown_node_type".into(),
1145                    message: format!("unrecognized node_type: {:?}", node_type),
1146                };
1147            }
1148
1149            match hyperactor::introspect::ActorAttrsView::from_attrs(&attrs) {
1150                Ok(v) => v.into_node_properties(),
1151                Err(e) => NodeProperties::Error {
1152                    code: "malformed_actor".into(),
1153                    message: e.to_string(),
1154                },
1155            }
1156        }
1157    }
1158}
1159
1160/// Convert an `IntrospectResult` to a presentation `NodePayload`.
1161/// Lifts `IntrospectRef` → `NodeRef` and passes through typed timestamps.
1162pub fn to_node_payload(result: hyperactor::introspect::IntrospectResult) -> NodePayload {
1163    NodePayload {
1164        identity: result.identity.into(),
1165        properties: derive_properties(&result.attrs),
1166        children: result.children.into_iter().map(NodeRef::from).collect(),
1167        parent: result.parent.map(NodeRef::from),
1168        as_of: result.as_of,
1169    }
1170}
1171
1172/// Convert an `IntrospectResult` to a `NodePayload`, overriding
1173/// identity and parent for correct tree navigation.
1174pub fn to_node_payload_with(
1175    result: hyperactor::introspect::IntrospectResult,
1176    identity: NodeRef,
1177    parent: Option<NodeRef>,
1178) -> NodePayload {
1179    NodePayload {
1180        identity,
1181        properties: derive_properties(&result.attrs),
1182        children: result.children.into_iter().map(NodeRef::from).collect(),
1183        parent,
1184        as_of: result.as_of,
1185    }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190    use super::*;
1191    use crate::mesh_id::ResourceId;
1192
1193    /// Enforces MK-1 (metadata completeness) for all mesh-topology
1194    /// introspection keys.
1195    #[test]
1196    fn test_mesh_introspect_keys_are_tagged() {
1197        let cases = vec![
1198            ("node_type", NODE_TYPE.attrs()),
1199            ("addr", ADDR.attrs()),
1200            ("num_procs", NUM_PROCS.attrs()),
1201            ("proc_name", PROC_NAME.attrs()),
1202            ("num_actors", NUM_ACTORS.attrs()),
1203            ("system_children", SYSTEM_CHILDREN.attrs()),
1204            ("stopped_children", STOPPED_CHILDREN.attrs()),
1205            ("stopped_retention_cap", STOPPED_RETENTION_CAP.attrs()),
1206            ("is_poisoned", IS_POISONED.attrs()),
1207            ("failed_actor_count", FAILED_ACTOR_COUNT.attrs()),
1208            ("started_at", STARTED_AT.attrs()),
1209            ("started_by", STARTED_BY.attrs()),
1210            ("num_hosts", NUM_HOSTS.attrs()),
1211            // PD-* proc debug stats keys.
1212            ("process_rss_bytes", PROCESS_RSS_BYTES.attrs()),
1213            ("process_vm_size_bytes", PROCESS_VM_SIZE_BYTES.attrs()),
1214            (
1215                "actor_work_queue_depth_total",
1216                ACTOR_WORK_QUEUE_DEPTH_TOTAL.attrs(),
1217            ),
1218            (
1219                "actor_work_queue_depth_max",
1220                ACTOR_WORK_QUEUE_DEPTH_MAX.attrs(),
1221            ),
1222            (
1223                "actor_work_queue_depth_high_water_mark",
1224                ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK.attrs(),
1225            ),
1226            (
1227                "last_nonzero_queue_depth_age_ms",
1228                LAST_NONZERO_QUEUE_DEPTH_AGE_MS.attrs(),
1229            ),
1230        ];
1231
1232        for (expected_name, meta) in &cases {
1233            // MK-1: every key must have INTROSPECT with non-empty
1234            // name and desc.
1235            let introspect = meta
1236                .get(INTROSPECT)
1237                .unwrap_or_else(|| panic!("{expected_name}: missing INTROSPECT meta-attr"));
1238            assert_eq!(
1239                introspect.name, *expected_name,
1240                "short name mismatch for {expected_name}"
1241            );
1242            assert!(
1243                !introspect.desc.is_empty(),
1244                "{expected_name}: desc should not be empty"
1245            );
1246        }
1247
1248        // Exhaustiveness: verify cases covers all INTROSPECT-tagged
1249        // keys declared in this module.
1250        use hyperactor_config::attrs::AttrKeyInfo;
1251        let registry_count = inventory::iter::<AttrKeyInfo>()
1252            .filter(|info| {
1253                info.name.starts_with("hyperactor_mesh::introspect::")
1254                    && info.meta.get(INTROSPECT).is_some()
1255            })
1256            .count();
1257        assert_eq!(
1258            cases.len(),
1259            registry_count,
1260            "test must cover all INTROSPECT-tagged keys in this module"
1261        );
1262    }
1263
1264    fn test_actor_ref(proc_name: &str, actor_name: &str) -> NodeRef {
1265        use hyperactor::channel::ChannelAddr;
1266
1267        NodeRef::Actor(
1268            ResourceId::proc_addr_from_name(ChannelAddr::Local(0), proc_name)
1269                .actor_addr(actor_name),
1270        )
1271    }
1272
1273    fn root_view() -> RootAttrsView {
1274        RootAttrsView {
1275            num_hosts: 3,
1276            started_at: std::time::UNIX_EPOCH,
1277            started_by: "testuser".into(),
1278            system_children: vec![test_actor_ref("proc", "child1")],
1279        }
1280    }
1281
1282    fn host_view() -> HostAttrsView {
1283        HostAttrsView {
1284            addr: "10.0.0.1:8080".into(),
1285            num_procs: 2,
1286            system_children: vec![test_actor_ref("proc", "sys")],
1287            memory: Default::default(),
1288        }
1289    }
1290
1291    fn proc_view() -> ProcAttrsView {
1292        ProcAttrsView {
1293            proc_name: "worker".into(),
1294            num_actors: 5,
1295            system_children: vec![],
1296            stopped_children: vec![test_actor_ref("proc", "old")],
1297            stopped_retention_cap: 10,
1298            is_poisoned: false,
1299            failed_actor_count: 0,
1300            debug: Default::default(),
1301        }
1302    }
1303
1304    fn error_view() -> ErrorAttrsView {
1305        ErrorAttrsView {
1306            code: "not_found".into(),
1307            message: "child not found".into(),
1308        }
1309    }
1310
1311    /// AV-1: from_attrs(to_attrs(v)) == v.
1312    #[test]
1313    fn test_root_view_round_trip() {
1314        let view = root_view();
1315        let rt = RootAttrsView::from_attrs(&view.to_attrs()).unwrap();
1316        assert_eq!(rt, view);
1317    }
1318
1319    /// AV-1.
1320    #[test]
1321    fn test_host_view_round_trip() {
1322        let view = host_view();
1323        let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
1324        assert_eq!(rt, view);
1325    }
1326
1327    /// AV-1.
1328    #[test]
1329    fn test_proc_view_round_trip() {
1330        let view = proc_view();
1331        let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
1332        assert_eq!(rt, view);
1333    }
1334
1335    /// AV-1: host view with non-default memory round-trips.
1336    #[test]
1337    fn test_host_view_round_trip_with_memory() {
1338        let view = HostAttrsView {
1339            addr: "10.0.0.1:8080".into(),
1340            num_procs: 2,
1341            system_children: vec![],
1342            memory: ProcessMemoryStats {
1343                process_rss_bytes: Some(512 * 1024 * 1024),
1344                process_vm_size_bytes: Some(2 * 1024 * 1024 * 1024),
1345            },
1346        };
1347        let rt = HostAttrsView::from_attrs(&view.to_attrs()).unwrap();
1348        assert_eq!(rt, view);
1349    }
1350
1351    /// AV-1: proc view with non-default debug stats round-trips.
1352    #[test]
1353    fn test_proc_view_round_trip_with_debug() {
1354        let view = ProcAttrsView {
1355            proc_name: "worker".into(),
1356            num_actors: 5,
1357            system_children: vec![],
1358            stopped_children: vec![],
1359            stopped_retention_cap: 10,
1360            is_poisoned: false,
1361            failed_actor_count: 0,
1362            debug: ProcDebugStats {
1363                memory: ProcessMemoryStats {
1364                    process_rss_bytes: Some(256 * 1024 * 1024),
1365                    process_vm_size_bytes: Some(1024 * 1024 * 1024),
1366                },
1367                actor_work_queue_depth_total: 42,
1368                actor_work_queue_depth_max: 7,
1369                actor_work_queue_depth_high_water_mark: 100,
1370                last_nonzero_queue_depth_age_ms: Some(5000),
1371            },
1372        };
1373        let rt = ProcAttrsView::from_attrs(&view.to_attrs()).unwrap();
1374        assert_eq!(rt, view);
1375    }
1376
1377    /// PD-1: max <= total enforced (warning logged, no error).
1378    #[test]
1379    fn test_proc_debug_stats_pd1_warning_on_violation() {
1380        let mut attrs = Attrs::new();
1381        attrs.set(PROC_NAME, "test".to_string());
1382        attrs.set(ACTOR_WORK_QUEUE_DEPTH_TOTAL, 5u64);
1383        attrs.set(ACTOR_WORK_QUEUE_DEPTH_MAX, 10u64); // violation
1384        // Should not error, but should log warning.
1385        let view = ProcAttrsView::from_attrs(&attrs).unwrap();
1386        assert_eq!(view.debug.actor_work_queue_depth_total, 5);
1387        assert_eq!(view.debug.actor_work_queue_depth_max, 10);
1388    }
1389
1390    /// PD-3: missing debug attrs default to zero/None.
1391    #[test]
1392    fn test_proc_debug_stats_defaults_on_missing_attrs() {
1393        let mut attrs = Attrs::new();
1394        attrs.set(PROC_NAME, "old_proc".to_string());
1395        let view = ProcAttrsView::from_attrs(&attrs).unwrap();
1396        assert_eq!(view.debug, ProcDebugStats::default());
1397    }
1398
1399    /// AV-1.
1400    #[test]
1401    fn test_error_view_round_trip() {
1402        let view = error_view();
1403        let rt = ErrorAttrsView::from_attrs(&view.to_attrs()).unwrap();
1404        assert_eq!(rt, view);
1405    }
1406
1407    /// AV-2: missing required key rejected.
1408    #[test]
1409    fn test_root_view_missing_started_at() {
1410        let mut attrs = Attrs::new();
1411        attrs.set(NODE_TYPE, "root".into());
1412        attrs.set(STARTED_BY, "user".into());
1413        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
1414        assert_eq!(err, AttrsViewError::MissingKey { key: "started_at" });
1415    }
1416
1417    /// AV-2.
1418    #[test]
1419    fn test_root_view_missing_started_by() {
1420        let mut attrs = Attrs::new();
1421        attrs.set(NODE_TYPE, "root".into());
1422        attrs.set(STARTED_AT, std::time::UNIX_EPOCH);
1423        let err = RootAttrsView::from_attrs(&attrs).unwrap_err();
1424        assert_eq!(err, AttrsViewError::MissingKey { key: "started_by" });
1425    }
1426
1427    /// AV-2.
1428    #[test]
1429    fn test_host_view_missing_addr() {
1430        let attrs = Attrs::new();
1431        let err = HostAttrsView::from_attrs(&attrs).unwrap_err();
1432        assert_eq!(err, AttrsViewError::MissingKey { key: "addr" });
1433    }
1434
1435    /// AV-2.
1436    #[test]
1437    fn test_proc_view_missing_proc_name() {
1438        let attrs = Attrs::new();
1439        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1440        assert_eq!(err, AttrsViewError::MissingKey { key: "proc_name" });
1441    }
1442
1443    /// FI-5: poisoned without failures rejected.
1444    #[test]
1445    fn test_proc_view_fi5_poisoned_but_no_failures() {
1446        let mut attrs = Attrs::new();
1447        attrs.set(PROC_NAME, "bad".into());
1448        attrs.set(IS_POISONED, true);
1449        attrs.set(FAILED_ACTOR_COUNT, 0usize);
1450        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1451        assert!(matches!(
1452            err,
1453            AttrsViewError::InvariantViolation { label: "FI-5", .. }
1454        ));
1455    }
1456
1457    /// FI-5: failures without poisoned rejected.
1458    #[test]
1459    fn test_proc_view_fi5_failures_but_not_poisoned() {
1460        let mut attrs = Attrs::new();
1461        attrs.set(PROC_NAME, "bad".into());
1462        attrs.set(IS_POISONED, false);
1463        attrs.set(FAILED_ACTOR_COUNT, 2usize);
1464        let err = ProcAttrsView::from_attrs(&attrs).unwrap_err();
1465        assert!(matches!(
1466            err,
1467            AttrsViewError::InvariantViolation { label: "FI-5", .. }
1468        ));
1469    }
1470
1471    /// DP-2 / DP-4: unparseable JSON → Error.
1472    #[test]
1473    fn test_derive_properties_unparseable_json() {
1474        let props = derive_properties("not json");
1475        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "parse_error"));
1476    }
1477
1478    /// DP-3: unknown node_type → Error.
1479    #[test]
1480    fn test_derive_properties_unknown_node_type() {
1481        let attrs = Attrs::new();
1482        let json = serde_json::to_string(&attrs).unwrap();
1483        let props = derive_properties(&json);
1484        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "unknown_node_type"));
1485    }
1486
1487    /// DP-4: view decode failure → malformed_* Error.
1488    #[test]
1489    fn test_derive_properties_malformed_root() {
1490        let mut attrs = Attrs::new();
1491        attrs.set(NODE_TYPE, "root".into());
1492        let json = serde_json::to_string(&attrs).unwrap();
1493        let props = derive_properties(&json);
1494        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_root"));
1495    }
1496
1497    /// DP-4: invariant violation → malformed_* Error.
1498    #[test]
1499    fn test_derive_properties_malformed_proc_fi5() {
1500        let mut attrs = Attrs::new();
1501        attrs.set(NODE_TYPE, "proc".into());
1502        attrs.set(PROC_NAME, "bad".into());
1503        attrs.set(IS_POISONED, true);
1504        attrs.set(FAILED_ACTOR_COUNT, 0usize);
1505        let json = serde_json::to_string(&attrs).unwrap();
1506        let props = derive_properties(&json);
1507        assert!(matches!(props, NodeProperties::Error { code, .. } if code == "malformed_proc"));
1508    }
1509
1510    /// DP-3: node_type "root" → Root variant.
1511    #[test]
1512    fn test_derive_properties_valid_root() {
1513        let view = root_view();
1514        let json = serde_json::to_string(&view.to_attrs()).unwrap();
1515        let props = derive_properties(&json);
1516        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
1517    }
1518
1519    /// DP-3: node_type "host" → Host variant.
1520    #[test]
1521    fn test_derive_properties_valid_host() {
1522        let view = host_view();
1523        let json = serde_json::to_string(&view.to_attrs()).unwrap();
1524        let props = derive_properties(&json);
1525        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
1526    }
1527
1528    /// DP-3: node_type "proc" → Proc variant.
1529    #[test]
1530    fn test_derive_properties_valid_proc() {
1531        let view = proc_view();
1532        let json = serde_json::to_string(&view.to_attrs()).unwrap();
1533        let props = derive_properties(&json);
1534        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
1535    }
1536
1537    /// DP-3: error_code present → Error variant.
1538    #[test]
1539    fn test_derive_properties_valid_error() {
1540        let view = error_view();
1541        let json = serde_json::to_string(&view.to_attrs()).unwrap();
1542        let props = derive_properties(&json);
1543        assert!(matches!(props, NodeProperties::Error { .. }));
1544        if let NodeProperties::Error { code, message } = props {
1545            assert_eq!(code, "not_found");
1546            assert_eq!(message, "child not found");
1547        }
1548    }
1549
1550    /// DP-3: STATUS present → Actor variant.
1551    #[test]
1552    fn test_derive_properties_valid_actor() {
1553        use hyperactor::introspect::ACTOR_TYPE;
1554        use hyperactor::introspect::MESSAGES_PROCESSED;
1555        use hyperactor::introspect::STATUS;
1556
1557        let mut attrs = Attrs::new();
1558        attrs.set(STATUS, "running".into());
1559        attrs.set(ACTOR_TYPE, "TestActor".into());
1560        attrs.set(MESSAGES_PROCESSED, 7u64);
1561        let json = serde_json::to_string(&attrs).unwrap();
1562        let props = derive_properties(&json);
1563        assert!(matches!(
1564            props,
1565            NodeProperties::Actor {
1566                messages_processed: 7,
1567                ..
1568            }
1569        ));
1570    }
1571
1572    /// Injects an unknown key into serialized attrs JSON and
1573    /// verifies that derive_properties still decodes successfully.
1574    /// Exercises IA-6 (open-row-forward-compat) for each view.
1575    fn inject_unknown_key(attrs: &Attrs) -> String {
1576        let mut map: serde_json::Map<String, serde_json::Value> =
1577            serde_json::from_str(&serde_json::to_string(attrs).unwrap()).unwrap();
1578        map.insert(
1579            "unknown_future_key".into(),
1580            serde_json::Value::String("surprise".into()),
1581        );
1582        serde_json::to_string(&map).unwrap()
1583    }
1584
1585    #[test]
1586    fn test_ia6_root_ignores_unknown_keys() {
1587        let json = inject_unknown_key(&root_view().to_attrs());
1588        let props = derive_properties(&json);
1589        assert!(matches!(props, NodeProperties::Root { num_hosts: 3, .. }));
1590    }
1591
1592    #[test]
1593    fn test_ia6_host_ignores_unknown_keys() {
1594        let json = inject_unknown_key(&host_view().to_attrs());
1595        let props = derive_properties(&json);
1596        assert!(matches!(props, NodeProperties::Host { num_procs: 2, .. }));
1597    }
1598
1599    #[test]
1600    fn test_ia6_proc_ignores_unknown_keys() {
1601        let json = inject_unknown_key(&proc_view().to_attrs());
1602        let props = derive_properties(&json);
1603        assert!(matches!(props, NodeProperties::Proc { num_actors: 5, .. }));
1604    }
1605
1606    #[test]
1607    fn test_ia6_error_ignores_unknown_keys() {
1608        let json = inject_unknown_key(&error_view().to_attrs());
1609        let props = derive_properties(&json);
1610        assert!(matches!(props, NodeProperties::Error { .. }));
1611    }
1612
1613    #[test]
1614    fn test_ia6_actor_ignores_unknown_keys() {
1615        use hyperactor::introspect::ACTOR_TYPE;
1616        use hyperactor::introspect::STATUS;
1617
1618        let mut attrs = Attrs::new();
1619        attrs.set(STATUS, "running".into());
1620        attrs.set(ACTOR_TYPE, "TestActor".into());
1621        let json = inject_unknown_key(&attrs);
1622        let props = derive_properties(&json);
1623        assert!(matches!(props, NodeProperties::Actor { .. }));
1624    }
1625
1626    /// SC-1 / SC-2: schema is derived from types and matches the
1627    /// checked-in snapshot.
1628    ///
1629    /// To update after intentional type changes:
1630    /// ```sh
1631    /// buck run fbcode//monarch/hyperactor_mesh:generate_api_artifacts \
1632    ///   @fbcode//mode/dev-nosan -- \
1633    ///   fbcode/monarch/hyperactor_mesh/src/testdata
1634    /// ```
1635    /// Strip the `$comment` field (containing the @\u{200B}generated marker)
1636    /// from a JSON value so snapshot comparisons ignore it.
1637    fn strip_comment(mut value: serde_json::Value) -> serde_json::Value {
1638        if let Some(obj) = value.as_object_mut() {
1639            obj.remove("$comment");
1640        }
1641        value
1642    }
1643
1644    #[test]
1645    fn test_node_payload_schema_snapshot() {
1646        let schema = schemars::schema_for!(dto::NodePayloadDto);
1647        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
1648        let expected: serde_json::Value = strip_comment(
1649            serde_json::from_str(include_str!("testdata/node_payload_schema.json"))
1650                .expect("snapshot must be valid JSON"),
1651        );
1652        assert_eq!(
1653            actual, expected,
1654            "schema changed — review and update snapshot if intentional"
1655        );
1656    }
1657
1658    /// SC-3: real payloads validate against the generated schema.
1659    #[test]
1660    fn test_payloads_validate_against_schema() {
1661        use hyperactor::channel::ChannelAddr;
1662
1663        let schema = schemars::schema_for!(dto::NodePayloadDto);
1664        let schema_value = serde_json::to_value(&schema).unwrap();
1665        let compiled = jsonschema::JSONSchema::compile(&schema_value).expect("schema must compile");
1666
1667        let epoch = std::time::UNIX_EPOCH;
1668        let proc_id = ResourceId::proc_addr_from_name(ChannelAddr::Local(0), "worker");
1669        let actor_id = proc_id.actor_addr("actor");
1670
1671        let samples = [
1672            NodePayload {
1673                identity: NodeRef::Root,
1674                properties: NodeProperties::Root {
1675                    num_hosts: 2,
1676                    started_at: epoch,
1677                    started_by: "testuser".into(),
1678                    system_children: vec![],
1679                },
1680                children: vec![NodeRef::Host(actor_id.clone())],
1681                parent: None,
1682                as_of: epoch,
1683            },
1684            NodePayload {
1685                identity: NodeRef::Host(actor_id.clone()),
1686                properties: NodeProperties::Host {
1687                    addr: "10.0.0.1:8080".into(),
1688                    num_procs: 2,
1689                    system_children: vec![test_actor_ref("proc", "sys")],
1690                    memory: Default::default(),
1691                },
1692                children: vec![NodeRef::Proc(proc_id.clone())],
1693                parent: Some(NodeRef::Root),
1694                as_of: epoch,
1695            },
1696            NodePayload {
1697                identity: NodeRef::Proc(proc_id.clone()),
1698                properties: NodeProperties::Proc {
1699                    proc_name: "worker".into(),
1700                    num_actors: 5,
1701                    system_children: vec![],
1702                    stopped_children: vec![],
1703                    stopped_retention_cap: 10,
1704                    is_poisoned: false,
1705                    failed_actor_count: 0,
1706                    debug: Default::default(),
1707                },
1708                children: vec![NodeRef::Actor(actor_id.clone())],
1709                parent: Some(NodeRef::Host(actor_id.clone())),
1710                as_of: epoch,
1711            },
1712            NodePayload {
1713                identity: NodeRef::Actor(actor_id.clone()),
1714                properties: NodeProperties::Actor {
1715                    actor_status: "running".into(),
1716                    actor_type: "MyActor".into(),
1717                    messages_processed: 42,
1718                    created_at: Some(epoch),
1719                    last_message_handler: Some("handle_ping".into()),
1720                    total_processing_time_us: 1000,
1721                    flight_recorder: None,
1722                    is_system: false,
1723                    failure_info: None,
1724                },
1725                children: vec![],
1726                parent: Some(NodeRef::Proc(proc_id.clone())),
1727                as_of: epoch,
1728            },
1729            NodePayload {
1730                identity: NodeRef::Actor(actor_id.clone()),
1731                properties: NodeProperties::Error {
1732                    code: "not_found".into(),
1733                    message: "child not found".into(),
1734                },
1735                children: vec![],
1736                parent: None,
1737                as_of: epoch,
1738            },
1739        ];
1740
1741        for (i, payload) in samples.iter().enumerate() {
1742            let dto = dto::NodePayloadDto::from(payload.clone());
1743            let value = serde_json::to_value(&dto).unwrap();
1744            assert!(
1745                compiled.is_valid(&value),
1746                "sample {i} failed schema validation"
1747            );
1748        }
1749    }
1750
1751    /// SC-4: `$id` is injected only at the serve boundary.
1752    /// Stripping `$id` from the served schema must yield the raw
1753    /// schemars output.
1754    #[test]
1755    fn test_served_schema_is_raw_plus_id() {
1756        let raw: serde_json::Value =
1757            serde_json::to_value(schemars::schema_for!(dto::NodePayloadDto)).unwrap();
1758
1759        // Simulate what the endpoint does.
1760        let mut served = raw.clone();
1761        served.as_object_mut().unwrap().insert(
1762            "$id".into(),
1763            serde_json::Value::String("https://monarch.meta.com/schemas/v1/node_payload".into()),
1764        );
1765
1766        // Strip $id — remainder must equal raw.
1767        let mut stripped = served;
1768        stripped.as_object_mut().unwrap().remove("$id");
1769        assert_eq!(raw, stripped, "served schema differs from raw beyond $id");
1770    }
1771
1772    /// SC-2: error envelope schema matches checked-in snapshot.
1773    #[test]
1774    fn test_error_schema_snapshot() {
1775        use crate::mesh_admin::ApiErrorEnvelope;
1776
1777        let schema = schemars::schema_for!(ApiErrorEnvelope);
1778        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
1779        let expected: serde_json::Value = strip_comment(
1780            serde_json::from_str(include_str!("testdata/error_schema.json"))
1781                .expect("error snapshot must be valid JSON"),
1782        );
1783        assert_eq!(
1784            actual, expected,
1785            "error schema changed — review and update snapshot if intentional"
1786        );
1787    }
1788
1789    /// SC-2: AdminInfo schema matches checked-in snapshot.
1790    #[test]
1791    fn test_admin_info_schema_snapshot() {
1792        use crate::mesh_admin::AdminInfo;
1793
1794        let schema = schemars::schema_for!(AdminInfo);
1795        let actual: serde_json::Value = serde_json::to_value(&schema).unwrap();
1796        let expected: serde_json::Value = strip_comment(
1797            serde_json::from_str(include_str!("testdata/admin_info_schema.json"))
1798                .expect("admin info snapshot must be valid JSON"),
1799        );
1800        assert_eq!(
1801            actual, expected,
1802            "AdminInfo schema changed — review and update snapshot if intentional"
1803        );
1804    }
1805
1806    /// SC-2: OpenAPI spec matches checked-in snapshot.
1807    #[test]
1808    fn test_openapi_spec_snapshot() {
1809        let actual = crate::mesh_admin::build_openapi_spec();
1810        let expected: serde_json::Value = strip_comment(
1811            serde_json::from_str(include_str!("testdata/openapi.json"))
1812                .expect("OpenAPI snapshot must be valid JSON"),
1813        );
1814        assert_eq!(
1815            actual, expected,
1816            "OpenAPI spec changed — review and update snapshot if intentional"
1817        );
1818    }
1819}