// hyperactor_telemetry/trace_dispatcher.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Unified telemetry layer that captures trace events once and fans out to multiple exporters
10//! on a background thread, eliminating redundant capture and moving work off the application
11//! thread.
12
13use std::cell::Cell;
14use std::sync::Arc;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::mpsc;
18use std::thread::JoinHandle;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use smallvec::SmallVec;
23use tracing::Id;
24use tracing::Subscriber;
25use tracing::level_filters::LevelFilter;
26use tracing_subscriber::filter::Targets;
27use tracing_subscriber::layer::Context;
28use tracing_subscriber::layer::Layer;
29use tracing_subscriber::registry::LookupSpan;
30
/// Upper bound on in-flight events buffered between application threads and
/// the background telemetry worker. When full, new events are dropped
/// (see `TraceEventDispatcher::send_event`).
const QUEUE_CAPACITY: usize = 100_000;

/// Type alias for trace event fields
/// We expect that most trace events have fewer than 4 fields, so `SmallVec`
/// keeps them inline and avoids a heap allocation in the common case.
pub(crate) type TraceFields = SmallVec<[(&'static str, FieldValue); 4]>;
36
37#[inline]
38pub(crate) fn get_field<'a>(fields: &'a TraceFields, key: &str) -> Option<&'a FieldValue> {
39    fields.iter().find(|(k, _)| *k == key).map(|(_, v)| v)
40}
41
/// Unified representation of a trace event captured from the tracing layer.
/// This is captured once on the application thread, then sent to the background
/// worker for fan-out to multiple exporters.
#[derive(Debug, Clone)]
pub enum TraceEvent {
    /// A new span was created (on_new_span)
    NewSpan {
        /// Span id from `tracing::span::Id::into_u64`.
        id: u64,
        name: &'static str,
        target: &'static str,
        level: tracing::Level,
        fields: TraceFields,
        timestamp: SystemTime,
        /// Parent span id: explicit parent if one was given, otherwise the
        /// contextual (currently-entered) span, or `None` for a root span.
        parent_id: Option<u64>,
        /// Name of the thread that created the span (cached, leaked string).
        thread_name: &'static str,
        file: Option<&'static str>,
        line: Option<u32>,
    },
    /// A span was entered (on_enter)
    SpanEnter {
        id: u64,
        timestamp: SystemTime,
        thread_name: &'static str,
    },
    /// A span was exited (on_exit)
    SpanExit {
        id: u64,
        timestamp: SystemTime,
        thread_name: &'static str,
    },
    /// A span was closed (dropped)
    SpanClose { id: u64, timestamp: SystemTime },
    /// A tracing event occurred (e.g., tracing::info!())
    Event {
        name: &'static str,
        target: &'static str,
        level: tracing::Level,
        fields: TraceFields,
        timestamp: SystemTime,
        /// Span the event occurred inside, if any.
        parent_span: Option<u64>,
        /// OS thread id as a string (cached, leaked; best-effort off Linux).
        thread_id: &'static str,
        thread_name: &'static str,
        module_path: Option<&'static str>,
        file: Option<&'static str>,
        line: Option<u32>,
    },
}
89
/// Simplified field value representation for trace events
#[derive(Debug, Clone)]
pub enum FieldValue {
    /// Boolean field value.
    Bool(bool),
    /// Signed integer field value.
    I64(i64),
    /// Unsigned integer field value.
    U64(u64),
    /// Floating-point field value.
    F64(f64),
    /// String field value (owned copy of the recorded `&str`).
    Str(String),
    /// Fallback: `Debug`-formatted rendering of any other recorded type.
    Debug(String),
}
100
/// Trait for sinks that receive trace events from the dispatcher.
/// Implementations run on the background worker thread and can perform
/// expensive I/O operations without blocking the application.
pub trait TraceEventSink: Send + 'static {
    /// Consume a single event. Called on background thread.
    /// Errors are logged by the worker loop and do not stop dispatch.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional target/level filter for this sink.
    ///
    /// The worker loop automatically applies this filter before calling `consume()`,
    /// so sinks don't need to check target/level in their consume implementation.
    /// Only `NewSpan` and `Event` are filtered by target/level; other event types
    /// are always passed through.
    ///
    /// # Returns
    /// - `None` - No filtering, all events are consumed (default)
    /// - `Some(Targets)` - Only consume events matching the target/level filter
    ///
    /// # Example
    /// ```ignore
    /// fn target_filter(&self) -> Option<&Targets> {
    ///     Some(Targets::new()
    ///         .with_target("opentelemetry", LevelFilter::OFF)
    ///         .with_default(LevelFilter::DEBUG))
    /// }
    /// ```
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flush any buffered events to the backend.
    /// Called periodically (every ~100ms or 1000 events) and on shutdown.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Optional: return name for debugging/logging.
    /// Defaults to the implementing type's name.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
140
thread_local! {
    /// Cached thread info (thread_name, thread_id) for minimal overhead.
    /// Strings are leaked once per thread to get &'static str - threads are long-lived so this is fine.
    /// Uses Cell since (&'static str, &'static str) is Copy.
    /// `None` until `get_thread_info` populates it on first use by this thread.
    static CACHED_THREAD_INFO: Cell<Option<(&'static str, &'static str)>> = const { Cell::new(None) };
}
147
148#[inline(always)]
149fn get_thread_info() -> (&'static str, &'static str) {
150    CACHED_THREAD_INFO.with(|cache| {
151        if let Some(info) = cache.get() {
152            return info;
153        }
154
155        let thread_name: &'static str = Box::leak(
156            std::thread::current()
157                .name()
158                .unwrap_or("")
159                .to_string()
160                .into_boxed_str(),
161        );
162
163        #[cfg(target_os = "linux")]
164        let thread_id: &'static str = {
165            // SAFETY: syscall(SYS_gettid) is always safe to call - it's a read-only
166            // syscall that returns the current thread's kernel thread ID (TID).
167            // The cast to u64 is safe because gettid() returns a positive pid_t.
168            let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 };
169            Box::leak(tid.to_string().into_boxed_str())
170        };
171        #[cfg(not(target_os = "linux"))]
172        let thread_id: &'static str = {
173            let tid = std::thread::current().id();
174            // SAFETY: ThreadId is a newtype wrapper around a u64 counter.
175            // This transmute relies on the internal representation of ThreadId,
176            // which is stable in practice but not guaranteed by Rust's API.
177            // On non-Linux platforms this is a best-effort approximation.
178            // See: https://doc.rust-lang.org/std/thread/struct.ThreadId.html
179            let tid_num = unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) };
180            Box::leak(tid_num.to_string().into_boxed_str())
181        };
182
183        cache.set(Some((thread_name, thread_id)));
184        (thread_name, thread_id)
185    })
186}
187
/// Control messages for the dispatcher (e.g., adding sinks dynamically).
/// Sent over the global control channel and drained by the worker loop
/// between event batches.
pub enum DispatcherControl {
    /// Add a new sink to receive events
    AddSink(Box<dyn TraceEventSink>),
}
193
/// The trace event dispatcher that captures events once and dispatches to multiple sinks
/// on a background thread.
pub struct TraceEventDispatcher {
    /// Bounded channel to the worker; `None` after `Drop` has taken it.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Separate channel so we are always notified of when the main queue is full and events are being dropped.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Joins the worker thread when dropped (after the senders are closed).
    _worker_handle: WorkerHandle,
    /// Most verbose level any sink accepts; reported via `max_level_hint`.
    max_level: Option<LevelFilter>,
    /// Running count of events dropped because the bounded queue was full.
    dropped_events: Arc<AtomicU64>,
}
204
/// Owns the background worker thread's join handle. Its `Drop` impl joins
/// the thread, which lets the worker drain and flush before process exit.
struct WorkerHandle {
    join_handle: Option<JoinHandle<()>>,
}
208
209impl TraceEventDispatcher {
210    /// Create a new trace event dispatcher with the given sinks.
211    /// Uses a bounded channel (capacity QUEUE_CAPACITY) to ensure telemetry never blocks
212    /// the application. Events are dropped with a warning if the queue is full.
213    /// A separate unbounded priority channel guarantees delivery of critical events
214    /// like drop notifications (safe because drop events are rate-limited).
215    ///
216    /// Takes the global control receiver for dynamic sink registration. Sinks registered
217    /// via `register_sink()` before or after this call will be added to the dispatcher.
218    ///
219    /// # Arguments
220    /// * `sinks` - List of sinks to dispatch events to.
221    pub(crate) fn new(sinks: Vec<Box<dyn TraceEventSink>>) -> Self {
222        let max_level = Self::derive_max_level(&sinks);
223
224        let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
225        let (dropped_sender, dropped_receiver) = mpsc::channel();
226        // Take the global control receiver - sinks registered via register_sink() will be received here
227        let control_receiver = crate::take_sink_control_receiver();
228        let dropped_events = Arc::new(AtomicU64::new(0));
229        let dropped_events_worker = Arc::clone(&dropped_events);
230
231        let worker_handle = std::thread::Builder::new()
232            .name("telemetry-worker".into())
233            .spawn(move || {
234                worker_loop(
235                    receiver,
236                    dropped_receiver,
237                    control_receiver,
238                    sinks,
239                    dropped_events_worker,
240                );
241            })
242            .expect("failed to spawn telemetry worker thread");
243
244        Self {
245            sender: Some(sender),
246            dropped_sender: Some(dropped_sender),
247            _worker_handle: WorkerHandle {
248                join_handle: Some(worker_handle),
249            },
250            max_level,
251            dropped_events,
252        }
253    }
254
255    fn derive_max_level(sinks: &[Box<dyn TraceEventSink>]) -> Option<LevelFilter> {
256        let mut max_level: Option<LevelFilter> = None;
257
258        for sink in sinks {
259            let sink_max = match sink.target_filter() {
260                None => LevelFilter::TRACE,
261                Some(targets) => {
262                    let levels = [
263                        (tracing::Level::TRACE, LevelFilter::TRACE),
264                        (tracing::Level::DEBUG, LevelFilter::DEBUG),
265                        (tracing::Level::INFO, LevelFilter::INFO),
266                        (tracing::Level::WARN, LevelFilter::WARN),
267                        (tracing::Level::ERROR, LevelFilter::ERROR),
268                    ];
269                    let mut result = LevelFilter::OFF;
270                    for (level, filter) in levels {
271                        if targets.would_enable("", &level) {
272                            result = filter;
273                            break;
274                        }
275                    }
276                    result
277                }
278            };
279
280            max_level = Some(match max_level {
281                None => sink_max,
282                Some(current) => std::cmp::max(current, sink_max),
283            });
284        }
285
286        max_level
287    }
288
289    fn send_event(&self, event: TraceEvent) {
290        if let Some(sender) = &self.sender {
291            if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) {
292                let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
293
294                if dropped == 1 || dropped.is_multiple_of(1000) {
295                    eprintln!(
296                        "[telemetry]: {}  events and log lines dropped que to full queue (capacity: {})",
297                        dropped, QUEUE_CAPACITY
298                    );
299                    self.send_drop_event(dropped);
300                }
301            }
302        }
303    }
304
305    fn send_drop_event(&self, total_dropped: u64) {
306        if let Some(dropped_sender) = &self.dropped_sender {
307            let (thread_name, thread_id) = get_thread_info();
308
309            let mut fields = TraceFields::new();
310            fields.push((
311                "message",
312                FieldValue::Str(format!(
313                    "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
314                    QUEUE_CAPACITY
315                )),
316            ));
317            fields.push(("dropped_count", FieldValue::U64(total_dropped)));
318
319            // We want to just directly construct and send a `TraceEvent::Event` here so we don't need to
320            // reason very hard about whether or not we are creating a DoS loop
321            let drop_event = TraceEvent::Event {
322                name: "dropped events",
323                target: module_path!(),
324                level: tracing::Level::ERROR,
325                fields,
326                timestamp: SystemTime::now(),
327                parent_span: None,
328                thread_id,
329                thread_name,
330                module_path: Some(module_path!()),
331                file: Some(file!()),
332                line: Some(line!()),
333            };
334
335            if dropped_sender.send(drop_event).is_err() {
336                // Last resort
337                eprintln!(
338                    "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
339                     (worker thread may have died). Telemetry system offline.",
340                    total_dropped
341                );
342            }
343        }
344    }
345}
346
347impl Drop for TraceEventDispatcher {
348    fn drop(&mut self) {
349        // Explicitly drop both senders to close the channels.
350        // The next field to be dropped is `worker_handle` which
351        // will run its own drop impl to join the thread and flush
352        drop(self.sender.take());
353        drop(self.dropped_sender.take());
354    }
355}
356
357impl<S> Layer<S> for TraceEventDispatcher
358where
359    S: Subscriber + for<'a> LookupSpan<'a>,
360{
361    fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
362        let metadata = attrs.metadata();
363        let mut fields = TraceFields::new();
364
365        let mut visitor = FieldVisitor(&mut fields);
366        attrs.record(&mut visitor);
367
368        let parent_id = if let Some(parent) = attrs.parent() {
369            Some(parent.into_u64())
370        } else {
371            ctx.current_span().id().map(|id| id.into_u64())
372        };
373
374        let (thread_name, _) = get_thread_info();
375
376        let event = TraceEvent::NewSpan {
377            id: id.into_u64(),
378            name: metadata.name(),
379            target: metadata.target(),
380            level: *metadata.level(),
381            fields,
382            timestamp: SystemTime::now(),
383            parent_id,
384            thread_name,
385            file: metadata.file(),
386            line: metadata.line(),
387        };
388
389        self.send_event(event);
390    }
391
392    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
393        let (thread_name, _) = get_thread_info();
394        let event = TraceEvent::SpanEnter {
395            id: id.into_u64(),
396            timestamp: SystemTime::now(),
397            thread_name,
398        };
399
400        self.send_event(event);
401    }
402
403    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
404        let (thread_name, _) = get_thread_info();
405        let event = TraceEvent::SpanExit {
406            id: id.into_u64(),
407            timestamp: SystemTime::now(),
408            thread_name,
409        };
410
411        self.send_event(event);
412    }
413
414    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
415        let metadata = event.metadata();
416        let mut fields = TraceFields::new();
417        let mut visitor = FieldVisitor(&mut fields);
418        event.record(&mut visitor);
419
420        let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
421
422        let (thread_name, thread_id) = get_thread_info();
423
424        let trace_event = TraceEvent::Event {
425            name: metadata.name(),
426            target: metadata.target(),
427            level: *metadata.level(),
428            fields,
429            timestamp: SystemTime::now(),
430            parent_span,
431            thread_id,
432            thread_name,
433            module_path: metadata.module_path(),
434            file: metadata.file(),
435            line: metadata.line(),
436        };
437
438        self.send_event(trace_event);
439    }
440
441    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
442        let event = TraceEvent::SpanClose {
443            id: id.into_u64(),
444            timestamp: SystemTime::now(),
445        };
446
447        self.send_event(event);
448    }
449
450    fn max_level_hint(&self) -> Option<LevelFilter> {
451        self.max_level
452    }
453}
454
455struct FieldVisitor<'a>(&'a mut TraceFields);
456
457impl<'a> tracing::field::Visit for FieldVisitor<'a> {
458    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
459        self.0.push((field.name(), FieldValue::Bool(value)));
460    }
461
462    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
463        self.0.push((field.name(), FieldValue::I64(value)));
464    }
465
466    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
467        self.0.push((field.name(), FieldValue::U64(value)));
468    }
469
470    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
471        self.0.push((field.name(), FieldValue::F64(value)));
472    }
473
474    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
475        self.0
476            .push((field.name(), FieldValue::Str(value.to_string())));
477    }
478
479    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
480        self.0
481            .push((field.name(), FieldValue::Debug(format!("{:?}", value))));
482    }
483}
484
/// Background worker loop that receives events from both regular and priority channels,
/// and dispatches them to sinks. Priority events are processed first.
/// Runs until both senders are dropped.
fn worker_loop(
    receiver: mpsc::Receiver<TraceEvent>,
    dropped_receiver: mpsc::Receiver<TraceEvent>,
    control_receiver: Option<mpsc::Receiver<DispatcherControl>>,
    mut sinks: Vec<Box<dyn TraceEventSink>>,
    dropped_events: Arc<AtomicU64>,
) {
    // Flush sinks at least this often, and also after this many events, so
    // buffering sinks never hold data for long.
    const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
    const FLUSH_EVENT_COUNT: usize = 1000;
    let mut last_flush = std::time::Instant::now();
    let mut events_since_flush = 0;

    // Flush every sink; failures are logged but never stop the worker.
    fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
        for sink in sinks {
            if let Err(e) = sink.flush() {
                eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
            }
        }
    }

    // Deliver one event to every sink whose target/level filter admits it.
    // Only `NewSpan` and `Event` carry target/level; other event types are
    // always passed through. Consume errors are logged, not propagated.
    fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
        for sink in sinks {
            if match &event {
                TraceEvent::NewSpan { target, level, .. }
                | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
                    Some(targets) => targets.would_enable(target, level),
                    None => true,
                },
                _ => true,
            } {
                if let Err(e) = sink.consume(&event) {
                    eprintln!(
                        "[telemetry] sink {} failed to consume event: {}",
                        sink.name(),
                        e
                    );
                }
            }
        }
    }

    loop {
        // Priority channel first: drop notifications must not queue behind
        // the (possibly full) regular event stream.
        while let Ok(event) = dropped_receiver.try_recv() {
            dispatch_to_sinks(&mut sinks, event);
            events_since_flush += 1;
        }

        // Process any pending control messages (e.g., adding new sinks)
        if let Some(ref ctrl_rx) = control_receiver {
            while let Ok(control) = ctrl_rx.try_recv() {
                match control {
                    DispatcherControl::AddSink(sink) => {
                        sinks.push(sink);
                    }
                }
            }
        }

        // Block on the main queue, but wake at FLUSH_INTERVAL so periodic
        // flushes (and the drains above) happen even when idle.
        match receiver.recv_timeout(FLUSH_INTERVAL) {
            Ok(event) => {
                dispatch_to_sinks(&mut sinks, event);
                events_since_flush += 1;

                if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
                {
                    flush_sinks(&mut sinks);
                    last_flush = std::time::Instant::now();
                    events_since_flush = 0;
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                flush_sinks(&mut sinks);
                last_flush = std::time::Instant::now();
                events_since_flush = 0;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                // Dispatcher dropped its sender: begin shutdown.
                break;
            }
        }
    }

    // Shutdown: drain whatever is still buffered in both channels so no
    // already-captured event is lost, then do a final flush.
    while let Ok(event) = dropped_receiver.try_recv() {
        dispatch_to_sinks(&mut sinks, event);
    }
    while let Ok(event) = receiver.try_recv() {
        dispatch_to_sinks(&mut sinks, event);
    }

    flush_sinks(&mut sinks);

    let total_dropped = dropped_events.load(Ordering::Relaxed);
    if total_dropped > 0 {
        eprintln!(
            "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
            total_dropped
        );
    }
}
586
587impl Drop for WorkerHandle {
588    fn drop(&mut self) {
589        if let Some(handle) = self.join_handle.take() {
590            if let Err(e) = handle.join() {
591                eprintln!("[telemetry] worker thread panicked: {:?}", e);
592            }
593        }
594    }
595}