Skip to main content

hyperactor_telemetry/
recorder.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fmt::Display;
13use std::mem::take;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::sync::atomic::AtomicUsize;
17use std::sync::atomic::Ordering;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::Level;
24use tracing::Metadata;
25use tracing::Span;
26use tracing::Subscriber;
27use tracing::field::Field;
28use tracing::field::Visit;
29use tracing::span;
30use tracing::span::Attributes;
31use tracing::span::Id;
32use tracing_subscriber::Layer;
33use tracing_subscriber::layer::Context;
34use tracing_subscriber::registry::LookupSpan;
35use tracing_subscriber::registry::Scope;
36
37use crate::SPAN_FIELD_RECORDING;
38use crate::pool::Pool;
39use crate::spool::Spool;
40
41/// Key is provides a guaranteed unique [`KeyRef`] for as long as
42/// the [`Key`] is alive.
43#[derive(Debug)]
44struct Key(
45    // We have to use a u8 (as opposed to, e.g., ()) here, as
46    // Rust gets clever about zero size objects, and allocates
47    // them all to 0x1.
48    Box<u8>,
49);
50
51impl Key {
52    /// Create a new unique key.
53    fn new() -> Self {
54        Self(Box::new(0u8))
55    }
56
57    /// Produce a reference to this key. The reference is guaranteed to be
58    /// unique for the lifetime of this key. The user must be careful to
59    /// ensure that there are no outstanding [`KeyRef`]s after its corresponding
60    /// [`Key`] is dropped.
61    fn as_key_ref(&self) -> KeyRef {
62        KeyRef(&*self.0 as *const _)
63    }
64}
65
66/// A reference to a [`Key`]. KeyRefs may be used as keys in a map,
67/// and are guaranteed to be unique for the lifetime of the [`Key`].
68#[derive(Debug, PartialEq, Eq, Hash, Clone)]
69struct KeyRef(*const u8);
70
71// SAFETY: We only ever pass these around as values. They are never
72// dereferenced.
73unsafe impl Send for KeyRef {}
74// SAFETY: We only ever pass these around as values. They are never
75// dereferenced.
76unsafe impl Sync for KeyRef {}
77
78impl Copy for KeyRef {}
79
80impl From<Key> for KeyRef {
81    fn from(value: Key) -> Self {
82        Self(&*value.0 as *const _)
83    }
84}
85
86impl From<u64> for KeyRef {
87    fn from(value: u64) -> Self {
88        Self(value as *const _)
89    }
90}
91
92impl From<KeyRef> for u64 {
93    fn from(value: KeyRef) -> Self {
94        value.0 as u64
95    }
96}
97
98/// A value that can appear in an entry. This custom
99/// representation serves two purposes: 1) internally, to
100/// to manage string buffer reuse; and 2) as a serialization
101/// format that may be used with bincode.
102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub enum Value {
104    /// A String.
105    String(String),
106    /// An i64.
107    I64(i64),
108    /// A u64.
109    U64(u64),
110    /// An f64.
111    F64(f64),
112    /// A bool.
113    Bool(bool),
114}
115
116impl Value {
117    /// Convert this Value to a JSON value
118    pub fn to_json(&self) -> serde_json::Value {
119        match &self {
120            Self::String(s) => serde_json::Value::String(s.clone()),
121            Self::I64(i) => serde_json::Value::Number(serde_json::Number::from(*i)),
122            Self::U64(u) => serde_json::Value::Number(serde_json::Number::from(*u)),
123            Self::F64(f) => serde_json::Value::Number(serde_json::Number::from_f64(*f).unwrap()),
124            Self::Bool(b) => serde_json::Value::Bool(*b),
125        }
126    }
127}
128
129impl Default for Value {
130    fn default() -> Self {
131        Value::Bool(false)
132    }
133}
134
135#[derive(Debug, Default, Clone)]
136struct Entry {
137    name: &'static str,
138    value: Value,
139    buffer: Option<String>,
140}
141
142impl Entry {
143    fn reset(&mut self) {
144        if let Value::String(mut s) = take(&mut self.value) {
145            s.clear();
146            self.buffer = Some(s);
147        }
148    }
149
150    fn set_str(&mut self, name: &'static str, value: &str) {
151        self.reset();
152        let mut buf = self.buffer.take().unwrap_or_default();
153        buf.clear();
154        buf.push_str(value);
155        self.name = name;
156        self.value = Value::String(buf);
157    }
158
159    fn set_error(&mut self, name: &'static str, value: &(dyn std::error::Error + 'static)) {
160        self.reset();
161        let mut buf = self.buffer.take().unwrap_or_default();
162
163        let mut formatter =
164            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
165
166        Display::fmt(value, &mut formatter)
167            .expect("a Display implementation returned an error unexpectedly");
168
169        self.name = name;
170        self.value = Value::String(buf);
171    }
172
173    fn set_debug(&mut self, name: &'static str, value: &dyn std::fmt::Debug) {
174        self.reset();
175        let mut buf = self.buffer.take().unwrap_or_default();
176
177        let mut formatter =
178            core::fmt::Formatter::new(&mut buf, core::fmt::FormattingOptions::new());
179
180        Debug::fmt(value, &mut formatter)
181            .expect("a Debug implementation returned an error unexpectedly");
182
183        self.name = name;
184        self.value = Value::String(buf);
185    }
186
187    fn set_i64(&mut self, name: &'static str, value: i64) {
188        self.reset();
189        self.name = name;
190        self.value = Value::I64(value);
191    }
192
193    fn set_u64(&mut self, name: &'static str, value: u64) {
194        self.reset();
195        self.name = name;
196        self.value = Value::U64(value);
197    }
198
199    fn set_f64(&mut self, name: &'static str, value: f64) {
200        self.reset();
201        self.name = name;
202        self.value = Value::F64(value);
203    }
204
205    fn set_bool(&mut self, name: &'static str, value: bool) {
206        self.reset();
207        self.name = name;
208        self.value = Value::Bool(value);
209    }
210}
211
212/// An event that has been recorded by a [`Recorder`].
213#[derive(Debug, Clone)]
214pub struct Event {
215    /// The time at which the event was recorded.
216    pub time: SystemTime,
217
218    /// The metadata for the event.
219    pub metadata: &'static Metadata<'static>,
220
221    /// All other (structured) fields defined in the event.
222    fields: Vec<Entry>,
223
224    /// The number of fields defined.
225    num_fields: usize,
226
227    /// A monotonically increasing sequence number.
228    pub seq: usize,
229}
230
231impl Event {
232    fn reset(&mut self, time: SystemTime, metadata: &'static Metadata<'static>, seq: usize) {
233        self.time = time;
234        self.metadata = metadata;
235        self.num_fields = 0;
236        self.seq = seq;
237    }
238
239    fn next_field(&mut self) -> &mut Entry {
240        while self.fields.len() <= self.num_fields {
241            self.fields.push(Default::default());
242        }
243        let field = self.fields.get_mut(self.num_fields).unwrap();
244        field.reset();
245        self.num_fields += 1;
246        field
247    }
248
249    /// The fields of this event, structed as a JSON value.
250    pub fn json_value(&self) -> serde_json::Value {
251        serde_json::Value::Object(self.json_fields())
252    }
253
254    /// The fields of this event, structured as a JSON map.
255    pub fn json_fields(&self) -> serde_json::Map<String, serde_json::Value> {
256        let mut map = serde_json::Map::new();
257        for field in &self.fields {
258            map.insert(field.name.to_string(), field.value.to_json());
259        }
260        map
261    }
262
263    /// The fields of this event, using the [`Value`] representation.
264    pub fn fields(&self) -> Vec<(String, Value)> {
265        self.fields
266            .iter()
267            .map(|field| (field.name.to_string(), field.value.clone()))
268            .collect()
269    }
270}
271
272impl Default for Event {
273    fn default() -> Self {
274        static __CALLSITE: tracing::__macro_support::MacroCallsite = tracing::callsite2! {
275            name: "",
276            kind: tracing::metadata::Kind::SPAN,
277            level: Level::DEBUG,
278            fields:
279        };
280        static DEFAULT_METADATA: Metadata<'static> = tracing::metadata! {
281            name: "",
282            target: "",
283            level: Level::DEBUG,
284            fields: &[],
285            callsite: &__CALLSITE,
286            kind: tracing::metadata::Kind::SPAN,
287        };
288        Self {
289            time: SystemTime::now(),
290            metadata: &DEFAULT_METADATA,
291            fields: Vec::new(),
292            num_fields: 0,
293            seq: 0,
294        }
295    }
296}
297
298/// Visitor to populate an [`Event`] from a [`tracing::Event`].
299impl Visit for Event {
300    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
301        self.next_field().set_i64(field.name(), value);
302    }
303
304    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
305        self.next_field().set_u64(field.name(), value);
306    }
307
308    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
309        self.next_field().set_f64(field.name(), value);
310    }
311
312    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
313        self.next_field().set_bool(field.name(), value);
314    }
315
316    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
317        self.next_field().set_str(field.name(), value);
318    }
319
320    fn record_error(
321        &mut self,
322        field: &tracing::field::Field,
323        value: &(dyn std::error::Error + 'static),
324    ) {
325        self.next_field().set_error(field.name(), value);
326    }
327
328    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
329        self.next_field().set_debug(field.name(), value);
330    }
331}
332
333/// A recording of events from a [`Recorder`].
334#[derive(Debug)]
335pub struct Recording {
336    state: Arc<RecordingState>,
337    recorder_state: Arc<RecorderState>,
338}
339
340impl Recording {
341    fn new(cap: usize, recorder_state: Arc<RecorderState>) -> Self {
342        assert!(cap > 1, "capacity must be > 1");
343
344        let state = Arc::new(RecordingState {
345            key: Key::new(),
346            active: Mutex::new(HashMap::new()),
347            spool: Spool::new(cap),
348            seq: AtomicUsize::new(0),
349        });
350        assert!(
351            recorder_state
352                .recordings
353                .insert(state.key.as_key_ref(), Arc::clone(&state))
354                .is_none(),
355            "non-unique key"
356        );
357
358        Self {
359            state,
360            recorder_state,
361        }
362    }
363
364    /// Return a span, which will record events to this recording when entered.
365    ///
366    /// The `subject` identifies the entity (actor, proc, etc.) that events
367    /// within this span pertain to. It is rendered as a prefix in glog output.
368    ///
369    /// Uses `parent: None` so that this span is always a root span. Without this,
370    /// contextual parentage would cause a spawned actor's recording span to become
371    /// a child of the spawning actor's recording span (via `Instance::start()`'s
372    /// `.instrument(Span::current())`), causing events to leak into the parent
373    /// actor's flight recorder.
374    pub fn span(&self, subject: &str) -> Span {
375        span!(
376            parent: None,
377            Level::INFO,
378            SPAN_FIELD_RECORDING,
379            recording = self.recording_key(),
380            recorder = self.recorder_key(),
381            subject = subject,
382        )
383    }
384
385    /// Retrieve the tail of the recorded event log,
386    /// up to the capacity of the recording.
387    pub fn tail(&self) -> Vec<Event> {
388        self.state.spool.tail()
389    }
390
391    /// Return the set of currently active span stacks. The metadata has
392    /// enough information to recover a sparse stack trace for the trace.
393    pub fn stacks(&self) -> Vec<Vec<&'static Metadata<'static>>> {
394        let snapshot = self.state.active.lock().unwrap().clone();
395
396        let parents: HashSet<Id> = snapshot
397            .iter()
398            .filter_map(|(_, (_, parent))| parent.clone())
399            .collect();
400
401        snapshot
402            .keys()
403            .filter_map(|id| {
404                if parents.contains(id) {
405                    None
406                } else {
407                    Some(id.clone())
408                }
409            })
410            .map(|id| {
411                let mut stack = Vec::new();
412                let mut parent: Option<Id> = Some(id);
413                while let Some(id) = parent {
414                    let Some((meta, next_parent)) = snapshot.get(&id) else {
415                        break;
416                    };
417                    stack.push(*meta);
418                    parent = next_parent.clone();
419                }
420                stack
421            })
422            .collect()
423    }
424
425    fn recording_key_ref(&self) -> KeyRef {
426        self.state.key.as_key_ref()
427    }
428
429    fn recording_key(&self) -> u64 {
430        self.recording_key_ref().into()
431    }
432
433    fn recorder_key(&self) -> u64 {
434        self.recorder_state.key.as_key_ref().into()
435    }
436}
437
438impl Drop for Recording {
439    fn drop(&mut self) {
440        assert!(
441            self.recorder_state
442                .recordings
443                .remove(&self.state.key.as_key_ref())
444                .is_some(),
445            "missing recording"
446        );
447    }
448}
449
450#[derive(Debug)]
451struct RecordingState {
452    active: Mutex<HashMap<Id, (&'static Metadata<'static>, Option<Id>)>>,
453    key: Key,
454    seq: AtomicUsize,
455    spool: Spool<Event>,
456}
457
458/// A recorder captures events from a [`tracing::span`] and records them
459/// to a [`mpsc::UnboundedSender`]. In order to record events, the recorder's
460/// layer ([`Recorder::layer`]) must be installed into the relevant tracing
461/// subscriber.
462pub struct Recorder {
463    state: Arc<RecorderState>,
464}
465
466#[derive(Debug)]
467struct RecorderState {
468    key: Key,
469    recordings: Arc<DashMap<KeyRef, Arc<RecordingState>>>,
470    pool: Pool<Event>,
471}
472
473impl Recorder {
474    /// Create a new recorder.
475    pub fn new() -> Self {
476        let state = Arc::new(RecorderState {
477            key: Key::new(),
478            recordings: Arc::new(DashMap::new()),
479            pool: Pool::new(1024),
480        });
481        Self { state }
482    }
483
484    /// Create a new recording that can be used to selective capture
485    /// events and span traces.
486    pub fn record(&self, cap: usize) -> Recording {
487        Recording::new(cap, Arc::clone(&self.state))
488    }
489
490    /// The layer associated with this recorder. This layer must be
491    /// installed into the relevant tracing subscriber in order to
492    /// record events.
493    pub fn layer(&self) -> RecorderLayer {
494        RecorderLayer {
495            state: Arc::clone(&self.state),
496        }
497    }
498}
499
500/// The type of layer used by [`Recorder`].
501pub struct RecorderLayer {
502    state: Arc<RecorderState>,
503}
504
505impl RecorderLayer {
506    /// Return an iterator over all the [`KeyRef`]s for the given scope.
507    fn iter_recordings<'a, S>(
508        &'a self,
509        scope: Scope<'a, S>,
510    ) -> impl Iterator<Item = dashmap::mapref::one::Ref<'a, KeyRef, Arc<RecordingState>>> + 'a
511    where
512        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
513    {
514        // We can provide deadlock freedom here because we always iterate in the same order:
515        // from root to leaf; thus there can be no cycles.
516        scope
517            .from_root()
518            .filter_map(|span| match span.extensions().get::<(KeyRef, KeyRef)>() {
519                Some((recording_key_ref, recorder_key_ref))
520                    if recorder_key_ref == &self.state.key.as_key_ref() =>
521                {
522                    Some(*recording_key_ref)
523                }
524                _ => None,
525            })
526            // Spans can outlive recording, which may no longer exist.
527            .filter_map(|key_ref| self.state.recordings.get(&key_ref))
528    }
529}
530
531impl<S: Subscriber> Layer<S> for RecorderLayer
532where
533    S: for<'span> LookupSpan<'span>,
534{
535    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
536        let mut visitor = RecordingKeysVisitor::new();
537
538        attrs.record(&mut visitor);
539
540        if let Some(keys) = visitor.keys()
541            && let Some(span) = ctx.span(id)
542        {
543            let mut extensions: tracing_subscriber::registry::ExtensionsMut<'_> =
544                span.extensions_mut();
545            extensions.insert(keys);
546        }
547    }
548
549    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
550        let Some(scope) = ctx.event_scope(event) else {
551            return;
552        };
553        for state in self.iter_recordings(scope) {
554            let mut recorded = self.state.pool.get();
555            let seq = state.seq.fetch_add(1, Ordering::Relaxed);
556            recorded.reset(SystemTime::now(), event.metadata(), seq);
557            event.record(&mut recorded);
558            state.spool.push(recorded);
559        }
560    }
561
562    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
563        let Some(span) = ctx.span(id) else { return };
564
565        for state in self.iter_recordings(span.scope()) {
566            let mut active = state.active.lock().unwrap();
567            active.insert(id.clone(), (span.metadata(), span.parent().map(|o| o.id())));
568        }
569    }
570
571    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
572        let Some(span) = ctx.span(id) else { return };
573
574        for state in self.iter_recordings(span.scope()) {
575            let mut active = state.active.lock().unwrap();
576            active.remove(id);
577        }
578    }
579}
580
581/// A visitor to pick out the recording key from spans.
582struct RecordingKeysVisitor {
583    recording_key: Option<KeyRef>,
584    recorder_key: Option<KeyRef>,
585}
586
587impl RecordingKeysVisitor {
588    fn new() -> Self {
589        Self {
590            recording_key: None,
591            recorder_key: None,
592        }
593    }
594
595    fn keys(&self) -> Option<(KeyRef, KeyRef)> {
596        match (self.recording_key, self.recorder_key) {
597            (Some(recording_key), Some(recorder_key)) => Some((recording_key, recorder_key)),
598            _ => None,
599        }
600    }
601}
602
603impl Visit for RecordingKeysVisitor {
604    fn record_u64(&mut self, field: &Field, value: u64) {
605        if field.name() == "recording" {
606            self.recording_key = Some(value.into());
607        } else if field.name() == "recorder" {
608            self.recorder_key = Some(value.into());
609        }
610    }
611
612    fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {}
613}
614
615#[cfg(test)]
616mod tests {
617    use serde_json::json;
618    use tracing::Level;
619    use tracing::info;
620    use tracing::span;
621    use tracing_subscriber::Registry;
622    use tracing_subscriber::prelude::*;
623
624    use super::*;
625
626    #[test]
627    fn test_key() {
628        let key = Key::new();
629        assert_eq!(key.as_key_ref(), key.as_key_ref());
630
631        assert_ne!(Key::new().as_key_ref(), Key::new().as_key_ref());
632    }
633
634    #[test]
635    fn test_events_are_recorded() {
636        let recorder = Recorder::new();
637        let recording = recorder.record(10);
638        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
639            let span = recording.span("test");
640            let _guard = span.enter();
641            info!("This event should be recorded");
642            info!("another event");
643        });
644
645        let events = recording.tail();
646        assert_eq!(events.len(), 2);
647        assert_eq!(
648            events[0].json_value(),
649            json!({
650                "message": "This event should be recorded"
651            })
652        );
653        assert_eq!(
654            events[1].json_value(),
655            json!({
656                "message": "another event"
657            })
658        );
659    }
660
661    #[test]
662    fn test_last_n_entries() {
663        let recorder = Recorder::new();
664        let recording = recorder.record(5);
665        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
666            let span = recording.span("test");
667            let _guard = span.enter();
668            for i in 0..10 {
669                info!("event {}", i);
670            }
671        });
672
673        let events = recording.tail();
674        assert_eq!(events.len(), 5);
675        for (i, event) in events.into_iter().enumerate() {
676            assert_eq!(
677                event.json_value(),
678                json!({
679                    "message": format!("event {}", i + 5)
680                })
681            );
682            assert_eq!(event.seq, i + 5);
683        }
684    }
685
686    #[test]
687    fn test_events_outside_span_are_not_recorded() {
688        let recorder = Recorder::new();
689        let recording = recorder.record(10);
690        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
691            let _span = recording.span("test"); // not entered
692            info!("This event should NOT be recorded");
693        });
694        assert_eq!(recording.tail().len(), 0);
695    }
696
697    #[test]
698    fn test_child_span_inherits_recorder() {
699        let recorder = Recorder::new();
700        let recording = recorder.record(10);
701
702        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
703            let outer = recording.span("test");
704            let _outer_guard = outer.enter();
705
706            {
707                let inner = span!(Level::INFO, "child_span");
708                let _inner_guard = inner.enter();
709
710                info!("Event from inner span");
711            }
712        });
713
714        let events = recording.tail();
715        assert_eq!(events.len(), 1);
716        assert_eq!(
717            events[0].json_value(),
718            json!({
719                "message": "Event from inner span"
720            })
721        );
722    }
723
724    // Ensures that nested tracing recordings remain isolated: events
725    // emitted while span A is active are captured only by recording
726    // A, and events emitted inside nested span B are captured only by
727    // recording B. This verifies that the recorder correctly
728    // attributes events to the currently entered span without
729    // cross-contamination.
730    #[test]
731    fn test_recording_spans_are_isolated_across_nested_recordings() {
732        let recorder = Recorder::new();
733        let recording_a = recorder.record(10);
734        let recording_b = recorder.record(10);
735
736        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
737            let span_a = recording_a.span("test_a");
738            let _guard_a = span_a.enter();
739
740            info!("event_from_a");
741
742            {
743                let span_b = recording_b.span("test_b");
744                let _guard_b = span_b.enter();
745
746                info!("event_from_b");
747            }
748        });
749
750        let events_a: Vec<String> = recording_a
751            .tail()
752            .iter()
753            .map(|e| e.json_value().to_string())
754            .collect();
755        let events_b: Vec<String> = recording_b
756            .tail()
757            .iter()
758            .map(|e| e.json_value().to_string())
759            .collect();
760
761        let a_joined = events_a.join(" ");
762        let b_joined = events_b.join(" ");
763
764        assert!(
765            a_joined.contains("event_from_a"),
766            "recording A should contain event_from_a"
767        );
768        assert!(
769            !a_joined.contains("event_from_b"),
770            "recording A should NOT contain event_from_b"
771        );
772        assert!(
773            b_joined.contains("event_from_b"),
774            "recording B should contain event_from_b"
775        );
776        assert!(
777            !b_joined.contains("event_from_a"),
778            "recording B should NOT contain event_from_a"
779        );
780    }
781
782    #[test]
783    fn test_active_spans() {
784        let recorder = Recorder::new();
785        let recording = recorder.record(10);
786
787        tracing::subscriber::with_default(Registry::default().with(recorder.layer()), || {
788            let outer = recording.span("test");
789            let _outer_guard = outer.enter();
790
791            {
792                let inner = span!(Level::INFO, "child_span");
793                let _inner_guard = inner.enter();
794
795                assert_eq!(recording.stacks().len(), 1);
796                // TODO: possibly we should remove the 'recording' span from here.
797                assert_eq!(recording.stacks()[0][0].name(), "child_span");
798                assert_eq!(recording.stacks()[0][1].name(), "recording");
799            }
800        });
801    }
802}