1use std::cell::Cell;
14use std::sync::Arc;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::mpsc;
18use std::thread::JoinHandle;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use smallvec::SmallVec;
23use tracing::Id;
24use tracing::Subscriber;
25use tracing::level_filters::LevelFilter;
26use tracing_subscriber::filter::Targets;
27use tracing_subscriber::layer::Context;
28use tracing_subscriber::layer::Layer;
29use tracing_subscriber::registry::LookupSpan;
30
/// Capacity of the bounded queue between tracing callbacks and the telemetry worker.
/// When the queue is full, new events are dropped and counted instead of blocking the
/// thread that emitted them.
const QUEUE_CAPACITY: usize = 100_000;
32
/// Recorded `(field name, value)` pairs for a span or event. Up to four pairs are stored
/// inline before the `SmallVec` spills to the heap.
pub(crate) type TraceFields = SmallVec<[(&'static str, FieldValue); 4]>;
36
37#[inline]
38pub(crate) fn get_field<'a>(fields: &'a TraceFields, key: &str) -> Option<&'a FieldValue> {
39 fields.iter().find(|(k, _)| *k == key).map(|(_, v)| v)
40}
41
/// A tracing callback captured by [`TraceEventDispatcher`] and delivered to each
/// [`TraceEventSink`] on the telemetry worker thread.
///
/// Span ids are the `u64` form of a [`tracing::Id`] (`Id::into_u64`). Timestamps are
/// wall-clock times taken when the callback ran. Thread names come from
/// `get_thread_info`; an empty name means the thread is unnamed.
#[derive(Debug, Clone)]
pub enum TraceEvent {
    /// A span was created.
    NewSpan {
        /// Id of the newly created span.
        id: u64,
        /// Span name from the callsite metadata.
        name: &'static str,
        /// Callsite target; used for per-sink target filtering.
        target: &'static str,
        /// Verbosity level of the span; used for per-sink filtering.
        level: tracing::Level,
        /// Fields recorded when the span was created.
        fields: TraceFields,
        /// When the span was created.
        timestamp: SystemTime,
        /// Id of the parent span, if any.
        parent_id: Option<u64>,
        /// Name of the thread that created the span.
        thread_name: &'static str,
        /// Source file of the callsite, when available.
        file: Option<&'static str>,
        /// Source line of the callsite, when available.
        line: Option<u32>,
    },
    /// A span was entered.
    SpanEnter {
        /// Id of the entered span.
        id: u64,
        /// When the span was entered.
        timestamp: SystemTime,
        /// Name of the thread that entered the span.
        thread_name: &'static str,
    },
    /// A span was exited.
    SpanExit {
        /// Id of the exited span.
        id: u64,
        /// When the span was exited.
        timestamp: SystemTime,
        /// Name of the thread that exited the span.
        thread_name: &'static str,
    },
    /// A span was closed. No thread information is captured for this variant.
    SpanClose { id: u64, timestamp: SystemTime },
    /// An event (log line) was emitted.
    Event {
        /// Event name from the callsite metadata (`"dropped events"` for the dispatcher's
        /// own queue-overflow notice).
        name: &'static str,
        /// Callsite target; used for per-sink target filtering.
        target: &'static str,
        /// Verbosity level of the event; used for per-sink filtering.
        level: tracing::Level,
        /// Fields recorded on the event.
        fields: TraceFields,
        /// When the event was emitted.
        timestamp: SystemTime,
        /// Id of the span the event belongs to, if any.
        parent_span: Option<u64>,
        /// Identifier of the emitting thread, rendered as a decimal string.
        thread_id: &'static str,
        /// Name of the emitting thread.
        thread_name: &'static str,
        /// Module path of the callsite, when available.
        module_path: Option<&'static str>,
        /// Source file of the callsite, when available.
        file: Option<&'static str>,
        /// Source line of the callsite, when available.
        line: Option<u32>,
    },
}
89
/// An owned copy of one recorded field value.
///
/// Primitive values keep their type; everything else is captured as text.
#[derive(Debug, Clone)]
pub enum FieldValue {
    /// Value recorded through `record_bool`.
    Bool(bool),
    /// Value recorded through `record_i64`.
    I64(i64),
    /// Value recorded through `record_u64`.
    U64(u64),
    /// Value recorded through `record_f64`.
    F64(f64),
    /// String recorded through `record_str`, or a message built by the dispatcher itself.
    Str(String),
    /// `{:?}` rendering of a value recorded through `record_debug`.
    Debug(String),
}
100
/// A destination for [`TraceEvent`]s.
///
/// Sinks are owned by, and invoked only from, the telemetry worker thread. Errors returned
/// from any method are reported on stderr and never stop dispatch to other sinks.
pub trait TraceEventSink: Send + 'static {
    /// Handles a single event.
    ///
    /// `NewSpan` and `Event` variants are delivered only if they pass
    /// [`target_filter`](Self::target_filter). Span enter/exit/close events are always
    /// delivered.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional target/level filter for this sink; `None` (the default) accepts everything.
    ///
    /// The dispatcher also reads this once at construction to compute the layer's
    /// max-level hint.
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flushes any buffered output.
    ///
    /// Called after a batch of events, when the queue has been idle for a flush interval,
    /// and once more when the worker shuts down.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Human-readable sink name used in error messages; defaults to the type name.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
140
thread_local! {
    /// Lazily filled `(thread_name, thread_id)` pair for the current thread; `None` until
    /// `get_thread_info` first runs on this thread.
    static CACHED_THREAD_INFO: Cell<Option<(&'static str, &'static str)>> = const { Cell::new(None) };
}
147
148#[inline(always)]
149fn get_thread_info() -> (&'static str, &'static str) {
150 CACHED_THREAD_INFO.with(|cache| {
151 if let Some(info) = cache.get() {
152 return info;
153 }
154
155 let thread_name: &'static str = Box::leak(
156 std::thread::current()
157 .name()
158 .unwrap_or("")
159 .to_string()
160 .into_boxed_str(),
161 );
162
163 #[cfg(target_os = "linux")]
164 let thread_id: &'static str = {
165 let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 };
169 Box::leak(tid.to_string().into_boxed_str())
170 };
171 #[cfg(not(target_os = "linux"))]
172 let thread_id: &'static str = {
173 let tid_num = std::thread::current().id().as_u64().get();
174 Box::leak(tid_num.to_string().into_boxed_str())
175 };
176
177 cache.set(Some((thread_name, thread_id)));
178 (thread_name, thread_id)
179 })
180}
181
/// Commands that the telemetry worker polls from the sink-control channel at the start of
/// each loop iteration.
pub enum DispatcherControl {
    /// Registers an additional sink. It receives only events dispatched after the worker
    /// picks up this command. It does not change the max-level hint, which is computed
    /// once in `TraceEventDispatcher::new`.
    AddSink(Box<dyn TraceEventSink>),
}
187
/// [`Layer`] that converts tracing callbacks into [`TraceEvent`]s and hands them to a
/// background worker thread, which forwards them to the registered sinks.
///
/// On drop, `Drop::drop` first releases both senders. `_worker_handle` is then dropped with
/// the remaining fields, which joins the worker thread.
pub struct TraceEventDispatcher {
    /// Bounded queue to the worker; `None` only once `Drop` has taken it.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Unbounded side channel used for the synthetic "dropped events" notice.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Joins the worker thread when dropped.
    _worker_handle: WorkerHandle,
    /// Precomputed value returned from `Layer::max_level_hint`.
    max_level: Option<LevelFilter>,
    /// Running total of events discarded because the queue was full; shared with the worker.
    dropped_events: Arc<AtomicU64>,
}
198
/// Owns the telemetry worker's [`JoinHandle`] and joins the thread when dropped.
struct WorkerHandle {
    /// `Some` until taken by `Drop`.
    join_handle: Option<JoinHandle<()>>,
}
202
thread_local! {
    /// Set while `TraceEventDispatcher::send_event` runs on this thread. A nested call that
    /// finds it set discards its event instead of recursing.
    static IN_SEND: Cell<bool> = const { Cell::new(false) };
}
206
207struct InSendGuard;
208
209impl Drop for InSendGuard {
210 fn drop(&mut self) {
211 IN_SEND.with(|f| f.set(false));
212 }
213}
214
215impl TraceEventDispatcher {
216 pub(crate) fn new(sinks: Vec<Box<dyn TraceEventSink>>) -> Self {
228 let max_level = Self::derive_max_level(&sinks);
229
230 let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
231 let (dropped_sender, dropped_receiver) = mpsc::channel();
232 let control_receiver = crate::take_sink_control_receiver();
234 let dropped_events = Arc::new(AtomicU64::new(0));
235 let dropped_events_worker = Arc::clone(&dropped_events);
236
237 let worker_handle = std::thread::Builder::new()
238 .name("telemetry-worker".into())
239 .spawn(move || {
240 worker_loop(
241 receiver,
242 dropped_receiver,
243 control_receiver,
244 sinks,
245 dropped_events_worker,
246 );
247 })
248 .expect("failed to spawn telemetry worker thread");
249
250 Self {
251 sender: Some(sender),
252 dropped_sender: Some(dropped_sender),
253 _worker_handle: WorkerHandle {
254 join_handle: Some(worker_handle),
255 },
256 max_level,
257 dropped_events,
258 }
259 }
260
261 fn derive_max_level(sinks: &[Box<dyn TraceEventSink>]) -> Option<LevelFilter> {
262 let mut max_level: Option<LevelFilter> = None;
263
264 for sink in sinks {
265 let sink_max = match sink.target_filter() {
266 None => LevelFilter::TRACE,
267 Some(targets) => {
268 let levels = [
269 (tracing::Level::TRACE, LevelFilter::TRACE),
270 (tracing::Level::DEBUG, LevelFilter::DEBUG),
271 (tracing::Level::INFO, LevelFilter::INFO),
272 (tracing::Level::WARN, LevelFilter::WARN),
273 (tracing::Level::ERROR, LevelFilter::ERROR),
274 ];
275 let mut result = LevelFilter::OFF;
276 for (level, filter) in levels {
277 if targets.would_enable("", &level) {
278 result = filter;
279 break;
280 }
281 }
282 result
283 }
284 };
285
286 max_level = Some(match max_level {
287 None => sink_max,
288 Some(current) => std::cmp::max(current, sink_max),
289 });
290 }
291
292 max_level
293 }
294
295 fn send_event(&self, event: TraceEvent) {
296 if IN_SEND.with(|f| f.replace(true)) {
301 return;
302 }
303 let _reset = InSendGuard;
304
305 if let Some(sender) = &self.sender
306 && let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event)
307 {
308 let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
309
310 if dropped == 1 || dropped.is_multiple_of(1000) {
311 eprintln!(
312 "[telemetry]: {} events and log lines dropped que to full queue (capacity: {})",
313 dropped, QUEUE_CAPACITY
314 );
315 self.send_drop_event(dropped);
316 }
317 }
318 }
319
320 pub(crate) fn sender(&self) -> mpsc::SyncSender<TraceEvent> {
321 self.sender
322 .as_ref()
323 .expect("trace event dispatcher sender should exist during initialization")
324 .clone()
325 }
326
327 fn send_drop_event(&self, total_dropped: u64) {
328 if let Some(dropped_sender) = &self.dropped_sender {
329 let (thread_name, thread_id) = get_thread_info();
330
331 let mut fields = TraceFields::new();
332 fields.push((
333 "message",
334 FieldValue::Str(format!(
335 "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
336 QUEUE_CAPACITY
337 )),
338 ));
339 fields.push(("dropped_count", FieldValue::U64(total_dropped)));
340
341 let drop_event = TraceEvent::Event {
344 name: "dropped events",
345 target: module_path!(),
346 level: tracing::Level::ERROR,
347 fields,
348 timestamp: SystemTime::now(),
349 parent_span: None,
350 thread_id,
351 thread_name,
352 module_path: Some(module_path!()),
353 file: Some(file!()),
354 line: Some(line!()),
355 };
356
357 if dropped_sender.send(drop_event).is_err() {
358 eprintln!(
360 "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
361 (worker thread may have died). Telemetry system offline.",
362 total_dropped
363 );
364 }
365 }
366 }
367}
368
369impl Drop for TraceEventDispatcher {
370 fn drop(&mut self) {
371 drop(self.sender.take());
375 drop(self.dropped_sender.take());
376 }
377}
378
379impl<S> Layer<S> for TraceEventDispatcher
380where
381 S: Subscriber + for<'a> LookupSpan<'a>,
382{
383 fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
384 let metadata = attrs.metadata();
385 let mut fields = TraceFields::new();
386
387 let mut visitor = FieldVisitor(&mut fields);
388 attrs.record(&mut visitor);
389
390 let parent_id = if let Some(parent) = attrs.parent() {
391 Some(parent.into_u64())
392 } else {
393 ctx.current_span().id().map(|id| id.into_u64())
394 };
395
396 let (thread_name, _) = get_thread_info();
397
398 let event = TraceEvent::NewSpan {
399 id: id.into_u64(),
400 name: metadata.name(),
401 target: metadata.target(),
402 level: *metadata.level(),
403 fields,
404 timestamp: SystemTime::now(),
405 parent_id,
406 thread_name,
407 file: metadata.file(),
408 line: metadata.line(),
409 };
410
411 self.send_event(event);
412 }
413
414 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
415 let (thread_name, _) = get_thread_info();
416 let event = TraceEvent::SpanEnter {
417 id: id.into_u64(),
418 timestamp: SystemTime::now(),
419 thread_name,
420 };
421
422 self.send_event(event);
423 }
424
425 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
426 let (thread_name, _) = get_thread_info();
427 let event = TraceEvent::SpanExit {
428 id: id.into_u64(),
429 timestamp: SystemTime::now(),
430 thread_name,
431 };
432
433 self.send_event(event);
434 }
435
436 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
437 let metadata = event.metadata();
438 let mut fields = TraceFields::new();
439 let mut visitor = FieldVisitor(&mut fields);
440 event.record(&mut visitor);
441
442 let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
443
444 let (thread_name, thread_id) = get_thread_info();
445
446 let trace_event = TraceEvent::Event {
447 name: metadata.name(),
448 target: metadata.target(),
449 level: *metadata.level(),
450 fields,
451 timestamp: SystemTime::now(),
452 parent_span,
453 thread_id,
454 thread_name,
455 module_path: metadata.module_path(),
456 file: metadata.file(),
457 line: metadata.line(),
458 };
459
460 self.send_event(trace_event);
461 }
462
463 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
464 let event = TraceEvent::SpanClose {
465 id: id.into_u64(),
466 timestamp: SystemTime::now(),
467 };
468
469 self.send_event(event);
470 }
471
472 fn max_level_hint(&self) -> Option<LevelFilter> {
473 self.max_level
474 }
475}
476
477struct FieldVisitor<'a>(&'a mut TraceFields);
478
479impl tracing::field::Visit for FieldVisitor<'_> {
480 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
481 self.0.push((field.name(), FieldValue::Bool(value)));
482 }
483
484 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
485 self.0.push((field.name(), FieldValue::I64(value)));
486 }
487
488 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
489 self.0.push((field.name(), FieldValue::U64(value)));
490 }
491
492 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
493 self.0.push((field.name(), FieldValue::F64(value)));
494 }
495
496 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
497 self.0
498 .push((field.name(), FieldValue::Str(value.to_string())));
499 }
500
501 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
502 self.0
503 .push((field.name(), FieldValue::Debug(format!("{:?}", value))));
504 }
505}
506
507fn worker_loop(
511 receiver: mpsc::Receiver<TraceEvent>,
512 dropped_receiver: mpsc::Receiver<TraceEvent>,
513 control_receiver: Option<mpsc::Receiver<DispatcherControl>>,
514 mut sinks: Vec<Box<dyn TraceEventSink>>,
515 dropped_events: Arc<AtomicU64>,
516) {
517 const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
518 const FLUSH_EVENT_COUNT: usize = 1000;
519 let mut last_flush = std::time::Instant::now();
520 let mut events_since_flush = 0;
521
522 fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
523 for sink in sinks {
524 if let Err(e) = sink.flush() {
525 eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
526 }
527 }
528 }
529
530 fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
531 for sink in sinks {
532 if match &event {
533 TraceEvent::NewSpan { target, level, .. }
534 | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
535 Some(targets) => targets.would_enable(target, level),
536 None => true,
537 },
538 _ => true,
539 } && let Err(e) = sink.consume(&event)
540 {
541 eprintln!(
542 "[telemetry] sink {} failed to consume event: {}",
543 sink.name(),
544 e
545 );
546 }
547 }
548 }
549
550 loop {
551 while let Ok(event) = dropped_receiver.try_recv() {
552 dispatch_to_sinks(&mut sinks, event);
553 events_since_flush += 1;
554 }
555
556 if let Some(ref ctrl_rx) = control_receiver {
558 while let Ok(control) = ctrl_rx.try_recv() {
559 match control {
560 DispatcherControl::AddSink(sink) => {
561 sinks.push(sink);
562 }
563 }
564 }
565 }
566
567 match receiver.recv_timeout(FLUSH_INTERVAL) {
568 Ok(event) => {
569 dispatch_to_sinks(&mut sinks, event);
570 events_since_flush += 1;
571
572 if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
573 {
574 flush_sinks(&mut sinks);
575 last_flush = std::time::Instant::now();
576 events_since_flush = 0;
577 }
578 }
579 Err(mpsc::RecvTimeoutError::Timeout) => {
580 flush_sinks(&mut sinks);
581 last_flush = std::time::Instant::now();
582 events_since_flush = 0;
583 }
584 Err(mpsc::RecvTimeoutError::Disconnected) => {
585 break;
586 }
587 }
588 }
589
590 while let Ok(event) = dropped_receiver.try_recv() {
591 dispatch_to_sinks(&mut sinks, event);
592 }
593 while let Ok(event) = receiver.try_recv() {
594 dispatch_to_sinks(&mut sinks, event);
595 }
596
597 flush_sinks(&mut sinks);
598
599 let total_dropped = dropped_events.load(Ordering::Relaxed);
600 if total_dropped > 0 {
601 eprintln!(
602 "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
603 total_dropped
604 );
605 }
606}
607
608impl Drop for WorkerHandle {
609 fn drop(&mut self) {
610 if let Some(handle) = self.join_handle.take()
611 && let Err(e) = handle.join()
612 {
613 eprintln!("[telemetry] worker thread panicked: {:?}", e);
614 }
615 }
616}
617
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;

    use super::*;

    /// Sink that clones every consumed event into a shared vector for later inspection.
    #[derive(Default)]
    struct RecordingSink {
        events: Arc<Mutex<Vec<TraceEvent>>>,
    }

    impl TraceEventSink for RecordingSink {
        fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
            let mut log = self.events.lock().unwrap();
            log.push(event.clone());
            Ok(())
        }

        fn flush(&mut self) -> Result<(), anyhow::Error> {
            Ok(())
        }
    }

    /// Builds a dispatcher backed by one `RecordingSink` and returns it together with a
    /// handle to the sink's event log.
    fn recording_dispatcher() -> (TraceEventDispatcher, Arc<Mutex<Vec<TraceEvent>>>) {
        let sink = RecordingSink::default();
        let log = Arc::clone(&sink.events);
        (TraceEventDispatcher::new(vec![Box::new(sink)]), log)
    }

    fn span_close(id: u64) -> TraceEvent {
        TraceEvent::SpanClose {
            id,
            timestamp: SystemTime::now(),
        }
    }

    #[test]
    fn send_event_delivers_repeatedly() {
        let (dispatcher, log) = recording_dispatcher();

        for id in 1..=3 {
            dispatcher.send_event(span_close(id));
        }

        // Dropping the dispatcher joins the worker, so every queued event has been consumed.
        drop(dispatcher);
        assert_eq!(log.lock().unwrap().len(), 3);
    }

    #[test]
    fn send_event_drops_on_reentrance() {
        let (dispatcher, log) = recording_dispatcher();

        // Pretend a send is already in progress on this thread.
        IN_SEND.with(|flag| flag.set(true));
        dispatcher.send_event(span_close(1));
        IN_SEND.with(|flag| flag.set(false));

        drop(dispatcher);
        assert!(log.lock().unwrap().is_empty());
    }
}