1use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::SPAN_FIELD_RECORDING;
38use crate::pool::Pool;
39use crate::spool::Spool;
40
41#[derive(Debug)]
44struct Key(
45 Box<u8>,
49);
50
51impl Key {
52 fn new() -> Self {
54 Self(Box::new(0u8))
55 }
56
57 fn as_key_ref(&self) -> KeyRef {
62 KeyRef(&*self.0 as *const _)
63 }
64}
65
66#[derive(Debug, PartialEq, Eq, Hash, Clone)]
69struct KeyRef(*const u8);
70
71unsafe impl Send for KeyRef {}
74unsafe impl Sync for KeyRef {}
77
78impl Copy for KeyRef {}
79
80impl From<Key> for KeyRef {
81 fn from(value: Key) -> Self {
82 Self(&*value.0 as *const _)
83 }
84}
85
86impl From<u64> for KeyRef {
87 fn from(value: u64) -> Self {
88 Self(value as *const _)
89 }
90}
91
92impl From<KeyRef> for u64 {
93 fn from(value: KeyRef) -> Self {
94 value.0 as u64
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub enum Value {
104 String(String),
106 I64(i64),
108 U64(u64),
110 F64(f64),
112 Bool(bool),
114}
115
116impl Value {
117 pub fn to_json(&self) -> serde_json::Value {
119 match &self {
120 Self::String(s) => serde_json::Value::String(s.clone()),
121 Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
122 Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
123 Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
124 Self::Bool(b) => serde_json::Value::Bool(*b),
125 }
126 }
127}
128
129impl Default for Value {
130 fn default() -> Self {
131 Value::Bool(false)
132 }
133}
134
135#[derive(Debug, Default, Clone)]
136struct Entry {
137 name: &'static str,
138 value: Value,
139 buffer: Option<String>,
140}
141
142impl Entry {
143 fn reset(&mut self) {
144 if let Value::String(mut s) = take(&mut self.value) {
145 s.clear();
146 self.buffer = Some(s);
147 }
148 }
149
150 fn set_str(&mut self, name: &'static str, value: &str) {
151 self.reset();
152 let mut buf = self.buffer.take().unwrap_or_default();
153 buf.clear();
154 buf.push_str(value);
155 self.name = name;
156 self.value = Value::String(buf);
157 }
158
159 fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
160 self.reset();
161 let mut buf = self.buffer.take().unwrap_or_default();
162
163 let mut formatter =
164 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
165
166 Display::fmt(value, &mut formatter)
167 .expect("a Display implementation returned an error unexpectedly");
168
169 self.name = name;
170 self.value = Value::String(buf);
171 }
172
173 fn set_debug(&mut self, name: &'static str, value: &dyn std::fmt::Debug) {
174 self.reset();
175 let mut buf = self.buffer.take().unwrap_or_default();
176
177 let mut formatter =
178 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
179
180 Debug::fmt(value, &mut formatter)
181 .expect("a Debug implementation returned an error unexpectedly");
182
183 self.name = name;
184 self.value = Value::String(buf);
185 }
186
187 fn set_i64(&mut self, name: &'static str, value: i64) {
188 self.reset();
189 self.name = name;
190 self.value = Value::I64(value);
191 }
192
193 fn set_u64(&mut self, name: &'static str, value: u64) {
194 self.reset();
195 self.name = name;
196 self.value = Value::U64(value);
197 }
198
199 fn set_f64(&mut self, name: &'static str, value: f64) {
200 self.reset();
201 self.name = name;
202 self.value = Value::F64(value);
203 }
204
205 fn set_bool(&mut self, name: &'static str, value: bool) {
206 self.reset();
207 self.name = name;
208 self.value = Value::Bool(value);
209 }
210}
211
212#[derive(Debug, Clone)]
214pub struct Event {
215 pub time: SystemTime,
217
218 pub metadata: &'static Metadata<'static>,
220
221 fields: Vec<Entry>,
223
224 num_fields: usize,
226
227 pub seq: usize,
229}
230
231impl Event {
232 fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
233 self.time = time;
234 self.metadata = metadata;
235 self.num_fields = 0;
236 self.seq = seq;
237 }
238
239 fn next_field(&mut self) -> &mut Entry {
240 while self.fields.len() <= self.num_fields {
241 self.fields.push(Default::default());
242 }
243 let field = self.fields.get_mut(self.num_fields).unwrap();
244 field.reset();
245 self.num_fields += 1;
246 field
247 }
248
249 pub fn json_value(&self) -> serde_json::Value {
251 serde_json::Value::Object(self.json_fields())
252 }
253
254 pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
256 let mut map = serde_json::Map::new();
257 for field in &self.fields {
258 map.insert(field.name.to_string(), field.value.to_json());
259 }
260 map
261 }
262
263 pub fn fields(&self) -> Vec<(String, Value)> {
265 self.fields
266 .iter()
267 .map(|field| (field.name.to_string(), field.value.clone()))
268 .collect()
269 }
270}
271
272impl Default for Event {
273 fn default() -> Self {
274 static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
275 name: "",
276 kind: tracing::metadata::Kind::SPAN,
277 level: Level::DEBUG,
278 fields:
279 };
280 static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
281 name: "",
282 target: "",
283 level: Level::DEBUG,
284 fields: &[],
285 callsite: &__CALLSITE,
286 kind: tracing::metadata::Kind::SPAN,
287 };
288 Self {
289 time: SystemTime::now(),
290 metadata: &DEFAULT_METADATA,
291 fields: Vec::new(),
292 num_fields: 0,
293 seq: 0,
294 }
295 }
296}
297
298impl Visit for Event {
300 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
301 self.next_field().set_i64(field.name(), value);
302 }
303
304 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
305 self.next_field().set_u64(field.name(), value);
306 }
307
308 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
309 self.next_field().set_f64(field.name(), value);
310 }
311
312 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
313 self.next_field().set_bool(field.name(), value);
314 }
315
316 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
317 self.next_field().set_str(field.name(), value);
318 }
319
320 fn record_error(
321 &mut self,
322 field: &tracing::field::Field,
323 value: &(dyn std::error::Error + 'static),
324 ) {
325 self.next_field().set_error(field.name(), value);
326 }
327
328 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
329 self.next_field().set_debug(field.name(), value);
330 }
331}
332
333#[derive(Debug)]
335pub struct Recording {
336 state: Arc<RecordingState>,
337 recorder_state: Arc<RecorderState>,
338}
339
340impl Recording {
341 fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
342 assert!(cap > 1, "capacity must be > 1");
343
344 let state = Arc::new(RecordingState {
345 key: Key::new(),
346 active: Mutex::new(HashMap::new()),
347 spool: Spool::new(cap),
348 seq: AtomicUsize::new(0),
349 });
350 assert!(
351 recorder_state
352 .recordings
353 .insert(state.key.as_key_ref(), Arc::clone(&state))
354 .is_none(),
355 "non-unique key"
356 );
357
358 Self {
359 state,
360 recorder_state,
361 }
362 }
363
364 pub fn span(&self, subject: &str) -> Span {
375 span!(
376 parent: None,
377 Level::INFO,
378 SPAN_FIELD_RECORDING,
379 recording = self.recording_key(),
380 recorder = self.recorder_key(),
381 subject = subject,
382 )
383 }
384
385 pub fn tail(&self) -> Vec<Event> {
388 self.state.spool.tail()
389 }
390
391 pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
394 let snapshot = self.state.active.lock().unwrap().clone();
395
396 let parents: HashSet<Id> = snapshot
397 .iter()
398 .filter_map(|(_, (_, parent))| parent.clone())
399 .collect();
400
401 snapshot
402 .keys()
403 .filter_map(|id| {
404 if parents.contains(id) {
405 None
406 } else {
407 Some(id.clone())
408 }
409 })
410 .map(|id| {
411 let mut stack = Vec::new();
412 let mut parent: Option<Id> = Some(id);
413 while let Some(id) = parent {
414 let Some((meta, next_parent)) = snapshot.get(&id) else {
415 break;
416 };
417 stack.push(*meta);
418 parent = next_parent.clone();
419 }
420 stack
421 })
422 .collect()
423 }
424
425 fn recording_key_ref(&self) -> KeyRef {
426 self.state.key.as_key_ref()
427 }
428
429 fn recording_key(&self) -> u64 {
430 self.recording_key_ref().into()
431 }
432
433 fn recorder_key(&self) -> u64 {
434 self.recorder_state.key.as_key_ref().into()
435 }
436}
437
438impl Drop for Recording {
439 fn drop(&mut self) {
440 assert!(
441 self.recorder_state
442 .recordings
443 .remove(&self.state.key.as_key_ref())
444 .is_some(),
445 "missing recording"
446 );
447 }
448}
449
450#[derive(Debug)]
451struct RecordingState {
452 active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
453 key: Key,
454 seq: AtomicUsize,
455 spool: Spool<Event>,
456}
457
458pub struct Recorder {
463 state: Arc<RecorderState>,
464}
465
466#[derive(Debug)]
467struct RecorderState {
468 key: Key,
469 recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
470 pool: Pool<Event>,
471}
472
473impl Recorder {
474 pub fn new() -> Self {
476 let state = Arc::new(RecorderState {
477 key: Key::new(),
478 recordings: Arc::new(DashMap::new()),
479 pool: Pool::new(1024),
480 });
481 Self { state }
482 }
483
484 pub fn record(&self, cap: usize) -> Recording {
487 Recording::new(cap, Arc::clone(&self.state))
488 }
489
490 pub fn layer(&self) -> RecorderLayer {
494 RecorderLayer {
495 state: Arc::clone(&self.state),
496 }
497 }
498}
499
500pub struct RecorderLayer {
502 state: Arc<RecorderState>,
503}
504
505impl RecorderLayer {
506 fn iter_recordings<'a, S>(
508 &'a self,
509 scope: Scope<'a, S>,
510 ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
511 where
512 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
513 {
514 scope
517 .from_root()
518 .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
519 Some((recording_key_ref, recorder_key_ref))
520 if recorder_key_ref == &self.state.key.as_key_ref() =>
521 {
522 Some(*recording_key_ref)
523 }
524 _ => None,
525 })
526 .filter_map(|key_ref| self.state.recordings.get(&key_ref))
528 }
529}
530
531impl<S: Subscriber> Layer<S> for RecorderLayer
532where
533 S: for<'span> LookupSpan<'span>,
534{
535 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
536 let mut visitor = RecordingKeysVisitor::new();
537
538 attrs.record(&mut visitor);
539
540 if let Some(keys) = visitor.keys()
541 && let Some(span) = ctx.span(id)
542 {
543 let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
544 span.extensions_mut();
545 extensions.insert(keys);
546 }
547 }
548
549 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
550 let Some(scope) = ctx.event_scope(event) else {
551 return;
552 };
553 for state in self.iter_recordings(scope) {
554 let mut recorded = self.state.pool.get();
555 let seq = state.seq.fetch_add(1, Ordering::Relaxed);
556 recorded.reset(SystemTime::now(), event.metadata(), seq);
557 event.record(&mut recorded);
558 state.spool.push(recorded);
559 }
560 }
561
562 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
563 let Some(span) = ctx.span(id) else { return };
564
565 for state in self.iter_recordings(span.scope()) {
566 let mut active = state.active.lock().unwrap();
567 active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
568 }
569 }
570
571 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
572 let Some(span) = ctx.span(id) else { return };
573
574 for state in self.iter_recordings(span.scope()) {
575 let mut active = state.active.lock().unwrap();
576 active.remove(id);
577 }
578 }
579}
580
581struct RecordingKeysVisitor {
583 recording_key: Option<KeyRef>,
584 recorder_key: Option<KeyRef>,
585}
586
587impl RecordingKeysVisitor {
588 fn new() -> Self {
589 Self {
590 recording_key: None,
591 recorder_key: None,
592 }
593 }
594
595 fn keys(&self) -> Option<(KeyRef, KeyRef)> {
596 match (self.recording_key, self.recorder_key) {
597 (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
598 _ => None,
599 }
600 }
601}
602
603impl Visit for RecordingKeysVisitor {
604 fn record_u64(&mut self, field: &Field, value: u64) {
605 if field.name() == "recording" {
606 self.recording_key = Some(value.into());
607 } else if field.name() == "recorder" {
608 self.recorder_key = Some(value.into());
609 }
610 }
611
612 fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
613}
614
615#[cfg(test)]
616mod tests {
617 use serde_json::json;
618 use tracing::Level;
619 use tracing::info;
620 use tracing::span;
621 use tracing_subscriber::Registry;
622 use tracing_subscriber::prelude::*;
623
624 use super::*;
625
626 #[test]
627 fn test_key() {
628 let key = Key::new();
629 assert_eq!(key.as_key_ref(), key.as_key_ref());
630
631 assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
632 }
633
634 #[test]
635 fn test_events_are_recorded() {
636 let recorder = Recorder::new();
637 let recording = recorder.record(10);
638 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
639 let span = recording.span("test");
640 let _guard = span.enter();
641 info!("This event should be recorded");
642 info!("another event");
643 });
644
645 let events = recording.tail();
646 assert_eq!(events.len(), 2);
647 assert_eq!(
648 events[0].json_value(),
649 json!({
650 "message": "This event should be recorded"
651 })
652 );
653 assert_eq!(
654 events[1].json_value(),
655 json!({
656 "message": "another event"
657 })
658 );
659 }
660
661 #[test]
662 fn test_last_n_entries() {
663 let recorder = Recorder::new();
664 let recording = recorder.record(5);
665 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
666 let span = recording.span("test");
667 let _guard = span.enter();
668 for i in 0..10 {
669 info!("event {}", i);
670 }
671 });
672
673 let events = recording.tail();
674 assert_eq!(events.len(), 5);
675 for (i, event) in events.into_iter().enumerate() {
676 assert_eq!(
677 event.json_value(),
678 json!({
679 "message": format!("event {}", i + 5)
680 })
681 );
682 assert_eq!(event.seq, i + 5);
683 }
684 }
685
686 #[test]
687 fn test_events_outside_span_are_not_recorded() {
688 let recorder = Recorder::new();
689 let recording = recorder.record(10);
690 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
691 let _span = recording.span("test"); info!("This event should NOT be recorded");
693 });
694 assert_eq!(recording.tail().len(), 0);
695 }
696
697 #[test]
698 fn test_child_span_inherits_recorder() {
699 let recorder = Recorder::new();
700 let recording = recorder.record(10);
701
702 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
703 let outer = recording.span("test");
704 let _outer_guard = outer.enter();
705
706 {
707 let inner = span!(Level::INFO, "child_span");
708 let _inner_guard = inner.enter();
709
710 info!("Event from inner span");
711 }
712 });
713
714 let events = recording.tail();
715 assert_eq!(events.len(), 1);
716 assert_eq!(
717 events[0].json_value(),
718 json!({
719 "message": "Event from inner span"
720 })
721 );
722 }
723
724 #[test]
731 fn test_recording_spans_are_isolated_across_nested_recordings() {
732 let recorder = Recorder::new();
733 let recording_a = recorder.record(10);
734 let recording_b = recorder.record(10);
735
736 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
737 let span_a = recording_a.span("test_a");
738 let _guard_a = span_a.enter();
739
740 info!("event_from_a");
741
742 {
743 let span_b = recording_b.span("test_b");
744 let _guard_b = span_b.enter();
745
746 info!("event_from_b");
747 }
748 });
749
750 let events_a: Vec<String> = recording_a
751 .tail()
752 .iter()
753 .map(|e| e.json_value().to_string())
754 .collect();
755 let events_b: Vec<String> = recording_b
756 .tail()
757 .iter()
758 .map(|e| e.json_value().to_string())
759 .collect();
760
761 let a_joined = events_a.join(" ");
762 let b_joined = events_b.join(" ");
763
764 assert!(
765 a_joined.contains("event_from_a"),
766 "recording A should contain event_from_a"
767 );
768 assert!(
769 !a_joined.contains("event_from_b"),
770 "recording A should NOT contain event_from_b"
771 );
772 assert!(
773 b_joined.contains("event_from_b"),
774 "recording B should contain event_from_b"
775 );
776 assert!(
777 !b_joined.contains("event_from_a"),
778 "recording B should NOT contain event_from_a"
779 );
780 }
781
782 #[test]
783 fn test_active_spans() {
784 let recorder = Recorder::new();
785 let recording = recorder.record(10);
786
787 tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
788 let outer = recording.span("test");
789 let _outer_guard = outer.enter();
790
791 {
792 let inner = span!(Level::INFO, "child_span");
793 let _inner_guard = inner.enter();
794
795 assert_eq!(recording.stacks().len(), 1);
796 assert_eq!(recording.stacks()[0][0].name(), "child_span");
798 assert_eq!(recording.stacks()[0][1].name(), "recording");
799 }
800 });
801 }
802}