hyperactor_telemetry/
recorder.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::SPAN_FIELD_RECORDING;
38use crate::pool::Pool;
39use crate::spool::Spool;
40
41/// Key is provides a guaranteed unique [`KeyRef`] for as long as
42/// the [`Key`] is alive.
43#[derive(Debug)]
44struct Key(
45    // We have to use a u8 (as opposed to, e.g., ()) here, as
46    // Rust gets clever about zero size objects, and allocates
47    // them all to 0x1.
48    Box<u8>,
49);
50
51impl Key {
52    /// Create a new unique key.
53    fn new() -> Self {
54        Self(Box::new(0u8))
55    }
56
57    /// Produce a reference to this key. The reference is guaranteed to be
58    /// unique for the lifetime of this key. The user must be careful to
59    /// ensure that there are no outstanding [`KeyRef`]s after its corresponding
60    /// [`Key`] is dropped.
61    fn as_key_ref(&self) -> KeyRef {
62        KeyRef(&*self.0 as *const _)
63    }
64}
65
/// A reference to a [`Key`]. A `KeyRef` is a plain, copyable value that
/// can be used as a map key; it is guaranteed to be unique for as long
/// as the [`Key`] it was taken from is alive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyRef(*const u8);

// SAFETY: The wrapped pointer is only ever passed around and compared
// as a value. It is never dereferenced.
unsafe impl Send for KeyRef {}

// SAFETY: The wrapped pointer is only ever passed around and compared
// as a value. It is never dereferenced.
unsafe impl Sync for KeyRef {}
79
80impl From<Key> for KeyRef {
81    fn from(value: Key) -> Self {
82        Self(&*value.0 as *const _)
83    }
84}
85
86impl From<u64> for KeyRef {
87    fn from(value: u64) -> Self {
88        Self(value as *const _)
89    }
90}
91
92impl From<KeyRef> for u64 {
93    fn from(value: KeyRef) -> Self {
94        value.0 as u64
95    }
96}
97
98/// A value that can appear in an entry. This custom
99/// representation serves two purposes: 1) internally, to
100/// to manage string buffer reuse; and 2) as a serialization
101/// format that may be used with bincode.
102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub enum Value {
104    /// A String.
105    String(String),
106    /// An i64.
107    I64(i64),
108    /// A u64.
109    U64(u64),
110    /// An f64.
111    F64(f64),
112    /// A bool.
113    Bool(bool),
114}
115
116impl Value {
117    /// Convert this Value to a JSON value
118    pub fn to_json(&self) -> serde_json::Value {
119        match &self {
120            Self::String(s) => serde_json::Value::String(s.clone()),
121            Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
122            Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
123            Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
124            Self::Bool(b) => serde_json::Value::Bool(*b),
125        }
126    }
127}
128
129impl Default for Value {
130    fn default() -> Self {
131        Value::Bool(false)
132    }
133}
134
135#[derive(Debug, Default, Clone)]
136struct Entry {
137    name: &'static str,
138    value: Value,
139    buffer: Option<String>,
140}
141
142impl Entry {
143    fn reset(&mut self) {
144        if let Value::String(mut s) = take(&mut self.value) {
145            s.clear();
146            self.buffer = Some(s);
147        }
148    }
149
150    fn set_str(&mut self, name: &'static str, value: &str) {
151        self.reset();
152        let mut buf = self.buffer.take().unwrap_or_else(String::new);
153        buf.clear();
154        buf.push_str(value);
155        self.name = name;
156        self.value = Value::String(buf);
157    }
158
159    fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
160        self.reset();
161        let mut buf = self.buffer.take().unwrap_or_else(String::new);
162
163        let mut formatter =
164            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
165
166        Display::fmt(value, &mut formatter)
167            .expect("a Display implementation returned an error unexpectedly");
168
169        self.name = name;
170        self.value = Value::String(buf);
171    }
172
173    fn set_debug(&mut self, name: &'static str, value: &(dyn std::fmt::Debug)) {
174        self.reset();
175        let mut buf = self.buffer.take().unwrap_or_else(String::new);
176
177        let mut formatter =
178            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
179
180        Debug::fmt(value, &mut formatter)
181            .expect("a Debug implementation returned an error unexpectedly");
182
183        self.name = name;
184        self.value = Value::String(buf);
185    }
186
187    fn set_i64(&mut self, name: &'static str, value: i64) {
188        self.reset();
189        self.name = name;
190        self.value = Value::I64(value);
191    }
192
193    fn set_u64(&mut self, name: &'static str, value: u64) {
194        self.reset();
195        self.name = name;
196        self.value = Value::U64(value);
197    }
198
199    fn set_f64(&mut self, name: &'static str, value: f64) {
200        self.reset();
201        self.name = name;
202        self.value = Value::F64(value);
203    }
204
205    fn set_bool(&mut self, name: &'static str, value: bool) {
206        self.reset();
207        self.name = name;
208        self.value = Value::Bool(value);
209    }
210}
211
212/// An event that has been recorded by a [`Recorder`].
213#[derive(Debug, Clone)]
214pub struct Event {
215    /// The time at which the event was recorded.
216    pub time: SystemTime,
217
218    /// The metadata for the event.
219    pub metadata: &'static Metadata<'static>,
220
221    /// All other (structured) fields defined in the event.
222    fields: Vec<Entry>,
223
224    /// The number of fields defined.
225    num_fields: usize,
226
227    /// A monotonically increasing sequence number.
228    pub seq: usize,
229}
230
231impl Event {
232    fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
233        self.time = time;
234        self.metadata = metadata;
235        self.num_fields = 0;
236        self.seq = seq;
237    }
238
239    fn next_field(&mut self) -> &mut Entry {
240        while self.fields.len() <= self.num_fields {
241            self.fields.push(Default::default());
242        }
243        let field = self.fields.get_mut(self.num_fields).unwrap();
244        field.reset();
245        self.num_fields += 1;
246        field
247    }
248
249    /// The fields of this event, structed as a JSON value.
250    pub fn json_value(&self) -> serde_json::Value {
251        serde_json::Value::Object(self.json_fields())
252    }
253
254    /// The fields of this event, structured as a JSON map.
255    pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
256        let mut map = serde_json::Map::new();
257        for field in &self.fields {
258            map.insert(field.name.to_string(), field.value.to_json());
259        }
260        map
261    }
262
263    /// The fields of this event, using the [`Value`] representation.
264    pub fn fields(&self) -> Vec<(String, Value)> {
265        self.fields
266            .iter()
267            .map(|field| (field.name.to_string(), field.value.clone()))
268            .collect()
269    }
270}
271
272impl Default for Event {
273    fn default() -> Self {
274        static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
275            name: "",
276            kind: tracing::metadata::Kind::SPAN,
277            level: Level::DEBUG,
278            fields:
279        };
280        static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
281            name: "",
282            target: "",
283            level: Level::DEBUG,
284            fields: &[],
285            callsite: &__CALLSITE,
286            kind: tracing::metadata::Kind::SPAN,
287        };
288        Self {
289            time: SystemTime::now(),
290            metadata: &DEFAULT_METADATA,
291            fields: Vec::new(),
292            num_fields: 0,
293            seq: 0,
294        }
295    }
296}
297
298/// Visitor to populate an [`Event`] from a [`tracing::Event`].
299impl Visit for Event {
300    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
301        self.next_field().set_i64(field.name(), value);
302    }
303
304    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
305        self.next_field().set_u64(field.name(), value);
306    }
307
308    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
309        self.next_field().set_f64(field.name(), value);
310    }
311
312    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
313        self.next_field().set_bool(field.name(), value);
314    }
315
316    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
317        self.next_field().set_str(field.name(), value);
318    }
319
320    fn record_error(
321        &mut self,
322        field: &tracing::field::Field,
323        value: &(dyn std::error::Error + 'static),
324    ) {
325        self.next_field().set_error(field.name(), value);
326    }
327
328    fn record_debug(&mut self, field: &tracing::field::Field, value: &(dyn std::fmt::Debug)) {
329        self.next_field().set_debug(field.name(), value);
330    }
331}
332
333/// A recording of events from a [`Recorder`].
334#[derive(Debug)]
335pub struct Recording {
336    state: Arc<RecordingState>,
337    recorder_state: Arc<RecorderState>,
338}
339
340impl Recording {
341    fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
342        assert!(cap > 1, "capacity must be > 1");
343
344        let state = Arc::new(RecordingState {
345            key: Key::new(),
346            active: Mutex::new(HashMap::new()),
347            spool: Spool::new(cap),
348            seq: AtomicUsize::new(0),
349        });
350        assert!(
351            recorder_state
352                .recordings
353                .insert(state.key.as_key_ref(), Arc::clone(&state))
354                .is_none(),
355            "non-unique key"
356        );
357
358        Self {
359            state,
360            recorder_state,
361        }
362    }
363
364    /// Return a span, which will record events to this recording when entered.
365    pub fn span(&self) -> Span {
366        span!(
367            Level::INFO,
368            SPAN_FIELD_RECORDING,
369            recording = self.recording_key(),
370            recorder = self.recorder_key(),
371        )
372    }
373
374    /// Retrieve the tail of the recorded event log,
375    /// up to the capacity of the recording.
376    pub fn tail(&self) -> Vec<Event> {
377        self.state.spool.tail()
378    }
379
380    /// Return the set of currently active span stacks. The metadata has
381    /// enough information to recover a sparse stack trace for the trace.
382    pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
383        let snapshot = self.state.active.lock().unwrap().clone();
384
385        let parents: HashSet<Id> = snapshot
386            .iter()
387            .filter_map(|(_, (_, parent))| parent.clone())
388            .collect();
389
390        snapshot
391            .iter()
392            .filter_map(|(id, _)| {
393                if parents.contains(id) {
394                    None
395                } else {
396                    Some(id.clone())
397                }
398            })
399            .map(|id| {
400                let mut stack = Vec::new();
401                let mut parent: Option<Id> = Some(id);
402                while let Some(id) = parent {
403                    let Some((meta, next_parent)) = snapshot.get(&id) else {
404                        break;
405                    };
406                    stack.push(*meta);
407                    parent = next_parent.clone();
408                }
409                stack
410            })
411            .collect()
412    }
413
414    fn recording_key_ref(&self) -> KeyRef {
415        self.state.key.as_key_ref()
416    }
417
418    fn recording_key(&self) -> u64 {
419        self.recording_key_ref().into()
420    }
421
422    fn recorder_key(&self) -> u64 {
423        self.recorder_state.key.as_key_ref().into()
424    }
425}
426
427impl Drop for Recording {
428    fn drop(&mut self) {
429        assert!(
430            self.recorder_state
431                .recordings
432                .remove(&self.state.key.as_key_ref())
433                .is_some(),
434            "missing recording"
435        );
436    }
437}
438
/// The state of a single [`Recording`], shared between the recording
/// handle and the recorder's map of registered recordings.
#[derive(Debug)]
struct RecordingState {
    /// The spans that are currently entered, keyed by span id. Each entry
    /// holds the span's metadata and the id of its parent span, if any.
    active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
    /// The key under which this recording is registered with its recorder.
    key: Key,
    /// Counter used to assign a sequence number to each recorded event.
    seq: AtomicUsize,
    /// The recorded events; created with the recording's capacity.
    spool: Spool<Event>,
}
446
447/// A recorder captures events from a [`tracing::span`] and records them
448/// to a [`mpsc::UnboundedSender`]. In order to record events, the recorder's
449/// layer ([`Recorder::layer`]) must be installed into the relevant tracing
450/// subscriber.
451pub struct Recorder {
452    state: Arc<RecorderState>,
453}
454
455#[derive(Debug)]
456struct RecorderState {
457    key: Key,
458    recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
459    pool: Pool<Event>,
460}
461
462impl Recorder {
463    /// Create a new recorder.
464    pub fn new() -> Self {
465        let state = Arc::new(RecorderState {
466            key: Key::new(),
467            recordings: Arc::new(DashMap::new()),
468            pool: Pool::new(1024),
469        });
470        Self { state }
471    }
472
473    /// Create a new recording that can be used to selective capture
474    /// events and span traces.
475    pub fn record(&self, cap: usize) -> Recording {
476        Recording::new(cap, Arc::clone(&self.state))
477    }
478
479    /// The layer associated with this recorder. This layer must be
480    /// installed into the relevant tracing subscriber in order to
481    /// record events.
482    pub fn layer(&self) -> RecorderLayer {
483        RecorderLayer {
484            state: Arc::clone(&self.state),
485        }
486    }
487}
488
489/// The type of layer used by [`Recorder`].
490pub struct RecorderLayer {
491    state: Arc<RecorderState>,
492}
493
494impl RecorderLayer {
495    /// Return an iterator over all the [`KeyRef`]s for the given scope.
496    fn iter_recordings<'a, S>(
497        &'a self,
498        scope: Scope<'a, S>,
499    ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
500    where
501        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
502    {
503        // We can provide deadlock freedom here because we always iterate in the same order:
504        // from root to leaf; thus there can be no cycles.
505        scope
506            .from_root()
507            .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
508                Some((recording_key_ref, recorder_key_ref))
509                    if recorder_key_ref == &self.state.key.as_key_ref() =>
510                {
511                    Some(*recording_key_ref)
512                }
513                _ => None,
514            })
515            // Spans can outlive recording, which may no longer exist.
516            .filter_map(|key_ref| self.state.recordings.get(&key_ref))
517    }
518}
519
520impl<S: Subscriber> Layer<S> for RecorderLayer
521where
522    S: for<'span> LookupSpan<'span>,
523{
524    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
525        let mut visitor = RecordingKeysVisitor::new();
526
527        attrs.record(&mut visitor);
528
529        if let Some(keys) = visitor.keys() {
530            if let Some(span) = ctx.span(id) {
531                let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
532                    span.extensions_mut();
533                extensions.insert(keys);
534            }
535        }
536    }
537
538    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
539        let Some(scope) = ctx.event_scope(event) else {
540            return;
541        };
542        for state in self.iter_recordings(scope) {
543            let mut recorded = self.state.pool.get();
544            let seq = state.seq.fetch_add(1, Ordering::Relaxed);
545            recorded.reset(SystemTime::now(), event.metadata(), seq);
546            event.record(&mut recorded);
547            state.spool.push(recorded);
548        }
549    }
550
551    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
552        let Some(span) = ctx.span(id) else { return };
553
554        for state in self.iter_recordings(span.scope()) {
555            let mut active = state.active.lock().unwrap();
556            active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
557        }
558    }
559
560    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
561        let Some(span) = ctx.span(id) else { return };
562
563        for state in self.iter_recordings(span.scope()) {
564            let mut active = state.active.lock().unwrap();
565            active.remove(id);
566        }
567    }
568}
569
570/// A visitor to pick out the recording key from spans.
571struct RecordingKeysVisitor {
572    recording_key: Option<KeyRef>,
573    recorder_key: Option<KeyRef>,
574}
575
576impl RecordingKeysVisitor {
577    fn new() -> Self {
578        Self {
579            recording_key: None,
580            recorder_key: None,
581        }
582    }
583
584    fn keys(&self) -> Option<(KeyRef, KeyRef)> {
585        match (self.recording_key, self.recorder_key) {
586            (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
587            _ => None,
588        }
589    }
590}
591
592impl Visit for RecordingKeysVisitor {
593    fn record_u64(&mut self, field: &Field, value: u64) {
594        if field.name() == "recording" {
595            self.recording_key = Some(value.into());
596        } else if field.name() == "recorder" {
597            self.recorder_key = Some(value.into());
598        }
599    }
600
601    fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
602}
603
#[cfg(test)]
mod tests {
    use serde_json::json;
    use tracing::Level;
    use tracing::info;
    use tracing::span;
    use tracing_subscriber::Registry;
    use tracing_subscriber::prelude::*;

    use super::*;

    /// Run `body` with a default subscriber that has the recorder's layer
    /// installed.
    fn with_recorder_installed(recorder: &Recorder, body: impl FnOnce()) {
        let subscriber = Registry::default().with(recorder.layer());
        tracing::subscriber::with_default(subscriber, body);
    }

    /// The JSON expected for an event that only carries a message.
    fn message(text: &str) -> serde_json::Value {
        json!({ "message": text })
    }

    #[test]
    fn test_key() {
        let key = Key::new();
        assert_eq!(key.as_key_ref(), key.as_key_ref());

        // Two keys that are alive at the same time never share a reference.
        let first = Key::new();
        let second = Key::new();
        assert_ne!(first.as_key_ref(), second.as_key_ref());
    }

    #[test]
    fn test_events_are_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        with_recorder_installed(&recorder, || {
            let span = recording.span();
            let _guard = span.enter();
            info!("This event should be recorded");
            info!("another event");
        });

        let recorded: Vec<_> = recording.tail().iter().map(Event::json_value).collect();
        assert_eq!(
            recorded,
            vec![
                message("This event should be recorded"),
                message("another event"),
            ]
        );
    }

    #[test]
    fn test_last_n_entries() {
        let recorder = Recorder::new();
        let recording = recorder.record(5);
        with_recorder_installed(&recorder, || {
            let span = recording.span();
            let _guard = span.enter();
            for i in 0..10 {
                info!("event {}", i);
            }
        });

        // Only the last five of the ten events are retained.
        let events = recording.tail();
        assert_eq!(events.len(), 5);
        for (offset, event) in events.iter().enumerate() {
            let expected_seq = offset + 5;
            assert_eq!(
                event.json_value(),
                message(&format!("event {}", expected_seq))
            );
            assert_eq!(event.seq, expected_seq);
        }
    }

    #[test]
    fn test_events_outside_span_are_not_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        with_recorder_installed(&recorder, || {
            let _span = recording.span(); // not entered
            info!("This event should NOT be recorded");
        });

        assert_eq!(recording.tail().len(), 0);
    }

    #[test]
    fn test_child_span_inherits_recorder() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        with_recorder_installed(&recorder, || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                info!("Event from inner span");
            }
        });

        let recorded: Vec<_> = recording.tail().iter().map(Event::json_value).collect();
        assert_eq!(recorded, vec![message("Event from inner span")]);
    }

    #[test]
    fn test_active_spans() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        with_recorder_installed(&recorder, || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                let stacks = recording.stacks();
                assert_eq!(stacks.len(), 1);

                let names: Vec<&str> = stacks[0].iter().map(|meta| meta.name()).collect();
                // TODO: possibly we should remove the 'recording' span from here.
                assert_eq!(names[..2], ["child_span", "recording"]);
            }
        });
    }
}