1use std::collections::HashMap;
33use std::fs;
34use std::fs::File;
35use std::io::BufWriter;
36use std::io::Write;
37use std::os::unix::fs::symlink;
38use std::path::Path;
39use std::path::PathBuf;
40use std::sync::atomic::AtomicU64;
41use std::sync::atomic::Ordering;
42use std::time::SystemTime;
43use std::time::UNIX_EPOCH;
44
45use anyhow::Result;
46use hyperactor_config::CONFIG;
47use hyperactor_config::ConfigAttr;
48use hyperactor_config::attrs::AttrValue;
49use hyperactor_config::attrs::declare_attrs;
50use hyperactor_config::typeuri::Named;
51use indexmap::IndexMap;
52use prost::Message;
53use serde::Deserialize;
54use serde::Serialize;
55use tracing_core::LevelFilter;
56use tracing_perfetto_sdk_schema::DebugAnnotation;
57use tracing_perfetto_sdk_schema::DebugAnnotationName;
58use tracing_perfetto_sdk_schema::EventName;
59use tracing_perfetto_sdk_schema::InternedData;
60use tracing_perfetto_sdk_schema::ProcessDescriptor;
61use tracing_perfetto_sdk_schema::ThreadDescriptor;
62use tracing_perfetto_sdk_schema::Trace;
63use tracing_perfetto_sdk_schema::TracePacket;
64use tracing_perfetto_sdk_schema::TrackDescriptor;
65use tracing_perfetto_sdk_schema::TrackEvent;
66use tracing_perfetto_sdk_schema::debug_annotation::NameField;
67use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
68use tracing_perfetto_sdk_schema::trace_packet::Data;
69use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
70use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
71use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
72use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
73use tracing_subscriber::filter::Targets;
74
75use crate::trace_dispatcher::FieldValue;
76use crate::trace_dispatcher::TraceEvent;
77use crate::trace_dispatcher::TraceEventSink;
78
/// Target prefix that marks user-facing telemetry.
///
/// `PerfettoTraceMode::should_include` uses it in `User` mode: only targets
/// that start with this string are recorded.
pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";
81
/// Selects which tracing targets the Perfetto sink records.
///
/// The filtering rule for each variant is implemented by
/// `PerfettoTraceMode::should_include`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum PerfettoTraceMode {
    /// Record nothing.
    Off,
    /// Record only targets that start with `USER_TELEMETRY_PREFIX`.
    /// This is the `Default` variant.
    #[default]
    User,
    /// Record every target.
    Dev,
}
93
94impl std::fmt::Display for PerfettoTraceMode {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 PerfettoTraceMode::Off => write!(f, "off"),
98 PerfettoTraceMode::User => write!(f, "user"),
99 PerfettoTraceMode::Dev => write!(f, "dev"),
100 }
101 }
102}
103
104impl std::str::FromStr for PerfettoTraceMode {
105 type Err = anyhow::Error;
106
107 fn from_str(s: &str) -> Result<Self, Self::Err> {
108 match s.to_lowercase().as_str() {
109 "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
110 "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
111 "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
112 _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
113 }
114 }
115}
116
117impl Named for PerfettoTraceMode {
118 fn typename() -> &'static str {
119 "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
120 }
121}
122
123impl AttrValue for PerfettoTraceMode {
124 fn display(&self) -> String {
125 self.to_string()
126 }
127
128 fn parse(s: &str) -> Result<Self, anyhow::Error> {
129 s.parse()
130 }
131}
132
133impl PerfettoTraceMode {
134 pub fn should_include(&self, target: &str) -> bool {
136 match self {
137 PerfettoTraceMode::Off => false,
138 PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
139 PerfettoTraceMode::Dev => true,
140 }
141 }
142}
143
// Declares the `PERFETTO_TRACE_MODE` configuration attribute, whose default
// is `PerfettoTraceMode::User`. `PerfettoFileSink::new` reads it through
// `hyperactor_config::global::get`.
//
// The `CONFIG` metadata names an environment variable (`PERFETTO_TRACE_MODE`)
// and a Python-side key (`perfetto_trace_mode`); presumably `hyperactor_config`
// uses these to override the default -- confirm against that crate.
declare_attrs! {
    @meta(CONFIG = ConfigAttr {
        env_name: Some("PERFETTO_TRACE_MODE".to_string()),
        py_name: Some("perfetto_trace_mode".to_string()),
    })
    pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
}
156
/// Name of the environment variable that `default_trace_dir` consults to
/// override the trace output directory.
pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
159
160pub fn default_trace_dir() -> PathBuf {
164 if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
165 return PathBuf::from(dir);
166 }
167 let username = whoami::username();
168 PathBuf::from(format!("/tmp/{}/monarch_traces", username))
169}
170
/// Per-span state captured when a `NewSpan` event is consumed and reused on
/// every later enter/exit of that span.
struct SpanInfo {
    /// Id of the thread track the span's slices are emitted on.
    track: u64,
    /// Name shown for the slice. Despite the field name, in `User` mode this
    /// is the span's `name` field (or bare span name) rather than a fully
    /// qualified `target::name`.
    fq_name: String,
    /// Span fields, emitted as debug annotations on each slice begin.
    fields: IndexMap<String, FieldValue>,
    /// Source file of the span, emitted as a `file` annotation when present.
    file: Option<&'static str>,
    /// Source line of the span, emitted as a `line` annotation when present.
    line: Option<u32>,
}
180
/// String-interning table that hands out sequential ids starting at 1.
///
/// Newly interned strings are also queued in `pending` until the caller
/// drains them with `take_pending`.
#[derive(Default)]
struct InternedStrings {
    /// Most recently assigned id; 0 means nothing has been interned yet.
    next_iid: u64,
    /// Every string interned so far, mapped to its id.
    strings: HashMap<String, u64>,
    /// Strings interned since the last `take_pending`, in interning order.
    pending: Vec<(String, u64)>,
}

impl InternedStrings {
    /// Returns the id for `s`, assigning the next id and queueing the pair
    /// in `pending` the first time `s` is seen.
    fn intern(&mut self, s: &str) -> u64 {
        match self.strings.get(s).copied() {
            Some(existing) => existing,
            None => {
                let fresh = self.next_iid + 1;
                self.next_iid = fresh;
                let owned = s.to_owned();
                self.pending.push((owned.clone(), fresh));
                self.strings.insert(owned, fresh);
                fresh
            }
        }
    }

    /// Drains and returns the queued `(string, id)` pairs, leaving the queue
    /// empty. Already-assigned ids stay valid.
    fn take_pending(&mut self) -> Vec<(String, u64)> {
        std::mem::replace(&mut self.pending, Vec::new())
    }

    /// Reports whether any interned string has not been drained yet.
    fn has_pending(&self) -> bool {
        self.pending.first().is_some()
    }
}
209
/// Trace sink that converts `TraceEvent`s into Perfetto `TracePacket`s and
/// writes them to a `.pftrace` file.
///
/// Packets are queued in memory and only encoded and written when the sink is
/// flushed.
pub struct PerfettoFileSink {
    /// Buffered writer over the trace file created in `new`.
    writer: BufWriter<File>,
    /// Packets queued by `write_packet`, drained by `write_pending_packets`.
    pending_packets: Vec<TracePacket>,
    /// Counter used to hand out track uuids; starts at 1.
    next_track_id: AtomicU64,
    /// Value stamped on every packet as its trusted packet sequence id.
    sequence_id: u32,
    /// Intern table for event (slice / instant) names.
    event_names: InternedStrings,
    /// Intern table for debug-annotation key names.
    annotation_names: InternedStrings,
    /// State for spans that have been created and not yet closed, keyed by
    /// span id.
    span_info: HashMap<u64, SpanInfo>,
    /// Track uuid already assigned to each thread name.
    thread_tracks: HashMap<String, u64>,
    /// Track uuid of the process descriptor track; parent of thread tracks.
    process_track: u64,
    /// Process id reported in process and thread descriptors.
    pid: i32,
    /// Process name reported in the process descriptor.
    process_name: String,
    /// Filter returned from `target_filter`; built in `new` with the `tokio`
    /// and `runtime` targets switched off.
    target_filter: Targets,
    /// Mode read from the `PERFETTO_TRACE_MODE` attribute at construction.
    trace_mode: PerfettoTraceMode,
}
229
230impl PerfettoFileSink {
231 pub fn new(
240 trace_dir: impl AsRef<Path>,
241 execution_id: &str,
242 process_name: &str,
243 ) -> Result<Self> {
244 let trace_dir = trace_dir.as_ref().to_path_buf();
245 let pid = std::process::id() as i32;
246
247 let executions_dir = trace_dir.join("executions");
248 let execution_dir = executions_dir.join(execution_id);
249 fs::create_dir_all(&execution_dir)?;
250
251 let latest_link = executions_dir.join("latest");
253 let _ = fs::remove_file(&latest_link);
254 if let Err(e) = symlink(execution_id, &latest_link) {
255 tracing::debug!("Failed to create latest symlink: {}", e);
256 }
257
258 let path = execution_dir.join(format!("{}.pftrace", process_name));
259 let file = File::create(&path)?;
260 let writer = BufWriter::new(file);
261
262 let sequence_id = SystemTime::now()
263 .duration_since(UNIX_EPOCH)
264 .unwrap()
265 .as_secs() as u32;
266
267 let mut sink = Self {
268 writer,
269 pending_packets: Vec::new(),
270 next_track_id: AtomicU64::new(1), sequence_id,
272 event_names: InternedStrings::default(),
273 annotation_names: InternedStrings::default(),
274 span_info: HashMap::new(),
275 thread_tracks: HashMap::new(),
276 process_track: 0,
277 pid,
278 process_name: process_name.to_string(),
279 target_filter: Targets::new()
280 .with_target("tokio", LevelFilter::OFF)
281 .with_target("runtime", LevelFilter::OFF)
282 .with_default(LevelFilter::TRACE),
283 trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
284 };
285
286 sink.write_sequence_header();
287
288 sink.process_track = sink.create_process_track();
289
290 Ok(sink)
291 }
292
293 fn next_track_id(&self) -> u64 {
294 self.next_track_id.fetch_add(1, Ordering::Relaxed)
295 }
296
297 fn write_sequence_header(&mut self) {
298 let packet = TracePacket {
299 incremental_state_cleared: Some(true),
300 first_packet_on_sequence: Some(true),
301 sequence_flags: Some(3),
302 optional_trusted_packet_sequence_id: Some(
303 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
304 ),
305 ..Default::default()
306 };
307
308 self.write_packet(&packet);
309 }
310
311 fn create_process_track(&mut self) -> u64 {
312 let track_id = self.next_track_id();
313
314 let packet = TracePacket {
315 data: Some(Data::TrackDescriptor(TrackDescriptor {
316 uuid: Some(track_id),
317 process: Some(ProcessDescriptor {
318 pid: Some(self.pid),
319 process_name: Some(self.process_name.clone()),
320 ..Default::default()
321 }),
322 ..Default::default()
323 })),
324 optional_trusted_packet_sequence_id: Some(
325 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
326 ),
327 ..Default::default()
328 };
329
330 self.write_packet(&packet);
331
332 track_id
333 }
334
335 fn get_or_create_thread_track(&mut self, thread_name: &str) -> u64 {
336 if let Some(&track) = self.thread_tracks.get(thread_name) {
337 return track;
338 }
339
340 let track_id = self.next_track_id();
341 let tid = self.thread_tracks.len() as i32 + 1;
342
343 let packet = TracePacket {
344 data: Some(Data::TrackDescriptor(TrackDescriptor {
345 uuid: Some(track_id),
346 parent_uuid: Some(self.process_track),
347 static_or_dynamic_name: Some(StaticOrDynamicName::Name(thread_name.to_string())),
348 thread: Some(ThreadDescriptor {
349 pid: Some(self.pid),
350 tid: Some(tid),
351 thread_name: Some(thread_name.to_string()),
352 ..Default::default()
353 }),
354 ..Default::default()
355 })),
356 optional_trusted_packet_sequence_id: Some(
357 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
358 ),
359 ..Default::default()
360 };
361 self.write_packet(&packet);
362
363 self.thread_tracks.insert(thread_name.to_string(), track_id);
364
365 track_id
366 }
367
368 fn write_packet(&mut self, packet: &TracePacket) {
369 self.pending_packets.push(packet.clone());
370 }
371
372 fn write_pending_packets(&mut self) -> Result<()> {
373 if self.pending_packets.is_empty() {
374 return Ok(());
375 }
376
377 let trace = Trace {
378 packet: std::mem::take(&mut self.pending_packets),
379 };
380
381 let bytes = trace.encode_to_vec();
382 self.writer.write_all(&bytes)?;
383 Ok(())
384 }
385
386 fn flush_interned_data(&mut self) {
387 if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
388 return;
389 }
390
391 let mut interned_data = InternedData::default();
392
393 for (name, iid) in self.event_names.take_pending() {
394 interned_data.event_names.push(EventName {
395 iid: Some(iid),
396 name: Some(name),
397 ..Default::default()
398 });
399 }
400
401 for (name, iid) in self.annotation_names.take_pending() {
402 interned_data
403 .debug_annotation_names
404 .push(DebugAnnotationName {
405 iid: Some(iid),
406 name: Some(name),
407 ..Default::default()
408 });
409 }
410
411 let packet = TracePacket {
412 interned_data: Some(interned_data),
413 optional_trusted_packet_sequence_id: Some(
414 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
415 ),
416 sequence_flags: Some(2), ..Default::default()
418 };
419 self.write_packet(&packet);
420 }
421
422 fn timestamp_ns(ts: SystemTime) -> u64 {
423 ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
424 }
425
426 fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
427 let name_iid = self.annotation_names.intern(key);
428 let dbg_value = match value {
429 FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
430 FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
431 FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
432 FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
433 FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
434 FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
435 };
436
437 DebugAnnotation {
438 name_field: Some(NameField::NameIid(name_iid)),
439 value: dbg_value,
440 ..Default::default()
441 }
442 }
443
444 fn write_slice_begin(
445 &mut self,
446 track: u64,
447 timestamp: SystemTime,
448 name: &str,
449 fields: &IndexMap<String, FieldValue>,
450 file: Option<&str>,
451 line: Option<u32>,
452 ) {
453 self.flush_interned_data();
454
455 let name_iid = self.event_names.intern(name);
456
457 let mut debug_annotations = Vec::new();
458 for (key, value) in fields {
459 debug_annotations.push(self.field_to_debug_annotation(key, value));
460 }
461 if let Some(f) = file {
462 debug_annotations
463 .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
464 }
465 if let Some(l) = line {
466 debug_annotations
467 .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
468 }
469
470 self.flush_interned_data();
471
472 let packet = TracePacket {
473 timestamp: Some(Self::timestamp_ns(timestamp)),
474 data: Some(Data::TrackEvent(TrackEvent {
475 track_uuid: Some(track),
476 r#type: Some(TrackEventType::SliceBegin as i32),
477 name_field: Some(EventNameField::NameIid(name_iid)),
478 debug_annotations,
479 ..Default::default()
480 })),
481 optional_trusted_packet_sequence_id: Some(
482 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
483 ),
484 sequence_flags: Some(2),
485 ..Default::default()
486 };
487 self.write_packet(&packet);
488 }
489
490 fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
491 let packet = TracePacket {
492 timestamp: Some(Self::timestamp_ns(timestamp)),
493 data: Some(Data::TrackEvent(TrackEvent {
494 track_uuid: Some(track),
495 r#type: Some(TrackEventType::SliceEnd as i32),
496 ..Default::default()
497 })),
498 optional_trusted_packet_sequence_id: Some(
499 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
500 ),
501 sequence_flags: Some(2),
502 ..Default::default()
503 };
504 self.write_packet(&packet);
505 }
506
507 fn write_instant(
508 &mut self,
509 track: u64,
510 timestamp: SystemTime,
511 name: &str,
512 fields: &IndexMap<String, FieldValue>,
513 ) {
514 self.flush_interned_data();
515
516 let name_iid = self.event_names.intern(name);
517
518 let mut debug_annotations = Vec::new();
519 for (key, value) in fields {
520 debug_annotations.push(self.field_to_debug_annotation(key, value));
521 }
522
523 self.flush_interned_data();
524
525 let packet = TracePacket {
526 timestamp: Some(Self::timestamp_ns(timestamp)),
527 data: Some(Data::TrackEvent(TrackEvent {
528 track_uuid: Some(track),
529 r#type: Some(TrackEventType::Instant as i32),
530 name_field: Some(EventNameField::NameIid(name_iid)),
531 debug_annotations,
532 ..Default::default()
533 })),
534 optional_trusted_packet_sequence_id: Some(
535 OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
536 ),
537 sequence_flags: Some(2),
538 ..Default::default()
539 };
540 self.write_packet(&packet);
541 }
542}
543
544impl TraceEventSink for PerfettoFileSink {
545 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
546 match event {
547 TraceEvent::NewSpan {
548 id,
549 name,
550 target,
551 fields,
552 thread_name,
553 file,
554 line,
555 ..
556 } => {
557 if !self.trace_mode.should_include(target) {
558 return Ok(());
559 }
560
561 let track = self.get_or_create_thread_track(thread_name);
562
563 let display_name = if self.trace_mode == PerfettoTraceMode::User {
566 if let Some(FieldValue::Str(n)) = fields.get("name") {
567 n.clone()
568 } else {
569 name.to_string()
570 }
571 } else {
572 format!("{}::{}", target, name)
573 };
574
575 self.span_info.insert(
576 *id,
577 SpanInfo {
578 track,
579 fq_name: display_name,
580 fields: fields.clone(),
581 file: *file,
582 line: *line,
583 },
584 );
585 }
586
587 TraceEvent::SpanEnter { id, timestamp } => {
588 if let Some(info) = self.span_info.get(id) {
589 let track = info.track;
590 let fq_name = info.fq_name.clone();
591 let fields = info.fields.clone();
592 let file = info.file;
593 let line = info.line;
594 self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
595 }
596 }
597
598 TraceEvent::SpanExit { id, timestamp } => {
599 if let Some(info) = self.span_info.get(id) {
600 self.write_slice_end(info.track, *timestamp);
601 }
602 }
603
604 TraceEvent::SpanClose { id, .. } => {
605 self.span_info.remove(id);
606 }
607
608 TraceEvent::Event {
609 name,
610 target,
611 fields,
612 timestamp,
613 thread_name,
614 ..
615 } => {
616 if !self.trace_mode.should_include(target) {
617 return Ok(());
618 }
619
620 let display_name = if self.trace_mode == PerfettoTraceMode::User {
623 if let Some(FieldValue::Str(msg)) = fields.get("message") {
624 msg.clone()
625 } else {
626 name.to_string()
627 }
628 } else {
629 format!("{}::{}", target, name)
630 };
631
632 let track = self.get_or_create_thread_track(thread_name);
633 self.write_instant(track, *timestamp, &display_name, fields);
634 }
635 }
636
637 Ok(())
638 }
639
640 fn flush(&mut self) -> Result<(), anyhow::Error> {
641 self.flush_interned_data();
642 self.write_pending_packets()?;
643 self.writer.flush()?;
644 Ok(())
645 }
646
647 fn name(&self) -> &str {
648 "PerfettoFileSink"
649 }
650
651 fn target_filter(&self) -> Option<&Targets> {
652 Some(&self.target_filter)
653 }
654}