Skip to main content

hyperactor_telemetry/
trace_dispatcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Unified telemetry layer that captures trace events once and fans out to multiple exporters
10//! on a background thread, eliminating redundant capture and moving work off the application
11//! thread.
12
13use std::cell::Cell;
14use std::sync::Arc;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::mpsc;
18use std::thread::JoinHandle;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use smallvec::SmallVec;
23use tracing::Id;
24use tracing::Subscriber;
25use tracing::level_filters::LevelFilter;
26use tracing_subscriber::filter::Targets;
27use tracing_subscriber::layer::Context;
28use tracing_subscriber::layer::Layer;
29use tracing_subscriber::registry::LookupSpan;
30
/// Capacity of the bounded channel that carries trace events from application
/// threads to the telemetry worker. When the channel already holds this many
/// events, new events are dropped instead of blocking the sending thread.
const QUEUE_CAPACITY: usize = 100_000;
32
/// Type alias for trace event fields: a list of `(field name, value)` pairs in
/// the order they were recorded.
///
/// We expect that most trace events have fewer than 4 fields, so the backing
/// `SmallVec` is declared with an inline capacity of four entries.
pub(crate) type TraceFields = SmallVec<[(&'static str, FieldValue); 4]>;
36
37#[inline]
38pub(crate) fn get_field<'a>(fields: &'a TraceFields, key: &str) -> Option<&'a FieldValue> {
39    fields.iter().find(|(k, _)| *k == key).map(|(_, v)| v)
40}
41
/// Unified representation of a trace event captured from the tracing layer.
/// This is captured once on the application thread, then sent to the background
/// worker for fan-out to multiple exporters.
///
/// Span ids are carried as the `u64` form of `tracing::Id`, and every
/// `timestamp` is the wall-clock time taken when the event value was built.
#[derive(Debug, Clone)]
pub enum TraceEvent {
    /// A new span was created (on_new_span)
    NewSpan {
        /// Id of the newly created span.
        id: u64,
        /// Span name from its metadata.
        name: &'static str,
        /// Target from the span's metadata; sinks filter on this.
        target: &'static str,
        /// Verbosity level from the span's metadata; sinks filter on this.
        level: tracing::Level,
        /// Field values recorded when the span was created.
        fields: TraceFields,
        /// Wall-clock time at which the span creation was captured.
        timestamp: SystemTime,
        /// Id of the parent span, if there is one.
        parent_id: Option<u64>,
        /// Name of the thread that created the span (empty if the thread is unnamed).
        thread_name: &'static str,
        /// Source file from the span's metadata, when available.
        file: Option<&'static str>,
        /// Source line from the span's metadata, when available.
        line: Option<u32>,
    },
    /// A span was entered (on_enter)
    SpanEnter {
        /// Id of the span being entered.
        id: u64,
        /// Wall-clock time at which the enter was captured.
        timestamp: SystemTime,
        /// Name of the thread that entered the span.
        thread_name: &'static str,
    },
    /// A span was exited (on_exit)
    SpanExit {
        /// Id of the span being exited.
        id: u64,
        /// Wall-clock time at which the exit was captured.
        timestamp: SystemTime,
        /// Name of the thread that exited the span.
        thread_name: &'static str,
    },
    /// A span was closed (dropped)
    SpanClose { id: u64, timestamp: SystemTime },
    /// A tracing event occurred (e.g., tracing::info!())
    Event {
        /// Event name from its metadata.
        name: &'static str,
        /// Target from the event's metadata; sinks filter on this.
        target: &'static str,
        /// Verbosity level from the event's metadata; sinks filter on this.
        level: tracing::Level,
        /// Field values recorded on the event.
        fields: TraceFields,
        /// Wall-clock time at which the event was captured.
        timestamp: SystemTime,
        /// Id of the span the event belongs to, if any.
        parent_span: Option<u64>,
        /// Identifier of the emitting thread, rendered as a decimal string.
        thread_id: &'static str,
        /// Name of the emitting thread (empty if the thread is unnamed).
        thread_name: &'static str,
        /// Module path from the event's metadata, when available.
        module_path: Option<&'static str>,
        /// Source file from the event's metadata, when available.
        file: Option<&'static str>,
        /// Source line from the event's metadata, when available.
        line: Option<u32>,
    },
}
89
/// Simplified field value representation for trace events.
///
/// Each variant holds an owned copy of a recorded field value, so a
/// `TraceEvent` can be moved to the worker thread without borrowing from the
/// call site.
#[derive(Debug, Clone)]
pub enum FieldValue {
    /// A boolean field.
    Bool(bool),
    /// A signed integer field.
    I64(i64),
    /// An unsigned integer field.
    U64(u64),
    /// A floating-point field.
    F64(f64),
    /// A string field, copied into an owned `String`.
    Str(String),
    /// Any other field, stored as its `{:?}` rendering.
    Debug(String),
}
100
/// Trait for sinks that receive trace events from the dispatcher.
/// Implementations run on the background worker thread and can perform
/// expensive I/O operations without blocking the application.
///
/// Errors returned from `consume` or `flush` are reported on stderr by the
/// worker and do not stop delivery to this or other sinks.
pub trait TraceEventSink: Send + 'static {
    /// Consume a single event. Called on background thread.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional target/level filter for this sink.
    ///
    /// The worker loop automatically applies this filter before calling `consume()`,
    /// so sinks don't need to check target/level in their consume implementation.
    /// Only `NewSpan` and `Event` are filtered by target/level; other event types
    /// are always passed through.
    ///
    /// # Returns
    /// - `None` - No filtering, all events are consumed (default)
    /// - `Some(Targets)` - Only consume events matching the target/level filter
    ///
    /// # Example
    /// The method returns a reference, so the filter has to be owned by the sink
    /// (built once, e.g. in its constructor) rather than created on each call:
    /// ```ignore
    /// // self.filter = Targets::new()
    /// //     .with_target("opentelemetry", LevelFilter::OFF)
    /// //     .with_default(LevelFilter::DEBUG);
    /// fn target_filter(&self) -> Option<&Targets> {
    ///     Some(&self.filter)
    /// }
    /// ```
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flush any buffered events to the backend.
    /// Called periodically and on shutdown.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Optional: return name for debugging/logging.
    /// Defaults to the implementing type's name; used in the worker's error messages.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
140
thread_local! {
    /// Cached thread info (thread_name, thread_id) for minimal overhead.
    /// Strings are leaked once per thread to get &'static str - threads are long-lived so this is fine.
    /// Uses Cell since (&'static str, &'static str) is Copy.
    ///
    /// Starts out as `None`; the first `get_thread_info()` call on a thread fills it in.
    static CACHED_THREAD_INFO: Cell<Option<(&'static str, &'static str)>> = const { Cell::new(None) };
}
147
148#[inline(always)]
149fn get_thread_info() -> (&'static str, &'static str) {
150    CACHED_THREAD_INFO.with(|cache| {
151        if let Some(info) = cache.get() {
152            return info;
153        }
154
155        let thread_name: &'static str = Box::leak(
156            std::thread::current()
157                .name()
158                .unwrap_or("")
159                .to_string()
160                .into_boxed_str(),
161        );
162
163        #[cfg(target_os = "linux")]
164        let thread_id: &'static str = {
165            // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only
166            // syscall that returns the current thread's kernel thread ID (TID).
167            // The cast to u64 is safe because gettid() returns a positive pid_t.
168            let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 };
169            Box::leak(tid.to_string().into_boxed_str())
170        };
171        #[cfg(not(target_os = "linux"))]
172        let thread_id: &'static str = {
173            let tid_num = std::thread::current().id().as_u64().get();
174            Box::leak(tid_num.to_string().into_boxed_str())
175        };
176
177        cache.set(Some((thread_name, thread_id)));
178        (thread_name, thread_id)
179    })
180}
181
/// Control messages for the dispatcher (e.g., adding sinks dynamically).
///
/// The worker drains pending control messages at the top of each loop
/// iteration, before waiting for the next trace event.
pub enum DispatcherControl {
    /// Add a new sink to receive events.
    ///
    /// The sink is appended to the worker's sink list and receives events
    /// dispatched after that point. The dispatcher's max-level hint is computed
    /// once from the sinks passed to `TraceEventDispatcher::new`, so a sink added
    /// this way does not change it.
    AddSink(Box<dyn TraceEventSink>),
}
187
/// The trace event dispatcher that captures events once and dispatches to multiple sinks
/// on a background thread.
///
/// Field order matters for shutdown: the `Drop` impl clears both senders, and
/// `_worker_handle` is then dropped, which joins the worker thread.
pub struct TraceEventDispatcher {
    /// Bounded channel to the worker. Wrapped in `Option` so `Drop` can close it
    /// before the worker is joined.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Separate channel so we are always notified of when the main queue is full and events are being dropped.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Owns the worker thread's join handle; joins the thread when dropped.
    _worker_handle: WorkerHandle,
    /// Most verbose level any initial sink may want; returned from `max_level_hint`.
    /// `None` when the dispatcher was created without sinks.
    max_level: Option<LevelFilter>,
    /// Running count of events dropped because the main queue was full.
    /// Shared with the worker, which reports the total at shutdown.
    dropped_events: Arc<AtomicU64>,
}
198
/// Owner of the telemetry worker thread. Its `Drop` impl joins the thread.
struct WorkerHandle {
    /// Handle of the worker thread; taken (left as `None`) when the thread is joined.
    join_handle: Option<JoinHandle<()>>,
}
202
thread_local! {
    /// Per-thread flag that is `true` while the current thread is inside
    /// `TraceEventDispatcher::send_event`. Used to detect re-entrant calls.
    static IN_SEND: Cell<bool> = const { Cell::new(false) };
}

/// Scope guard for the `IN_SEND` flag: dropping it marks the current thread as
/// no longer being inside `send_event`, on every exit path including unwinding.
struct InSendGuard;

impl Drop for InSendGuard {
    fn drop(&mut self) {
        IN_SEND.set(false);
    }
}
214
215impl TraceEventDispatcher {
216    /// Create a new trace event dispatcher with the given sinks.
217    /// Uses a bounded channel (capacity QUEUE_CAPACITY) to ensure telemetry never blocks
218    /// the application. Events are dropped with a warning if the queue is full.
219    /// A separate unbounded priority channel guarantees delivery of critical events
220    /// like drop notifications (safe because drop events are rate-limited).
221    ///
222    /// Takes the global control receiver for dynamic sink registration. Sinks registered
223    /// via `register_sink()` before or after this call will be added to the dispatcher.
224    ///
225    /// # Arguments
226    /// * `sinks` - List of sinks to dispatch events to.
227    pub(crate) fn new(sinks: Vec<Box<dyn TraceEventSink>>) -> Self {
228        let max_level = Self::derive_max_level(&sinks);
229
230        let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
231        let (dropped_sender, dropped_receiver) = mpsc::channel();
232        // Take the global control receiver - sinks registered via register_sink() will be received here
233        let control_receiver = crate::take_sink_control_receiver();
234        let dropped_events = Arc::new(AtomicU64::new(0));
235        let dropped_events_worker = Arc::clone(&dropped_events);
236
237        let worker_handle = std::thread::Builder::new()
238            .name("telemetry-worker".into())
239            .spawn(move || {
240                worker_loop(
241                    receiver,
242                    dropped_receiver,
243                    control_receiver,
244                    sinks,
245                    dropped_events_worker,
246                );
247            })
248            .expect("failed to spawn telemetry worker thread");
249
250        Self {
251            sender: Some(sender),
252            dropped_sender: Some(dropped_sender),
253            _worker_handle: WorkerHandle {
254                join_handle: Some(worker_handle),
255            },
256            max_level,
257            dropped_events,
258        }
259    }
260
261    fn derive_max_level(sinks: &[Box<dyn TraceEventSink>]) -> Option<LevelFilter> {
262        let mut max_level: Option<LevelFilter> = None;
263
264        for sink in sinks {
265            let sink_max = match sink.target_filter() {
266                None => LevelFilter::TRACE,
267                Some(targets) => {
268                    let levels = [
269                        (tracing::Level::TRACE, LevelFilter::TRACE),
270                        (tracing::Level::DEBUG, LevelFilter::DEBUG),
271                        (tracing::Level::INFO, LevelFilter::INFO),
272                        (tracing::Level::WARN, LevelFilter::WARN),
273                        (tracing::Level::ERROR, LevelFilter::ERROR),
274                    ];
275                    let mut result = LevelFilter::OFF;
276                    for (level, filter) in levels {
277                        if targets.would_enable("", &level) {
278                            result = filter;
279                            break;
280                        }
281                    }
282                    result
283                }
284            };
285
286            max_level = Some(match max_level {
287                None => sink_max,
288                Some(current) => std::cmp::max(current, sink_max),
289            });
290        }
291
292        max_level
293    }
294
295    fn send_event(&self, event: TraceEvent) {
296        // Re-entrancy guard. A `Layer` callback may emit `tracing` events
297        // through code it touches—notably std's mpmc channel, which is itself
298        // instrumented—and those events loop back through this subscriber.
299        // Without this guard, the recursion exhausts the stack and SIGSEGVs.
300        if IN_SEND.with(|f| f.replace(true)) {
301            return;
302        }
303        let _reset = InSendGuard;
304
305        if let Some(sender) = &self.sender
306            && let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event)
307        {
308            let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
309
310            if dropped == 1 || dropped.is_multiple_of(1000) {
311                eprintln!(
312                    "[telemetry]: {}  events and log lines dropped que to full queue (capacity: {})",
313                    dropped, QUEUE_CAPACITY
314                );
315                self.send_drop_event(dropped);
316            }
317        }
318    }
319
320    pub(crate) fn sender(&self) -> mpsc::SyncSender<TraceEvent> {
321        self.sender
322            .as_ref()
323            .expect("trace event dispatcher sender should exist during initialization")
324            .clone()
325    }
326
327    fn send_drop_event(&self, total_dropped: u64) {
328        if let Some(dropped_sender) = &self.dropped_sender {
329            let (thread_name, thread_id) = get_thread_info();
330
331            let mut fields = TraceFields::new();
332            fields.push((
333                "message",
334                FieldValue::Str(format!(
335                    "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
336                    QUEUE_CAPACITY
337                )),
338            ));
339            fields.push(("dropped_count", FieldValue::U64(total_dropped)));
340
341            // We want to just directly construct and send a `TraceEvent::Event` here so we don't need to
342            // reason very hard about whether or not we are creating a DoS loop
343            let drop_event = TraceEvent::Event {
344                name: "dropped events",
345                target: module_path!(),
346                level: tracing::Level::ERROR,
347                fields,
348                timestamp: SystemTime::now(),
349                parent_span: None,
350                thread_id,
351                thread_name,
352                module_path: Some(module_path!()),
353                file: Some(file!()),
354                line: Some(line!()),
355            };
356
357            if dropped_sender.send(drop_event).is_err() {
358                // Last resort
359                eprintln!(
360                    "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
361                     (worker thread may have died). Telemetry system offline.",
362                    total_dropped
363                );
364            }
365        }
366    }
367}
368
369impl Drop for TraceEventDispatcher {
370    fn drop(&mut self) {
371        // Explicitly drop both senders to close the channels.
372        // The next field to be dropped is `worker_handle` which
373        // will run its own drop impl to join the thread and flush
374        drop(self.sender.take());
375        drop(self.dropped_sender.take());
376    }
377}
378
379impl<S> Layer<S> for TraceEventDispatcher
380where
381    S: Subscriber + for<'a> LookupSpan<'a>,
382{
383    fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
384        let metadata = attrs.metadata();
385        let mut fields = TraceFields::new();
386
387        let mut visitor = FieldVisitor(&mut fields);
388        attrs.record(&mut visitor);
389
390        let parent_id = if let Some(parent) = attrs.parent() {
391            Some(parent.into_u64())
392        } else {
393            ctx.current_span().id().map(|id| id.into_u64())
394        };
395
396        let (thread_name, _) = get_thread_info();
397
398        let event = TraceEvent::NewSpan {
399            id: id.into_u64(),
400            name: metadata.name(),
401            target: metadata.target(),
402            level: *metadata.level(),
403            fields,
404            timestamp: SystemTime::now(),
405            parent_id,
406            thread_name,
407            file: metadata.file(),
408            line: metadata.line(),
409        };
410
411        self.send_event(event);
412    }
413
414    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
415        let (thread_name, _) = get_thread_info();
416        let event = TraceEvent::SpanEnter {
417            id: id.into_u64(),
418            timestamp: SystemTime::now(),
419            thread_name,
420        };
421
422        self.send_event(event);
423    }
424
425    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
426        let (thread_name, _) = get_thread_info();
427        let event = TraceEvent::SpanExit {
428            id: id.into_u64(),
429            timestamp: SystemTime::now(),
430            thread_name,
431        };
432
433        self.send_event(event);
434    }
435
436    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
437        let metadata = event.metadata();
438        let mut fields = TraceFields::new();
439        let mut visitor = FieldVisitor(&mut fields);
440        event.record(&mut visitor);
441
442        let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
443
444        let (thread_name, thread_id) = get_thread_info();
445
446        let trace_event = TraceEvent::Event {
447            name: metadata.name(),
448            target: metadata.target(),
449            level: *metadata.level(),
450            fields,
451            timestamp: SystemTime::now(),
452            parent_span,
453            thread_id,
454            thread_name,
455            module_path: metadata.module_path(),
456            file: metadata.file(),
457            line: metadata.line(),
458        };
459
460        self.send_event(trace_event);
461    }
462
463    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
464        let event = TraceEvent::SpanClose {
465            id: id.into_u64(),
466            timestamp: SystemTime::now(),
467        };
468
469        self.send_event(event);
470    }
471
472    fn max_level_hint(&self) -> Option<LevelFilter> {
473        self.max_level
474    }
475}
476
477struct FieldVisitor<'a>(&'a mut TraceFields);
478
479impl tracing::field::Visit for FieldVisitor<'_> {
480    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
481        self.0.push((field.name(), FieldValue::Bool(value)));
482    }
483
484    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
485        self.0.push((field.name(), FieldValue::I64(value)));
486    }
487
488    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
489        self.0.push((field.name(), FieldValue::U64(value)));
490    }
491
492    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
493        self.0.push((field.name(), FieldValue::F64(value)));
494    }
495
496    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
497        self.0
498            .push((field.name(), FieldValue::Str(value.to_string())));
499    }
500
501    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
502        self.0
503            .push((field.name(), FieldValue::Debug(format!("{:?}", value))));
504    }
505}
506
507/// Background worker loop that receives events from both regular and priority channels,
508/// and dispatches them to sinks. Priority events are processed first.
509/// Runs until both senders are dropped.
510fn worker_loop(
511    receiver: mpsc::Receiver<TraceEvent>,
512    dropped_receiver: mpsc::Receiver<TraceEvent>,
513    control_receiver: Option<mpsc::Receiver<DispatcherControl>>,
514    mut sinks: Vec<Box<dyn TraceEventSink>>,
515    dropped_events: Arc<AtomicU64>,
516) {
517    const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
518    const FLUSH_EVENT_COUNT: usize = 1000;
519    let mut last_flush = std::time::Instant::now();
520    let mut events_since_flush = 0;
521
522    fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
523        for sink in sinks {
524            if let Err(e) = sink.flush() {
525                eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
526            }
527        }
528    }
529
530    fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
531        for sink in sinks {
532            if match &event {
533                TraceEvent::NewSpan { target, level, .. }
534                | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
535                    Some(targets) => targets.would_enable(target, level),
536                    None => true,
537                },
538                _ => true,
539            } && let Err(e) = sink.consume(&event)
540            {
541                eprintln!(
542                    "[telemetry] sink {} failed to consume event: {}",
543                    sink.name(),
544                    e
545                );
546            }
547        }
548    }
549
550    loop {
551        while let Ok(event) = dropped_receiver.try_recv() {
552            dispatch_to_sinks(&mut sinks, event);
553            events_since_flush += 1;
554        }
555
556        // Process any pending control messages (e.g., adding new sinks)
557        if let Some(ref ctrl_rx) = control_receiver {
558            while let Ok(control) = ctrl_rx.try_recv() {
559                match control {
560                    DispatcherControl::AddSink(sink) => {
561                        sinks.push(sink);
562                    }
563                }
564            }
565        }
566
567        match receiver.recv_timeout(FLUSH_INTERVAL) {
568            Ok(event) => {
569                dispatch_to_sinks(&mut sinks, event);
570                events_since_flush += 1;
571
572                if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
573                {
574                    flush_sinks(&mut sinks);
575                    last_flush = std::time::Instant::now();
576                    events_since_flush = 0;
577                }
578            }
579            Err(mpsc::RecvTimeoutError::Timeout) => {
580                flush_sinks(&mut sinks);
581                last_flush = std::time::Instant::now();
582                events_since_flush = 0;
583            }
584            Err(mpsc::RecvTimeoutError::Disconnected) => {
585                break;
586            }
587        }
588    }
589
590    while let Ok(event) = dropped_receiver.try_recv() {
591        dispatch_to_sinks(&mut sinks, event);
592    }
593    while let Ok(event) = receiver.try_recv() {
594        dispatch_to_sinks(&mut sinks, event);
595    }
596
597    flush_sinks(&mut sinks);
598
599    let total_dropped = dropped_events.load(Ordering::Relaxed);
600    if total_dropped > 0 {
601        eprintln!(
602            "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
603            total_dropped
604        );
605    }
606}
607
608impl Drop for WorkerHandle {
609    fn drop(&mut self) {
610        if let Some(handle) = self.join_handle.take()
611            && let Err(e) = handle.join()
612        {
613            eprintln!("[telemetry] worker thread panicked: {:?}", e);
614        }
615    }
616}
617
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;

    use super::*;

    /// Shared buffer through which a test observes what the sink received.
    type Recorded = Arc<Mutex<Vec<TraceEvent>>>;

    /// Sink that stores a clone of every event it is given.
    #[derive(Default)]
    struct RecordingSink {
        events: Recorded,
    }

    impl TraceEventSink for RecordingSink {
        fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
            let mut log = self.events.lock().unwrap();
            log.push(event.clone());
            Ok(())
        }

        fn flush(&mut self) -> Result<(), anyhow::Error> {
            Ok(())
        }
    }

    /// Builds a minimal event that passes through without target/level filtering.
    fn span_close(id: u64) -> TraceEvent {
        let timestamp = SystemTime::now();
        TraceEvent::SpanClose { id, timestamp }
    }

    /// Creates a dispatcher with a single recording sink and returns it
    /// together with the buffer that sink writes to.
    fn recording_dispatcher() -> (TraceEventDispatcher, Recorded) {
        let sink = RecordingSink::default();
        let recorded = Arc::clone(&sink.events);
        (TraceEventDispatcher::new(vec![Box::new(sink)]), recorded)
    }

    #[test]
    fn send_event_delivers_repeatedly() {
        let (dispatcher, recorded) = recording_dispatcher();

        for id in 1..=3 {
            dispatcher.send_event(span_close(id));
        }

        // Dropping the dispatcher joins the worker, so all events are delivered
        // before the buffer is inspected.
        drop(dispatcher);
        assert_eq!(recorded.lock().unwrap().len(), 3);
    }

    #[test]
    fn send_event_drops_on_reentrance() {
        let (dispatcher, recorded) = recording_dispatcher();

        // Pretend this thread is already inside `send_event`. The nested call
        // has to return immediately; otherwise a `Layer` callback that
        // re-enters the subscriber would recurse without bound.
        IN_SEND.with(|flag| flag.set(true));
        dispatcher.send_event(span_close(1));
        IN_SEND.with(|flag| flag.set(false));

        drop(dispatcher);
        assert!(recorded.lock().unwrap().is_empty());
    }
}