hyperactor_telemetry/sinks/
perfetto.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Perfetto sink that writes trace events directly to .pftrace files on disk.
10//!
11//! This provides an alternative to Scuba-based tracing that:
12//! - Has no row limits (vs 400K Scuba limit)
//! - Has no ingestion latency (events are written straight to a local file whenever the sink is flushed)
14//! - Uses native Perfetto protobuf format (no conversion needed)
15//! - Supports distributed file systems like OILFS
16//!
17//! ## Directory Layout
18//!
19//! ```text
20//! {trace_dir}/
21//! ├── executions/
22//! │   ├── {execution_id}/
23//! │   │   ├── {process_name}.pftrace
24//! │   │   └── ...
25//! │   └── latest -> {execution_id}/   # symlink to most recent
26//! ```
27//!
28//! ## Default Trace Directory
29//!
//! If not specified, `default_trace_dir()` is used: `$MONARCH_TRACE_DIR` when that variable is set, otherwise `/tmp/{username}/monarch_traces/`.
31
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use std::os::unix::fs::symlink;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use anyhow::Result;
use hyperactor_config::CONFIG;
use hyperactor_config::ConfigAttr;
use hyperactor_config::attrs::AttrValue;
use hyperactor_config::attrs::declare_attrs;
use hyperactor_config::typeuri::Named;
use indexmap::IndexMap;
use prost::Message;
use serde::Deserialize;
use serde::Serialize;
use tracing_core::LevelFilter;
use tracing_perfetto_sdk_schema::DebugAnnotation;
use tracing_perfetto_sdk_schema::DebugAnnotationName;
use tracing_perfetto_sdk_schema::EventName;
use tracing_perfetto_sdk_schema::InternedData;
use tracing_perfetto_sdk_schema::ProcessDescriptor;
use tracing_perfetto_sdk_schema::ThreadDescriptor;
use tracing_perfetto_sdk_schema::Trace;
use tracing_perfetto_sdk_schema::TracePacket;
use tracing_perfetto_sdk_schema::TrackDescriptor;
use tracing_perfetto_sdk_schema::TrackEvent;
use tracing_perfetto_sdk_schema::debug_annotation::NameField;
use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
use tracing_perfetto_sdk_schema::trace_packet::Data;
use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
use tracing_subscriber::filter::Targets;

use crate::trace_dispatcher::FieldValue;
use crate::trace_dispatcher::TraceEvent;
use crate::trace_dispatcher::TraceEventSink;
78
/// The target prefix for user-facing telemetry spans.
///
/// [`PerfettoTraceMode::User`] keeps only spans and events whose target starts
/// with this string.
pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";

/// Controls what events are captured in Perfetto traces.
///
/// Displayed as `off` / `user` / `dev`; `User` is the default.
// NOTE: variant order is part of the serde representation for index-based
// formats, so keep it stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum PerfettoTraceMode {
    /// Tracing is disabled - no events are written.
    Off,
    /// Only user-facing telemetry events (target starts with `monarch_hyperactor::telemetry`).
    #[default]
    User,
    /// All events (for debugging/development).
    Dev,
}
93
94impl std::fmt::Display for PerfettoTraceMode {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            PerfettoTraceMode::Off => write!(f, "off"),
98            PerfettoTraceMode::User => write!(f, "user"),
99            PerfettoTraceMode::Dev => write!(f, "dev"),
100        }
101    }
102}
103
104impl std::str::FromStr for PerfettoTraceMode {
105    type Err = anyhow::Error;
106
107    fn from_str(s: &str) -> Result<Self, Self::Err> {
108        match s.to_lowercase().as_str() {
109            "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
110            "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
111            "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
112            _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
113        }
114    }
115}
116
117impl Named for PerfettoTraceMode {
118    fn typename() -> &'static str {
119        "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
120    }
121}
122
123impl AttrValue for PerfettoTraceMode {
124    fn display(&self) -> String {
125        self.to_string()
126    }
127
128    fn parse(s: &str) -> Result<Self, anyhow::Error> {
129        s.parse()
130    }
131}
132
133impl PerfettoTraceMode {
134    /// Returns true if the given target should be included in the trace.
135    pub fn should_include(&self, target: &str) -> bool {
136        match self {
137            PerfettoTraceMode::Off => false,
138            PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
139            PerfettoTraceMode::Dev => true,
140        }
141    }
142}
143
// Config attribute holding the Perfetto trace mode. `PerfettoFileSink::new`
// reads it once (via `hyperactor_config::global::get`) and stores the result,
// so later changes do not affect an already-constructed sink.
declare_attrs! {
    /// Perfetto trace mode controlling which events are captured.
    /// Valid values: "off", "user" (default), "dev"
    /// - "off": Tracing is disabled
    /// - "user": Only user-facing telemetry events (monarch_hyperactor::telemetry::*)
    /// - "dev": All events (for debugging)
    @meta(CONFIG = ConfigAttr {
        env_name: Some("PERFETTO_TRACE_MODE".to_string()),
        py_name: Some("perfetto_trace_mode".to_string()),
    })
    pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
}
156
157/// Environment variable to override the default trace directory.
158pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
159
160/// Returns the default trace directory.
161///
162/// Uses `$MONARCH_TRACE_DIR` if set, otherwise `/tmp/{username}/monarch_traces/`.
163pub fn default_trace_dir() -> PathBuf {
164    if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
165        return PathBuf::from(dir);
166    }
167    let username = whoami::username();
168    PathBuf::from(format!("/tmp/{}/monarch_traces", username))
169}
170
/// Metadata stored for each span, used when Enter/Exit events occur.
///
/// Only spans whose target passes the sink's trace mode are stored. An entry
/// lives from `NewSpan` until `SpanClose`.
struct SpanInfo {
    /// Thread track the span was created on; every slice for the span is
    /// written to this track.
    track: u64,
    /// Label used for the span's slices. Despite the field name, this is
    /// `{target}::{name}` only in Dev mode. In User mode it is the span's string
    /// `name` field when present, otherwise the bare span name.
    fq_name: String,
    /// Field values captured when the span was created; emitted as debug
    /// annotations on each slice begin.
    fields: IndexMap<String, FieldValue>,
    /// Source file, emitted as a `file` annotation when present.
    file: Option<&'static str>,
    /// Source line, emitted as a `line` annotation when present.
    line: Option<u32>,
}
180
/// String interning table for Perfetto trace compression.
///
/// Every distinct string gets a stable id starting at 1. Newly assigned
/// `(string, iid)` pairs are queued until [`InternedStrings::take_pending`]
/// drains them, so they can be emitted in an `InternedData` packet before
/// anything references the id.
#[derive(Default)]
struct InternedStrings {
    /// Most recently assigned id (0 while nothing has been interned).
    next_iid: u64,
    /// Every string interned so far, mapped to its id.
    strings: HashMap<String, u64>,
    /// Assignments not yet handed out by `take_pending`, in assignment order.
    pending: Vec<(String, u64)>,
}

impl InternedStrings {
    /// Returns the id for `s`, assigning a fresh one (and queueing it as
    /// pending) the first time `s` is seen.
    fn intern(&mut self, s: &str) -> u64 {
        match self.strings.get(s).copied() {
            Some(known) => known,
            None => {
                let fresh = self.next_iid + 1;
                self.next_iid = fresh;
                let owned = s.to_owned();
                self.pending.push((owned.clone(), fresh));
                self.strings.insert(owned, fresh);
                fresh
            }
        }
    }

    /// Drains and returns all queued assignments, leaving none pending.
    fn take_pending(&mut self) -> Vec<(String, u64)> {
        std::mem::replace(&mut self.pending, Vec::new())
    }

    /// True when some assignments have not been drained yet.
    fn has_pending(&self) -> bool {
        self.pending.len() != 0
    }
}
209
/// File-based Perfetto sink that writes .pftrace files.
///
/// Packets are queued in memory as events are consumed. They are encoded and
/// written to the file only when the sink is flushed. Every packet is tagged
/// with the same trusted packet sequence id (`sequence_id`).
pub struct PerfettoFileSink {
    /// Buffered handle to `{trace_dir}/executions/{execution_id}/{process_name}.pftrace`.
    writer: BufWriter<File>,
    /// Packets queued since the last flush.
    pending_packets: Vec<TracePacket>,
    /// Next track uuid to hand out; starts at 1.
    next_track_id: AtomicU64,
    /// Trusted packet sequence ID
    sequence_id: u32,
    /// Interning table for event (slice/instant) names.
    event_names: InternedStrings,
    /// Interning table for debug-annotation (field) names.
    annotation_names: InternedStrings,
    /// Recorded spans keyed by span id.
    span_info: HashMap<u64, SpanInfo>,
    /// Maps thread names to track ids
    thread_tracks: HashMap<String, u64>,
    /// track_id of this process
    process_track: u64,
    /// OS process id reported in the process and thread descriptors.
    pid: i32,
    /// Process name reported in the process descriptor and used as the file stem.
    process_name: String,
    /// Filter returned from `TraceEventSink::target_filter`.
    target_filter: Targets,
    /// Which spans/events are recorded; read from `PERFETTO_TRACE_MODE` when the sink is built.
    trace_mode: PerfettoTraceMode,
}
229
230impl PerfettoFileSink {
231    /// Create a new Perfetto file sink.
232    ///
233    /// # Arguments
234    /// * `trace_dir` - Base directory for trace files (use `default_trace_dir()` for default)
235    /// * `execution_id` - Unique identifier for this execution/run
236    /// * `process_name` - Name of this process (used in directory layout)
237    ///
238    /// Creates the directory structure and updates the `latest` symlink.
239    pub fn new(
240        trace_dir: impl AsRef<Path>,
241        execution_id: &str,
242        process_name: &str,
243    ) -> Result<Self> {
244        let trace_dir = trace_dir.as_ref().to_path_buf();
245        let pid = std::process::id() as i32;
246
247        let executions_dir = trace_dir.join("executions");
248        let execution_dir = executions_dir.join(execution_id);
249        fs::create_dir_all(&execution_dir)?;
250
251        // Update the `latest` symlink
252        let latest_link = executions_dir.join("latest");
253        let _ = fs::remove_file(&latest_link);
254        if let Err(e) = symlink(execution_id, &latest_link) {
255            tracing::debug!("Failed to create latest symlink: {}", e);
256        }
257
258        let path = execution_dir.join(format!("{}.pftrace", process_name));
259        let file = File::create(&path)?;
260        let writer = BufWriter::new(file);
261
262        let sequence_id = SystemTime::now()
263            .duration_since(UNIX_EPOCH)
264            .unwrap()
265            .as_secs() as u32;
266
267        let mut sink = Self {
268            writer,
269            pending_packets: Vec::new(),
270            next_track_id: AtomicU64::new(1), // Start at 1, 0 is reserved
271            sequence_id,
272            event_names: InternedStrings::default(),
273            annotation_names: InternedStrings::default(),
274            span_info: HashMap::new(),
275            thread_tracks: HashMap::new(),
276            process_track: 0,
277            pid,
278            process_name: process_name.to_string(),
279            target_filter: Targets::new()
280                .with_target("tokio", LevelFilter::OFF)
281                .with_target("runtime", LevelFilter::OFF)
282                .with_default(LevelFilter::TRACE),
283            trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
284        };
285
286        sink.write_sequence_header();
287
288        sink.process_track = sink.create_process_track();
289
290        Ok(sink)
291    }
292
293    fn next_track_id(&self) -> u64 {
294        self.next_track_id.fetch_add(1, Ordering::Relaxed)
295    }
296
297    fn write_sequence_header(&mut self) {
298        let packet = TracePacket {
299            incremental_state_cleared: Some(true),
300            first_packet_on_sequence: Some(true),
301            sequence_flags: Some(3),
302            optional_trusted_packet_sequence_id: Some(
303                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
304            ),
305            ..Default::default()
306        };
307
308        self.write_packet(&packet);
309    }
310
311    fn create_process_track(&mut self) -> u64 {
312        let track_id = self.next_track_id();
313
314        let packet = TracePacket {
315            data: Some(Data::TrackDescriptor(TrackDescriptor {
316                uuid: Some(track_id),
317                process: Some(ProcessDescriptor {
318                    pid: Some(self.pid),
319                    process_name: Some(self.process_name.clone()),
320                    ..Default::default()
321                }),
322                ..Default::default()
323            })),
324            optional_trusted_packet_sequence_id: Some(
325                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
326            ),
327            ..Default::default()
328        };
329
330        self.write_packet(&packet);
331
332        track_id
333    }
334
335    fn get_or_create_thread_track(&mut self, thread_name: &str) -> u64 {
336        if let Some(&track) = self.thread_tracks.get(thread_name) {
337            return track;
338        }
339
340        let track_id = self.next_track_id();
341        let tid = self.thread_tracks.len() as i32 + 1;
342
343        let packet = TracePacket {
344            data: Some(Data::TrackDescriptor(TrackDescriptor {
345                uuid: Some(track_id),
346                parent_uuid: Some(self.process_track),
347                static_or_dynamic_name: Some(StaticOrDynamicName::Name(thread_name.to_string())),
348                thread: Some(ThreadDescriptor {
349                    pid: Some(self.pid),
350                    tid: Some(tid),
351                    thread_name: Some(thread_name.to_string()),
352                    ..Default::default()
353                }),
354                ..Default::default()
355            })),
356            optional_trusted_packet_sequence_id: Some(
357                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
358            ),
359            ..Default::default()
360        };
361        self.write_packet(&packet);
362
363        self.thread_tracks.insert(thread_name.to_string(), track_id);
364
365        track_id
366    }
367
368    fn write_packet(&mut self, packet: &TracePacket) {
369        self.pending_packets.push(packet.clone());
370    }
371
372    fn write_pending_packets(&mut self) -> Result<()> {
373        if self.pending_packets.is_empty() {
374            return Ok(());
375        }
376
377        let trace = Trace {
378            packet: std::mem::take(&mut self.pending_packets),
379        };
380
381        let bytes = trace.encode_to_vec();
382        self.writer.write_all(&bytes)?;
383        Ok(())
384    }
385
386    fn flush_interned_data(&mut self) {
387        if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
388            return;
389        }
390
391        let mut interned_data = InternedData::default();
392
393        for (name, iid) in self.event_names.take_pending() {
394            interned_data.event_names.push(EventName {
395                iid: Some(iid),
396                name: Some(name),
397                ..Default::default()
398            });
399        }
400
401        for (name, iid) in self.annotation_names.take_pending() {
402            interned_data
403                .debug_annotation_names
404                .push(DebugAnnotationName {
405                    iid: Some(iid),
406                    name: Some(name),
407                    ..Default::default()
408                });
409        }
410
411        let packet = TracePacket {
412            interned_data: Some(interned_data),
413            optional_trusted_packet_sequence_id: Some(
414                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
415            ),
416            sequence_flags: Some(2), // SEQ_NEEDS_INCREMENTAL_STATE
417            ..Default::default()
418        };
419        self.write_packet(&packet);
420    }
421
422    fn timestamp_ns(ts: SystemTime) -> u64 {
423        ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
424    }
425
426    fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
427        let name_iid = self.annotation_names.intern(key);
428        let dbg_value = match value {
429            FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
430            FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
431            FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
432            FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
433            FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
434            FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
435        };
436
437        DebugAnnotation {
438            name_field: Some(NameField::NameIid(name_iid)),
439            value: dbg_value,
440            ..Default::default()
441        }
442    }
443
444    fn write_slice_begin(
445        &mut self,
446        track: u64,
447        timestamp: SystemTime,
448        name: &str,
449        fields: &IndexMap<String, FieldValue>,
450        file: Option<&str>,
451        line: Option<u32>,
452    ) {
453        self.flush_interned_data();
454
455        let name_iid = self.event_names.intern(name);
456
457        let mut debug_annotations = Vec::new();
458        for (key, value) in fields {
459            debug_annotations.push(self.field_to_debug_annotation(key, value));
460        }
461        if let Some(f) = file {
462            debug_annotations
463                .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
464        }
465        if let Some(l) = line {
466            debug_annotations
467                .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
468        }
469
470        self.flush_interned_data();
471
472        let packet = TracePacket {
473            timestamp: Some(Self::timestamp_ns(timestamp)),
474            data: Some(Data::TrackEvent(TrackEvent {
475                track_uuid: Some(track),
476                r#type: Some(TrackEventType::SliceBegin as i32),
477                name_field: Some(EventNameField::NameIid(name_iid)),
478                debug_annotations,
479                ..Default::default()
480            })),
481            optional_trusted_packet_sequence_id: Some(
482                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
483            ),
484            sequence_flags: Some(2),
485            ..Default::default()
486        };
487        self.write_packet(&packet);
488    }
489
490    fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
491        let packet = TracePacket {
492            timestamp: Some(Self::timestamp_ns(timestamp)),
493            data: Some(Data::TrackEvent(TrackEvent {
494                track_uuid: Some(track),
495                r#type: Some(TrackEventType::SliceEnd as i32),
496                ..Default::default()
497            })),
498            optional_trusted_packet_sequence_id: Some(
499                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
500            ),
501            sequence_flags: Some(2),
502            ..Default::default()
503        };
504        self.write_packet(&packet);
505    }
506
507    fn write_instant(
508        &mut self,
509        track: u64,
510        timestamp: SystemTime,
511        name: &str,
512        fields: &IndexMap<String, FieldValue>,
513    ) {
514        self.flush_interned_data();
515
516        let name_iid = self.event_names.intern(name);
517
518        let mut debug_annotations = Vec::new();
519        for (key, value) in fields {
520            debug_annotations.push(self.field_to_debug_annotation(key, value));
521        }
522
523        self.flush_interned_data();
524
525        let packet = TracePacket {
526            timestamp: Some(Self::timestamp_ns(timestamp)),
527            data: Some(Data::TrackEvent(TrackEvent {
528                track_uuid: Some(track),
529                r#type: Some(TrackEventType::Instant as i32),
530                name_field: Some(EventNameField::NameIid(name_iid)),
531                debug_annotations,
532                ..Default::default()
533            })),
534            optional_trusted_packet_sequence_id: Some(
535                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
536            ),
537            sequence_flags: Some(2),
538            ..Default::default()
539        };
540        self.write_packet(&packet);
541    }
542}
543
544impl TraceEventSink for PerfettoFileSink {
545    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
546        match event {
547            TraceEvent::NewSpan {
548                id,
549                name,
550                target,
551                fields,
552                thread_name,
553                file,
554                line,
555                ..
556            } => {
557                if !self.trace_mode.should_include(target) {
558                    return Ok(());
559                }
560
561                let track = self.get_or_create_thread_track(thread_name);
562
563                // In user mode, prefer the "name" field if present for display
564                // In dev mode, use the fully qualified name
565                let display_name = if self.trace_mode == PerfettoTraceMode::User {
566                    if let Some(FieldValue::Str(n)) = fields.get("name") {
567                        n.clone()
568                    } else {
569                        name.to_string()
570                    }
571                } else {
572                    format!("{}::{}", target, name)
573                };
574
575                self.span_info.insert(
576                    *id,
577                    SpanInfo {
578                        track,
579                        fq_name: display_name,
580                        fields: fields.clone(),
581                        file: *file,
582                        line: *line,
583                    },
584                );
585            }
586
587            TraceEvent::SpanEnter { id, timestamp } => {
588                if let Some(info) = self.span_info.get(id) {
589                    let track = info.track;
590                    let fq_name = info.fq_name.clone();
591                    let fields = info.fields.clone();
592                    let file = info.file;
593                    let line = info.line;
594                    self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
595                }
596            }
597
598            TraceEvent::SpanExit { id, timestamp } => {
599                if let Some(info) = self.span_info.get(id) {
600                    self.write_slice_end(info.track, *timestamp);
601                }
602            }
603
604            TraceEvent::SpanClose { id, .. } => {
605                self.span_info.remove(id);
606            }
607
608            TraceEvent::Event {
609                name,
610                target,
611                fields,
612                timestamp,
613                thread_name,
614                ..
615            } => {
616                if !self.trace_mode.should_include(target) {
617                    return Ok(());
618                }
619
620                // In user mode, prefer the "message" field if present for display
621                // In dev mode, use the fully qualified name
622                let display_name = if self.trace_mode == PerfettoTraceMode::User {
623                    if let Some(FieldValue::Str(msg)) = fields.get("message") {
624                        msg.clone()
625                    } else {
626                        name.to_string()
627                    }
628                } else {
629                    format!("{}::{}", target, name)
630                };
631
632                let track = self.get_or_create_thread_track(thread_name);
633                self.write_instant(track, *timestamp, &display_name, fields);
634            }
635        }
636
637        Ok(())
638    }
639
640    fn flush(&mut self) -> Result<(), anyhow::Error> {
641        self.flush_interned_data();
642        self.write_pending_packets()?;
643        self.writer.flush()?;
644        Ok(())
645    }
646
647    fn name(&self) -> &str {
648        "PerfettoFileSink"
649    }
650
651    fn target_filter(&self) -> Option<&Targets> {
652        Some(&self.target_filter)
653    }
654}