1use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::pool::Pool;
38use crate::spool::Spool;
39
40#[derive(Debug)]
43struct Key(
44 Box<u8>,
48);
49
50impl Key {
51 fn new() -> Self {
53 Self(Box::new(0u8))
54 }
55
56 fn as_key_ref(&self) -> KeyRef {
61 KeyRef(&*self.0 as *const _)
62 }
63}
64
/// Copyable, hashable handle holding the address taken from a [`Key`].
///
/// The pointer is treated as an opaque identity value: it is compared, hashed
/// and converted to and from `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyRef(*const u8);

// SAFETY: raw pointers are `!Send`/`!Sync` by default. Within this file the
// wrapped pointer is only compared, hashed and cast to an integer; it is never
// dereferenced, so moving or sharing it across threads cannot cause a data
// race. NOTE(review): this relies on no other code dereferencing `KeyRef.0`.
unsafe impl Send for KeyRef {}
unsafe impl Sync for KeyRef {}
78
79impl From<Key> for KeyRef {
80 fn from(value: Key) -> Self {
81 Self(&*value.0 as *const _)
82 }
83}
84
85impl From<u64> for KeyRef {
86 fn from(value: u64) -> Self {
87 Self(value as *const _)
88 }
89}
90
91impl From<KeyRef> for u64 {
92 fn from(value: KeyRef) -> Self {
93 value.0 as u64
94 }
95}
96
/// The value of a single recorded field.
///
/// One variant per primitive kind that the field setters in this file store;
/// error and `Debug` values are stored in their formatted form as `String`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Value {
    /// Text, including formatted error / `Debug` output.
    String(String),
    /// Signed 64-bit integer.
    I64(i64),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// 64-bit floating point number.
    F64(f64),
    /// Boolean; also the `Default` value.
    Bool(bool),
}
114
115impl Value {
116 pub fn to_json(&self) -> serde_json::Value {
118 match &self {
119 Self::String(s) => serde_json::Value::String(s.clone()),
120 Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
121 Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
122 Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
123 Self::Bool(b) => serde_json::Value::Bool(*b),
124 }
125 }
126}
127
128impl Default for Value {
129 fn default() -> Self {
130 Value::Bool(false)
131 }
132}
133
/// One recorded field: its name, its value, and a spare string buffer.
#[derive(Debug, Default, Clone)]
struct Entry {
    /// Field name; the setters store the name they are given here.
    name: &'static str,
    /// Current value; `Value::default()` (`Bool(false)`) until a setter runs.
    value: Value,
    /// Cleared `String` salvaged by `reset` from a previous `Value::String`,
    /// kept so the string setters can reuse it instead of allocating anew.
    buffer: Option<String>,
}
140
141impl Entry {
142 fn reset(&mut self) {
143 if let Value::String(mut s) = take(&mut self.value) {
144 s.clear();
145 self.buffer = Some(s);
146 }
147 }
148
149 fn set_str(&mut self, name: &'static str, value: &str) {
150 self.reset();
151 let mut buf = self.buffer.take().unwrap_or_else(String::new);
152 buf.clear();
153 buf.push_str(value);
154 self.name = name;
155 self.value = Value::String(buf);
156 }
157
158 fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
159 self.reset();
160 let mut buf = self.buffer.take().unwrap_or_else(String::new);
161
162 let mut formatter =
163 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
164
165 Display::fmt(value, &mut formatter)
166 .expect("a Display implementation returned an error unexpectedly");
167
168 self.name = name;
169 self.value = Value::String(buf);
170 }
171
172 fn set_debug(&mut self, name: &'static str, value: &(dyn std::fmt::Debug)) {
173 self.reset();
174 let mut buf = self.buffer.take().unwrap_or_else(String::new);
175
176 let mut formatter =
177 core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
178
179 Debug::fmt(value, &mut formatter)
180 .expect("a Debug implementation returned an error unexpectedly");
181
182 self.name = name;
183 self.value = Value::String(buf);
184 }
185
186 fn set_i64(&mut self, name: &'static str, value: i64) {
187 self.reset();
188 self.name = name;
189 self.value = Value::I64(value);
190 }
191
192 fn set_u64(&mut self, name: &'static str, value: u64) {
193 self.reset();
194 self.name = name;
195 self.value = Value::U64(value);
196 }
197
198 fn set_f64(&mut self, name: &'static str, value: f64) {
199 self.reset();
200 self.name = name;
201 self.value = Value::F64(value);
202 }
203
204 fn set_bool(&mut self, name: &'static str, value: bool) {
205 self.reset();
206 self.name = name;
207 self.value = Value::Bool(value);
208 }
209}
210
/// A captured tracing event: timestamp, callsite metadata and recorded fields.
#[derive(Debug, Clone)]
pub struct Event {
    /// Time assigned when the event was (re)initialised via `reset`.
    pub time: SystemTime,

    /// Static metadata of the callsite this event came from.
    pub metadata: &'static Metadata<'static>,

    /// Field storage slots. `reset` does not shrink this vector, so it may be
    /// longer than `num_fields`.
    fields: Vec<Entry>,

    /// Number of slots in `fields` handed out by `next_field` since the last
    /// `reset`.
    num_fields: usize,

    /// Sequence number supplied to `reset`.
    pub seq: usize,
}
229
230impl Event {
231 fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
232 self.time = time;
233 self.metadata = metadata;
234 self.num_fields = 0;
235 self.seq = seq;
236 }
237
238 fn next_field(&mut self) -> &mut Entry {
239 while self.fields.len() <= self.num_fields {
240 self.fields.push(Default::default());
241 }
242 let field = self.fields.get_mut(self.num_fields).unwrap();
243 field.reset();
244 self.num_fields += 1;
245 field
246 }
247
248 pub fn json_value(&self) -> serde_json::Value {
250 serde_json::Value::Object(self.json_fields())
251 }
252
253 pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
255 let mut map = serde_json::Map::new();
256 for field in &self.fields {
257 map.insert(field.name.to_string(), field.value.to_json());
258 }
259 map
260 }
261
262 pub fn fields(&self) -> Vec<(String, Value)> {
264 self.fields
265 .iter()
266 .map(|field| (field.name.to_string(), field.value.clone()))
267 .collect()
268 }
269}
270
/// Builds an empty placeholder event.
///
/// The placeholder points at a static, nameless metadata record (empty name
/// and target, `DEBUG` level, no fields) and carries no recorded fields.
impl Default for Event {
    fn default() -> Self {
        // A `Metadata` needs a callsite to refer to, so a dummy one is declared
        // first. NOTE(review): `__macro_support` / `callsite2!` look like
        // tracing-internal items — confirm they are acceptable to depend on.
        static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
            name: "",
            kind: tracing::metadata::Kind::SPAN,
            level: Level::DEBUG,
            fields:
        };
        // Placeholder metadata; note that its kind is SPAN, not EVENT.
        static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
            name: "",
            target: "",
            level: Level::DEBUG,
            fields: &[],
            callsite: &__CALLSITE,
            kind: tracing::metadata::Kind::SPAN,
        };
        Self {
            time: SystemTime::now(),
            metadata: &DEFAULT_METADATA,
            fields: Vec::new(),
            num_fields: 0,
            seq: 0,
        }
    }
}
296
297impl Visit for Event {
299 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
300 self.next_field().set_i64(field.name(), value);
301 }
302
303 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
304 self.next_field().set_u64(field.name(), value);
305 }
306
307 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
308 self.next_field().set_f64(field.name(), value);
309 }
310
311 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
312 self.next_field().set_bool(field.name(), value);
313 }
314
315 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
316 self.next_field().set_str(field.name(), value);
317 }
318
319 fn record_error(
320 &mut self,
321 field: &tracing::field::Field,
322 value: &(dyn std::error::Error + 'static),
323 ) {
324 self.next_field().set_error(field.name(), value);
325 }
326
327 fn record_debug(&mut self, field: &tracing::field::Field, value: &(dyn std::fmt::Debug)) {
328 self.next_field().set_debug(field.name(), value);
329 }
330}
331
/// Handle to one recording registered with a recorder.
///
/// Creating it registers `state` in the recorder's `recordings` map; dropping
/// it removes that registration again.
#[derive(Debug)]
pub struct Recording {
    /// This recording's own state, shared with the recorder's map.
    state: Arc<RecordingState>,
    /// State of the recorder this recording belongs to.
    recorder_state: Arc<RecorderState>,
}
338
339impl Recording {
340 fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
341 assert!(cap > 1, "capacity must be > 1");
342
343 let state = Arc::new(RecordingState {
344 key: Key::new(),
345 active: Mutex::new(HashMap::new()),
346 spool: Spool::new(cap),
347 seq: AtomicUsize::new(0),
348 });
349 assert!(
350 recorder_state
351 .recordings
352 .insert(state.key.as_key_ref(), Arc::clone(&state))
353 .is_none(),
354 "non-unique key"
355 );
356
357 Self {
358 state,
359 recorder_state,
360 }
361 }
362
363 pub fn span(&self) -> Span {
365 span!(
366 Level::INFO,
367 "recording",
368 recording = self.recording_key(),
369 recorder = self.recorder_key(),
370 )
371 }
372
373 pub fn tail(&self) -> Vec<Event> {
376 self.state.spool.tail()
377 }
378
379 pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
382 let snapshot = self.state.active.lock().unwrap().clone();
383
384 let parents: HashSet<Id> = snapshot
385 .iter()
386 .filter_map(|(_, (_, parent))| parent.clone())
387 .collect();
388
389 snapshot
390 .iter()
391 .filter_map(|(id, _)| {
392 if parents.contains(id) {
393 None
394 } else {
395 Some(id.clone())
396 }
397 })
398 .map(|id| {
399 let mut stack = Vec::new();
400 let mut parent: Option<Id> = Some(id);
401 while let Some(id) = parent {
402 let Some((meta, next_parent)) = snapshot.get(&id) else {
403 break;
404 };
405 stack.push(*meta);
406 parent = next_parent.clone();
407 }
408 stack
409 })
410 .collect()
411 }
412
413 fn recording_key_ref(&self) -> KeyRef {
414 self.state.key.as_key_ref()
415 }
416
417 fn recording_key(&self) -> u64 {
418 self.recording_key_ref().into()
419 }
420
421 fn recorder_key(&self) -> u64 {
422 self.recorder_state.key.as_key_ref().into()
423 }
424}
425
426impl Drop for Recording {
427 fn drop(&mut self) {
428 assert!(
429 self.recorder_state
430 .recordings
431 .remove(&self.state.key.as_key_ref())
432 .is_some(),
433 "missing recording"
434 );
435 }
436}
437
/// Shared state of a single recording.
#[derive(Debug)]
struct RecordingState {
    /// Currently entered spans: span id -> (span metadata, parent span id).
    /// Entries are added in `on_enter` and removed in `on_exit`.
    active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
    /// Identity of this recording; its address form is the map key and the
    /// `recording` span field.
    key: Key,
    /// Counter incremented once per event recorded into this recording.
    seq: AtomicUsize,
    /// Storage for recorded events, created with the requested capacity.
    spool: Spool<Event>,
}
445
/// Entry point: creates recordings and the tracing layer that feeds them.
pub struct Recorder {
    /// State shared with every `Recording` and `RecorderLayer` created from
    /// this recorder.
    state: Arc<RecorderState>,
}
453
/// State shared between a recorder, its recordings and its layers.
#[derive(Debug)]
struct RecorderState {
    /// Identity of the recorder; emitted as the `recorder` span field.
    key: Key,
    /// Live recordings, keyed by the address form of each recording's key.
    recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
    /// Source of `Event` objects used when recording.
    /// NOTE(review): presumably recycles events to avoid allocation — confirm
    /// against `Pool`.
    pool: Pool<Event>,
}
460
461impl Recorder {
462 pub fn new() -> Self {
464 let state = Arc::new(RecorderState {
465 key: Key::new(),
466 recordings: Arc::new(DashMap::new()),
467 pool: Pool::new(1024),
468 });
469 Self { state }
470 }
471
472 pub fn record(&self, cap: usize) -> Recording {
475 Recording::new(cap, Arc::clone(&self.state))
476 }
477
478 pub fn layer(&self) -> RecorderLayer {
482 RecorderLayer {
483 state: Arc::clone(&self.state),
484 }
485 }
486}
487
/// Tracing layer that routes events and span enter/exit notifications to the
/// recordings of one recorder.
pub struct RecorderLayer {
    /// State shared with the `Recorder` that created this layer.
    state: Arc<RecorderState>,
}
492
493impl RecorderLayer {
494 fn iter_recordings<'a, S>(
496 &'a self,
497 scope: Scope<'a, S>,
498 ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
499 where
500 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
501 {
502 scope
505 .from_root()
506 .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
507 Some((recording_key_ref, recorder_key_ref))
508 if recorder_key_ref == &self.state.key.as_key_ref() =>
509 {
510 Some(*recording_key_ref)
511 }
512 _ => None,
513 })
514 .filter_map(|key_ref| self.state.recordings.get(&key_ref))
516 }
517}
518
519impl<S: Subscriber> Layer<S> for RecorderLayer
520where
521 S: for<'span> LookupSpan<'span>,
522{
523 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
524 let mut visitor = RecordingKeysVisitor::new();
525
526 attrs.record(&mut visitor);
527
528 if let Some(keys) = visitor.keys() {
529 if let Some(span) = ctx.span(id) {
530 let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
531 span.extensions_mut();
532 extensions.insert(keys);
533 }
534 }
535 }
536
537 fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
538 let Some(scope) = ctx.event_scope(event) else {
539 return;
540 };
541 for state in self.iter_recordings(scope) {
542 let mut recorded = self.state.pool.get();
543 let seq = state.seq.fetch_add(1, Ordering::Relaxed);
544 recorded.reset(SystemTime::now(), event.metadata(), seq);
545 event.record(&mut recorded);
546 state.spool.push(recorded);
547 }
548 }
549
550 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
551 let Some(span) = ctx.span(id) else { return };
552
553 for state in self.iter_recordings(span.scope()) {
554 let mut active = state.active.lock().unwrap();
555 active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
556 }
557 }
558
559 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
560 let Some(span) = ctx.span(id) else { return };
561
562 for state in self.iter_recordings(span.scope()) {
563 let mut active = state.active.lock().unwrap();
564 active.remove(id);
565 }
566 }
567}
568
569struct RecordingKeysVisitor {
571 recording_key: Option<KeyRef>,
572 recorder_key: Option<KeyRef>,
573}
574
575impl RecordingKeysVisitor {
576 fn new() -> Self {
577 Self {
578 recording_key: None,
579 recorder_key: None,
580 }
581 }
582
583 fn keys(&self) -> Option<(KeyRef, KeyRef)> {
584 match (self.recording_key, self.recorder_key) {
585 (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
586 _ => None,
587 }
588 }
589}
590
591impl Visit for RecordingKeysVisitor {
592 fn record_u64(&mut self, field: &Field, value: u64) {
593 if field.name() == "recording" {
594 self.recording_key = Some(value.into());
595 } else if field.name() == "recorder" {
596 self.recorder_key = Some(value.into());
597 }
598 }
599
600 fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
601}
602
// Unit tests: each test installs a fresh `Registry` with the recorder's layer
// for the duration of a closure and then inspects the recording.
#[cfg(test)]
mod tests {
    use serde_json::json;
    use tracing::Level;
    use tracing::info;
    use tracing::span;
    use tracing_subscriber::Registry;
    use tracing_subscriber::prelude::*;

    use super::*;

    // A key's handle is stable, and two live keys never share a handle.
    #[test]
    fn test_key() {
        let key = Key::new();
        assert_eq!(key.as_key_ref(), key.as_key_ref());

        assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
    }

    // Events emitted while the recording span is entered are captured in order.
    #[test]
    fn test_events_are_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            info!("This event should be recorded");
            info!("another event");
        });

        let events = recording.tail();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "This event should be recorded"
            })
        );
        assert_eq!(
            events[1].json_value(),
            json!({
                "message": "another event"
            })
        );
    }

    // With capacity 5 and 10 events, only the last five (seq 5..=9) remain.
    #[test]
    fn test_last_n_entries() {
        let recorder = Recorder::new();
        let recording = recorder.record(5);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            for i in 0..10 {
                info!("event {}", i);
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 5);
        for (i, event) in events.into_iter().enumerate() {
            assert_eq!(
                event.json_value(),
                json!({
                    "message": format!("event {}", i + 5)
                })
            );
            assert_eq!(event.seq, i + 5);
        }
    }

    // The span is created but never entered, so the event is outside its scope.
    #[test]
    fn test_events_outside_span_are_not_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let _span = recording.span();
            info!("This event should NOT be recorded");
        });
        assert_eq!(recording.tail().len(), 0);
    }

    // An event inside a child of the recording span is still captured.
    #[test]
    fn test_child_span_inherits_recorder() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                info!("Event from inner span");
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "Event from inner span"
            })
        );
    }

    // While both spans are entered there is one stack, innermost span first.
    #[test]
    fn test_active_spans() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                assert_eq!(recording.stacks().len(), 1);
                assert_eq!(recording.stacks()[0][0].name(), "child_span");
                assert_eq!(recording.stacks()[0][1].name(), "recording");
            }
        });
    }
}