hyperactor_telemetry/
trace_dispatcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Unified telemetry layer that captures trace events once and fans out to multiple exporters
10//! on a background thread, eliminating redundant capture and moving work off the application
11//! thread.
12
13use std::sync::Arc;
14use std::sync::atomic::AtomicU64;
15use std::sync::atomic::Ordering;
16use std::sync::mpsc;
17use std::thread::JoinHandle;
18use std::time::Duration;
19use std::time::SystemTime;
20
21use indexmap::IndexMap;
22use tracing::Id;
23use tracing::Subscriber;
24use tracing_subscriber::filter::Targets;
25use tracing_subscriber::layer::Context;
26use tracing_subscriber::layer::Layer;
27use tracing_subscriber::registry::LookupSpan;
28
/// Maximum number of [`TraceEvent`]s buffered in the main channel between the
/// application threads and the background worker. When the queue is full, new
/// events are dropped (and counted) instead of blocking the caller.
const QUEUE_CAPACITY: usize = 100 * 1_000;
30
/// Unified representation of a trace event captured from the tracing layer.
///
/// Each event is captured once on the application thread (in the `Layer` callbacks
/// of `TraceEventDispatcher`), then sent to the background worker for fan-out to
/// multiple exporters (`TraceEventSink`s).
///
/// All span ids are the raw `u64` form of `tracing::Id` (via `Id::into_u64`).
#[derive(Debug, Clone)]
pub(crate) enum TraceEvent {
    /// A new span was created (`on_new_span`).
    NewSpan {
        /// Raw id of the new span.
        id: u64,
        /// Span name from the callsite metadata.
        name: &'static str,
        /// Target from the callsite metadata; used for per-sink target filtering.
        target: &'static str,
        /// Level from the callsite metadata; used for per-sink level filtering.
        level: tracing::Level,
        /// Fields recorded at creation time, in the order they were recorded.
        fields: IndexMap<String, FieldValue>,
        /// Wall-clock time at which the span was captured.
        timestamp: SystemTime,
        /// Raw id of the parent span, or `None` if the span has no parent.
        parent_id: Option<u64>,
        /// Name of the creating thread; empty if the thread is unnamed.
        thread_name: String,
        /// Source file of the callsite, if known.
        file: Option<&'static str>,
        /// Source line of the callsite, if known.
        line: Option<u32>,
    },
    /// A span was entered (`on_enter`).
    SpanEnter { id: u64, timestamp: SystemTime },
    /// A span was exited (`on_exit`).
    SpanExit { id: u64, timestamp: SystemTime },
    /// A span was closed (`on_close`).
    SpanClose { id: u64, timestamp: SystemTime },
    /// A tracing event occurred (e.g. `tracing::info!()`). The dispatcher also
    /// synthesizes one of these itself to report events dropped on a full queue.
    Event {
        /// Event name from the callsite metadata.
        name: &'static str,
        /// Target from the callsite metadata; used for per-sink target filtering.
        target: &'static str,
        /// Level from the callsite metadata; used for per-sink level filtering.
        level: tracing::Level,
        /// Recorded fields, in the order they were recorded.
        fields: IndexMap<String, FieldValue>,
        /// Wall-clock time at which the event was captured.
        timestamp: SystemTime,
        /// Raw id of the span the event belongs to, if any.
        parent_span: Option<u64>,
        /// Numeric id of the emitting thread, rendered as a string: the kernel
        /// thread id (`gettid`) on Linux, derived from `std::thread::ThreadId`
        /// on other platforms.
        thread_id: String,
        /// Name of the emitting thread; empty if the thread is unnamed.
        thread_name: String,
        /// Module path of the callsite, if known.
        module_path: Option<&'static str>,
        /// Source file of the callsite, if known.
        file: Option<&'static str>,
        /// Source line of the callsite, if known.
        line: Option<u32>,
    },
}
70
/// Simplified, owned representation of a single recorded field value.
///
/// Built by the field visitor from the typed `tracing::field::Visit` callbacks, so
/// it can be sent to the background worker without borrowing from the callsite.
#[derive(Debug, Clone)]
pub(crate) enum FieldValue {
    /// Value recorded via `record_bool`.
    Bool(bool),
    /// Value recorded via `record_i64`.
    I64(i64),
    /// Value recorded via `record_u64`.
    U64(u64),
    /// Value recorded via `record_f64`.
    F64(f64),
    /// Value recorded via `record_str` (copied into an owned `String`).
    Str(String),
    /// The `{:?}` rendering of a value recorded via `record_debug`.
    Debug(String),
}
81
/// Trait for sinks that receive trace events from the dispatcher.
///
/// Sinks are moved onto the background worker thread (hence `Send + 'static`), and
/// every method is called from that thread. Implementations can therefore perform
/// expensive I/O without blocking the application. Errors returned from `consume`
/// and `flush` are printed to stderr by the worker and are otherwise ignored.
pub(crate) trait TraceEventSink: Send + 'static {
    /// Consume a single event. Called on the background thread, and only for events
    /// that pass this sink's `target_filter`.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional target/level filter for this sink.
    ///
    /// The worker loop automatically applies this filter before calling `consume()`,
    /// so sinks don't need to check target/level in their consume implementation.
    /// Only `NewSpan` and `Event` are filtered by target/level; other event types
    /// are always passed through. As a consequence, a filtering sink may receive
    /// `SpanEnter`/`SpanExit`/`SpanClose` for span ids whose `NewSpan` it never saw.
    ///
    /// # Returns
    /// - `None` - No filtering, all events are consumed (default)
    /// - `Some(Targets)` - Only consume events matching the target/level filter
    ///
    /// # Example
    /// ```ignore
    /// fn target_filter(&self) -> Option<&Targets> {
    ///     Some(Targets::new()
    ///         .with_target("opentelemetry", LevelFilter::OFF)
    ///         .with_default(LevelFilter::DEBUG))
    /// }
    /// ```
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flush any buffered events to the backend.
    ///
    /// The worker calls this in three situations:
    /// - after dispatching a regular event, once 1000 events have been dispatched
    ///   or 100 ms have passed since the last flush;
    /// - whenever it waits 100 ms without receiving a regular event;
    /// - once more on shutdown, after the remaining events have been drained.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Name used to identify this sink in the worker's error messages.
    /// Defaults to the implementing type's name.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
121
/// The trace event dispatcher: a tracing `Layer` that captures each span/event once
/// on the calling thread and hands it to a background worker thread, which
/// dispatches it to multiple sinks.
pub struct TraceEventDispatcher {
    /// Bounded main queue to the worker; taken (set to `None`) when the dispatcher
    /// is dropped so the worker observes a disconnect.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Separate channel so we are always notified of when the main queue is full and events are being dropped.
    /// Unbounded, so drop reports cannot themselves be lost to the full main queue.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Joins the worker thread when dropped. Declared after both senders, and the
    /// senders are also taken explicitly in `Drop`, so the channels are closed
    /// before the join happens.
    _worker_handle: WorkerHandle,
    /// Value reported from `Layer::max_level_hint`.
    max_level: Option<tracing::level_filters::LevelFilter>,
    /// Running count of events dropped because the main queue was full; shared with
    /// the worker so it can report the total on shutdown.
    dropped_events: Arc<AtomicU64>,
}
132
/// Owns the join handle of the background telemetry worker thread.
///
/// Dropping it joins the worker. The worker only exits once its main channel is
/// disconnected, so this must be dropped after the senders feeding it are closed.
struct WorkerHandle {
    /// `Some` until the handle is taken and joined on drop.
    join_handle: Option<JoinHandle<()>>,
}
136
137impl TraceEventDispatcher {
138    /// Create a new trace event dispatcher with the given sinks.
139    /// Uses a bounded channel (capacity QUEUE_CAPACITY) to ensure telemetry never blocks
140    /// the application. Events are dropped with a warning if the queue is full.
141    /// A separate unbounded priority channel guarantees delivery of critical events
142    /// like drop notifications (safe because drop events are rate-limited).
143    ///
144    /// # Arguments
145    /// * `sinks` - List of sinks to dispatch events to.
146    /// * `max_level` - Maximum level filter hint (None for no filtering)
147    pub(crate) fn new(
148        sinks: Vec<Box<dyn TraceEventSink>>,
149        max_level: Option<tracing::level_filters::LevelFilter>,
150    ) -> Self {
151        let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
152        let (dropped_sender, dropped_receiver) = mpsc::channel();
153        let dropped_events = Arc::new(AtomicU64::new(0));
154        let dropped_events_worker = Arc::clone(&dropped_events);
155
156        let worker_handle = std::thread::Builder::new()
157            .name("telemetry-worker".into())
158            .spawn(move || {
159                worker_loop(receiver, dropped_receiver, sinks, dropped_events_worker);
160            })
161            .expect("failed to spawn telemetry worker thread");
162
163        Self {
164            sender: Some(sender),
165            dropped_sender: Some(dropped_sender),
166            _worker_handle: WorkerHandle {
167                join_handle: Some(worker_handle),
168            },
169            max_level,
170            dropped_events,
171        }
172    }
173
174    fn send_event(&self, event: TraceEvent) {
175        if let Some(sender) = &self.sender {
176            if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) {
177                let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
178
179                if dropped == 1 || dropped.is_multiple_of(1000) {
180                    eprintln!(
181                        "[telemetry]: {}  events and log lines dropped que to full queue (capacity: {})",
182                        dropped, QUEUE_CAPACITY
183                    );
184                    self.send_drop_event(dropped);
185                }
186            }
187        }
188    }
189
190    fn send_drop_event(&self, total_dropped: u64) {
191        if let Some(dropped_sender) = &self.dropped_sender {
192            #[cfg(target_os = "linux")]
193            let thread_id_num = {
194                // SAFETY: syscall(SYS_gettid) is always safe to call
195                unsafe { libc::syscall(libc::SYS_gettid) as u64 }
196            };
197            #[cfg(not(target_os = "linux"))]
198            let thread_id_num = {
199                let tid = std::thread::current().id();
200                // SAFETY: ThreadId transmute for non-Linux platforms
201                unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) }
202            };
203
204            let mut fields = IndexMap::new();
205            fields.insert(
206                "message".to_string(),
207                FieldValue::Str(format!(
208                    "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
209                    QUEUE_CAPACITY
210                )),
211            );
212            fields.insert("dropped_count".to_string(), FieldValue::U64(total_dropped));
213
214            // We want to just directly construct and send a `TraceEvent::Event` here so we don't need to
215            // reason very hard about whether or not we are creating a DoS loop
216            let drop_event = TraceEvent::Event {
217                name: "dropped events",
218                target: module_path!(),
219                level: tracing::Level::ERROR,
220                fields,
221                timestamp: SystemTime::now(),
222                parent_span: None,
223                thread_id: thread_id_num.to_string(),
224                thread_name: std::thread::current()
225                    .name()
226                    .unwrap_or_default()
227                    .to_string(),
228                module_path: Some(module_path!()),
229                file: Some(file!()),
230                line: Some(line!()),
231            };
232
233            if dropped_sender.send(drop_event).is_err() {
234                // Last resort
235                eprintln!(
236                    "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
237                     (worker thread may have died). Telemetry system offline.",
238                    total_dropped
239                );
240            }
241        }
242    }
243}
244
245impl Drop for TraceEventDispatcher {
246    fn drop(&mut self) {
247        // Explicitly drop both senders to close the channels.
248        // The next field to be dropped is `worker_handle` which
249        // will run its own drop impl to join the thread and flush
250        drop(self.sender.take());
251        drop(self.dropped_sender.take());
252    }
253}
254
255impl<S> Layer<S> for TraceEventDispatcher
256where
257    S: Subscriber + for<'a> LookupSpan<'a>,
258{
259    fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
260        let metadata = attrs.metadata();
261        let mut fields = IndexMap::new();
262
263        let mut visitor = FieldVisitor(&mut fields);
264        attrs.record(&mut visitor);
265
266        let parent_id = if let Some(parent) = attrs.parent() {
267            Some(parent.into_u64())
268        } else {
269            ctx.current_span().id().map(|id| id.into_u64())
270        };
271
272        let thread_name = std::thread::current()
273            .name()
274            .unwrap_or_default()
275            .to_string();
276
277        let event = TraceEvent::NewSpan {
278            id: id.into_u64(),
279            name: metadata.name(),
280            target: metadata.target(),
281            level: *metadata.level(),
282            fields,
283            timestamp: SystemTime::now(),
284            parent_id,
285            thread_name,
286            file: metadata.file(),
287            line: metadata.line(),
288        };
289
290        self.send_event(event);
291    }
292
293    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
294        let event = TraceEvent::SpanEnter {
295            id: id.into_u64(),
296            timestamp: SystemTime::now(),
297        };
298
299        self.send_event(event);
300    }
301
302    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
303        let event = TraceEvent::SpanExit {
304            id: id.into_u64(),
305            timestamp: SystemTime::now(),
306        };
307
308        self.send_event(event);
309    }
310
311    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
312        let metadata = event.metadata();
313        let mut fields = IndexMap::new();
314        let mut visitor = FieldVisitor(&mut fields);
315        event.record(&mut visitor);
316
317        let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
318
319        #[cfg(target_os = "linux")]
320        let thread_id_num = {
321            // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only
322            // syscall that returns the current thread's kernel thread ID (TID).
323            // The cast to u64 is safe because gettid() returns a positive pid_t.
324            unsafe { libc::syscall(libc::SYS_gettid) as u64 }
325        };
326        #[cfg(not(target_os = "linux"))]
327        let thread_id_num = {
328            let tid = std::thread::current().id();
329            // SAFETY: ThreadId is a newtype wrapper around a u64 counter.
330            // This transmute relies on the internal representation of ThreadId,
331            // which is stable in practice but not guaranteed by Rust's API.
332            // On non-Linux platforms this is a best-effort approximation.
333            // See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html
334            unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) }
335        };
336        let thread_id_str = thread_id_num.to_string();
337
338        let thread_name = std::thread::current()
339            .name()
340            .unwrap_or_default()
341            .to_string();
342
343        let trace_event = TraceEvent::Event {
344            name: metadata.name(),
345            target: metadata.target(),
346            level: *metadata.level(),
347            fields,
348            timestamp: SystemTime::now(),
349            parent_span,
350            thread_id: thread_id_str,
351            thread_name,
352            module_path: metadata.module_path(),
353            file: metadata.file(),
354            line: metadata.line(),
355        };
356
357        self.send_event(trace_event);
358    }
359
360    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
361        let event = TraceEvent::SpanClose {
362            id: id.into_u64(),
363            timestamp: SystemTime::now(),
364        };
365
366        self.send_event(event);
367    }
368
369    fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
370        self.max_level
371    }
372}
373
374struct FieldVisitor<'a>(&'a mut IndexMap<String, FieldValue>);
375
376impl<'a> tracing::field::Visit for FieldVisitor<'a> {
377    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
378        self.0
379            .insert(field.name().to_string(), FieldValue::Bool(value));
380    }
381
382    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
383        self.0
384            .insert(field.name().to_string(), FieldValue::I64(value));
385    }
386
387    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
388        self.0
389            .insert(field.name().to_string(), FieldValue::U64(value));
390    }
391
392    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
393        self.0
394            .insert(field.name().to_string(), FieldValue::F64(value));
395    }
396
397    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
398        self.0
399            .insert(field.name().to_string(), FieldValue::Str(value.to_string()));
400    }
401
402    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
403        self.0.insert(
404            field.name().to_string(),
405            FieldValue::Debug(format!("{:?}", value)),
406        );
407    }
408}
409
410/// Background worker loop that receives events from both regular and priority channels,
411/// and dispatches them to sinks. Priority events are processed first.
412/// Runs until both senders are dropped.
413fn worker_loop(
414    receiver: mpsc::Receiver<TraceEvent>,
415    dropped_receiver: mpsc::Receiver<TraceEvent>,
416    mut sinks: Vec<Box<dyn TraceEventSink>>,
417    dropped_events: Arc<AtomicU64>,
418) {
419    const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
420    const FLUSH_EVENT_COUNT: usize = 1000;
421    let mut last_flush = std::time::Instant::now();
422    let mut events_since_flush = 0;
423
424    fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
425        for sink in sinks {
426            if let Err(e) = sink.flush() {
427                eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
428            }
429        }
430    }
431
432    fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
433        for sink in sinks {
434            if match &event {
435                TraceEvent::NewSpan { target, level, .. }
436                | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
437                    Some(targets) => targets.would_enable(target, level),
438                    None => true,
439                },
440                _ => true,
441            } {
442                if let Err(e) = sink.consume(&event) {
443                    eprintln!(
444                        "[telemetry] sink {} failed to consume event: {}",
445                        sink.name(),
446                        e
447                    );
448                }
449            }
450        }
451    }
452
453    loop {
454        while let Ok(event) = dropped_receiver.try_recv() {
455            dispatch_to_sinks(&mut sinks, event);
456            events_since_flush += 1;
457        }
458
459        match receiver.recv_timeout(FLUSH_INTERVAL) {
460            Ok(event) => {
461                dispatch_to_sinks(&mut sinks, event);
462                events_since_flush += 1;
463
464                if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
465                {
466                    flush_sinks(&mut sinks);
467                    last_flush = std::time::Instant::now();
468                    events_since_flush = 0;
469                }
470            }
471            Err(mpsc::RecvTimeoutError::Timeout) => {
472                flush_sinks(&mut sinks);
473                last_flush = std::time::Instant::now();
474                events_since_flush = 0;
475            }
476            Err(mpsc::RecvTimeoutError::Disconnected) => {
477                break;
478            }
479        }
480    }
481
482    while let Ok(event) = dropped_receiver.try_recv() {
483        dispatch_to_sinks(&mut sinks, event);
484    }
485    while let Ok(event) = receiver.try_recv() {
486        dispatch_to_sinks(&mut sinks, event);
487    }
488
489    flush_sinks(&mut sinks);
490
491    let total_dropped = dropped_events.load(Ordering::Relaxed);
492    if total_dropped > 0 {
493        eprintln!(
494            "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
495            total_dropped
496        );
497    }
498}
499
500impl Drop for WorkerHandle {
501    fn drop(&mut self) {
502        if let Some(handle) = self.join_handle.take() {
503            if let Err(e) = handle.join() {
504                eprintln!("[telemetry] worker thread panicked: {:?}", e);
505            }
506        }
507    }
508}