// monarch_distributed_telemetry/record_batch_sink.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! RecordBatchSink - Collects tracing telemetry data as Arrow RecordBatches
10//!
11//! Implements TraceEventSink for tracing events (spans, log events).
12//!
13//! Produces three tables:
14//! - `spans`: Information about span creation (NewSpan events)
15//! - `span_events`: Enter/exit/close events for spans
16//! - `events`: Tracing events (e.g., tracing::info!())
17//!
18//! For entity lifecycle events (actors, meshes), see EntityDispatcher.
19
20use std::sync::atomic::AtomicUsize;
21use std::sync::atomic::Ordering;
22
23use datafusion::arrow::record_batch::RecordBatch;
24use hyperactor_telemetry::FieldValue;
25use hyperactor_telemetry::TraceEvent;
26use hyperactor_telemetry::TraceEventSink;
27use monarch_record_batch::RecordBatchBuffer;
28use monarch_record_batch::RecordBatchRow;
29
/// Global counter for the number of batches flushed by the counting sink.
/// This can be checked from tests to verify that the sink is active.
/// Incremented once per (table, batch) callback invocation in the
/// `new_printing` sink; other sinks' callbacks do not touch it.
static FLUSH_COUNT: AtomicUsize = AtomicUsize::new(0);
33
34/// Get the total number of batches flushed by counting sinks.
35pub fn get_flush_count() -> usize {
36    FLUSH_COUNT.load(Ordering::SeqCst)
37}
38
39/// Reset the flush counter to zero. Useful for tests.
40pub fn reset_flush_count() {
41    FLUSH_COUNT.store(0, Ordering::SeqCst);
42}
43
44use crate::timestamp_to_micros;
45
46/// Helper to convert FieldValue slice to JSON string.
47fn fields_to_json(fields: &[(&str, FieldValue)]) -> String {
48    let mut map = serde_json::Map::new();
49    for (key, value) in fields {
50        let json_value = match value {
51            FieldValue::Bool(b) => serde_json::Value::Bool(*b),
52            FieldValue::I64(i) => serde_json::Value::Number((*i).into()),
53            FieldValue::U64(u) => serde_json::Value::Number((*u).into()),
54            FieldValue::F64(f) => serde_json::Number::from_f64(*f)
55                .map(serde_json::Value::Number)
56                .unwrap_or(serde_json::Value::Null),
57            FieldValue::Str(s) => serde_json::Value::String(s.clone()),
58            FieldValue::Debug(d) => serde_json::Value::String(d.clone()),
59        };
60        map.insert((*key).to_string(), json_value);
61    }
62    serde_json::Value::Object(map).to_string()
63}
64
/// Row data for the spans table.
///
/// One row is produced per span creation (`TraceEvent::NewSpan`); see
/// `RecordBatchSink::consume` for how each field is populated.
#[derive(RecordBatchRow)]
pub struct Span {
    /// Span id taken from the NewSpan trace event.
    pub id: u64,
    /// Span name as reported by the trace event.
    pub name: String,
    /// Target string from the trace event (typically a module path — TODO confirm).
    pub target: String,
    /// Severity level rendered as a string via `to_string()`.
    pub level: String,
    /// Span fields serialized to a JSON object string (see `fields_to_json`).
    pub fields_json: String,
    /// Creation timestamp converted to microseconds via `timestamp_to_micros`.
    pub timestamp_us: i64,
    /// Id of the parent span, if the event reported one.
    pub parent_id: Option<u64>,
    /// Thread name reported on the NewSpan event.
    pub thread_name: String,
    /// Source file where the span originated, when available.
    pub file: Option<String>,
    /// Source line number, when available.
    pub line: Option<u32>,
}
79
/// Row data for the span_events table.
///
/// One row per span lifecycle transition. `event_type` is set to exactly
/// "enter", "exit", or "close" by `RecordBatchSink::consume`.
#[derive(RecordBatchRow)]
pub struct SpanEvent {
    /// Id of the span this lifecycle event belongs to.
    pub id: u64,
    /// Event timestamp converted to microseconds via `timestamp_to_micros`.
    pub timestamp_us: i64,
    /// One of "enter", "exit", or "close".
    pub event_type: String,
}
87
/// Row data for the events table.
///
/// One row per tracing event (`TraceEvent::Event`, e.g. `tracing::info!()`);
/// see `RecordBatchSink::consume` for how each field is populated.
#[derive(RecordBatchRow)]
pub struct Event {
    /// Event name as reported by the trace event.
    pub name: String,
    /// Target string from the trace event (typically a module path — TODO confirm).
    pub target: String,
    /// Severity level rendered as a string via `to_string()`.
    pub level: String,
    /// Event fields serialized to a JSON object string (see `fields_to_json`).
    pub fields_json: String,
    /// Event timestamp converted to microseconds via `timestamp_to_micros`.
    pub timestamp_us: i64,
    /// Id of the enclosing span, if the event reported one.
    pub parent_span: Option<u64>,
    /// Thread id rendered as a string.
    pub thread_id: String,
    /// Thread name reported on the event.
    pub thread_name: String,
    /// Module path of the emitting code, when available.
    pub module_path: Option<String>,
    /// Source file of the emitting code, when available.
    pub file: Option<String>,
    /// Source line number, when available.
    pub line: Option<u32>,
}
103
104use std::sync::Arc;
105use std::sync::Mutex;
106
/// Callback function type for flushing RecordBatches.
///
/// Invoked as `callback(table_name, batch)` where `table_name` is one of
/// "spans", "span_events", or "events" (see `RecordBatchSinkInner::flush`).
/// Takes ownership of the RecordBatch. The callback should handle empty
/// batches by creating the table with the schema but not appending the
/// empty data.
pub type FlushCallback = Box<dyn Fn(&str, RecordBatch) + Send>;
111
/// Inner state of RecordBatchSink.
///
/// Always owned behind the `Arc<Mutex<..>>` inside `RecordBatchSink`; all
/// access goes through that lock.
struct RecordBatchSinkInner {
    // Row buffers, one per output table. The buffer types are presumably
    // generated by `#[derive(RecordBatchRow)]` on Span/SpanEvent/Event —
    // they are not defined in this file (TODO confirm).
    spans_buffer: SpanBuffer,
    span_events_buffer: SpanEventBuffer,
    events_buffer: EventBuffer,
    // Row-count threshold at which `consume()` flushes a buffer.
    batch_size: usize,
    // Receives (table_name, batch) whenever a buffer is drained.
    flush_callback: FlushCallback,
}
120
121impl RecordBatchSinkInner {
122    fn flush_buffer<B: RecordBatchBuffer>(
123        buffer: &mut B,
124        table_name: &str,
125        callback: &FlushCallback,
126    ) -> anyhow::Result<()> {
127        // Always produce a batch (even if empty) - the callback handles empty batches
128        // by creating the table with the schema but not appending empty data
129        let batch = buffer.drain_to_record_batch()?;
130        callback(table_name, batch);
131        Ok(())
132    }
133
134    fn flush(&mut self) -> anyhow::Result<()> {
135        Self::flush_buffer(&mut self.spans_buffer, "spans", &self.flush_callback)?;
136        Self::flush_buffer(
137            &mut self.span_events_buffer,
138            "span_events",
139            &self.flush_callback,
140        )?;
141        Self::flush_buffer(&mut self.events_buffer, "events", &self.flush_callback)?;
142        Ok(())
143    }
144
145    fn flush_spans_if_full(&mut self) -> anyhow::Result<()> {
146        if self.spans_buffer.len() >= self.batch_size {
147            Self::flush_buffer(&mut self.spans_buffer, "spans", &self.flush_callback)?;
148        }
149        Ok(())
150    }
151
152    fn flush_span_events_if_full(&mut self) -> anyhow::Result<()> {
153        if self.span_events_buffer.len() >= self.batch_size {
154            Self::flush_buffer(
155                &mut self.span_events_buffer,
156                "span_events",
157                &self.flush_callback,
158            )?;
159        }
160        Ok(())
161    }
162
163    fn flush_events_if_full(&mut self) -> anyhow::Result<()> {
164        if self.events_buffer.len() >= self.batch_size {
165            Self::flush_buffer(&mut self.events_buffer, "events", &self.flush_callback)?;
166        }
167        Ok(())
168    }
169}
170
/// Buffers tracing events and produces Arrow RecordBatches.
///
/// Implements TraceEventSink for tracing events (spans, log events).
///
/// This type can be cloned to get a handle for flushing from outside the
/// telemetry system. Clone it before registering with telemetry if you need
/// to call flush() later.
///
/// Produces three tables:
/// - `spans`: Information about span creation (NewSpan events)
/// - `span_events`: Enter/exit/close events for spans
/// - `events`: Tracing events (e.g., tracing::info!())
///
/// For entity lifecycle events (actors, meshes), see EntityDispatcher.
#[derive(Clone)]
pub struct RecordBatchSink {
    /// Shared state: clones of this sink share the same buffers, batch size,
    /// and flush callback through the `Arc`.
    inner: Arc<Mutex<RecordBatchSinkInner>>,
}
189
190impl RecordBatchSink {
191    /// Create a new RecordBatchSink with the specified batch size and flush callback.
192    ///
193    /// The callback receives (table_name, record_batch) when a batch is ready.
194    /// The callback should handle empty batches by creating the table with the
195    /// schema but not appending the empty data.
196    ///
197    /// # Arguments
198    /// * `batch_size` - Number of rows to buffer before flushing each table
199    /// * `flush_callback` - Called with (table_name, record_batch) when a batch is ready
200    pub fn new(batch_size: usize, flush_callback: FlushCallback) -> Self {
201        let inner = Arc::new(Mutex::new(RecordBatchSinkInner {
202            spans_buffer: SpanBuffer::default(),
203            span_events_buffer: SpanEventBuffer::default(),
204            events_buffer: EventBuffer::default(),
205            batch_size,
206            flush_callback,
207        }));
208        Self { inner }
209    }
210
211    /// Flush all buffers, emitting batches for all tables.
212    ///
213    /// This always emits batches for all three tables (spans, span_events, events),
214    /// even if they are empty. The callback is expected to handle empty batches
215    /// by creating the table with the correct schema but not appending empty data.
216    pub fn flush(&self) -> anyhow::Result<()> {
217        let mut inner = self
218            .inner
219            .lock()
220            .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
221        inner.flush()
222    }
223
224    /// Create a new RecordBatchSink that prints batches to stdout.
225    pub fn new_printing(batch_size: usize) -> Self {
226        Self::new(
227            batch_size,
228            Box::new(|table_name, batch| {
229                FLUSH_COUNT.fetch_add(1, Ordering::SeqCst);
230                println!(
231                    "[RecordBatchSink] Table: {}, rows: {}, schema: {:?}",
232                    table_name,
233                    batch.num_rows(),
234                    batch
235                        .schema()
236                        .fields()
237                        .iter()
238                        .map(|f| f.name())
239                        .collect::<Vec<_>>()
240                );
241            }),
242        )
243    }
244}
245
246impl TraceEventSink for RecordBatchSink {
247    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
248        let mut inner = self
249            .inner
250            .lock()
251            .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
252
253        match event {
254            TraceEvent::NewSpan {
255                id,
256                name,
257                target,
258                level,
259                fields,
260                timestamp,
261                parent_id,
262                thread_name,
263                file,
264                line,
265            } => {
266                inner.spans_buffer.insert(Span {
267                    id: *id,
268                    name: name.to_string(),
269                    target: target.to_string(),
270                    level: level.to_string(),
271                    fields_json: fields_to_json(fields),
272                    timestamp_us: timestamp_to_micros(timestamp),
273                    parent_id: *parent_id,
274                    thread_name: thread_name.to_string(),
275                    file: file.map(|s| s.to_string()),
276                    line: *line,
277                });
278                inner.flush_spans_if_full()?;
279            }
280            TraceEvent::SpanEnter { id, timestamp, .. } => {
281                inner.span_events_buffer.insert(SpanEvent {
282                    id: *id,
283                    timestamp_us: timestamp_to_micros(timestamp),
284                    event_type: "enter".to_string(),
285                });
286                inner.flush_span_events_if_full()?;
287            }
288            TraceEvent::SpanExit { id, timestamp, .. } => {
289                inner.span_events_buffer.insert(SpanEvent {
290                    id: *id,
291                    timestamp_us: timestamp_to_micros(timestamp),
292                    event_type: "exit".to_string(),
293                });
294                inner.flush_span_events_if_full()?;
295            }
296            TraceEvent::SpanClose { id, timestamp } => {
297                inner.span_events_buffer.insert(SpanEvent {
298                    id: *id,
299                    timestamp_us: timestamp_to_micros(timestamp),
300                    event_type: "close".to_string(),
301                });
302                inner.flush_span_events_if_full()?;
303            }
304            TraceEvent::Event {
305                name,
306                target,
307                level,
308                fields,
309                timestamp,
310                parent_span,
311                thread_id,
312                thread_name,
313                module_path,
314                file,
315                line,
316            } => {
317                inner.events_buffer.insert(Event {
318                    name: name.to_string(),
319                    target: target.to_string(),
320                    level: level.to_string(),
321                    fields_json: fields_to_json(fields),
322                    timestamp_us: timestamp_to_micros(timestamp),
323                    parent_span: *parent_span,
324                    thread_id: thread_id.to_string(),
325                    thread_name: thread_name.to_string(),
326                    module_path: module_path.map(|s| s.to_string()),
327                    file: file.map(|s| s.to_string()),
328                    line: *line,
329                });
330                inner.flush_events_if_full()?;
331            }
332        }
333        Ok(())
334    }
335
336    fn flush(&mut self) -> Result<(), anyhow::Error> {
337        // No-op: we don't flush on the periodic timer from the telemetry worker.
338        // Instead, flush happens:
339        // 1. Automatically in consume() when buffers reach batch_size
340        // 2. Explicitly via RecordBatchSink::flush() before queries
341        Ok(())
342    }
343
344    fn name(&self) -> &str {
345        "RecordBatchSink"
346    }
347}