hyperactor_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
// TODO:ehedeman Remove or replace with better config once telemetry perf issues are solved
/// Environment variable to disable the OpenTelemetry tracing layer (fbcode builds only).
/// Set to "1" to disable OpenTelemetry tracing.
pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";

/// Environment variable to disable OpenTelemetry metrics initialization (fbcode builds only).
/// Set to "1" to disable OpenTelemetry metrics.
pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";

/// Environment variable to disable the recorder tracing layer.
/// Set to "1" to disable the recorder output.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
28
29pub mod in_memory_reader;
30#[cfg(fbcode_build)]
31mod meta;
32mod otel;
33mod pool;
34pub mod recorder;
35mod spool;
36pub mod sqlite;
37pub mod trace;
38use std::io::IsTerminal;
39use std::io::Write;
40use std::str::FromStr;
41use std::sync::Arc;
42use std::sync::Mutex;
43use std::time::Instant;
44
45use lazy_static::lazy_static;
46pub use opentelemetry;
47pub use opentelemetry::Key;
48pub use opentelemetry::KeyValue;
49pub use opentelemetry::Value;
50pub use opentelemetry::global::meter;
51pub use tracing::Level;
52use tracing_appender::non_blocking::NonBlocking;
53use tracing_appender::non_blocking::WorkerGuard;
54use tracing_appender::rolling::RollingFileAppender;
55use tracing_glog::Glog;
56use tracing_glog::GlogFields;
57use tracing_glog::LocalTime;
58use tracing_subscriber::Layer;
59use tracing_subscriber::filter::LevelFilter;
60use tracing_subscriber::filter::Targets;
61use tracing_subscriber::fmt;
62use tracing_subscriber::fmt::FormatEvent;
63use tracing_subscriber::fmt::FormatFields;
64use tracing_subscriber::fmt::format::Writer;
65use tracing_subscriber::registry::LookupSpan;
66
67use crate::recorder::Recorder;
68use crate::sqlite::get_reloadable_sqlite_layer;
69
70pub trait TelemetryClock {
71    fn now(&self) -> tokio::time::Instant;
72    fn system_time_now(&self) -> std::time::SystemTime;
73}
74
75pub struct DefaultTelemetryClock {}
76
77impl TelemetryClock for DefaultTelemetryClock {
78    fn now(&self) -> tokio::time::Instant {
79        tokio::time::Instant::now()
80    }
81
82    fn system_time_now(&self) -> std::time::SystemTime {
83        std::time::SystemTime::now()
84    }
85}
86
87// Given an environment, determine the log file path to write to.
88pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> {
89    match env {
90        env::Env::Local | env::Env::MastEmulator => {
91            let username = if whoami::username().is_empty() {
92                "monarch".to_string()
93            } else {
94                whoami::username()
95            };
96            Ok((format!("/tmp/{}", username), "monarch_log".to_string()))
97        }
98        env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())),
99        _ => Err(anyhow::anyhow!(
100            "file writer unsupported for environment {}",
101            env
102        )),
103    }
104}
105
106fn try_create_appender(
107    path: &str,
108    filename: &str,
109    create_dir: bool,
110) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
111    if create_dir {
112        std::fs::create_dir_all(path)?;
113    }
114    Ok(RollingFileAppender::builder()
115        .filename_prefix(filename)
116        .filename_suffix("log")
117        .build(path)?)
118}
119
120fn writer() -> Box<dyn Write + Send> {
121    match env::Env::current() {
122        env::Env::Test => Box::new(std::io::stderr()),
123        env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
124            let (path, filename) = log_file_path(env::Env::current()).unwrap();
125            match try_create_appender(&path, &filename, true) {
126                Ok(file_appender) => Box::new(file_appender),
127                Err(e) => {
128                    eprintln!(
129                        "unable to create log file in {}: {}. Falling back to stderr",
130                        path, e
131                    );
132                    Box::new(std::io::stderr())
133                }
134            }
135        }
136    }
137}
138
139lazy_static! {
140    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
141        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
142}
143
144/// The recorder singleton that is configured as a layer in the the default tracing
145/// subscriber, as configured by `initialize_logging`.
146pub fn recorder() -> &'static Recorder {
147    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
148    RECORDER.get_or_init(Recorder::new)
149}
150
151/// Hotswap the telemetry clock at runtime. This allows changing the clock implementation
152/// after initialization, which is useful for testing or switching between real and simulated time.
153pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
154    *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
155}
156
/// Create a key/value pair for use as an opentelemetry attribute. These pairs can be stored
/// and reused. Opentelemetry attaches key/value attributes when you bump counters and
/// histograms, and each distinct attribute set is tracked separately. So
/// `MY_COUNTER.add(42, &[key_value!("key", "value")])` and
/// `MY_COUNTER.add(42, &[key_value!("key", "other_value")])` will actually bump two separate
/// counters.
#[macro_export]
macro_rules! key_value {
    ($key:expr_2021, $val:expr_2021) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Build a slice of opentelemetry attributes using `key => value` mapping syntax.
///
/// Each entry is expanded with `key_value!`; a trailing comma is accepted.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// assert_eq!(
///     kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
///     &[
///         key_value!("1", "1"),
///         key_value!("2", 2),
///         key_value!("3", 3.0),
///     ],
/// );
/// # }
/// ```
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr_2021 => $v:expr_2021),* $(,)?) => {
        &[$($crate::key_value!($k, $v)),*]
    };
}
190
/// Unit in which a [`Timer`] reports durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    /// Milliseconds (`"ms"`).
    Millis,
    /// Microseconds (`"us"`).
    Micros,
    /// Nanoseconds (`"ns"`).
    Nanos,
}

impl TimeUnit {
    /// Short unit suffix, used in timer metric names and as the histogram unit.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Nanos => "ns",
            Self::Micros => "us",
            Self::Millis => "ms",
        }
    }
}
207pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
208
209impl<'a> Timer {
210    pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
211        Timer(data, unit)
212    }
213    pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
214        TimerGuard {
215            data: self,
216            pairs,
217            start: Instant::now(),
218        }
219    }
220
221    pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
222        let dur = match self.1 {
223            TimeUnit::Millis => dur.as_millis(),
224            TimeUnit::Micros => dur.as_micros(),
225            TimeUnit::Nanos => dur.as_nanos(),
226        } as u64;
227
228        self.0.record(dur, pairs);
229    }
230}
231pub struct TimerGuard<'a> {
232    data: &'static Timer,
233    pairs: &'a [opentelemetry::KeyValue],
234    start: Instant,
235}
236
237impl<'a> Drop for TimerGuard<'a> {
238    fn drop(&mut self) {
239        let now = Instant::now();
240        let dur = now.duration_since(self.start);
241        self.data.record(dur, self.pairs);
242    }
243}
244
/// Create a thread safe static timer that can be used to measure durations.
/// This macro creates a `u64` histogram named `"<key>.<unit>"` (for example
/// `"request_processing_time.ms"`) whose unit is set to the suffix of the given
/// [`TimeUnit`]. The unit argument must be a `TimeUnit` path such as
/// `hyperactor_telemetry::TimeUnit::Millis`.
///
/// The duration is recorded when the guard returned by `start` is dropped, so bind the guard
/// to a named variable (not `_`, which drops it immediately) for as long as the work runs.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_timer!(REQUEST_TIMER, "request_processing_time", hyperactor_telemetry::TimeUnit::Millis);
///
/// {
///     let attrs = kv_pairs!("endpoint" => "/api/users", "method" => "GET");
///     let _timer = REQUEST_TIMER.start(attrs);
///     // do something expensive
/// }
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr_2021, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
277
/// Create a thread safe static `u64` counter that can be incremented (use
/// `declare_static_up_down_counter!` for values that also decrease).
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_counter!(REQUESTS_RECEIVED, "requests_received");
///
/// REQUESTS_RECEIVED.add(40, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        // `$crate::opentelemetry` so callers need not depend on opentelemetry directly.
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
308
/// Create a thread safe static `i64` up/down counter that can be incremented or decremented.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_up_down_counter!(ACTIVE_REQUESTS, "active_requests");
///
/// ACTIVE_REQUESTS.add(1, kv_pairs!("path" => "/request/1", "proto" => "https"));
/// // ... handle the request ...
/// ACTIVE_REQUESTS.add(-1, kv_pairs!("path" => "/request/1", "proto" => "https"));
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        // `$crate::opentelemetry` so callers need not depend on opentelemetry directly.
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
343
/// Create a thread safe static `f64` gauge that can be set to a specific value.
/// This is useful to avoid creating temporary gauges.
/// You can safely create gauges with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct System {
///     pub memory_usage: f64,
///     pub cpu_usage: f64,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let system = System{memory_usage: 512.5, cpu_usage: 25.0};
/// declare_static_gauge!(MEMORY_USAGE, "memory_usage");
///
/// MEMORY_USAGE.record(system.memory_usage, kv_pairs!("unit" => "MB", "process" => "hyperactor"))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        // `$crate::opentelemetry` so callers need not depend on opentelemetry directly.
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Create a thread safe static `f64` observable gauge whose value is produced by the
/// provided callback. This is useful for metrics that need to be calculated or retrieved
/// dynamically.
///
/// The instrument, and therefore its callback registration, is created lazily on first
/// access of the static. Force it (e.g. with `std::sync::LazyLock::force`) to start
/// reporting. After that, the callback runs whenever the metrics system observes the gauge.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
///
/// # fn main() {
/// declare_observable_gauge!(MEMORY_USAGE_GAUGE, "memory_usage", |observer| {
///     // Simulate getting memory usage - this could be any complex operation
///     observer.observe(512.0, &[]);
/// });
///
/// // Register the gauge; from now on it is updated whenever it is observed.
/// std::sync::LazyLock::force(&MEMORY_USAGE_GAUGE);
/// # }
/// ```
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr_2021, $cb:expr_2021) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        // `$crate::opentelemetry` so callers need not depend on opentelemetry directly.
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Create a thread safe static `f64` histogram that records a distribution of values.
/// This is useful to avoid creating temporary histograms.
/// You can safely create histograms with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_histogram!(REQUEST_LATENCY, "request_latency");
///
/// REQUEST_LATENCY.record(40.0, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        // `$crate` paths: work inside this crate, under a renamed dependency, and without a
        // direct opentelemetry dependency in the calling crate.
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::Histogram<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_histogram($key)
                .build()
        });
    };
}
439
440static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
441    std::sync::OnceLock::new();
442
443/// A custom formatter that prepends prefix from env_var to log messages.
444struct PrefixedFormatter {
445    formatter: Glog<LocalTime>,
446    prefix_env_var: Option<String>,
447}
448
449impl PrefixedFormatter {
450    fn new(prefix_env_var: Option<String>) -> Self {
451        let formatter = Glog::default().with_timer(LocalTime::default());
452        Self {
453            formatter,
454            prefix_env_var,
455        }
456    }
457}
458
459impl<S, N> FormatEvent<S, N> for PrefixedFormatter
460where
461    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
462    N: for<'a> FormatFields<'a> + 'static,
463{
464    fn format_event(
465        &self,
466        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
467        mut writer: Writer<'_>,
468        event: &tracing::Event<'_>,
469    ) -> std::fmt::Result {
470        let prefix: String = if self.prefix_env_var.is_some() {
471            std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
472        } else {
473            "".to_string()
474        };
475
476        if prefix.is_empty() {
477            write!(writer, "[-]")?;
478        } else {
479            write!(writer, "[{}]", prefix)?;
480        }
481
482        self.formatter.format_event(ctx, writer, event)
483    }
484}
485
486/// Set up logging based on the given execution environment. We specialize logging based on how the
487/// logs are consumed. The destination scuba table is specialized based on the execution environment.
488/// mast -> monarch_tracing/prod
489/// devserver -> monarch_tracing/local
490/// unit test  -> monarch_tracing/test
491/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
492/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
493/// to get this behavior.
494pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
495    initialize_logging_with_log_prefix(clock, None);
496}
497
498/// Set up logging based on the given execution environment. We specialize logging based on how the
499/// logs are consumed. The destination scuba table is specialized based on the execution environment.
500/// mast -> monarch_tracing/prod
501/// devserver -> monarch_tracing/local
502/// unit test  -> monarch_tracing/test
503/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
504/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
505/// to get this behavior.
506///
507/// tracing logs will be prefixed with the given prefix and routed to:
508/// test -> stderr
509/// local -> /tmp/monarch_log.log
510/// mast -> /logs/dedicated_monarch_logs.log
511/// Additionally, is MONARCH_STDERR_LOG sets logs level, then logs will be routed to stderr as well.
512pub fn initialize_logging_with_log_prefix(
513    clock: impl TelemetryClock + Send + 'static,
514    prefix_env_var: Option<String>,
515) {
516    swap_telemetry_clock(clock);
517    let file_log_level = match env::Env::current() {
518        env::Env::Local => "info",
519        env::Env::MastEmulator => "info",
520        env::Env::Mast => "info",
521        env::Env::Test => "debug",
522    };
523    let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
524        .lossy(false)
525        .finish(writer());
526    let writer_guard = Arc::new((non_blocking, guard));
527    let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
528
529    let file_layer = fmt::Layer::default()
530        .with_writer(writer_guard.0.clone())
531        .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
532        .fmt_fields(GlogFields::default().compact())
533        .with_ansi(false)
534        .with_filter(
535            Targets::new()
536                .with_default(LevelFilter::from_level(
537                    tracing::Level::from_str(
538                        &std::env::var("MONARCH_FILE_LOG").unwrap_or(file_log_level.to_string()),
539                    )
540                    .expect("Invalid log level"),
541                ))
542                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
543        );
544
545    let stderr_log_level = match env::Env::current() {
546        env::Env::Local => "error",
547        env::Env::MastEmulator => "info",
548        env::Env::Mast => "error",
549        env::Env::Test => "debug",
550    };
551    let stderr_layer = fmt::Layer::default()
552        .with_writer(std::io::stderr)
553        .event_format(PrefixedFormatter::new(prefix_env_var))
554        .fmt_fields(GlogFields::default().compact())
555        .with_ansi(std::io::stderr().is_terminal())
556        .with_filter(
557            Targets::new()
558                .with_default(LevelFilter::from_level(
559                    tracing::Level::from_str(
560                        &std::env::var("MONARCH_STDERR_LOG")
561                            .unwrap_or(stderr_log_level.to_string()),
562                    )
563                    .expect("Invalid log level"),
564                ))
565                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
566        );
567
568    let sqlite_layer = get_reloadable_sqlite_layer().unwrap();
569
570    use tracing_subscriber::Registry;
571    use tracing_subscriber::layer::SubscriberExt;
572    use tracing_subscriber::util::SubscriberInitExt;
573
574    #[cfg(fbcode_build)]
575    {
576        use crate::env::Env;
577        fn is_layer_enabled(env_var: &str) -> bool {
578            std::env::var(env_var).unwrap_or_default() != "1"
579        }
580        if let Err(err) = Registry::default()
581            .with(sqlite_layer)
582            .with(if is_layer_enabled(DISABLE_OTEL_TRACING) {
583                Some(otel::tracing_layer())
584            } else {
585                None
586            })
587            .with(file_layer)
588            .with(stderr_layer)
589            .with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
590                Some(recorder().layer())
591            } else {
592                None
593            })
594            .try_init()
595        {
596            tracing::debug!("logging already initialized for this process: {}", err);
597        }
598        let exec_id = env::execution_id();
599        tracing::debug!(
600            target: "execution",
601            execution_id = exec_id,
602            environment = %Env::current(),
603            args = ?std::env::args(),
604            build_mode = build_info::BuildInfo::get_build_mode(),
605            compiler = build_info::BuildInfo::get_compiler(),
606            compiler_version = build_info::BuildInfo::get_compiler_version(),
607            buck_rule = build_info::BuildInfo::get_rule(),
608            package_name = build_info::BuildInfo::get_package_name(),
609            package_release = build_info::BuildInfo::get_package_release(),
610            upstream_revision = build_info::BuildInfo::get_revision(),
611            "logging_initialized"
612        );
613
614        if is_layer_enabled(DISABLE_OTEL_METRICS) {
615            otel::init_metrics();
616        }
617    }
618    #[cfg(not(fbcode_build))]
619    {
620        if let Err(err) = Registry::default()
621            .with(file_layer)
622            .with(stderr_layer)
623            .with(
624                if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
625                    Some(recorder().layer())
626                } else {
627                    None
628                },
629            )
630            .try_init()
631        {
632            tracing::debug!("logging already initialized for this process: {}", err);
633        }
634    }
635}
636
637pub mod env {
638    use rand::Rng;
639    use rand::distributions::Alphanumeric;
640
641    /// Env var name set when monarch launches subprocesses to forward the execution context
642    pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
643    pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";
644    pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
645    pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
646
647    /// Forward or generate a uuid for this execution. When running in production on mast, this is provided to
648    /// us via the MAST_HPC_JOB_NAME env var. Subprocesses should either forward the MAST_HPC_JOB_NAME
649    /// variable, or set the "MONARCH_EXECUTION_ID" var for subprocesses launched by this process.
650    /// We keep these env vars separate so that other applications that depend on the MAST_HPC_JOB_NAME existing
651    /// to understand their environment do not get confused and think they are running on mast when we are doing
652    ///  local testing.
653    pub fn execution_id() -> String {
654        let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV)
655            .or(std::env::var(MAST_HPC_JOB_NAME_ENV))
656            .ok()
657            .unwrap_or_else(|| {
658                // not able to find an existing id so generate a random one. 24 bytes should be sufficient.
659                let random_string: String = rand::thread_rng()
660                    .sample_iter(&Alphanumeric)
661                    .take(24)
662                    .map(char::from)
663                    .collect::<String>();
664                random_string
665            });
666        // Safety: Can be unsound if there are multiple threads
667        // reading and writing the environment.
668        unsafe {
669            std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
670        }
671        id
672    }
673
674    #[derive(PartialEq)]
675    pub enum Env {
676        Local,
677        Mast,
678        MastEmulator,
679        Test,
680    }
681
682    impl std::fmt::Display for Env {
683        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
684            write!(
685                f,
686                "{}",
687                match self {
688                    Self::Local => "local",
689                    Self::MastEmulator => "mast_emulator",
690                    Self::Mast => "mast",
691                    Self::Test => "test",
692                }
693            )
694        }
695    }
696
697    impl Env {
698        #[cfg(test)]
699        pub fn current() -> Self {
700            Self::Test
701        }
702
703        #[cfg(not(test))]
704        pub fn current() -> Self {
705            match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
706                // Constant from https://fburl.com/fhysd3fd
707                "local_mast_simulator" => Self::MastEmulator,
708                _ => match std::env::var("MAST_HPC_JOB_NAME").is_ok() {
709                    true => Self::Mast,
710                    false => Self::Local,
711                },
712            }
713        }
714    }
715}
716
#[cfg(test)]
mod test {
    use opentelemetry::*;
    extern crate self as hyperactor_telemetry;
    use super::*;

    /// `key_value!` infers the attribute `Value` variant from the type of the value.
    #[test]
    fn infer_kv_pair_types() {
        let cases = [
            (key_value!("str", "str"), Value::String("str".into())),
            (key_value!("str", 25), Value::I64(25)),
            (key_value!("str", 1.1), Value::F64(1.1)),
        ];
        for (actual, expected_value) in cases {
            assert_eq!(actual, KeyValue::new(Key::new("str"), expected_value));
        }
    }

    /// `kv_pairs!` expands each `key => value` entry with `key_value!`.
    #[test]
    fn kv_pair_slices() {
        let expected = [
            key_value!("1", "1"),
            key_value!("2", 2),
            key_value!("3", 3.0),
        ];
        assert_eq!(kv_pairs!("1" => "1", "2" => 2, "3" => 3.0), &expected);
    }

    /// Gauges declared with `declare_static_gauge!` accept values with and without attributes.
    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // This shouldn't actually log to scribe/scuba in the test environment.
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // An empty attribute set is allowed too.
        TEST_GAUGE.record(50.0, &[]);
    }
}