hyperactor_telemetry/sinks/
sqlite.rs1use std::path::Path;
16
17use anyhow::Result;
18use anyhow::anyhow;
19use rusqlite::Connection;
20use rusqlite::functions::FunctionFlags;
21use serde_json::Value as JValue;
22use tracing_core::LevelFilter;
23use tracing_subscriber::filter::Targets;
24
25use crate::sqlite;
26use crate::trace_dispatcher::FieldValue;
27use crate::trace_dispatcher::TraceEvent;
28use crate::trace_dispatcher::TraceEventSink;
29
30pub struct SqliteSink {
33 conn: Connection,
34 batch: Vec<TraceEvent>,
35 batch_size: usize,
36 target_filter: Targets,
37}
38
39impl SqliteSink {
40 pub fn new(batch_size: usize) -> Result<Self> {
46 let conn = Connection::open_in_memory()?;
47 Self::setup_connection(conn, batch_size)
48 }
49
50 pub fn new_with_file(db_path: impl AsRef<Path>, batch_size: usize) -> Result<Self> {
57 let conn = Connection::open(db_path)?;
58 Self::setup_connection(conn, batch_size)
59 }
60
61 fn setup_connection(conn: Connection, batch_size: usize) -> Result<Self> {
62 for table in sqlite::ALL_TABLES.iter() {
63 conn.execute(&table.create_table_stmt, [])?;
64 }
65
66 conn.create_scalar_function(
67 "assert",
68 2,
69 FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC,
70 move |ctx| {
71 let condition: bool = ctx.get(0)?;
72 let message: String = ctx.get(1)?;
73
74 if !condition {
75 return Err(rusqlite::Error::UserFunctionError(
76 anyhow!("assertion failed:{condition} {message}",).into(),
77 ));
78 }
79
80 Ok(condition)
81 },
82 )?;
83
84 Ok(Self {
85 conn,
86 batch: Vec::with_capacity(batch_size),
87 batch_size,
88 target_filter: Targets::new()
89 .with_target("execution", LevelFilter::OFF)
90 .with_target("opentelemetry", LevelFilter::OFF)
91 .with_target("hyperactor_telemetry", LevelFilter::OFF)
92 .with_default(LevelFilter::TRACE),
93 })
94 }
95
96 fn flush_batch(&mut self) -> Result<()> {
97 if self.batch.is_empty() {
98 return Ok(());
99 }
100
101 let tx = self.conn.transaction()?;
102
103 for event in &self.batch {
104 let TraceEvent::Event {
106 target,
107 fields,
108 timestamp,
109 module_path,
110 file,
111 line,
112 ..
113 } = event
114 else {
115 unreachable!("Only Event variants should be in batch")
116 };
117
118 let timestamp_us = timestamp
119 .duration_since(std::time::UNIX_EPOCH)
120 .unwrap_or_default()
121 .as_micros()
122 .to_string();
123
124 let mut visitor = sqlite::SqlVisitor::default();
125
126 visitor
127 .0
128 .insert("time_us".to_string(), JValue::String(timestamp_us));
129
130 if let Some(mp) = module_path {
131 visitor
132 .0
133 .insert("module_path".to_string(), JValue::String(mp.to_string()));
134 }
135 if let Some(l) = line {
136 visitor
137 .0
138 .insert("line".to_string(), JValue::String(l.to_string()));
139 }
140 if let Some(f) = file {
141 visitor
142 .0
143 .insert("file".to_string(), JValue::String(f.to_string()));
144 }
145
146 for (key, value) in fields {
147 let json_value = match value {
148 FieldValue::Bool(b) => JValue::Bool(*b),
149 FieldValue::I64(i) => JValue::Number((*i).into()),
150 FieldValue::U64(u) => JValue::Number((*u).into()),
151 FieldValue::F64(f) => serde_json::Number::from_f64(*f)
152 .map(JValue::Number)
153 .unwrap_or(JValue::Null),
154 FieldValue::Str(s) => JValue::String(s.clone()),
155 FieldValue::Debug(d) => JValue::String(d.clone()),
156 };
157 visitor.0.insert(key.clone(), json_value);
158 }
159
160 let table = if &**target == sqlite::TableName::ACTOR_LIFECYCLE_STR {
161 sqlite::TableName::ActorLifecycle.get_table()
162 } else if &**target == sqlite::TableName::MESSAGES_STR {
163 sqlite::TableName::Messages.get_table()
164 } else {
165 sqlite::TableName::LogEvents.get_table()
166 };
167
168 sqlite::insert_event_fields(&tx, table, visitor)?;
169 }
170
171 tx.commit()?;
172 self.batch.clear();
173
174 Ok(())
175 }
176}
177
178impl TraceEventSink for SqliteSink {
179 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
180 if matches!(event, TraceEvent::Event { .. }) {
182 self.batch.push(event.clone());
183
184 if self.batch.len() >= self.batch_size {
185 self.flush_batch()?;
186 }
187 }
188
189 Ok(())
190 }
191
192 fn flush(&mut self) -> Result<(), anyhow::Error> {
193 self.flush_batch()
194 }
195
196 fn name(&self) -> &str {
197 "SqliteSink"
198 }
199
200 fn target_filter(&self) -> Option<&Targets> {
201 Some(&self.target_filter)
202 }
203}