hyperactor_telemetry/
recorder.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::pool::Pool;
38use crate::spool::Spool;
39
/// A `Key` provides a guaranteed-unique [`KeyRef`] for as long as
/// the [`Key`] is alive.
///
/// Uniqueness comes from the address of a live heap allocation; once the
/// key is dropped, the allocator is free to hand that address out again.
#[derive(Debug)]
struct Key(
    // We have to use a u8 (as opposed to, e.g., ()) here, as
    // Rust gets clever about zero size objects, and allocates
    // them all to 0x1.
    Box<u8>,
);
49
50impl Key {
51    /// Create a new unique key.
52    fn new() -> Self {
53        Self(Box::new(0u8))
54    }
55
56    /// Produce a reference to this key. The reference is guaranteed to be
57    /// unique for the lifetime of this key. The user must be careful to
58    /// ensure that there are no outstanding [`KeyRef`]s after its corresponding
59    /// [`Key`] is dropped.
60    fn as_key_ref(&self) -> KeyRef {
61        KeyRef(&*self.0 as *const _)
62    }
63}
64
/// An opaque handle naming a [`Key`]. It is just an address, so it is cheap
/// to copy and can be used as a map key. It stays unique only while the
/// [`Key`] it came from is alive.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct KeyRef(*const u8);
69
// SAFETY: A `KeyRef` is only ever used as an opaque value (compared, hashed,
// and converted to/from `u64`). The pointer it wraps is never dereferenced,
// so moving it to another thread cannot cause a data race.
unsafe impl Send for KeyRef {}
// SAFETY: As above, the wrapped pointer is never dereferenced. Sharing a
// `&KeyRef` between threads therefore only exposes an immutable address value.
unsafe impl Sync for KeyRef {}

// `KeyRef` is a plain address, so copying it is trivial.
impl Copy for KeyRef {}
78
79impl From<Key> for KeyRef {
80    fn from(value: Key) -> Self {
81        Self(&*value.0 as *const _)
82    }
83}
84
85impl From<u64> for KeyRef {
86    fn from(value: u64) -> Self {
87        Self(value as *const _)
88    }
89}
90
91impl From<KeyRef> for u64 {
92    fn from(value: KeyRef) -> Self {
93        value.0 as u64
94    }
95}
96
97/// A value that can appear in an entry. This custom
98/// representation serves two purposes: 1) internally, to
99/// to manage string buffer reuse; and 2) as a serialization
100/// format that may be used with bincode.
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub enum Value {
103    /// A String.
104    String(String),
105    /// An i64.
106    I64(i64),
107    /// A u64.
108    U64(u64),
109    /// An f64.
110    F64(f64),
111    /// A bool.
112    Bool(bool),
113}
114
115impl Value {
116    /// Convert this Value to a JSON value
117    pub fn to_json(&self) -> serde_json::Value {
118        match &self {
119            Self::String(s) => serde_json::Value::String(s.clone()),
120            Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
121            Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
122            Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
123            Self::Bool(b) => serde_json::Value::Bool(*b),
124        }
125    }
126}
127
128impl Default for Value {
129    fn default() -> Self {
130        Value::Bool(false)
131    }
132}
133
134#[derive(Debug, Default, Clone)]
135struct Entry {
136    name: &'static str,
137    value: Value,
138    buffer: Option<String>,
139}
140
141impl Entry {
142    fn reset(&mut self) {
143        if let Value::String(mut s) = take(&mut self.value) {
144            s.clear();
145            self.buffer = Some(s);
146        }
147    }
148
149    fn set_str(&mut self, name: &'static str, value: &str) {
150        self.reset();
151        let mut buf = self.buffer.take().unwrap_or_else(String::new);
152        buf.clear();
153        buf.push_str(value);
154        self.name = name;
155        self.value = Value::String(buf);
156    }
157
158    fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
159        self.reset();
160        let mut buf = self.buffer.take().unwrap_or_else(String::new);
161
162        let mut formatter =
163            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
164
165        Display::fmt(value, &mut formatter)
166            .expect("a Display implementation returned an error unexpectedly");
167
168        self.name = name;
169        self.value = Value::String(buf);
170    }
171
172    fn set_debug(&mut self, name: &'static str, value: &(dyn std::fmt::Debug)) {
173        self.reset();
174        let mut buf = self.buffer.take().unwrap_or_else(String::new);
175
176        let mut formatter =
177            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
178
179        Debug::fmt(value, &mut formatter)
180            .expect("a Debug implementation returned an error unexpectedly");
181
182        self.name = name;
183        self.value = Value::String(buf);
184    }
185
186    fn set_i64(&mut self, name: &'static str, value: i64) {
187        self.reset();
188        self.name = name;
189        self.value = Value::I64(value);
190    }
191
192    fn set_u64(&mut self, name: &'static str, value: u64) {
193        self.reset();
194        self.name = name;
195        self.value = Value::U64(value);
196    }
197
198    fn set_f64(&mut self, name: &'static str, value: f64) {
199        self.reset();
200        self.name = name;
201        self.value = Value::F64(value);
202    }
203
204    fn set_bool(&mut self, name: &'static str, value: bool) {
205        self.reset();
206        self.name = name;
207        self.value = Value::Bool(value);
208    }
209}
210
211/// An event that has been recorded by a [`Recorder`].
212#[derive(Debug, Clone)]
213pub struct Event {
214    /// The time at which the event was recorded.
215    pub time: SystemTime,
216
217    /// The metadata for the event.
218    pub metadata: &'static Metadata<'static>,
219
220    /// All other (structured) fields defined in the event.
221    fields: Vec<Entry>,
222
223    /// The number of fields defined.
224    num_fields: usize,
225
226    /// A monotonically increasing sequence number.
227    pub seq: usize,
228}
229
230impl Event {
231    fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
232        self.time = time;
233        self.metadata = metadata;
234        self.num_fields = 0;
235        self.seq = seq;
236    }
237
238    fn next_field(&mut self) -> &mut Entry {
239        while self.fields.len() <= self.num_fields {
240            self.fields.push(Default::default());
241        }
242        let field = self.fields.get_mut(self.num_fields).unwrap();
243        field.reset();
244        self.num_fields += 1;
245        field
246    }
247
248    /// The fields of this event, structed as a JSON value.
249    pub fn json_value(&self) -> serde_json::Value {
250        serde_json::Value::Object(self.json_fields())
251    }
252
253    /// The fields of this event, structured as a JSON map.
254    pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
255        let mut map = serde_json::Map::new();
256        for field in &self.fields {
257            map.insert(field.name.to_string(), field.value.to_json());
258        }
259        map
260    }
261
262    /// The fields of this event, using the [`Value`] representation.
263    pub fn fields(&self) -> Vec<(String, Value)> {
264        self.fields
265            .iter()
266            .map(|field| (field.name.to_string(), field.value.clone()))
267            .collect()
268    }
269}
270
/// Builds an empty placeholder event: current time, no fields, `seq` 0.
///
/// `Event::metadata` must be `&'static`, so a static placeholder callsite and
/// metadata (empty name and target, `DEBUG` level, span kind) are defined
/// here. NOTE(review): presumably the event pool hands out such placeholders
/// before `Event::reset` fills in real values; confirm against `crate::pool`.
impl Default for Event {
    fn default() -> Self {
        // NOTE(review): `callsite2!` and `__macro_support::MacroCallsite` look
        // like `tracing`-internal items rather than stable public API; they
        // may need updating when `tracing` is upgraded.
        static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
            name: "",
            kind: tracing::metadata::Kind::SPAN,
            level: Level::DEBUG,
            fields:
        };
        // Placeholder metadata pointing at the callsite above.
        static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
            name: "",
            target: "",
            level: Level::DEBUG,
            fields: &[],
            callsite: &__CALLSITE,
            kind: tracing::metadata::Kind::SPAN,
        };
        Self {
            time: SystemTime::now(),
            metadata: &DEFAULT_METADATA,
            fields: Vec::new(),
            num_fields: 0,
            seq: 0,
        }
    }
}
296
297/// Visitor to populate an [`Event`] from a [`tracing::Event`].
298impl Visit for Event {
299    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
300        self.next_field().set_i64(field.name(), value);
301    }
302
303    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
304        self.next_field().set_u64(field.name(), value);
305    }
306
307    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
308        self.next_field().set_f64(field.name(), value);
309    }
310
311    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
312        self.next_field().set_bool(field.name(), value);
313    }
314
315    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
316        self.next_field().set_str(field.name(), value);
317    }
318
319    fn record_error(
320        &mut self,
321        field: &tracing::field::Field,
322        value: &(dyn std::error::Error + 'static),
323    ) {
324        self.next_field().set_error(field.name(), value);
325    }
326
327    fn record_debug(&mut self, field: &tracing::field::Field, value: &(dyn std::fmt::Debug)) {
328        self.next_field().set_debug(field.name(), value);
329    }
330}
331
332/// A recording of events from a [`Recorder`].
333#[derive(Debug)]
334pub struct Recording {
335    state: Arc<RecordingState>,
336    recorder_state: Arc<RecorderState>,
337}
338
339impl Recording {
340    fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
341        assert!(cap > 1, "capacity must be > 1");
342
343        let state = Arc::new(RecordingState {
344            key: Key::new(),
345            active: Mutex::new(HashMap::new()),
346            spool: Spool::new(cap),
347            seq: AtomicUsize::new(0),
348        });
349        assert!(
350            recorder_state
351                .recordings
352                .insert(state.key.as_key_ref(), Arc::clone(&state))
353                .is_none(),
354            "non-unique key"
355        );
356
357        Self {
358            state,
359            recorder_state,
360        }
361    }
362
363    /// Return a span, which will record events to this recording when entered.
364    pub fn span(&self) -> Span {
365        span!(
366            Level::INFO,
367            "recording",
368            recording = self.recording_key(),
369            recorder = self.recorder_key(),
370        )
371    }
372
373    /// Retrieve the tail of the recorded event log,
374    /// up to the capacity of the recording.
375    pub fn tail(&self) -> Vec<Event> {
376        self.state.spool.tail()
377    }
378
379    /// Return the set of currently active span stacks. The metadata has
380    /// enough information to recover a sparse stack trace for the trace.
381    pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
382        let snapshot = self.state.active.lock().unwrap().clone();
383
384        let parents: HashSet<Id> = snapshot
385            .iter()
386            .filter_map(|(_, (_, parent))| parent.clone())
387            .collect();
388
389        snapshot
390            .iter()
391            .filter_map(|(id, _)| {
392                if parents.contains(id) {
393                    None
394                } else {
395                    Some(id.clone())
396                }
397            })
398            .map(|id| {
399                let mut stack = Vec::new();
400                let mut parent: Option<Id> = Some(id);
401                while let Some(id) = parent {
402                    let Some((meta, next_parent)) = snapshot.get(&id) else {
403                        break;
404                    };
405                    stack.push(*meta);
406                    parent = next_parent.clone();
407                }
408                stack
409            })
410            .collect()
411    }
412
413    fn recording_key_ref(&self) -> KeyRef {
414        self.state.key.as_key_ref()
415    }
416
417    fn recording_key(&self) -> u64 {
418        self.recording_key_ref().into()
419    }
420
421    fn recorder_key(&self) -> u64 {
422        self.recorder_state.key.as_key_ref().into()
423    }
424}
425
426impl Drop for Recording {
427    fn drop(&mut self) {
428        assert!(
429            self.recorder_state
430                .recordings
431                .remove(&self.state.key.as_key_ref())
432                .is_some(),
433            "missing recording"
434        );
435    }
436}
437
438#[derive(Debug)]
439struct RecordingState {
440    active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
441    key: Key,
442    seq: AtomicUsize,
443    spool: Spool<Event>,
444}
445
446/// A recorder captures events from a [`tracing::span`] and records them
447/// to a [`mpsc::UnboundedSender`]. In order to record events, the recorder's
448/// layer ([`Recorder::layer`]) must be installed into the relevant tracing
449/// subscriber.
450pub struct Recorder {
451    state: Arc<RecorderState>,
452}
453
454#[derive(Debug)]
455struct RecorderState {
456    key: Key,
457    recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
458    pool: Pool<Event>,
459}
460
461impl Recorder {
462    /// Create a new recorder.
463    pub fn new() -> Self {
464        let state = Arc::new(RecorderState {
465            key: Key::new(),
466            recordings: Arc::new(DashMap::new()),
467            pool: Pool::new(1024),
468        });
469        Self { state }
470    }
471
472    /// Create a new recording that can be used to selective capture
473    /// events and span traces.
474    pub fn record(&self, cap: usize) -> Recording {
475        Recording::new(cap, Arc::clone(&self.state))
476    }
477
478    /// The layer associated with this recorder. This layer must be
479    /// installed into the relevant tracing subscriber in order to
480    /// record events.
481    pub fn layer(&self) -> RecorderLayer {
482        RecorderLayer {
483            state: Arc::clone(&self.state),
484        }
485    }
486}
487
488/// The type of layer used by [`Recorder`].
489pub struct RecorderLayer {
490    state: Arc<RecorderState>,
491}
492
493impl RecorderLayer {
494    /// Return an iterator over all the [`KeyRef`]s for the given scope.
495    fn iter_recordings<'a, S>(
496        &'a self,
497        scope: Scope<'a, S>,
498    ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
499    where
500        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
501    {
502        // We can provide deadlock freedom here because we always iterate in the same order:
503        // from root to leaf; thus there can be no cycles.
504        scope
505            .from_root()
506            .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
507                Some((recording_key_ref, recorder_key_ref))
508                    if recorder_key_ref == &self.state.key.as_key_ref() =>
509                {
510                    Some(*recording_key_ref)
511                }
512                _ => None,
513            })
514            // Spans can outlive recording, which may no longer exist.
515            .filter_map(|key_ref| self.state.recordings.get(&key_ref))
516    }
517}
518
519impl<S: Subscriber> Layer<S> for RecorderLayer
520where
521    S: for<'span> LookupSpan<'span>,
522{
523    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
524        let mut visitor = RecordingKeysVisitor::new();
525
526        attrs.record(&mut visitor);
527
528        if let Some(keys) = visitor.keys() {
529            if let Some(span) = ctx.span(id) {
530                let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
531                    span.extensions_mut();
532                extensions.insert(keys);
533            }
534        }
535    }
536
537    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
538        let Some(scope) = ctx.event_scope(event) else {
539            return;
540        };
541        for state in self.iter_recordings(scope) {
542            let mut recorded = self.state.pool.get();
543            let seq = state.seq.fetch_add(1, Ordering::Relaxed);
544            recorded.reset(SystemTime::now(), event.metadata(), seq);
545            event.record(&mut recorded);
546            state.spool.push(recorded);
547        }
548    }
549
550    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
551        let Some(span) = ctx.span(id) else { return };
552
553        for state in self.iter_recordings(span.scope()) {
554            let mut active = state.active.lock().unwrap();
555            active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
556        }
557    }
558
559    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
560        let Some(span) = ctx.span(id) else { return };
561
562        for state in self.iter_recordings(span.scope()) {
563            let mut active = state.active.lock().unwrap();
564            active.remove(id);
565        }
566    }
567}
568
569/// A visitor to pick out the recording key from spans.
570struct RecordingKeysVisitor {
571    recording_key: Option<KeyRef>,
572    recorder_key: Option<KeyRef>,
573}
574
575impl RecordingKeysVisitor {
576    fn new() -> Self {
577        Self {
578            recording_key: None,
579            recorder_key: None,
580        }
581    }
582
583    fn keys(&self) -> Option<(KeyRef, KeyRef)> {
584        match (self.recording_key, self.recorder_key) {
585            (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
586            _ => None,
587        }
588    }
589}
590
591impl Visit for RecordingKeysVisitor {
592    fn record_u64(&mut self, field: &Field, value: u64) {
593        if field.name() == "recording" {
594            self.recording_key = Some(value.into());
595        } else if field.name() == "recorder" {
596            self.recorder_key = Some(value.into());
597        }
598    }
599
600    fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
601}
602
#[cfg(test)]
mod tests {
    use serde_json::json;
    use tracing::Level;
    use tracing::info;
    use tracing::span;
    use tracing_subscriber::Registry;
    use tracing_subscriber::prelude::*;

    use super::*;

    /// `as_key_ref` is stable for one key and distinct across live keys.
    #[test]
    fn test_key() {
        let key = Key::new();
        assert_eq!(key.as_key_ref(), key.as_key_ref());

        // Both temporary keys live until the end of the assertion, so their
        // allocations (and thus their addresses) must differ.
        assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
    }

    /// Events emitted while the recording span is entered are captured in
    /// emission order.
    #[test]
    fn test_events_are_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            info!("This event should be recorded");
            info!("another event");
        });

        let events = recording.tail();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "This event should be recorded"
            })
        );
        assert_eq!(
            events[1].json_value(),
            json!({
                "message": "another event"
            })
        );
    }

    /// With capacity 5, only the last 5 of 10 events are kept, and their
    /// sequence numbers reflect their original positions.
    #[test]
    fn test_last_n_entries() {
        let recorder = Recorder::new();
        let recording = recorder.record(5);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            for i in 0..10 {
                info!("event {}", i);
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 5);
        for (i, event) in events.into_iter().enumerate() {
            assert_eq!(
                event.json_value(),
                json!({
                    "message": format!("event {}", i + 5)
                })
            );
            assert_eq!(event.seq, i + 5);
        }
    }

    /// Creating the recording span without entering it records nothing.
    #[test]
    fn test_events_outside_span_are_not_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let _span = recording.span(); // not entered
            info!("This event should NOT be recorded");
        });
        assert_eq!(recording.tail().len(), 0);
    }

    /// Events inside a child of the recording span are still recorded.
    #[test]
    fn test_child_span_inherits_recorder() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                info!("Event from inner span");
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "Event from inner span"
            })
        );
    }

    /// With `child_span` entered inside the recording span, `stacks` returns
    /// one stack, listed leaf-first.
    #[test]
    fn test_active_spans() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                assert_eq!(recording.stacks().len(), 1);
                // TODO: possibly we should remove the 'recording' span from here.
                assert_eq!(recording.stacks()[0][0].name(), "child_span");
                assert_eq!(recording.stacks()[0][1].name(), "recording");
            }
        });
    }
}