monarch_distributed_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Distributed Telemetry - Three-component architecture
10//!
11//! 1. DatabaseScanner (Rust): Local MemTable operations, scans with child stream merging
12//! 2. DistributedTelemetryActor (Python): Orchestrates children, wraps DatabaseScanner
13//! 3. QueryEngine (Rust): DataFusion query execution, creates ports, collects results
14//!
15//! Data flows directly Rust-to-Rust via PortRef for efficiency.
16
17pub mod database_scanner;
18mod entity_dispatcher;
19pub mod pyspy_table;
20pub mod query_engine;
21mod record_batch_sink;
22
23pub use database_scanner::DatabaseScanner;
24use datafusion::arrow::datatypes::SchemaRef;
25use datafusion::arrow::ipc::writer::StreamWriter;
26use datafusion::arrow::record_batch::RecordBatch;
27pub use entity_dispatcher::EntityDispatcher;
28use hyperactor::Bind;
29use hyperactor::Unbind;
30use pyo3::prelude::*;
31pub use pyspy_table::PySpyDump;
32pub use pyspy_table::PySpyFrame;
33pub use pyspy_table::PySpyLocalVariable;
34pub use pyspy_table::PySpyStackTrace;
35pub use query_engine::QueryEngine;
36pub use record_batch_sink::FlushCallback;
37pub use record_batch_sink::RecordBatchSink;
38pub use record_batch_sink::get_flush_count;
39pub use record_batch_sink::reset_flush_count;
40use serde::Deserialize;
41use serde::Serialize;
42use serde_multipart::Part;
43use typeuri::Named;
44
45/// Response message for streaming query results.
46/// Sent directly over Rust PortRef for efficiency.
47/// Completion is signaled by the scan endpoint returning, not via a message.
48#[derive(Debug, Clone, Serialize, Deserialize, Named, Bind, Unbind)]
49pub struct QueryResponse {
50    /// A batch of data in Arrow IPC format.
51    /// Uses Part for zero-copy transfer across the actor system.
52    pub data: Part,
53}
54
55// ============================================================================
56// Serialization helpers
57// ============================================================================
58
/// Convert a `SystemTime` to signed microseconds since the Unix epoch.
///
/// Times before the epoch yield negative values rather than being clamped
/// to 0. Sub-microsecond precision is truncated toward zero. Results that do
/// not fit in an `i64` saturate to `i64::MAX` / `i64::MIN` instead of
/// wrapping, which is what a bare `u128 as i64` cast would do.
pub(crate) fn timestamp_to_micros(timestamp: &std::time::SystemTime) -> i64 {
    match timestamp.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        // `duration_since` errors when `timestamp` precedes the epoch; the
        // error carries how far before the epoch it lies.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_micros())
            .map(|micros| -micros)
            .unwrap_or(i64::MIN),
    }
}
66
67pub(crate) fn serialize_schema(schema: &SchemaRef) -> anyhow::Result<Vec<u8>> {
68    let batch = RecordBatch::new_empty(schema.clone());
69    let mut buf = Vec::new();
70    let mut writer = StreamWriter::try_new(&mut buf, schema)?;
71    writer.write(&batch)?;
72    writer.finish()?;
73    Ok(buf)
74}
75
76pub(crate) fn serialize_batch(batch: &RecordBatch) -> anyhow::Result<Vec<u8>> {
77    let mut buf = Vec::new();
78    let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
79    writer.write(batch)?;
80    writer.finish()?;
81    Ok(buf)
82}
83
84// ============================================================================
85// Python module registration
86// ============================================================================
87
88/// Register the RecordBatchSink with the telemetry system.
89/// This will cause trace events to be collected as RecordBatches.
90/// Use `get_record_batch_flush_count()` to check how many batches have been flushed.
91///
92/// This can be called at any time - before or after telemetry initialization.
93/// Sinks will start receiving events once the telemetry dispatcher is created.
94///
95/// Args:
96///     batch_size: Number of rows to buffer before flushing each table
97#[pyfunction]
98fn enable_record_batch_tracing(batch_size: usize) {
99    let sink = RecordBatchSink::new_printing(batch_size);
100    hyperactor_telemetry::register_sink(Box::new(sink));
101}
102
103/// Get the total number of RecordBatches flushed by the sink.
104/// This can be used in tests to verify that the sink is receiving events.
105#[pyfunction]
106fn get_record_batch_flush_count() -> usize {
107    get_flush_count()
108}
109
110/// Reset the flush counter to zero. Useful for tests.
111#[pyfunction]
112fn reset_record_batch_flush_count() {
113    reset_flush_count()
114}
115
116pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
117    module.add_function(wrap_pyfunction!(enable_record_batch_tracing, module)?)?;
118    module.add_function(wrap_pyfunction!(get_record_batch_flush_count, module)?)?;
119    module.add_function(wrap_pyfunction!(reset_record_batch_flush_count, module)?)?;
120    Ok(())
121}