1use std::time::SystemTime;
31
32use anyhow::Context;
33use schemars::JsonSchema;
34use serde::Deserialize;
35use serde::Serialize;
36
37use super::FailureInfo;
38use super::NodePayload;
39use super::NodeProperties;
40use super::NodeRef;
41
// Wire-format (DTO) mirror of `NodePayload`.
//
// Every node reference and timestamp is carried as a plain `String`; the
// `From`/`TryFrom` impls in this file convert to and from the typed domain
// value. The JSON schema title is pinned to "NodePayload".
//
// (Plain `//` comments rather than `///`: schemars copies doc comments into
// the generated schema as descriptions.)
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(title = "NodePayload")]
pub struct NodePayloadDto {
    // String form of the node's own reference (`identity.to_string()`).
    pub identity: String,
    // Kind-specific properties of the node.
    pub properties: NodePropertiesDto,
    // String forms of the child node references, in their original order.
    pub children: Vec<String>,
    // String form of the parent reference; `None` when the node has no parent.
    pub parent: Option<String>,
    // Timestamp rendered by `format_time` (RFC 3339, millisecond precision).
    pub as_of: String,
}
67
// Wire-format (DTO) mirror of `NodeProperties`, one variant per node kind.
//
// No serde representation attribute is set, so the default externally tagged
// form applies (e.g. `{"Root": {...}}`). The schema name is "NodeProperties".
// Fields that are node references or timestamps in the domain type are
// strings here; all other fields are copied through unchanged.
//
// (Plain `//` comments rather than `///`: schemars copies doc comments into
// the generated schema as descriptions.)
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(rename = "NodeProperties")]
pub enum NodePropertiesDto {
    // Properties of the root node.
    Root {
        num_hosts: usize,
        // Timestamp rendered by `format_time` (RFC 3339, millisecond precision).
        started_at: String,
        started_by: String,
        // String forms of node references.
        system_children: Vec<String>,
    },
    // Properties of a host node.
    Host {
        addr: String,
        num_procs: usize,
        // String forms of node references.
        system_children: Vec<String>,
    },
    // Properties of a proc node.
    Proc {
        proc_name: String,
        num_actors: usize,
        // String forms of node references.
        system_children: Vec<String>,
        // String forms of node references.
        stopped_children: Vec<String>,
        stopped_retention_cap: usize,
        is_poisoned: bool,
        failed_actor_count: usize,
    },
    // Properties of an actor node.
    Actor {
        actor_status: String,
        actor_type: String,
        messages_processed: u64,
        // Optional timestamp rendered by `format_time`.
        created_at: Option<String>,
        last_message_handler: Option<String>,
        total_processing_time_us: u64,
        flight_recorder: Option<String>,
        is_system: bool,
        // Present only when the domain value carried failure details.
        failure_info: Option<FailureInfoDto>,
    },
    // Error placeholder carrying a code and a message.
    Error { code: String, message: String },
}
111
// Wire-format (DTO) mirror of `FailureInfo`; the schema name is "FailureInfo".
//
// (Plain `//` comments rather than `///`: schemars copies doc comments into
// the generated schema as descriptions.)
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(rename = "FailureInfo")]
pub struct FailureInfoDto {
    pub error_message: String,
    // String form of the root-cause actor (`root_cause_actor.to_string()`).
    pub root_cause_actor: String,
    pub root_cause_name: Option<String>,
    // Timestamp rendered by `format_time` (RFC 3339, millisecond precision).
    pub occurred_at: String,
    pub is_propagated: bool,
}
127
128fn format_time(t: &SystemTime) -> String {
131 humantime::format_rfc3339_millis(*t).to_string()
132}
133
134fn refs_to_strings(refs: &[NodeRef]) -> Vec<String> {
135 refs.iter().map(|r| r.to_string()).collect()
136}
137
138fn parse_refs(field: &str, strings: &[String]) -> anyhow::Result<Vec<NodeRef>> {
139 strings
140 .iter()
141 .enumerate()
142 .map(|(i, s)| {
143 s.parse()
144 .with_context(|| format!("failed to parse {field}[{i}]: {s:?}"))
145 })
146 .collect()
147}
148
149impl From<NodePayload> for NodePayloadDto {
152 fn from(p: NodePayload) -> Self {
153 Self {
154 identity: p.identity.to_string(),
155 properties: p.properties.into(),
156 children: refs_to_strings(&p.children),
157 parent: p.parent.as_ref().map(|r| r.to_string()),
158 as_of: format_time(&p.as_of),
159 }
160 }
161}
162
163impl From<NodeProperties> for NodePropertiesDto {
164 fn from(p: NodeProperties) -> Self {
165 match p {
166 NodeProperties::Root {
167 num_hosts,
168 started_at,
169 started_by,
170 system_children,
171 } => Self::Root {
172 num_hosts,
173 started_at: format_time(&started_at),
174 started_by,
175 system_children: refs_to_strings(&system_children),
176 },
177 NodeProperties::Host {
178 addr,
179 num_procs,
180 system_children,
181 } => Self::Host {
182 addr,
183 num_procs,
184 system_children: refs_to_strings(&system_children),
185 },
186 NodeProperties::Proc {
187 proc_name,
188 num_actors,
189 system_children,
190 stopped_children,
191 stopped_retention_cap,
192 is_poisoned,
193 failed_actor_count,
194 } => Self::Proc {
195 proc_name,
196 num_actors,
197 system_children: refs_to_strings(&system_children),
198 stopped_children: refs_to_strings(&stopped_children),
199 stopped_retention_cap,
200 is_poisoned,
201 failed_actor_count,
202 },
203 NodeProperties::Actor {
204 actor_status,
205 actor_type,
206 messages_processed,
207 created_at,
208 last_message_handler,
209 total_processing_time_us,
210 flight_recorder,
211 is_system,
212 failure_info,
213 } => Self::Actor {
214 actor_status,
215 actor_type,
216 messages_processed,
217 created_at: created_at.as_ref().map(format_time),
218 last_message_handler,
219 total_processing_time_us,
220 flight_recorder,
221 is_system,
222 failure_info: failure_info.map(Into::into),
223 },
224 NodeProperties::Error { code, message } => Self::Error { code, message },
225 }
226 }
227}
228
229impl From<FailureInfo> for FailureInfoDto {
230 fn from(f: FailureInfo) -> Self {
231 Self {
232 error_message: f.error_message,
233 root_cause_actor: f.root_cause_actor.to_string(),
234 root_cause_name: f.root_cause_name,
235 occurred_at: format_time(&f.occurred_at),
236 is_propagated: f.is_propagated,
237 }
238 }
239}
240
241impl TryFrom<NodePayloadDto> for NodePayload {
244 type Error = anyhow::Error;
245
246 fn try_from(dto: NodePayloadDto) -> Result<Self, Self::Error> {
247 Ok(Self {
248 identity: dto
249 .identity
250 .parse()
251 .with_context(|| format!("failed to parse identity: {:?}", dto.identity))?,
252 properties: dto
253 .properties
254 .try_into()
255 .context("failed to parse properties")?,
256 children: parse_refs("children", &dto.children)?,
257 parent: dto
258 .parent
259 .map(|s| {
260 s.parse()
261 .with_context(|| format!("failed to parse parent: {s:?}"))
262 })
263 .transpose()?,
264 as_of: humantime::parse_rfc3339(&dto.as_of)
265 .with_context(|| format!("failed to parse as_of: {:?}", dto.as_of))?,
266 })
267 }
268}
269
270impl TryFrom<NodePropertiesDto> for NodeProperties {
271 type Error = anyhow::Error;
272
273 fn try_from(
274 dto: NodePropertiesDto,
275 ) -> Result<Self, <Self as TryFrom<NodePropertiesDto>>::Error> {
276 Ok(match dto {
277 NodePropertiesDto::Root {
278 num_hosts,
279 started_at,
280 started_by,
281 system_children,
282 } => Self::Root {
283 num_hosts,
284 started_at: humantime::parse_rfc3339(&started_at)
285 .context("failed to parse Root.started_at")?,
286 started_by,
287 system_children: parse_refs("Root.system_children", &system_children)?,
288 },
289 NodePropertiesDto::Host {
290 addr,
291 num_procs,
292 system_children,
293 } => Self::Host {
294 addr,
295 num_procs,
296 system_children: parse_refs("Host.system_children", &system_children)?,
297 },
298 NodePropertiesDto::Proc {
299 proc_name,
300 num_actors,
301 system_children,
302 stopped_children,
303 stopped_retention_cap,
304 is_poisoned,
305 failed_actor_count,
306 } => Self::Proc {
307 proc_name,
308 num_actors,
309 system_children: parse_refs("Proc.system_children", &system_children)?,
310 stopped_children: parse_refs("Proc.stopped_children", &stopped_children)?,
311 stopped_retention_cap,
312 is_poisoned,
313 failed_actor_count,
314 },
315 NodePropertiesDto::Actor {
316 actor_status,
317 actor_type,
318 messages_processed,
319 created_at,
320 last_message_handler,
321 total_processing_time_us,
322 flight_recorder,
323 is_system,
324 failure_info,
325 } => Self::Actor {
326 actor_status,
327 actor_type,
328 messages_processed,
329 created_at: created_at
330 .map(|s| {
331 humantime::parse_rfc3339(&s)
332 .with_context(|| format!("failed to parse Actor.created_at: {s:?}"))
333 })
334 .transpose()?,
335 last_message_handler,
336 total_processing_time_us,
337 flight_recorder,
338 is_system,
339 failure_info: failure_info
340 .map(TryInto::try_into)
341 .transpose()
342 .context("failed to parse Actor.failure_info")?,
343 },
344 NodePropertiesDto::Error { code, message } => Self::Error { code, message },
345 })
346 }
347}
348
349impl TryFrom<FailureInfoDto> for FailureInfo {
350 type Error = anyhow::Error;
351
352 fn try_from(dto: FailureInfoDto) -> Result<Self, Self::Error> {
353 Ok(Self {
354 error_message: dto.error_message,
355 root_cause_actor: dto.root_cause_actor.parse().with_context(|| {
356 format!(
357 "failed to parse FailureInfo.root_cause_actor: {:?}",
358 dto.root_cause_actor
359 )
360 })?,
361 root_cause_name: dto.root_cause_name,
362 occurred_at: humantime::parse_rfc3339(&dto.occurred_at).with_context(|| {
363 format!(
364 "failed to parse FailureInfo.occurred_at: {:?}",
365 dto.occurred_at
366 )
367 })?,
368 is_propagated: dto.is_propagated,
369 })
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    // ---- identifiers and timestamps shared by the fixtures ----

    /// Proc id named "worker" on `ChannelAddr::Local(0)`.
    fn test_proc_id() -> hyperactor::reference::ProcId {
        use hyperactor::channel::ChannelAddr;
        use hyperactor::reference::ProcId;

        ProcId::with_name(ChannelAddr::Local(0), "worker")
    }

    /// Actor "actor" (index 0) on the test proc.
    fn test_actor_id() -> hyperactor::reference::ActorId {
        let proc_id = test_proc_id();
        proc_id.actor_id("actor", 0)
    }

    /// Actor "host_agent" (index 0) on the test proc.
    fn test_host_actor_id() -> hyperactor::reference::ActorId {
        let proc_id = test_proc_id();
        proc_id.actor_id("host_agent", 0)
    }

    /// Parses an RFC 3339 literal known to be valid.
    fn rfc3339(literal: &str) -> SystemTime {
        humantime::parse_rfc3339(literal).unwrap()
    }

    /// Primary fixture timestamp; millisecond precision so it survives the
    /// millisecond-precision formatting done by `format_time`.
    fn test_time() -> SystemTime {
        rfc3339("2025-01-15T10:30:00.123Z")
    }

    /// Second, distinct fixture timestamp.
    fn test_time_2() -> SystemTime {
        rfc3339("2025-01-15T11:00:00.456Z")
    }

    // ---- payload fixtures ----

    /// Assembles a payload whose `as_of` is `test_time()`.
    fn payload(
        identity: NodeRef,
        properties: NodeProperties,
        children: Vec<NodeRef>,
        parent: Option<NodeRef>,
    ) -> NodePayload {
        NodePayload {
            identity,
            properties,
            children,
            parent,
            as_of: test_time(),
        }
    }

    /// Payload for the test actor: no children, parented by the test proc.
    fn actor_payload(properties: NodeProperties) -> NodePayload {
        payload(
            NodeRef::Actor(test_actor_id()),
            properties,
            Vec::new(),
            Some(NodeRef::Proc(test_proc_id())),
        )
    }

    fn make_root_payload() -> NodePayload {
        let properties = NodeProperties::Root {
            num_hosts: 2,
            started_at: test_time(),
            started_by: String::from("test_user"),
            system_children: Vec::new(),
        };
        payload(
            NodeRef::Root,
            properties,
            vec![NodeRef::Host(test_host_actor_id())],
            None,
        )
    }

    fn make_host_payload() -> NodePayload {
        let properties = NodeProperties::Host {
            addr: String::from("127.0.0.1:8080"),
            num_procs: 1,
            system_children: Vec::new(),
        };
        payload(
            NodeRef::Host(test_host_actor_id()),
            properties,
            vec![NodeRef::Proc(test_proc_id())],
            Some(NodeRef::Root),
        )
    }

    fn make_proc_payload() -> NodePayload {
        let properties = NodeProperties::Proc {
            proc_name: String::from("worker"),
            num_actors: 3,
            system_children: vec![NodeRef::Actor(test_actor_id())],
            stopped_children: Vec::new(),
            stopped_retention_cap: 100,
            is_poisoned: false,
            failed_actor_count: 0,
        };
        payload(
            NodeRef::Proc(test_proc_id()),
            properties,
            vec![NodeRef::Actor(test_actor_id())],
            Some(NodeRef::Host(test_host_actor_id())),
        )
    }

    fn make_actor_payload_no_failure() -> NodePayload {
        actor_payload(NodeProperties::Actor {
            actor_status: String::from("running"),
            actor_type: String::from("MyActor"),
            messages_processed: 42,
            created_at: Some(test_time()),
            last_message_handler: Some(String::from("handle_msg")),
            total_processing_time_us: 1500,
            flight_recorder: None,
            is_system: false,
            failure_info: None,
        })
    }

    fn make_actor_payload_with_failure() -> NodePayload {
        let failure = FailureInfo {
            error_message: String::from("boom"),
            root_cause_actor: test_actor_id(),
            root_cause_name: Some(String::from("root_actor")),
            occurred_at: test_time_2(),
            is_propagated: true,
        };
        actor_payload(NodeProperties::Actor {
            actor_status: String::from("failed"),
            actor_type: String::from("MyActor"),
            messages_processed: 10,
            created_at: Some(test_time()),
            last_message_handler: None,
            total_processing_time_us: 500,
            flight_recorder: Some(String::from("trace-abc")),
            is_system: true,
            failure_info: Some(failure),
        })
    }

    fn make_actor_payload_minimal() -> NodePayload {
        actor_payload(NodeProperties::Actor {
            actor_status: String::from("idle"),
            actor_type: String::from("MinimalActor"),
            messages_processed: 0,
            created_at: None,
            last_message_handler: None,
            total_processing_time_us: 0,
            flight_recorder: None,
            is_system: false,
            failure_info: None,
        })
    }

    fn make_error_payload() -> NodePayload {
        let properties = NodeProperties::Error {
            code: String::from("not_found"),
            message: String::from("actor not found"),
        };
        payload(
            NodeRef::Actor(test_actor_id()),
            properties,
            Vec::new(),
            None,
        )
    }

    // ---- shared helpers ----

    /// Converts `payload` to its DTO and back, asserting nothing was lost.
    fn assert_round_trip(payload: &NodePayload) {
        let wire = NodePayloadDto::from(payload.clone());
        let restored = NodePayload::try_from(wire).expect("round-trip conversion");
        assert_eq!(payload, &restored);
    }

    /// Serializes the DTO form of `payload` to a JSON value.
    fn dto_json(payload: NodePayload) -> serde_json::Value {
        let dto = NodePayloadDto::from(payload);
        serde_json::to_value(&dto).unwrap()
    }

    /// JSON value of the schema generated for `NodePayloadDto`.
    fn payload_schema_json() -> serde_json::Value {
        let schema = schemars::schema_for!(NodePayloadDto);
        serde_json::to_value(&schema).unwrap()
    }

    // ---- round trips: domain -> DTO -> domain ----

    #[test]
    fn test_round_trip_root() {
        let fixture = make_root_payload();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_host() {
        let fixture = make_host_payload();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_proc() {
        let fixture = make_proc_payload();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_actor_no_failure() {
        let fixture = make_actor_payload_no_failure();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_actor_with_failure() {
        let fixture = make_actor_payload_with_failure();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_actor_minimal() {
        let fixture = make_actor_payload_minimal();
        assert_round_trip(&fixture);
    }

    #[test]
    fn test_round_trip_error() {
        let fixture = make_error_payload();
        assert_round_trip(&fixture);
    }

    // ---- JSON shape of the serialized DTO ----

    #[test]
    fn test_json_shape_root() {
        let json = dto_json(make_root_payload());

        // Top-level fields.
        assert_eq!(json["identity"], "root");
        assert!(json["parent"].is_null());
        assert_eq!(json["as_of"], "2025-01-15T10:30:00.123Z");

        // Exactly one child: the host reference.
        let expected_child = format!("host:{}", test_host_actor_id());
        let children = json["children"].as_array().unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0], expected_child);

        // Variant payload sits under the externally tagged "Root" key.
        let root = &json["properties"]["Root"];
        assert_eq!(root["num_hosts"], 2);
        assert_eq!(root["started_at"], "2025-01-15T10:30:00.123Z");
        assert_eq!(root["started_by"], "test_user");
        assert!(root["system_children"].as_array().unwrap().is_empty());
    }

    #[test]
    fn test_json_shape_actor_with_failure() {
        let json = dto_json(make_actor_payload_with_failure());
        let actor_id = test_actor_id().to_string();

        assert_eq!(json["identity"], actor_id);
        assert_eq!(json["parent"], test_proc_id().to_string());

        let actor = &json["properties"]["Actor"];
        assert_eq!(actor["actor_status"], "failed");
        assert_eq!(actor["messages_processed"], 10);
        assert_eq!(actor["created_at"], "2025-01-15T10:30:00.123Z");
        assert!(actor["last_message_handler"].is_null());
        assert_eq!(actor["flight_recorder"], "trace-abc");
        assert_eq!(actor["is_system"], true);

        let failure = &actor["failure_info"];
        assert_eq!(failure["error_message"], "boom");
        assert_eq!(failure["root_cause_actor"], actor_id);
        assert_eq!(failure["root_cause_name"], "root_actor");
        assert_eq!(failure["occurred_at"], "2025-01-15T11:00:00.456Z");
        assert_eq!(failure["is_propagated"], true);
    }

    #[test]
    fn test_json_shape_optional_none_fields() {
        let json = dto_json(make_actor_payload_minimal());
        let actor = &json["properties"]["Actor"];

        // Each `None` option must serialize as JSON null.
        assert!(actor["created_at"].is_null());
        assert!(actor["last_message_handler"].is_null());
        assert!(actor["flight_recorder"].is_null());
        assert!(actor["failure_info"].is_null());
    }

    #[test]
    fn test_json_shape_error() {
        let json = dto_json(make_error_payload());
        let error = &json["properties"]["Error"];

        assert_eq!(error["code"], "not_found");
        assert_eq!(error["message"], "actor not found");
    }

    #[test]
    fn test_json_shape_empty_children() {
        let json = dto_json(make_actor_payload_no_failure());
        let children = json["children"].as_array().unwrap();
        assert!(children.is_empty());
    }

    // ---- generated JSON schema ----

    #[test]
    fn test_schema_defs_keys() {
        let json = payload_schema_json();
        let defs = json["$defs"].as_object().unwrap();

        // The `rename` attributes must surface as the definition names.
        for expected in ["NodeProperties", "FailureInfo"] {
            assert!(
                defs.contains_key(expected),
                "$defs must contain '{expected}', got: {:?}",
                defs.keys().collect::<Vec<_>>()
            );
        }
    }

    #[test]
    fn test_schema_title() {
        let json = payload_schema_json();
        assert_eq!(json["title"], "NodePayload");
    }
}