1#![allow(unsafe_op_in_unsafe_fn)]
10
11use hyperactor_telemetry::sqlite::SqliteTracing;
12use opentelemetry::global;
13use opentelemetry::metrics;
14use pyo3::prelude::*;
15use pyo3::types::PyTraceback;
16
17#[pyfunction]
19pub fn get_current_span_id() -> PyResult<u64> {
20 Ok(tracing::Span::current().id().map_or(0, |id| id.into_u64()))
21}
22
23#[pyfunction]
25pub fn forward_to_tracing(py: Python, record: Py<PyAny>) -> PyResult<()> {
26 let message = record.call_method0(py, "getMessage")?;
27 let message: &str = message.extract(py)?;
28 let lineno: i64 = record.getattr(py, "lineno")?.extract(py)?;
29 let file = record.getattr(py, "filename")?;
30 let file: &str = file.extract(py)?;
31 let level: i32 = record.getattr(py, "levelno")?.extract(py)?;
32
33 let actor_id = record
35 .getattr(py, "actor_id")
36 .ok()
37 .and_then(|attr| attr.extract::<String>(py).ok());
38
39 match level {
41 40 | 50 => {
42 let exc = record.getattr(py, "exc_info").ok();
43 let traceback = exc
44 .and_then(|exc| {
45 if exc.is_none(py) {
46 return None;
47 }
48 exc.extract::<(Py<PyAny>, Py<PyAny>, Bound<'_, PyTraceback>)>(py)
49 .ok()
50 })
51 .map(|(_, _, tb)| tb.format().unwrap_or_default());
52 match traceback {
53 Some(traceback) => {
54 tracing::error!(
55 target:"log_events",
56 file = file,
57 lineno = lineno,
58 stacktrace = traceback,
59 actor_id = actor_id.as_deref(),
60 message
61 );
62 }
63 None => {
64 tracing::error!(
65 file = file,
66 lineno = lineno,
67 actor_id = actor_id.as_deref(),
68 message
69 );
70 }
71 }
72 }
73 30 => {
74 tracing::warn!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
75 }
76 20 => {
77 tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
78 }
79 10 => {
80 tracing::debug!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
81 }
82 _ => {
83 tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
84 }
85 }
86 Ok(())
87}
88
89#[pyfunction]
91pub fn get_execution_id() -> PyResult<String> {
92 Ok(hyperactor_telemetry::env::execution_id())
93}
94
95#[pyfunction]
96pub fn instant_event(message: &str) -> PyResult<()> {
97 tracing::info!(message);
98 Ok(())
99}
100
/// Copies `to_leak` into a heap allocation that is deliberately never freed
/// and returns a `'static` reference to that copy.
///
/// Every call leaks a new allocation, so this should only be used for a
/// bounded number of long-lived strings.
fn as_static_str(to_leak: &str) -> &'static str {
    let owned: Box<str> = Box::from(to_leak);
    Box::leak(owned)
}
107
108#[pyclass(
109 subclass,
110 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
111)]
112struct PyCounter {
113 inner: metrics::Counter<u64>,
114}
115
116#[pymethods]
117impl PyCounter {
118 #[new]
119 fn new(name: &str) -> Self {
120 Self {
121 inner: global::meter("monarch")
122 .u64_counter(as_static_str(name))
123 .build(),
124 }
125 }
126
127 fn add(&mut self, value: u64, attributes: Option<std::collections::HashMap<String, String>>) {
128 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
129 Some(attrs) => attrs
130 .into_iter()
131 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
132 .collect(),
133 None => vec![],
134 };
135 self.inner.add(value, &kv_attributes);
136 }
137}
138
139#[pyclass(
140 subclass,
141 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
142)]
143struct PyHistogram {
144 inner: metrics::Histogram<f64>,
145}
146
147#[pymethods]
148impl PyHistogram {
149 #[new]
150 fn new(name: &str) -> Self {
151 Self {
152 inner: global::meter("monarch")
153 .f64_histogram(as_static_str(name))
154 .build(),
155 }
156 }
157
158 fn record(
159 &mut self,
160 value: f64,
161 attributes: Option<std::collections::HashMap<String, String>>,
162 ) {
163 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
164 Some(attrs) => attrs
165 .into_iter()
166 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
167 .collect(),
168 None => vec![],
169 };
170 self.inner.record(value, &kv_attributes);
171 }
172}
173
174#[pyclass(
175 subclass,
176 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
177)]
178struct PyUpDownCounter {
179 inner: metrics::UpDownCounter<i64>,
180}
181
182#[pymethods]
183impl PyUpDownCounter {
184 #[new]
185 fn new(name: &str) -> Self {
186 Self {
187 inner: global::meter("monarch")
188 .i64_up_down_counter(as_static_str(name))
189 .build(),
190 }
191 }
192
193 fn add(&mut self, value: i64, attributes: Option<std::collections::HashMap<String, String>>) {
194 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
195 Some(attrs) => attrs
196 .into_iter()
197 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
198 .collect(),
199 None => vec![],
200 };
201 self.inner.add(value, &kv_attributes);
202 }
203}
204
205#[pyclass(
206 unsendable,
207 subclass,
208 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
209)]
210struct PySpan {
211 span: tracing::span::EnteredSpan,
212}
213
214#[pymethods]
215impl PySpan {
216 #[new]
217 fn new(name: &str, actor_id: Option<&str>) -> Self {
218 let span = if let Some(actor_id) = actor_id {
219 tracing::span!(
220 tracing::Level::INFO,
221 "python.span",
222 name = name,
223 actor_id = actor_id
224 )
225 } else {
226 tracing::span!(tracing::Level::INFO, "python.span", name = name)
227 };
228 let entered_span = span.entered();
229
230 Self { span: entered_span }
231 }
232
233 fn exit(&mut self) {
234 self.span = tracing::span::Span::none().entered();
235 }
236}
237
238#[pyclass(
239 subclass,
240 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
241)]
242struct PySqliteTracing {
243 guard: Option<SqliteTracing>,
244}
245
246#[pymethods]
247impl PySqliteTracing {
248 #[new]
249 #[pyo3(signature = (in_memory = false))]
250 fn new(in_memory: bool) -> PyResult<Self> {
251 let guard = if in_memory {
252 SqliteTracing::new_in_memory()
253 } else {
254 SqliteTracing::new()
255 };
256
257 match guard {
258 Ok(guard) => Ok(Self { guard: Some(guard) }),
259 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
260 "Failed to create SQLite tracing guard: {}",
261 e
262 ))),
263 }
264 }
265
266 fn db_path(&self) -> PyResult<Option<String>> {
267 match &self.guard {
268 Some(guard) => Ok(guard.db_path().map(|p| p.to_string_lossy().to_string())),
269 None => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
270 "Guard has been closed",
271 )),
272 }
273 }
274
275 fn __enter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
276 Ok(slf)
277 }
278
279 fn __exit__(
280 &mut self,
281 _exc_type: Option<Py<PyAny>>,
282 _exc_value: Option<Py<PyAny>>,
283 _traceback: Option<Py<PyAny>>,
284 ) -> PyResult<bool> {
285 self.guard = None;
286 Ok(false) }
288
289 fn close(&mut self) {
290 self.guard = None;
291 }
292}
293
294use pyo3::Bound;
295use pyo3::types::PyModule;
296
297pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
298 let f = wrap_pyfunction!(forward_to_tracing, module)?;
300 f.setattr(
301 "__module__",
302 "monarch._rust_bindings.monarch_hyperactor.telemetry",
303 )?;
304 module.add_function(f)?;
305
306 let get_current_span_id_fn = wrap_pyfunction!(get_current_span_id, module)?;
308 get_current_span_id_fn.setattr(
309 "__module__",
310 "monarch._rust_bindings.monarch_hyperactor.telemetry",
311 )?;
312 module.add_function(get_current_span_id_fn)?;
313
314 let get_execution_id_fn = wrap_pyfunction!(get_execution_id, module)?;
315 get_execution_id_fn.setattr(
316 "__module__",
317 "monarch._rust_bindings.monarch_hyperactor.telemetry",
318 )?;
319 module.add_function(get_execution_id_fn)?;
320
321 let instant_event_fn = wrap_pyfunction!(instant_event, module)?;
322 instant_event_fn.setattr(
323 "__module__",
324 "monarch._rust_bindings.monarch_hyperactor.telemetry",
325 )?;
326 module.add_function(instant_event_fn)?;
327
328 module.add_class::<PySpan>()?;
329 module.add_class::<PyCounter>()?;
330 module.add_class::<PyHistogram>()?;
331 module.add_class::<PyUpDownCounter>()?;
332 module.add_class::<PySqliteTracing>()?;
333 Ok(())
334}