1#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
16pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";
20
21pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
24
25pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
28
29pub const DISABLE_SQLITE_TRACING: &str = "DISABLE_SQLITE_TRACING";
32const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG";
34const MONARCH_STDERR_LOG_ENV: &str = "MONARCH_STDERR_LOG";
35pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";
36
37const LOG_LEVEL_INFO: &str = "info";
39const LOG_LEVEL_DEBUG: &str = "debug";
40const LOG_LEVEL_ERROR: &str = "error";
41
42const SPAN_FIELD_RECORDING: &str = "recording";
44const SPAN_FIELD_RECORDER: &str = "recorder";
45
46const ENV_VALUE_LOCAL: &str = "local";
48const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
49const ENV_VALUE_MAST: &str = "mast";
50const ENV_VALUE_TEST: &str = "test";
51const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
52
53pub mod in_memory_reader;
54#[cfg(fbcode_build)]
55mod meta;
56mod otel;
57mod pool;
58pub mod recorder;
59mod spool;
60pub mod sqlite;
61pub mod task;
62pub mod trace;
63use std::io::IsTerminal;
64use std::io::Write;
65use std::str::FromStr;
66use std::sync::Arc;
67use std::sync::Mutex;
68use std::time::Instant;
69
70use lazy_static::lazy_static;
71pub use opentelemetry;
72pub use opentelemetry::Key;
73pub use opentelemetry::KeyValue;
74pub use opentelemetry::Value;
75pub use opentelemetry::global::meter;
76pub use tracing;
77pub use tracing::Level;
78use tracing_appender::non_blocking::NonBlocking;
79use tracing_appender::non_blocking::WorkerGuard;
80use tracing_appender::rolling::RollingFileAppender;
81use tracing_glog::Glog;
82use tracing_glog::GlogFields;
83use tracing_glog::LocalTime;
84use tracing_subscriber::Layer;
85use tracing_subscriber::filter::LevelFilter;
86use tracing_subscriber::filter::Targets;
87use tracing_subscriber::fmt;
88use tracing_subscriber::fmt::FormatEvent;
89use tracing_subscriber::fmt::FormatFields;
90use tracing_subscriber::fmt::format::Writer;
91use tracing_subscriber::registry::LookupSpan;
92
93use crate::recorder::Recorder;
94use crate::sqlite::get_reloadable_sqlite_layer;
95
96pub trait TelemetryClock {
97 fn now(&self) -> tokio::time::Instant;
98 fn system_time_now(&self) -> std::time::SystemTime;
99}
100
101pub struct DefaultTelemetryClock {}
102
103impl TelemetryClock for DefaultTelemetryClock {
104 fn now(&self) -> tokio::time::Instant {
105 tokio::time::Instant::now()
106 }
107
108 fn system_time_now(&self) -> std::time::SystemTime {
109 std::time::SystemTime::now()
110 }
111}
112
113pub fn username() -> String {
114 let env = env::Env::current();
115 match env {
116 env::Env::Mast => {
117 std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
118 }
119 _ => whoami::username(),
120 }
121}
122
123pub fn log_file_path(env: env::Env) -> Result<(String, String), anyhow::Error> {
125 match env {
126 env::Env::Local | env::Env::MastEmulator => {
127 let username = if whoami::username().is_empty() {
128 "monarch".to_string()
129 } else {
130 whoami::username()
131 };
132 Ok((format!("/tmp/{}", username), "monarch_log".to_string()))
133 }
134 env::Env::Mast => Ok(("/logs/".to_string(), "dedicated_log_monarch".to_string())),
135 _ => Err(anyhow::anyhow!(
136 "file writer unsupported for environment {}",
137 env
138 )),
139 }
140}
141
142fn try_create_appender(
143 path: &str,
144 filename: &str,
145 create_dir: bool,
146) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
147 if create_dir {
148 std::fs::create_dir_all(path)?;
149 }
150 Ok(RollingFileAppender::builder()
151 .filename_prefix(filename)
152 .filename_suffix("log")
153 .build(path)?)
154}
155
156fn writer() -> Box<dyn Write + Send> {
157 match env::Env::current() {
158 env::Env::Test => Box::new(std::io::stderr()),
159 env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
160 let (path, filename) = log_file_path(env::Env::current()).unwrap();
161 match try_create_appender(&path, &filename, true) {
162 Ok(file_appender) => Box::new(file_appender),
163 Err(e) => {
164 eprintln!(
165 "unable to create log file in {}: {}. Falling back to stderr",
166 path, e
167 );
168 Box::new(std::io::stderr())
169 }
170 }
171 }
172 }
173}
174
175lazy_static! {
176 static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
177 Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
178}
179
180pub fn recorder() -> &'static Recorder {
183 static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
184 RECORDER.get_or_init(Recorder::new)
185}
186
187pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
190 *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
191}
192
193#[macro_export]
197macro_rules! key_value {
198 ($key:expr, $val:expr) => {
199 $crate::opentelemetry::KeyValue::new(
200 $crate::opentelemetry::Key::new($key),
201 $crate::opentelemetry::Value::from($val),
202 )
203 };
204}
205#[macro_export]
221macro_rules! kv_pairs {
222 ($($k:expr => $v:expr),* $(,)?) => {
223 &[$($crate::key_value!($k, $v),)*]
224 };
225}
226
227#[derive(Debug, Clone, Copy)]
228pub enum TimeUnit {
229 Millis,
230 Micros,
231 Nanos,
232}
233
234impl TimeUnit {
235 pub fn as_str(&self) -> &'static str {
236 match self {
237 TimeUnit::Millis => "ms",
238 TimeUnit::Micros => "us",
239 TimeUnit::Nanos => "ns",
240 }
241 }
242}
243pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
244
245impl<'a> Timer {
246 pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
247 Timer(data, unit)
248 }
249 pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
250 TimerGuard {
251 data: self,
252 pairs,
253 start: Instant::now(),
254 }
255 }
256
257 pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
258 let dur = match self.1 {
259 TimeUnit::Millis => dur.as_millis(),
260 TimeUnit::Micros => dur.as_micros(),
261 TimeUnit::Nanos => dur.as_nanos(),
262 } as u64;
263
264 self.0.record(dur, pairs);
265 }
266}
267pub struct TimerGuard<'a> {
268 data: &'static Timer,
269 pairs: &'a [opentelemetry::KeyValue],
270 start: Instant,
271}
272
273impl<'a> Drop for TimerGuard<'a> {
274 fn drop(&mut self) {
275 let now = Instant::now();
276 let dur = now.duration_since(self.start);
277 self.data.record(dur, self.pairs);
278 }
279}
280
281#[macro_export]
298macro_rules! declare_static_timer {
299 ($name:ident, $key:expr, $unit:path) => {
300 #[doc = "a global histogram timer named: "]
301 #[doc = $key]
302 pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
303 $crate::Timer::new(
304 $crate::meter(module_path!())
305 .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
306 .with_unit($unit.as_str())
307 .build(),
308 $unit,
309 )
310 });
311 };
312}
313
314#[macro_export]
336macro_rules! declare_static_counter {
337 ($name:ident, $key:expr) => {
338 #[doc = "a global counter named: "]
339 #[doc = $key]
340 pub static $name: std::sync::LazyLock<opentelemetry::metrics::Counter<u64>> =
341 std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
342 };
343}
344
345#[macro_export]
367macro_rules! declare_static_up_down_counter {
368 ($name:ident, $key:expr) => {
369 #[doc = "a global up down counter named: "]
370 #[doc = $key]
371 pub static $name: std::sync::LazyLock<opentelemetry::metrics::UpDownCounter<i64>> =
372 std::sync::LazyLock::new(|| {
373 $crate::meter(module_path!())
374 .i64_up_down_counter($key)
375 .build()
376 });
377 };
378}
379
380#[macro_export]
402macro_rules! declare_static_gauge {
403 ($name:ident, $key:expr) => {
404 #[doc = "a global gauge named: "]
405 #[doc = $key]
406 pub static $name: std::sync::LazyLock<opentelemetry::metrics::Gauge<f64>> =
407 std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
408 };
409}
410#[macro_export]
428macro_rules! declare_observable_gauge {
429 ($name:ident, $key:expr, $cb:expr) => {
430 #[doc = "a global gauge named: "]
431 #[doc = $key]
432 pub static $name: std::sync::LazyLock<opentelemetry::metrics::ObservableGauge<f64>> =
433 std::sync::LazyLock::new(|| {
434 $crate::meter(module_path!())
435 .f64_observable_gauge($key)
436 .with_callback($cb)
437 .build()
438 });
439 };
440}
441#[macro_export]
463macro_rules! declare_static_histogram {
464 ($name:ident, $key:expr) => {
465 #[doc = "a global histogram named: "]
466 #[doc = $key]
467 pub static $name: std::sync::LazyLock<opentelemetry::metrics::Histogram<f64>> =
468 std::sync::LazyLock::new(|| {
469 hyperactor_telemetry::meter(module_path!())
470 .f64_histogram($key)
471 .build()
472 });
473 };
474}
475
476static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
477 std::sync::OnceLock::new();
478
479struct PrefixedFormatter {
481 formatter: Glog<LocalTime>,
482 prefix_env_var: Option<String>,
483}
484
485impl PrefixedFormatter {
486 fn new(prefix_env_var: Option<String>) -> Self {
487 let formatter = Glog::default().with_timer(LocalTime::default());
488 Self {
489 formatter,
490 prefix_env_var,
491 }
492 }
493}
494
495impl<S, N> FormatEvent<S, N> for PrefixedFormatter
496where
497 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
498 N: for<'a> FormatFields<'a> + 'static,
499{
500 fn format_event(
501 &self,
502 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
503 mut writer: Writer<'_>,
504 event: &tracing::Event<'_>,
505 ) -> std::fmt::Result {
506 let prefix: String = if self.prefix_env_var.is_some() {
507 std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
508 } else {
509 "".to_string()
510 };
511
512 if prefix.is_empty() {
513 write!(writer, "[-]")?;
514 } else {
515 write!(writer, "[{}]", prefix)?;
516 }
517
518 self.formatter.format_event(ctx, writer, event)
519 }
520}
521
522pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
531 initialize_logging_with_log_prefix(clock, None);
532}
533
534pub fn initialize_logging_for_test() {
536 initialize_logging(DefaultTelemetryClock {});
537}
538
539pub fn initialize_logging_with_log_prefix(
554 clock: impl TelemetryClock + Send + 'static,
555 prefix_env_var: Option<String>,
556) {
557 swap_telemetry_clock(clock);
558 let file_log_level = match env::Env::current() {
559 env::Env::Local => LOG_LEVEL_INFO,
560 env::Env::MastEmulator => LOG_LEVEL_INFO,
561 env::Env::Mast => LOG_LEVEL_INFO,
562 env::Env::Test => LOG_LEVEL_DEBUG,
563 };
564 let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
565 .lossy(false)
566 .finish(writer());
567 let writer_guard = Arc::new((non_blocking, guard));
568 let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
569
570 let file_layer = fmt::Layer::default()
571 .with_writer(writer_guard.0.clone())
572 .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
573 .fmt_fields(GlogFields::default().compact())
574 .with_ansi(false)
575 .with_filter(
576 Targets::new()
577 .with_default(LevelFilter::from_level(
578 tracing::Level::from_str(
579 &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()),
580 )
581 .expect("Invalid log level"),
582 ))
583 .with_target("opentelemetry", LevelFilter::OFF), );
585
586 let stderr_log_level = match env::Env::current() {
587 env::Env::Local => LOG_LEVEL_ERROR,
588 env::Env::MastEmulator => LOG_LEVEL_INFO,
589 env::Env::Mast => LOG_LEVEL_ERROR,
590 env::Env::Test => LOG_LEVEL_DEBUG,
591 };
592 let stderr_layer = fmt::Layer::default()
593 .with_writer(std::io::stderr)
594 .event_format(PrefixedFormatter::new(prefix_env_var))
595 .fmt_fields(GlogFields::default().compact())
596 .with_ansi(std::io::stderr().is_terminal())
597 .with_filter(
598 Targets::new()
599 .with_default(LevelFilter::from_level(
600 tracing::Level::from_str(
601 &std::env::var(MONARCH_STDERR_LOG_ENV)
602 .unwrap_or(stderr_log_level.to_string()),
603 )
604 .expect("Invalid log level"),
605 ))
606 .with_target("opentelemetry", LevelFilter::OFF), );
608
609 use tracing_subscriber::Registry;
610 use tracing_subscriber::layer::SubscriberExt;
611 use tracing_subscriber::util::SubscriberInitExt;
612
613 #[cfg(fbcode_build)]
614 {
615 use crate::env::Env;
616 fn is_layer_enabled(env_var: &str) -> bool {
617 std::env::var(env_var).unwrap_or_default() != "1"
618 }
619 if let Err(err) = Registry::default()
620 .with(if is_layer_enabled(DISABLE_SQLITE_TRACING) {
621 Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
622 } else {
623 None
624 })
625 .with(if is_layer_enabled(DISABLE_OTEL_TRACING) {
626 Some(otel::tracing_layer())
627 } else {
628 None
629 })
630 .with(file_layer)
631 .with(stderr_layer)
632 .with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
633 Some(recorder().layer())
634 } else {
635 None
636 })
637 .try_init()
638 {
639 tracing::debug!("logging already initialized for this process: {}", err);
640 }
641 let exec_id = env::execution_id();
642 tracing::debug!(
643 target: "execution",
644 execution_id = exec_id,
645 environment = %Env::current(),
646 args = ?std::env::args(),
647 build_mode = build_info::BuildInfo::get_build_mode(),
648 compiler = build_info::BuildInfo::get_compiler(),
649 compiler_version = build_info::BuildInfo::get_compiler_version(),
650 buck_rule = build_info::BuildInfo::get_rule(),
651 package_name = build_info::BuildInfo::get_package_name(),
652 package_release = build_info::BuildInfo::get_package_release(),
653 upstream_revision = build_info::BuildInfo::get_revision(),
654 "logging_initialized"
655 );
656
657 if is_layer_enabled(DISABLE_OTEL_METRICS) {
658 otel::init_metrics();
659 }
660 }
661 #[cfg(not(fbcode_build))]
662 {
663 if let Err(err) = Registry::default()
664 .with(file_layer)
665 .with(stderr_layer)
666 .with(
667 if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
668 Some(recorder().layer())
669 } else {
670 None
671 },
672 )
673 .try_init()
674 {
675 tracing::debug!("logging already initialized for this process: {}", err);
676 }
677 }
678}
679
680pub mod env {
681 use rand::RngCore;
682
683 pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
685 pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
686 pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
687
688 pub fn execution_id() -> String {
695 let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
696 let username = crate::username();
698 let now = {
699 let now = std::time::SystemTime::now();
700 let datetime: chrono::DateTime<chrono::Local> = now.into();
701 datetime.format("%b-%d_%H:%M").to_string()
702 };
703 let random_number: u16 = (rand::thread_rng().next_u32() % 1000) as u16;
704 let execution_id = format!("{}_{}_{}", username, now, random_number);
705 execution_id
706 });
707 unsafe {
710 std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
711 }
712 id
713 }
714
715 #[derive(PartialEq)]
716 pub enum Env {
717 Local,
718 Mast,
719 MastEmulator,
720 Test,
721 }
722
723 impl std::fmt::Display for Env {
724 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
725 write!(
726 f,
727 "{}",
728 match self {
729 Self::Local => crate::ENV_VALUE_LOCAL,
730 Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
731 Self::Mast => crate::ENV_VALUE_MAST,
732 Self::Test => crate::ENV_VALUE_TEST,
733 }
734 )
735 }
736 }
737
738 impl Env {
739 #[cfg(test)]
740 pub fn current() -> Self {
741 Self::Test
742 }
743
744 #[cfg(not(test))]
745 pub fn current() -> Self {
746 match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
747 crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
749 _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
750 true => Self::Mast,
751 false => Self::Local,
752 },
753 }
754 }
755 }
756}
757
758#[cfg(test)]
759mod test {
760 use opentelemetry::*;
761 extern crate self as hyperactor_telemetry;
762 use super::*;
763
764 #[test]
765 fn infer_kv_pair_types() {
766 assert_eq!(
767 key_value!("str", "str"),
768 KeyValue::new(Key::new("str"), Value::String("str".into()))
769 );
770 assert_eq!(
771 key_value!("str", 25),
772 KeyValue::new(Key::new("str"), Value::I64(25))
773 );
774 assert_eq!(
775 key_value!("str", 1.1),
776 KeyValue::new(Key::new("str"), Value::F64(1.1))
777 );
778 }
779 #[test]
780 fn kv_pair_slices() {
781 assert_eq!(
782 kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
783 &[
784 key_value!("1", "1"),
785 key_value!("2", 2),
786 key_value!("3", 3.0),
787 ],
788 );
789 }
790
791 #[test]
792 fn test_static_gauge() {
793 declare_static_gauge!(TEST_GAUGE, "test_gauge");
795 declare_static_gauge!(MEMORY_GAUGE, "memory_usage");
796
797 TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
800 MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));
801
802 TEST_GAUGE.record(50.0, &[]);
804 }
805}