Skip to main content

hyperactor_telemetry/sinks/
perfetto.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Perfetto sink that writes trace events directly to .pftrace files on disk.
10//!
11//! This provides an alternative to Scuba-based tracing that:
12//! - Has no row limits (vs 400K Scuba limit)
13//! - Has no ingestion latency (immediate file writes)
14//! - Uses native Perfetto protobuf format (no conversion needed)
15//! - Supports distributed file systems like OILFS
16//!
17//! ## Directory Layout
18//!
19//! ```text
20//! {trace_dir}/
21//! ├── executions/
22//! │   ├── {execution_id}/
23//! │   │   ├── {process_name}.pftrace
24//! │   │   └── ...
25//! │   └── latest -> {execution_id}/   # symlink to most recent
26//! ```
27//!
28//! ## Default Trace Directory
29//!
30//! If not specified, traces are written to `/tmp/{username}/monarch_traces/`
31
32use std::collections::HashMap;
33use std::fs;
34use std::fs::File;
35use std::io::BufWriter;
36use std::io::Write;
37use std::os::unix::fs::symlink;
38use std::path::Path;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::atomic::AtomicU64;
42use std::sync::atomic::Ordering;
43use std::time::SystemTime;
44use std::time::UNIX_EPOCH;
45
46use anyhow::Result;
47use hyperactor_config::CONFIG;
48use hyperactor_config::ConfigAttr;
49use hyperactor_config::attrs::AttrValue;
50use hyperactor_config::attrs::declare_attrs;
51use hyperactor_config::typeuri::Named;
52use prost::Message;
53use serde::Deserialize;
54use serde::Serialize;
55use tracing_core::LevelFilter;
56use tracing_perfetto_sdk_schema::DebugAnnotation;
57use tracing_perfetto_sdk_schema::DebugAnnotationName;
58use tracing_perfetto_sdk_schema::EventName;
59use tracing_perfetto_sdk_schema::InternedData;
60use tracing_perfetto_sdk_schema::ProcessDescriptor;
61use tracing_perfetto_sdk_schema::ThreadDescriptor;
62use tracing_perfetto_sdk_schema::Trace;
63use tracing_perfetto_sdk_schema::TracePacket;
64use tracing_perfetto_sdk_schema::TrackDescriptor;
65use tracing_perfetto_sdk_schema::TrackEvent;
66use tracing_perfetto_sdk_schema::debug_annotation::NameField;
67use tracing_perfetto_sdk_schema::debug_annotation::Value as DbgValue;
68use tracing_perfetto_sdk_schema::trace_packet::Data;
69use tracing_perfetto_sdk_schema::trace_packet::OptionalTrustedPacketSequenceId;
70use tracing_perfetto_sdk_schema::track_descriptor::StaticOrDynamicName;
71use tracing_perfetto_sdk_schema::track_event::NameField as EventNameField;
72use tracing_perfetto_sdk_schema::track_event::Type as TrackEventType;
73use tracing_subscriber::filter::Targets;
74
75use crate::config::MONARCH_FILE_LOG_LEVEL;
76use crate::trace_dispatcher::FieldValue;
77use crate::trace_dispatcher::TraceEvent;
78use crate::trace_dispatcher::TraceEventSink;
79use crate::trace_dispatcher::TraceFields;
80use crate::trace_dispatcher::get_field;
81
82/// The target prefix for user-facing telemetry spans.
83pub const USER_TELEMETRY_PREFIX: &str = "monarch_hyperactor::telemetry";
84
85/// Dedicated target for endpoint spans. Spans with this target get their
86/// display name synthesized as `{method}.{span_name}`, where `span_name`
87/// is the adverb (e.g., "call", "call_one", "choose").
88pub const ENDPOINT_TELEMETRY_TARGET: &str = "monarch_hyperactor::telemetry::endpoint";
89const ACTOR_ID_FIELD: &str = "actor_id";
90
91/// Controls what events are captured in Perfetto traces.
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
93pub enum PerfettoTraceMode {
94    /// Tracing is disabled - no events are written.
95    Off,
96    /// Only user-facing telemetry events (target starts with `monarch_hyperactor::telemetry`).
97    #[default]
98    User,
99    /// All events (for debugging/development).
100    Dev,
101}
102
103impl std::fmt::Display for PerfettoTraceMode {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        match self {
106            PerfettoTraceMode::Off => write!(f, "off"),
107            PerfettoTraceMode::User => write!(f, "user"),
108            PerfettoTraceMode::Dev => write!(f, "dev"),
109        }
110    }
111}
112
113impl std::str::FromStr for PerfettoTraceMode {
114    type Err = anyhow::Error;
115
116    fn from_str(s: &str) -> Result<Self, Self::Err> {
117        match s.to_lowercase().as_str() {
118            "off" | "false" | "0" | "none" => Ok(PerfettoTraceMode::Off),
119            "dev" | "all" | "debug" => Ok(PerfettoTraceMode::Dev),
120            "user" | "true" | "1" => Ok(PerfettoTraceMode::User),
121            _ => Err(anyhow::anyhow!("unknown trace mode: {}", s)),
122        }
123    }
124}
125
126impl Named for PerfettoTraceMode {
127    fn typename() -> &'static str {
128        "hyperactor_telemetry::sinks::perfetto::PerfettoTraceMode"
129    }
130}
131
132impl AttrValue for PerfettoTraceMode {
133    fn display(&self) -> String {
134        self.to_string()
135    }
136
137    fn parse(s: &str) -> Result<Self, anyhow::Error> {
138        s.parse()
139    }
140}
141
142impl PerfettoTraceMode {
143    /// Returns true if the given target should be included in the trace.
144    pub fn should_include(&self, target: &str) -> bool {
145        match self {
146            PerfettoTraceMode::Off => false,
147            PerfettoTraceMode::User => target.starts_with(USER_TELEMETRY_PREFIX),
148            PerfettoTraceMode::Dev => true,
149        }
150    }
151}
152
153declare_attrs! {
154    /// Perfetto trace mode controlling which events are captured.
155    /// Valid values: "off", "user" (default), "dev"
156    /// - "off": Tracing is disabled
157    /// - "user": Only user-facing telemetry events (monarch_hyperactor::telemetry::*)
158    /// - "dev": All events (for debugging)
159    @meta(CONFIG = ConfigAttr::new(
160        Some("PERFETTO_TRACE_MODE".to_string()),
161        Some("perfetto_trace_mode".to_string()),
162    ))
163    pub attr PERFETTO_TRACE_MODE: PerfettoTraceMode = PerfettoTraceMode::User;
164}
165
166/// Environment variable to override the default trace directory.
167pub const MONARCH_TRACE_DIR_ENV: &str = "MONARCH_TRACE_DIR";
168
169/// Returns the default trace directory.
170///
171/// Uses `$MONARCH_TRACE_DIR` if set, otherwise `/tmp/{username}/monarch_traces/`.
172pub fn default_trace_dir() -> PathBuf {
173    if let Ok(dir) = std::env::var(MONARCH_TRACE_DIR_ENV) {
174        return PathBuf::from(dir);
175    }
176    let username = whoami::username();
177    PathBuf::from(format!("/tmp/{}/monarch_traces", username))
178}
179
180/// Metadata stored for each span, used when Enter/Exit events occur.
181struct SpanInfo {
182    /// Fully qualified name: {target}::{name}
183    fq_name: String,
184    fields: TraceFields,
185    file: Option<&'static str>,
186    line: Option<u32>,
187    prefer_actor_track: bool,
188}
189
190/// String interning for Perfetto trace compression.
191#[derive(Default)]
192struct InternedStrings {
193    next_iid: u64,
194    strings: HashMap<String, u64>,
195    pending: Vec<(String, u64)>,
196}
197
198impl InternedStrings {
199    fn intern(&mut self, s: &str) -> u64 {
200        if let Some(&iid) = self.strings.get(s) {
201            return iid;
202        }
203        self.next_iid += 1;
204        let iid = self.next_iid;
205        self.strings.insert(s.to_string(), iid);
206        self.pending.push((s.to_string(), iid));
207        iid
208    }
209
210    fn take_pending(&mut self) -> Vec<(String, u64)> {
211        std::mem::take(&mut self.pending)
212    }
213
214    fn has_pending(&self) -> bool {
215        !self.pending.is_empty()
216    }
217}
218
219/// File-based Perfetto sink that writes .pftrace files.
220pub struct PerfettoFileSink {
221    writer: BufWriter<File>,
222    pending_packets: Vec<TracePacket>,
223    next_track_id: AtomicU64,
224    /// Trusted packet sequence ID
225    sequence_id: u32,
226    event_names: InternedStrings,
227    annotation_names: InternedStrings,
228    span_info: HashMap<u64, SpanInfo>,
229    /// Maps logical track names (thread names or actor ids) to track ids.
230    named_tracks: HashMap<String, u64>,
231    /// Maps span id to the track it entered on, so that certain Exit/Close events
232    /// are emitted on the same track even when they fire from a different thread.
233    span_tracks: HashMap<u64, u64>,
234    /// track_id of this process
235    process_track: u64,
236    pid: i32,
237    process_name: String,
238    target_filter: Targets,
239    trace_mode: PerfettoTraceMode,
240}
241
242impl PerfettoFileSink {
243    /// Create a new Perfetto file sink.
244    ///
245    /// # Arguments
246    /// * `trace_dir` - Base directory for trace files (use `default_trace_dir()` for default)
247    /// * `execution_id` - Unique identifier for this execution/run
248    /// * `process_name` - Name of this process (used in directory layout)
249    ///
250    /// Creates the directory structure and updates the `latest` symlink.
251    pub fn new(
252        trace_dir: impl AsRef<Path>,
253        execution_id: &str,
254        process_name: &str,
255    ) -> Result<Self> {
256        let trace_dir = trace_dir.as_ref().to_path_buf();
257        let pid = std::process::id() as i32;
258
259        let executions_dir = trace_dir.join("executions");
260        let execution_dir = executions_dir.join(execution_id);
261        fs::create_dir_all(&execution_dir)?;
262
263        // Update the `latest` symlink
264        let latest_link = executions_dir.join("latest");
265        let _ = fs::remove_file(&latest_link);
266        if let Err(e) = symlink(execution_id, &latest_link) {
267            tracing::debug!("Failed to create latest symlink: {}", e);
268        }
269
270        let path = execution_dir.join(format!("{}.pftrace", process_name));
271        let file = File::create(&path)?;
272        let writer = BufWriter::new(file);
273
274        let sequence_id = SystemTime::now()
275            .duration_since(UNIX_EPOCH)
276            .unwrap()
277            .as_secs() as u32;
278
279        let mut sink = Self {
280            writer,
281            pending_packets: Vec::new(),
282            next_track_id: AtomicU64::new(1), // Start at 1, 0 is reserved
283            sequence_id,
284            event_names: InternedStrings::default(),
285            annotation_names: InternedStrings::default(),
286            span_info: HashMap::new(),
287            named_tracks: HashMap::new(),
288            span_tracks: HashMap::new(),
289            process_track: 0,
290            pid,
291            process_name: process_name.to_string(),
292            target_filter: Targets::new().with_default({
293                let log_level_str =
294                    hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
295                        .unwrap_or_else(|| "info".to_string());
296                let level =
297                    tracing::Level::from_str(&log_level_str).unwrap_or(tracing::Level::INFO);
298                LevelFilter::from_level(level)
299            }),
300            trace_mode: hyperactor_config::global::get(PERFETTO_TRACE_MODE),
301        };
302
303        sink.write_sequence_header();
304
305        sink.process_track = sink.create_process_track();
306
307        Ok(sink)
308    }
309
310    fn next_track_id(&self) -> u64 {
311        self.next_track_id.fetch_add(1, Ordering::Relaxed)
312    }
313
314    fn write_sequence_header(&mut self) {
315        let packet = TracePacket {
316            incremental_state_cleared: Some(true),
317            first_packet_on_sequence: Some(true),
318            sequence_flags: Some(3),
319            optional_trusted_packet_sequence_id: Some(
320                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
321            ),
322            ..Default::default()
323        };
324
325        self.write_packet(&packet);
326    }
327
328    fn create_process_track(&mut self) -> u64 {
329        let track_id = self.next_track_id();
330
331        let packet = TracePacket {
332            data: Some(Data::TrackDescriptor(TrackDescriptor {
333                uuid: Some(track_id),
334                process: Some(ProcessDescriptor {
335                    pid: Some(self.pid),
336                    process_name: Some(self.process_name.clone()),
337                    ..Default::default()
338                }),
339                ..Default::default()
340            })),
341            optional_trusted_packet_sequence_id: Some(
342                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
343            ),
344            ..Default::default()
345        };
346
347        self.write_packet(&packet);
348
349        track_id
350    }
351
352    fn get_or_create_named_track(&mut self, track_name: &str) -> u64 {
353        if let Some(&track) = self.named_tracks.get(track_name) {
354            return track;
355        }
356
357        let track_id = self.next_track_id();
358        let tid = self.named_tracks.len() as i32 + 1;
359
360        let packet = TracePacket {
361            data: Some(Data::TrackDescriptor(TrackDescriptor {
362                uuid: Some(track_id),
363                parent_uuid: Some(self.process_track),
364                static_or_dynamic_name: Some(StaticOrDynamicName::Name(track_name.to_string())),
365                thread: Some(ThreadDescriptor {
366                    pid: Some(self.pid),
367                    tid: Some(tid),
368                    thread_name: Some(track_name.to_string()),
369                    ..Default::default()
370                }),
371                ..Default::default()
372            })),
373            optional_trusted_packet_sequence_id: Some(
374                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
375            ),
376            ..Default::default()
377        };
378        self.write_packet(&packet);
379
380        self.named_tracks.insert(track_name.to_string(), track_id);
381
382        track_id
383    }
384
385    fn actor_track_name(fields: &TraceFields) -> Option<&str> {
386        match get_field(fields, ACTOR_ID_FIELD) {
387            Some(FieldValue::Str(actor_id) | FieldValue::Debug(actor_id))
388                if !actor_id.is_empty() =>
389            {
390                Some(Self::short_actor_track_name(actor_id))
391            }
392            _ => None,
393        }
394    }
395
396    fn short_actor_track_name(actor_id: &str) -> &str {
397        actor_id
398            .rsplit_once(',')
399            .map(|(_, actor_name_and_pid)| actor_name_and_pid)
400            .unwrap_or(actor_id)
401    }
402
403    fn preferred_track_name<'a>(
404        prefer_actor_track: bool,
405        fields: &'a TraceFields,
406        thread_name: &'a str,
407    ) -> &'a str {
408        if prefer_actor_track {
409            Self::actor_track_name(fields).unwrap_or(thread_name)
410        } else {
411            thread_name
412        }
413    }
414
415    fn write_packet(&mut self, packet: &TracePacket) {
416        self.pending_packets.push(packet.clone());
417    }
418
419    fn write_pending_packets(&mut self) -> Result<()> {
420        if self.pending_packets.is_empty() {
421            return Ok(());
422        }
423
424        let trace = Trace {
425            packet: std::mem::take(&mut self.pending_packets),
426        };
427
428        let bytes = trace.encode_to_vec();
429        self.writer.write_all(&bytes)?;
430        Ok(())
431    }
432
433    fn flush_interned_data(&mut self) {
434        if !self.event_names.has_pending() && !self.annotation_names.has_pending() {
435            return;
436        }
437
438        let mut interned_data = InternedData::default();
439
440        for (name, iid) in self.event_names.take_pending() {
441            interned_data.event_names.push(EventName {
442                iid: Some(iid),
443                name: Some(name),
444                ..Default::default()
445            });
446        }
447
448        for (name, iid) in self.annotation_names.take_pending() {
449            interned_data
450                .debug_annotation_names
451                .push(DebugAnnotationName {
452                    iid: Some(iid),
453                    name: Some(name),
454                    ..Default::default()
455                });
456        }
457
458        let packet = TracePacket {
459            interned_data: Some(interned_data),
460            optional_trusted_packet_sequence_id: Some(
461                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
462            ),
463            sequence_flags: Some(2), // SEQ_NEEDS_INCREMENTAL_STATE
464            ..Default::default()
465        };
466        self.write_packet(&packet);
467    }
468
469    fn timestamp_ns(ts: SystemTime) -> u64 {
470        ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64
471    }
472
473    fn field_to_debug_annotation(&mut self, key: &str, value: &FieldValue) -> DebugAnnotation {
474        let name_iid = self.annotation_names.intern(key);
475        let dbg_value = match value {
476            FieldValue::Bool(b) => Some(DbgValue::BoolValue(*b)),
477            FieldValue::I64(i) => Some(DbgValue::IntValue(*i)),
478            FieldValue::U64(u) => Some(DbgValue::IntValue(*u as i64)),
479            FieldValue::F64(f) => Some(DbgValue::DoubleValue(*f)),
480            FieldValue::Str(s) => Some(DbgValue::StringValue(s.clone())),
481            FieldValue::Debug(d) => Some(DbgValue::StringValue(d.clone())),
482        };
483
484        DebugAnnotation {
485            name_field: Some(NameField::NameIid(name_iid)),
486            value: dbg_value,
487            ..Default::default()
488        }
489    }
490
491    fn write_slice_begin(
492        &mut self,
493        track: u64,
494        timestamp: SystemTime,
495        name: &str,
496        fields: &TraceFields,
497        file: Option<&str>,
498        line: Option<u32>,
499    ) {
500        self.flush_interned_data();
501
502        let name_iid = self.event_names.intern(name);
503
504        let mut debug_annotations = Vec::new();
505        for (key, value) in fields {
506            debug_annotations.push(self.field_to_debug_annotation(key, value));
507        }
508        if let Some(f) = file {
509            debug_annotations
510                .push(self.field_to_debug_annotation("file", &FieldValue::Str(f.to_string())));
511        }
512        if let Some(l) = line {
513            debug_annotations
514                .push(self.field_to_debug_annotation("line", &FieldValue::U64(l as u64)));
515        }
516
517        self.flush_interned_data();
518
519        let packet = TracePacket {
520            timestamp: Some(Self::timestamp_ns(timestamp)),
521            data: Some(Data::TrackEvent(TrackEvent {
522                track_uuid: Some(track),
523                r#type: Some(TrackEventType::SliceBegin as i32),
524                name_field: Some(EventNameField::NameIid(name_iid)),
525                debug_annotations,
526                ..Default::default()
527            })),
528            optional_trusted_packet_sequence_id: Some(
529                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
530            ),
531            sequence_flags: Some(2),
532            ..Default::default()
533        };
534        self.write_packet(&packet);
535    }
536
537    fn write_slice_end(&mut self, track: u64, timestamp: SystemTime) {
538        let packet = TracePacket {
539            timestamp: Some(Self::timestamp_ns(timestamp)),
540            data: Some(Data::TrackEvent(TrackEvent {
541                track_uuid: Some(track),
542                r#type: Some(TrackEventType::SliceEnd as i32),
543                ..Default::default()
544            })),
545            optional_trusted_packet_sequence_id: Some(
546                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
547            ),
548            sequence_flags: Some(2),
549            ..Default::default()
550        };
551        self.write_packet(&packet);
552    }
553
554    fn write_instant(
555        &mut self,
556        track: u64,
557        timestamp: SystemTime,
558        name: &str,
559        fields: &TraceFields,
560    ) {
561        self.flush_interned_data();
562
563        let name_iid = self.event_names.intern(name);
564
565        let mut debug_annotations = Vec::new();
566        for (key, value) in fields {
567            debug_annotations.push(self.field_to_debug_annotation(key, value));
568        }
569
570        self.flush_interned_data();
571
572        let packet = TracePacket {
573            timestamp: Some(Self::timestamp_ns(timestamp)),
574            data: Some(Data::TrackEvent(TrackEvent {
575                track_uuid: Some(track),
576                r#type: Some(TrackEventType::Instant as i32),
577                name_field: Some(EventNameField::NameIid(name_iid)),
578                debug_annotations,
579                ..Default::default()
580            })),
581            optional_trusted_packet_sequence_id: Some(
582                OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(self.sequence_id),
583            ),
584            sequence_flags: Some(2),
585            ..Default::default()
586        };
587        self.write_packet(&packet);
588    }
589}
590
591impl TraceEventSink for PerfettoFileSink {
592    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
593        match event {
594            TraceEvent::NewSpan {
595                id,
596                name,
597                target,
598                fields,
599                file,
600                line,
601                ..
602            } => {
603                if !self.trace_mode.should_include(target) {
604                    return Ok(());
605                }
606
607                // In user mode, synthesize a friendly display name.
608                // - endpoint target (ActorEndpoint): "{mesh}.{method}.{span_name}"
609                // - endpoint target (Remote):       "{call_name}.{span_name}"
610                // - generic telemetry target: "name" field, else static span name
611                // In dev mode, use the fully qualified name.
612                let display_name = if *target == ENDPOINT_TELEMETRY_TARGET {
613                    let mesh = match get_field(fields, "mesh") {
614                        Some(FieldValue::Str(m)) => Some(m.as_str()),
615                        _ => None,
616                    };
617                    let method = match get_field(fields, "method") {
618                        Some(FieldValue::Str(m)) => Some(m.as_str()),
619                        _ => None,
620                    };
621                    let call_name = match get_field(fields, "call_name") {
622                        Some(FieldValue::Str(c)) if !c.is_empty() => Some(c.as_str()),
623                        _ => None,
624                    };
625                    match (mesh, method, call_name) {
626                        (Some(mesh), Some(method), _) => format!("{}.{}.{}()", mesh, method, name),
627                        (_, _, Some(call_name)) => format!("{}.{}()", call_name, name),
628                        _ => name.to_string(),
629                    }
630                } else if *target == USER_TELEMETRY_PREFIX {
631                    match get_field(fields, "name") {
632                        Some(FieldValue::Str(n)) => n.clone(),
633                        _ => name.to_string(),
634                    }
635                } else {
636                    format!("{}::{}", target, name)
637                };
638
639                self.span_info.insert(
640                    *id,
641                    SpanInfo {
642                        fq_name: display_name,
643                        fields: fields.clone(),
644                        file: *file,
645                        line: *line,
646                        prefer_actor_track: target.starts_with(USER_TELEMETRY_PREFIX),
647                    },
648                );
649            }
650
651            TraceEvent::SpanEnter {
652                id,
653                timestamp,
654                thread_name,
655            } => {
656                if let Some(info) = self.span_info.get(id) {
657                    let fq_name = info.fq_name.clone();
658                    let fields = info.fields.clone();
659                    let file = info.file;
660                    let line = info.line;
661                    let prefer_actor_track = info.prefer_actor_track;
662                    let track_name =
663                        Self::preferred_track_name(prefer_actor_track, &fields, thread_name)
664                            .to_string();
665                    let track = self.get_or_create_named_track(&track_name);
666                    self.span_tracks.insert(*id, track);
667                    self.write_slice_begin(track, *timestamp, &fq_name, &fields, file, line);
668                }
669            }
670
671            TraceEvent::SpanExit {
672                id,
673                timestamp,
674                thread_name,
675            } => {
676                if let Some(info) = self.span_info.get(id) {
677                    let fields = info.fields.clone();
678                    let prefer_actor_track = info.prefer_actor_track;
679                    let track_name =
680                        Self::preferred_track_name(prefer_actor_track, &fields, thread_name)
681                            .to_string();
682                    let track = self
683                        .span_tracks
684                        .remove(id)
685                        .unwrap_or_else(|| self.get_or_create_named_track(&track_name));
686                    self.write_slice_end(track, *timestamp);
687                }
688            }
689
690            TraceEvent::SpanClose { id, .. } => {
691                self.span_info.remove(id);
692                self.span_tracks.remove(id);
693            }
694
695            TraceEvent::Event {
696                name,
697                target,
698                fields,
699                timestamp,
700                thread_name,
701                ..
702            } => {
703                if !self.trace_mode.should_include(target) {
704                    return Ok(());
705                }
706
707                // In user mode, prefer the "message" field if present for display
708                // In dev mode, use the fully qualified name
709                let display_name = if self.trace_mode == PerfettoTraceMode::User {
710                    if let Some(FieldValue::Str(msg)) = get_field(fields, "message") {
711                        msg.clone()
712                    } else {
713                        name.to_string()
714                    }
715                } else {
716                    format!("{}::{}", target, name)
717                };
718
719                let track = self.get_or_create_named_track(Self::preferred_track_name(
720                    target.starts_with(USER_TELEMETRY_PREFIX),
721                    fields,
722                    thread_name,
723                ));
724                self.write_instant(track, *timestamp, &display_name, fields);
725            }
726        }
727
728        Ok(())
729    }
730
731    fn flush(&mut self) -> Result<(), anyhow::Error> {
732        self.flush_interned_data();
733        self.write_pending_packets()?;
734        self.writer.flush()?;
735        Ok(())
736    }
737
738    fn name(&self) -> &str {
739        "PerfettoFileSink"
740    }
741
742    fn target_filter(&self) -> Option<&Targets> {
743        Some(&self.target_filter)
744    }
745}