hyperactor_telemetry/sinks/
perfetto.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Perfetto sink that writes trace events directly to .pftrace files on disk.
10//!
11//! This provides an alternative to Scuba-based tracing that:
12//! - Has no row limits (vs 400K Scuba limit)
13//! - Has no ingestion latency (immediate file writes)
14//! - Uses native Perfetto protobuf format (no conversion needed)
15//! - Supports distributed file systems like OILFS
16//!
17//! ## Directory Layout
18//!
19//! ```text
20//! {trace_dir}/
21//! ├── executions/
22//! │   ├── {execution_id}/
23//! │   │   ├── {process_name}.pftrace
24//! │   │   └── ...
25//! │   └── latest -> {execution_id}/   # symlink to most recent
26//! ```
27//!
28//! ## Default Trace Directory
29//!
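//! If not specified, traces are written to the directory returned by `default_trace_dir()`:
//! `$MONARCH_TRACE_DIR` when that environment variable is set, otherwise
//! `/tmp/{username}/monarch_traces/`.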
31
32use std::collections::HashMap;
33use std::fs;
34use std::fs::File;
35use std::io::BufWriter;
36use std::io::Write;
37use std::os::unix::fs::symlink;
38use std::path::Path;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::atomic::AtomicU64;
42use std::sync::atomic::Ordering;
43use std::time::SystemTime;
44use std::time::UNIX_EPOCH;
45
46use anyhow::Result;
47use hyperactor_config::CONFIG;
48use hyperactor_config::ConfigAttr;
49use hyperactor_config::attrs::AttrValue;
50use hyperactor_config::attrs::declare_attrs;
51use hyperactor_config::typeuri::Named;
52use prost::Message;
53use serde::Deserialize;
54use serde::Serialize;
55use tracing_core::LevelFilter;
56use tracing_perfetto_sdk_schema::DebugAnnotation;
57use tracing_perfetto_sdk_schema::DebugAnnotationName;
58use tracing_perfetto_sdk_schema::EventName;
59use tracing_perfetto_sdk_schema::InternedData;
60use tracing_perfetto_sdk_schema::ProcessDescriptor;
61use tracing_perfetto_sdk_schema::ThreadDescriptor;
62use tracing_perfetto_sdk_schema::Trace;
63use tracing_perfetto_sdk_schema::TracePacket;
64use tracing_perfetto_sdk_schema::TrackDescriptor;
65use tracing_perfetto_sdk_schema::TrackEvent;
66use tracing_perfetto_sdk_schema::debug_annotation::NameField;
67use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
68use tracing_perfetto_sdk_schema::trace_packet::Data;
69use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
70use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
71use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
72use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
73use tracing_subscriber::filter::Targets;
74
75use crate::config::MONARCH_FILE_LOG_LEVEL;
76use crate::trace_dispatcher::FieldValue;
77use crate::trace_dispatcher::TraceEvent;
78use crate::trace_dispatcher::TraceEventSink;
79use crate::trace_dispatcher::TraceFields;
80use crate::trace_dispatcher::get_field;
81
/// The target prefix for user-facing telemetry spans.
///
/// In [`PerfettoTraceMode::User`] only spans and events whose target starts with this
/// prefix are recorded. Spans whose target is exactly this string are additionally
/// labelled by their string `name` field (when present) instead of `{target}::{name}`.
pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";
84
/// Controls what events are captured in Perfetto traces.
///
/// `User` is the default. The string forms used by `Display` / `FromStr` (and therefore by
/// the config system) are lowercase (`off`, `user`, `dev`, plus aliases accepted when
/// parsing). Note that the derived serde impls use the Rust variant names as written
/// (`Off`, `User`, `Dev`), which differ from those lowercase spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum PerfettoTraceMode {
    /// Tracing is disabled - no events are written.
    Off,
    /// Only user-facing telemetry events (target starts with `monarch_hyperactor::telemetry`).
    #[default]
    User,
    /// All events (for debugging/development).
    Dev,
}
96
97impl std::fmt::Display for PerfettoTraceMode {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            PerfettoTraceMode::Off => write!(f, "off"),
101            PerfettoTraceMode::User => write!(f, "user"),
102            PerfettoTraceMode::Dev => write!(f, "dev"),
103        }
104    }
105}
106
107impl std::str::FromStr for PerfettoTraceMode {
108    type Err = anyhow::Error;
109
110    fn from_str(s: &str) -> Result<Self, Self::Err> {
111        match s.to_lowercase().as_str() {
112            "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
113            "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
114            "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
115            _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
116        }
117    }
118}
119
// Fully qualified type name reported through `hyperactor_config::typeuri::Named`.
// It mirrors this module's path (`hyperactor_telemetry::sinks::perfetto`); keep the
// string in sync if the type or module is moved or renamed.
impl Named for PerfettoTraceMode {
    fn typename() -> &'static str {
        "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
    }
}
125
126impl AttrValue for PerfettoTraceMode {
127    fn display(&self) -> String {
128        self.to_string()
129    }
130
131    fn parse(s: &str) -> Result<Self, anyhow::Error> {
132        s.parse()
133    }
134}
135
136impl PerfettoTraceMode {
137    /// Returns true if the given target should be included in the trace.
138    pub fn should_include(&self, target: &str) -> bool {
139        match self {
140            PerfettoTraceMode::Off => false,
141            PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
142            PerfettoTraceMode::Dev => true,
143        }
144    }
145}
146
declare_attrs! {
    /// Perfetto trace mode controlling which events are captured.
    /// Valid values: "off", "user" (default), "dev"
    /// - "off": Tracing is disabled
    /// - "user": Only user-facing telemetry events (monarch_hyperactor::telemetry::*)
    /// - "dev": All events (for debugging)
    /// Parsing is case-insensitive and also accepts the aliases "false"/"0"/"none"
    /// (off), "true"/"1" (user) and "all"/"debug" (dev).
    // NOTE(review): the two `ConfigAttr::new` names are presumably the environment
    // variable and the config-file/Python key respectively — confirm against ConfigAttr.
    @meta(CONFIG = ConfigAttr::new(
        Some("PERFETTO_TRACE_MODE".to_string()),
        Some("perfetto_trace_mode".to_string()),
    ))
    pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
}
159
/// Environment variable to override the default trace directory.
///
/// Consulted by [`default_trace_dir`]; it does not affect callers that pass an explicit
/// directory to [`PerfettoFileSink::new`].
pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
162
163/// Returns the default trace directory.
164///
165/// Uses `$MONARCH_TRACE_DIR` if set, otherwise `/tmp/{username}/monarch_traces/`.
166pub fn default_trace_dir() -> PathBuf {
167    if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
168        return PathBuf::from(dir);
169    }
170    let username = whoami::username();
171    PathBuf::from(format!("/tmp/{}/monarch_traces", username))
172}
173
/// Metadata stored for each span, used when Enter/Exit events occur.
///
/// An entry is recorded on `NewSpan` (only for spans the trace mode includes), read on
/// `SpanEnter` / `SpanExit`, and removed on `SpanClose`.
struct SpanInfo {
    /// Label used for the span's slices. Usually `{target}::{name}`, but spans whose
    /// target is exactly `USER_TELEMETRY_PREFIX` use their string `name` field (or the bare
    /// span name) instead, so the field name is not always literal.
    fq_name: String,
    /// Span fields captured at creation; emitted as debug annotations on every slice begin.
    fields: TraceFields,
    /// Emitted as the `file` debug annotation when present.
    file: Option<&'static str>,
    /// Emitted as the `line` debug annotation when present.
    line: Option<u32>,
}
182
/// Table mapping strings to small integer ids, used for Perfetto trace compression.
///
/// Ids are assigned in first-seen order starting at 1 and never change afterwards. Each
/// newly assigned `(string, id)` pair is also queued until `take_pending` drains the
/// queue. This lets callers emit only the definitions the trace has not seen yet.
#[derive(Default)]
struct InternedStrings {
    /// Most recently assigned id (0 until the first string is interned).
    next_iid: u64,
    /// Every string interned so far, with its id.
    strings: HashMap<String, u64>,
    /// Definitions assigned since the last `take_pending`, oldest first.
    pending: Vec<(String, u64)>,
}

impl InternedStrings {
    /// Returns the id for `s`, assigning and queueing a fresh id the first time `s` is seen.
    fn intern(&mut self, s: &str) -> u64 {
        match self.strings.get(s).copied() {
            Some(known) => known,
            None => {
                self.next_iid += 1;
                let assigned = self.next_iid;
                let owned = s.to_owned();
                self.pending.push((owned.clone(), assigned));
                self.strings.insert(owned, assigned);
                assigned
            }
        }
    }

    /// Removes and returns every definition queued since the previous call.
    fn take_pending(&mut self) -> Vec<(String, u64)> {
        let drained = std::mem::take(&mut self.pending);
        drained
    }

    /// Whether any definitions are waiting to be taken.
    fn has_pending(&self) -> bool {
        let queued = self.pending.len();
        queued != 0
    }
}
211
/// File-based Perfetto sink that writes .pftrace files.
///
/// Packets are built into an in-memory queue and are only encoded and written to the file
/// when `TraceEventSink::flush` is called.
/// NOTE(review): no `Drop` impl is visible here, so packets queued after the last `flush`
/// appear to be lost when the sink is dropped — confirm callers always flush.
pub struct PerfettoFileSink {
    /// Buffered handle to `{trace_dir}/executions/{execution_id}/{process_name}.pftrace`.
    writer: BufWriter<File>,
    /// Packets built since the last flush, in emission order.
    pending_packets: Vec<TracePacket>,
    /// Next track UUID to hand out; initialised to 1.
    next_track_id: AtomicU64,
    /// Trusted packet sequence ID
    sequence_id: u32,
    /// Interning table for track event names.
    event_names: InternedStrings,
    /// Interning table for debug annotation names.
    annotation_names: InternedStrings,
    /// Metadata of recorded spans, keyed by the `id` carried on span events.
    span_info: HashMap<u64, SpanInfo>,
    /// Maps thread names to track ids
    thread_tracks: HashMap<String, u64>,
    /// track_id of this process
    process_track: u64,
    /// OS process id written into the process and thread descriptors.
    pid: i32,
    /// Name written into the process descriptor; also the trace file stem.
    process_name: String,
    /// Level filter derived from `MONARCH_FILE_LOG_LEVEL`, returned by `target_filter()`.
    target_filter: Targets,
    /// Which targets are recorded; read once from `PERFETTO_TRACE_MODE` at construction.
    trace_mode: PerfettoTraceMode,
}
231
232impl PerfettoFileSink {
233    /// Create a new Perfetto file sink.
234    ///
235    /// # Arguments
236    /// * `trace_dir` - Base directory for trace files (use `default_trace_dir()` for default)
237    /// * `execution_id` - Unique identifier for this execution/run
238    /// * `process_name` - Name of this process (used in directory layout)
239    ///
240    /// Creates the directory structure and updates the `latest` symlink.
241    pub fn new(
242        trace_dir: impl AsRef<Path>,
243        execution_id: &str,
244        process_name: &str,
245    ) -> Result<Self> {
246        let trace_dir = trace_dir.as_ref().to_path_buf();
247        let pid = std::process::id() as i32;
248
249        let executions_dir = trace_dir.join("executions");
250        let execution_dir = executions_dir.join(execution_id);
251        fs::create_dir_all(&execution_dir)?;
252
253        // Update the `latest` symlink
254        let latest_link = executions_dir.join("latest");
255        let _ = fs::remove_file(&latest_link);
256        if let Err(e) = symlink(execution_id, &latest_link) {
257            tracing::debug!("Failed to create latest symlink: {}", e);
258        }
259
260        let path = execution_dir.join(format!("{}.pftrace", process_name));
261        let file = File::create(&path)?;
262        let writer = BufWriter::new(file);
263
264        let sequence_id = SystemTime::now()
265            .duration_since(UNIX_EPOCH)
266            .unwrap()
267            .as_secs() as u32;
268
269        let mut sink = Self {
270            writer,
271            pending_packets: Vec::new(),
272            next_track_id: AtomicU64::new(1), // Start at 1, 0 is reserved
273            sequence_id,
274            event_names: InternedStrings::default(),
275            annotation_names: InternedStrings::default(),
276            span_info: HashMap::new(),
277            thread_tracks: HashMap::new(),
278            process_track: 0,
279            pid,
280            process_name: process_name.to_string(),
281            target_filter: Targets::new().with_default({
282                let log_level_str =
283                    hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
284                        .unwrap_or_else(|| "info".to_string());
285                let level =
286                    tracing::Level::from_str(&log_level_str).unwrap_or(tracing::Level::INFO);
287                LevelFilter::from_level(level)
288            }),
289            trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
290        };
291
292        sink.write_sequence_header();
293
294        sink.process_track = sink.create_process_track();
295
296        Ok(sink)
297    }
298
299    fn next_track_id(&self) -> u64 {
300        self.next_track_id.fetch_add(1, Ordering::Relaxed)
301    }
302
303    fn write_sequence_header(&mut self) {
304        let packet = TracePacket {
305            incremental_state_cleared: Some(true),
306            first_packet_on_sequence: Some(true),
307            sequence_flags: Some(3),
308            optional_trusted_packet_sequence_id: Some(
309                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
310            ),
311            ..Default::default()
312        };
313
314        self.write_packet(&packet);
315    }
316
317    fn create_process_track(&mut self) -> u64 {
318        let track_id = self.next_track_id();
319
320        let packet = TracePacket {
321            data: Some(Data::TrackDescriptor(TrackDescriptor {
322                uuid: Some(track_id),
323                process: Some(ProcessDescriptor {
324                    pid: Some(self.pid),
325                    process_name: Some(self.process_name.clone()),
326                    ..Default::default()
327                }),
328                ..Default::default()
329            })),
330            optional_trusted_packet_sequence_id: Some(
331                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
332            ),
333            ..Default::default()
334        };
335
336        self.write_packet(&packet);
337
338        track_id
339    }
340
341    fn get_or_create_thread_track(&mut self, thread_name: &str) -> u64 {
342        if let Some(&track) = self.thread_tracks.get(thread_name) {
343            return track;
344        }
345
346        let track_id = self.next_track_id();
347        let tid = self.thread_tracks.len() as i32 + 1;
348
349        let packet = TracePacket {
350            data: Some(Data::TrackDescriptor(TrackDescriptor {
351                uuid: Some(track_id),
352                parent_uuid: Some(self.process_track),
353                static_or_dynamic_name: Some(StaticOrDynamicName::Name(thread_name.to_string())),
354                thread: Some(ThreadDescriptor {
355                    pid: Some(self.pid),
356                    tid: Some(tid),
357                    thread_name: Some(thread_name.to_string()),
358                    ..Default::default()
359                }),
360                ..Default::default()
361            })),
362            optional_trusted_packet_sequence_id: Some(
363                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
364            ),
365            ..Default::default()
366        };
367        self.write_packet(&packet);
368
369        self.thread_tracks.insert(thread_name.to_string(), track_id);
370
371        track_id
372    }
373
374    fn write_packet(&mut self, packet: &TracePacket) {
375        self.pending_packets.push(packet.clone());
376    }
377
378    fn write_pending_packets(&mut self) -> Result<()> {
379        if self.pending_packets.is_empty() {
380            return Ok(());
381        }
382
383        let trace = Trace {
384            packet: std::mem::take(&mut self.pending_packets),
385        };
386
387        let bytes = trace.encode_to_vec();
388        self.writer.write_all(&bytes)?;
389        Ok(())
390    }
391
392    fn flush_interned_data(&mut self) {
393        if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
394            return;
395        }
396
397        let mut interned_data = InternedData::default();
398
399        for (name, iid) in self.event_names.take_pending() {
400            interned_data.event_names.push(EventName {
401                iid: Some(iid),
402                name: Some(name),
403                ..Default::default()
404            });
405        }
406
407        for (name, iid) in self.annotation_names.take_pending() {
408            interned_data
409                .debug_annotation_names
410                .push(DebugAnnotationName {
411                    iid: Some(iid),
412                    name: Some(name),
413                    ..Default::default()
414                });
415        }
416
417        let packet = TracePacket {
418            interned_data: Some(interned_data),
419            optional_trusted_packet_sequence_id: Some(
420                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
421            ),
422            sequence_flags: Some(2), // SEQ_NEEDS_INCREMENTAL_STATE
423            ..Default::default()
424        };
425        self.write_packet(&packet);
426    }
427
428    fn timestamp_ns(ts: SystemTime) -> u64 {
429        ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
430    }
431
432    fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
433        let name_iid = self.annotation_names.intern(key);
434        let dbg_value = match value {
435            FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
436            FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
437            FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
438            FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
439            FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
440            FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
441        };
442
443        DebugAnnotation {
444            name_field: Some(NameField::NameIid(name_iid)),
445            value: dbg_value,
446            ..Default::default()
447        }
448    }
449
450    fn write_slice_begin(
451        &mut self,
452        track: u64,
453        timestamp: SystemTime,
454        name: &str,
455        fields: &TraceFields,
456        file: Option<&str>,
457        line: Option<u32>,
458    ) {
459        self.flush_interned_data();
460
461        let name_iid = self.event_names.intern(name);
462
463        let mut debug_annotations = Vec::new();
464        for (key, value) in fields {
465            debug_annotations.push(self.field_to_debug_annotation(key, value));
466        }
467        if let Some(f) = file {
468            debug_annotations
469                .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
470        }
471        if let Some(l) = line {
472            debug_annotations
473                .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
474        }
475
476        self.flush_interned_data();
477
478        let packet = TracePacket {
479            timestamp: Some(Self::timestamp_ns(timestamp)),
480            data: Some(Data::TrackEvent(TrackEvent {
481                track_uuid: Some(track),
482                r#type: Some(TrackEventType::SliceBegin as i32),
483                name_field: Some(EventNameField::NameIid(name_iid)),
484                debug_annotations,
485                ..Default::default()
486            })),
487            optional_trusted_packet_sequence_id: Some(
488                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
489            ),
490            sequence_flags: Some(2),
491            ..Default::default()
492        };
493        self.write_packet(&packet);
494    }
495
496    fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
497        let packet = TracePacket {
498            timestamp: Some(Self::timestamp_ns(timestamp)),
499            data: Some(Data::TrackEvent(TrackEvent {
500                track_uuid: Some(track),
501                r#type: Some(TrackEventType::SliceEnd as i32),
502                ..Default::default()
503            })),
504            optional_trusted_packet_sequence_id: Some(
505                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
506            ),
507            sequence_flags: Some(2),
508            ..Default::default()
509        };
510        self.write_packet(&packet);
511    }
512
513    fn write_instant(
514        &mut self,
515        track: u64,
516        timestamp: SystemTime,
517        name: &str,
518        fields: &TraceFields,
519    ) {
520        self.flush_interned_data();
521
522        let name_iid = self.event_names.intern(name);
523
524        let mut debug_annotations = Vec::new();
525        for (key, value) in fields {
526            debug_annotations.push(self.field_to_debug_annotation(key, value));
527        }
528
529        self.flush_interned_data();
530
531        let packet = TracePacket {
532            timestamp: Some(Self::timestamp_ns(timestamp)),
533            data: Some(Data::TrackEvent(TrackEvent {
534                track_uuid: Some(track),
535                r#type: Some(TrackEventType::Instant as i32),
536                name_field: Some(EventNameField::NameIid(name_iid)),
537                debug_annotations,
538                ..Default::default()
539            })),
540            optional_trusted_packet_sequence_id: Some(
541                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
542            ),
543            sequence_flags: Some(2),
544            ..Default::default()
545        };
546        self.write_packet(&packet);
547    }
548}
549
550impl TraceEventSink for PerfettoFileSink {
551    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
552        match event {
553            TraceEvent::NewSpan {
554                id,
555                name,
556                target,
557                fields,
558                file,
559                line,
560                ..
561            } => {
562                if !self.trace_mode.should_include(target) {
563                    return Ok(());
564                }
565
566                // In user mode, prefer the "name" field if present for display
567                // In dev mode, use the fully qualified name
568                let display_name = if *target == USER_TELEMETRY_PREFIX {
569                    if let Some(FieldValue::Str(n)) = get_field(fields, "name") {
570                        n.clone()
571                    } else {
572                        name.to_string()
573                    }
574                } else {
575                    format!("{}::{}", target, name)
576                };
577
578                self.span_info.insert(
579                    *id,
580                    SpanInfo {
581                        fq_name: display_name,
582                        fields: fields.clone(),
583                        file: *file,
584                        line: *line,
585                    },
586                );
587            }
588
589            TraceEvent::SpanEnter {
590                id,
591                timestamp,
592                thread_name,
593            } => {
594                if let Some(info) = self.span_info.get(id) {
595                    let fq_name = info.fq_name.clone();
596                    let fields = info.fields.clone();
597                    let file = info.file;
598                    let line = info.line;
599                    let track = self.get_or_create_thread_track(thread_name);
600                    self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
601                }
602            }
603
604            TraceEvent::SpanExit {
605                id,
606                timestamp,
607                thread_name,
608            } => {
609                if self.span_info.contains_key(id) {
610                    let track = self.get_or_create_thread_track(thread_name);
611                    self.write_slice_end(track, *timestamp);
612                }
613            }
614
615            TraceEvent::SpanClose { id, .. } => {
616                self.span_info.remove(id);
617            }
618
619            TraceEvent::Event {
620                name,
621                target,
622                fields,
623                timestamp,
624                thread_name,
625                ..
626            } => {
627                if !self.trace_mode.should_include(target) {
628                    return Ok(());
629                }
630
631                // In user mode, prefer the "message" field if present for display
632                // In dev mode, use the fully qualified name
633                let display_name = if self.trace_mode == PerfettoTraceMode::User {
634                    if let Some(FieldValue::Str(msg)) = get_field(fields, "message") {
635                        msg.clone()
636                    } else {
637                        name.to_string()
638                    }
639                } else {
640                    format!("{}::{}", target, name)
641                };
642
643                let track = self.get_or_create_thread_track(thread_name);
644                self.write_instant(track, *timestamp, &display_name, fields);
645            }
646        }
647
648        Ok(())
649    }
650
651    fn flush(&mut self) -> Result<(), anyhow::Error> {
652        self.flush_interned_data();
653        self.write_pending_packets()?;
654        self.writer.flush()?;
655        Ok(())
656    }
657
658    fn name(&self) -> &str {
659        "PerfettoFileSink"
660    }
661
662    fn target_filter(&self) -> Option<&Targets> {
663        Some(&self.target_filter)
664    }
665}