1use std::collections::HashMap;
33use std::fs;
34use std::fs::File;
35use std::io::BufWriter;
36use std::io::Write;
37use std::os::unix::fs::symlink;
38use std::path::Path;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::atomic::AtomicU64;
42use std::sync::atomic::Ordering;
43use std::time::SystemTime;
44use std::time::UNIX_EPOCH;
45
46use anyhow::Result;
47use hyperactor_config::CONFIG;
48use hyperactor_config::ConfigAttr;
49use hyperactor_config::attrs::AttrValue;
50use hyperactor_config::attrs::declare_attrs;
51use hyperactor_config::typeuri::Named;
52use prost::Message;
53use serde::Deserialize;
54use serde::Serialize;
55use tracing_core::LevelFilter;
56use tracing_perfetto_sdk_schema::DebugAnnotation;
57use tracing_perfetto_sdk_schema::DebugAnnotationName;
58use tracing_perfetto_sdk_schema::EventName;
59use tracing_perfetto_sdk_schema::InternedData;
60use tracing_perfetto_sdk_schema::ProcessDescriptor;
61use tracing_perfetto_sdk_schema::ThreadDescriptor;
62use tracing_perfetto_sdk_schema::Trace;
63use tracing_perfetto_sdk_schema::TracePacket;
64use tracing_perfetto_sdk_schema::TrackDescriptor;
65use tracing_perfetto_sdk_schema::TrackEvent;
66use tracing_perfetto_sdk_schema::debug_annotation::NameField;
67use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
68use tracing_perfetto_sdk_schema::trace_packet::Data;
69use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
70use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
71use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
72use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
73use tracing_subscriber::filter::Targets;
74
75use crate::config::MONARCH_FILE_LOG_LEVEL;
76use crate::trace_dispatcher::FieldValue;
77use crate::trace_dispatcher::TraceEvent;
78use crate::trace_dispatcher::TraceEventSink;
79use crate::trace_dispatcher::TraceFields;
80use crate::trace_dispatcher::get_field;
81
/// Tracing-target prefix that marks user-facing telemetry.
///
/// In `PerfettoTraceMode::User` only targets starting with this prefix are recorded, and
/// spans/events under it are placed on a per-actor track when they carry an `actor_id`
/// field.
pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";

/// Tracing target of endpoint spans; their slice names are assembled from the `mesh`,
/// `method`, and `call_name` fields instead of the raw span name.
pub const ENDPOINT_TELEMETRY_TARGET: &str = "monarch_hyperactor::telemetry::endpoint";
/// Field whose (non-empty) value selects the per-actor track for user telemetry.
const ACTOR_ID_FIELD: &str = "actor_id";
90
/// Selects which tracing spans and events the Perfetto sink records.
///
/// Parsed case-insensitively by the `FromStr` impl and rendered as `off` / `user` / `dev`
/// by `Display`. Keep the variant order stable: serde formats that encode unit variants
/// by index may depend on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum PerfettoTraceMode {
    /// Record nothing.
    Off,
    /// Record only targets starting with `USER_TELEMETRY_PREFIX` (the default).
    #[default]
    User,
    /// Record spans and events from every target.
    Dev,
}
102
103impl std::fmt::Display for PerfettoTraceMode {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 match self {
106 PerfettoTraceMode::Off => write!(f, "off"),
107 PerfettoTraceMode::User => write!(f, "user"),
108 PerfettoTraceMode::Dev => write!(f, "dev"),
109 }
110 }
111}
112
113impl std::str::FromStr for PerfettoTraceMode {
114 type Err = anyhow::Error;
115
116 fn from_str(s: &str) -> Result<Self, Self::Err> {
117 match s.to_lowercase().as_str() {
118 "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
119 "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
120 "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
121 _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
122 }
123 }
124}
125
126impl Named for PerfettoTraceMode {
127 fn typename() -> &'static str {
128 "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
129 }
130}
131
132impl AttrValue for PerfettoTraceMode {
133 fn display(&self) -> String {
134 self.to_string()
135 }
136
137 fn parse(s: &str) -> Result<Self, anyhow::Error> {
138 s.parse()
139 }
140}
141
142impl PerfettoTraceMode {
143 pub fn should_include(&self, target: &str) -> bool {
145 match self {
146 PerfettoTraceMode::Off => false,
147 PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
148 PerfettoTraceMode::Dev => true,
149 }
150 }
151}
152
// Config attribute selecting which spans/events the Perfetto sink records; it is read by
// `PerfettoFileSink::new` through `hyperactor_config::global::get`. Defaults to
// `PerfettoTraceMode::User`.
// NOTE(review): the two `ConfigAttr::new` arguments look like an environment-variable
// name and a config-key name — confirm against the `ConfigAttr` documentation.
declare_attrs! {
    @meta(CONFIG = ConfigAttr::new(
        Some("PERFETTO_TRACE_MODE".to_string()),
        Some("perfetto_trace_mode".to_string()),
    ))
    pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
}
165
/// Environment variable that overrides the trace root returned by `default_trace_dir`.
pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
168
169pub fn default_trace_dir() -> PathBuf {
173 if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
174 return PathBuf::from(dir);
175 }
176 let username = whoami::username();
177 PathBuf::from(format!("/tmp/{}/monarch_traces", username))
178}
179
180struct SpanInfo {
182 fq_name: String,
184 fields: TraceFields,
185 file: Option<&'static str>,
186 line: Option<u32>,
187 prefer_actor_track: bool,
188}
189
/// String-interning table for Perfetto `InternedData`.
///
/// Each distinct string receives a stable id; ids start at 1 and increase by one per new
/// string. Fresh `(string, id)` assignments are queued until drained with `take_pending`,
/// so they can be emitted ahead of the packets that reference them.
#[derive(Default)]
struct InternedStrings {
    /// Last id handed out; 0 means nothing has been interned yet.
    next_iid: u64,
    /// Every string interned so far, mapped to its id.
    strings: HashMap<String, u64>,
    /// Assignments not yet drained, in the order they were made.
    pending: Vec<(String, u64)>,
}

impl InternedStrings {
    /// Returns the id of `s`, assigning and queueing a fresh one the first time it is seen.
    fn intern(&mut self, s: &str) -> u64 {
        match self.strings.get(s) {
            Some(&known) => known,
            None => {
                let fresh = self.next_iid + 1;
                self.next_iid = fresh;
                let owned = s.to_string();
                self.pending.push((owned.clone(), fresh));
                self.strings.insert(owned, fresh);
                fresh
            }
        }
    }

    /// Removes and returns every assignment made since the previous drain.
    fn take_pending(&mut self) -> Vec<(String, u64)> {
        std::mem::replace(&mut self.pending, Vec::new())
    }

    /// Whether any assignment is waiting to be drained.
    fn has_pending(&self) -> bool {
        !self.pending.is_empty()
    }
}
218
219pub struct PerfettoFileSink {
221 writer: BufWriter<File>,
222 pending_packets: Vec<TracePacket>,
223 next_track_id: AtomicU64,
224 sequence_id: u32,
226 event_names: InternedStrings,
227 annotation_names: InternedStrings,
228 span_info: HashMap<u64, SpanInfo>,
229 named_tracks: HashMap<String, u64>,
231 span_tracks: HashMap<u64, u64>,
234 process_track: u64,
236 pid: i32,
237 process_name: String,
238 target_filter: Targets,
239 trace_mode: PerfettoTraceMode,
240}
241
242impl PerfettoFileSink {
243 pub fn new(
252 trace_dir: impl AsRef<Path>,
253 execution_id: &str,
254 process_name: &str,
255 ) -> Result<Self> {
256 let trace_dir = trace_dir.as_ref().to_path_buf();
257 let pid = std::process::id() as i32;
258
259 let executions_dir = trace_dir.join("executions");
260 let execution_dir = executions_dir.join(execution_id);
261 fs::create_dir_all(&execution_dir)?;
262
263 let latest_link = executions_dir.join("latest");
265 let _ = fs::remove_file(&latest_link);
266 if let Err(e) = symlink(execution_id, &latest_link) {
267 tracing::debug!("Failed to create latest symlink: {}", e);
268 }
269
270 let path = execution_dir.join(format!("{}.pftrace", process_name));
271 let file = File::create(&path)?;
272 let writer = BufWriter::new(file);
273
274 let sequence_id = SystemTime::now()
275 .duration_since(UNIX_EPOCH)
276 .unwrap()
277 .as_secs() as u32;
278
279 let mut sink = Self {
280 writer,
281 pending_packets: Vec::new(),
282 next_track_id: AtomicU64::new(1), sequence_id,
284 event_names: InternedStrings::default(),
285 annotation_names: InternedStrings::default(),
286 span_info: HashMap::new(),
287 named_tracks: HashMap::new(),
288 span_tracks: HashMap::new(),
289 process_track: 0,
290 pid,
291 process_name: process_name.to_string(),
292 target_filter: Targets::new().with_default({
293 let log_level_str =
294 hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
295 .unwrap_or_else(|| "info".to_string());
296 let level =
297 tracing::Level::from_str(&log_level_str).unwrap_or(tracing::Level::INFO);
298 LevelFilter::from_level(level)
299 }),
300 trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
301 };
302
303 sink.write_sequence_header();
304
305 sink.process_track = sink.create_process_track();
306
307 Ok(sink)
308 }
309
310 fn next_track_id(&self) -> u64 {
311 self.next_track_id.fetch_add(1, Ordering::Relaxed)
312 }
313
314 fn write_sequence_header(&mut self) {
315 let packet = TracePacket {
316 incremental_state_cleared: Some(true),
317 first_packet_on_sequence: Some(true),
318 sequence_flags: Some(3),
319 optional_trusted_packet_sequence_id: Some(
320 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
321 ),
322 ..Default::default()
323 };
324
325 self.write_packet(&packet);
326 }
327
328 fn create_process_track(&mut self) -> u64 {
329 let track_id = self.next_track_id();
330
331 let packet = TracePacket {
332 data: Some(Data::TrackDescriptor(TrackDescriptor {
333 uuid: Some(track_id),
334 process: Some(ProcessDescriptor {
335 pid: Some(self.pid),
336 process_name: Some(self.process_name.clone()),
337 ..Default::default()
338 }),
339 ..Default::default()
340 })),
341 optional_trusted_packet_sequence_id: Some(
342 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
343 ),
344 ..Default::default()
345 };
346
347 self.write_packet(&packet);
348
349 track_id
350 }
351
352 fn get_or_create_named_track(&mut self, track_name: &str) -> u64 {
353 if let Some(&track) = self.named_tracks.get(track_name) {
354 return track;
355 }
356
357 let track_id = self.next_track_id();
358 let tid = self.named_tracks.len() as i32 + 1;
359
360 let packet = TracePacket {
361 data: Some(Data::TrackDescriptor(TrackDescriptor {
362 uuid: Some(track_id),
363 parent_uuid: Some(self.process_track),
364 static_or_dynamic_name: Some(StaticOrDynamicName::Name(track_name.to_string())),
365 thread: Some(ThreadDescriptor {
366 pid: Some(self.pid),
367 tid: Some(tid),
368 thread_name: Some(track_name.to_string()),
369 ..Default::default()
370 }),
371 ..Default::default()
372 })),
373 optional_trusted_packet_sequence_id: Some(
374 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
375 ),
376 ..Default::default()
377 };
378 self.write_packet(&packet);
379
380 self.named_tracks.insert(track_name.to_string(), track_id);
381
382 track_id
383 }
384
385 fn actor_track_name(fields: &TraceFields) -> Option<&str> {
386 match get_field(fields, ACTOR_ID_FIELD) {
387 Some(FieldValue::Str(actor_id) | FieldValue::Debug(actor_id))
388 if !actor_id.is_empty() =>
389 {
390 Some(Self::short_actor_track_name(actor_id))
391 }
392 _ => None,
393 }
394 }
395
396 fn short_actor_track_name(actor_id: &str) -> &str {
397 actor_id
398 .rsplit_once(',')
399 .map(|(_, actor_name_and_pid)| actor_name_and_pid)
400 .unwrap_or(actor_id)
401 }
402
403 fn preferred_track_name<'a>(
404 prefer_actor_track: bool,
405 fields: &'a TraceFields,
406 thread_name: &'a str,
407 ) -> &'a str {
408 if prefer_actor_track {
409 Self::actor_track_name(fields).unwrap_or(thread_name)
410 } else {
411 thread_name
412 }
413 }
414
415 fn write_packet(&mut self, packet: &TracePacket) {
416 self.pending_packets.push(packet.clone());
417 }
418
419 fn write_pending_packets(&mut self) -> Result<()> {
420 if self.pending_packets.is_empty() {
421 return Ok(());
422 }
423
424 let trace = Trace {
425 packet: std::mem::take(&mut self.pending_packets),
426 };
427
428 let bytes = trace.encode_to_vec();
429 self.writer.write_all(&bytes)?;
430 Ok(())
431 }
432
433 fn flush_interned_data(&mut self) {
434 if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
435 return;
436 }
437
438 let mut interned_data = InternedData::default();
439
440 for (name, iid) in self.event_names.take_pending() {
441 interned_data.event_names.push(EventName {
442 iid: Some(iid),
443 name: Some(name),
444 ..Default::default()
445 });
446 }
447
448 for (name, iid) in self.annotation_names.take_pending() {
449 interned_data
450 .debug_annotation_names
451 .push(DebugAnnotationName {
452 iid: Some(iid),
453 name: Some(name),
454 ..Default::default()
455 });
456 }
457
458 let packet = TracePacket {
459 interned_data: Some(interned_data),
460 optional_trusted_packet_sequence_id: Some(
461 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
462 ),
463 sequence_flags: Some(2), ..Default::default()
465 };
466 self.write_packet(&packet);
467 }
468
469 fn timestamp_ns(ts: SystemTime) -> u64 {
470 ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
471 }
472
473 fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
474 let name_iid = self.annotation_names.intern(key);
475 let dbg_value = match value {
476 FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
477 FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
478 FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
479 FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
480 FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
481 FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
482 };
483
484 DebugAnnotation {
485 name_field: Some(NameField::NameIid(name_iid)),
486 value: dbg_value,
487 ..Default::default()
488 }
489 }
490
491 fn write_slice_begin(
492 &mut self,
493 track: u64,
494 timestamp: SystemTime,
495 name: &str,
496 fields: &TraceFields,
497 file: Option<&str>,
498 line: Option<u32>,
499 ) {
500 self.flush_interned_data();
501
502 let name_iid = self.event_names.intern(name);
503
504 let mut debug_annotations = Vec::new();
505 for (key, value) in fields {
506 debug_annotations.push(self.field_to_debug_annotation(key, value));
507 }
508 if let Some(f) = file {
509 debug_annotations
510 .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
511 }
512 if let Some(l) = line {
513 debug_annotations
514 .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
515 }
516
517 self.flush_interned_data();
518
519 let packet = TracePacket {
520 timestamp: Some(Self::timestamp_ns(timestamp)),
521 data: Some(Data::TrackEvent(TrackEvent {
522 track_uuid: Some(track),
523 r#type: Some(TrackEventType::SliceBegin as i32),
524 name_field: Some(EventNameField::NameIid(name_iid)),
525 debug_annotations,
526 ..Default::default()
527 })),
528 optional_trusted_packet_sequence_id: Some(
529 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
530 ),
531 sequence_flags: Some(2),
532 ..Default::default()
533 };
534 self.write_packet(&packet);
535 }
536
537 fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
538 let packet = TracePacket {
539 timestamp: Some(Self::timestamp_ns(timestamp)),
540 data: Some(Data::TrackEvent(TrackEvent {
541 track_uuid: Some(track),
542 r#type: Some(TrackEventType::SliceEnd as i32),
543 ..Default::default()
544 })),
545 optional_trusted_packet_sequence_id: Some(
546 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
547 ),
548 sequence_flags: Some(2),
549 ..Default::default()
550 };
551 self.write_packet(&packet);
552 }
553
554 fn write_instant(
555 &mut self,
556 track: u64,
557 timestamp: SystemTime,
558 name: &str,
559 fields: &TraceFields,
560 ) {
561 self.flush_interned_data();
562
563 let name_iid = self.event_names.intern(name);
564
565 let mut debug_annotations = Vec::new();
566 for (key, value) in fields {
567 debug_annotations.push(self.field_to_debug_annotation(key, value));
568 }
569
570 self.flush_interned_data();
571
572 let packet = TracePacket {
573 timestamp: Some(Self::timestamp_ns(timestamp)),
574 data: Some(Data::TrackEvent(TrackEvent {
575 track_uuid: Some(track),
576 r#type: Some(TrackEventType::Instant as i32),
577 name_field: Some(EventNameField::NameIid(name_iid)),
578 debug_annotations,
579 ..Default::default()
580 })),
581 optional_trusted_packet_sequence_id: Some(
582 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
583 ),
584 sequence_flags: Some(2),
585 ..Default::default()
586 };
587 self.write_packet(&packet);
588 }
589}
590
591impl TraceEventSink for PerfettoFileSink {
592 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
593 match event {
594 TraceEvent::NewSpan {
595 id,
596 name,
597 target,
598 fields,
599 file,
600 line,
601 ..
602 } => {
603 if !self.trace_mode.should_include(target) {
604 return Ok(());
605 }
606
607 let display_name = if *target == ENDPOINT_TELEMETRY_TARGET {
613 let mesh = match get_field(fields, "mesh") {
614 Some(FieldValue::Str(m)) => Some(m.as_str()),
615 _ => None,
616 };
617 let method = match get_field(fields, "method") {
618 Some(FieldValue::Str(m)) => Some(m.as_str()),
619 _ => None,
620 };
621 let call_name = match get_field(fields, "call_name") {
622 Some(FieldValue::Str(c)) if !c.is_empty() => Some(c.as_str()),
623 _ => None,
624 };
625 match (mesh, method, call_name) {
626 (Some(mesh), Some(method), _) => format!("{}.{}.{}()", mesh, method, name),
627 (_, _, Some(call_name)) => format!("{}.{}()", call_name, name),
628 _ => name.to_string(),
629 }
630 } else if *target == USER_TELEMETRY_PREFIX {
631 match get_field(fields, "name") {
632 Some(FieldValue::Str(n)) => n.clone(),
633 _ => name.to_string(),
634 }
635 } else {
636 format!("{}::{}", target, name)
637 };
638
639 self.span_info.insert(
640 *id,
641 SpanInfo {
642 fq_name: display_name,
643 fields: fields.clone(),
644 file: *file,
645 line: *line,
646 prefer_actor_track: target.starts_with(USER_TELEMETRY_PREFIX),
647 },
648 );
649 }
650
651 TraceEvent::SpanEnter {
652 id,
653 timestamp,
654 thread_name,
655 } => {
656 if let Some(info) = self.span_info.get(id) {
657 let fq_name = info.fq_name.clone();
658 let fields = info.fields.clone();
659 let file = info.file;
660 let line = info.line;
661 let prefer_actor_track = info.prefer_actor_track;
662 let track_name =
663 Self::preferred_track_name(prefer_actor_track, &fields, thread_name)
664 .to_string();
665 let track = self.get_or_create_named_track(&track_name);
666 self.span_tracks.insert(*id, track);
667 self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
668 }
669 }
670
671 TraceEvent::SpanExit {
672 id,
673 timestamp,
674 thread_name,
675 } => {
676 if let Some(info) = self.span_info.get(id) {
677 let fields = info.fields.clone();
678 let prefer_actor_track = info.prefer_actor_track;
679 let track_name =
680 Self::preferred_track_name(prefer_actor_track, &fields, thread_name)
681 .to_string();
682 let track = self
683 .span_tracks
684 .remove(id)
685 .unwrap_or_else(|| self.get_or_create_named_track(&track_name));
686 self.write_slice_end(track, *timestamp);
687 }
688 }
689
690 TraceEvent::SpanClose { id, .. } => {
691 self.span_info.remove(id);
692 self.span_tracks.remove(id);
693 }
694
695 TraceEvent::Event {
696 name,
697 target,
698 fields,
699 timestamp,
700 thread_name,
701 ..
702 } => {
703 if !self.trace_mode.should_include(target) {
704 return Ok(());
705 }
706
707 let display_name = if self.trace_mode == PerfettoTraceMode::User {
710 if let Some(FieldValue::Str(msg)) = get_field(fields, "message") {
711 msg.clone()
712 } else {
713 name.to_string()
714 }
715 } else {
716 format!("{}::{}", target, name)
717 };
718
719 let track = self.get_or_create_named_track(Self::preferred_track_name(
720 target.starts_with(USER_TELEMETRY_PREFIX),
721 fields,
722 thread_name,
723 ));
724 self.write_instant(track, *timestamp, &display_name, fields);
725 }
726 }
727
728 Ok(())
729 }
730
731 fn flush(&mut self) -> Result<(), anyhow::Error> {
732 self.flush_interned_data();
733 self.write_pending_packets()?;
734 self.writer.flush()?;
735 Ok(())
736 }
737
738 fn name(&self) -> &str {
739 "PerfettoFileSink"
740 }
741
742 fn target_filter(&self) -> Option<&Targets> {
743 Some(&self.target_filter)
744 }
745}