hyperactor_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
// TODO:ehedeman Remove or replace with better config once telemetry perf issues are solved
/// Name of the environment variable that switches off the OpenTelemetry tracing layer.
/// Set it to "1" to disable OpenTelemetry tracing.
pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";

/// Name of the environment variable that switches off OpenTelemetry metrics.
/// Set it to "1" to skip metrics initialization.
pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";

/// Name of the environment variable that switches off the recorder tracing layer.
/// Set it to "1" to disable the recorder output.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";

/// Name of the environment variable that switches on the sqlite tracing layer.
/// Set it to "1" to enable sqlite tracing.
pub const ENABLE_SQLITE_TRACING: &str = "ENABLE_SQLITE_TRACING";

/// Name of the environment variable holding the log level captured in the dedicated
/// Monarch log file (the file location depends on the environment, see `log_file_path`).
/// The value is parsed as a `tracing::Level` (e.g. "debug", "info", "warn", "error").
const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG";

/// Name of the environment variable whose presence is used by `env::Env::current`
/// to detect that the process runs on mast.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

// Default file log levels, kept as strings so they share the parsing path of the
// `MONARCH_FILE_LOG` override.
const LOG_LEVEL_INFO: &str = "info";
const LOG_LEVEL_DEBUG: &str = "debug";

// Span field names.
// NOTE(review): these are not referenced in this file; presumably consumed by the
// recorder module — confirm before removing.
const SPAN_FIELD_RECORDING: &str = "recording";
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

// Textual names of the execution environments, as rendered by `env::Env`'s `Display`.
const ENV_VALUE_LOCAL: &str = "local";
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
const ENV_VALUE_MAST: &str = "mast";
const ENV_VALUE_TEST: &str = "test";
// Value of `MAST_ENVIRONMENT` that selects the emulator. Only read by the non-test
// variant of `env::Env::current`, hence unused (and silenced) in test builds.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
55
56pub mod in_memory_reader;
57#[cfg(fbcode_build)]
58mod meta;
59mod otel;
60mod pool;
61pub mod recorder;
62mod spool;
63pub mod sqlite;
64pub mod task;
65pub mod trace;
66use std::io::Write;
67use std::str::FromStr;
68use std::sync::Arc;
69use std::sync::Mutex;
70use std::time::Instant;
71
72use lazy_static::lazy_static;
73pub use opentelemetry;
74pub use opentelemetry::Key;
75pub use opentelemetry::KeyValue;
76pub use opentelemetry::Value;
77pub use opentelemetry::global::meter;
78pub use tracing;
79pub use tracing::Level;
80use tracing_appender::non_blocking::NonBlocking;
81use tracing_appender::non_blocking::WorkerGuard;
82use tracing_appender::rolling::RollingFileAppender;
83use tracing_glog::Glog;
84use tracing_glog::GlogFields;
85use tracing_glog::LocalTime;
86use tracing_subscriber::Layer;
87use tracing_subscriber::filter::LevelFilter;
88use tracing_subscriber::filter::Targets;
89use tracing_subscriber::fmt;
90use tracing_subscriber::fmt::FormatEvent;
91use tracing_subscriber::fmt::FormatFields;
92use tracing_subscriber::fmt::format::Writer;
93use tracing_subscriber::registry::LookupSpan;
94
95use crate::recorder::Recorder;
96use crate::sqlite::get_reloadable_sqlite_layer;
97
98pub trait TelemetryClock {
99    fn now(&self) -> tokio::time::Instant;
100    fn system_time_now(&self) -> std::time::SystemTime;
101}
102
/// The default [`TelemetryClock`]: a stateless clock that simply delegates to
/// `tokio::time::Instant::now()` and `std::time::SystemTime::now()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultTelemetryClock {}
104
105impl TelemetryClock for DefaultTelemetryClock {
106    fn now(&self) -> tokio::time::Instant {
107        tokio::time::Instant::now()
108    }
109
110    fn system_time_now(&self) -> std::time::SystemTime {
111        std::time::SystemTime::now()
112    }
113}
114
115pub fn username() -> String {
116    let env = env::Env::current();
117    match env {
118        env::Env::Mast => {
119            std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
120        }
121        _ => whoami::username(),
122    }
123}
124
125// Given an environment, determine the log file path to write to.
126// If a suffix is provided, it will be prepended with "_" and then appended to file name
127pub fn log_file_path(
128    env: env::Env,
129    suffix: Option<&str>,
130) -> Result<(String, String), anyhow::Error> {
131    let suffix = suffix.map(|s| format!("_{}", s)).unwrap_or_default();
132    match env {
133        env::Env::Local | env::Env::MastEmulator => {
134            let username = if whoami::username().is_empty() {
135                "monarch".to_string()
136            } else {
137                whoami::username()
138            };
139            Ok((
140                format!("/tmp/{}", username),
141                format!("monarch_log{}", suffix),
142            ))
143        }
144        env::Env::Mast => Ok((
145            "/logs/".to_string(),
146            format!("dedicated_log_monarch{}", suffix),
147        )),
148        _ => Err(anyhow::anyhow!(
149            "file writer unsupported for environment {}",
150            env
151        )),
152    }
153}
154
155fn try_create_appender(
156    path: &str,
157    filename: &str,
158    create_dir: bool,
159) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
160    if create_dir {
161        std::fs::create_dir_all(path)?;
162    }
163    Ok(RollingFileAppender::builder()
164        .filename_prefix(filename)
165        .filename_suffix("log")
166        .build(path)?)
167}
168
169fn writer() -> Box<dyn Write + Send> {
170    match env::Env::current() {
171        env::Env::Test => Box::new(std::io::stderr()),
172        env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
173            let (path, filename) = log_file_path(env::Env::current(), None).unwrap();
174            match try_create_appender(&path, &filename, true) {
175                Ok(file_appender) => Box::new(file_appender),
176                Err(e) => {
177                    eprintln!(
178                        "unable to create log file in {}: {}. Falling back to stderr",
179                        path, e
180                    );
181                    Box::new(std::io::stderr())
182                }
183            }
184        }
185    }
186}
187
188lazy_static! {
189    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
190        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
191}
192
193/// The recorder singleton that is configured as a layer in the the default tracing
194/// subscriber, as configured by `initialize_logging`.
195pub fn recorder() -> &'static Recorder {
196    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
197    RECORDER.get_or_init(Recorder::new)
198}
199
200/// Hotswap the telemetry clock at runtime. This allows changing the clock implementation
201/// after initialization, which is useful for testing or switching between real and simulated time.
202pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
203    *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
204}
205
/// Create a key value pair for use in opentelemetry. Pairs can be stored and reused.
///
/// Opentelemetry attaches key value attributes when you bump counters and histograms,
/// so `MY_COUNTER.add(42, &[key_value!("key", "value")])` and
/// `MY_COUNTER.add(42, &[key_value!("key", "other_value")])` will actually bump two
/// separate counters.
#[macro_export]
macro_rules! key_value {
    ($name:expr, $value:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($name),
            $crate::opentelemetry::Value::from($value),
        )
    };
}
/// Build a key value attribute slice using `key => value` mapping syntax.
/// Each entry is expanded through `key_value!`; a trailing comma is accepted.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// assert_eq!(
///     kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
///     &[
///         key_value!("1", "1"),
///         key_value!("2", 2),
///         key_value!("3", 3.0),
///     ],
/// );
/// # }
/// ```
#[macro_export]
macro_rules! kv_pairs {
    ($($name:expr => $value:expr),* $(,)?) => {
        &[$($crate::key_value!($name, $value),)*]
    };
}
239
/// Unit in which a [`Timer`] reports durations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeUnit {
    /// Milliseconds, abbreviated `"ms"`.
    Millis,
    /// Microseconds, abbreviated `"us"`.
    Micros,
    /// Nanoseconds, abbreviated `"ns"`.
    Nanos,
}

impl TimeUnit {
    /// Returns the short textual abbreviation of the unit (`"ms"`, `"us"` or `"ns"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            TimeUnit::Millis => "ms",
            TimeUnit::Micros => "us",
            TimeUnit::Nanos => "ns",
        }
    }
}
256pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
257
258impl<'a> Timer {
259    pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
260        Timer(data, unit)
261    }
262    pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
263        TimerGuard {
264            data: self,
265            pairs,
266            start: Instant::now(),
267        }
268    }
269
270    pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
271        let dur = match self.1 {
272            TimeUnit::Millis => dur.as_millis(),
273            TimeUnit::Micros => dur.as_micros(),
274            TimeUnit::Nanos => dur.as_nanos(),
275        } as u64;
276
277        self.0.record(dur, pairs);
278    }
279}
280pub struct TimerGuard<'a> {
281    data: &'static Timer,
282    pairs: &'a [opentelemetry::KeyValue],
283    start: Instant,
284}
285
286impl<'a> Drop for TimerGuard<'a> {
287    fn drop(&mut self) {
288        let now = Instant::now();
289        let dur = now.duration_since(self.start);
290        self.data.record(dur, self.pairs);
291    }
292}
293
/// Create a thread safe static timer that can be used to measure durations.
/// This macro creates a `u64` histogram named `"<key>.<unit>"` whose unit is set to the
/// abbreviation of the given `TimeUnit` (`TimeUnit::Millis`, `TimeUnit::Micros` or
/// `TimeUnit::Nanos`), and wraps it in a `Timer`.
///
/// The guard returned by `start` records the elapsed time when it is dropped, so it must
/// be bound to a named variable: `let _ = TIMER.start(..)` drops the guard immediately
/// and measures nothing.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_timer!(REQUEST_TIMER, "request_processing_time", hyperactor_telemetry::TimeUnit::Millis);
///
/// {
///     let pairs = kv_pairs!("endpoint" => "/api/users", "method" => "GET");
///     let _guard = REQUEST_TIMER.start(pairs);
///     // do something expensive
/// }
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
326
/// Create a thread safe static counter that can be incremented.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The generated static refers to opentelemetry through this crate's re-export, so the
/// calling crate does not need its own `opentelemetry` dependency.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_counter!(REQUESTS_RECEIVED, "requests_received");
///
/// REQUESTS_RECEIVED.add(40, kv_pairs!("path" => url.path, "proto" => url.proto));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
357
/// Create a thread safe static up-down counter that can be incremented or decremented.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The generated static refers to opentelemetry through this crate's re-export, so the
/// calling crate does not need its own `opentelemetry` dependency.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_up_down_counter!(ACTIVE_REQUESTS, "active_requests");
///
/// ACTIVE_REQUESTS.add(1, kv_pairs!("proto" => "https"));
/// // ... handle the request ...
/// ACTIVE_REQUESTS.add(-1, kv_pairs!("proto" => "https"));
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
392
/// Create a thread safe static gauge that can be set to a specific value.
/// This is useful to avoid creating temporary gauges.
/// You can safely create gauges with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The generated static refers to opentelemetry through this crate's re-export, so the
/// calling crate does not need its own `opentelemetry` dependency.
///
/// Example:
/// ```
/// struct System {
///     pub memory_usage: f64,
///     pub cpu_usage: f64,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let system = System{memory_usage: 512.5, cpu_usage: 25.0};
/// declare_static_gauge!(MEMORY_USAGE, "memory_usage");
///
/// MEMORY_USAGE.record(system.memory_usage, kv_pairs!("unit" => "MB", "process" => "hyperactor"));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Create a thread safe static observable gauge whose value is produced by the provided callback.
/// This is useful for metrics that need to be calculated or retrieved dynamically.
/// The callback will be executed whenever the gauge is observed by the metrics system.
///
/// The generated static refers to opentelemetry through this crate's re-export, so the
/// calling crate does not need its own `opentelemetry` dependency.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
///
/// # fn main() {
/// declare_observable_gauge!(MEMORY_USAGE_GAUGE, "memory_usage", |observer| {
///     // Simulate getting memory usage - this could be any complex operation
///     observer.observe(512.0, &[]);
/// });
///
/// // The gauge will be automatically updated when observed
/// # }
/// ```
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Create a thread safe static histogram that values can be recorded into.
/// This is useful to avoid creating temporary histograms.
/// You can safely create histograms with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The generated static refers to this crate through `$crate`, so it works regardless of
/// the name under which the calling crate imports it.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_histogram!(REQUEST_LATENCY, "request_latency");
///
/// REQUEST_LATENCY.record(40.0, kv_pairs!("path" => url.path, "proto" => url.proto));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
488
489static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
490    std::sync::OnceLock::new();
491
492/// A custom formatter that prepends prefix from env_var to log messages.
493struct PrefixedFormatter {
494    formatter: Glog<LocalTime>,
495    prefix_env_var: Option<String>,
496}
497
498impl PrefixedFormatter {
499    fn new(prefix_env_var: Option<String>) -> Self {
500        let formatter = Glog::default().with_timer(LocalTime::default());
501        Self {
502            formatter,
503            prefix_env_var,
504        }
505    }
506}
507
508impl<S, N> FormatEvent<S, N> for PrefixedFormatter
509where
510    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
511    N: for<'a> FormatFields<'a> + 'static,
512{
513    fn format_event(
514        &self,
515        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
516        mut writer: Writer<'_>,
517        event: &tracing::Event<'_>,
518    ) -> std::fmt::Result {
519        let prefix: String = if self.prefix_env_var.is_some() {
520            std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
521        } else {
522            "".to_string()
523        };
524
525        if prefix.is_empty() {
526            write!(writer, "[-]")?;
527        } else {
528            write!(writer, "[{}]", prefix)?;
529        }
530
531        self.formatter.format_event(ctx, writer, event)
532    }
533}
534
535/// Set up logging based on the given execution environment. We specialize logging based on how the
536/// logs are consumed. The destination scuba table is specialized based on the execution environment.
537/// mast -> monarch_tracing/prod
538/// devserver -> monarch_tracing/local
539/// unit test  -> monarch_tracing/test
540/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
541/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
542/// to get this behavior.
543pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
544    initialize_logging_with_log_prefix(clock, None);
545}
546
547/// testing
548pub fn initialize_logging_for_test() {
549    initialize_logging(DefaultTelemetryClock {});
550}
551
552/// Set up logging based on the given execution environment. We specialize logging based on how the
553/// logs are consumed. The destination scuba table is specialized based on the execution environment.
554/// mast -> monarch_tracing/prod
555/// devserver -> monarch_tracing/local
556/// unit test  -> monarch_tracing/test
557/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
558/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
559/// to get this behavior.
560///
561/// tracing logs will be prefixed with the given prefix and routed to:
562/// test -> stderr
563/// local -> /tmp/monarch_log.log
564/// mast -> /logs/dedicated_monarch_logs.log
565/// Additionally, is MONARCH_STDERR_LOG sets logs level, then logs will be routed to stderr as well.
566pub fn initialize_logging_with_log_prefix(
567    clock: impl TelemetryClock + Send + 'static,
568    prefix_env_var: Option<String>,
569) {
570    swap_telemetry_clock(clock);
571    let file_log_level = match env::Env::current() {
572        env::Env::Local => LOG_LEVEL_INFO,
573        env::Env::MastEmulator => LOG_LEVEL_INFO,
574        env::Env::Mast => LOG_LEVEL_INFO,
575        env::Env::Test => LOG_LEVEL_DEBUG,
576    };
577    let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
578        .lossy(false)
579        .finish(writer());
580    let writer_guard = Arc::new((non_blocking, guard));
581    let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
582
583    let file_layer = fmt::Layer::default()
584        .with_writer(writer_guard.0.clone())
585        .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
586        .fmt_fields(GlogFields::default().compact())
587        .with_ansi(false)
588        .with_filter(
589            Targets::new()
590                .with_default(LevelFilter::from_level(
591                    tracing::Level::from_str(
592                        &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()),
593                    )
594                    .expect("Invalid log level"),
595                ))
596                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
597        );
598
599    use tracing_subscriber::Registry;
600    use tracing_subscriber::layer::SubscriberExt;
601    use tracing_subscriber::util::SubscriberInitExt;
602
603    #[cfg(fbcode_build)]
604    {
605        use crate::env::Env;
606        fn is_layer_enabled(env_var: &str) -> bool {
607            std::env::var(env_var).unwrap_or_default() == "1"
608        }
609        fn is_layer_disabled(env_var: &str) -> bool {
610            std::env::var(env_var).unwrap_or_default() == "1"
611        }
612        if let Err(err) = Registry::default()
613            .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) {
614                // TODO: get_reloadable_sqlite_layer currently still returns None,
615                // and some additional work is required to make it work.
616                Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
617            } else {
618                None
619            })
620            .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) {
621                Some(otel::tracing_layer())
622            } else {
623                None
624            })
625            .with(file_layer)
626            .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) {
627                Some(recorder().layer())
628            } else {
629                None
630            })
631            .try_init()
632        {
633            tracing::debug!("logging already initialized for this process: {}", err);
634        }
635        let exec_id = env::execution_id();
636        tracing::info!(
637            target: "execution",
638            execution_id = exec_id,
639            environment = %Env::current(),
640            args = ?std::env::args(),
641            build_mode = build_info::BuildInfo::get_build_mode(),
642            compiler = build_info::BuildInfo::get_compiler(),
643            compiler_version = build_info::BuildInfo::get_compiler_version(),
644            buck_rule = build_info::BuildInfo::get_rule(),
645            package_name = build_info::BuildInfo::get_package_name(),
646            package_release = build_info::BuildInfo::get_package_release(),
647            upstream_revision = build_info::BuildInfo::get_upstream_revision(),
648            revision = build_info::BuildInfo::get_revision(),
649            "logging_initialized"
650        );
651
652        if !is_layer_disabled(DISABLE_OTEL_METRICS) {
653            otel::init_metrics();
654        }
655    }
656    #[cfg(not(fbcode_build))]
657    {
658        if let Err(err) = Registry::default()
659            .with(file_layer)
660            .with(
661                if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
662                    Some(recorder().layer())
663                } else {
664                    None
665                },
666            )
667            .try_init()
668        {
669            tracing::debug!("logging already initialized for this process: {}", err);
670        }
671    }
672}
673
674pub mod env {
675    use rand::RngCore;
676
677    /// Env var name set when monarch launches subprocesses to forward the execution context
678    pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
679    pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
680    pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
681
682    /// Forward or generate a uuid for this execution. When running in production on mast, this is provided to
683    /// us via the MAST_HPC_JOB_NAME env var. Subprocesses should either forward the MAST_HPC_JOB_NAME
684    /// variable, or set the "MONARCH_EXECUTION_ID" var for subprocesses launched by this process.
685    /// We keep these env vars separate so that other applications that depend on the MAST_HPC_JOB_NAME existing
686    /// to understand their environment do not get confused and think they are running on mast when we are doing
687    ///  local testing.
688    pub fn execution_id() -> String {
689        let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
690            // not able to find an existing id so generate a unique one: username + current_time + random number.
691            let username = crate::username();
692            let now = {
693                let now = std::time::SystemTime::now();
694                let datetime: chrono::DateTime<chrono::Local> = now.into();
695                datetime.format("%b-%d_%H:%M").to_string()
696            };
697            let random_number: u16 = (rand::thread_rng().next_u32() % 1000) as u16;
698            let execution_id = format!("{}_{}_{}", username, now, random_number);
699            execution_id
700        });
701        // Safety: Can be unsound if there are multiple threads
702        // reading and writing the environment.
703        unsafe {
704            std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
705        }
706        id
707    }
708
709    #[derive(PartialEq)]
710    pub enum Env {
711        Local,
712        Mast,
713        MastEmulator,
714        Test,
715    }
716
717    impl std::fmt::Display for Env {
718        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
719            write!(
720                f,
721                "{}",
722                match self {
723                    Self::Local => crate::ENV_VALUE_LOCAL,
724                    Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
725                    Self::Mast => crate::ENV_VALUE_MAST,
726                    Self::Test => crate::ENV_VALUE_TEST,
727                }
728            )
729        }
730    }
731
732    impl Env {
733        #[cfg(test)]
734        pub fn current() -> Self {
735            Self::Test
736        }
737
738        #[cfg(not(test))]
739        pub fn current() -> Self {
740            match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
741                // Constant from https://fburl.com/fhysd3fd
742                crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
743                _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
744                    true => Self::Mast,
745                    false => Self::Local,
746                },
747            }
748        }
749    }
750}
751
#[cfg(test)]
mod test {
    // Lets macro expansions that spell this crate by name resolve from inside the crate.
    extern crate self as hyperactor_telemetry;
    use opentelemetry::*;

    use super::*;

    /// `key_value!` picks the opentelemetry value variant from the Rust type of the value.
    #[test]
    fn infer_kv_pair_types() {
        let as_string = KeyValue::new(Key::new("str"), Value::String("str".into()));
        assert_eq!(key_value!("str", "str"), as_string);

        let as_integer = KeyValue::new(Key::new("str"), Value::I64(25));
        assert_eq!(key_value!("str", 25), as_integer);

        let as_float = KeyValue::new(Key::new("str"), Value::F64(1.1));
        assert_eq!(key_value!("str", 1.1), as_float);
    }

    /// `kv_pairs!` expands to a slice with one `key_value!` per mapping entry, in order.
    #[test]
    fn kv_pair_slices() {
        assert_eq!(
            kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
            &[
                key_value!("1", "1"),
                key_value!("2", 2),
                key_value!("3", 3.0),
            ],
        );
    }

    /// Gauges declared through the macro can be recorded to, with and without attributes.
    #[test]
    fn test_static_gauge() {
        // Declare two independent static gauges through the macro.
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // Record values with different attribute sets.
        // This shouldn't actually log to scribe/scuba in test environment
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // Recording without any attributes must work as well.
        TEST_GAUGE.record(50.0, &[]);
    }
}