monarch_hyperactor/
telemetry.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use hyperactor_telemetry::sqlite::SqliteTracing;
12use opentelemetry::global;
13use opentelemetry::metrics;
14use pyo3::prelude::*;
15use pyo3::types::PyTraceback;
16
17/// Get the current span ID from the active span
18#[pyfunction]
19pub fn get_current_span_id() -> PyResult<u64> {
20    Ok(tracing::Span::current().id().map_or(0, |id| id.into_u64()))
21}
22
23/// Log a message with the given metaata
24#[pyfunction]
25pub fn forward_to_tracing(py: Python, record: Py<PyAny>) -> PyResult<()> {
26    let message = record.call_method0(py, "getMessage")?;
27    let message: &str = message.extract(py)?;
28    let lineno: i64 = record.getattr(py, "lineno")?.extract(py)?;
29    let file = record.getattr(py, "filename")?;
30    let file: &str = file.extract(py)?;
31    let level: i32 = record.getattr(py, "levelno")?.extract(py)?;
32
33    // Extract actor_id from the Python record object if available
34    let actor_id = record
35        .getattr(py, "actor_id")
36        .ok()
37        .and_then(|attr| attr.extract::<String>(py).ok());
38
39    // Map level number to level name
40    match level {
41        40 | 50 => {
42            let exc = record.getattr(py, "exc_info").ok();
43            let traceback = exc
44                .and_then(|exc| {
45                    if exc.is_none(py) {
46                        return None;
47                    }
48                    exc.extract::<(Py<PyAny>, Py<PyAny>, Bound<'_, PyTraceback>)>(py)
49                        .ok()
50                })
51                .map(|(_, _, tb)| tb.format().unwrap_or_default());
52            match traceback {
53                Some(traceback) => {
54                    tracing::error!(
55                        target:"log_events",
56                        file = file,
57                        lineno = lineno,
58                        stacktrace = traceback,
59                        actor_id = actor_id.as_deref(),
60                        message
61                    );
62                }
63                None => {
64                    tracing::error!(
65                        file = file,
66                        lineno = lineno,
67                        actor_id = actor_id.as_deref(),
68                        message
69                    );
70                }
71            }
72        }
73        30 => {
74            tracing::warn!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
75        }
76        20 => {
77            tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
78        }
79        10 => {
80            tracing::debug!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
81        }
82        _ => {
83            tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
84        }
85    }
86    Ok(())
87}
88
89/// Get the current execution ID
90#[pyfunction]
91pub fn get_execution_id() -> PyResult<String> {
92    Ok(hyperactor_telemetry::env::execution_id())
93}
94
95#[pyfunction]
96pub fn instant_event(message: &str) -> PyResult<()> {
97    tracing::info!(message);
98    Ok(())
99}
100
/// Copy `to_leak` into a heap allocation that is never freed and return it as a
/// `'static` string slice.
///
/// opentelemetry requires instrument names (counters etc.) to be static for the
/// lifetime of the program. Because these classes are bound from Python to Rust,
/// the names arrive as borrowed strings, so we leak a copy to make it live forever.
/// This is fine, as these classes aren't dynamically created.
fn as_static_str(to_leak: &str) -> &'static str {
    let boxed: Box<str> = to_leak.to_owned().into_boxed_str();
    Box::leak(boxed)
}
107
108#[pyclass(
109    subclass,
110    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
111)]
112struct PyCounter {
113    inner: metrics::Counter<u64>,
114}
115
116#[pymethods]
117impl PyCounter {
118    #[new]
119    fn new(name: &str) -> Self {
120        Self {
121            inner: global::meter("monarch")
122                .u64_counter(as_static_str(name))
123                .build(),
124        }
125    }
126
127    fn add(&mut self, value: u64, attributes: Option<std::collections::HashMap<String, String>>) {
128        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
129            Some(attrs) => attrs
130                .into_iter()
131                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
132                .collect(),
133            None => vec![],
134        };
135        self.inner.add(value, &kv_attributes);
136    }
137}
138
139#[pyclass(
140    subclass,
141    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
142)]
143struct PyHistogram {
144    inner: metrics::Histogram<f64>,
145}
146
147#[pymethods]
148impl PyHistogram {
149    #[new]
150    fn new(name: &str) -> Self {
151        Self {
152            inner: global::meter("monarch")
153                .f64_histogram(as_static_str(name))
154                .build(),
155        }
156    }
157
158    fn record(
159        &mut self,
160        value: f64,
161        attributes: Option<std::collections::HashMap<String, String>>,
162    ) {
163        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
164            Some(attrs) => attrs
165                .into_iter()
166                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
167                .collect(),
168            None => vec![],
169        };
170        self.inner.record(value, &kv_attributes);
171    }
172}
173
174#[pyclass(
175    subclass,
176    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
177)]
178struct PyUpDownCounter {
179    inner: metrics::UpDownCounter<i64>,
180}
181
182#[pymethods]
183impl PyUpDownCounter {
184    #[new]
185    fn new(name: &str) -> Self {
186        Self {
187            inner: global::meter("monarch")
188                .i64_up_down_counter(as_static_str(name))
189                .build(),
190        }
191    }
192
193    fn add(&mut self, value: i64, attributes: Option<std::collections::HashMap<String, String>>) {
194        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
195            Some(attrs) => attrs
196                .into_iter()
197                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
198                .collect(),
199            None => vec![],
200        };
201        self.inner.add(value, &kv_attributes);
202    }
203}
204
205#[pyclass(
206    unsendable,
207    subclass,
208    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
209)]
210struct PySpan {
211    span: tracing::span::EnteredSpan,
212}
213
214#[pymethods]
215impl PySpan {
216    #[new]
217    fn new(name: &str, actor_id: Option<&str>) -> Self {
218        let span = if let Some(actor_id) = actor_id {
219            tracing::span!(
220                tracing::Level::INFO,
221                "python.span",
222                name = name,
223                actor_id = actor_id
224            )
225        } else {
226            tracing::span!(tracing::Level::INFO, "python.span", name = name)
227        };
228        let entered_span = span.entered();
229
230        Self { span: entered_span }
231    }
232
233    fn exit(&mut self) {
234        self.span = tracing::span::Span::none().entered();
235    }
236}
237
238#[pyclass(
239    subclass,
240    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
241)]
242struct PySqliteTracing {
243    guard: Option<SqliteTracing>,
244}
245
246#[pymethods]
247impl PySqliteTracing {
248    #[new]
249    #[pyo3(signature = (in_memory = false))]
250    fn new(in_memory: bool) -> PyResult<Self> {
251        let guard = if in_memory {
252            SqliteTracing::new_in_memory()
253        } else {
254            SqliteTracing::new()
255        };
256
257        match guard {
258            Ok(guard) => Ok(Self { guard: Some(guard) }),
259            Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
260                "Failed to create SQLite tracing guard: {}",
261                e
262            ))),
263        }
264    }
265
266    fn db_path(&self) -> PyResult<Option<String>> {
267        match &self.guard {
268            Some(guard) => Ok(guard.db_path().map(|p| p.to_string_lossy().to_string())),
269            None => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
270                "Guard has been closed",
271            )),
272        }
273    }
274
275    fn __enter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
276        Ok(slf)
277    }
278
279    fn __exit__(
280        &mut self,
281        _exc_type: Option<Py<PyAny>>,
282        _exc_value: Option<Py<PyAny>>,
283        _traceback: Option<Py<PyAny>>,
284    ) -> PyResult<bool> {
285        self.guard = None;
286        Ok(false) // Don't suppress exceptions
287    }
288
289    fn close(&mut self) {
290        self.guard = None;
291    }
292}
293
294use pyo3::Bound;
295use pyo3::types::PyModule;
296
297pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
298    // Register the forward_to_tracing function
299    let f = wrap_pyfunction!(forward_to_tracing, module)?;
300    f.setattr(
301        "__module__",
302        "monarch._rust_bindings.monarch_hyperactor.telemetry",
303    )?;
304    module.add_function(f)?;
305
306    // Register the span-related functions
307    let get_current_span_id_fn = wrap_pyfunction!(get_current_span_id, module)?;
308    get_current_span_id_fn.setattr(
309        "__module__",
310        "monarch._rust_bindings.monarch_hyperactor.telemetry",
311    )?;
312    module.add_function(get_current_span_id_fn)?;
313
314    let get_execution_id_fn = wrap_pyfunction!(get_execution_id, module)?;
315    get_execution_id_fn.setattr(
316        "__module__",
317        "monarch._rust_bindings.monarch_hyperactor.telemetry",
318    )?;
319    module.add_function(get_execution_id_fn)?;
320
321    let instant_event_fn = wrap_pyfunction!(instant_event, module)?;
322    instant_event_fn.setattr(
323        "__module__",
324        "monarch._rust_bindings.monarch_hyperactor.telemetry",
325    )?;
326    module.add_function(instant_event_fn)?;
327
328    module.add_class::<PySpan>()?;
329    module.add_class::<PyCounter>()?;
330    module.add_class::<PyHistogram>()?;
331    module.add_class::<PyUpDownCounter>()?;
332    module.add_class::<PySqliteTracing>()?;
333    Ok(())
334}