monarch_distributed_telemetry/
record_batch_sink.rs1use std::sync::atomic::AtomicUsize;
21use std::sync::atomic::Ordering;
22
23use datafusion::arrow::record_batch::RecordBatch;
24use hyperactor_telemetry::FieldValue;
25use hyperactor_telemetry::TraceEvent;
26use hyperactor_telemetry::TraceEventSink;
27use monarch_record_batch::RecordBatchBuffer;
28use monarch_record_batch::RecordBatchRow;
29
/// Process-wide number of batches emitted by sinks created with
/// `RecordBatchSink::new_printing`.
///
/// In this file only the printing callback increments it; sinks built via
/// `RecordBatchSink::new` with a custom callback leave it untouched.
static FLUSH_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Returns the current value of the printing-sink flush counter.
pub fn get_flush_count() -> usize {
    FLUSH_COUNT.load(Ordering::SeqCst)
}

/// Sets the printing-sink flush counter back to zero.
pub fn reset_flush_count() {
    // The previous value is not needed; only the reset matters.
    FLUSH_COUNT.swap(0, Ordering::SeqCst);
}
43
44use crate::timestamp_to_micros;
45
46fn fields_to_json(fields: &[(&str, FieldValue)]) -> String {
48 let mut map = serde_json::Map::new();
49 for (key, value) in fields {
50 let json_value = match value {
51 FieldValue::Bool(b) => serde_json::Value::Bool(*b),
52 FieldValue::I64(i) => serde_json::Value::Number((*i).into()),
53 FieldValue::U64(u) => serde_json::Value::Number((*u).into()),
54 FieldValue::F64(f) => serde_json::Number::from_f64(*f)
55 .map(serde_json::Value::Number)
56 .unwrap_or(serde_json::Value::Null),
57 FieldValue::Str(s) => serde_json::Value::String(s.clone()),
58 FieldValue::Debug(d) => serde_json::Value::String(d.clone()),
59 };
60 map.insert((*key).to_string(), json_value);
61 }
62 serde_json::Value::Object(map).to_string()
63}
64
/// One row of the `spans` table.
///
/// Built by `RecordBatchSink::consume` from a `TraceEvent::NewSpan`.
/// NOTE(review): the `SpanBuffer` type used by the sink is not defined in the
/// visible code and is presumably generated by `#[derive(RecordBatchRow)]` —
/// confirm against `monarch_record_batch`. Field order likely defines the
/// column order, so reorder with care.
#[derive(RecordBatchRow)]
pub struct Span {
    // Span id taken from the `NewSpan` event.
    pub id: u64,
    pub name: String,
    pub target: String,
    // Level rendered with `to_string()`.
    pub level: String,
    // The span's fields serialized as a JSON object by `fields_to_json`.
    pub fields_json: String,
    // Event timestamp converted by `timestamp_to_micros`.
    pub timestamp_us: i64,
    // Parent span id, `None` when the event reports no parent.
    pub parent_id: Option<u64>,
    pub thread_name: String,
    // Source file recorded on the event, if any.
    pub file: Option<String>,
    // Source line recorded on the event, if any.
    pub line: Option<u32>,
}
79
/// One row of the `span_events` table: a single lifecycle transition of a span.
///
/// Written by `RecordBatchSink::consume` for `TraceEvent::SpanEnter`,
/// `TraceEvent::SpanExit`, and `TraceEvent::SpanClose`.
#[derive(RecordBatchRow)]
pub struct SpanEvent {
    // Id of the span that transitioned (presumably the same id space as
    // `Span::id` — both come from the events' `id` field).
    pub id: u64,
    // Transition timestamp converted by `timestamp_to_micros`.
    pub timestamp_us: i64,
    // One of "enter", "exit", or "close", as written by `consume`.
    pub event_type: String,
}
87
/// One row of the `events` table.
///
/// Built by `RecordBatchSink::consume` from a `TraceEvent::Event`.
#[derive(RecordBatchRow)]
pub struct Event {
    pub name: String,
    pub target: String,
    // Level rendered with `to_string()`.
    pub level: String,
    // The event's fields serialized as a JSON object by `fields_to_json`.
    pub fields_json: String,
    // Event timestamp converted by `timestamp_to_micros`.
    pub timestamp_us: i64,
    // Id of the enclosing span, `None` when the event reports none.
    pub parent_span: Option<u64>,
    // Thread id rendered with `to_string()`.
    pub thread_id: String,
    pub thread_name: String,
    // Module path recorded on the event, if any.
    pub module_path: Option<String>,
    // Source file recorded on the event, if any.
    pub file: Option<String>,
    // Source line recorded on the event, if any.
    pub line: Option<u32>,
}
103
104use std::sync::Arc;
105use std::sync::Mutex;
106
/// Receives every flushed `RecordBatch` together with the name of its table.
///
/// In this file the table name is one of `"spans"`, `"span_events"`, or
/// `"events"`. The callback is invoked while the sink's internal mutex is
/// held, so it must not record events into the same sink: re-locking a
/// `std::sync::Mutex` from the thread that holds it deadlocks or panics.
pub type FlushCallback = Box<dyn Fn(&str, RecordBatch) + Send>;
111
/// Mutable state shared by every clone of a `RecordBatchSink`, guarded by the
/// sink's mutex.
struct RecordBatchSinkInner {
    // Pending rows for the `spans` table.
    spans_buffer: SpanBuffer,
    // Pending rows for the `span_events` table.
    span_events_buffer: SpanEventBuffer,
    // Pending rows for the `events` table.
    events_buffer: EventBuffer,
    // Row count at which `consume` flushes an individual table's buffer.
    batch_size: usize,
    // Destination for every flushed batch.
    flush_callback: FlushCallback,
}
120
121impl RecordBatchSinkInner {
122 fn flush_buffer<B: RecordBatchBuffer>(
123 buffer: &mut B,
124 table_name: &str,
125 callback: &FlushCallback,
126 ) -> anyhow::Result<()> {
127 let batch = buffer.drain_to_record_batch()?;
130 callback(table_name, batch);
131 Ok(())
132 }
133
134 fn flush(&mut self) -> anyhow::Result<()> {
135 Self::flush_buffer(&mut self.spans_buffer, "spans", &self.flush_callback)?;
136 Self::flush_buffer(
137 &mut self.span_events_buffer,
138 "span_events",
139 &self.flush_callback,
140 )?;
141 Self::flush_buffer(&mut self.events_buffer, "events", &self.flush_callback)?;
142 Ok(())
143 }
144
145 fn flush_spans_if_full(&mut self) -> anyhow::Result<()> {
146 if self.spans_buffer.len() >= self.batch_size {
147 Self::flush_buffer(&mut self.spans_buffer, "spans", &self.flush_callback)?;
148 }
149 Ok(())
150 }
151
152 fn flush_span_events_if_full(&mut self) -> anyhow::Result<()> {
153 if self.span_events_buffer.len() >= self.batch_size {
154 Self::flush_buffer(
155 &mut self.span_events_buffer,
156 "span_events",
157 &self.flush_callback,
158 )?;
159 }
160 Ok(())
161 }
162
163 fn flush_events_if_full(&mut self) -> anyhow::Result<()> {
164 if self.events_buffer.len() >= self.batch_size {
165 Self::flush_buffer(&mut self.events_buffer, "events", &self.flush_callback)?;
166 }
167 Ok(())
168 }
169}
170
/// A `TraceEventSink` that buffers trace events into three Arrow tables
/// (`spans`, `span_events`, `events`) and hands each flushed `RecordBatch` to a
/// `FlushCallback`.
///
/// Cloning is cheap. Clones share the same buffers and callback through an
/// `Arc<Mutex<_>>`, so rows recorded through any clone land in the same
/// batches.
#[derive(Clone)]
pub struct RecordBatchSink {
    inner: Arc<Mutex<RecordBatchSinkInner>>,
}
189
190impl RecordBatchSink {
191 pub fn new(batch_size: usize, flush_callback: FlushCallback) -> Self {
201 let inner = Arc::new(Mutex::new(RecordBatchSinkInner {
202 spans_buffer: SpanBuffer::default(),
203 span_events_buffer: SpanEventBuffer::default(),
204 events_buffer: EventBuffer::default(),
205 batch_size,
206 flush_callback,
207 }));
208 Self { inner }
209 }
210
211 pub fn flush(&self) -> anyhow::Result<()> {
217 let mut inner = self
218 .inner
219 .lock()
220 .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
221 inner.flush()
222 }
223
224 pub fn new_printing(batch_size: usize) -> Self {
226 Self::new(
227 batch_size,
228 Box::new(|table_name, batch| {
229 FLUSH_COUNT.fetch_add(1, Ordering::SeqCst);
230 println!(
231 "[RecordBatchSink] Table: {}, rows: {}, schema: {:?}",
232 table_name,
233 batch.num_rows(),
234 batch
235 .schema()
236 .fields()
237 .iter()
238 .map(|f| f.name())
239 .collect::<Vec<_>>()
240 );
241 }),
242 )
243 }
244}
245
246impl TraceEventSink for RecordBatchSink {
247 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
248 let mut inner = self
249 .inner
250 .lock()
251 .map_err(|_| anyhow::anyhow!("lock poisoned"))?;
252
253 match event {
254 TraceEvent::NewSpan {
255 id,
256 name,
257 target,
258 level,
259 fields,
260 timestamp,
261 parent_id,
262 thread_name,
263 file,
264 line,
265 } => {
266 inner.spans_buffer.insert(Span {
267 id: *id,
268 name: name.to_string(),
269 target: target.to_string(),
270 level: level.to_string(),
271 fields_json: fields_to_json(fields),
272 timestamp_us: timestamp_to_micros(timestamp),
273 parent_id: *parent_id,
274 thread_name: thread_name.to_string(),
275 file: file.map(|s| s.to_string()),
276 line: *line,
277 });
278 inner.flush_spans_if_full()?;
279 }
280 TraceEvent::SpanEnter { id, timestamp, .. } => {
281 inner.span_events_buffer.insert(SpanEvent {
282 id: *id,
283 timestamp_us: timestamp_to_micros(timestamp),
284 event_type: "enter".to_string(),
285 });
286 inner.flush_span_events_if_full()?;
287 }
288 TraceEvent::SpanExit { id, timestamp, .. } => {
289 inner.span_events_buffer.insert(SpanEvent {
290 id: *id,
291 timestamp_us: timestamp_to_micros(timestamp),
292 event_type: "exit".to_string(),
293 });
294 inner.flush_span_events_if_full()?;
295 }
296 TraceEvent::SpanClose { id, timestamp } => {
297 inner.span_events_buffer.insert(SpanEvent {
298 id: *id,
299 timestamp_us: timestamp_to_micros(timestamp),
300 event_type: "close".to_string(),
301 });
302 inner.flush_span_events_if_full()?;
303 }
304 TraceEvent::Event {
305 name,
306 target,
307 level,
308 fields,
309 timestamp,
310 parent_span,
311 thread_id,
312 thread_name,
313 module_path,
314 file,
315 line,
316 } => {
317 inner.events_buffer.insert(Event {
318 name: name.to_string(),
319 target: target.to_string(),
320 level: level.to_string(),
321 fields_json: fields_to_json(fields),
322 timestamp_us: timestamp_to_micros(timestamp),
323 parent_span: *parent_span,
324 thread_id: thread_id.to_string(),
325 thread_name: thread_name.to_string(),
326 module_path: module_path.map(|s| s.to_string()),
327 file: file.map(|s| s.to_string()),
328 line: *line,
329 });
330 inner.flush_events_if_full()?;
331 }
332 }
333 Ok(())
334 }
335
336 fn flush(&mut self) -> Result<(), anyhow::Error> {
337 Ok(())
342 }
343
344 fn name(&self) -> &str {
345 "RecordBatchSink"
346 }
347}