1use std::time::SystemTime;
31
32use anyhow::Context;
33use schemars::JsonSchema;
34use serde::Deserialize;
35use serde::Serialize;
36
37use super::FailureInfo;
38use super::NodePayload;
39use super::NodeProperties;
40use super::NodeRef;
41
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
53#[schemars(title = "NodePayload")]
54pub struct NodePayloadDto {
55 pub identity: String,
57 pub properties: NodePropertiesDto,
59 pub children: Vec<String>,
62 pub parent: Option<String>,
64 pub as_of: String,
66}
67
68#[derive(
71 Debug,
72 Clone,
73 Copy,
74 PartialEq,
75 Eq,
76 Default,
77 Serialize,
78 Deserialize,
79 JsonSchema
80)]
81#[schemars(rename = "ProcessMemoryStats")]
82pub struct ProcessMemoryStatsDto {
83 pub process_rss_bytes: Option<u64>,
85 pub process_vm_size_bytes: Option<u64>,
87}
88
89#[derive(
92 Debug,
93 Clone,
94 Copy,
95 PartialEq,
96 Eq,
97 Default,
98 Serialize,
99 Deserialize,
100 JsonSchema
101)]
102#[schemars(rename = "ProcDebugStats")]
103pub struct ProcDebugStatsDto {
104 pub memory: ProcessMemoryStatsDto,
106 pub actor_work_queue_depth_total: u64,
108 pub actor_work_queue_depth_max: u64,
110 pub actor_work_queue_depth_high_water_mark: u64,
112 pub last_nonzero_queue_depth_age_ms: Option<u64>,
114}
115
116#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
119#[schemars(rename = "NodeProperties")]
120pub enum NodePropertiesDto {
121 Root {
123 num_hosts: usize,
124 started_at: String,
125 started_by: String,
126 system_children: Vec<String>,
127 },
128 Host {
130 addr: String,
131 num_procs: usize,
132 system_children: Vec<String>,
133 memory: ProcessMemoryStatsDto,
135 },
136 Proc {
138 proc_name: String,
139 num_actors: usize,
140 system_children: Vec<String>,
141 stopped_children: Vec<String>,
142 stopped_retention_cap: usize,
143 is_poisoned: bool,
144 failed_actor_count: usize,
145 debug: ProcDebugStatsDto,
147 },
148 Actor {
150 actor_status: String,
151 actor_type: String,
152 messages_processed: u64,
153 created_at: Option<String>,
154 last_message_handler: Option<String>,
155 total_processing_time_us: u64,
156 flight_recorder: Option<String>,
157 is_system: bool,
158 failure_info: Option<FailureInfoDto>,
159 },
160 Error { code: String, message: String },
162}
163
164#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
166#[schemars(rename = "FailureInfo")]
167pub struct FailureInfoDto {
168 pub error_message: String,
170 pub root_cause_actor: String,
172 pub root_cause_name: Option<String>,
174 pub occurred_at: String,
176 pub is_propagated: bool,
178}
179
180fn format_time(t: &SystemTime) -> String {
183 humantime::format_rfc3339_millis(*t).to_string()
184}
185
186fn refs_to_strings(refs: &[NodeRef]) -> Vec<String> {
187 refs.iter().map(|r| r.to_string()).collect()
188}
189
190fn parse_refs(field: &str, strings: &[String]) -> anyhow::Result<Vec<NodeRef>> {
191 strings
192 .iter()
193 .enumerate()
194 .map(|(i, s)| {
195 s.parse()
196 .with_context(|| format!("failed to parse {field}[{i}]: {s:?}"))
197 })
198 .collect()
199}
200
201impl From<NodePayload> for NodePayloadDto {
204 fn from(p: NodePayload) -> Self {
205 Self {
206 identity: p.identity.to_string(),
207 properties: p.properties.into(),
208 children: refs_to_strings(&p.children),
209 parent: p.parent.as_ref().map(|r| r.to_string()),
210 as_of: format_time(&p.as_of),
211 }
212 }
213}
214
215impl From<NodeProperties> for NodePropertiesDto {
216 fn from(p: NodeProperties) -> Self {
217 match p {
218 NodeProperties::Root {
219 num_hosts,
220 started_at,
221 started_by,
222 system_children,
223 } => Self::Root {
224 num_hosts,
225 started_at: format_time(&started_at),
226 started_by,
227 system_children: refs_to_strings(&system_children),
228 },
229 NodeProperties::Host {
230 addr,
231 num_procs,
232 system_children,
233 memory,
234 } => Self::Host {
235 addr,
236 num_procs,
237 system_children: refs_to_strings(&system_children),
238 memory: ProcessMemoryStatsDto {
239 process_rss_bytes: memory.process_rss_bytes,
240 process_vm_size_bytes: memory.process_vm_size_bytes,
241 },
242 },
243 NodeProperties::Proc {
244 proc_name,
245 num_actors,
246 system_children,
247 stopped_children,
248 stopped_retention_cap,
249 is_poisoned,
250 failed_actor_count,
251 debug,
252 } => Self::Proc {
253 proc_name,
254 num_actors,
255 system_children: refs_to_strings(&system_children),
256 stopped_children: refs_to_strings(&stopped_children),
257 stopped_retention_cap,
258 is_poisoned,
259 failed_actor_count,
260 debug: ProcDebugStatsDto {
261 memory: ProcessMemoryStatsDto {
262 process_rss_bytes: debug.memory.process_rss_bytes,
263 process_vm_size_bytes: debug.memory.process_vm_size_bytes,
264 },
265 actor_work_queue_depth_total: debug.actor_work_queue_depth_total,
266 actor_work_queue_depth_max: debug.actor_work_queue_depth_max,
267 actor_work_queue_depth_high_water_mark: debug
268 .actor_work_queue_depth_high_water_mark,
269 last_nonzero_queue_depth_age_ms: debug.last_nonzero_queue_depth_age_ms,
270 },
271 },
272 NodeProperties::Actor {
273 actor_status,
274 actor_type,
275 messages_processed,
276 created_at,
277 last_message_handler,
278 total_processing_time_us,
279 flight_recorder,
280 is_system,
281 failure_info,
282 } => Self::Actor {
283 actor_status,
284 actor_type,
285 messages_processed,
286 created_at: created_at.as_ref().map(format_time),
287 last_message_handler,
288 total_processing_time_us,
289 flight_recorder,
290 is_system,
291 failure_info: failure_info.map(Into::into),
292 },
293 NodeProperties::Error { code, message } => Self::Error { code, message },
294 }
295 }
296}
297
298impl From<FailureInfo> for FailureInfoDto {
299 fn from(f: FailureInfo) -> Self {
300 Self {
301 error_message: f.error_message,
302 root_cause_actor: f.root_cause_actor.to_string(),
303 root_cause_name: f.root_cause_name,
304 occurred_at: format_time(&f.occurred_at),
305 is_propagated: f.is_propagated,
306 }
307 }
308}
309
310impl TryFrom<NodePayloadDto> for NodePayload {
313 type Error = anyhow::Error;
314
315 fn try_from(dto: NodePayloadDto) -> Result<Self, Self::Error> {
316 Ok(Self {
317 identity: dto
318 .identity
319 .parse()
320 .with_context(|| format!("failed to parse identity: {:?}", dto.identity))?,
321 properties: dto
322 .properties
323 .try_into()
324 .context("failed to parse properties")?,
325 children: parse_refs("children", &dto.children)?,
326 parent: dto
327 .parent
328 .map(|s| {
329 s.parse()
330 .with_context(|| format!("failed to parse parent: {s:?}"))
331 })
332 .transpose()?,
333 as_of: humantime::parse_rfc3339(&dto.as_of)
334 .with_context(|| format!("failed to parse as_of: {:?}", dto.as_of))?,
335 })
336 }
337}
338
339impl TryFrom<NodePropertiesDto> for NodeProperties {
340 type Error = anyhow::Error;
341
342 fn try_from(
343 dto: NodePropertiesDto,
344 ) -> Result<Self, <Self as TryFrom<NodePropertiesDto>>::Error> {
345 Ok(match dto {
346 NodePropertiesDto::Root {
347 num_hosts,
348 started_at,
349 started_by,
350 system_children,
351 } => Self::Root {
352 num_hosts,
353 started_at: humantime::parse_rfc3339(&started_at)
354 .context("failed to parse Root.started_at")?,
355 started_by,
356 system_children: parse_refs("Root.system_children", &system_children)?,
357 },
358 NodePropertiesDto::Host {
359 addr,
360 num_procs,
361 system_children,
362 memory,
363 } => Self::Host {
364 addr,
365 num_procs,
366 system_children: parse_refs("Host.system_children", &system_children)?,
367 memory: super::ProcessMemoryStats {
368 process_rss_bytes: memory.process_rss_bytes,
369 process_vm_size_bytes: memory.process_vm_size_bytes,
370 },
371 },
372 NodePropertiesDto::Proc {
373 proc_name,
374 num_actors,
375 system_children,
376 stopped_children,
377 stopped_retention_cap,
378 is_poisoned,
379 failed_actor_count,
380 debug,
381 } => Self::Proc {
382 proc_name,
383 num_actors,
384 system_children: parse_refs("Proc.system_children", &system_children)?,
385 stopped_children: parse_refs("Proc.stopped_children", &stopped_children)?,
386 stopped_retention_cap,
387 is_poisoned,
388 failed_actor_count,
389 debug: super::ProcDebugStats {
390 memory: super::ProcessMemoryStats {
391 process_rss_bytes: debug.memory.process_rss_bytes,
392 process_vm_size_bytes: debug.memory.process_vm_size_bytes,
393 },
394 actor_work_queue_depth_total: debug.actor_work_queue_depth_total,
395 actor_work_queue_depth_max: debug.actor_work_queue_depth_max,
396 actor_work_queue_depth_high_water_mark: debug
397 .actor_work_queue_depth_high_water_mark,
398 last_nonzero_queue_depth_age_ms: debug.last_nonzero_queue_depth_age_ms,
399 },
400 },
401 NodePropertiesDto::Actor {
402 actor_status,
403 actor_type,
404 messages_processed,
405 created_at,
406 last_message_handler,
407 total_processing_time_us,
408 flight_recorder,
409 is_system,
410 failure_info,
411 } => Self::Actor {
412 actor_status,
413 actor_type,
414 messages_processed,
415 created_at: created_at
416 .map(|s| {
417 humantime::parse_rfc3339(&s)
418 .with_context(|| format!("failed to parse Actor.created_at: {s:?}"))
419 })
420 .transpose()?,
421 last_message_handler,
422 total_processing_time_us,
423 flight_recorder,
424 is_system,
425 failure_info: failure_info
426 .map(TryInto::try_into)
427 .transpose()
428 .context("failed to parse Actor.failure_info")?,
429 },
430 NodePropertiesDto::Error { code, message } => Self::Error { code, message },
431 })
432 }
433}
434
435impl TryFrom<FailureInfoDto> for FailureInfo {
436 type Error = anyhow::Error;
437
438 fn try_from(dto: FailureInfoDto) -> Result<Self, Self::Error> {
439 Ok(Self {
440 error_message: dto.error_message,
441 root_cause_actor: dto.root_cause_actor.parse().with_context(|| {
442 format!(
443 "failed to parse FailureInfo.root_cause_actor: {:?}",
444 dto.root_cause_actor
445 )
446 })?,
447 root_cause_name: dto.root_cause_name,
448 occurred_at: humantime::parse_rfc3339(&dto.occurred_at).with_context(|| {
449 format!(
450 "failed to parse FailureInfo.occurred_at: {:?}",
451 dto.occurred_at
452 )
453 })?,
454 is_propagated: dto.is_propagated,
455 })
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mesh_id::ResourceId;

    fn test_proc_id() -> hyperactor::ProcAddr {
        ResourceId::proc_addr_from_name(hyperactor::channel::ChannelAddr::Local(0), "worker")
    }

    fn test_actor_id() -> hyperactor::ActorAddr {
        test_proc_id().actor_addr("actor")
    }

    fn test_host_actor_id() -> hyperactor::ActorAddr {
        test_proc_id().actor_addr("host_agent")
    }

    // Fixture times carry exactly millisecond precision so that the
    // millisecond-precision wire format round-trips them exactly.
    fn test_time() -> SystemTime {
        humantime::parse_rfc3339("2025-01-15T10:30:00.123Z").unwrap()
    }

    fn test_time_2() -> SystemTime {
        humantime::parse_rfc3339("2025-01-15T11:00:00.456Z").unwrap()
    }

    fn make_root_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Root,
            properties: NodeProperties::Root {
                num_hosts: 2,
                started_at: test_time(),
                started_by: "test_user".to_string(),
                system_children: vec![],
            },
            children: vec![NodeRef::Host(test_host_actor_id())],
            parent: None,
            as_of: test_time(),
        }
    }

    fn make_host_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Host(test_host_actor_id()),
            properties: NodeProperties::Host {
                addr: "127.0.0.1:8080".to_string(),
                num_procs: 1,
                system_children: vec![],
                memory: Default::default(),
            },
            children: vec![NodeRef::Proc(test_proc_id())],
            parent: Some(NodeRef::Root),
            as_of: test_time(),
        }
    }

    fn make_proc_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Proc(test_proc_id()),
            properties: NodeProperties::Proc {
                proc_name: "worker".to_string(),
                num_actors: 3,
                system_children: vec![NodeRef::Actor(test_actor_id())],
                stopped_children: vec![],
                stopped_retention_cap: 100,
                is_poisoned: false,
                failed_actor_count: 0,
                debug: Default::default(),
            },
            children: vec![NodeRef::Actor(test_actor_id())],
            parent: Some(NodeRef::Host(test_host_actor_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_no_failure() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "running".to_string(),
                actor_type: "MyActor".to_string(),
                messages_processed: 42,
                created_at: Some(test_time()),
                last_message_handler: Some("handle_msg".to_string()),
                total_processing_time_us: 1500,
                flight_recorder: None,
                is_system: false,
                failure_info: None,
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_with_failure() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "failed".to_string(),
                actor_type: "MyActor".to_string(),
                messages_processed: 10,
                created_at: Some(test_time()),
                last_message_handler: None,
                total_processing_time_us: 500,
                flight_recorder: Some("trace-abc".to_string()),
                is_system: true,
                failure_info: Some(FailureInfo {
                    error_message: "boom".to_string(),
                    root_cause_actor: test_actor_id(),
                    root_cause_name: Some("root_actor".to_string()),
                    occurred_at: test_time_2(),
                    is_propagated: true,
                }),
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_actor_payload_minimal() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Actor {
                actor_status: "idle".to_string(),
                actor_type: "MinimalActor".to_string(),
                messages_processed: 0,
                created_at: None,
                last_message_handler: None,
                total_processing_time_us: 0,
                flight_recorder: None,
                is_system: false,
                failure_info: None,
            },
            children: vec![],
            parent: Some(NodeRef::Proc(test_proc_id())),
            as_of: test_time(),
        }
    }

    fn make_error_payload() -> NodePayload {
        NodePayload {
            identity: NodeRef::Actor(test_actor_id()),
            properties: NodeProperties::Error {
                code: "not_found".to_string(),
                message: "actor not found".to_string(),
            },
            children: vec![],
            parent: None,
            as_of: test_time(),
        }
    }

    // Every fixture, for tests that should hold across all node kinds.
    fn all_payloads() -> Vec<NodePayload> {
        vec![
            make_root_payload(),
            make_host_payload(),
            make_proc_payload(),
            make_actor_payload_no_failure(),
            make_actor_payload_with_failure(),
            make_actor_payload_minimal(),
            make_error_payload(),
        ]
    }

    // Domain -> DTO -> domain must be the identity for the fixtures.
    fn assert_round_trip(payload: &NodePayload) {
        let dto: NodePayloadDto = payload.clone().into();
        let back: NodePayload = dto.try_into().expect("round-trip conversion");
        assert_eq!(payload, &back);
    }

    #[test]
    fn test_round_trip_root() {
        assert_round_trip(&make_root_payload());
    }

    #[test]
    fn test_round_trip_host() {
        assert_round_trip(&make_host_payload());
    }

    #[test]
    fn test_round_trip_proc() {
        assert_round_trip(&make_proc_payload());
    }

    #[test]
    fn test_round_trip_actor_no_failure() {
        assert_round_trip(&make_actor_payload_no_failure());
    }

    #[test]
    fn test_round_trip_actor_with_failure() {
        assert_round_trip(&make_actor_payload_with_failure());
    }

    #[test]
    fn test_round_trip_actor_minimal() {
        assert_round_trip(&make_actor_payload_minimal());
    }

    #[test]
    fn test_round_trip_error() {
        assert_round_trip(&make_error_payload());
    }

    // Also exercises the Deserialize side: domain -> DTO -> JSON text -> DTO -> domain.
    #[test]
    fn test_round_trip_through_json() {
        for payload in all_payloads() {
            let dto: NodePayloadDto = payload.clone().into();
            let text = serde_json::to_string(&dto).unwrap();
            let decoded: NodePayloadDto = serde_json::from_str(&text).unwrap();
            assert_eq!(dto, decoded);
            let back: NodePayload = decoded.try_into().expect("round-trip conversion");
            assert_eq!(payload, back);
        }
    }

    #[test]
    fn test_json_shape_root() {
        let dto: NodePayloadDto = make_root_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], "root");
        assert!(json["parent"].is_null());
        assert_eq!(json["as_of"], "2025-01-15T10:30:00.123Z");

        let children = json["children"].as_array().unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0], format!("host:{}", test_host_actor_id()));

        let root = &json["properties"]["Root"];
        assert_eq!(root["num_hosts"], 2);
        assert_eq!(root["started_at"], "2025-01-15T10:30:00.123Z");
        assert_eq!(root["started_by"], "test_user");
        assert!(root["system_children"].as_array().unwrap().is_empty());
    }

    #[test]
    fn test_json_shape_host() {
        let dto: NodePayloadDto = make_host_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], format!("host:{}", test_host_actor_id()));
        assert_eq!(json["parent"], "root");
        assert_eq!(json["children"][0], test_proc_id().to_string());

        let host = &json["properties"]["Host"];
        assert_eq!(host["addr"], "127.0.0.1:8080");
        assert_eq!(host["num_procs"], 1);
        assert!(host["system_children"].as_array().unwrap().is_empty());
        assert!(host["memory"].is_object());
    }

    #[test]
    fn test_json_shape_proc() {
        let dto: NodePayloadDto = make_proc_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], test_proc_id().to_string());
        assert_eq!(json["parent"], format!("host:{}", test_host_actor_id()));

        let proc_props = &json["properties"]["Proc"];
        assert_eq!(proc_props["proc_name"], "worker");
        assert_eq!(proc_props["num_actors"], 3);
        assert_eq!(proc_props["stopped_retention_cap"], 100);
        assert_eq!(proc_props["is_poisoned"], false);
        assert_eq!(proc_props["failed_actor_count"], 0);

        let system_children = proc_props["system_children"].as_array().unwrap();
        assert_eq!(system_children.len(), 1);
        assert_eq!(system_children[0], test_actor_id().to_string());
        assert!(proc_props["stopped_children"].as_array().unwrap().is_empty());
        assert!(proc_props["debug"]["memory"].is_object());
    }

    #[test]
    fn test_json_shape_actor_with_failure() {
        let dto: NodePayloadDto = make_actor_payload_with_failure().into();
        let json = serde_json::to_value(&dto).unwrap();

        assert_eq!(json["identity"], test_actor_id().to_string());
        assert_eq!(json["parent"], test_proc_id().to_string());

        let actor = &json["properties"]["Actor"];
        assert_eq!(actor["actor_status"], "failed");
        assert_eq!(actor["messages_processed"], 10);
        assert_eq!(actor["created_at"], "2025-01-15T10:30:00.123Z");
        assert!(actor["last_message_handler"].is_null());
        assert_eq!(actor["flight_recorder"], "trace-abc");
        assert_eq!(actor["is_system"], true);

        let fi = &actor["failure_info"];
        assert_eq!(fi["error_message"], "boom");
        assert_eq!(fi["root_cause_actor"], test_actor_id().to_string());
        assert_eq!(fi["root_cause_name"], "root_actor");
        assert_eq!(fi["occurred_at"], "2025-01-15T11:00:00.456Z");
        assert_eq!(fi["is_propagated"], true);
    }

    #[test]
    fn test_json_shape_optional_none_fields() {
        let dto: NodePayloadDto = make_actor_payload_minimal().into();
        let json = serde_json::to_value(&dto).unwrap();

        let actor = &json["properties"]["Actor"];
        assert!(actor["created_at"].is_null());
        assert!(actor["last_message_handler"].is_null());
        assert!(actor["flight_recorder"].is_null());
        assert!(actor["failure_info"].is_null());
    }

    #[test]
    fn test_json_shape_error() {
        let dto: NodePayloadDto = make_error_payload().into();
        let json = serde_json::to_value(&dto).unwrap();

        let err = &json["properties"]["Error"];
        assert_eq!(err["code"], "not_found");
        assert_eq!(err["message"], "actor not found");
    }

    #[test]
    fn test_json_shape_empty_children() {
        let dto: NodePayloadDto = make_actor_payload_no_failure().into();
        let json = serde_json::to_value(&dto).unwrap();
        assert!(json["children"].as_array().unwrap().is_empty());
    }

    // A malformed top-level timestamp must be rejected, not silently defaulted.
    #[test]
    fn test_invalid_as_of_is_rejected() {
        let mut dto: NodePayloadDto = make_root_payload().into();
        dto.as_of = "not-a-time".to_string();

        let err = NodePayload::try_from(dto).unwrap_err();
        let chain = format!("{err:#}");
        assert!(chain.contains("failed to parse as_of"), "unexpected error: {chain}");
    }

    // A malformed nested timestamp must surface with both the outer and inner context.
    #[test]
    fn test_invalid_actor_created_at_is_rejected() {
        let mut dto: NodePayloadDto = make_actor_payload_no_failure().into();
        match &mut dto.properties {
            NodePropertiesDto::Actor { created_at, .. } => {
                *created_at = Some("yesterday".to_string());
            }
            other => panic!("expected Actor properties, got {other:?}"),
        }

        let err = NodePayload::try_from(dto).unwrap_err();
        let chain = format!("{err:#}");
        assert!(chain.contains("failed to parse properties"), "unexpected error: {chain}");
        assert!(chain.contains("Actor.created_at"), "unexpected error: {chain}");
    }

    #[test]
    fn test_schema_defs_keys() {
        let schema = schemars::schema_for!(NodePayloadDto);
        let json = serde_json::to_value(&schema).unwrap();
        let defs = json["$defs"].as_object().unwrap();
        assert!(
            defs.contains_key("NodeProperties"),
            "$defs must contain 'NodeProperties', got: {:?}",
            defs.keys().collect::<Vec<_>>()
        );
        assert!(
            defs.contains_key("FailureInfo"),
            "$defs must contain 'FailureInfo', got: {:?}",
            defs.keys().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_schema_title() {
        let schema = schemars::schema_for!(NodePayloadDto);
        let json = serde_json::to_value(&schema).unwrap();
        assert_eq!(json["title"], "NodePayload");
    }
}
779}