1#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
/// Environment variable that, when set to `"1"`, disables the OpenTelemetry tracing layer.
/// In this file it is only consulted in `fbcode_build` builds.
pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";

/// Environment variable that, when set to `"1"`, skips OpenTelemetry metrics initialization.
/// In this file it is only consulted in `fbcode_build` builds.
pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";

/// Environment variable that, when set to `"1"`, keeps the in-process recorder layer out of the
/// tracing subscriber.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
28
29pub mod in_memory_reader;
30#[cfg(fbcode_build)]
31mod meta;
32mod otel;
33mod pool;
34pub mod recorder;
35mod spool;
36pub mod sqlite;
37pub mod trace;
38use std::io::IsTerminal;
39use std::io::Write;
40use std::str::FromStr;
41use std::sync::Arc;
42use std::sync::Mutex;
43use std::time::Instant;
44
45use lazy_static::lazy_static;
46pub use opentelemetry;
47pub use opentelemetry::Key;
48pub use opentelemetry::KeyValue;
49pub use opentelemetry::Value;
50pub use opentelemetry::global::meter;
51pub use tracing::Level;
52use tracing_appender::non_blocking::NonBlocking;
53use tracing_appender::non_blocking::WorkerGuard;
54use tracing_appender::rolling::RollingFileAppender;
55use tracing_glog::Glog;
56use tracing_glog::GlogFields;
57use tracing_glog::LocalTime;
58use tracing_subscriber::Layer;
59use tracing_subscriber::filter::LevelFilter;
60use tracing_subscriber::filter::Targets;
61use tracing_subscriber::fmt;
62use tracing_subscriber::fmt::FormatEvent;
63use tracing_subscriber::fmt::FormatFields;
64use tracing_subscriber::fmt::format::Writer;
65use tracing_subscriber::registry::LookupSpan;
66
67use crate::recorder::Recorder;
68use crate::sqlite::get_reloadable_sqlite_layer;
69
/// Source of time for telemetry. It provides a `tokio` instant and the wall-clock time.
///
/// The process-wide clock is replaced with `swap_telemetry_clock`.
pub trait TelemetryClock {
    /// Returns the current instant.
    fn now(&self) -> tokio::time::Instant;
    /// Returns the current wall-clock time.
    fn system_time_now(&self) -> std::time::SystemTime;
}

/// [`TelemetryClock`] that delegates to the real clocks: `tokio::time::Instant::now` and
/// `std::time::SystemTime::now`.
pub struct DefaultTelemetryClock {}

impl TelemetryClock for DefaultTelemetryClock {
    fn now(&self) -> tokio::time::Instant {
        tokio::time::Instant::now()
    }

    fn system_time_now(&self) -> std::time::SystemTime {
        std::time::SystemTime::now()
    }
}
86
87pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> {
89 match env {
90 env::Env::Local | env::Env::MastEmulator => {
91 let username = if whoami::username().is_empty() {
92 "monarch".to_string()
93 } else {
94 whoami::username()
95 };
96 Ok((format!("/tmp/{}", username), "monarch_log".to_string()))
97 }
98 env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())),
99 _ => Err(anyhow::anyhow!(
100 "file writer unsupported for environment {}",
101 env
102 )),
103 }
104}
105
106fn try_create_appender(
107 path: &str,
108 filename: &str,
109 create_dir: bool,
110) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
111 if create_dir {
112 std::fs::create_dir_all(path)?;
113 }
114 Ok(RollingFileAppender::builder()
115 .filename_prefix(filename)
116 .filename_suffix("log")
117 .build(path)?)
118}
119
120fn writer() -> Box<dyn Write + Send> {
121 match env::Env::current() {
122 env::Env::Test => Box::new(std::io::stderr()),
123 env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
124 let (path, filename) = log_file_path(env::Env::current()).unwrap();
125 match try_create_appender(&path, &filename, true) {
126 Ok(file_appender) => Box::new(file_appender),
127 Err(e) => {
128 eprintln!(
129 "unable to create log file in {}: {}. Falling back to stderr",
130 path, e
131 );
132 Box::new(std::io::stderr())
133 }
134 }
135 }
136 }
137}
138
lazy_static! {
    /// Process-wide telemetry clock. It starts as a [`DefaultTelemetryClock`] and is replaced
    /// by [`swap_telemetry_clock`].
    // NOTE(review): the outer `Arc` adds nothing for a `static` unless some module clones it
    // out; confirm before simplifying the type.
    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
}
143
144pub fn recorder() -> &'static Recorder {
147 static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
148 RECORDER.get_or_init(Recorder::new)
149}
150
151pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
154 *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
155}
156
/// Builds an `opentelemetry::KeyValue` from a key and any value with an
/// `opentelemetry::Value::from` conversion, such as string literals, integers and floats.
///
/// All paths go through `$crate`, so callers need no direct `opentelemetry` dependency.
#[macro_export]
macro_rules! key_value {
    ($k:expr_2021, $v:expr_2021) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($k),
            $crate::opentelemetry::Value::from($v),
        )
    };
}
/// Builds a reference to an array of `KeyValue`s from `key => value` pairs. The reference
/// coerces to `&[KeyValue]`, and each pair is converted with [`key_value!`].
///
/// A trailing comma is accepted.
#[macro_export]
macro_rules! kv_pairs {
    ($($key:expr_2021 => $value:expr_2021),* $(,)?) => {
        &[$($crate::key_value!($key, $value)),*]
    };
}
190
/// Unit in which a [`Timer`] records durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    /// Milliseconds (`"ms"`).
    Millis,
    /// Microseconds (`"us"`).
    Micros,
    /// Nanoseconds (`"ns"`).
    Nanos,
}

impl TimeUnit {
    /// Returns the short suffix for this unit: `"ms"`, `"us"` or `"ns"`.
    pub fn as_str(&self) -> &'static str {
        let suffix = match *self {
            Self::Millis => "ms",
            Self::Micros => "us",
            Self::Nanos => "ns",
        };
        suffix
    }
}
207pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
208
209impl<'a> Timer {
210 pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
211 Timer(data, unit)
212 }
213 pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
214 TimerGuard {
215 data: self,
216 pairs,
217 start: Instant::now(),
218 }
219 }
220
221 pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
222 let dur = match self.1 {
223 TimeUnit::Millis => dur.as_millis(),
224 TimeUnit::Micros => dur.as_micros(),
225 TimeUnit::Nanos => dur.as_nanos(),
226 } as u64;
227
228 self.0.record(dur, pairs);
229 }
230}
231pub struct TimerGuard<'a> {
232 data: &'static Timer,
233 pairs: &'a [opentelemetry::KeyValue],
234 start: Instant,
235}
236
237impl<'a> Drop for TimerGuard<'a> {
238 fn drop(&mut self) {
239 let now = Instant::now();
240 let dur = now.duration_since(self.start);
241 self.data.record(dur, self.pairs);
242 }
243}
244
/// Declares a lazily initialized global [`Timer`] backed by a `u64` histogram.
///
/// The histogram is named `<key>.<unit suffix>` and uses the suffix from `TimeUnit::as_str` as
/// its unit. It is created from the meter for the invoking module on first access.
#[macro_export]
macro_rules! declare_static_timer {
    ($ident:ident, $metric:expr_2021, $time_unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $metric]
        pub static $ident: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            let histogram = $crate::meter(module_path!())
                .u64_histogram(format!("{}.{}", $metric, $time_unit.as_str()))
                .with_unit($time_unit.as_str())
                .build();
            $crate::Timer::new(histogram, $time_unit)
        });
    };
}
277
/// Declares a lazily initialized global `u64` counter named `$key`. It is created from the meter
/// for the invoking module on first access.
///
/// Every path goes through `$crate`, so invoking crates do not need their own `opentelemetry`
/// dependency.
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
308
/// Declares a lazily initialized global `i64` up/down counter named `$key`. It is created from
/// the meter for the invoking module on first access.
///
/// Every path goes through `$crate`, so invoking crates do not need their own `opentelemetry`
/// dependency.
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
343
/// Declares a lazily initialized global `f64` gauge named `$key`. It is created from the meter
/// for the invoking module on first access.
///
/// Every path goes through `$crate`, so invoking crates do not need their own `opentelemetry`
/// dependency.
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Declares a lazily initialized global `f64` observable gauge named `$key`, with `$cb`
/// registered as its callback. The gauge is built from the meter for the invoking module on
/// first access.
///
/// Every path goes through `$crate`, so invoking crates do not need their own `opentelemetry`
/// dependency.
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr_2021, $cb:expr_2021) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Declares a lazily initialized global `f64` histogram named `$key`. It is created from the
/// meter for the invoking module on first access.
///
/// Uses `$crate` rather than a hard-coded `hyperactor_telemetry::` path. The macro therefore
/// works inside this crate without an `extern crate self` alias and from crates that rename the
/// dependency, and it does not require a direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr_2021) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::Histogram<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_histogram($key)
                .build()
        });
    };
}
439
/// Keeps the non-blocking file writer and its [`WorkerGuard`] alive for the rest of the process.
///
/// In this file it is set at most once, by the first `initialize_logging_with_log_prefix` call.
// Keeping the guard alive matters. Per tracing_appender's docs, dropping a `WorkerGuard` flushes
// pending output and stops the background worker, and later log lines would then be lost.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
442
443struct PrefixedFormatter {
445 formatter: Glog<LocalTime>,
446 prefix_env_var: Option<String>,
447}
448
449impl PrefixedFormatter {
450 fn new(prefix_env_var: Option<String>) -> Self {
451 let formatter = Glog::default().with_timer(LocalTime::default());
452 Self {
453 formatter,
454 prefix_env_var,
455 }
456 }
457}
458
459impl<S, N> FormatEvent<S, N> for PrefixedFormatter
460where
461 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
462 N: for<'a> FormatFields<'a> + 'static,
463{
464 fn format_event(
465 &self,
466 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
467 mut writer: Writer<'_>,
468 event: &tracing::Event<'_>,
469 ) -> std::fmt::Result {
470 let prefix: String = if self.prefix_env_var.is_some() {
471 std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
472 } else {
473 "".to_string()
474 };
475
476 if prefix.is_empty() {
477 write!(writer, "[-]")?;
478 } else {
479 write!(writer, "[{}]", prefix)?;
480 }
481
482 self.formatter.format_event(ctx, writer, event)
483 }
484}
485
486pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
495 initialize_logging_with_log_prefix(clock, None);
496}
497
498pub fn initialize_logging_with_log_prefix(
513 clock: impl TelemetryClock + Send + 'static,
514 prefix_env_var: Option<String>,
515) {
516 swap_telemetry_clock(clock);
517 let file_log_level = match env::Env::current() {
518 env::Env::Local => "info",
519 env::Env::MastEmulator => "info",
520 env::Env::Mast => "info",
521 env::Env::Test => "debug",
522 };
523 let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
524 .lossy(false)
525 .finish(writer());
526 let writer_guard = Arc::new((non_blocking, guard));
527 let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
528
529 let file_layer = fmt::Layer::default()
530 .with_writer(writer_guard.0.clone())
531 .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
532 .fmt_fields(GlogFields::default().compact())
533 .with_ansi(false)
534 .with_filter(
535 Targets::new()
536 .with_default(LevelFilter::from_level(
537 tracing::Level::from_str(
538 &std::env::var("MONARCH_FILE_LOG").unwrap_or(file_log_level.to_string()),
539 )
540 .expect("Invalid log level"),
541 ))
542 .with_target("opentelemetry", LevelFilter::OFF), );
544
545 let stderr_log_level = match env::Env::current() {
546 env::Env::Local => "error",
547 env::Env::MastEmulator => "info",
548 env::Env::Mast => "error",
549 env::Env::Test => "debug",
550 };
551 let stderr_layer = fmt::Layer::default()
552 .with_writer(std::io::stderr)
553 .event_format(PrefixedFormatter::new(prefix_env_var))
554 .fmt_fields(GlogFields::default().compact())
555 .with_ansi(std::io::stderr().is_terminal())
556 .with_filter(
557 Targets::new()
558 .with_default(LevelFilter::from_level(
559 tracing::Level::from_str(
560 &std::env::var("MONARCH_STDERR_LOG")
561 .unwrap_or(stderr_log_level.to_string()),
562 )
563 .expect("Invalid log level"),
564 ))
565 .with_target("opentelemetry", LevelFilter::OFF), );
567
568 let sqlite_layer = get_reloadable_sqlite_layer().unwrap();
569
570 use tracing_subscriber::Registry;
571 use tracing_subscriber::layer::SubscriberExt;
572 use tracing_subscriber::util::SubscriberInitExt;
573
574 #[cfg(fbcode_build)]
575 {
576 use crate::env::Env;
577 fn is_layer_enabled(env_var: &str) -> bool {
578 std::env::var(env_var).unwrap_or_default() != "1"
579 }
580 if let Err(err) = Registry::default()
581 .with(sqlite_layer)
582 .with(if is_layer_enabled(DISABLE_OTEL_TRACING) {
583 Some(otel::tracing_layer())
584 } else {
585 None
586 })
587 .with(file_layer)
588 .with(stderr_layer)
589 .with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
590 Some(recorder().layer())
591 } else {
592 None
593 })
594 .try_init()
595 {
596 tracing::debug!("logging already initialized for this process: {}", err);
597 }
598 let exec_id = env::execution_id();
599 tracing::debug!(
600 target: "execution",
601 execution_id = exec_id,
602 environment = %Env::current(),
603 args = ?std::env::args(),
604 build_mode = build_info::BuildInfo::get_build_mode(),
605 compiler = build_info::BuildInfo::get_compiler(),
606 compiler_version = build_info::BuildInfo::get_compiler_version(),
607 buck_rule = build_info::BuildInfo::get_rule(),
608 package_name = build_info::BuildInfo::get_package_name(),
609 package_release = build_info::BuildInfo::get_package_release(),
610 upstream_revision = build_info::BuildInfo::get_revision(),
611 "logging_initialized"
612 );
613
614 if is_layer_enabled(DISABLE_OTEL_METRICS) {
615 otel::init_metrics();
616 }
617 }
618 #[cfg(not(fbcode_build))]
619 {
620 if let Err(err) = Registry::default()
621 .with(file_layer)
622 .with(stderr_layer)
623 .with(
624 if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
625 Some(recorder().layer())
626 } else {
627 None
628 },
629 )
630 .try_init()
631 {
632 tracing::debug!("logging already initialized for this process: {}", err);
633 }
634 }
635}
636
637pub mod env {
638 use rand::Rng;
639 use rand::distributions::Alphanumeric;
640
641 pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
643 pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";
644 pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
645 pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
646
647 pub fn execution_id() -> String {
654 let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV)
655 .or(std::env::var(MAST_HPC_JOB_NAME_ENV))
656 .ok()
657 .unwrap_or_else(|| {
658 let random_string: String = rand::thread_rng()
660 .sample_iter(&Alphanumeric)
661 .take(24)
662 .map(char::from)
663 .collect::<String>();
664 random_string
665 });
666 unsafe {
669 std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
670 }
671 id
672 }
673
674 #[derive(PartialEq)]
675 pub enum Env {
676 Local,
677 Mast,
678 MastEmulator,
679 Test,
680 }
681
682 impl std::fmt::Display for Env {
683 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
684 write!(
685 f,
686 "{}",
687 match self {
688 Self::Local => "local",
689 Self::MastEmulator => "mast_emulator",
690 Self::Mast => "mast",
691 Self::Test => "test",
692 }
693 )
694 }
695 }
696
697 impl Env {
698 #[cfg(test)]
699 pub fn current() -> Self {
700 Self::Test
701 }
702
703 #[cfg(not(test))]
704 pub fn current() -> Self {
705 match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
706 "local_mast_simulator" => Self::MastEmulator,
708 _ => match std::env::var("MAST_HPC_JOB_NAME").is_ok() {
709 true => Self::Mast,
710 false => Self::Local,
711 },
712 }
713 }
714 }
715}
716
#[cfg(test)]
mod test {
    use opentelemetry::*;
    extern crate self as hyperactor_telemetry;
    use super::*;

    /// `key_value!` picks the `Value` variant that matches the literal's type.
    #[test]
    fn infer_kv_pair_types() {
        let cases = [
            (
                key_value!("str", "str"),
                KeyValue::new(Key::new("str"), Value::String("str".into())),
            ),
            (
                key_value!("str", 25),
                KeyValue::new(Key::new("str"), Value::I64(25)),
            ),
            (
                key_value!("str", 1.1),
                KeyValue::new(Key::new("str"), Value::F64(1.1)),
            ),
        ];
        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// `kv_pairs!` expands to the same array that individual `key_value!` calls build.
    #[test]
    fn kv_pair_slices() {
        let expected = &[
            key_value!("1", "1"),
            key_value!("2", 2),
            key_value!("3", 3.0),
        ];
        assert_eq!(kv_pairs!("1" => "1", "2" => 2, "3" => 3.0), expected);
    }

    /// Gauges declared through the macro accept recordings with and without attributes.
    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // An empty attribute set is also accepted.
        TEST_GAUGE.record(50.0, &[]);
    }
}