1#![allow(unsafe_op_in_unsafe_fn)]
10
11use hyperactor_telemetry::end_user_span;
12use hyperactor_telemetry::sinks::perfetto::USER_TELEMETRY_PREFIX;
13use hyperactor_telemetry::sqlite::SqliteTracing;
14use hyperactor_telemetry::start_user_span;
15use hyperactor_telemetry::trace_dispatcher::FieldValue;
16use opentelemetry::global;
17use opentelemetry::metrics;
18use pyo3::prelude::*;
19use pyo3::types::PyTraceback;
20
21use crate::proc::PyActorAddr;
22
23#[pyfunction]
25pub fn get_current_span_id() -> PyResult<u64> {
26 Ok(tracing::Span::current().id().map_or(0, |id| id.into_u64()))
27}
28
29#[pyfunction]
31pub fn forward_to_tracing(py: Python, record: Py<PyAny>) -> PyResult<()> {
32 let message = record.call_method0(py, "getMessage")?;
33 let message: &str = message.extract(py)?;
34 let lineno: i64 = record.getattr(py, "lineno")?.extract(py)?;
35 let file = record.getattr(py, "filename")?;
36 let file: &str = file.extract(py)?;
37 let level: i32 = record.getattr(py, "levelno")?.extract(py)?;
38
39 let actor_id = record
41 .getattr(py, "actor_id")
42 .ok()
43 .and_then(|attr| attr.extract::<String>(py).ok());
44
45 let _recording_guard = extract_recording_span(py);
53
54 match level {
56 40 | 50 => {
57 let exc = record.getattr(py, "exc_info").ok();
58 let traceback = exc
59 .and_then(|exc| {
60 if exc.is_none(py) {
61 return None;
62 }
63 exc.extract::<(Py<PyAny>, Py<PyAny>, Bound<'_, PyTraceback>)>(py)
64 .ok()
65 })
66 .map(|(_, _, tb)| tb.format().unwrap_or_default());
67 match traceback {
68 Some(traceback) => {
69 tracing::error!(
70 target:"log_events",
71 file = file,
72 lineno = lineno,
73 stacktrace = traceback,
74 actor_id = actor_id.as_deref(),
75 message
76 );
77 }
78 None => {
79 tracing::error!(
80 file = file,
81 lineno = lineno,
82 actor_id = actor_id.as_deref(),
83 message
84 );
85 }
86 }
87 }
88 30 => {
89 tracing::warn!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
90 }
91 20 => {
92 tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
93 }
94 10 => {
95 tracing::debug!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
96 }
97 _ => {
98 tracing::info!(target:"log_events", file = file, lineno = lineno, actor_id = actor_id.as_deref(), message)
99 }
100 }
101 Ok(())
102}
103
104fn extract_recording_span(py: Python) -> Option<tracing::span::EnteredSpan> {
116 let actor_mesh = py.import("monarch._src.actor.actor_mesh").ok()?;
117 let ctx_var = actor_mesh.getattr("_context").ok()?;
118 let ctx_obj = ctx_var.call_method1("get", (py.None(),)).ok()?;
119 if ctx_obj.is_none() {
120 return None;
121 }
122 let py_ctx = ctx_obj
123 .extract::<PyRef<'_, crate::context::PyContext>>()
124 .ok()?;
125 let span = py_ctx.recording_span()?.clone();
126 Some(span.entered())
127}
128
129#[pyfunction]
131pub fn get_execution_id() -> PyResult<String> {
132 Ok(hyperactor_telemetry::env::execution_id())
133}
134
135#[pyfunction]
136pub fn instant_event(message: &str) -> PyResult<()> {
137 tracing::info!(message);
138 Ok(())
139}
140
/// Returns a `'static` copy of `to_leak`, for APIs that require
/// `&'static str` names.
///
/// Each distinct string is leaked at most once. Names that were seen before
/// are served from a process-wide intern table, so repeated calls with the
/// same name do not allocate or leak again. Total leaked memory is bounded by
/// the set of distinct names ever passed in.
fn as_static_str(to_leak: &str) -> &'static str {
    // Entries are never removed: they are handed out as `&'static str`.
    static INTERNED: std::sync::OnceLock<
        std::sync::Mutex<std::collections::HashSet<&'static str>>,
    > = std::sync::OnceLock::new();

    let mut interned = INTERNED
        .get_or_init(Default::default)
        .lock()
        // A poisoned lock only means another thread panicked while holding it;
        // the set of already-leaked names is still usable.
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    if let Some(&existing) = interned.get(to_leak) {
        return existing;
    }
    let leaked: &'static str = String::from(to_leak).leak();
    interned.insert(leaked);
    leaked
}
147
148#[pyclass(
149 subclass,
150 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
151)]
152struct PyCounter {
153 inner: metrics::Counter<u64>,
154}
155
156#[pymethods]
157impl PyCounter {
158 #[new]
159 fn new(name: &str) -> Self {
160 Self {
161 inner: global::meter("monarch")
162 .u64_counter(as_static_str(name))
163 .build(),
164 }
165 }
166
167 fn add(&mut self, value: u64, attributes: Option<std::collections::HashMap<String, String>>) {
168 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
169 Some(attrs) => attrs
170 .into_iter()
171 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
172 .collect(),
173 None => vec![],
174 };
175 self.inner.add(value, &kv_attributes);
176 }
177}
178
179#[pyclass(
180 subclass,
181 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
182)]
183struct PyHistogram {
184 inner: metrics::Histogram<f64>,
185}
186
187#[pymethods]
188impl PyHistogram {
189 #[new]
190 fn new(name: &str) -> Self {
191 Self {
192 inner: global::meter("monarch")
193 .f64_histogram(as_static_str(name))
194 .build(),
195 }
196 }
197
198 fn record(
199 &mut self,
200 value: f64,
201 attributes: Option<std::collections::HashMap<String, String>>,
202 ) {
203 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
204 Some(attrs) => attrs
205 .into_iter()
206 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
207 .collect(),
208 None => vec![],
209 };
210 self.inner.record(value, &kv_attributes);
211 }
212}
213
214#[pyclass(
215 subclass,
216 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
217)]
218struct PyUpDownCounter {
219 inner: metrics::UpDownCounter<i64>,
220}
221
222#[pymethods]
223impl PyUpDownCounter {
224 #[new]
225 fn new(name: &str) -> Self {
226 Self {
227 inner: global::meter("monarch")
228 .i64_up_down_counter(as_static_str(name))
229 .build(),
230 }
231 }
232
233 fn add(&mut self, value: i64, attributes: Option<std::collections::HashMap<String, String>>) {
234 let kv_attributes: Vec<opentelemetry::KeyValue> = match attributes {
235 Some(attrs) => attrs
236 .into_iter()
237 .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
238 .collect(),
239 None => vec![],
240 };
241 self.inner.add(value, &kv_attributes);
242 }
243}
244
245#[pyclass(
246 subclass,
247 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
248)]
249struct PySpan {
250 id: Option<u64>,
251}
252
253#[pymethods]
254impl PySpan {
255 #[new]
256 #[pyo3(signature = (name, actor_id = None))]
257 fn new(name: &str, actor_id: Option<&PyActorAddr>) -> Self {
258 let mut fields = vec![("name", FieldValue::Str(name.to_string()))];
259 if let Some(actor_id) = actor_id {
260 fields.push(("actor_id", FieldValue::Str(actor_id.inner.to_string())));
261 }
262 let id = start_user_span("python_user_span", USER_TELEMETRY_PREFIX, fields);
263
264 Self {
265 id: (id != 0).then_some(id),
266 }
267 }
268
269 fn __enter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
270 slf
271 }
272
273 fn __exit__(
274 &mut self,
275 _exc_type: Option<Py<PyAny>>,
276 _exc_value: Option<Py<PyAny>>,
277 _traceback: Option<Py<PyAny>>,
278 ) -> PyResult<bool> {
279 if let Some(id) = self.id.take() {
280 end_user_span(id);
281 }
282 Ok(false)
283 }
284}
285
286impl Drop for PySpan {
287 fn drop(&mut self) {
288 if let Some(id) = self.id.take() {
289 end_user_span(id);
290 }
291 }
292}
293
294#[pyclass(
295 subclass,
296 module = "monarch._rust_bindings.monarch_hyperactor.telemetry"
297)]
298struct PySqliteTracing {
299 guard: Option<SqliteTracing>,
300}
301
302#[pymethods]
303impl PySqliteTracing {
304 #[new]
305 #[pyo3(signature = (in_memory = false))]
306 fn new(in_memory: bool) -> PyResult<Self> {
307 let guard = if in_memory {
308 SqliteTracing::new_in_memory()
309 } else {
310 SqliteTracing::new()
311 };
312
313 match guard {
314 Ok(guard) => Ok(Self { guard: Some(guard) }),
315 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
316 "Failed to create SQLite tracing guard: {}",
317 e
318 ))),
319 }
320 }
321
322 fn db_path(&self) -> PyResult<Option<String>> {
323 match &self.guard {
324 Some(guard) => Ok(guard.db_path().map(|p| p.to_string_lossy().to_string())),
325 None => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
326 "Guard has been closed",
327 )),
328 }
329 }
330
331 fn __enter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
332 Ok(slf)
333 }
334
335 fn __exit__(
336 &mut self,
337 _exc_type: Option<Py<PyAny>>,
338 _exc_value: Option<Py<PyAny>>,
339 _traceback: Option<Py<PyAny>>,
340 ) -> PyResult<bool> {
341 self.guard = None;
342 Ok(false) }
344
345 fn close(&mut self) {
346 self.guard = None;
347 }
348}
349
350pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
351 let f = wrap_pyfunction!(forward_to_tracing, module)?;
353 f.setattr(
354 "__module__",
355 "monarch._rust_bindings.monarch_hyperactor.telemetry",
356 )?;
357 module.add_function(f)?;
358
359 let get_current_span_id_fn = wrap_pyfunction!(get_current_span_id, module)?;
361 get_current_span_id_fn.setattr(
362 "__module__",
363 "monarch._rust_bindings.monarch_hyperactor.telemetry",
364 )?;
365 module.add_function(get_current_span_id_fn)?;
366
367 let get_execution_id_fn = wrap_pyfunction!(get_execution_id, module)?;
368 get_execution_id_fn.setattr(
369 "__module__",
370 "monarch._rust_bindings.monarch_hyperactor.telemetry",
371 )?;
372 module.add_function(get_execution_id_fn)?;
373
374 let instant_event_fn = wrap_pyfunction!(instant_event, module)?;
375 instant_event_fn.setattr(
376 "__module__",
377 "monarch._rust_bindings.monarch_hyperactor.telemetry",
378 )?;
379 module.add_function(instant_event_fn)?;
380
381 module.add_class::<PySpan>()?;
382 module.add_class::<PyCounter>()?;
383 module.add_class::<PyHistogram>()?;
384 module.add_class::<PyUpDownCounter>()?;
385 module.add_class::<PySqliteTracing>()?;
386 Ok(())
387}
388
#[cfg(test)]
mod tests {
    use pyo3::ffi::c_str;
    use pyo3::prelude::*;

    use super::*;

    /// Initializes the embedded Python interpreter before a test touches Python.
    fn init_python() {
        pyo3::Python::initialize();
    }

    /// Builds a `logging.LogRecord` at INFO level whose message is `message`.
    fn make_log_record(py: Python, message: &str) -> Py<PyAny> {
        let scope = pyo3::types::PyDict::new(py);
        scope.set_item("msg", message).unwrap();
        py.run(
            c_str!(
                "import logging\n\
                 record = logging.LogRecord('test', logging.INFO, 'test.py', 1, msg, (), None)"
            ),
            None,
            Some(&scope),
        )
        .unwrap();
        let record = scope.get_item("record").unwrap().unwrap();
        record.into()
    }

    // Forwarding must succeed even when no actor context is available.
    #[test]
    fn forward_to_tracing_without_context_does_not_crash() {
        init_python();
        Python::attach(|py| {
            let record = make_log_record(py, "no context marker");
            assert!(forward_to_tracing(py, record).is_ok());
        });
    }

    // Events forwarded while a recording span is entered must land in that
    // recording.
    #[test]
    fn forward_to_tracing_captures_when_recording_span_entered() {
        init_python();
        hyperactor_telemetry::initialize_logging_for_test();

        let recording = hyperactor_telemetry::recorder().record(64);
        let span = recording.span("test");

        Python::attach(|py| {
            let record = make_log_record(py, "recorder marker");
            let _entered = span.enter();
            assert!(forward_to_tracing(py, record).is_ok());
        });

        let events = recording.tail();
        assert!(
            !events.is_empty(),
            "expected at least one event in recording"
        );
        let fields = format!("{:?}", events.last().unwrap());
        assert!(
            fields.contains("recorder marker"),
            "expected 'recorder marker' in event fields, got: {fields}"
        );
    }

    // With no actor context set, there is no recording span to enter.
    #[test]
    fn extract_recording_span_returns_none_without_context() {
        init_python();
        Python::attach(|py| {
            assert!(extract_recording_span(py).is_none());
        });
    }
}