1use std::sync::Arc;
14use std::sync::atomic::AtomicU64;
15use std::sync::atomic::Ordering;
16use std::sync::mpsc;
17use std::thread::JoinHandle;
18use std::time::Duration;
19use std::time::SystemTime;
20
21use indexmap::IndexMap;
22use tracing::Id;
23use tracing::Subscriber;
24use tracing_subscriber::filter::Targets;
25use tracing_subscriber::layer::Context;
26use tracing_subscriber::layer::Layer;
27use tracing_subscriber::registry::LookupSpan;
28
/// Maximum number of events buffered between producers and the worker thread.
/// Once the queue is full, further events are discarded and counted instead of
/// blocking the thread that emitted them.
const QUEUE_CAPACITY: usize = 100_000;
30
/// Owned snapshot of a `tracing` callback, sent from the [`TraceEventDispatcher`]
/// to the worker thread and passed by reference to every interested sink.
#[derive(Debug, Clone)]
pub(crate) enum TraceEvent {
    /// A span was created (`Layer::on_new_span`).
    NewSpan {
        /// Span id (`Id::into_u64`); the enter/exit/close variants carry the
        /// same value for this span.
        id: u64,
        name: &'static str,
        target: &'static str,
        level: tracing::Level,
        /// Fields recorded at creation, keyed by field name (an `IndexMap`, so
        /// insertion order is kept).
        fields: IndexMap<String, FieldValue>,
        /// Wall-clock time at which the layer observed the creation.
        timestamp: SystemTime,
        /// Id of the parent span, if any.
        parent_id: Option<u64>,
        /// Name of the creating thread, or an empty string if it is unnamed.
        thread_name: String,
        file: Option<&'static str>,
        line: Option<u32>,
    },
    /// The span with this id was entered.
    SpanEnter { id: u64, timestamp: SystemTime },
    /// The span with this id was exited.
    SpanExit { id: u64, timestamp: SystemTime },
    /// The span with this id was closed.
    SpanClose { id: u64, timestamp: SystemTime },
    /// A log event: either forwarded from `tracing` (`Layer::on_event`) or
    /// synthesized by the dispatcher to report dropped events.
    Event {
        name: &'static str,
        target: &'static str,
        level: tracing::Level,
        /// Recorded fields, keyed by field name.
        fields: IndexMap<String, FieldValue>,
        /// Wall-clock time at which the layer observed the event.
        timestamp: SystemTime,
        /// Id of the span the event occurred in, if any.
        parent_span: Option<u64>,
        /// Numeric id of the emitting thread as a decimal string (the kernel
        /// TID on Linux).
        thread_id: String,
        /// Name of the emitting thread, or an empty string if it is unnamed.
        thread_name: String,
        module_path: Option<&'static str>,
        file: Option<&'static str>,
        line: Option<u32>,
    },
}
70
/// A single recorded field value. The primitive type is kept when `tracing`
/// reports one.
#[derive(Debug, Clone)]
pub(crate) enum FieldValue {
    Bool(bool),
    I64(i64),
    U64(u64),
    F64(f64),
    /// A string recorded through `record_str`.
    Str(String),
    /// The `{:?}` rendering of a value recorded through `record_debug`.
    Debug(String),
}
81
/// Destination for [`TraceEvent`]s.
///
/// Sinks are moved into the `telemetry-worker` thread and are only ever called
/// from there, which is why they must be `Send + 'static`.
pub(crate) trait TraceEventSink: Send + 'static {
    /// Handles one event.
    ///
    /// An `Err` is printed to stderr together with [`name`](Self::name) and is
    /// otherwise ignored; the event is not retried.
    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error>;

    /// Optional per-sink filter on target and level.
    ///
    /// It is applied only to `NewSpan` and `Event`. Span enter/exit/close
    /// events are always delivered, whatever this returns. The default
    /// (`None`) accepts everything.
    fn target_filter(&self) -> Option<&Targets> {
        None
    }

    /// Flushes any buffered output.
    ///
    /// The worker calls this periodically: after 1000 events or once 100 ms
    /// have passed since the last flush, whenever the queue stays idle for
    /// 100 ms, and once at shutdown. Errors are printed to stderr and ignored.
    fn flush(&mut self) -> Result<(), anyhow::Error>;

    /// Label used in the worker's error messages. Defaults to
    /// `std::any::type_name` of the implementing type.
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}
121
/// `tracing_subscriber` layer that snapshots spans and events into
/// [`TraceEvent`]s and hands them to a background worker thread, which fans
/// them out to the configured [`TraceEventSink`]s.
///
/// Emitting threads never wait on the worker. When the bounded queue is full,
/// the event is discarded and counted in `dropped_events`.
pub struct TraceEventDispatcher {
    /// Bounded queue to the worker. Becomes `None` once `Drop` has released it.
    sender: Option<mpsc::SyncSender<TraceEvent>>,
    /// Unbounded side channel, used only for "events were dropped" notices.
    dropped_sender: Option<mpsc::Sender<TraceEvent>>,
    /// Joins the worker thread when dropped, after `Drop` has released both
    /// senders.
    _worker_handle: WorkerHandle,
    /// Value reported from `Layer::max_level_hint`.
    max_level: Option<tracing::level_filters::LevelFilter>,
    /// Number of events discarded because the queue was full. It is shared with
    /// the worker, which reports the total at shutdown.
    dropped_events: Arc<AtomicU64>,
}
132
/// Owns the worker thread's `JoinHandle` and joins it on drop. As a result,
/// dropping the dispatcher waits for the worker to drain its queues and do its
/// final flush.
struct WorkerHandle {
    /// `None` once the thread has been joined.
    join_handle: Option<JoinHandle<()>>,
}
136
137impl TraceEventDispatcher {
138 pub(crate) fn new(
148 sinks: Vec<Box<dyn TraceEventSink>>,
149 max_level: Option<tracing::level_filters::LevelFilter>,
150 ) -> Self {
151 let (sender, receiver) = mpsc::sync_channel(QUEUE_CAPACITY);
152 let (dropped_sender, dropped_receiver) = mpsc::channel();
153 let dropped_events = Arc::new(AtomicU64::new(0));
154 let dropped_events_worker = Arc::clone(&dropped_events);
155
156 let worker_handle = std::thread::Builder::new()
157 .name("telemetry-worker".into())
158 .spawn(move || {
159 worker_loop(receiver, dropped_receiver, sinks, dropped_events_worker);
160 })
161 .expect("failed to spawn telemetry worker thread");
162
163 Self {
164 sender: Some(sender),
165 dropped_sender: Some(dropped_sender),
166 _worker_handle: WorkerHandle {
167 join_handle: Some(worker_handle),
168 },
169 max_level,
170 dropped_events,
171 }
172 }
173
174 fn send_event(&self, event: TraceEvent) {
175 if let Some(sender) = &self.sender {
176 if let Err(mpsc::TrySendError::Full(_)) = sender.try_send(event) {
177 let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
178
179 if dropped == 1 || dropped.is_multiple_of(1000) {
180 eprintln!(
181 "[telemetry]: {} events and log lines dropped que to full queue (capacity: {})",
182 dropped, QUEUE_CAPACITY
183 );
184 self.send_drop_event(dropped);
185 }
186 }
187 }
188 }
189
190 fn send_drop_event(&self, total_dropped: u64) {
191 if let Some(dropped_sender) = &self.dropped_sender {
192 #[cfg(target_os = "linux")]
193 let thread_id_num = {
194 unsafe { libc::syscall(libc::SYS_gettid) as u64 }
196 };
197 #[cfg(not(target_os = "linux"))]
198 let thread_id_num = {
199 let tid = std::thread::current().id();
200 unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) }
202 };
203
204 let mut fields = IndexMap::new();
205 fields.insert(
206 "message".to_string(),
207 FieldValue::Str(format!(
208 "Telemetry events and log lines dropped due to full queue (capacity: {}). Worker may be falling behind.",
209 QUEUE_CAPACITY
210 )),
211 );
212 fields.insert("dropped_count".to_string(), FieldValue::U64(total_dropped));
213
214 let drop_event = TraceEvent::Event {
217 name: "dropped events",
218 target: module_path!(),
219 level: tracing::Level::ERROR,
220 fields,
221 timestamp: SystemTime::now(),
222 parent_span: None,
223 thread_id: thread_id_num.to_string(),
224 thread_name: std::thread::current()
225 .name()
226 .unwrap_or_default()
227 .to_string(),
228 module_path: Some(module_path!()),
229 file: Some(file!()),
230 line: Some(line!()),
231 };
232
233 if dropped_sender.send(drop_event).is_err() {
234 eprintln!(
236 "[telemetry] CRITICAL: {} events and log lines dropped and unable to log to telemetry \
237 (worker thread may have died). Telemetry system offline.",
238 total_dropped
239 );
240 }
241 }
242 }
243}
244
245impl Drop for TraceEventDispatcher {
246 fn drop(&mut self) {
247 drop(self.sender.take());
251 drop(self.dropped_sender.take());
252 }
253}
254
255impl<S> Layer<S> for TraceEventDispatcher
256where
257 S: Subscriber + for<'a> LookupSpan<'a>,
258{
259 fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
260 let metadata = attrs.metadata();
261 let mut fields = IndexMap::new();
262
263 let mut visitor = FieldVisitor(&mut fields);
264 attrs.record(&mut visitor);
265
266 let parent_id = if let Some(parent) = attrs.parent() {
267 Some(parent.into_u64())
268 } else {
269 ctx.current_span().id().map(|id| id.into_u64())
270 };
271
272 let thread_name = std::thread::current()
273 .name()
274 .unwrap_or_default()
275 .to_string();
276
277 let event = TraceEvent::NewSpan {
278 id: id.into_u64(),
279 name: metadata.name(),
280 target: metadata.target(),
281 level: *metadata.level(),
282 fields,
283 timestamp: SystemTime::now(),
284 parent_id,
285 thread_name,
286 file: metadata.file(),
287 line: metadata.line(),
288 };
289
290 self.send_event(event);
291 }
292
293 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
294 let event = TraceEvent::SpanEnter {
295 id: id.into_u64(),
296 timestamp: SystemTime::now(),
297 };
298
299 self.send_event(event);
300 }
301
302 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
303 let event = TraceEvent::SpanExit {
304 id: id.into_u64(),
305 timestamp: SystemTime::now(),
306 };
307
308 self.send_event(event);
309 }
310
311 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
312 let metadata = event.metadata();
313 let mut fields = IndexMap::new();
314 let mut visitor = FieldVisitor(&mut fields);
315 event.record(&mut visitor);
316
317 let parent_span = ctx.event_span(event).map(|span| span.id().into_u64());
318
319 #[cfg(target_os = "linux")]
320 let thread_id_num = {
321 unsafe { libc::syscall(libc::SYS_gettid) as u64 }
325 };
326 #[cfg(not(target_os = "linux"))]
327 let thread_id_num = {
328 let tid = std::thread::current().id();
329 unsafe { std::mem::transmute::<std::thread::ThreadId, u64>(tid) }
335 };
336 let thread_id_str = thread_id_num.to_string();
337
338 let thread_name = std::thread::current()
339 .name()
340 .unwrap_or_default()
341 .to_string();
342
343 let trace_event = TraceEvent::Event {
344 name: metadata.name(),
345 target: metadata.target(),
346 level: *metadata.level(),
347 fields,
348 timestamp: SystemTime::now(),
349 parent_span,
350 thread_id: thread_id_str,
351 thread_name,
352 module_path: metadata.module_path(),
353 file: metadata.file(),
354 line: metadata.line(),
355 };
356
357 self.send_event(trace_event);
358 }
359
360 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
361 let event = TraceEvent::SpanClose {
362 id: id.into_u64(),
363 timestamp: SystemTime::now(),
364 };
365
366 self.send_event(event);
367 }
368
369 fn max_level_hint(&self) -> Option<tracing::level_filters::LevelFilter> {
370 self.max_level
371 }
372}
373
374struct FieldVisitor<'a>(&'a mut IndexMap<String, FieldValue>);
375
376impl<'a> tracing::field::Visit for FieldVisitor<'a> {
377 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
378 self.0
379 .insert(field.name().to_string(), FieldValue::Bool(value));
380 }
381
382 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
383 self.0
384 .insert(field.name().to_string(), FieldValue::I64(value));
385 }
386
387 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
388 self.0
389 .insert(field.name().to_string(), FieldValue::U64(value));
390 }
391
392 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
393 self.0
394 .insert(field.name().to_string(), FieldValue::F64(value));
395 }
396
397 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
398 self.0
399 .insert(field.name().to_string(), FieldValue::Str(value.to_string()));
400 }
401
402 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
403 self.0.insert(
404 field.name().to_string(),
405 FieldValue::Debug(format!("{:?}", value)),
406 );
407 }
408}
409
410fn worker_loop(
414 receiver: mpsc::Receiver<TraceEvent>,
415 dropped_receiver: mpsc::Receiver<TraceEvent>,
416 mut sinks: Vec<Box<dyn TraceEventSink>>,
417 dropped_events: Arc<AtomicU64>,
418) {
419 const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
420 const FLUSH_EVENT_COUNT: usize = 1000;
421 let mut last_flush = std::time::Instant::now();
422 let mut events_since_flush = 0;
423
424 fn flush_sinks(sinks: &mut [Box<dyn TraceEventSink>]) {
425 for sink in sinks {
426 if let Err(e) = sink.flush() {
427 eprintln!("[telemetry] sink {} failed to flush: {}", sink.name(), e);
428 }
429 }
430 }
431
432 fn dispatch_to_sinks(sinks: &mut [Box<dyn TraceEventSink>], event: TraceEvent) {
433 for sink in sinks {
434 if match &event {
435 TraceEvent::NewSpan { target, level, .. }
436 | TraceEvent::Event { target, level, .. } => match sink.target_filter() {
437 Some(targets) => targets.would_enable(target, level),
438 None => true,
439 },
440 _ => true,
441 } {
442 if let Err(e) = sink.consume(&event) {
443 eprintln!(
444 "[telemetry] sink {} failed to consume event: {}",
445 sink.name(),
446 e
447 );
448 }
449 }
450 }
451 }
452
453 loop {
454 while let Ok(event) = dropped_receiver.try_recv() {
455 dispatch_to_sinks(&mut sinks, event);
456 events_since_flush += 1;
457 }
458
459 match receiver.recv_timeout(FLUSH_INTERVAL) {
460 Ok(event) => {
461 dispatch_to_sinks(&mut sinks, event);
462 events_since_flush += 1;
463
464 if events_since_flush >= FLUSH_EVENT_COUNT || last_flush.elapsed() >= FLUSH_INTERVAL
465 {
466 flush_sinks(&mut sinks);
467 last_flush = std::time::Instant::now();
468 events_since_flush = 0;
469 }
470 }
471 Err(mpsc::RecvTimeoutError::Timeout) => {
472 flush_sinks(&mut sinks);
473 last_flush = std::time::Instant::now();
474 events_since_flush = 0;
475 }
476 Err(mpsc::RecvTimeoutError::Disconnected) => {
477 break;
478 }
479 }
480 }
481
482 while let Ok(event) = dropped_receiver.try_recv() {
483 dispatch_to_sinks(&mut sinks, event);
484 }
485 while let Ok(event) = receiver.try_recv() {
486 dispatch_to_sinks(&mut sinks, event);
487 }
488
489 flush_sinks(&mut sinks);
490
491 let total_dropped = dropped_events.load(Ordering::Relaxed);
492 if total_dropped > 0 {
493 eprintln!(
494 "[telemetry] Telemetry worker shutting down. Total events dropped during session: {}",
495 total_dropped
496 );
497 }
498}
499
500impl Drop for WorkerHandle {
501 fn drop(&mut self) {
502 if let Some(handle) = self.join_handle.take() {
503 if let Err(e) = handle.join() {
504 eprintln!("[telemetry] worker thread panicked: {:?}", e);
505 }
506 }
507 }
508}