1#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15
/// Environment variable; when set to `1`, logging initialization does not install the
/// OpenTelemetry tracing layer.
pub const DISABLE_OTEL_TRACING: &str = "DISABLE_OTEL_TRACING";

/// Environment variable; when set to `1`, logging initialization skips OpenTelemetry
/// metrics setup.
pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";

/// Environment variable; when set to `1`, logging initialization does not install the
/// in-process recorder layer.
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";

/// Environment variable; when set to `1`, logging initialization installs the SQLite
/// tracing layer.
pub const ENABLE_SQLITE_TRACING: &str = "ENABLE_SQLITE_TRACING";

/// Environment variable that overrides the level of the file logging layer.
const MONARCH_FILE_LOG_ENV: &str = "MONARCH_FILE_LOG";

/// Environment variable whose presence marks the process as running on MAST.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

// Default file-log levels, picked per environment during logging initialization.
const LOG_LEVEL_INFO: &str = "info";
const LOG_LEVEL_DEBUG: &str = "debug";

// Span field names. Neither is referenced in this file; `recording` is presumably read by
// the tracing/recorder submodules — NOTE(review): confirm.
const SPAN_FIELD_RECORDING: &str = "recording";
// Not referenced in this file, hence the allow.
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

// `Display` strings for the `env::Env` variants.
const ENV_VALUE_LOCAL: &str = "local";
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
const ENV_VALUE_MAST: &str = "mast";
const ENV_VALUE_TEST: &str = "test";
// `MAST_ENVIRONMENT` value that selects `Env::MastEmulator`. Only the non-test
// `Env::current` reads it, so it is dead code in `cfg(test)` builds.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
55
56pub mod in_memory_reader;
57#[cfg(fbcode_build)]
58mod meta;
59mod otel;
60mod pool;
61pub mod recorder;
62mod spool;
63pub mod sqlite;
64pub mod task;
65pub mod trace;
66use std::io::Write;
67use std::str::FromStr;
68use std::sync::Arc;
69use std::sync::Mutex;
70use std::time::Instant;
71
72use lazy_static::lazy_static;
73pub use opentelemetry;
74pub use opentelemetry::Key;
75pub use opentelemetry::KeyValue;
76pub use opentelemetry::Value;
77pub use opentelemetry::global::meter;
78pub use tracing;
79pub use tracing::Level;
80use tracing_appender::non_blocking::NonBlocking;
81use tracing_appender::non_blocking::WorkerGuard;
82use tracing_appender::rolling::RollingFileAppender;
83use tracing_glog::Glog;
84use tracing_glog::GlogFields;
85use tracing_glog::LocalTime;
86use tracing_subscriber::Layer;
87use tracing_subscriber::filter::LevelFilter;
88use tracing_subscriber::filter::Targets;
89use tracing_subscriber::fmt;
90use tracing_subscriber::fmt::FormatEvent;
91use tracing_subscriber::fmt::FormatFields;
92use tracing_subscriber::fmt::format::Writer;
93use tracing_subscriber::registry::LookupSpan;
94
95use crate::recorder::Recorder;
96use crate::sqlite::get_reloadable_sqlite_layer;
97
98pub trait TelemetryClock {
99 fn now(&self) -> tokio::time::Instant;
100 fn system_time_now(&self) -> std::time::SystemTime;
101}
102
103pub struct DefaultTelemetryClock {}
104
105impl TelemetryClock for DefaultTelemetryClock {
106 fn now(&self) -> tokio::time::Instant {
107 tokio::time::Instant::now()
108 }
109
110 fn system_time_now(&self) -> std::time::SystemTime {
111 std::time::SystemTime::now()
112 }
113}
114
115pub fn username() -> String {
116 let env = env::Env::current();
117 match env {
118 env::Env::Mast => {
119 std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
120 }
121 _ => whoami::username(),
122 }
123}
124
125pub fn log_file_path(
128 env: env::Env,
129 suffix: Option<&str>,
130) -> Result<(String, String), anyhow::Error> {
131 let suffix = suffix.map(|s| format!("_{}", s)).unwrap_or_default();
132 match env {
133 env::Env::Local | env::Env::MastEmulator => {
134 let username = if whoami::username().is_empty() {
135 "monarch".to_string()
136 } else {
137 whoami::username()
138 };
139 Ok((
140 format!("/tmp/{}", username),
141 format!("monarch_log{}", suffix),
142 ))
143 }
144 env::Env::Mast => Ok((
145 "/logs/".to_string(),
146 format!("dedicated_log_monarch{}", suffix),
147 )),
148 _ => Err(anyhow::anyhow!(
149 "file writer unsupported for environment {}",
150 env
151 )),
152 }
153}
154
155fn try_create_appender(
156 path: &str,
157 filename: &str,
158 create_dir: bool,
159) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
160 if create_dir {
161 std::fs::create_dir_all(path)?;
162 }
163 Ok(RollingFileAppender::builder()
164 .filename_prefix(filename)
165 .filename_suffix("log")
166 .build(path)?)
167}
168
169fn writer() -> Box<dyn Write + Send> {
170 match env::Env::current() {
171 env::Env::Test => Box::new(std::io::stderr()),
172 env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
173 let (path, filename) = log_file_path(env::Env::current(), None).unwrap();
174 match try_create_appender(&path, &filename, true) {
175 Ok(file_appender) => Box::new(file_appender),
176 Err(e) => {
177 eprintln!(
178 "unable to create log file in {}: {}. Falling back to stderr",
179 path, e
180 );
181 Box::new(std::io::stderr())
182 }
183 }
184 }
185 }
186}
187
188lazy_static! {
189 static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
190 Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
191}
192
193pub fn recorder() -> &'static Recorder {
196 static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
197 RECORDER.get_or_init(Recorder::new)
198}
199
200pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
203 *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
204}
205
/// Builds an `opentelemetry::KeyValue` from a key and any value convertible into an
/// `opentelemetry::Value`. The key is evaluated before the value.
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {{
        let key = $crate::opentelemetry::Key::new($key);
        let value = $crate::opentelemetry::Value::from($val);
        $crate::opentelemetry::KeyValue::new(key, value)
    }};
}
/// Builds a borrowed array of `opentelemetry::KeyValue`s from `key => value` pairs, each
/// converted with `key_value!`. A trailing comma is accepted.
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$($crate::key_value!($k, $v)),*]
    };
}
238}
239
/// Unit in which a [`Timer`] records durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    Millis,
    Micros,
    Nanos,
}

impl TimeUnit {
    /// Short suffix for the unit: `"ms"`, `"us"` or `"ns"`.
    pub fn as_str(&self) -> &'static str {
        use TimeUnit::*;
        match *self {
            Millis => "ms",
            Micros => "us",
            Nanos => "ns",
        }
    }
}
256pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
257
258impl<'a> Timer {
259 pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
260 Timer(data, unit)
261 }
262 pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
263 TimerGuard {
264 data: self,
265 pairs,
266 start: Instant::now(),
267 }
268 }
269
270 pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
271 let dur = match self.1 {
272 TimeUnit::Millis => dur.as_millis(),
273 TimeUnit::Micros => dur.as_micros(),
274 TimeUnit::Nanos => dur.as_nanos(),
275 } as u64;
276
277 self.0.record(dur, pairs);
278 }
279}
280pub struct TimerGuard<'a> {
281 data: &'static Timer,
282 pairs: &'a [opentelemetry::KeyValue],
283 start: Instant,
284}
285
286impl<'a> Drop for TimerGuard<'a> {
287 fn drop(&mut self) {
288 let now = Instant::now();
289 let dur = now.duration_since(self.start);
290 self.data.record(dur, self.pairs);
291 }
292}
293
/// Declares a lazily initialized public static [`Timer`] named `$name`.
///
/// The backing `u64` histogram is named `<$key>.<unit suffix>`, uses the unit suffix as its
/// unit, and comes from the meter for the calling module.
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            let unit = $unit;
            let histogram = $crate::meter(module_path!())
                .u64_histogram(format!("{}.{}", $key, unit.as_str()))
                .with_unit(unit.as_str())
                .build();
            $crate::Timer::new(histogram, unit)
        });
    };
}
326
/// Declares a lazily initialized public static `u64` counter named `$name`, created from
/// the meter for the calling module under metric name `$key`.
///
/// All paths go through `$crate`, so callers need no direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
357
/// Declares a lazily initialized public static `i64` up/down counter named `$name`,
/// created from the meter for the calling module under metric name `$key`.
///
/// All paths go through `$crate`, so callers need no direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
392
/// Declares a lazily initialized public static `f64` gauge named `$name`, created from
/// the meter for the calling module under metric name `$key`.
///
/// All paths go through `$crate`, so callers need no direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Declares a lazily initialized public static `f64` observable gauge named `$name`,
/// created from the meter for the calling module under metric name `$key`, with `$cb`
/// registered as its callback.
///
/// All paths go through `$crate`, so callers need no direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Declares a lazily initialized public static `f64` histogram named `$name`, created
/// from the meter for the calling module under metric name `$key`.
///
/// Uses `$crate` rather than a hard-coded crate name, so it works under any crate alias
/// and without a direct `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
488
/// Holds the non-blocking file writer and its `WorkerGuard` in a static so they are never
/// dropped. The first call to `initialize_logging_with_log_prefix` sets it; later calls
/// ignore the failed `set`, leaving the first value in place.
// NOTE(review): tracing_appender documents that dropping the `WorkerGuard` shuts down the
// background writer, which is presumably why it is parked here — confirm.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
491
492struct PrefixedFormatter {
494 formatter: Glog<LocalTime>,
495 prefix_env_var: Option<String>,
496}
497
498impl PrefixedFormatter {
499 fn new(prefix_env_var: Option<String>) -> Self {
500 let formatter = Glog::default().with_timer(LocalTime::default());
501 Self {
502 formatter,
503 prefix_env_var,
504 }
505 }
506}
507
508impl<S, N> FormatEvent<S, N> for PrefixedFormatter
509where
510 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
511 N: for<'a> FormatFields<'a> + 'static,
512{
513 fn format_event(
514 &self,
515 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
516 mut writer: Writer<'_>,
517 event: &tracing::Event<'_>,
518 ) -> std::fmt::Result {
519 let prefix: String = if self.prefix_env_var.is_some() {
520 std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
521 } else {
522 "".to_string()
523 };
524
525 if prefix.is_empty() {
526 write!(writer, "[-]")?;
527 } else {
528 write!(writer, "[{}]", prefix)?;
529 }
530
531 self.formatter.format_event(ctx, writer, event)
532 }
533}
534
535pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
544 initialize_logging_with_log_prefix(clock, None);
545}
546
547pub fn initialize_logging_for_test() {
549 initialize_logging(DefaultTelemetryClock {});
550}
551
552pub fn initialize_logging_with_log_prefix(
567 clock: impl TelemetryClock + Send + 'static,
568 prefix_env_var: Option<String>,
569) {
570 swap_telemetry_clock(clock);
571 let file_log_level = match env::Env::current() {
572 env::Env::Local => LOG_LEVEL_INFO,
573 env::Env::MastEmulator => LOG_LEVEL_INFO,
574 env::Env::Mast => LOG_LEVEL_INFO,
575 env::Env::Test => LOG_LEVEL_DEBUG,
576 };
577 let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
578 .lossy(false)
579 .finish(writer());
580 let writer_guard = Arc::new((non_blocking, guard));
581 let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
582
583 let file_layer = fmt::Layer::default()
584 .with_writer(writer_guard.0.clone())
585 .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
586 .fmt_fields(GlogFields::default().compact())
587 .with_ansi(false)
588 .with_filter(
589 Targets::new()
590 .with_default(LevelFilter::from_level(
591 tracing::Level::from_str(
592 &std::env::var(MONARCH_FILE_LOG_ENV).unwrap_or(file_log_level.to_string()),
593 )
594 .expect("Invalid log level"),
595 ))
596 .with_target("opentelemetry", LevelFilter::OFF), );
598
599 use tracing_subscriber::Registry;
600 use tracing_subscriber::layer::SubscriberExt;
601 use tracing_subscriber::util::SubscriberInitExt;
602
603 #[cfg(fbcode_build)]
604 {
605 use crate::env::Env;
606 fn is_layer_enabled(env_var: &str) -> bool {
607 std::env::var(env_var).unwrap_or_default() == "1"
608 }
609 fn is_layer_disabled(env_var: &str) -> bool {
610 std::env::var(env_var).unwrap_or_default() == "1"
611 }
612 if let Err(err) = Registry::default()
613 .with(if is_layer_enabled(ENABLE_SQLITE_TRACING) {
614 Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
617 } else {
618 None
619 })
620 .with(if !is_layer_disabled(DISABLE_OTEL_TRACING) {
621 Some(otel::tracing_layer())
622 } else {
623 None
624 })
625 .with(file_layer)
626 .with(if !is_layer_disabled(DISABLE_RECORDER_TRACING) {
627 Some(recorder().layer())
628 } else {
629 None
630 })
631 .try_init()
632 {
633 tracing::debug!("logging already initialized for this process: {}", err);
634 }
635 let exec_id = env::execution_id();
636 tracing::info!(
637 target: "execution",
638 execution_id = exec_id,
639 environment = %Env::current(),
640 args = ?std::env::args(),
641 build_mode = build_info::BuildInfo::get_build_mode(),
642 compiler = build_info::BuildInfo::get_compiler(),
643 compiler_version = build_info::BuildInfo::get_compiler_version(),
644 buck_rule = build_info::BuildInfo::get_rule(),
645 package_name = build_info::BuildInfo::get_package_name(),
646 package_release = build_info::BuildInfo::get_package_release(),
647 upstream_revision = build_info::BuildInfo::get_upstream_revision(),
648 revision = build_info::BuildInfo::get_revision(),
649 "logging_initialized"
650 );
651
652 if !is_layer_disabled(DISABLE_OTEL_METRICS) {
653 otel::init_metrics();
654 }
655 }
656 #[cfg(not(fbcode_build))]
657 {
658 if let Err(err) = Registry::default()
659 .with(file_layer)
660 .with(
661 if std::env::var(DISABLE_RECORDER_TRACING).unwrap_or_default() != "1" {
662 Some(recorder().layer())
663 } else {
664 None
665 },
666 )
667 .try_init()
668 {
669 tracing::debug!("logging already initialized for this process: {}", err);
670 }
671 }
672}
673
674pub mod env {
675 use rand::RngCore;
676
677 pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
679 pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
680 pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
681
682 pub fn execution_id() -> String {
689 let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
690 let username = crate::username();
692 let now = {
693 let now = std::time::SystemTime::now();
694 let datetime: chrono::DateTime<chrono::Local> = now.into();
695 datetime.format("%b-%d_%H:%M").to_string()
696 };
697 let random_number: u16 = (rand::thread_rng().next_u32() % 1000) as u16;
698 let execution_id = format!("{}_{}_{}", username, now, random_number);
699 execution_id
700 });
701 unsafe {
704 std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
705 }
706 id
707 }
708
709 #[derive(PartialEq)]
710 pub enum Env {
711 Local,
712 Mast,
713 MastEmulator,
714 Test,
715 }
716
717 impl std::fmt::Display for Env {
718 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
719 write!(
720 f,
721 "{}",
722 match self {
723 Self::Local => crate::ENV_VALUE_LOCAL,
724 Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
725 Self::Mast => crate::ENV_VALUE_MAST,
726 Self::Test => crate::ENV_VALUE_TEST,
727 }
728 )
729 }
730 }
731
732 impl Env {
733 #[cfg(test)]
734 pub fn current() -> Self {
735 Self::Test
736 }
737
738 #[cfg(not(test))]
739 pub fn current() -> Self {
740 match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
741 crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
743 _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
744 true => Self::Mast,
745 false => Self::Local,
746 },
747 }
748 }
749 }
750}
751
#[cfg(test)]
mod test {
    use opentelemetry::*;
    // Lets macros that name the crate as `hyperactor_telemetry::...` expand inside its own
    // unit tests.
    extern crate self as hyperactor_telemetry;
    use super::*;

    #[test]
    fn infer_kv_pair_types() {
        // Each literal kind maps onto the matching `Value` variant.
        let string_kv = KeyValue::new(Key::new("str"), Value::String("str".into()));
        let int_kv = KeyValue::new(Key::new("str"), Value::I64(25));
        let float_kv = KeyValue::new(Key::new("str"), Value::F64(1.1));

        assert_eq!(key_value!("str", "str"), string_kv);
        assert_eq!(key_value!("str", 25), int_kv);
        assert_eq!(key_value!("str", 1.1), float_kv);
    }

    #[test]
    fn kv_pair_slices() {
        let expected = &[
            key_value!("1", "1"),
            key_value!("2", 2),
            key_value!("3", 3.0),
        ];
        assert_eq!(kv_pairs!("1" => "1", "2" => 2, "3" => 3.0), expected);
    }

    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // Record with attributes...
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // ...and without any.
        TEST_GAUGE.record(50.0, &[]);
    }
}