1use std::collections::HashMap;
33use std::fs;
34use std::fs::File;
35use std::io::BufWriter;
36use std::io::Write;
37use std::os::unix::fs::symlink;
38use std::path::Path;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::atomic::AtomicU64;
42use std::sync::atomic::Ordering;
43use std::time::SystemTime;
44use std::time::UNIX_EPOCH;
45
46use anyhow::Result;
47use hyperactor_config::CONFIG;
48use hyperactor_config::ConfigAttr;
49use hyperactor_config::attrs::AttrValue;
50use hyperactor_config::attrs::declare_attrs;
51use hyperactor_config::typeuri::Named;
52use prost::Message;
53use serde::Deserialize;
54use serde::Serialize;
55use tracing_core::LevelFilter;
56use tracing_perfetto_sdk_schema::DebugAnnotation;
57use tracing_perfetto_sdk_schema::DebugAnnotationName;
58use tracing_perfetto_sdk_schema::EventName;
59use tracing_perfetto_sdk_schema::InternedData;
60use tracing_perfetto_sdk_schema::ProcessDescriptor;
61use tracing_perfetto_sdk_schema::ThreadDescriptor;
62use tracing_perfetto_sdk_schema::Trace;
63use tracing_perfetto_sdk_schema::TracePacket;
64use tracing_perfetto_sdk_schema::TrackDescriptor;
65use tracing_perfetto_sdk_schema::TrackEvent;
66use tracing_perfetto_sdk_schema::debug_annotation::NameField;
67use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
68use tracing_perfetto_sdk_schema::trace_packet::Data;
69use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
70use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
71use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
72use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
73use tracing_subscriber::filter::Targets;
74
75use crate::config::MONARCH_FILE_LOG_LEVEL;
76use crate::trace_dispatcher::FieldValue;
77use crate::trace_dispatcher::TraceEvent;
78use crate::trace_dispatcher::TraceEventSink;
79use crate::trace_dispatcher::TraceFields;
80use crate::trace_dispatcher::get_field;
81
/// Tracing target prefix that marks user-facing telemetry.
///
/// In [`PerfettoTraceMode::User`] mode only spans and events whose target starts with this
/// prefix are recorded in the trace.
pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
87pub enum PerfettoTraceMode {
88 Off,
90 #[default]
92 User,
93 Dev,
95}
96
97impl std::fmt::Display for PerfettoTraceMode {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 match self {
100 PerfettoTraceMode::Off => write!(f, "off"),
101 PerfettoTraceMode::User => write!(f, "user"),
102 PerfettoTraceMode::Dev => write!(f, "dev"),
103 }
104 }
105}
106
107impl std::str::FromStr for PerfettoTraceMode {
108 type Err = anyhow::Error;
109
110 fn from_str(s: &str) -> Result<Self, Self::Err> {
111 match s.to_lowercase().as_str() {
112 "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
113 "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
114 "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
115 _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
116 }
117 }
118}
119
120impl Named for PerfettoTraceMode {
121 fn typename() -> &'static str {
122 "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
123 }
124}
125
126impl AttrValue for PerfettoTraceMode {
127 fn display(&self) -> String {
128 self.to_string()
129 }
130
131 fn parse(s: &str) -> Result<Self, anyhow::Error> {
132 s.parse()
133 }
134}
135
136impl PerfettoTraceMode {
137 pub fn should_include(&self, target: &str) -> bool {
139 match self {
140 PerfettoTraceMode::Off => false,
141 PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
142 PerfettoTraceMode::Dev => true,
143 }
144 }
145}
146
147declare_attrs! {
148 @meta(CONFIG = ConfigAttr::new(
154 Some("PERFETTO_TRACE_MODE".to_string()),
155 Some("perfetto_trace_mode".to_string()),
156 ))
157 pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
158}
159
/// Name of the environment variable that overrides the directory returned by
/// [`default_trace_dir`].
pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
162
163pub fn default_trace_dir() -> PathBuf {
167 if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
168 return PathBuf::from(dir);
169 }
170 let username = whoami::username();
171 PathBuf::from(format!("/tmp/{}/monarch_traces", username))
172}
173
174struct SpanInfo {
176 fq_name: String,
178 fields: TraceFields,
179 file: Option<&'static str>,
180 line: Option<u32>,
181}
182
/// String table that assigns a numeric id ("iid") to every distinct string it sees.
///
/// Each newly assigned `(string, iid)` pair is also queued in `pending` so the caller can
/// emit it exactly once via [`InternedStrings::take_pending`].
#[derive(Default)]
struct InternedStrings {
    /// Most recently assigned id; the first interned string receives `1`.
    next_iid: u64,
    /// Every string interned so far, mapped to its id.
    strings: HashMap<String, u64>,
    /// Entries assigned since the last `take_pending`, in assignment order.
    pending: Vec<(String, u64)>,
}

impl InternedStrings {
    /// Returns the id for `s`, assigning the next id (and queueing the pair as pending) the
    /// first time `s` is seen.
    fn intern(&mut self, s: &str) -> u64 {
        match self.strings.get(s).copied() {
            Some(known) => known,
            None => {
                self.next_iid += 1;
                let fresh = self.next_iid;
                self.strings.insert(s.to_owned(), fresh);
                self.pending.push((s.to_owned(), fresh));
                fresh
            }
        }
    }

    /// Removes and returns all pending entries, leaving the pending queue empty.
    fn take_pending(&mut self) -> Vec<(String, u64)> {
        let queue = &mut self.pending;
        std::mem::take(queue)
    }

    /// Reports whether any entries were assigned since the last `take_pending`.
    fn has_pending(&self) -> bool {
        self.pending.len() > 0
    }
}
211
212pub struct PerfettoFileSink {
214 writer: BufWriter<File>,
215 pending_packets: Vec<TracePacket>,
216 next_track_id: AtomicU64,
217 sequence_id: u32,
219 event_names: InternedStrings,
220 annotation_names: InternedStrings,
221 span_info: HashMap<u64, SpanInfo>,
222 thread_tracks: HashMap<String, u64>,
224 process_track: u64,
226 pid: i32,
227 process_name: String,
228 target_filter: Targets,
229 trace_mode: PerfettoTraceMode,
230}
231
232impl PerfettoFileSink {
233 pub fn new(
242 trace_dir: impl AsRef<Path>,
243 execution_id: &str,
244 process_name: &str,
245 ) -> Result<Self> {
246 let trace_dir = trace_dir.as_ref().to_path_buf();
247 let pid = std::process::id() as i32;
248
249 let executions_dir = trace_dir.join("executions");
250 let execution_dir = executions_dir.join(execution_id);
251 fs::create_dir_all(&execution_dir)?;
252
253 let latest_link = executions_dir.join("latest");
255 let _ = fs::remove_file(&latest_link);
256 if let Err(e) = symlink(execution_id, &latest_link) {
257 tracing::debug!("Failed to create latest symlink: {}", e);
258 }
259
260 let path = execution_dir.join(format!("{}.pftrace", process_name));
261 let file = File::create(&path)?;
262 let writer = BufWriter::new(file);
263
264 let sequence_id = SystemTime::now()
265 .duration_since(UNIX_EPOCH)
266 .unwrap()
267 .as_secs() as u32;
268
269 let mut sink = Self {
270 writer,
271 pending_packets: Vec::new(),
272 next_track_id: AtomicU64::new(1), sequence_id,
274 event_names: InternedStrings::default(),
275 annotation_names: InternedStrings::default(),
276 span_info: HashMap::new(),
277 thread_tracks: HashMap::new(),
278 process_track: 0,
279 pid,
280 process_name: process_name.to_string(),
281 target_filter: Targets::new().with_default({
282 let log_level_str =
283 hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
284 .unwrap_or_else(|| "info".to_string());
285 let level =
286 tracing::Level::from_str(&log_level_str).unwrap_or(tracing::Level::INFO);
287 LevelFilter::from_level(level)
288 }),
289 trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
290 };
291
292 sink.write_sequence_header();
293
294 sink.process_track = sink.create_process_track();
295
296 Ok(sink)
297 }
298
299 fn next_track_id(&self) -> u64 {
300 self.next_track_id.fetch_add(1, Ordering::Relaxed)
301 }
302
303 fn write_sequence_header(&mut self) {
304 let packet = TracePacket {
305 incremental_state_cleared: Some(true),
306 first_packet_on_sequence: Some(true),
307 sequence_flags: Some(3),
308 optional_trusted_packet_sequence_id: Some(
309 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
310 ),
311 ..Default::default()
312 };
313
314 self.write_packet(&packet);
315 }
316
317 fn create_process_track(&mut self) -> u64 {
318 let track_id = self.next_track_id();
319
320 let packet = TracePacket {
321 data: Some(Data::TrackDescriptor(TrackDescriptor {
322 uuid: Some(track_id),
323 process: Some(ProcessDescriptor {
324 pid: Some(self.pid),
325 process_name: Some(self.process_name.clone()),
326 ..Default::default()
327 }),
328 ..Default::default()
329 })),
330 optional_trusted_packet_sequence_id: Some(
331 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
332 ),
333 ..Default::default()
334 };
335
336 self.write_packet(&packet);
337
338 track_id
339 }
340
341 fn get_or_create_thread_track(&mut self, thread_name: &str) -> u64 {
342 if let Some(&track) = self.thread_tracks.get(thread_name) {
343 return track;
344 }
345
346 let track_id = self.next_track_id();
347 let tid = self.thread_tracks.len() as i32 + 1;
348
349 let packet = TracePacket {
350 data: Some(Data::TrackDescriptor(TrackDescriptor {
351 uuid: Some(track_id),
352 parent_uuid: Some(self.process_track),
353 static_or_dynamic_name: Some(StaticOrDynamicName::Name(thread_name.to_string())),
354 thread: Some(ThreadDescriptor {
355 pid: Some(self.pid),
356 tid: Some(tid),
357 thread_name: Some(thread_name.to_string()),
358 ..Default::default()
359 }),
360 ..Default::default()
361 })),
362 optional_trusted_packet_sequence_id: Some(
363 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
364 ),
365 ..Default::default()
366 };
367 self.write_packet(&packet);
368
369 self.thread_tracks.insert(thread_name.to_string(), track_id);
370
371 track_id
372 }
373
374 fn write_packet(&mut self, packet: &TracePacket) {
375 self.pending_packets.push(packet.clone());
376 }
377
378 fn write_pending_packets(&mut self) -> Result<()> {
379 if self.pending_packets.is_empty() {
380 return Ok(());
381 }
382
383 let trace = Trace {
384 packet: std::mem::take(&mut self.pending_packets),
385 };
386
387 let bytes = trace.encode_to_vec();
388 self.writer.write_all(&bytes)?;
389 Ok(())
390 }
391
392 fn flush_interned_data(&mut self) {
393 if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
394 return;
395 }
396
397 let mut interned_data = InternedData::default();
398
399 for (name, iid) in self.event_names.take_pending() {
400 interned_data.event_names.push(EventName {
401 iid: Some(iid),
402 name: Some(name),
403 ..Default::default()
404 });
405 }
406
407 for (name, iid) in self.annotation_names.take_pending() {
408 interned_data
409 .debug_annotation_names
410 .push(DebugAnnotationName {
411 iid: Some(iid),
412 name: Some(name),
413 ..Default::default()
414 });
415 }
416
417 let packet = TracePacket {
418 interned_data: Some(interned_data),
419 optional_trusted_packet_sequence_id: Some(
420 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
421 ),
422 sequence_flags: Some(2), ..Default::default()
424 };
425 self.write_packet(&packet);
426 }
427
428 fn timestamp_ns(ts: SystemTime) -> u64 {
429 ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
430 }
431
432 fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
433 let name_iid = self.annotation_names.intern(key);
434 let dbg_value = match value {
435 FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
436 FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
437 FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
438 FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
439 FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
440 FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
441 };
442
443 DebugAnnotation {
444 name_field: Some(NameField::NameIid(name_iid)),
445 value: dbg_value,
446 ..Default::default()
447 }
448 }
449
450 fn write_slice_begin(
451 &mut self,
452 track: u64,
453 timestamp: SystemTime,
454 name: &str,
455 fields: &TraceFields,
456 file: Option<&str>,
457 line: Option<u32>,
458 ) {
459 self.flush_interned_data();
460
461 let name_iid = self.event_names.intern(name);
462
463 let mut debug_annotations = Vec::new();
464 for (key, value) in fields {
465 debug_annotations.push(self.field_to_debug_annotation(key, value));
466 }
467 if let Some(f) = file {
468 debug_annotations
469 .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
470 }
471 if let Some(l) = line {
472 debug_annotations
473 .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
474 }
475
476 self.flush_interned_data();
477
478 let packet = TracePacket {
479 timestamp: Some(Self::timestamp_ns(timestamp)),
480 data: Some(Data::TrackEvent(TrackEvent {
481 track_uuid: Some(track),
482 r#type: Some(TrackEventType::SliceBegin as i32),
483 name_field: Some(EventNameField::NameIid(name_iid)),
484 debug_annotations,
485 ..Default::default()
486 })),
487 optional_trusted_packet_sequence_id: Some(
488 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
489 ),
490 sequence_flags: Some(2),
491 ..Default::default()
492 };
493 self.write_packet(&packet);
494 }
495
496 fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
497 let packet = TracePacket {
498 timestamp: Some(Self::timestamp_ns(timestamp)),
499 data: Some(Data::TrackEvent(TrackEvent {
500 track_uuid: Some(track),
501 r#type: Some(TrackEventType::SliceEnd as i32),
502 ..Default::default()
503 })),
504 optional_trusted_packet_sequence_id: Some(
505 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
506 ),
507 sequence_flags: Some(2),
508 ..Default::default()
509 };
510 self.write_packet(&packet);
511 }
512
513 fn write_instant(
514 &mut self,
515 track: u64,
516 timestamp: SystemTime,
517 name: &str,
518 fields: &TraceFields,
519 ) {
520 self.flush_interned_data();
521
522 let name_iid = self.event_names.intern(name);
523
524 let mut debug_annotations = Vec::new();
525 for (key, value) in fields {
526 debug_annotations.push(self.field_to_debug_annotation(key, value));
527 }
528
529 self.flush_interned_data();
530
531 let packet = TracePacket {
532 timestamp: Some(Self::timestamp_ns(timestamp)),
533 data: Some(Data::TrackEvent(TrackEvent {
534 track_uuid: Some(track),
535 r#type: Some(TrackEventType::Instant as i32),
536 name_field: Some(EventNameField::NameIid(name_iid)),
537 debug_annotations,
538 ..Default::default()
539 })),
540 optional_trusted_packet_sequence_id: Some(
541 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
542 ),
543 sequence_flags: Some(2),
544 ..Default::default()
545 };
546 self.write_packet(&packet);
547 }
548}
549
550impl TraceEventSink for PerfettoFileSink {
551 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
552 match event {
553 TraceEvent::NewSpan {
554 id,
555 name,
556 target,
557 fields,
558 file,
559 line,
560 ..
561 } => {
562 if !self.trace_mode.should_include(target) {
563 return Ok(());
564 }
565
566 let display_name = if *target == USER_TELEMETRY_PREFIX {
569 if let Some(FieldValue::Str(n)) = get_field(fields, "name") {
570 n.clone()
571 } else {
572 name.to_string()
573 }
574 } else {
575 format!("{}::{}", target, name)
576 };
577
578 self.span_info.insert(
579 *id,
580 SpanInfo {
581 fq_name: display_name,
582 fields: fields.clone(),
583 file: *file,
584 line: *line,
585 },
586 );
587 }
588
589 TraceEvent::SpanEnter {
590 id,
591 timestamp,
592 thread_name,
593 } => {
594 if let Some(info) = self.span_info.get(id) {
595 let fq_name = info.fq_name.clone();
596 let fields = info.fields.clone();
597 let file = info.file;
598 let line = info.line;
599 let track = self.get_or_create_thread_track(thread_name);
600 self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
601 }
602 }
603
604 TraceEvent::SpanExit {
605 id,
606 timestamp,
607 thread_name,
608 } => {
609 if self.span_info.contains_key(id) {
610 let track = self.get_or_create_thread_track(thread_name);
611 self.write_slice_end(track, *timestamp);
612 }
613 }
614
615 TraceEvent::SpanClose { id, .. } => {
616 self.span_info.remove(id);
617 }
618
619 TraceEvent::Event {
620 name,
621 target,
622 fields,
623 timestamp,
624 thread_name,
625 ..
626 } => {
627 if !self.trace_mode.should_include(target) {
628 return Ok(());
629 }
630
631 let display_name = if self.trace_mode == PerfettoTraceMode::User {
634 if let Some(FieldValue::Str(msg)) = get_field(fields, "message") {
635 msg.clone()
636 } else {
637 name.to_string()
638 }
639 } else {
640 format!("{}::{}", target, name)
641 };
642
643 let track = self.get_or_create_thread_track(thread_name);
644 self.write_instant(track, *timestamp, &display_name, fields);
645 }
646 }
647
648 Ok(())
649 }
650
651 fn flush(&mut self) -> Result<(), anyhow::Error> {
652 self.flush_interned_data();
653 self.write_pending_packets()?;
654 self.writer.flush()?;
655 Ok(())
656 }
657
658 fn name(&self) -> &str {
659 "PerfettoFileSink"
660 }
661
662 fn target_filter(&self) -> Option<&Targets> {
663 Some(&self.target_filter)
664 }
665}