hyperactor_mesh/introspect/
dto.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
//! HTTP boundary DTO types for mesh-admin introspection.
//!
//! These types own the HTTP JSON wire contract. Domain types
//! (`NodePayload`, `NodeProperties`, `FailureInfo`) stay clean of
//! HTTP serialization concerns; conversion happens at the boundary
//! via `From` / `TryFrom` impls defined here.
//!
//! ## Invariants
//!
//! - **HB-1 (typed-internal, string-external):** `NodeRef`, `ActorId`,
//!   `ProcId`, and `SystemTime` are encoded as canonical strings in the
//!   DTO types.
//! - **HB-2 (round-trip):** `NodePayload → NodePayloadDto → NodePayload`
//!   is lossless for values representable in the wire format.
//!   Timestamps are formatted as UTC RFC 3339 strings at millisecond
//!   precision (`humantime::format_rfc3339_millis`) and parsed back with
//!   `humantime::parse_rfc3339`, matching the established HTTP contract;
//!   sub-millisecond precision is truncated at the boundary.
//! - **HB-3 (schema-honesty):** Schema/OpenAPI are generated from these
//!   DTO types, so the published schema reflects the actual wire format.
29
30use std::time::SystemTime;
31
32use anyhow::Context;
33use schemars::JsonSchema;
34use serde::Deserialize;
35use serde::Serialize;
36
37use super::FailureInfo;
38use super::NodePayload;
39use super::NodeProperties;
40use super::NodeRef;
41
// DTO struct definitions

/// Uniform response for any node in the mesh topology.
///
/// Every addressable entity (root, host, proc, actor) is represented
/// as a `NodePayload`. The client navigates the mesh by fetching a
/// node and following its `children` references.
///
/// `identity`, `children`, and `parent` are plain reference strings.
/// `as_of` is an RFC 3339 (ISO 8601 profile) UTC timestamp string.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
// Publish the schema under the domain name `NodePayload` rather than
// `NodePayloadDto` (HB-3).
#[schemars(title = "NodePayload")]
pub struct NodePayloadDto {
    /// Canonical node reference identifying this node (e.g. `root`).
    pub identity: String,
    /// Node-specific metadata (type, status, metrics, etc.).
    pub properties: NodePropertiesDto,
    /// Child node reference strings the client can URL-encode and
    /// fetch via `GET /v1/{reference}`.
    pub children: Vec<String>,
    /// Parent node reference for upward navigation; `null` when the
    /// node has no parent (e.g. the root).
    pub parent: Option<String>,
    /// When this payload was captured, as an RFC 3339 UTC timestamp at
    /// millisecond precision (e.g. `2025-01-15T10:30:00.123Z`).
    pub as_of: String,
}
67
/// Node-specific metadata. Externally-tagged enum — the JSON
/// key is the variant name (Root, Host, Proc, Actor, Error).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
// Publish the schema `$defs` entry as `NodeProperties`, the domain type
// name, rather than `NodePropertiesDto` (HB-3).
#[schemars(rename = "NodeProperties")]
pub enum NodePropertiesDto {
    /// Synthetic mesh root node (not a real actor/proc).
    Root {
        num_hosts: usize,
        /// RFC 3339 UTC timestamp at millisecond precision.
        started_at: String,
        started_by: String,
        /// Canonical node reference strings.
        system_children: Vec<String>,
    },
    /// A host in the mesh, represented by its `HostAgent`.
    Host {
        addr: String,
        num_procs: usize,
        /// Canonical node reference strings.
        system_children: Vec<String>,
    },
    /// Properties describing a proc running on a host.
    Proc {
        proc_name: String,
        num_actors: usize,
        /// Canonical node reference strings.
        system_children: Vec<String>,
        /// Canonical node reference strings.
        stopped_children: Vec<String>,
        stopped_retention_cap: usize,
        is_poisoned: bool,
        failed_actor_count: usize,
    },
    /// Runtime metadata for a single actor instance.
    Actor {
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        /// RFC 3339 UTC timestamp at millisecond precision; `null` when
        /// absent.
        created_at: Option<String>,
        last_message_handler: Option<String>,
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        /// Structured failure details; `null` when absent.
        failure_info: Option<FailureInfoDto>,
    },
    /// Error sentinel returned when a child reference cannot be resolved.
    Error { code: String, message: String },
}
111
/// Structured failure information for failed actors.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
// Publish the schema `$defs` entry as `FailureInfo` rather than
// `FailureInfoDto` (HB-3).
#[schemars(rename = "FailureInfo")]
pub struct FailureInfoDto {
    /// Error message describing the failure.
    pub error_message: String,
    /// Actor that caused the failure (root cause), encoded as its
    /// canonical `ActorId` string.
    pub root_cause_actor: String,
    /// Display name of the root-cause actor, if available.
    pub root_cause_name: Option<String>,
    /// When the failure occurred, as an RFC 3339 UTC timestamp at
    /// millisecond precision.
    pub occurred_at: String,
    /// Whether this failure was propagated from a child.
    pub is_propagated: bool,
}
127
128// Helpers
129
130fn format_time(t: &SystemTime) -> String {
131    humantime::format_rfc3339_millis(*t).to_string()
132}
133
134fn refs_to_strings(refs: &[NodeRef]) -> Vec<String> {
135    refs.iter().map(|r| r.to_string()).collect()
136}
137
138fn parse_refs(field: &str, strings: &[String]) -> anyhow::Result<Vec<NodeRef>> {
139    strings
140        .iter()
141        .enumerate()
142        .map(|(i, s)| {
143            s.parse()
144                .with_context(|| format!("failed to parse {field}[{i}]: {s:?}"))
145        })
146        .collect()
147}
148
149// Domain → DTO conversions (infallible)
150
151impl From<NodePayload> for NodePayloadDto {
152    fn from(p: NodePayload) -> Self {
153        Self {
154            identity: p.identity.to_string(),
155            properties: p.properties.into(),
156            children: refs_to_strings(&p.children),
157            parent: p.parent.as_ref().map(|r| r.to_string()),
158            as_of: format_time(&p.as_of),
159        }
160    }
161}
162
163impl From<NodeProperties> for NodePropertiesDto {
164    fn from(p: NodeProperties) -> Self {
165        match p {
166            NodeProperties::Root {
167                num_hosts,
168                started_at,
169                started_by,
170                system_children,
171            } => Self::Root {
172                num_hosts,
173                started_at: format_time(&started_at),
174                started_by,
175                system_children: refs_to_strings(&system_children),
176            },
177            NodeProperties::Host {
178                addr,
179                num_procs,
180                system_children,
181            } => Self::Host {
182                addr,
183                num_procs,
184                system_children: refs_to_strings(&system_children),
185            },
186            NodeProperties::Proc {
187                proc_name,
188                num_actors,
189                system_children,
190                stopped_children,
191                stopped_retention_cap,
192                is_poisoned,
193                failed_actor_count,
194            } => Self::Proc {
195                proc_name,
196                num_actors,
197                system_children: refs_to_strings(&system_children),
198                stopped_children: refs_to_strings(&stopped_children),
199                stopped_retention_cap,
200                is_poisoned,
201                failed_actor_count,
202            },
203            NodeProperties::Actor {
204                actor_status,
205                actor_type,
206                messages_processed,
207                created_at,
208                last_message_handler,
209                total_processing_time_us,
210                flight_recorder,
211                is_system,
212                failure_info,
213            } => Self::Actor {
214                actor_status,
215                actor_type,
216                messages_processed,
217                created_at: created_at.as_ref().map(format_time),
218                last_message_handler,
219                total_processing_time_us,
220                flight_recorder,
221                is_system,
222                failure_info: failure_info.map(Into::into),
223            },
224            NodeProperties::Error { code, message } => Self::Error { code, message },
225        }
226    }
227}
228
229impl From<FailureInfo> for FailureInfoDto {
230    fn from(f: FailureInfo) -> Self {
231        Self {
232            error_message: f.error_message,
233            root_cause_actor: f.root_cause_actor.to_string(),
234            root_cause_name: f.root_cause_name,
235            occurred_at: format_time(&f.occurred_at),
236            is_propagated: f.is_propagated,
237        }
238    }
239}
240
241// DTO → Domain conversions (fallible)
242
243impl TryFrom<NodePayloadDto> for NodePayload {
244    type Error = anyhow::Error;
245
246    fn try_from(dto: NodePayloadDto) -> Result<Self, Self::Error> {
247        Ok(Self {
248            identity: dto
249                .identity
250                .parse()
251                .with_context(|| format!("failed to parse identity: {:?}", dto.identity))?,
252            properties: dto
253                .properties
254                .try_into()
255                .context("failed to parse properties")?,
256            children: parse_refs("children", &dto.children)?,
257            parent: dto
258                .parent
259                .map(|s| {
260                    s.parse()
261                        .with_context(|| format!("failed to parse parent: {s:?}"))
262                })
263                .transpose()?,
264            as_of: humantime::parse_rfc3339(&dto.as_of)
265                .with_context(|| format!("failed to parse as_of: {:?}", dto.as_of))?,
266        })
267    }
268}
269
270impl TryFrom<NodePropertiesDto> for NodeProperties {
271    type Error = anyhow::Error;
272
273    fn try_from(
274        dto: NodePropertiesDto,
275    ) -> Result<Self, <Self as TryFrom<NodePropertiesDto>>::Error> {
276        Ok(match dto {
277            NodePropertiesDto::Root {
278                num_hosts,
279                started_at,
280                started_by,
281                system_children,
282            } => Self::Root {
283                num_hosts,
284                started_at: humantime::parse_rfc3339(&started_at)
285                    .context("failed to parse Root.started_at")?,
286                started_by,
287                system_children: parse_refs("Root.system_children", &system_children)?,
288            },
289            NodePropertiesDto::Host {
290                addr,
291                num_procs,
292                system_children,
293            } => Self::Host {
294                addr,
295                num_procs,
296                system_children: parse_refs("Host.system_children", &system_children)?,
297            },
298            NodePropertiesDto::Proc {
299                proc_name,
300                num_actors,
301                system_children,
302                stopped_children,
303                stopped_retention_cap,
304                is_poisoned,
305                failed_actor_count,
306            } => Self::Proc {
307                proc_name,
308                num_actors,
309                system_children: parse_refs("Proc.system_children", &system_children)?,
310                stopped_children: parse_refs("Proc.stopped_children", &stopped_children)?,
311                stopped_retention_cap,
312                is_poisoned,
313                failed_actor_count,
314            },
315            NodePropertiesDto::Actor {
316                actor_status,
317                actor_type,
318                messages_processed,
319                created_at,
320                last_message_handler,
321                total_processing_time_us,
322                flight_recorder,
323                is_system,
324                failure_info,
325            } => Self::Actor {
326                actor_status,
327                actor_type,
328                messages_processed,
329                created_at: created_at
330                    .map(|s| {
331                        humantime::parse_rfc3339(&s)
332                            .with_context(|| format!("failed to parse Actor.created_at: {s:?}"))
333                    })
334                    .transpose()?,
335                last_message_handler,
336                total_processing_time_us,
337                flight_recorder,
338                is_system,
339                failure_info: failure_info
340                    .map(TryInto::try_into)
341                    .transpose()
342                    .context("failed to parse Actor.failure_info")?,
343            },
344            NodePropertiesDto::Error { code, message } => Self::Error { code, message },
345        })
346    }
347}
348
349impl TryFrom<FailureInfoDto> for FailureInfo {
350    type Error = anyhow::Error;
351
352    fn try_from(dto: FailureInfoDto) -> Result<Self, Self::Error> {
353        Ok(Self {
354            error_message: dto.error_message,
355            root_cause_actor: dto.root_cause_actor.parse().with_context(|| {
356                format!(
357                    "failed to parse FailureInfo.root_cause_actor: {:?}",
358                    dto.root_cause_actor
359                )
360            })?,
361            root_cause_name: dto.root_cause_name,
362            occurred_at: humantime::parse_rfc3339(&dto.occurred_at).with_context(|| {
363                format!(
364                    "failed to parse FailureInfo.occurred_at: {:?}",
365                    dto.occurred_at
366                )
367            })?,
368            is_propagated: dto.is_propagated,
369        })
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    // Test fixtures

    fn test_proc_id() -> hyperactor::reference::ProcId {
        hyperactor::reference::ProcId::with_name(
            hyperactor::channel::ChannelAddr::Local(0),
            "worker",
        )
    }

    fn test_actor_id() -> hyperactor::reference::ActorId {
        test_proc_id().actor_id("actor", 0)
    }

    fn test_host_actor_id() -> hyperactor::reference::ActorId {
        test_proc_id().actor_id("host_agent", 0)
    }

    fn test_time() -> SystemTime {
        humantime::parse_rfc3339("2025-01-15T10:30:00.123Z").unwrap()
    }

    fn test_time_2() -> SystemTime {
        humantime::parse_rfc3339("2025-01-15T11:00:00.456Z").unwrap()
    }

    fn make_root_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Root,
            properties: NodeProperties::Root {
                num_hosts: 2,
                started_at: test_time(),
                started_by: "test_user".to_string(),
                system_children: vec![],
            },
            children: vec![NodeRef::Host(test_host_actor_id())],
            parent: None,
            as_of: test_time(),
        }
    }

    fn make_host_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Host(test_host_actor_id()),
            properties: NodeProperties::Host {
                addr: "127.0.0.1:8080".to_string(),
                num_procs: 1,
                system_children: vec![],
            },
            children: vec![NodeRef::Proc(test_proc_id())],
            parent: Some(NodeRef::Root),
            as_of: test_time(),
        }
    }

    fn make_proc_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Proc(test_proc_id()),
            properties: NodeProperties::Proc {
                proc_name: "worker".to_string(),
                num_actors: 3,
                system_children: vec![NodeRef::Actor(test_actor_id())],
                stopped_children: vec![],
                stopped_retention_cap: 100,
                is_poisoned: false,
                failed_actor_count: 0,
            },
            children: vec![NodeRef::Actor(test_actor_id())],
            parent: Some(NodeRef::Host(test_host_actor_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_no_failure() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "running".to_string(),
                actor_type: "MyActor".to_string(),
                messages_processed: 42,
                created_at: Some(test_time()),
                last_message_handler: Some("handle_msg".to_string()),
                total_processing_time_us: 1500,
                flight_recorder: None,
                is_system: false,
                failure_info: None,
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_with_failure() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "failed".to_string(),
                actor_type: "MyActor".to_string(),
                messages_processed: 10,
                created_at: Some(test_time()),
                last_message_handler: None,
                total_processing_time_us: 500,
                flight_recorder: Some("trace-abc".to_string()),
                is_system: true,
                failure_info: Some(FailureInfo {
                    error_message: "boom".to_string(),
                    root_cause_actor: test_actor_id(),
                    root_cause_name: Some("root_actor".to_string()),
                    occurred_at: test_time_2(),
                    is_propagated: true,
                }),
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_minimal() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "idle".to_string(),
                actor_type: "MinimalActor".to_string(),
                messages_processed: 0,
                created_at: None,
                last_message_handler: None,
                total_processing_time_us: 0,
                flight_recorder: None,
                is_system: false,
                failure_info: None,
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_error_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Error {
                code: "not_found".to_string(),
                message: "actor not found".to_string(),
            },
            children: vec![],
            parent: None,
            as_of: test_time(),
        }
    }

    // HB-2 (round-trip): NodePayload → NodePayloadDto → NodePayload is
    // lossless for values representable in the wire format.

    fn assert_round_trip(payload: &NodePayload) {
        let dto: NodePayloadDto = payload.clone().into();
        let back: NodePayload = dto.try_into().expect("round-trip conversion");
        assert_eq!(payload, &back);
    }

    /// HB-2: Root variant round-trips.
    #[test]
    fn test_round_trip_root() {
        assert_round_trip(&make_root_payload());
    }

    /// HB-2: Host variant round-trips.
    #[test]
    fn test_round_trip_host() {
        assert_round_trip(&make_host_payload());
    }

    /// HB-2: Proc variant round-trips.
    #[test]
    fn test_round_trip_proc() {
        assert_round_trip(&make_proc_payload());
    }

    /// HB-2: Actor variant without failure round-trips.
    #[test]
    fn test_round_trip_actor_no_failure() {
        assert_round_trip(&make_actor_payload_no_failure());
    }

    /// HB-2: Actor variant with failure round-trips.
    #[test]
    fn test_round_trip_actor_with_failure() {
        assert_round_trip(&make_actor_payload_with_failure());
    }

    /// HB-2: Actor variant with all optional fields absent round-trips.
    #[test]
    fn test_round_trip_actor_minimal() {
        assert_round_trip(&make_actor_payload_minimal());
    }

    /// HB-2: Error variant round-trips.
    #[test]
    fn test_round_trip_error() {
        assert_round_trip(&make_error_payload());
    }

    /// HB-2: sub-millisecond precision is truncated (not rounded) at the
    /// boundary, and the truncated value round-trips.
    #[test]
    fn test_sub_millisecond_precision_truncated() {
        let mut payload = make_root_payload();
        payload.as_of = test_time() + std::time::Duration::from_micros(999);

        let dto: NodePayloadDto = payload.into();
        assert_eq!(dto.as_of, "2025-01-15T10:30:00.123Z");

        let back: NodePayload = dto.try_into().expect("truncated timestamp parses");
        assert_eq!(back.as_of, test_time());
    }

    // HB-1 (typed-internal, string-external): typed Rust values serialize
    // as canonical strings in the DTO JSON output.

    /// HB-1: Root identity, children, parent, and timestamps serialize
    /// as strings; externally-tagged enum key is "Root".
    #[test]
    fn test_json_shape_root() {
        let dto: NodePayloadDto = make_root_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], "root");
        assert!(json["parent"].is_null());
        assert_eq!(json["as_of"], "2025-01-15T10:30:00.123Z");

        let children = json["children"].as_array().unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0], format!("host:{}", test_host_actor_id()));

        let root = &json["properties"]["Root"];
        assert_eq!(root["num_hosts"], 2);
        assert_eq!(root["started_at"], "2025-01-15T10:30:00.123Z");
        assert_eq!(root["started_by"], "test_user");
        assert!(root["system_children"].as_array().unwrap().is_empty());
    }

    /// HB-1: Actor variant with failure — ActorId, SystemTime, and
    /// nested FailureInfo fields all serialize as strings.
    #[test]
    fn test_json_shape_actor_with_failure() {
        let dto: NodePayloadDto = make_actor_payload_with_failure().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], test_actor_id().to_string());
        assert_eq!(json["parent"], test_proc_id().to_string());

        let actor = &json["properties"]["Actor"];
        assert_eq!(actor["actor_status"], "failed");
        assert_eq!(actor["messages_processed"], 10);
        assert_eq!(actor["created_at"], "2025-01-15T10:30:00.123Z");
        assert!(actor["last_message_handler"].is_null());
        assert_eq!(actor["flight_recorder"], "trace-abc");
        assert_eq!(actor["is_system"], true);

        let fi = &actor["failure_info"];
        assert_eq!(fi["error_message"], "boom");
        assert_eq!(fi["root_cause_actor"], test_actor_id().to_string());
        assert_eq!(fi["root_cause_name"], "root_actor");
        assert_eq!(fi["occurred_at"], "2025-01-15T11:00:00.456Z");
        assert_eq!(fi["is_propagated"], true);
    }

    /// HB-1: Option fields serialize as JSON null when absent.
    #[test]
    fn test_json_shape_optional_none_fields() {
        let dto: NodePayloadDto = make_actor_payload_minimal().into();
        let json = serde_json::to_value(&dto).unwrap();

        let actor = &json["properties"]["Actor"];
        assert!(actor["created_at"].is_null());
        assert!(actor["last_message_handler"].is_null());
        assert!(actor["flight_recorder"].is_null());
        assert!(actor["failure_info"].is_null());
    }

    /// HB-1: Error variant preserves code/message as plain strings.
    #[test]
    fn test_json_shape_error() {
        let dto: NodePayloadDto = make_error_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        let err = &json["properties"]["Error"];
        assert_eq!(err["code"], "not_found");
        assert_eq!(err["message"], "actor not found");
    }

    /// HB-1: Empty children vec serializes as `[]`.
    #[test]
    fn test_json_shape_empty_children() {
        let dto: NodePayloadDto = make_actor_payload_no_failure().into();
        let json = serde_json::to_value(&dto).unwrap();
        assert!(json["children"].as_array().unwrap().is_empty());
    }

    // HB-3 (schema-honesty): published schema reflects the actual wire
    // format. The schemars(rename/title) attributes must produce $defs
    // keys and title matching the domain type names, not the Dto suffixes.

    /// HB-3: $defs keys are "NodeProperties" and "FailureInfo", not
    /// "NodePropertiesDto" / "FailureInfoDto".
    #[test]
    fn test_schema_defs_keys() {
        let schema = schemars::schema_for!(NodePayloadDto);
        let json = serde_json::to_value(&schema).unwrap();
        let defs = json["$defs"].as_object().unwrap();
        assert!(
            defs.contains_key("NodeProperties"),
            "$defs must contain 'NodeProperties', got: {:?}",
            defs.keys().collect::<Vec<_>>()
        );
        assert!(
            defs.contains_key("FailureInfo"),
            "$defs must contain 'FailureInfo', got: {:?}",
            defs.keys().collect::<Vec<_>>()
        );
    }

    /// HB-3: Top-level schema title is "NodePayload", not
    /// "NodePayloadDto".
    #[test]
    fn test_schema_title() {
        let schema = schemars::schema_for!(NodePayloadDto);
        let json = serde_json::to_value(&schema).unwrap();
        assert_eq!(json["title"], "NodePayload");
    }

    // Failure paths: DTO → domain conversion rejects malformed wire
    // values, and the error chain names the offending field.

    /// A malformed top-level `as_of` timestamp is rejected.
    #[test]
    fn test_try_from_rejects_bad_as_of() {
        let mut dto: NodePayloadDto = make_root_payload().into();
        dto.as_of = "not-a-timestamp".to_string();

        let err = NodePayload::try_from(dto).unwrap_err();
        let chain = format!("{err:#}");
        assert!(chain.contains("as_of"), "unexpected error: {chain}");
    }

    /// A malformed `Root.started_at` is rejected through the properties
    /// conversion, and both layers of context appear in the chain.
    #[test]
    fn test_try_from_rejects_bad_root_started_at() {
        let mut dto: NodePayloadDto = make_root_payload().into();
        match &mut dto.properties {
            NodePropertiesDto::Root { started_at, .. } => {
                *started_at = "garbage".to_string();
            }
            other => panic!("expected Root properties, got {other:?}"),
        }

        let err = NodePayload::try_from(dto).unwrap_err();
        let chain = format!("{err:#}");
        assert!(
            chain.contains("failed to parse properties"),
            "unexpected error: {chain}"
        );
        assert!(chain.contains("Root.started_at"), "unexpected error: {chain}");
    }

    /// A malformed nested `FailureInfo.occurred_at` is rejected.
    #[test]
    fn test_try_from_rejects_bad_failure_occurred_at() {
        let mut dto: NodePayloadDto = make_actor_payload_with_failure().into();
        match &mut dto.properties {
            NodePropertiesDto::Actor {
                failure_info: Some(fi),
                ..
            } => {
                fi.occurred_at = "yesterday".to_string();
            }
            other => panic!("expected Actor properties with failure_info, got {other:?}"),
        }

        let err = NodePayload::try_from(dto).unwrap_err();
        let chain = format!("{err:#}");
        assert!(
            chain.contains("FailureInfo.occurred_at"),
            "unexpected error: {chain}"
        );
    }
}