1use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::SPAN_FIELD_RECORDING;
38use crate::pool::Pool;
39use crate::spool::Spool;
40
41#[derive(Debug)]
44struct Key(
45 Box<u8>,
49);
50
51impl Key {
52 fn new() -> Self {
54 Self(Box::new(0u8))
55 }
56
57 fn as_key_ref(&self) -> KeyRef {
62 KeyRef(&*self.0 as *const _)
63 }
64}
65
66#[derive(Debug, PartialEq, Eq, Hash, Clone)]
69struct KeyRef(*const u8);
70
71unsafe impl Send for KeyRef {}
74unsafe impl Sync for KeyRef {}
77
78impl Copy for KeyRef {}
79
80impl From<Key> for KeyRef {
81 fn from(value: Key) -> Self {
82 Self(&*value.0 as *const _)
83 }
84}
85
86impl From<u64> for KeyRef {
87 fn from(value: u64) -> Self {
88 Self(value as *const _)
89 }
90}
91
92impl From<KeyRef> for u64 {
93 fn from(value: KeyRef) -> Self {
94 value.0 as u64
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub enum Value {
104 String(String),
106 I64(i64),
108 U64(u64),
110 F64(f64),
112 Bool(bool),
114}
115
116impl Value {
117 pub fn to_json(&self) -> serde_json::Value {
119 match &self {
120 Self::String(s) => serde_json::Value::String(s.clone()),
121 Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
122 Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
123 Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
124 Self::Bool(b) => serde_json::Value::Bool(*b),
125 }
126 }
127}
128
129impl Default for Value {
130 fn default() -> Self {
131 Value::Bool(false)
132 }
133}
134
135#[derive(Debug, Default, Clone)]
136struct Entry {
137 name: &'static str,
138 value: Value,
139 buffer: Option<String>,
140}
141
142impl Entry {
143 fn reset(&mut self) {
144 if let Value::String(mut s) = take(&mut self.value) {
145 s.clear();
146 self.buffer = Some(s);
147 }
148 }
149
150 fn set_str(&mut self, name: &'static str, value: &str) {
151 self.reset();
152 let mut buf = self.buffer.take().unwrap_or_else(String::new);
153 buf.clear();
154 buf.push_str(value);
155 self.name = name;
156 self.value = Value::String(buf);
157 }
158
159 fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
160 self.reset();
161 let mut buf = self.buffer.take().unwrap_or_else(String::new);
162
163 let mut formatter =
164 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
165
166 Display::fmt(value, &mut formatter)
167 .expect("a Display implementation returned an error unexpectedly");
168
169 self.name = name;
170 self.value = Value::String(buf);
171 }
172
173 fn set_debug(&mut self, name: &'static str, value: &(dyn std::fmt::Debug)) {
174 self.reset();
175 let mut buf = self.buffer.take().unwrap_or_else(String::new);
176
177 let mut formatter =
178 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
179
180 Debug::fmt(value, &mut formatter)
181 .expect("a Debug implementation returned an error unexpectedly");
182
183 self.name = name;
184 self.value = Value::String(buf);
185 }
186
187 fn set_i64(&mut self, name: &'static str, value: i64) {
188 self.reset();
189 self.name = name;
190 self.value = Value::I64(value);
191 }
192
193 fn set_u64(&mut self, name: &'static str, value: u64) {
194 self.reset();
195 self.name = name;
196 self.value = Value::U64(value);
197 }
198
199 fn set_f64(&mut self, name: &'static str, value: f64) {
200 self.reset();
201 self.name = name;
202 self.value = Value::F64(value);
203 }
204
205 fn set_bool(&mut self, name: &'static str, value: bool) {
206 self.reset();
207 self.name = name;
208 self.value = Value::Bool(value);
209 }
210}
211
212#[derive(Debug, Clone)]
214pub struct Event {
215 pub time: SystemTime,
217
218 pub metadata: &'static Metadata<'static>,
220
221 fields: Vec<Entry>,
223
224 num_fields: usize,
226
227 pub seq: usize,
229}
230
231impl Event {
232 fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
233 self.time = time;
234 self.metadata = metadata;
235 self.num_fields = 0;
236 self.seq = seq;
237 }
238
239 fn next_field(&mut self) -> &mut Entry {
240 while self.fields.len() <= self.num_fields {
241 self.fields.push(Default::default());
242 }
243 let field = self.fields.get_mut(self.num_fields).unwrap();
244 field.reset();
245 self.num_fields += 1;
246 field
247 }
248
249 pub fn json_value(&self) -> serde_json::Value {
251 serde_json::Value::Object(self.json_fields())
252 }
253
254 pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
256 let mut map = serde_json::Map::new();
257 for field in &self.fields {
258 map.insert(field.name.to_string(), field.value.to_json());
259 }
260 map
261 }
262
263 pub fn fields(&self) -> Vec<(String, Value)> {
265 self.fields
266 .iter()
267 .map(|field| (field.name.to_string(), field.value.clone()))
268 .collect()
269 }
270}
271
272impl Default for Event {
273 fn default() -> Self {
274 static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
275 name: "",
276 kind: tracing::metadata::Kind::SPAN,
277 level: Level::DEBUG,
278 fields:
279 };
280 static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
281 name: "",
282 target: "",
283 level: Level::DEBUG,
284 fields: &[],
285 callsite: &__CALLSITE,
286 kind: tracing::metadata::Kind::SPAN,
287 };
288 Self {
289 time: SystemTime::now(),
290 metadata: &DEFAULT_METADATA,
291 fields: Vec::new(),
292 num_fields: 0,
293 seq: 0,
294 }
295 }
296}
297
298impl Visit for Event {
300 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
301 self.next_field().set_i64(field.name(), value);
302 }
303
304 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
305 self.next_field().set_u64(field.name(), value);
306 }
307
308 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
309 self.next_field().set_f64(field.name(), value);
310 }
311
312 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
313 self.next_field().set_bool(field.name(), value);
314 }
315
316 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
317 self.next_field().set_str(field.name(), value);
318 }
319
320 fn record_error(
321 &mut self,
322 field: &tracing::field::Field,
323 value: &(dyn std::error::Error + 'static),
324 ) {
325 self.next_field().set_error(field.name(), value);
326 }
327
328 fn record_debug(&mut self, field: &tracing::field::Field, value: &(dyn std::fmt::Debug)) {
329 self.next_field().set_debug(field.name(), value);
330 }
331}
332
333#[derive(Debug)]
335pub struct Recording {
336 state: Arc<RecordingState>,
337 recorder_state: Arc<RecorderState>,
338}
339
340impl Recording {
341 fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
342 assert!(cap > 1, "capacity must be > 1");
343
344 let state = Arc::new(RecordingState {
345 key: Key::new(),
346 active: Mutex::new(HashMap::new()),
347 spool: Spool::new(cap),
348 seq: AtomicUsize::new(0),
349 });
350 assert!(
351 recorder_state
352 .recordings
353 .insert(state.key.as_key_ref(), Arc::clone(&state))
354 .is_none(),
355 "non-unique key"
356 );
357
358 Self {
359 state,
360 recorder_state,
361 }
362 }
363
364 pub fn span(&self) -> Span {
366 span!(
367 Level::INFO,
368 SPAN_FIELD_RECORDING,
369 recording = self.recording_key(),
370 recorder = self.recorder_key(),
371 )
372 }
373
374 pub fn tail(&self) -> Vec<Event> {
377 self.state.spool.tail()
378 }
379
380 pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
383 let snapshot = self.state.active.lock().unwrap().clone();
384
385 let parents: HashSet<Id> = snapshot
386 .iter()
387 .filter_map(|(_, (_, parent))| parent.clone())
388 .collect();
389
390 snapshot
391 .iter()
392 .filter_map(|(id, _)| {
393 if parents.contains(id) {
394 None
395 } else {
396 Some(id.clone())
397 }
398 })
399 .map(|id| {
400 let mut stack = Vec::new();
401 let mut parent: Option<Id> = Some(id);
402 while let Some(id) = parent {
403 let Some((meta, next_parent)) = snapshot.get(&id) else {
404 break;
405 };
406 stack.push(*meta);
407 parent = next_parent.clone();
408 }
409 stack
410 })
411 .collect()
412 }
413
414 fn recording_key_ref(&self) -> KeyRef {
415 self.state.key.as_key_ref()
416 }
417
418 fn recording_key(&self) -> u64 {
419 self.recording_key_ref().into()
420 }
421
422 fn recorder_key(&self) -> u64 {
423 self.recorder_state.key.as_key_ref().into()
424 }
425}
426
427impl Drop for Recording {
428 fn drop(&mut self) {
429 assert!(
430 self.recorder_state
431 .recordings
432 .remove(&self.state.key.as_key_ref())
433 .is_some(),
434 "missing recording"
435 );
436 }
437}
438
439#[derive(Debug)]
440struct RecordingState {
441 active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
442 key: Key,
443 seq: AtomicUsize,
444 spool: Spool<Event>,
445}
446
447pub struct Recorder {
452 state: Arc<RecorderState>,
453}
454
455#[derive(Debug)]
456struct RecorderState {
457 key: Key,
458 recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
459 pool: Pool<Event>,
460}
461
462impl Recorder {
463 pub fn new() -> Self {
465 let state = Arc::new(RecorderState {
466 key: Key::new(),
467 recordings: Arc::new(DashMap::new()),
468 pool: Pool::new(1024),
469 });
470 Self { state }
471 }
472
473 pub fn record(&self, cap: usize) -> Recording {
476 Recording::new(cap, Arc::clone(&self.state))
477 }
478
479 pub fn layer(&self) -> RecorderLayer {
483 RecorderLayer {
484 state: Arc::clone(&self.state),
485 }
486 }
487}
488
489pub struct RecorderLayer {
491 state: Arc<RecorderState>,
492}
493
494impl RecorderLayer {
495 fn iter_recordings<'a, S>(
497 &'a self,
498 scope: Scope<'a, S>,
499 ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
500 where
501 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
502 {
503 scope
506 .from_root()
507 .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
508 Some((recording_key_ref, recorder_key_ref))
509 if recorder_key_ref == &self.state.key.as_key_ref() =>
510 {
511 Some(*recording_key_ref)
512 }
513 _ => None,
514 })
515 .filter_map(|key_ref| self.state.recordings.get(&key_ref))
517 }
518}
519
520impl<S: Subscriber> Layer<S> for RecorderLayer
521where
522 S: for<'span> LookupSpan<'span>,
523{
524 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
525 let mut visitor = RecordingKeysVisitor::new();
526
527 attrs.record(&mut visitor);
528
529 if let Some(keys) = visitor.keys() {
530 if let Some(span) = ctx.span(id) {
531 let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
532 span.extensions_mut();
533 extensions.insert(keys);
534 }
535 }
536 }
537
538 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
539 let Some(scope) = ctx.event_scope(event) else {
540 return;
541 };
542 for state in self.iter_recordings(scope) {
543 let mut recorded = self.state.pool.get();
544 let seq = state.seq.fetch_add(1, Ordering::Relaxed);
545 recorded.reset(SystemTime::now(), event.metadata(), seq);
546 event.record(&mut recorded);
547 state.spool.push(recorded);
548 }
549 }
550
551 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
552 let Some(span) = ctx.span(id) else { return };
553
554 for state in self.iter_recordings(span.scope()) {
555 let mut active = state.active.lock().unwrap();
556 active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
557 }
558 }
559
560 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
561 let Some(span) = ctx.span(id) else { return };
562
563 for state in self.iter_recordings(span.scope()) {
564 let mut active = state.active.lock().unwrap();
565 active.remove(id);
566 }
567 }
568}
569
570struct RecordingKeysVisitor {
572 recording_key: Option<KeyRef>,
573 recorder_key: Option<KeyRef>,
574}
575
576impl RecordingKeysVisitor {
577 fn new() -> Self {
578 Self {
579 recording_key: None,
580 recorder_key: None,
581 }
582 }
583
584 fn keys(&self) -> Option<(KeyRef, KeyRef)> {
585 match (self.recording_key, self.recorder_key) {
586 (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
587 _ => None,
588 }
589 }
590}
591
592impl Visit for RecordingKeysVisitor {
593 fn record_u64(&mut self, field: &Field, value: u64) {
594 if field.name() == "recording" {
595 self.recording_key = Some(value.into());
596 } else if field.name() == "recorder" {
597 self.recorder_key = Some(value.into());
598 }
599 }
600
601 fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
602}
603
604#[cfg(test)]
605mod tests {
606 use serde_json::json;
607 use tracing::Level;
608 use tracing::info;
609 use tracing::span;
610 use tracing_subscriber::Registry;
611 use tracing_subscriber::prelude::*;
612
613 use super::*;
614
615 #[test]
616 fn test_key() {
617 let key = Key::new();
618 assert_eq!(key.as_key_ref(), key.as_key_ref());
619
620 assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
621 }
622
623 #[test]
624 fn test_events_are_recorded() {
625 let recorder = Recorder::new();
626 let recording = recorder.record(10);
627 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
628 let span = recording.span();
629 let _guard = span.enter();
630 info!("This event should be recorded");
631 info!("another event");
632 });
633
634 let events = recording.tail();
635 assert_eq!(events.len(), 2);
636 assert_eq!(
637 events[0].json_value(),
638 json!({
639 "message": "This event should be recorded"
640 })
641 );
642 assert_eq!(
643 events[1].json_value(),
644 json!({
645 "message": "another event"
646 })
647 );
648 }
649
650 #[test]
651 fn test_last_n_entries() {
652 let recorder = Recorder::new();
653 let recording = recorder.record(5);
654 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
655 let span = recording.span();
656 let _guard = span.enter();
657 for i in 0..10 {
658 info!("event {}", i);
659 }
660 });
661
662 let events = recording.tail();
663 assert_eq!(events.len(), 5);
664 for (i, event) in events.into_iter().enumerate() {
665 assert_eq!(
666 event.json_value(),
667 json!({
668 "message": format!("event {}", i + 5)
669 })
670 );
671 assert_eq!(event.seq, i + 5);
672 }
673 }
674
675 #[test]
676 fn test_events_outside_span_are_not_recorded() {
677 let recorder = Recorder::new();
678 let recording = recorder.record(10);
679 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
680 let _span = recording.span(); info!("This event should NOT be recorded");
682 });
683 assert_eq!(recording.tail().len(), 0);
684 }
685
686 #[test]
687 fn test_child_span_inherits_recorder() {
688 let recorder = Recorder::new();
689 let recording = recorder.record(10);
690
691 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
692 let outer = recording.span();
693 let _outer_guard = outer.enter();
694
695 {
696 let inner = span!(Level::INFO, "child_span");
697 let _inner_guard = inner.enter();
698
699 info!("Event from inner span");
700 }
701 });
702
703 let events = recording.tail();
704 assert_eq!(events.len(), 1);
705 assert_eq!(
706 events[0].json_value(),
707 json!({
708 "message": "Event from inner span"
709 })
710 );
711 }
712
713 #[test]
714 fn test_active_spans() {
715 let recorder = Recorder::new();
716 let recording = recorder.record(10);
717
718 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
719 let outer = recording.span();
720 let _outer_guard = outer.enter();
721
722 {
723 let inner = span!(Level::INFO, "child_span");
724 let _inner_guard = inner.enter();
725
726 assert_eq!(recording.stacks().len(), 1);
727 assert_eq!(recording.stacks()[0][0].name(), "child_span");
729 assert_eq!(recording.stacks()[0][1].name(), "recording");
730 }
731 });
732 }
733}