hyperactor_telemetry/sinks/sqlite.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
//! SQLite sink that buffers events and writes each batch in a single transaction.
//! Intended to run on a background thread so that application threads are not blocked.
//!
//! Reuses the table definitions and insertion logic of the older `SqliteLayer`
//! so that the stored data is identical.
14
15use std::path::Path;
16
17use anyhow::Result;
18use anyhow::anyhow;
19use rusqlite::Connection;
20use rusqlite::functions::FunctionFlags;
21use serde_json::Value as JValue;
22use tracing_core::LevelFilter;
23use tracing_subscriber::filter::Targets;
24
25use crate::sqlite;
26use crate::trace_dispatcher::FieldValue;
27use crate::trace_dispatcher::TraceEvent;
28use crate::trace_dispatcher::TraceEventSink;
29
/// SQLite sink that batches events and writes them in transactions.
/// Reuses the exact same table schema and insertion logic from SqliteLayer.
///
/// Only `TraceEvent::Event` values are buffered; spans are ignored.
pub struct SqliteSink {
    /// Open database connection that every batch is written to.
    conn: Connection,
    /// Events buffered and not yet written to `conn`.
    batch: Vec<TraceEvent>,
    /// Number of buffered events that triggers a write.
    batch_size: usize,
    /// Per-target level filter returned by `TraceEventSink::target_filter`.
    target_filter: Targets,
}
38
39impl SqliteSink {
40    /// Create a new SQLite sink with an in-memory database.
41    /// Matches the API of SqliteLayer::new()
42    ///
43    /// # Arguments
44    /// * `batch_size` - Number of events to batch before flushing to disk
45    pub fn new(batch_size: usize) -> Result<Self> {
46        let conn = Connection::open_in_memory()?;
47        Self::setup_connection(conn, batch_size)
48    }
49
50    /// Create a new SQLite sink with a file-based database.
51    /// Matches the API of SqliteLayer::new_with_file()
52    ///
53    /// # Arguments
54    /// * `db_path` - Path to SQLite database file
55    /// * `batch_size` - Number of events to batch before flushing to disk
56    pub fn new_with_file(db_path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
57        let conn = Connection::open(db_path)?;
58        Self::setup_connection(conn, batch_size)
59    }
60
61    fn setup_connection(conn: Connection, batch_size: usize) -> Result<Self> {
62        for table in sqlite::ALL_TABLES.iter() {
63            conn.execute(&table.create_table_stmt, [])?;
64        }
65
66        conn.create_scalar_function(
67            "assert",
68            2,
69            FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC,
70            move |ctx| {
71                let condition: bool = ctx.get(0)?;
72                let message: String = ctx.get(1)?;
73
74                if !condition {
75                    return Err(rusqlite::Error::UserFunctionError(
76                        anyhow!("assertion failed:{condition} {message}",).into(),
77                    ));
78                }
79
80                Ok(condition)
81            },
82        )?;
83
84        Ok(Self {
85            conn,
86            batch: Vec::with_capacity(batch_size),
87            batch_size,
88            target_filter: Targets::new()
89                .with_target("execution", LevelFilter::OFF)
90                .with_target("opentelemetry", LevelFilter::OFF)
91                .with_target("hyperactor_telemetry", LevelFilter::OFF)
92                .with_default(LevelFilter::TRACE),
93        })
94    }
95
96    fn flush_batch(&mut self) -> Result<()> {
97        if self.batch.is_empty() {
98            return Ok(());
99        }
100
101        let tx = self.conn.transaction()?;
102
103        for event in &self.batch {
104            // We only batch Event variants in consume(), so this match is guaranteed to succeed
105            let TraceEvent::Event {
106                target,
107                fields,
108                timestamp,
109                module_path,
110                file,
111                line,
112                ..
113            } = event
114            else {
115                unreachable!("Only Event variants should be in batch")
116            };
117
118            let timestamp_us = timestamp
119                .duration_since(std::time::UNIX_EPOCH)
120                .unwrap_or_default()
121                .as_micros()
122                .to_string();
123
124            let mut visitor = sqlite::SqlVisitor::default();
125
126            visitor
127                .0
128                .insert("time_us".to_string(), JValue::String(timestamp_us));
129
130            if let Some(mp) = module_path {
131                visitor
132                    .0
133                    .insert("module_path".to_string(), JValue::String(mp.to_string()));
134            }
135            if let Some(l) = line {
136                visitor
137                    .0
138                    .insert("line".to_string(), JValue::String(l.to_string()));
139            }
140            if let Some(f) = file {
141                visitor
142                    .0
143                    .insert("file".to_string(), JValue::String(f.to_string()));
144            }
145
146            for (key, value) in fields {
147                let json_value = match value {
148                    FieldValue::Bool(b) => JValue::Bool(*b),
149                    FieldValue::I64(i) => JValue::Number((*i).into()),
150                    FieldValue::U64(u) => JValue::Number((*u).into()),
151                    FieldValue::F64(f) => serde_json::Number::from_f64(*f)
152                        .map(JValue::Number)
153                        .unwrap_or(JValue::Null),
154                    FieldValue::Str(s) => JValue::String(s.clone()),
155                    FieldValue::Debug(d) => JValue::String(d.clone()),
156                };
157                visitor.0.insert(key.clone(), json_value);
158            }
159
160            let table = if &**target == sqlite::TableName::ACTOR_LIFECYCLE_STR {
161                sqlite::TableName::ActorLifecycle.get_table()
162            } else if &**target == sqlite::TableName::MESSAGES_STR {
163                sqlite::TableName::Messages.get_table()
164            } else {
165                sqlite::TableName::LogEvents.get_table()
166            };
167
168            sqlite::insert_event_fields(&tx, table, visitor)?;
169        }
170
171        tx.commit()?;
172        self.batch.clear();
173
174        Ok(())
175    }
176}
177
178impl TraceEventSink for SqliteSink {
179    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
180        // Only batch Event variants - we ignore spans
181        if matches!(event, TraceEvent::Event { .. }) {
182            self.batch.push(event.clone());
183
184            if self.batch.len() >= self.batch_size {
185                self.flush_batch()?;
186            }
187        }
188
189        Ok(())
190    }
191
192    fn flush(&mut self) -> Result<(), anyhow::Error> {
193        self.flush_batch()
194    }
195
196    fn name(&self) -> &str {
197        "SqliteSink"
198    }
199
200    fn target_filter(&self) -> Option<&Targets> {
201        Some(&self.target_filter)
202    }
203}