1#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15#![recursion_limit = "256"]
16
/// Environment variable whose presence makes `env::Env::current` report `Env::Mast`
/// (outside of this crate's own tests).
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

/// Default file log level for every non-test environment.
const LOG_LEVEL_INFO: &str = "info";
/// Default file log level when running under `env::Env::Test`.
const LOG_LEVEL_DEBUG: &str = "debug";

// Span field names. Neither is referenced in this file.
// NOTE(review): presumably used by submodules via `crate::` (e.g. the recorder) — confirm.
const SPAN_FIELD_RECORDING: &str = "recording";
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

// String forms of the `env::Env` variants (used by its `Display` impl).
const ENV_VALUE_LOCAL: &str = "local";
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
const ENV_VALUE_MAST: &str = "mast";
const ENV_VALUE_TEST: &str = "test";
// Value of `MAST_ENVIRONMENT` that selects `Env::MastEmulator`. Only the non-test
// `Env::current` reads it, so it is dead code in test builds.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";

/// Value attached as the `skip_record` field of every span built by `context_span!`.
/// Deliberately lower-case, hence the `non_upper_case_globals` allowance.
#[allow(non_upper_case_globals)]
pub const skip_record: bool = true;
51
52mod config;
53pub mod in_memory_reader;
54#[cfg(fbcode_build)]
55mod meta;
56mod otel;
57pub(crate) mod otlp;
58mod pool;
59mod rate_limit;
60pub mod recorder;
61pub mod sinks;
62mod spool;
63pub mod sqlite;
64pub mod task;
65pub mod trace;
66pub mod trace_dispatcher;
67
68use std::collections::hash_map::DefaultHasher;
70use std::hash::Hash;
71use std::hash::Hasher;
72use std::io::Write;
73use std::str::FromStr;
74use std::sync::Arc;
75use std::sync::Mutex;
76use std::sync::atomic::AtomicU64;
77use std::sync::atomic::Ordering;
78use std::sync::mpsc;
79use std::time::Instant;
80use std::time::SystemTime;
81
82use lazy_static::lazy_static;
83pub use opentelemetry;
84pub use opentelemetry::Key;
85pub use opentelemetry::KeyValue;
86pub use opentelemetry::Value;
87pub use opentelemetry::global::meter;
88pub use trace_dispatcher::DispatcherControl;
89pub use trace_dispatcher::FieldValue;
90pub use trace_dispatcher::TraceEvent;
91pub use trace_dispatcher::TraceEventSink;
92pub use tracing;
93pub use tracing::Level;
94use tracing_appender::non_blocking::NonBlocking;
95use tracing_appender::non_blocking::WorkerGuard;
96use tracing_appender::rolling::RollingFileAppender;
97use tracing_glog::Glog;
98use tracing_glog::GlogFields;
99use tracing_glog::LocalTime;
100use tracing_subscriber::Layer;
101use tracing_subscriber::filter::LevelFilter;
102use tracing_subscriber::filter::Targets;
103use tracing_subscriber::fmt;
104use tracing_subscriber::fmt::FormatEvent;
105use tracing_subscriber::fmt::FormatFields;
106use tracing_subscriber::fmt::format::Writer;
107use tracing_subscriber::registry::LookupSpan;
108
109use crate::config::ENABLE_OTEL_METRICS;
110use crate::config::ENABLE_OTEL_TRACING;
111use crate::config::ENABLE_RECORDER_TRACING;
112use crate::config::ENABLE_SQLITE_TRACING;
113use crate::config::MONARCH_FILE_LOG_LEVEL;
114use crate::config::MONARCH_LOG_SUFFIX;
115use crate::config::USE_UNIFIED_LAYER;
116use crate::recorder::Recorder;
117use crate::sqlite::get_reloadable_sqlite_layer;
118
/// Hashes `value` into a `u64` using the standard library's `DefaultHasher`.
///
/// The result is deterministic within a single build. The standard library does not
/// promise that `DefaultHasher` stays the same across Rust releases, so these values
/// must not be persisted and compared across toolchains.
pub fn hash_to_u64(value: &impl Hash) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
125
/// A flattened telemetry sample: an ordered list of `(key, value)` string pairs.
///
/// In fbcode builds it is produced from a scuba sample, keeping only string-valued entries.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TelemetrySample {
    // Ordered pairs. Nothing enforces unique keys; `get_string` returns the first match.
    fields: Vec<(String, String)>,
}
130
131impl TelemetrySample {
132 pub fn get_string(&self, key: &str) -> Option<&str> {
133 for (k, v) in &self.fields {
134 if k == key {
135 return Some(v.as_str());
136 }
137 }
138 None
139 }
140}
141
#[cfg(fbcode_build)]
impl From<crate::meta::sample_buffer::Sample> for TelemetrySample {
    /// Converts a scuba sample, keeping only its string-valued entries (key and value
    /// both rendered with `to_string`). Entries of any other value kind are skipped.
    fn from(sample: crate::meta::sample_buffer::Sample) -> Self {
        use crate::meta::sample_buffer::SampleValue;

        let fields = sample
            .0
            .into_iter()
            .filter_map(|(key, value)| {
                if let SampleValue::String(s) = value {
                    Some((key.to_string(), s.to_string()))
                } else {
                    None
                }
            })
            .collect();
        Self { fields }
    }
}
154
155#[cfg(not(fbcode_build))]
156impl TelemetrySample {
157 pub fn new() -> Self {
158 Self { fields: Vec::new() }
159 }
160}
161
/// Handle returned by `initialize_logging_with_log_prefix_mock_scuba`, letting tests
/// inspect tracing samples captured by a mock client.
pub trait TelemetryTestHandle {
    /// Returns the tracing samples available through this handle. The no-op
    /// `EmptyTestHandle` always returns an empty list.
    fn get_tracing_samples(&self) -> Vec<TelemetrySample>;
}
165
/// Test handle backed by a mock scuba client (fbcode builds only). Logging initialization
/// returns it when a mock client was installed.
#[cfg(fbcode_build)]
struct MockScubaHandle {
    // Clone of the client that was handed to the scuba sink/layer during initialization.
    tracing_client: crate::meta::scuba_utils::MockScubaClient,
}
170
#[cfg(fbcode_build)]
impl TelemetryTestHandle for MockScubaHandle {
    /// Reads every sample the mock client has stored and converts each into a `TelemetrySample`.
    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
        let raw_samples = self.tracing_client.get_samples();
        raw_samples
            .into_iter()
            .map(TelemetrySample::from)
            .collect::<Vec<_>>()
    }
}
181
182struct EmptyTestHandle;
183
184impl TelemetryTestHandle for EmptyTestHandle {
185 fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
186 vec![]
187 }
188}
189
/// Source of time for telemetry. The process-wide clock is replaced with
/// `swap_telemetry_clock`, which lets callers such as tests supply their own implementation.
pub trait TelemetryClock {
    /// Current monotonic instant, as a tokio `Instant`.
    fn now(&self) -> tokio::time::Instant;
    /// Current wall-clock time.
    fn system_time_now(&self) -> std::time::SystemTime;
}
194
195pub struct DefaultTelemetryClock {}
196
197impl TelemetryClock for DefaultTelemetryClock {
198 fn now(&self) -> tokio::time::Instant {
199 tokio::time::Instant::now()
200 }
201
202 fn system_time_now(&self) -> std::time::SystemTime {
203 std::time::SystemTime::now()
204 }
205}
206
207pub fn username() -> String {
208 let env = env::Env::current();
209 match env {
210 env::Env::Mast => {
211 std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
212 }
213 _ => whoami::username(),
214 }
215}
216
217pub fn log_file_path(
220 env: env::Env,
221 suffix: Option<&str>,
222) -> Result<(String, String), anyhow::Error> {
223 let suffix = suffix
224 .map(|s| {
225 if s.is_empty() {
226 String::new()
227 } else {
228 format!("_{}", s)
229 }
230 })
231 .unwrap_or_default();
232 match env {
233 env::Env::Local | env::Env::MastEmulator => {
234 let username = if whoami::username().is_empty() {
235 "monarch".to_string()
236 } else {
237 whoami::username()
238 };
239 Ok((
240 format!("/tmp/{}", username),
241 format!("monarch_log{}", suffix),
242 ))
243 }
244 env::Env::Mast => Ok((
245 "/logs/".to_string(),
246 format!("dedicated_log_monarch{}", suffix),
247 )),
248 _ => Err(anyhow::anyhow!(
249 "file writer unsupported for environment {}",
250 env
251 )),
252 }
253}
254
255fn try_create_appender(
256 path: &str,
257 filename: &str,
258 create_dir: bool,
259) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
260 if create_dir {
261 std::fs::create_dir_all(path)?;
262 }
263 Ok(RollingFileAppender::builder()
264 .filename_prefix(filename)
265 .filename_suffix("log")
266 .build(path)?)
267}
268
269fn writer() -> Box<dyn Write + Send> {
270 match env::Env::current() {
271 env::Env::Test => Box::new(std::io::stderr()),
272 env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
273 let suffix = hyperactor_config::global::try_get_cloned(MONARCH_LOG_SUFFIX);
274 let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap();
275 match try_create_appender(&path, &filename, true) {
276 Ok(file_appender) => Box::new(file_appender),
277 Err(e) => {
278 eprintln!(
279 "unable to create log file in {}: {}. Falling back to stderr",
280 path, e
281 );
282 Box::new(std::io::stderr())
283 }
284 }
285 }
286 }
287}
288
lazy_static! {
    // Process-wide telemetry clock; replaced by `swap_telemetry_clock`.
    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
    // Control channel for the trace dispatcher. `register_sink` sends on the sender. The
    // receiver is handed out exactly once by `take_sink_control_receiver` (afterwards the
    // slot holds `None`).
    static ref SINK_CONTROL_CHANNEL: (
        mpsc::Sender<DispatcherControl>,
        Mutex<Option<mpsc::Receiver<DispatcherControl>>>
    ) = {
        let (sender, receiver) = mpsc::channel();
        (sender, Mutex::new(Some(receiver)))
    };
    // Entity-event delivery state. It starts out buffering until `set_entity_dispatcher`
    // installs a dispatcher.
    static ref ENTITY_EVENT_STATE: Mutex<EntityEventState> = Mutex::new(
        EntityEventState::Buffering(Vec::new())
    );
}
311
/// Delivery state for entity events, guarded by `ENTITY_EVENT_STATE`.
enum EntityEventState {
    /// No dispatcher registered yet. Events are queued here, and `dispatch_or_buffer` caps
    /// the queue at a fixed size.
    Buffering(Vec<EntityEvent>),
    /// A dispatcher was installed with `set_entity_dispatcher`; events go straight to it.
    Dispatching(Box<dyn EntityEventDispatcher>),
}
320
/// Describes a newly created actor; reported through `notify_actor_created`.
#[derive(Debug, Clone)]
pub struct ActorEvent {
    /// Identifier of the actor.
    pub id: u64,
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Identifier of the owning mesh. NOTE(review): presumably a `MeshEvent::id` — confirm.
    pub mesh_id: u64,
    /// Rank of the actor (presumably within the mesh identified by `mesh_id`).
    pub rank: u64,
    /// Full actor name.
    pub full_name: String,
    /// Optional human-friendly name.
    pub display_name: Option<String>,
}

/// Reports an actor creation. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_actor_created(event: ActorEvent) {
    dispatch_or_buffer(EntityEvent::Actor(event));
}
345
/// Describes a newly created mesh; reported through `notify_mesh_created`.
#[derive(Debug, Clone)]
pub struct MeshEvent {
    /// Identifier of the mesh.
    pub id: u64,
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Mesh class/kind label.
    pub class: String,
    /// Name the mesh was given by its creator.
    pub given_name: String,
    /// Full mesh name.
    pub full_name: String,
    /// Mesh shape, JSON-encoded (format defined by the producer).
    pub shape_json: String,
    /// Parent mesh identifier, if this mesh was derived from another one.
    pub parent_mesh_id: Option<u64>,
    /// JSON-encoded view of the parent, if any (format defined by the producer).
    pub parent_view_json: Option<String>,
}

/// Reports a mesh creation. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_mesh_created(event: MeshEvent) {
    dispatch_or_buffer(EntityEvent::Mesh(event));
}
374
/// Describes an actor status transition; reported through `notify_actor_status_changed`.
#[derive(Debug, Clone)]
pub struct ActorStatusEvent {
    /// Identifier of this status event (see `generate_actor_status_event_id`).
    pub id: u64,
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Identifier of the actor whose status changed.
    pub actor_id: u64,
    /// New status, as a string.
    pub new_status: String,
    /// Optional explanation for the transition.
    pub reason: Option<String>,
}

/// Reports an actor status change. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_actor_status_changed(event: ActorStatusEvent) {
    dispatch_or_buffer(EntityEvent::ActorStatus(event));
}
397
/// Describes a message sent by an actor to an actor mesh; reported through `notify_sent_message`.
#[derive(Debug, Clone)]
pub struct SentMessageEvent {
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Identifier of the sending actor.
    pub sender_actor_id: u64,
    /// Identifier of the destination actor mesh.
    pub actor_mesh_id: u64,
    /// JSON-encoded view that was targeted (format defined by the producer).
    pub view_json: String,
    /// JSON-encoded shape (format defined by the producer).
    pub shape_json: String,
}

/// Reports a sent message. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_sent_message(event: SentMessageEvent) {
    dispatch_or_buffer(EntityEvent::SentMessage(event));
}
425
/// Describes a message between two actors; reported through `notify_message`.
#[derive(Debug, Clone)]
pub struct MessageEvent {
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Identifier of the message (see `generate_message_id`).
    pub id: u64,
    /// Identifier of the sending actor.
    pub from_actor_id: u64,
    /// Identifier of the receiving actor.
    pub to_actor_id: u64,
    /// Endpoint name, when known.
    pub endpoint: Option<String>,
    /// Port identifier, when known.
    pub port_id: Option<u64>,
}

/// Reports a message. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_message(event: MessageEvent) {
    dispatch_or_buffer(EntityEvent::Message(event));
}
446
/// Describes a status update for a message; reported through `notify_message_status`.
#[derive(Debug, Clone)]
pub struct MessageStatusEvent {
    /// Timestamp associated with the event.
    pub timestamp: SystemTime,
    /// Identifier of this status event (see `generate_status_event_id`).
    pub id: u64,
    /// Identifier of the message the status refers to (a `MessageEvent::id`, presumably).
    pub message_id: u64,
    /// New status, as a string.
    pub status: String,
}

/// Reports a message status update. The event goes to the registered `EntityEventDispatcher`.
/// If none is registered yet, it is buffered (up to a fixed cap, beyond which it is dropped).
pub fn notify_message_status(event: MessageStatusEvent) {
    dispatch_or_buffer(EntityEvent::MessageStatus(event));
}
463
464static ACTOR_STATUS_SEQ: AtomicU64 = AtomicU64::new(1);
465
466pub fn generate_actor_status_event_id(actor_id: u64) -> u64 {
471 let seq = ACTOR_STATUS_SEQ.fetch_add(1, Ordering::Relaxed);
472 hash_to_u64(&(actor_id, seq))
473}
474
475static SEND_SEQ: AtomicU64 = AtomicU64::new(1);
476
477pub fn generate_sent_message_id(sender_actor_id: u64) -> u64 {
479 let seq = SEND_SEQ.fetch_add(1, Ordering::Relaxed);
480 hash_to_u64(&(sender_actor_id, seq))
481}
482
483static RECV_MSG_SEQ: AtomicU64 = AtomicU64::new(1);
484
485pub fn generate_message_id(to_actor_id: u64) -> u64 {
490 let seq = RECV_MSG_SEQ.fetch_add(1, Ordering::Relaxed);
491 hash_to_u64(&(to_actor_id, seq))
492}
493
494static STATUS_EVENT_SEQ: AtomicU64 = AtomicU64::new(1);
495
496pub fn generate_status_event_id(message_id: u64) -> u64 {
501 let seq = STATUS_EVENT_SEQ.fetch_add(1, Ordering::Relaxed);
502 hash_to_u64(&(message_id, seq))
503}
504
/// Every kind of entity event accepted by `EntityEventDispatcher::dispatch`.
#[derive(Debug, Clone)]
pub enum EntityEvent {
    /// Actor creation (`notify_actor_created`).
    Actor(ActorEvent),
    /// Mesh creation (`notify_mesh_created`).
    Mesh(MeshEvent),
    /// Actor status change (`notify_actor_status_changed`).
    ActorStatus(ActorStatusEvent),
    /// Message sent to a mesh (`notify_sent_message`).
    SentMessage(SentMessageEvent),
    /// Actor-to-actor message (`notify_message`).
    Message(MessageEvent),
    /// Message status update (`notify_message_status`).
    MessageStatus(MessageStatusEvent),
}
525
/// Receiver for entity events, installed with `set_entity_dispatcher`.
///
/// `dispatch` runs while the global `ENTITY_EVENT_STATE` mutex (a non-reentrant
/// `std::sync::Mutex`) is held. An implementation that calls back into any `notify_*`
/// function on the same thread would therefore deadlock or panic.
pub trait EntityEventDispatcher: Send + Sync {
    /// Handles one event. The caller logs a returned error and otherwise ignores it.
    fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error>;
}
561
562fn dispatch_or_buffer(event: EntityEvent) {
564 if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
565 match &mut *state {
566 EntityEventState::Dispatching(d) => {
567 if let Err(e) = d.dispatch(event) {
568 tracing::error!("failed to dispatch entity event: {:?}", e);
569 }
570 }
571 EntityEventState::Buffering(buf) => {
572 const MAX_BUFFERED_EVENTS: usize = 1000;
574 if buf.len() < MAX_BUFFERED_EVENTS {
575 buf.push(event);
576 if buf.len() == MAX_BUFFERED_EVENTS {
577 tracing::warn!(
578 "entity event buffer full ({MAX_BUFFERED_EVENTS}); \
579 dropping further events until a dispatcher is registered"
580 );
581 }
582 }
583 }
584 }
585 }
586}
587
588pub fn set_entity_dispatcher(dispatcher: Box<dyn EntityEventDispatcher>) {
596 if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
597 let buffered =
599 match std::mem::replace(&mut *state, EntityEventState::Dispatching(dispatcher)) {
600 EntityEventState::Buffering(buf) => buf,
601 EntityEventState::Dispatching(_) => Vec::new(),
602 };
603 for event in buffered {
604 if let EntityEventState::Dispatching(d) = &*state {
605 if let Err(e) = d.dispatch(event) {
606 tracing::error!("failed to dispatch buffered entity event: {:?}", e);
607 }
608 }
609 }
610 }
611}
612
613pub fn register_sink(sink: Box<dyn TraceEventSink>) {
630 let sender = &SINK_CONTROL_CHANNEL.0;
631 if let Err(e) = sender.send(DispatcherControl::AddSink(sink)) {
632 eprintln!("[telemetry] failed to register sink: {}", e);
633 }
634}
635
636pub(crate) fn take_sink_control_receiver() -> Option<mpsc::Receiver<DispatcherControl>> {
639 SINK_CONTROL_CHANNEL.1.lock().unwrap().take()
640}
641
642pub fn recorder() -> &'static Recorder {
645 static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
646 RECORDER.get_or_init(Recorder::new)
647}
648
649pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
652 *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
653}
654
/// Builds an `opentelemetry::KeyValue` from two parts:
/// - a key, which can be anything accepted by `Key::new`;
/// - a value, which can be anything convertible into `opentelemetry::Value` via `From`
///   (for example `&str`, integers as `I64`, floats as `F64`).
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Builds a borrowed slice of `KeyValue`s from `key => value` pairs, each via `key_value!`.
/// A trailing comma is accepted, and an empty invocation yields an empty slice.
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$($crate::key_value!($k, $v),)*]
    };
}
688
/// Unit in which a `Timer` records durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    Millis,
    Micros,
    Nanos,
}

impl TimeUnit {
    /// Short unit label. It is used both as the histogram unit and as the metric-name suffix.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Millis => "ms",
            Self::Micros => "us",
            Self::Nanos => "ns",
        }
    }
}
705pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
706
707impl<'a> Timer {
708 pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
709 Timer(data, unit)
710 }
711 pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
712 TimerGuard {
713 data: self,
714 pairs,
715 start: Instant::now(),
716 }
717 }
718
719 pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
720 let dur = match self.1 {
721 TimeUnit::Millis => dur.as_millis(),
722 TimeUnit::Micros => dur.as_micros(),
723 TimeUnit::Nanos => dur.as_nanos(),
724 } as u64;
725
726 self.0.record(dur, pairs);
727 }
728}
729pub struct TimerGuard<'a> {
730 data: &'static Timer,
731 pairs: &'a [opentelemetry::KeyValue],
732 start: Instant,
733}
734
735impl<'a> Drop for TimerGuard<'a> {
736 fn drop(&mut self) {
737 let now = Instant::now();
738 let dur = now.duration_since(self.start);
739 self.data.record(dur, self.pairs);
740 }
741}
742
/// Declares `pub static $name`, a lazily built `Timer`.
///
/// The timer is backed by a `u64` histogram named `"<$key>.<unit label>"`, with its unit set
/// to that same label. The histogram comes from the meter for the invoking module
/// (`module_path!()`). `$unit` must be a `TimeUnit` variant path.
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
775
/// Declares `pub static $name`, a lazily built `u64` OpenTelemetry counter named `$key`,
/// created from the meter for the invoking module (`module_path!()`).
///
/// The type is named through `$crate::opentelemetry`, so invoking crates do not need their
/// own (version-matched) `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: ::std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            ::std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
806
/// Declares `pub static $name`, a lazily built `i64` OpenTelemetry up/down counter named
/// `$key`, created from the meter for the invoking module (`module_path!()`).
///
/// The type is named through `$crate::opentelemetry`, so invoking crates do not need their
/// own `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: ::std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = ::std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
841
/// Declares `pub static $name`, a lazily built `f64` OpenTelemetry gauge named `$key`,
/// created from the meter for the invoking module (`module_path!()`).
///
/// The type is named through `$crate::opentelemetry`, so invoking crates do not need their
/// own `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: ::std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            ::std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Declares `pub static $name`, a lazily built `f64` observable gauge named `$key`.
///
/// The gauge is created from the invoking module's meter, with `$cb` registered as its
/// callback. The callback is only registered once the static is first dereferenced.
/// The type is named through `$crate::opentelemetry`, so invoking crates do not need their
/// own `opentelemetry` dependency.
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: ::std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = ::std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Declares `pub static $name`, a lazily built `f64` OpenTelemetry histogram named `$key`,
/// created from the meter for the invoking module (`module_path!()`).
///
/// The meter and the type are both reached through `$crate`. The previous hard-coded
/// `hyperactor_telemetry::` path only resolved in crates that could name this crate that way.
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: ::std::sync::LazyLock<
            $crate::opentelemetry::metrics::Histogram<f64>,
        > = ::std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_histogram($key)
                .build()
        });
    };
}
936}
937
/// Holds the non-blocking file writer and its `WorkerGuard` once the legacy (non-unified)
/// logging path has built them.
///
/// Statics are never dropped, so the guard lives for the whole process. Only the first
/// `set` succeeds; later initializations leave the existing guard in place.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
940
941struct PrefixedFormatter {
943 formatter: Glog<LocalTime>,
944 prefix_env_var: Option<String>,
945}
946
947impl PrefixedFormatter {
948 fn new(prefix_env_var: Option<String>) -> Self {
949 let formatter = Glog::default().with_timer(LocalTime::default());
950 Self {
951 formatter,
952 prefix_env_var,
953 }
954 }
955}
956
957impl<S, N> FormatEvent<S, N> for PrefixedFormatter
958where
959 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
960 N: for<'a> FormatFields<'a> + 'static,
961{
962 fn format_event(
963 &self,
964 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
965 mut writer: Writer<'_>,
966 event: &tracing::Event<'_>,
967 ) -> std::fmt::Result {
968 let prefix: String = if self.prefix_env_var.is_some() {
969 std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
970 } else {
971 "".to_string()
972 };
973
974 if prefix.is_empty() {
975 write!(writer, "[-]")?;
976 } else {
977 write!(writer, "[{}]", prefix)?;
978 }
979
980 self.formatter.format_event(ctx, writer, event)
981 }
982}
983
984pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
993 initialize_logging_with_log_prefix(clock, None);
994}
995
996pub fn initialize_logging_for_test() {
998 initialize_logging(DefaultTelemetryClock {});
999}
1000
1001pub fn initialize_logging_with_log_prefix(
1016 clock: impl TelemetryClock + Send + 'static,
1017 prefix_env_var: Option<String>,
1018) {
1019 let _ = initialize_logging_with_log_prefix_impl(clock, prefix_env_var, false);
1020}
1021
1022pub fn initialize_logging_with_log_prefix_mock_scuba(
1023 clock: impl TelemetryClock + Send + 'static,
1024 prefix_env_var: Option<String>,
1025) -> Box<dyn TelemetryTestHandle> {
1026 initialize_logging_with_log_prefix_impl(clock, prefix_env_var, true)
1027}
1028
/// Shared implementation behind the `initialize_logging*` entry points.
///
/// It installs `clock` as the telemetry clock and then builds a global `tracing`
/// subscriber. The `USE_UNIFIED_LAYER` config selects one of two layouts:
/// - Unified: a `TraceEventDispatcher` that fans out to sinks. These are always a glog
///   file sink, plus an optional sqlite sink. fbcode builds can add perfetto and
///   scuba/mock-scuba sinks; OSS builds can add an OTLP log sink.
/// - Legacy: a `fmt` file layer using `PrefixedFormatter`. fbcode builds can add sqlite,
///   OTEL or mock-scuba layers.
///
/// Both layouts may add the recorder layer. If a global subscriber already exists,
/// `try_init` fails: the error is logged at debug level and the newly built layers are
/// discarded.
///
/// fbcode builds also emit an `execution` event, and only initialize OTEL metrics when
/// `ENABLE_OTEL_METRICS` is set. OSS builds always call `otel::init_metrics()`.
/// NOTE(review): confirm that asymmetry is intended.
///
/// Returns a handle to the mock scuba client when one was installed, otherwise an
/// `EmptyTestHandle`.
fn initialize_logging_with_log_prefix_impl(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
    mock_scuba: bool,
) -> Box<dyn TelemetryTestHandle> {
    let use_unified = hyperactor_config::global::get(USE_UNIFIED_LAYER);

    swap_telemetry_clock(clock);
    // Baseline file log level; the legacy path lets MONARCH_FILE_LOG_LEVEL override it below.
    let file_log_level = match env::Env::current() {
        env::Env::Local => LOG_LEVEL_INFO,
        env::Env::MastEmulator => LOG_LEVEL_INFO,
        env::Env::Mast => LOG_LEVEL_INFO,
        env::Env::Test => LOG_LEVEL_DEBUG,
    };

    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    #[cfg(fbcode_build)]
    {
        // Set only when a mock scuba client is installed; decides which handle is returned.
        let mut mock_scuba_client: Option<crate::meta::scuba_utils::MockScubaClient> = None;

        if use_unified {
            let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();
            sinks.push(Box::new(sinks::glog::GlogSink::new(
                writer(),
                prefix_env_var.clone(),
                file_log_level,
            )));

            let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

            if sqlite_enabled {
                match create_sqlite_sink() {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        // NOTE: emitted before this function installs a subscriber.
                        tracing::warn!("failed to create SqliteSink: {}", e);
                    }
                }
            }

            if hyperactor_config::global::get(sinks::perfetto::PERFETTO_TRACE_MODE)
                != sinks::perfetto::PerfettoTraceMode::Off
            {
                let exec_id = env::execution_id();
                let process_name = std::env::var("HYPERACTOR_PROCESS_NAME")
                    .unwrap_or_else(|_| "client".to_string());
                match sinks::perfetto::PerfettoFileSink::new(
                    sinks::perfetto::default_trace_dir(),
                    &exec_id,
                    &process_name,
                ) {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        tracing::warn!("failed to create PerfettoFileSink: {}", e);
                    }
                }
            }

            {
                // In the unified path the scuba sink (real or mock) is gated on OTEL tracing,
                // so `mock_scuba` has no effect when ENABLE_OTEL_TRACING is off.
                if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                    use crate::meta;
                    use crate::meta::scuba_utils::LOG_ENTER_EXIT;

                    if mock_scuba {
                        let tracing_client = meta::scuba_utils::MockScubaClient::new();

                        sinks.push(Box::new(
                            meta::scuba_sink::ScubaSink::with_client(
                                tracing_client.clone(),
                                match meta::tracing_resource().get(&LOG_ENTER_EXIT) {
                                    Some(Value::Bool(enabled)) => enabled,
                                    _ => false,
                                },
                            )
                            .with_target_filter(crate::config::get_tracing_targets()),
                        ));

                        mock_scuba_client = Some(tracing_client);
                    } else {
                        sinks.push(Box::new(
                            meta::scuba_sink::ScubaSink::new(meta::tracing_resource())
                                .with_target_filter(crate::config::get_tracing_targets()),
                        ));
                    }
                }
            }

            let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);

            if let Err(err) = Registry::default()
                .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                    Some(recorder().layer())
                } else {
                    None
                })
                .with(dispatcher)
                .try_init()
            {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        } else {
            // Legacy layout: non-lossy background writer. The guard is parked in a static
            // (first initialization wins).
            let (non_blocking, guard) =
                tracing_appender::non_blocking::NonBlockingBuilder::default()
                    .lossy(false)
                    .finish(writer());
            let writer_guard = Arc::new((non_blocking, guard));
            let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

            // MONARCH_FILE_LOG_LEVEL overrides the default level. An unparseable value falls
            // back to `file_log_level`. The `opentelemetry` target is silenced.
            let file_layer = fmt::Layer::default()
                .with_writer(writer_guard.0.clone())
                .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                .fmt_fields(GlogFields::default().compact())
                .with_ansi(false)
                .with_filter(
                    Targets::new()
                        .with_default(LevelFilter::from_level({
                            let log_level_str =
                                hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
                                    .unwrap_or_else(|| file_log_level.to_string());
                            tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                tracing::Level::from_str(file_log_level)
                                    .expect("Invalid default log level")
                            })
                        }))
                        .with_target("opentelemetry", LevelFilter::OFF),
                );

            let registry = Registry::default()
                .with(if hyperactor_config::global::get(ENABLE_SQLITE_TRACING) {
                    Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
                } else {
                    None
                })
                .with(file_layer)
                .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                    Some(recorder().layer())
                } else {
                    None
                });

            // With `mock_scuba`, the mock scuba layer replaces the OTEL tracing layer.
            if mock_scuba {
                let tracing_client = crate::meta::scuba_utils::MockScubaClient::new();

                let scuba_layer = crate::meta::tracing_layer_with_client(tracing_client.clone());

                if let Err(err) = registry.with(scuba_layer).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }

                mock_scuba_client = Some(tracing_client);
            } else if let Err(err) = registry
                .with(if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                    Some(otel::tracing_layer())
                } else {
                    None
                })
                .try_init()
            {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        }
        let exec_id = env::execution_id();
        let process_name =
            std::env::var("HYPERACTOR_PROCESS_NAME").unwrap_or_else(|_| "client".to_string());

        // Record execution/build metadata, both as a tracing event and via meta::log_execution_event.
        tracing::info!(
            target: "execution",
            execution_id = exec_id,
            environment = %env::Env::current(),
            args = ?std::env::args(),
            build_mode = build_info::BuildInfo::get_build_mode(),
            compiler = build_info::BuildInfo::get_compiler(),
            compiler_version = build_info::BuildInfo::get_compiler_version(),
            buck_rule = build_info::BuildInfo::get_rule(),
            package_name = build_info::BuildInfo::get_package_name(),
            package_release = build_info::BuildInfo::get_package_release(),
            upstream_revision = build_info::BuildInfo::get_upstream_revision(),
            revision = build_info::BuildInfo::get_revision(),
            process_name = process_name,
            "logging_initialized",
        );
        meta::log_execution_event(
            &exec_id,
            &env::Env::current().to_string(),
            std::env::args().collect(),
            build_info::BuildInfo::get_build_mode(),
            build_info::BuildInfo::get_compiler(),
            build_info::BuildInfo::get_compiler_version(),
            build_info::BuildInfo::get_rule(),
            build_info::BuildInfo::get_package_name(),
            build_info::BuildInfo::get_package_release(),
            build_info::BuildInfo::get_upstream_revision(),
            build_info::BuildInfo::get_revision(),
            &process_name,
        );

        if hyperactor_config::global::get(ENABLE_OTEL_METRICS) {
            otel::init_metrics();
        }

        if let Some(tracing_client) = mock_scuba_client {
            Box::new(MockScubaHandle { tracing_client })
        } else {
            Box::new(EmptyTestHandle)
        }
    }
    #[cfg(not(fbcode_build))]
    {
        // OSS builds: `mock_scuba` is ignored and the returned handle is always empty.
        let registry =
            Registry::default().with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                Some(recorder().layer())
            } else {
                None
            });

        if use_unified {
            let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();

            let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

            // Unlike the fbcode branch, the sqlite sink is created before the glog sink,
            // and therefore before `writer()` runs.
            if sqlite_enabled {
                match create_sqlite_sink() {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        // NOTE: emitted before this function installs a subscriber.
                        tracing::warn!("failed to create SqliteSink: {}", e);
                    }
                }
            }

            sinks.push(Box::new(sinks::glog::GlogSink::new(
                writer(),
                prefix_env_var.clone(),
                file_log_level,
            )));

            if let Some(log_sink) = otlp::otlp_log_sink() {
                sinks.push(log_sink);
            }

            let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);

            if let Err(err) = registry.with(dispatcher).try_init() {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        } else {
            let (non_blocking, guard) =
                tracing_appender::non_blocking::NonBlockingBuilder::default()
                    .lossy(false)
                    .finish(writer());
            let writer_guard = Arc::new((non_blocking, guard));
            let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

            // Same file layer as in the fbcode legacy path.
            let file_layer = fmt::Layer::default()
                .with_writer(writer_guard.0.clone())
                .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                .fmt_fields(GlogFields::default().compact())
                .with_ansi(false)
                .with_filter(
                    Targets::new()
                        .with_default(LevelFilter::from_level({
                            let log_level_str =
                                hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
                                    .unwrap_or_else(|| file_log_level.to_string());
                            tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                tracing::Level::from_str(file_log_level)
                                    .expect("Invalid default log level")
                            })
                        }))
                        .with_target("opentelemetry", LevelFilter::OFF),
                );

            if let Err(err) = registry.with(file_layer).try_init() {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        }

        otel::init_metrics();

        Box::new(EmptyTestHandle)
    }
}
1323
1324fn create_sqlite_sink() -> anyhow::Result<sinks::sqlite::SqliteSink> {
1325 let (db_path, _) = log_file_path(env::Env::current(), Some("traces"))
1326 .expect("failed to determine trace db path");
1327 let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id());
1328
1329 sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)
1330}
1331
/// Creates an ERROR-level span via `tracing::error_span!`. Every span it creates carries
/// the extra field `skip_record = $crate::skip_record`.
///
/// It accepts the same leading forms as `error_span!`: an optional `target:`, an optional
/// `parent:`, a name, and optional trailing fields. When no `target:` is given, the span
/// uses the caller's module path. Some arms pass `module_path!()` explicitly; the others
/// rely on tracing's default, which is the same.
/// NOTE(review): the ERROR level is presumably chosen so level filters never disable these
/// spans — confirm.
#[macro_export]
macro_rules! context_span {
    // target + parent + fields
    (target: $target:expr, parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // target + parent, no fields
    (target: $target:expr, parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // parent + fields
    (parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // parent, no fields
    (parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // target + fields
    (target: $target:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // target, no fields
    (target: $target:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // name + fields
    ($name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // name only
    ($name:expr) => {
        ::tracing::error_span!(
            $name,
            skip_record = $crate::skip_record,
        )
    };
}
1409
1410pub mod env {
1411 use rand::RngCore;
1412
1413 pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
1415 pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
1416 pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
1417
1418 pub fn execution_id() -> String {
1425 let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
1426 let username = crate::username();
1428 let now = {
1429 let now = std::time::SystemTime::now();
1430 let datetime: chrono::DateTime<chrono::Local> = now.into();
1431 datetime.format("%b-%d_%H:%M").to_string()
1432 };
1433 let random_number: u16 = (rand::rng().next_u32() % 1000) as u16;
1434 let execution_id = format!("{}_{}_{}", username, now, random_number);
1435 execution_id
1436 });
1437 unsafe {
1440 std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
1441 }
1442 id
1443 }
1444
1445 #[cfg(fbcode_build)]
1447 pub async fn execution_url() -> anyhow::Result<Option<String>> {
1448 Ok(Some(
1449 crate::meta::scuba_tracing::url::get_samples_shorturl(&execution_id()).await?,
1450 ))
1451 }
1452 #[cfg(not(fbcode_build))]
1453 pub async fn execution_url() -> anyhow::Result<Option<String>> {
1454 Ok(None)
1455 }
1456
1457 #[derive(PartialEq)]
1458 pub enum Env {
1459 Local,
1460 Mast,
1461 MastEmulator,
1462 Test,
1463 }
1464
1465 impl std::fmt::Display for Env {
1466 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1467 write!(
1468 f,
1469 "{}",
1470 match self {
1471 Self::Local => crate::ENV_VALUE_LOCAL,
1472 Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
1473 Self::Mast => crate::ENV_VALUE_MAST,
1474 Self::Test => crate::ENV_VALUE_TEST,
1475 }
1476 )
1477 }
1478 }
1479
1480 impl Env {
1481 #[cfg(test)]
1482 pub fn current() -> Self {
1483 Self::Test
1484 }
1485
1486 #[cfg(not(test))]
1487 pub fn current() -> Self {
1488 match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
1489 crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
1491 _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
1492 true => Self::Mast,
1493 false => Self::Local,
1494 },
1495 }
1496 }
1497 }
1498}
1499
#[cfg(test)]
mod test {
    use opentelemetry::*;
    // Lets macro expansions that name `hyperactor_telemetry::…` explicitly resolve inside
    // this crate's own tests.
    extern crate self as hyperactor_telemetry;
    use super::*;

    // `key_value!` converts literals via `Value::from`: &str -> String, integer -> I64, float -> F64.
    #[test]
    fn infer_kv_pair_types() {
        assert_eq!(
            key_value!("str", "str"),
            KeyValue::new(Key::new("str"), Value::String("str".into()))
        );
        assert_eq!(
            key_value!("str", 25),
            KeyValue::new(Key::new("str"), Value::I64(25))
        );
        assert_eq!(
            key_value!("str", 1.1),
            KeyValue::new(Key::new("str"), Value::F64(1.1))
        );
    }
    // `kv_pairs!` yields the same slice as calling `key_value!` for each pair.
    #[test]
    fn kv_pair_slices() {
        assert_eq!(
            kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
            &[
                key_value!("1", "1"),
                key_value!("2", 2),
                key_value!("3", 3.0),
            ],
        );
    }

    // Smoke test: gauges declared in a function body can be recorded with and without attributes.
    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        TEST_GAUGE.record(50.0, &[]);
    }
}