Skip to main content

monarch_hyperactor/
telemetry.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use hyperactor_telemetry::end_user_span;
12use hyperactor_telemetry::sinks::perfetto::USER_TELEMETRY_PREFIX;
13use hyperactor_telemetry::sqlite::SqliteTracing;
14use hyperactor_telemetry::start_user_span;
15use hyperactor_telemetry::trace_dispatcher::FieldValue;
16use opentelemetry::global;
17use opentelemetry::metrics;
18use pyo3::prelude::*;
19use pyo3::types::PyTraceback;
20
21use crate::proc::PyActorAddr;
22
23/// Get the current span ID from the active span
24#[pyfunction]
25pub fn get_current_span_id() -> PyResult<u64> {
26    Ok(tracing::Span::current().id().map_or(0, |id| id.into_u64()))
27}
28
29/// Log a message with the given metaata
30#[pyfunction]
31pub fn forward_to_tracing(py: Python, record: Py<PyAny>) -> PyResult<()> {
32    let message = record.call_method0(py, "getMessage")?;
33    let message: &str = message.extract(py)?;
34    let lineno: i64 = record.getattr(py, "lineno")?.extract(py)?;
35    let file = record.getattr(py, "filename")?;
36    let file: &str = file.extract(py)?;
37    let level: i32 = record.getattr(py, "levelno")?.extract(py)?;
38
39    // Extract actor_id from the Python record object if available
40    let actor_id = record
41        .getattr(py, "actor_id")
42        .ok()
43        .and_then(|attr| attr.extract::<String>(py).ok());
44
45    // Enter the actor's recording span (if present) so RecorderLayer
46    // captures this event in the per-actor flight recorder. The span
47    // is entered synchronously for the duration of the tracing emit
48    // only — no cross-contamination in shared-asyncio mode.
49    //
50    // Gracefully falls back to plain tracing when _context is absent,
51    // None, or contains an unexpected type.
52    let _recording_guard = extract_recording_span(py);
53
54    // Map level number to level name
55    match level {
56        40 | 50 => {
57            let exc = record.getattr(py, "exc_info").ok();
58            let traceback = exc
59                .and_then(|exc| {
60                    if exc.is_none(py) {
61                        return None;
62                    }
63                    exc.extract::<(Py<PyAny>, Py<PyAny>, Bound<'_, PyTraceback>)>(py)
64                        .ok()
65                })
66                .map(|(_, _, tb)| tb.format().unwrap_or_default());
67            match traceback {
68                Some(traceback) => {
69                    tracing::error!(
70                        target:"log_events",
71                        file = file,
72                        lineno = lineno,
73                        stacktrace = traceback,
74                        actor_id = actor_id.as_deref(),
75                        message
76                    );
77                }
78                None => {
79                    tracing::error!(
80                        file = file,
81                        lineno = lineno,
82                        actor_id = actor_id.as_deref(),
83                        message
84                    );
85                }
86            }
87        }
88        30 => {
89            tracing::warn!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
90        }
91        20 => {
92            tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
93        }
94        10 => {
95            tracing::debug!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
96        }
97        _ => {
98            tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
99        }
100    }
101    Ok(())
102}
103
104/// Extract the recording span from the current Python actor context.
105///
106/// Looks up the `_context` ContextVar in
107/// `monarch._src.actor.actor_mesh`, downcasts to `PyContext`, and
108/// clones the recording span. Returns an entered span guard that
109/// routes tracing events to the actor's flight recorder.
110///
111/// Returns `None` on any failure — missing module, absent context,
112/// unexpected type, or no recording span. This function is called
113/// from arbitrary Python logging contexts (import-time, client-side,
114/// non-actor threads) and must never fail.
115fn extract_recording_span(py: Python) -> Option<tracing::span::EnteredSpan> {
116    let actor_mesh = py.import("monarch._src.actor.actor_mesh").ok()?;
117    let ctx_var = actor_mesh.getattr("_context").ok()?;
118    let ctx_obj = ctx_var.call_method1("get", (py.None(),)).ok()?;
119    if ctx_obj.is_none() {
120        return None;
121    }
122    let py_ctx = ctx_obj
123        .extract::<PyRef<'_, crate::context::PyContext>>()
124        .ok()?;
125    let span = py_ctx.recording_span()?.clone();
126    Some(span.entered())
127}
128
129/// Get the current execution ID
130#[pyfunction]
131pub fn get_execution_id() -> PyResult<String> {
132    Ok(hyperactor_telemetry::env::execution_id())
133}
134
135#[pyfunction]
136pub fn instant_event(message: &str) -> PyResult<()> {
137    tracing::info!(message);
138    Ok(())
139}
140
/// Return a `&'static str` with the same contents as `to_leak`.
///
/// opentelemetry requires that the names of counters etc. are static for the
/// lifetime of the program. These classes are bound from Python, so the names
/// arrive as borrowed strings and have to be leaked to live forever.
///
/// Names are interned, so each distinct name is leaked at most once.
/// Constructing the same instrument repeatedly from Python (e.g. inside a
/// function that runs many times) therefore does not grow memory without
/// bound.
fn as_static_str(to_leak: &str) -> &'static str {
    // Process-wide set of every name leaked so far.
    static INTERNED: std::sync::OnceLock<
        std::sync::Mutex<std::collections::HashSet<&'static str>>,
    > = std::sync::OnceLock::new();

    // The set only ever grows, so a panic while the lock was held cannot
    // leave it inconsistent. Recover from poisoning instead of panicking.
    let mut interned = INTERNED
        .get_or_init(Default::default)
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if let Some(&existing) = interned.get(to_leak) {
        return existing;
    }
    let leaked: &'static str = String::from(to_leak).leak();
    interned.insert(leaked);
    leaked
}
147
148#[pyclass(
149    subclass,
150    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
151)]
152struct PyCounter {
153    inner: metrics::Counter<u64>,
154}
155
156#[pymethods]
157impl PyCounter {
158    #[new]
159    fn new(name: &str) -> Self {
160        Self {
161            inner: global::meter("monarch")
162                .u64_counter(as_static_str(name))
163                .build(),
164        }
165    }
166
167    fn add(&mut self, value: u64, attributes: Option<std::collections::HashMap<String, String>>) {
168        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
169            Some(attrs) => attrs
170                .into_iter()
171                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
172                .collect(),
173            None => vec![],
174        };
175        self.inner.add(value, &kv_attributes);
176    }
177}
178
179#[pyclass(
180    subclass,
181    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
182)]
183struct PyHistogram {
184    inner: metrics::Histogram<f64>,
185}
186
187#[pymethods]
188impl PyHistogram {
189    #[new]
190    fn new(name: &str) -> Self {
191        Self {
192            inner: global::meter("monarch")
193                .f64_histogram(as_static_str(name))
194                .build(),
195        }
196    }
197
198    fn record(
199        &mut self,
200        value: f64,
201        attributes: Option<std::collections::HashMap<String, String>>,
202    ) {
203        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
204            Some(attrs) => attrs
205                .into_iter()
206                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
207                .collect(),
208            None => vec![],
209        };
210        self.inner.record(value, &kv_attributes);
211    }
212}
213
214#[pyclass(
215    subclass,
216    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
217)]
218struct PyUpDownCounter {
219    inner: metrics::UpDownCounter<i64>,
220}
221
222#[pymethods]
223impl PyUpDownCounter {
224    #[new]
225    fn new(name: &str) -> Self {
226        Self {
227            inner: global::meter("monarch")
228                .i64_up_down_counter(as_static_str(name))
229                .build(),
230        }
231    }
232
233    fn add(&mut self, value: i64, attributes: Option<std::collections::HashMap<String, String>>) {
234        let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
235            Some(attrs) => attrs
236                .into_iter()
237                .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
238                .collect(),
239            None => vec![],
240        };
241        self.inner.add(value, &kv_attributes);
242    }
243}
244
245#[pyclass(
246    subclass,
247    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
248)]
249struct PySpan {
250    id: Option<u64>,
251}
252
253#[pymethods]
254impl PySpan {
255    #[new]
256    #[pyo3(signature = (name, actor_id = None))]
257    fn new(name: &str, actor_id: Option<&PyActorAddr>) -> Self {
258        let mut fields = vec![("name", FieldValue::Str(name.to_string()))];
259        if let Some(actor_id) = actor_id {
260            fields.push(("actor_id", FieldValue::Str(actor_id.inner.to_string())));
261        }
262        let id = start_user_span("python_user_span", USER_TELEMETRY_PREFIX, fields);
263
264        Self {
265            id: (id != 0).then_some(id),
266        }
267    }
268
269    fn __enter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
270        slf
271    }
272
273    fn __exit__(
274        &mut self,
275        _exc_type: Option<Py<PyAny>>,
276        _exc_value: Option<Py<PyAny>>,
277        _traceback: Option<Py<PyAny>>,
278    ) -> PyResult<bool> {
279        if let Some(id) = self.id.take() {
280            end_user_span(id);
281        }
282        Ok(false)
283    }
284}
285
286impl Drop for PySpan {
287    fn drop(&mut self) {
288        if let Some(id) = self.id.take() {
289            end_user_span(id);
290        }
291    }
292}
293
294#[pyclass(
295    subclass,
296    module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
297)]
298struct PySqliteTracing {
299    guard: Option<SqliteTracing>,
300}
301
302#[pymethods]
303impl PySqliteTracing {
304    #[new]
305    #[pyo3(signature = (in_memory = false))]
306    fn new(in_memory: bool) -> PyResult<Self> {
307        let guard = if in_memory {
308            SqliteTracing::new_in_memory()
309        } else {
310            SqliteTracing::new()
311        };
312
313        match guard {
314            Ok(guard) => Ok(Self { guard: Some(guard) }),
315            Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
316                "Failed to create SQLite tracing guard: {}",
317                e
318            ))),
319        }
320    }
321
322    fn db_path(&self) -> PyResult<Option<String>> {
323        match &self.guard {
324            Some(guard) => Ok(guard.db_path().map(|p| p.to_string_lossy().to_string())),
325            None => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
326                "Guard has been closed",
327            )),
328        }
329    }
330
331    fn __enter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
332        Ok(slf)
333    }
334
335    fn __exit__(
336        &mut self,
337        _exc_type: Option<Py<PyAny>>,
338        _exc_value: Option<Py<PyAny>>,
339        _traceback: Option<Py<PyAny>>,
340    ) -> PyResult<bool> {
341        self.guard = None;
342        Ok(false) // Don't suppress exceptions
343    }
344
345    fn close(&mut self) {
346        self.guard = None;
347    }
348}
349
350pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
351    // Register the forward_to_tracing function
352    let f = wrap_pyfunction!(forward_to_tracing, module)?;
353    f.setattr(
354        "__module__",
355        "monarch._rust_bindings.monarch_hyperactor.telemetry",
356    )?;
357    module.add_function(f)?;
358
359    // Register the span-related functions
360    let get_current_span_id_fn = wrap_pyfunction!(get_current_span_id, module)?;
361    get_current_span_id_fn.setattr(
362        "__module__",
363        "monarch._rust_bindings.monarch_hyperactor.telemetry",
364    )?;
365    module.add_function(get_current_span_id_fn)?;
366
367    let get_execution_id_fn = wrap_pyfunction!(get_execution_id, module)?;
368    get_execution_id_fn.setattr(
369        "__module__",
370        "monarch._rust_bindings.monarch_hyperactor.telemetry",
371    )?;
372    module.add_function(get_execution_id_fn)?;
373
374    let instant_event_fn = wrap_pyfunction!(instant_event, module)?;
375    instant_event_fn.setattr(
376        "__module__",
377        "monarch._rust_bindings.monarch_hyperactor.telemetry",
378    )?;
379    module.add_function(instant_event_fn)?;
380
381    module.add_class::<PySpan>()?;
382    module.add_class::<PyCounter>()?;
383    module.add_class::<PyHistogram>()?;
384    module.add_class::<PyUpDownCounter>()?;
385    module.add_class::<PySqliteTracing>()?;
386    Ok(())
387}
388
#[cfg(test)]
mod tests {
    use pyo3::ffi::c_str;
    use pyo3::prelude::*;

    use super::*;

    fn init_python() {
        pyo3::Python::initialize();
    }

    /// Build an INFO-level `logging.LogRecord` whose message is `message`.
    fn make_log_record(py: Python, message: &str) -> Py<PyAny> {
        let scope = pyo3::types::PyDict::new(py);
        scope.set_item("msg", message).expect("failed to bind msg");
        py.run(
            c_str!(
                "import logging\n\
                 record = logging.LogRecord('test', logging.INFO, 'test.py', 1, msg, (), None)"
            ),
            None,
            Some(&scope),
        )
        .expect("failed to construct LogRecord");
        scope
            .get_item("record")
            .expect("lookup of `record` failed")
            .expect("`record` was not defined")
            .into()
    }

    /// forward_to_tracing returns Ok when _context is absent.
    /// It must never fail in non-actor logging contexts.
    #[test]
    fn forward_to_tracing_without_context_does_not_crash() {
        init_python();
        Python::attach(|py| {
            let record = make_log_record(py, "no context marker");
            assert!(forward_to_tracing(py, record).is_ok());
        });
    }

    /// While a recording span is entered on the current thread, events
    /// emitted by forward_to_tracing land in the recording's ring buffer.
    #[test]
    fn forward_to_tracing_captures_when_recording_span_entered() {
        init_python();
        hyperactor_telemetry::initialize_logging_for_test();

        let recording = hyperactor_telemetry::recorder().record(64);
        let span = recording.span("test");

        Python::attach(|py| {
            let record = make_log_record(py, "recorder marker");
            let _entered = span.enter();
            assert!(forward_to_tracing(py, record).is_ok());
        });

        let events = recording.tail();
        let Some(last) = events.last() else {
            panic!("expected at least one event in recording");
        };
        let rendered = format!("{last:?}");
        assert!(
            rendered.contains("recorder marker"),
            "expected 'recorder marker' in event fields, got: {rendered}"
        );
    }

    /// extract_recording_span yields None when _context is absent.
    #[test]
    fn extract_recording_span_returns_none_without_context() {
        init_python();
        Python::attach(|py| {
            assert!(extract_recording_span(py).is_none());
        });
    }
}