hyperactor_telemetry/
recorder.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::SPAN_FIELD_RECORDING;
38use crate::pool::Pool;
39use crate::spool::Spool;
40
/// A `Key` provides a guaranteed unique [`KeyRef`] for as long as
/// the [`Key`] is alive.
#[derive(Debug)]
struct Key(
    // The payload has to be a u8 (as opposed to, e.g., ()): Rust gets
    // clever about zero size objects and allocates them all to 0x1,
    // which would make every key share the same address.
    Box<u8>,
);

impl Key {
    /// Create a new unique key, backed by its own heap allocation.
    fn new() -> Self {
        Key(Box::new(0))
    }

    /// Produce a reference to this key: the address of the boxed byte.
    ///
    /// The reference is guaranteed to be unique for the lifetime of this
    /// key. The user must be careful to ensure that there are no
    /// outstanding [`KeyRef`]s after the corresponding [`Key`] is dropped,
    /// since the address may then be handed out again.
    fn as_key_ref(&self) -> KeyRef {
        let address: *const u8 = &*self.0;
        KeyRef(address)
    }
}

/// A reference to a [`Key`]. KeyRefs are plain copyable values that may be
/// used as keys in a map, and are guaranteed to be unique for the lifetime
/// of the [`Key`] they were taken from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyRef(*const u8);

// SAFETY: The wrapped pointer is only ever passed around and compared as
// a value. It is never dereferenced.
unsafe impl Send for KeyRef {}
// SAFETY: The wrapped pointer is only ever passed around and compared as
// a value. It is never dereferenced.
unsafe impl Sync for KeyRef {}

/// Converts a key into its reference.
///
/// NOTE: this consumes (and therefore drops) the key, so the uniqueness
/// guarantee of the returned [`KeyRef`] ends as soon as the conversion
/// returns.
impl From<Key> for KeyRef {
    fn from(value: Key) -> Self {
        value.as_key_ref()
    }
}

/// Rebuilds a [`KeyRef`] from the integer form produced by
/// `u64::from(KeyRef)`.
impl From<u64> for KeyRef {
    fn from(value: u64) -> Self {
        KeyRef(value as *const u8)
    }
}

/// Exposes the address behind a [`KeyRef`] as an integer.
impl From<KeyRef> for u64 {
    fn from(value: KeyRef) -> Self {
        let KeyRef(address) = value;
        address as u64
    }
}
97
/// A value that can appear in an entry. This custom
/// representation serves two purposes: 1) internally,
/// to manage string buffer reuse; and 2) as a serialization
/// format that may be used with bincode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Value {
    /// A String.
    String(String),
    /// An i64.
    I64(i64),
    /// A u64.
    U64(u64),
    /// An f64.
    F64(f64),
    /// A bool.
    Bool(bool),
}
115
116impl Value {
117    /// Convert this Value to a JSON value
118    pub fn to_json(&self) -> serde_json::Value {
119        match &self {
120            Self::String(s) => serde_json::Value::String(s.clone()),
121            Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
122            Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
123            Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
124            Self::Bool(b) => serde_json::Value::Bool(*b),
125        }
126    }
127}
128
129impl Default for Value {
130    fn default() -> Self {
131        Value::Bool(false)
132    }
133}
134
/// A single named field of a recorded [`Event`].
#[derive(Debug, Default, Clone)]
struct Entry {
    /// The name of the field, as reported by tracing.
    name: &'static str,
    /// The value currently stored for the field.
    value: Value,
    /// A spare, cleared `String` salvaged from a previous string value so
    /// that its allocation can be reused by the next string-valued write.
    buffer: Option<String>,
}
141
142impl Entry {
143    fn reset(&mut self) {
144        if let Value::String(mut s) = take(&mut self.value) {
145            s.clear();
146            self.buffer = Some(s);
147        }
148    }
149
150    fn set_str(&mut self, name: &'static str, value: &str) {
151        self.reset();
152        let mut buf = self.buffer.take().unwrap_or_else(String::new);
153        buf.clear();
154        buf.push_str(value);
155        self.name = name;
156        self.value = Value::String(buf);
157    }
158
159    fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
160        self.reset();
161        let mut buf = self.buffer.take().unwrap_or_else(String::new);
162
163        let mut formatter =
164            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
165
166        Display::fmt(value, &mut formatter)
167            .expect("a Display implementation returned an error unexpectedly");
168
169        self.name = name;
170        self.value = Value::String(buf);
171    }
172
173    fn set_debug(&mut self, name: &'static str, value: &dyn std::fmt::Debug) {
174        self.reset();
175        let mut buf = self.buffer.take().unwrap_or_else(String::new);
176
177        let mut formatter =
178            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
179
180        Debug::fmt(value, &mut formatter)
181            .expect("a Debug implementation returned an error unexpectedly");
182
183        self.name = name;
184        self.value = Value::String(buf);
185    }
186
187    fn set_i64(&mut self, name: &'static str, value: i64) {
188        self.reset();
189        self.name = name;
190        self.value = Value::I64(value);
191    }
192
193    fn set_u64(&mut self, name: &'static str, value: u64) {
194        self.reset();
195        self.name = name;
196        self.value = Value::U64(value);
197    }
198
199    fn set_f64(&mut self, name: &'static str, value: f64) {
200        self.reset();
201        self.name = name;
202        self.value = Value::F64(value);
203    }
204
205    fn set_bool(&mut self, name: &'static str, value: bool) {
206        self.reset();
207        self.name = name;
208        self.value = Value::Bool(value);
209    }
210}
211
/// An event that has been recorded by a [`Recorder`].
#[derive(Debug, Clone)]
pub struct Event {
    /// The time at which the event was recorded.
    pub time: SystemTime,

    /// The metadata for the event.
    pub metadata: &'static Metadata<'static>,

    /// All other (structured) fields defined in the event.
    ///
    /// This vector is kept across resets so that entries (and their string
    /// buffers) can be reused; `num_fields` tracks how many leading entries
    /// have been filled in since the last reset.
    fields: Vec<Entry>,

    /// The number of fields defined.
    num_fields: usize,

    /// A monotonically increasing sequence number.
    pub seq: usize,
}
230
231impl Event {
232    fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
233        self.time = time;
234        self.metadata = metadata;
235        self.num_fields = 0;
236        self.seq = seq;
237    }
238
239    fn next_field(&mut self) -> &mut Entry {
240        while self.fields.len() <= self.num_fields {
241            self.fields.push(Default::default());
242        }
243        let field = self.fields.get_mut(self.num_fields).unwrap();
244        field.reset();
245        self.num_fields += 1;
246        field
247    }
248
249    /// The fields of this event, structed as a JSON value.
250    pub fn json_value(&self) -> serde_json::Value {
251        serde_json::Value::Object(self.json_fields())
252    }
253
254    /// The fields of this event, structured as a JSON map.
255    pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
256        let mut map = serde_json::Map::new();
257        for field in &self.fields {
258            map.insert(field.name.to_string(), field.value.to_json());
259        }
260        map
261    }
262
263    /// The fields of this event, using the [`Value`] representation.
264    pub fn fields(&self) -> Vec<(String, Value)> {
265        self.fields
266            .iter()
267            .map(|field| (field.name.to_string(), field.value.clone()))
268            .collect()
269    }
270}
271
/// The default event is an empty placeholder: it carries no fields, has
/// sequence number 0, is stamped with the current time and points at
/// blank metadata.
impl Default for Event {
    fn default() -> Self {
        // `Event::metadata` is a `&'static Metadata`, so the placeholder
        // needs static metadata of its own, which in turn needs a static
        // callsite to refer to.
        static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
            name: "",
            kind: tracing::metadata::Kind::SPAN,
            level: Level::DEBUG,
            fields:
        };
        // Blank metadata: empty name and target, no fields.
        static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
            name: "",
            target: "",
            level: Level::DEBUG,
            fields: &[],
            callsite: &__CALLSITE,
            kind: tracing::metadata::Kind::SPAN,
        };
        Self {
            time: SystemTime::now(),
            metadata: &DEFAULT_METADATA,
            fields: Vec::new(),
            num_fields: 0,
            seq: 0,
        }
    }
}
297
298/// Visitor to populate an [`Event`] from a [`tracing::Event`].
299impl Visit for Event {
300    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
301        self.next_field().set_i64(field.name(), value);
302    }
303
304    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
305        self.next_field().set_u64(field.name(), value);
306    }
307
308    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
309        self.next_field().set_f64(field.name(), value);
310    }
311
312    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
313        self.next_field().set_bool(field.name(), value);
314    }
315
316    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
317        self.next_field().set_str(field.name(), value);
318    }
319
320    fn record_error(
321        &mut self,
322        field: &tracing::field::Field,
323        value: &(dyn std::error::Error + 'static),
324    ) {
325        self.next_field().set_error(field.name(), value);
326    }
327
328    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
329        self.next_field().set_debug(field.name(), value);
330    }
331}
332
/// A recording of events from a [`Recorder`].
///
/// A recording registers itself with its recorder on creation and removes
/// itself again when dropped.
#[derive(Debug)]
pub struct Recording {
    /// The state of this recording; also registered in the recorder's
    /// `recordings` map under this recording's key.
    state: Arc<RecordingState>,
    /// The state of the recorder that created this recording.
    recorder_state: Arc<RecorderState>,
}
339
340impl Recording {
341    fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
342        assert!(cap > 1, "capacity must be > 1");
343
344        let state = Arc::new(RecordingState {
345            key: Key::new(),
346            active: Mutex::new(HashMap::new()),
347            spool: Spool::new(cap),
348            seq: AtomicUsize::new(0),
349        });
350        assert!(
351            recorder_state
352                .recordings
353                .insert(state.key.as_key_ref(), Arc::clone(&state))
354                .is_none(),
355            "non-unique key"
356        );
357
358        Self {
359            state,
360            recorder_state,
361        }
362    }
363
364    /// Return a span, which will record events to this recording when entered.
365    ///
366    /// Uses `parent: None` so that this span is always a root span. Without this,
367    /// contextual parentage would cause a spawned actor's recording span to become
368    /// a child of the spawning actor's recording span (via `Instance::start()`'s
369    /// `.instrument(Span::current())`), causing events to leak into the parent
370    /// actor's flight recorder.
371    pub fn span(&self) -> Span {
372        span!(
373            parent: None,
374            Level::INFO,
375            SPAN_FIELD_RECORDING,
376            recording = self.recording_key(),
377            recorder = self.recorder_key(),
378        )
379    }
380
381    /// Retrieve the tail of the recorded event log,
382    /// up to the capacity of the recording.
383    pub fn tail(&self) -> Vec<Event> {
384        self.state.spool.tail()
385    }
386
387    /// Return the set of currently active span stacks. The metadata has
388    /// enough information to recover a sparse stack trace for the trace.
389    pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
390        let snapshot = self.state.active.lock().unwrap().clone();
391
392        let parents: HashSet<Id> = snapshot
393            .iter()
394            .filter_map(|(_, (_, parent))| parent.clone())
395            .collect();
396
397        snapshot
398            .iter()
399            .filter_map(|(id, _)| {
400                if parents.contains(id) {
401                    None
402                } else {
403                    Some(id.clone())
404                }
405            })
406            .map(|id| {
407                let mut stack = Vec::new();
408                let mut parent: Option<Id> = Some(id);
409                while let Some(id) = parent {
410                    let Some((meta, next_parent)) = snapshot.get(&id) else {
411                        break;
412                    };
413                    stack.push(*meta);
414                    parent = next_parent.clone();
415                }
416                stack
417            })
418            .collect()
419    }
420
421    fn recording_key_ref(&self) -> KeyRef {
422        self.state.key.as_key_ref()
423    }
424
425    fn recording_key(&self) -> u64 {
426        self.recording_key_ref().into()
427    }
428
429    fn recorder_key(&self) -> u64 {
430        self.recorder_state.key.as_key_ref().into()
431    }
432}
433
434impl Drop for Recording {
435    fn drop(&mut self) {
436        assert!(
437            self.recorder_state
438                .recordings
439                .remove(&self.state.key.as_key_ref())
440                .is_some(),
441            "missing recording"
442        );
443    }
444}
445
/// The shared state behind a [`Recording`].
#[derive(Debug)]
struct RecordingState {
    /// The spans currently entered under this recording, keyed by span id.
    /// Each value holds the span's metadata and the id of its parent span,
    /// if it has one.
    active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
    /// The unique key identifying this recording.
    key: Key,
    /// The sequence number to assign to the next recorded event.
    seq: AtomicUsize,
    /// The storage for recorded events.
    spool: Spool<Event>,
}
453
/// A recorder captures events emitted within recording spans (see
/// [`Recording::span`]) and stores them in the corresponding
/// [`Recording`]. In order to record events, the recorder's
/// layer ([`Recorder::layer`]) must be installed into the relevant tracing
/// subscriber.
pub struct Recorder {
    /// State shared with the recorder's layers and recordings.
    state: Arc<RecorderState>,
}
461
/// The state shared between a [`Recorder`], its layers and its recordings.
#[derive(Debug)]
struct RecorderState {
    /// The unique key identifying this recorder.
    key: Key,
    /// The currently registered recordings, indexed by their key.
    recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
    /// The pool from which [`Event`]s are obtained when recording.
    pool: Pool<Event>,
}
468
469impl Recorder {
470    /// Create a new recorder.
471    pub fn new() -> Self {
472        let state = Arc::new(RecorderState {
473            key: Key::new(),
474            recordings: Arc::new(DashMap::new()),
475            pool: Pool::new(1024),
476        });
477        Self { state }
478    }
479
480    /// Create a new recording that can be used to selective capture
481    /// events and span traces.
482    pub fn record(&self, cap: usize) -> Recording {
483        Recording::new(cap, Arc::clone(&self.state))
484    }
485
486    /// The layer associated with this recorder. This layer must be
487    /// installed into the relevant tracing subscriber in order to
488    /// record events.
489    pub fn layer(&self) -> RecorderLayer {
490        RecorderLayer {
491            state: Arc::clone(&self.state),
492        }
493    }
494}
495
/// The type of layer used by [`Recorder`].
pub struct RecorderLayer {
    /// The state of the recorder that this layer records for.
    state: Arc<RecorderState>,
}
500
501impl RecorderLayer {
502    /// Return an iterator over all the [`KeyRef`]s for the given scope.
503    fn iter_recordings<'a, S>(
504        &'a self,
505        scope: Scope<'a, S>,
506    ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
507    where
508        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
509    {
510        // We can provide deadlock freedom here because we always iterate in the same order:
511        // from root to leaf; thus there can be no cycles.
512        scope
513            .from_root()
514            .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
515                Some((recording_key_ref, recorder_key_ref))
516                    if recorder_key_ref == &self.state.key.as_key_ref() =>
517                {
518                    Some(*recording_key_ref)
519                }
520                _ => None,
521            })
522            // Spans can outlive recording, which may no longer exist.
523            .filter_map(|key_ref| self.state.recordings.get(&key_ref))
524    }
525}
526
527impl<S: Subscriber> Layer<S> for RecorderLayer
528where
529    S: for<'span> LookupSpan<'span>,
530{
531    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
532        let mut visitor = RecordingKeysVisitor::new();
533
534        attrs.record(&mut visitor);
535
536        if let Some(keys) = visitor.keys() {
537            if let Some(span) = ctx.span(id) {
538                let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
539                    span.extensions_mut();
540                extensions.insert(keys);
541            }
542        }
543    }
544
545    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
546        let Some(scope) = ctx.event_scope(event) else {
547            return;
548        };
549        for state in self.iter_recordings(scope) {
550            let mut recorded = self.state.pool.get();
551            let seq = state.seq.fetch_add(1, Ordering::Relaxed);
552            recorded.reset(SystemTime::now(), event.metadata(), seq);
553            event.record(&mut recorded);
554            state.spool.push(recorded);
555        }
556    }
557
558    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
559        let Some(span) = ctx.span(id) else { return };
560
561        for state in self.iter_recordings(span.scope()) {
562            let mut active = state.active.lock().unwrap();
563            active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
564        }
565    }
566
567    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
568        let Some(span) = ctx.span(id) else { return };
569
570        for state in self.iter_recordings(span.scope()) {
571            let mut active = state.active.lock().unwrap();
572            active.remove(id);
573        }
574    }
575}
576
/// A visitor to pick out the recording key from spans.
struct RecordingKeysVisitor {
    /// The key read from the span's `recording` field, if seen.
    recording_key: Option<KeyRef>,
    /// The key read from the span's `recorder` field, if seen.
    recorder_key: Option<KeyRef>,
}
582
583impl RecordingKeysVisitor {
584    fn new() -> Self {
585        Self {
586            recording_key: None,
587            recorder_key: None,
588        }
589    }
590
591    fn keys(&self) -> Option<(KeyRef, KeyRef)> {
592        match (self.recording_key, self.recorder_key) {
593            (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
594            _ => None,
595        }
596    }
597}
598
599impl Visit for RecordingKeysVisitor {
600    fn record_u64(&mut self, field: &Field, value: u64) {
601        if field.name() == "recording" {
602            self.recording_key = Some(value.into());
603        } else if field.name() == "recorder" {
604            self.recorder_key = Some(value.into());
605        }
606    }
607
608    fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
609}
610
#[cfg(test)]
mod tests {
    use serde_json::json;
    use tracing::Level;
    use tracing::info;
    use tracing::span;
    use tracing_subscriber::Registry;
    use tracing_subscriber::prelude::*;

    use super::*;

    // A key's reference is stable, and two live keys never share one.
    #[test]
    fn test_key() {
        let key = Key::new();
        assert_eq!(key.as_key_ref(), key.as_key_ref());

        assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
    }

    // Events emitted while the recording span is entered show up in the
    // recording, in order.
    #[test]
    fn test_events_are_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            info!("This event should be recorded");
            info!("another event");
        });

        let events = recording.tail();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "This event should be recorded"
            })
        );
        assert_eq!(
            events[1].json_value(),
            json!({
                "message": "another event"
            })
        );
    }

    // With capacity 5 and 10 events, only the last 5 are retained, and
    // their sequence numbers keep counting from the start.
    #[test]
    fn test_last_n_entries() {
        let recorder = Recorder::new();
        let recording = recorder.record(5);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span = recording.span();
            let _guard = span.enter();
            for i in 0..10 {
                info!("event {}", i);
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 5);
        for (i, event) in events.into_iter().enumerate() {
            assert_eq!(
                event.json_value(),
                json!({
                    "message": format!("event {}", i + 5)
                })
            );
            assert_eq!(event.seq, i + 5);
        }
    }

    // Creating the recording span is not enough: it must be entered for
    // events to be captured.
    #[test]
    fn test_events_outside_span_are_not_recorded() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);
        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let _span = recording.span(); // not entered
            info!("This event should NOT be recorded");
        });
        assert_eq!(recording.tail().len(), 0);
    }

    // Events emitted in an ordinary span nested inside the recording span
    // are still captured by the recording.
    #[test]
    fn test_child_span_inherits_recorder() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                info!("Event from inner span");
            }
        });

        let events = recording.tail();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].json_value(),
            json!({
                "message": "Event from inner span"
            })
        );
    }

    // Ensures that nested tracing recordings remain isolated: events
    // emitted while span A is active are captured only by recording
    // A, and events emitted inside nested span B are captured only by
    // recording B. This verifies that the recorder correctly
    // attributes events to the currently entered span without
    // cross-contamination.
    #[test]
    fn test_recording_spans_are_isolated_across_nested_recordings() {
        let recorder = Recorder::new();
        let recording_a = recorder.record(10);
        let recording_b = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let span_a = recording_a.span();
            let _guard_a = span_a.enter();

            info!("event_from_a");

            {
                let span_b = recording_b.span();
                let _guard_b = span_b.enter();

                info!("event_from_b");
            }
        });

        let events_a: Vec<String> = recording_a
            .tail()
            .iter()
            .map(|e| e.json_value().to_string())
            .collect();
        let events_b: Vec<String> = recording_b
            .tail()
            .iter()
            .map(|e| e.json_value().to_string())
            .collect();

        let a_joined = events_a.join(" ");
        let b_joined = events_b.join(" ");

        assert!(
            a_joined.contains("event_from_a"),
            "recording A should contain event_from_a"
        );
        assert!(
            !a_joined.contains("event_from_b"),
            "recording A should NOT contain event_from_b"
        );
        assert!(
            b_joined.contains("event_from_b"),
            "recording B should contain event_from_b"
        );
        assert!(
            !b_joined.contains("event_from_a"),
            "recording B should NOT contain event_from_a"
        );
    }

    // While both spans are entered there is a single stack, listed from
    // the innermost span outwards.
    #[test]
    fn test_active_spans() {
        let recorder = Recorder::new();
        let recording = recorder.record(10);

        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
            let outer = recording.span();
            let _outer_guard = outer.enter();

            {
                let inner = span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();

                assert_eq!(recording.stacks().len(), 1);
                // TODO: possibly we should remove the 'recording' span from here.
                assert_eq!(recording.stacks()[0][0].name(), "child_span");
                assert_eq!(recording.stacks()[0][1].name(), "recording");
            }
        });
    }
}
796        });
797    }
798}