hyperactor_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
// TODO:ehedeman Remove or replace with better config once telemetry perf issues are solved
/// Environment variable to disable the OpenTelemetry tracing layer.
/// Set to "1" to disable OpenTelemetry tracing.
pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";

/// Environment variable to disable OpenTelemetry metrics initialization.
/// Set to "1" to disable OpenTelemetry metrics.
pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";

/// Environment variable to disable the recorder tracing layer.
/// Set to "1" to disable the recorder output.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";

/// Environment variable to disable the sqlite tracing layer.
/// Set to "1" to disable the sqlite tracing.
pub const DISABLE_SQLITE_TRACING: &str = "DISABLE_SQLITE_TRACING";

// Environment variable constants
// MONARCH_FILE_LOG / MONARCH_STDERR_LOG override the default level of the
// file and stderr log layers respectively.
const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG";
const MONARCH_STDERR_LOG_ENV: &str = "MONARCH_STDERR_LOG";
/// When this variable is set (and the mast simulator is not selected),
/// `env::Env::current()` reports `Env::Mast`.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

// Log level constants
const LOG_LEVEL_INFO: &str = "info";
const LOG_LEVEL_DEBUG: &str = "debug";
const LOG_LEVEL_ERROR: &str = "error";

// Span field constants
const SPAN_FIELD_RECORDING: &str = "recording";
const SPAN_FIELD_RECORDER: &str = "recorder";

// Environment value constants
// These are the strings produced by the `Display` impl of `env::Env`;
// `local_mast_simulator` is the `MAST_ENVIRONMENT` value that selects the emulator.
const ENV_VALUE_LOCAL: &str = "local";
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
const ENV_VALUE_MAST: &str = "mast";
const ENV_VALUE_TEST: &str = "test";
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
52
53pub mod in_memory_reader;
54#[cfg(fbcode_build)]
55mod meta;
56mod otel;
57mod pool;
58pub mod recorder;
59mod spool;
60pub mod sqlite;
61pub mod task;
62pub mod trace;
63use std::io::IsTerminal;
64use std::io::Write;
65use std::str::FromStr;
66use std::sync::Arc;
67use std::sync::Mutex;
68use std::time::Instant;
69
70use lazy_static::lazy_static;
71pub use opentelemetry;
72pub use opentelemetry::Key;
73pub use opentelemetry::KeyValue;
74pub use opentelemetry::Value;
75pub use opentelemetry::global::meter;
76pub use tracing;
77pub use tracing::Level;
78use tracing_appender::non_blocking::NonBlocking;
79use tracing_appender::non_blocking::WorkerGuard;
80use tracing_appender::rolling::RollingFileAppender;
81use tracing_glog::Glog;
82use tracing_glog::GlogFields;
83use tracing_glog::LocalTime;
84use tracing_subscriber::Layer;
85use tracing_subscriber::filter::LevelFilter;
86use tracing_subscriber::filter::Targets;
87use tracing_subscriber::fmt;
88use tracing_subscriber::fmt::FormatEvent;
89use tracing_subscriber::fmt::FormatFields;
90use tracing_subscriber::fmt::format::Writer;
91use tracing_subscriber::registry::LookupSpan;
92
93use crate::recorder::Recorder;
94use crate::sqlite::get_reloadable_sqlite_layer;
95
/// Source of time used by telemetry. Implementations can be swapped at
/// runtime through `swap_telemetry_clock`.
pub trait TelemetryClock {
    /// Returns the current instant as a `tokio::time::Instant`.
    fn now(&self) -> tokio::time::Instant;
    /// Returns the current wall-clock time.
    fn system_time_now(&self) -> std::time::SystemTime;
}
100
101pub struct DefaultTelemetryClock {}
102
103impl TelemetryClock for DefaultTelemetryClock {
104    fn now(&self) -> tokio::time::Instant {
105        tokio::time::Instant::now()
106    }
107
108    fn system_time_now(&self) -> std::time::SystemTime {
109        std::time::SystemTime::now()
110    }
111}
112
113pub fn username() -> String {
114    let env = env::Env::current();
115    match env {
116        env::Env::Mast => {
117            std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
118        }
119        _ => whoami::username(),
120    }
121}
122
123// Given an environment, determine the log file path to write to.
124pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> {
125    match env {
126        env::Env::Local | env::Env::MastEmulator => {
127            let username = if whoami::username().is_empty() {
128                "monarch".to_string()
129            } else {
130                whoami::username()
131            };
132            Ok((format!("/tmp/{}", username), "monarch_log".to_string()))
133        }
134        env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())),
135        _ => Err(anyhow::anyhow!(
136            "file writer unsupported for environment {}",
137            env
138        )),
139    }
140}
141
142fn try_create_appender(
143    path: &str,
144    filename: &str,
145    create_dir: bool,
146) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
147    if create_dir {
148        std::fs::create_dir_all(path)?;
149    }
150    Ok(RollingFileAppender::builder()
151        .filename_prefix(filename)
152        .filename_suffix("log")
153        .build(path)?)
154}
155
156fn writer() -> Box<dyn Write + Send> {
157    match env::Env::current() {
158        env::Env::Test => Box::new(std::io::stderr()),
159        env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
160            let (path, filename) = log_file_path(env::Env::current()).unwrap();
161            match try_create_appender(&path, &filename, true) {
162                Ok(file_appender) => Box::new(file_appender),
163                Err(e) => {
164                    eprintln!(
165                        "unable to create log file in {}: {}. Falling back to stderr",
166                        path, e
167                    );
168                    Box::new(std::io::stderr())
169                }
170            }
171        }
172    }
173}
174
lazy_static! {
    // Process-wide telemetry clock. It starts out as `DefaultTelemetryClock`
    // and is replaced under the mutex by `swap_telemetry_clock`.
    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
}
179
180/// The recorder singleton that is configured as a layer in the the default tracing
181/// subscriber, as configured by `initialize_logging`.
182pub fn recorder() -> &'static Recorder {
183    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
184    RECORDER.get_or_init(Recorder::new)
185}
186
187/// Hotswap the telemetry clock at runtime. This allows changing the clock implementation
188/// after initialization, which is useful for testing or switching between real and simulated time.
189pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
190    *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
191}
192
/// Create key value pairs for use in opentelemetry. These pairs can be stored and used multiple
/// times. Opentelemetry adds key value attributes when you bump counters and histograms,
/// so `MY_COUNTER.add(42, &[key_value!("key", "value")])` and
/// `MY_COUNTER.add(42, &[key_value!("key", "other_value")])` will actually bump two separate
/// counters.
///
/// The key is passed to `Key::new` and the value is converted with `Value::from`.
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Construct the key value attribute slice using mapping syntax.
///
/// Each `key => value` entry is expanded through `key_value!`; the result is a
/// reference to an array of `KeyValue`. A trailing comma is accepted.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// assert_eq!(
///     kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
///     &[
///         key_value!("1", "1"),
///         key_value!("2", 2),
///         key_value!("3", 3.0),
///     ],
/// );
/// # }
/// ```
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$($crate::key_value!($k, $v),)*]
    };
}
226
/// Resolution in which a `Timer` reports durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    /// Milliseconds, abbreviated `"ms"`.
    Millis,
    /// Microseconds, abbreviated `"us"`.
    Micros,
    /// Nanoseconds, abbreviated `"ns"`.
    Nanos,
}

impl TimeUnit {
    /// Short textual abbreviation of the unit.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Nanos => "ns",
            Self::Micros => "us",
            Self::Millis => "ms",
        }
    }
}
243pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
244
245impl<'a> Timer {
246    pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
247        Timer(data, unit)
248    }
249    pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
250        TimerGuard {
251            data: self,
252            pairs,
253            start: Instant::now(),
254        }
255    }
256
257    pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
258        let dur = match self.1 {
259            TimeUnit::Millis => dur.as_millis(),
260            TimeUnit::Micros => dur.as_micros(),
261            TimeUnit::Nanos => dur.as_nanos(),
262        } as u64;
263
264        self.0.record(dur, pairs);
265    }
266}
267pub struct TimerGuard<'a> {
268    data: &'static Timer,
269    pairs: &'a [opentelemetry::KeyValue],
270    start: Instant,
271}
272
273impl<'a> Drop for TimerGuard<'a> {
274    fn drop(&mut self) {
275        let now = Instant::now();
276        let dur = now.duration_since(self.start);
277        self.data.record(dur, self.pairs);
278    }
279}
280
/// Create a thread safe static timer that can be used to measure durations.
/// This macro creates a lazily-initialized `u64` histogram named `"<key>.<unit>"` whose unit is
/// set to the abbreviation of the given `TimeUnit`.
/// Supported units are `TimeUnit::Millis` ("ms"), `TimeUnit::Micros` ("us"), and
/// `TimeUnit::Nanos` ("ns").
///
/// The guard returned by `start` records on drop, so bind it to a named variable
/// (`let _guard = ...`); `let _ = ...` would drop it immediately.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_timer!(REQUEST_TIMER, "request_processing_time", hyperactor_telemetry::TimeUnit::Millis);
///
/// {
///     let _guard = REQUEST_TIMER.start(kv_pairs!("endpoint" => "/api/users", "method" => "GET"));
///     // do something expensive
/// }
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
313
/// Create a thread safe static counter that can be incremented.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The counter is a `Counter<u64>` built lazily, on first use, from the meter named after the
/// invoking module.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_counter!(REQUESTS_RECEIVED, "requests_received");
///
/// REQUESTS_RECEIVED.add(40, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        // Name the type through `$crate` so the expansion does not depend on the
        // invoking crate having `opentelemetry` in scope.
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
344
/// Create a thread safe static up-down counter that can be incremented or decremented.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The counter is an `UpDownCounter<i64>` built lazily, on first use, from the meter named
/// after the invoking module.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_up_down_counter!(REQUESTS_IN_FLIGHT, "requests_in_flight");
///
/// REQUESTS_IN_FLIGHT.add(-1, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        // Name the type through `$crate` so the expansion does not depend on the
        // invoking crate having `opentelemetry` in scope.
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
379
/// Create a thread safe static gauge that can be set to a specific value.
/// This is useful to avoid creating temporary gauges.
/// You can safely create gauges with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The gauge is a `Gauge<f64>` built lazily, on first use, from the meter named after the
/// invoking module.
///
/// Example:
/// ```
/// struct System {
///     pub memory_usage: f64,
///     pub cpu_usage: f64,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let system = System{memory_usage: 512.5, cpu_usage: 25.0};
/// declare_static_gauge!(MEMORY_USAGE, "memory_usage");
///
/// MEMORY_USAGE.record(system.memory_usage, kv_pairs!("unit" => "MB", "process" => "hyperactor"))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        // Name the type through `$crate` so the expansion does not depend on the
        // invoking crate having `opentelemetry` in scope.
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Create a thread safe static observable gauge whose value is produced by the provided callback.
/// This is useful for metrics that need to be calculated or retrieved dynamically.
/// The callback will be executed whenever the gauge is observed by the metrics system.
///
/// The gauge is an `ObservableGauge<f64>` stored in a `LazyLock`, so it is only built (and the
/// callback only handed to the meter) the first time the static is dereferenced.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
///
/// # fn main() {
/// declare_observable_gauge!(MEMORY_USAGE_GAUGE, "memory_usage", |observer| {
///     // Simulate getting memory usage - this could be any complex operation
///     observer.observe(512.0, &[]);
/// });
///
/// // The gauge will be automatically updated when observed
/// # }
/// ```
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        // Name the type through `$crate` so the expansion does not depend on the
        // invoking crate having `opentelemetry` in scope.
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Create a thread safe static histogram that values can be recorded into.
/// This is useful to avoid creating temporary histograms.
/// You can safely create histograms with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// The histogram is a `Histogram<f64>` built lazily, on first use, from the meter named after
/// the invoking module.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_histogram!(REQUEST_LATENCY, "request_latency");
///
/// REQUEST_LATENCY.record(40.0, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        // Use `$crate` for both the type and the meter so the expansion works no matter
        // what name the invoking crate gives this crate or whether it imports `opentelemetry`.
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
475
// Holds the non-blocking file writer together with its worker guard for the
// rest of the process. It is set once by `initialize_logging_with_log_prefix`;
// later attempts to set it are ignored.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
478
479/// A custom formatter that prepends prefix from env_var to log messages.
480struct PrefixedFormatter {
481    formatter: Glog<LocalTime>,
482    prefix_env_var: Option<String>,
483}
484
485impl PrefixedFormatter {
486    fn new(prefix_env_var: Option<String>) -> Self {
487        let formatter = Glog::default().with_timer(LocalTime::default());
488        Self {
489            formatter,
490            prefix_env_var,
491        }
492    }
493}
494
495impl<S, N> FormatEvent<S, N> for PrefixedFormatter
496where
497    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
498    N: for<'a> FormatFields<'a> + 'static,
499{
500    fn format_event(
501        &self,
502        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
503        mut writer: Writer<'_>,
504        event: &tracing::Event<'_>,
505    ) -> std::fmt::Result {
506        let prefix: String = if self.prefix_env_var.is_some() {
507            std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
508        } else {
509            "".to_string()
510        };
511
512        if prefix.is_empty() {
513            write!(writer, "[-]")?;
514        } else {
515            write!(writer, "[{}]", prefix)?;
516        }
517
518        self.formatter.format_event(ctx, writer, event)
519    }
520}
521
522/// Set up logging based on the given execution environment. We specialize logging based on how the
523/// logs are consumed. The destination scuba table is specialized based on the execution environment.
524/// mast -> monarch_tracing/prod
525/// devserver -> monarch_tracing/local
526/// unit test  -> monarch_tracing/test
527/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
528/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
529/// to get this behavior.
530pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
531    initialize_logging_with_log_prefix(clock, None);
532}
533
534/// testing
535pub fn initialize_logging_for_test() {
536    initialize_logging(DefaultTelemetryClock {});
537}
538
539/// Set up logging based on the given execution environment. We specialize logging based on how the
540/// logs are consumed. The destination scuba table is specialized based on the execution environment.
541/// mast -> monarch_tracing/prod
542/// devserver -> monarch_tracing/local
543/// unit test  -> monarch_tracing/test
544/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
545/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
546/// to get this behavior.
547///
548/// tracing logs will be prefixed with the given prefix and routed to:
549/// test -> stderr
550/// local -> /tmp/monarch_log.log
551/// mast -> /logs/dedicated_monarch_logs.log
552/// Additionally, is MONARCH_STDERR_LOG sets logs level, then logs will be routed to stderr as well.
553pub fn initialize_logging_with_log_prefix(
554    clock: impl TelemetryClock + Send + 'static,
555    prefix_env_var: Option<String>,
556) {
557    swap_telemetry_clock(clock);
558    let file_log_level = match env::Env::current() {
559        env::Env::Local => LOG_LEVEL_INFO,
560        env::Env::MastEmulator => LOG_LEVEL_INFO,
561        env::Env::Mast => LOG_LEVEL_INFO,
562        env::Env::Test => LOG_LEVEL_DEBUG,
563    };
564    let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
565        .lossy(false)
566        .finish(writer());
567    let writer_guard = Arc::new((non_blocking, guard));
568    let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
569
570    let file_layer = fmt::Layer::default()
571        .with_writer(writer_guard.0.clone())
572        .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
573        .fmt_fields(GlogFields::default().compact())
574        .with_ansi(false)
575        .with_filter(
576            Targets::new()
577                .with_default(LevelFilter::from_level(
578                    tracing::Level::from_str(
579                        &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()),
580                    )
581                    .expect("Invalid log level"),
582                ))
583                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
584        );
585
586    let stderr_log_level = match env::Env::current() {
587        env::Env::Local => LOG_LEVEL_ERROR,
588        env::Env::MastEmulator => LOG_LEVEL_INFO,
589        env::Env::Mast => LOG_LEVEL_ERROR,
590        env::Env::Test => LOG_LEVEL_DEBUG,
591    };
592    let stderr_layer = fmt::Layer::default()
593        .with_writer(std::io::stderr)
594        .event_format(PrefixedFormatter::new(prefix_env_var))
595        .fmt_fields(GlogFields::default().compact())
596        .with_ansi(std::io::stderr().is_terminal())
597        .with_filter(
598            Targets::new()
599                .with_default(LevelFilter::from_level(
600                    tracing::Level::from_str(
601                        &std::env::var(MONARCH_STDERR_LOG_ENV)
602                            .unwrap_or(stderr_log_level.to_string()),
603                    )
604                    .expect("Invalid log level"),
605                ))
606                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
607        );
608
609    use tracing_subscriber::Registry;
610    use tracing_subscriber::layer::SubscriberExt;
611    use tracing_subscriber::util::SubscriberInitExt;
612
613    #[cfg(fbcode_build)]
614    {
615        use crate::env::Env;
616        fn is_layer_enabled(env_var: &str) -> bool {
617            std::env::var(env_var).unwrap_or_default() != "1"
618        }
619        if let Err(err) = Registry::default()
620            .with(if is_layer_enabled(DISABLE_SQLITE_TRACING) {
621                Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
622            } else {
623                None
624            })
625            .with(if is_layer_enabled(DISABLE_OTEL_TRACING) {
626                Some(otel::tracing_layer())
627            } else {
628                None
629            })
630            .with(file_layer)
631            .with(stderr_layer)
632            .with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
633                Some(recorder().layer())
634            } else {
635                None
636            })
637            .try_init()
638        {
639            tracing::debug!("logging already initialized for this process: {}", err);
640        }
641        let exec_id = env::execution_id();
642        tracing::debug!(
643            target: "execution",
644            execution_id = exec_id,
645            environment = %Env::current(),
646            args = ?std::env::args(),
647            build_mode = build_info::BuildInfo::get_build_mode(),
648            compiler = build_info::BuildInfo::get_compiler(),
649            compiler_version = build_info::BuildInfo::get_compiler_version(),
650            buck_rule = build_info::BuildInfo::get_rule(),
651            package_name = build_info::BuildInfo::get_package_name(),
652            package_release = build_info::BuildInfo::get_package_release(),
653            upstream_revision = build_info::BuildInfo::get_revision(),
654            "logging_initialized"
655        );
656
657        if is_layer_enabled(DISABLE_OTEL_METRICS) {
658            otel::init_metrics();
659        }
660    }
661    #[cfg(not(fbcode_build))]
662    {
663        if let Err(err) = Registry::default()
664            .with(file_layer)
665            .with(stderr_layer)
666            .with(
667                if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
668                    Some(recorder().layer())
669                } else {
670                    None
671                },
672            )
673            .try_init()
674        {
675            tracing::debug!("logging already initialized for this process: {}", err);
676        }
677    }
678}
679
680pub mod env {
681    use rand::RngCore;
682
683    /// Env var name set when monarch launches subprocesses to forward the execution context
684    pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
685    pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
686    pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
687
688    /// Forward or generate a uuid for this execution. When running in production on mast, this is provided to
689    /// us via the MAST_HPC_JOB_NAME env var. Subprocesses should either forward the MAST_HPC_JOB_NAME
690    /// variable, or set the "MONARCH_EXECUTION_ID" var for subprocesses launched by this process.
691    /// We keep these env vars separate so that other applications that depend on the MAST_HPC_JOB_NAME existing
692    /// to understand their environment do not get confused and think they are running on mast when we are doing
693    ///  local testing.
694    pub fn execution_id() -> String {
695        let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
696            // not able to find an existing id so generate a unique one: username + current_time + random number.
697            let username = crate::username();
698            let now = {
699                let now = std::time::SystemTime::now();
700                let datetime: chrono::DateTime<chrono::Local> = now.into();
701                datetime.format("%b-%d_%H:%M").to_string()
702            };
703            let random_number: u16 = (rand::thread_rng().next_u32() % 1000) as u16;
704            let execution_id = format!("{}_{}_{}", username, now, random_number);
705            execution_id
706        });
707        // Safety: Can be unsound if there are multiple threads
708        // reading and writing the environment.
709        unsafe {
710            std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
711        }
712        id
713    }
714
715    #[derive(PartialEq)]
716    pub enum Env {
717        Local,
718        Mast,
719        MastEmulator,
720        Test,
721    }
722
723    impl std::fmt::Display for Env {
724        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
725            write!(
726                f,
727                "{}",
728                match self {
729                    Self::Local => crate::ENV_VALUE_LOCAL,
730                    Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
731                    Self::Mast => crate::ENV_VALUE_MAST,
732                    Self::Test => crate::ENV_VALUE_TEST,
733                }
734            )
735        }
736    }
737
738    impl Env {
739        #[cfg(test)]
740        pub fn current() -> Self {
741            Self::Test
742        }
743
744        #[cfg(not(test))]
745        pub fn current() -> Self {
746            match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
747                // Constant from https://fburl.com/fhysd3fd
748                crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
749                _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
750                    true => Self::Mast,
751                    false => Self::Local,
752                },
753            }
754        }
755    }
756}
757
#[cfg(test)]
mod test {
    use opentelemetry::*;
    // Lets paths spelled `hyperactor_telemetry::...` resolve inside this crate's own tests.
    extern crate self as hyperactor_telemetry;
    use super::*;

    /// `key_value!` converts string, integer and float literals into the
    /// matching `Value` variants.
    #[test]
    fn infer_kv_pair_types() {
        assert_eq!(
            key_value!("str", "str"),
            KeyValue::new(Key::new("str"), Value::String("str".into()))
        );
        assert_eq!(
            key_value!("str", 25),
            KeyValue::new(Key::new("str"), Value::I64(25))
        );
        assert_eq!(
            key_value!("str", 1.1),
            KeyValue::new(Key::new("str"), Value::F64(1.1))
        );
    }
    /// `kv_pairs!` expands to a slice of the same pairs `key_value!` would build.
    #[test]
    fn kv_pair_slices() {
        assert_eq!(
            kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
            &[
                key_value!("1", "1"),
                key_value!("2", 2),
                key_value!("3", 3.0),
            ],
        );
    }

    /// Smoke test: gauges declared with `declare_static_gauge!` can be recorded to,
    /// with and without attributes. There are no assertions on exported values.
    #[test]
    fn test_static_gauge() {
        // Create a static gauge using the macro
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // Set values to the gauge with different attributes
        // This shouldn't actually log to scribe/scuba in test environment
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // Test with empty attributes
        TEST_GAUGE.record(50.0, &[]);
    }
}
803        TEST_GAUGE.record(50.0, &[]);
804    }
805}