1use std::cell::Cell;
14use std::sync::Arc;
15use std::sync::atomic::AtomicU64;
16use std::sync::atomic::Ordering;
17use std::sync::mpsc;
18use std::thread::JoinHandle;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use smallvec::SmallVec;
23use tracing::Id;
24use tracing::Subscriber;
25use tracing::level_filters::LevelFilter;
26use tracing_subscriber::filter::Targets;
27use tracing_subscriber::layer::Context;
28use tracing_subscriber::layer::Layer;
29use tracing_subscriber::registry::LookupSpan;
30
/// Capacity of the bounded channel between the tracing layer and the worker
/// thread. When the channel is full, new events are dropped and counted
/// instead of blocking the emitting thread.
const QUEUE_CAPACITY: usize = 100_000;
32
/// Field name/value pairs captured from a span or event.
///
/// Backed by a `SmallVec` with an inline capacity of four entries.
pub(crate) type TraceFields = SmallVec<[(&'static str, FieldValue); 4]>;
36
37#[inline]
38pub(crate) fn get_field<'a>(fields: &'a TraceFields, key: &str) -> Option<&'a FieldValue> {
39 fields.iter().find(|(k, _)| *k == key).map(|(_, v)| v)
40}
41
/// A span lifecycle notification or log event captured by the tracing layer
/// and handed to the worker thread for delivery to sinks.
#[derive(Debug, Clone)]
pub enum TraceEvent {
    /// A span was created.
    NewSpan {
        /// Span id as a raw `u64`.
        id: u64,
        /// Span name from its metadata.
        name: &'static str,
        /// Target from the span metadata.
        target: &'static str,
        /// Verbosity level from the span metadata.
        level: tracing::Level,
        /// Fields recorded when the span was created.
        fields: TraceFields,
        /// Wall-clock time at which the notification was captured.
        timestamp: SystemTime,
        /// Id of the parent span, if any.
        parent_id: Option<u64>,
        /// Name of the thread that created the span (empty if unnamed).
        thread_name: &'static str,
        /// Source file from the span metadata, if available.
        file: Option<&'static str>,
        /// Source line from the span metadata, if available.
        line: Option<u32>,
    },
    /// A span was entered.
    SpanEnter {
        /// Span id as a raw `u64`.
        id: u64,
        /// Wall-clock time at which the notification was captured.
        timestamp: SystemTime,
        /// Name of the thread that entered the span (empty if unnamed).
        thread_name: &'static str,
    },
    /// A span was exited.
    SpanExit {
        /// Span id as a raw `u64`.
        id: u64,
        /// Wall-clock time at which the notification was captured.
        timestamp: SystemTime,
        /// Name of the thread that exited the span (empty if unnamed).
        thread_name: &'static str,
    },
    /// A span was closed.
    SpanClose { id: u64, timestamp: SystemTime },
    /// A log event was emitted.
    Event {
        /// Event name from its metadata.
        name: &'static str,
        /// Target from the event metadata.
        target: &'static str,
        /// Verbosity level from the event metadata.
        level: tracing::Level,
        /// Fields recorded on the event.
        fields: TraceFields,
        /// Wall-clock time at which the event was captured.
        timestamp: SystemTime,
        /// Id of the span the event belongs to, if any.
        parent_span: Option<u64>,
        /// Numeric identifier of the emitting thread, rendered as a string.
        thread_id: &'static str,
        /// Name of the emitting thread (empty if unnamed).
        thread_name: &'static str,
        /// Module path from the event metadata, if available.
        module_path: Option<&'static str>,
        /// Source file from the event metadata, if available.
        file: Option<&'static str>,
        /// Source line from the event metadata, if available.
        line: Option<u32>,
    },
}
89
/// Owned value of a single recorded span or event field.
///
/// Each variant corresponds to one of the `record_*` callbacks implemented by
/// the field visitor in this file.
#[derive(Debug, Clone)]
pub enum FieldValue {
    /// Recorded through `record_bool`.
    Bool(bool),
    /// Recorded through `record_i64`.
    I64(i64),
    /// Recorded through `record_u64`.
    U64(u64),
    /// Recorded through `record_f64`.
    F64(f64),
    /// Recorded through `record_str`; an owned copy of the string.
    Str(String),
    /// Recorded through `record_debug`; the value's `{:?}` rendering.
    Debug(String),
}
100
/// Destination for [`TraceEvent`]s. Sinks are owned and driven by the
/// telemetry worker thread, hence the `Send + 'static` bound.
pub trait TraceEventSink: Send + 'static {
    /// Handles one event. An error is reported on stderr by the worker and
    /// does not stop delivery to other sinks.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional per-target level filter for this sink.
    ///
    /// When `Some`, the worker only delivers `NewSpan` and `Event` records
    /// that the filter would enable; other record kinds are always delivered.
    /// The default `None` means the sink receives everything.
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flushes any buffered output. Called periodically by the worker and
    /// once more at shutdown; errors are reported on stderr.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Name used in diagnostics; defaults to the implementing type's name.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
140
thread_local! {
    /// Per-thread cache of `(thread_name, thread_id)`, both leaked `'static`
    /// strings. Starts as `None` and is filled by `get_thread_info` on first use.
    static CACHED_THREAD_INFO: Cell<Option<(&'static str, &'static str)>> = const { Cell::new(None) };
}
147
148#[inline(always)]
149fn get_thread_info() -> (&'static str, &'static str) {
150 CACHED_THREAD_INFO.with(|cache| {
151 if let Some(info) = cache.get() {
152 return info;
153 }
154
155 let thread_name: &'static str = Box::leak(
156 std::thread::current()
157 .name()
158 .unwrap_or("")
159 .to_string()
160 .into_boxed_str(),
161 );
162
163 #[cfg(target_os = "linux")]
164 let thread_id: &'static str = {
165 let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 };
169 Box::leak(tid.to_string().into_boxed_str())
170 };
171 #[cfg(not(target_os = "linux"))]
172 let thread_id: &'static str = {
173 let tid = std::thread::current().id();
174 let tid_num = unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) };
180 Box::leak(tid_num.to_string().into_boxed_str())
181 };
182
183 cache.set(Some((thread_name, thread_id)));
184 (thread_name, thread_id)
185 })
186}
187
/// Control messages the worker thread accepts on its control channel.
pub enum DispatcherControl {
    /// Registers an additional sink with the running worker.
    AddSink(Box<dyn TraceEventSink>),
}
193
/// Tracing layer that converts span/event callbacks into [`TraceEvent`]s and
/// queues them for a background worker thread that feeds the sinks.
pub struct TraceEventDispatcher {
    /// Bounded queue to the worker; `None` once the dispatcher is being dropped.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Unbounded side channel used only for "events were dropped" notices;
    /// `None` once the dispatcher is being dropped.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Joins the worker when dropped. Declared after the senders so they are
    /// gone before the join happens.
    _worker_handle: WorkerHandle,
    /// Level hint computed from the initial sinks; returned by `max_level_hint`.
    max_level: Option<LevelFilter>,
    /// Running count of events dropped because the queue was full; shared
    /// with the worker.
    dropped_events: Arc<AtomicU64>,
}
204
/// Owns the worker thread's join handle; its `Drop` impl joins the thread.
struct WorkerHandle {
    /// `Some` until the handle has been taken for joining.
    join_handle: Option<JoinHandle<()>>,
}
208
209impl TraceEventDispatcher {
210 pub(crate) fn new(sinks: Vec<Box<dyn TraceEventSink>>) -> Self {
222 let max_level = Self::derive_max_level(&sinks);
223
224 let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
225 let (dropped_sender, dropped_receiver) = mpsc::channel();
226 let control_receiver = crate::take_sink_control_receiver();
228 let dropped_events = Arc::new(AtomicU64::new(0));
229 let dropped_events_worker = Arc::clone(&dropped_events);
230
231 let worker_handle = std::thread::Builder::new()
232 .name("telemetry-worker".into())
233 .spawn(move || {
234 worker_loop(
235 receiver,
236 dropped_receiver,
237 control_receiver,
238 sinks,
239 dropped_events_worker,
240 );
241 })
242 .expect("failed to spawn telemetry worker thread");
243
244 Self {
245 sender: Some(sender),
246 dropped_sender: Some(dropped_sender),
247 _worker_handle: WorkerHandle {
248 join_handle: Some(worker_handle),
249 },
250 max_level,
251 dropped_events,
252 }
253 }
254
255 fn derive_max_level(sinks: &[Box<dyn TraceEventSink>]) -> Option<LevelFilter> {
256 let mut max_level: Option<LevelFilter> = None;
257
258 for sink in sinks {
259 let sink_max = match sink.target_filter() {
260 None => LevelFilter::TRACE,
261 Some(targets) => {
262 let levels = [
263 (tracing::Level::TRACE, LevelFilter::TRACE),
264 (tracing::Level::DEBUG, LevelFilter::DEBUG),
265 (tracing::Level::INFO, LevelFilter::INFO),
266 (tracing::Level::WARN, LevelFilter::WARN),
267 (tracing::Level::ERROR, LevelFilter::ERROR),
268 ];
269 let mut result = LevelFilter::OFF;
270 for (level, filter) in levels {
271 if targets.would_enable("", &level) {
272 result = filter;
273 break;
274 }
275 }
276 result
277 }
278 };
279
280 max_level = Some(match max_level {
281 None => sink_max,
282 Some(current) => std::cmp::max(current, sink_max),
283 });
284 }
285
286 max_level
287 }
288
289 fn send_event(&self, event: TraceEvent) {
290 if let Some(sender) = &self.sender {
291 if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) {
292 let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
293
294 if dropped == 1 || dropped.is_multiple_of(1000) {
295 eprintln!(
296 "[telemetry]: {} events and log lines dropped que to full queue (capacity: {})",
297 dropped, QUEUE_CAPACITY
298 );
299 self.send_drop_event(dropped);
300 }
301 }
302 }
303 }
304
305 fn send_drop_event(&self, total_dropped: u64) {
306 if let Some(dropped_sender) = &self.dropped_sender {
307 let (thread_name, thread_id) = get_thread_info();
308
309 let mut fields = TraceFields::new();
310 fields.push((
311 "message",
312 FieldValue::Str(format!(
313 "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
314 QUEUE_CAPACITY
315 )),
316 ));
317 fields.push(("dropped_count", FieldValue::U64(total_dropped)));
318
319 let drop_event = TraceEvent::Event {
322 name: "dropped events",
323 target: module_path!(),
324 level: tracing::Level::ERROR,
325 fields,
326 timestamp: SystemTime::now(),
327 parent_span: None,
328 thread_id,
329 thread_name,
330 module_path: Some(module_path!()),
331 file: Some(file!()),
332 line: Some(line!()),
333 };
334
335 if dropped_sender.send(drop_event).is_err() {
336 eprintln!(
338 "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
339 (worker thread may have died). Telemetry system offline.",
340 total_dropped
341 );
342 }
343 }
344 }
345}
346
347impl Drop for TraceEventDispatcher {
348 fn drop(&mut self) {
349 drop(self.sender.take());
353 drop(self.dropped_sender.take());
354 }
355}
356
357impl<S> Layer<S> for TraceEventDispatcher
358where
359 S: Subscriber + for<'a> LookupSpan<'a>,
360{
361 fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
362 let metadata = attrs.metadata();
363 let mut fields = TraceFields::new();
364
365 let mut visitor = FieldVisitor(&mut fields);
366 attrs.record(&mut visitor);
367
368 let parent_id = if let Some(parent) = attrs.parent() {
369 Some(parent.into_u64())
370 } else {
371 ctx.current_span().id().map(|id| id.into_u64())
372 };
373
374 let (thread_name, _) = get_thread_info();
375
376 let event = TraceEvent::NewSpan {
377 id: id.into_u64(),
378 name: metadata.name(),
379 target: metadata.target(),
380 level: *metadata.level(),
381 fields,
382 timestamp: SystemTime::now(),
383 parent_id,
384 thread_name,
385 file: metadata.file(),
386 line: metadata.line(),
387 };
388
389 self.send_event(event);
390 }
391
392 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
393 let (thread_name, _) = get_thread_info();
394 let event = TraceEvent::SpanEnter {
395 id: id.into_u64(),
396 timestamp: SystemTime::now(),
397 thread_name,
398 };
399
400 self.send_event(event);
401 }
402
403 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
404 let (thread_name, _) = get_thread_info();
405 let event = TraceEvent::SpanExit {
406 id: id.into_u64(),
407 timestamp: SystemTime::now(),
408 thread_name,
409 };
410
411 self.send_event(event);
412 }
413
414 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
415 let metadata = event.metadata();
416 let mut fields = TraceFields::new();
417 let mut visitor = FieldVisitor(&mut fields);
418 event.record(&mut visitor);
419
420 let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
421
422 let (thread_name, thread_id) = get_thread_info();
423
424 let trace_event = TraceEvent::Event {
425 name: metadata.name(),
426 target: metadata.target(),
427 level: *metadata.level(),
428 fields,
429 timestamp: SystemTime::now(),
430 parent_span,
431 thread_id,
432 thread_name,
433 module_path: metadata.module_path(),
434 file: metadata.file(),
435 line: metadata.line(),
436 };
437
438 self.send_event(trace_event);
439 }
440
441 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
442 let event = TraceEvent::SpanClose {
443 id: id.into_u64(),
444 timestamp: SystemTime::now(),
445 };
446
447 self.send_event(event);
448 }
449
450 fn max_level_hint(&self) -> Option<LevelFilter> {
451 self.max_level
452 }
453}
454
/// Visitor that appends every recorded field to the borrowed [`TraceFields`].
struct FieldVisitor<'a>(&'a mut TraceFields);
456
457impl<'a> tracing::field::Visit for FieldVisitor<'a> {
458 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
459 self.0.push((field.name(), FieldValue::Bool(value)));
460 }
461
462 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
463 self.0.push((field.name(), FieldValue::I64(value)));
464 }
465
466 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
467 self.0.push((field.name(), FieldValue::U64(value)));
468 }
469
470 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
471 self.0.push((field.name(), FieldValue::F64(value)));
472 }
473
474 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
475 self.0
476 .push((field.name(), FieldValue::Str(value.to_string())));
477 }
478
479 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
480 self.0
481 .push((field.name(), FieldValue::Debug(format!("{:?}", value))));
482 }
483}
484
485fn worker_loop(
489 receiver: mpsc::Receiver<TraceEvent>,
490 dropped_receiver: mpsc::Receiver<TraceEvent>,
491 control_receiver: Option<mpsc::Receiver<DispatcherControl>>,
492 mut sinks: Vec<Box<dyn TraceEventSink>>,
493 dropped_events: Arc<AtomicU64>,
494) {
495 const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
496 const FLUSH_EVENT_COUNT: usize = 1000;
497 let mut last_flush = std::time::Instant::now();
498 let mut events_since_flush = 0;
499
500 fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
501 for sink in sinks {
502 if let Err(e) = sink.flush() {
503 eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
504 }
505 }
506 }
507
508 fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
509 for sink in sinks {
510 if match &event {
511 TraceEvent::NewSpan { target, level, .. }
512 | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
513 Some(targets) => targets.would_enable(target, level),
514 None => true,
515 },
516 _ => true,
517 } {
518 if let Err(e) = sink.consume(&event) {
519 eprintln!(
520 "[telemetry] sink {} failed to consume event: {}",
521 sink.name(),
522 e
523 );
524 }
525 }
526 }
527 }
528
529 loop {
530 while let Ok(event) = dropped_receiver.try_recv() {
531 dispatch_to_sinks(&mut sinks, event);
532 events_since_flush += 1;
533 }
534
535 if let Some(ref ctrl_rx) = control_receiver {
537 while let Ok(control) = ctrl_rx.try_recv() {
538 match control {
539 DispatcherControl::AddSink(sink) => {
540 sinks.push(sink);
541 }
542 }
543 }
544 }
545
546 match receiver.recv_timeout(FLUSH_INTERVAL) {
547 Ok(event) => {
548 dispatch_to_sinks(&mut sinks, event);
549 events_since_flush += 1;
550
551 if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
552 {
553 flush_sinks(&mut sinks);
554 last_flush = std::time::Instant::now();
555 events_since_flush = 0;
556 }
557 }
558 Err(mpsc::RecvTimeoutError::Timeout) => {
559 flush_sinks(&mut sinks);
560 last_flush = std::time::Instant::now();
561 events_since_flush = 0;
562 }
563 Err(mpsc::RecvTimeoutError::Disconnected) => {
564 break;
565 }
566 }
567 }
568
569 while let Ok(event) = dropped_receiver.try_recv() {
570 dispatch_to_sinks(&mut sinks, event);
571 }
572 while let Ok(event) = receiver.try_recv() {
573 dispatch_to_sinks(&mut sinks, event);
574 }
575
576 flush_sinks(&mut sinks);
577
578 let total_dropped = dropped_events.load(Ordering::Relaxed);
579 if total_dropped > 0 {
580 eprintln!(
581 "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
582 total_dropped
583 );
584 }
585}
586
587impl Drop for WorkerHandle {
588 fn drop(&mut self) {
589 if let Some(handle) = self.join_handle.take() {
590 if let Err(e) = handle.join() {
591 eprintln!("[telemetry] worker thread panicked: {:?}", e);
592 }
593 }
594 }
595}