1#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15#![feature(thread_id_value)]
16#![recursion_limit = "256"]
17
/// Name of the environment variable whose presence makes
/// `env::Env::current` report the `Mast` environment.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

/// Default file log level for the local, MAST emulator and MAST environments.
const LOG_LEVEL_INFO: &str = "info";
/// Default file log level for the test environment.
const LOG_LEVEL_DEBUG: &str = "debug";

// NOTE(review): neither span field name is referenced in this file;
// presumably they are consumed by the `recorder` module — confirm.
const SPAN_FIELD_RECORDING: &str = "recording";
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

// NOTE(review): not referenced in this file; presumably a well-known
// attribute key shared with callers — confirm.
pub const SUBJECT_KEY: &str = "subject";

/// Display string for `env::Env::Local`.
const ENV_VALUE_LOCAL: &str = "local";
/// Display string for `env::Env::MastEmulator`.
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
/// Display string for `env::Env::Mast`.
const ENV_VALUE_MAST: &str = "mast";
/// Display string for `env::Env::Test`.
const ENV_VALUE_TEST: &str = "test";
/// Value of the `MAST_ENVIRONMENT` variable that selects
/// `env::Env::MastEmulator` in non-test builds.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";

/// Value attached as the `skip_record` field by the `context_span!` macro.
/// Lower-case on purpose so the macro can write `skip_record = $crate::skip_record`.
#[allow(non_upper_case_globals)]
pub const skip_record: bool = true;
57
58mod config;
59pub mod in_memory_reader;
60#[cfg(all(fbcode_build, target_os = "linux"))]
61mod meta;
62mod otel;
63pub(crate) mod otlp;
64mod pool;
65mod rate_limit;
66pub mod recorder;
67pub mod sinks;
68mod spool;
69pub mod sqlite;
70pub mod task;
71pub mod trace;
72pub mod trace_dispatcher;
73
74use std::collections::hash_map::DefaultHasher;
76use std::hash::Hash;
77use std::hash::Hasher;
78use std::io::Write;
79use std::str::FromStr;
80use std::sync::Arc;
81use std::sync::Mutex;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::Ordering;
84use std::sync::mpsc;
85use std::time::Instant;
86use std::time::SystemTime;
87
88use lazy_static::lazy_static;
89pub use opentelemetry;
90pub use opentelemetry::Key;
91pub use opentelemetry::KeyValue;
92pub use opentelemetry::Value;
93pub use opentelemetry::global::meter;
94pub use trace_dispatcher::DispatcherControl;
95pub use trace_dispatcher::FieldValue;
96pub use trace_dispatcher::TraceEvent;
97pub use trace_dispatcher::TraceEventSink;
98use trace_dispatcher::TraceFields;
99pub use tracing;
100pub use tracing::Level;
101use tracing_appender::non_blocking::NonBlocking;
102use tracing_appender::non_blocking::WorkerGuard;
103use tracing_appender::rolling::RollingFileAppender;
104use tracing_glog::Glog;
105use tracing_glog::GlogFields;
106use tracing_glog::LocalTime;
107use tracing_subscriber::Layer;
108use tracing_subscriber::filter::LevelFilter;
109use tracing_subscriber::filter::Targets;
110use tracing_subscriber::fmt;
111use tracing_subscriber::fmt::FormatEvent;
112use tracing_subscriber::fmt::FormatFields;
113use tracing_subscriber::fmt::format::Writer;
114use tracing_subscriber::registry::LookupSpan;
115
116#[cfg(all(fbcode_build, target_os = "linux"))]
117use crate::config::ENABLE_OTEL_METRICS;
118#[cfg(all(fbcode_build, target_os = "linux"))]
119use crate::config::ENABLE_OTEL_TRACING;
120use crate::config::ENABLE_RECORDER_TRACING;
121use crate::config::ENABLE_SQLITE_TRACING;
122use crate::config::MONARCH_FILE_LOG_LEVEL;
123use crate::config::MONARCH_LOG_SUFFIX;
124use crate::config::USE_UNIFIED_LAYER;
125use crate::recorder::Recorder;
126#[cfg(all(fbcode_build, target_os = "linux"))]
127use crate::sqlite::get_reloadable_sqlite_layer;
128
/// Hashes `value` with the standard library's [`DefaultHasher`] and returns
/// the resulting 64-bit digest.
pub fn hash_to_u64(value: &impl Hash) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
135
/// A telemetry sample exposed to tests as an ordered list of string
/// key/value pairs.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TelemetrySample {
    // Pairs are kept in insertion order; duplicate keys are not rejected here.
    fields: Vec<(String, String)>,
}
140
141impl TelemetrySample {
142 pub fn get_string(&self, key: &str) -> Option<&str> {
143 for (k, v) in &self.fields {
144 if k == key {
145 return Some(v.as_str());
146 }
147 }
148 None
149 }
150}
151
#[cfg(all(fbcode_build, target_os = "linux"))]
impl From<crate::meta::sample_buffer::Sample> for TelemetrySample {
    /// Converts a buffered sample, keeping only its string-valued entries;
    /// entries holding any other value kind are discarded.
    fn from(sample: crate::meta::sample_buffer::Sample) -> Self {
        use crate::meta::sample_buffer::SampleValue;

        let fields = sample
            .0
            .into_iter()
            .filter_map(|(key, value)| match value {
                SampleValue::String(s) => Some((key.to_string(), s.to_string())),
                _ => None,
            })
            .collect();
        Self { fields }
    }
}
164
165#[cfg(not(all(fbcode_build, target_os = "linux")))]
166impl TelemetrySample {
167 pub fn new() -> Self {
168 Self { fields: Vec::new() }
169 }
170}
171
/// Handle returned by logging initialization that lets tests inspect the
/// telemetry captured so far.
pub trait TelemetryTestHandle {
    /// Returns the tracing samples available through this handle.
    fn get_tracing_samples(&self) -> Vec<TelemetrySample>;
}
175
/// Test handle backed by a mock Scuba client.
#[cfg(all(fbcode_build, target_os = "linux"))]
struct MockScubaHandle {
    tracing_client: crate::meta::scuba_utils::MockScubaClient,
}

#[cfg(all(fbcode_build, target_os = "linux"))]
impl TelemetryTestHandle for MockScubaHandle {
    /// Converts every sample held by the mock client into a [`TelemetrySample`].
    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
        let captured = self.tracing_client.get_samples();
        captured.into_iter().map(TelemetrySample::from).collect()
    }
}
191
192struct EmptyTestHandle;
193
194impl TelemetryTestHandle for EmptyTestHandle {
195 fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
196 vec![]
197 }
198}
199
/// Source of time for telemetry; the process-wide clock can be replaced
/// through `swap_telemetry_clock`.
pub trait TelemetryClock {
    /// Returns the current instant as a `tokio::time::Instant`.
    fn now(&self) -> tokio::time::Instant;
    /// Returns the current system (wall-clock) time.
    fn system_time_now(&self) -> std::time::SystemTime;
}
204
/// [`TelemetryClock`] that forwards directly to `tokio::time::Instant::now`
/// and `std::time::SystemTime::now`.
pub struct DefaultTelemetryClock {}

impl TelemetryClock for DefaultTelemetryClock {
    /// Current instant as reported by `tokio::time::Instant::now`.
    fn now(&self) -> tokio::time::Instant {
        tokio::time::Instant::now()
    }

    /// Current wall-clock time as reported by `std::time::SystemTime::now`.
    fn system_time_now(&self) -> std::time::SystemTime {
        std::time::SystemTime::now()
    }
}
216
217pub fn username() -> String {
218 let env = env::Env::current();
219 match env {
220 env::Env::Mast => {
221 std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
222 }
223 _ => whoami::username(),
224 }
225}
226
227pub fn log_file_path(
230 env: env::Env,
231 suffix: Option<&str>,
232) -> Result<(String, String), anyhow::Error> {
233 let suffix = suffix
234 .map(|s| {
235 if s.is_empty() {
236 String::new()
237 } else {
238 format!("_{}", s)
239 }
240 })
241 .unwrap_or_default();
242 match env {
243 env::Env::Local | env::Env::MastEmulator => {
244 let username = if whoami::username().is_empty() {
245 "monarch".to_string()
246 } else {
247 whoami::username()
248 };
249 Ok((
250 format!("/tmp/{}", username),
251 format!("monarch_log{}", suffix),
252 ))
253 }
254 env::Env::Mast => Ok((
255 "/logs/".to_string(),
256 format!("dedicated_log_monarch{}", suffix),
257 )),
258 _ => Err(anyhow::anyhow!(
259 "file writer unsupported for environment {}",
260 env
261 )),
262 }
263}
264
265fn try_create_appender(
266 path: &str,
267 filename: &str,
268 create_dir: bool,
269) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
270 if create_dir {
271 std::fs::create_dir_all(path)?;
272 }
273 Ok(RollingFileAppender::builder()
274 .filename_prefix(filename)
275 .filename_suffix("log")
276 .build(path)?)
277}
278
279fn writer() -> Box<dyn Write + Send> {
280 match env::Env::current() {
281 env::Env::Test => Box::new(std::io::stderr()),
282 env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
283 let suffix = hyperactor_config::global::try_get_cloned(MONARCH_LOG_SUFFIX);
284 let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap();
285 match try_create_appender(&path, &filename, true) {
286 Ok(file_appender) => Box::new(file_appender),
287 Err(e) => {
288 eprintln!(
289 "unable to create log file in {}: {}. Falling back to stderr",
290 path, e
291 );
292 Box::new(std::io::stderr())
293 }
294 }
295 }
296 }
297}
298
lazy_static! {
    /// Process-wide telemetry clock; replaced by `swap_telemetry_clock`.
    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
    /// Sender used to inject synthetic trace events. `None` until
    /// `set_synthetic_trace_event_sender` installs one.
    static ref SYNTHETIC_TRACE_EVENT_SENDER: Mutex<Option<mpsc::SyncSender<TraceEvent>>> =
        Mutex::new(None);
    /// Control channel for the trace dispatcher: the sender is used by
    /// `register_sink`, and the receiver can be taken exactly once through
    /// `take_sink_control_receiver`.
    static ref SINK_CONTROL_CHANNEL: (
        mpsc::Sender<DispatcherControl>,
        Mutex<Option<mpsc::Receiver<DispatcherControl>>>
    ) = {
        let (sender, receiver) = mpsc::channel();
        (sender, Mutex::new(Some(receiver)))
    };
    /// Entity event routing state. Starts out buffering and switches to
    /// dispatching once `set_entity_dispatcher` is called.
    static ref ENTITY_EVENT_STATE: Mutex<EntityEventState> = Mutex::new(
        EntityEventState::Buffering(Vec::new())
    );
}
329
/// First id handed out for synthetic user spans (the top bit of a `u64`).
// NOTE(review): presumably chosen so synthetic ids do not collide with ids
// assigned by the tracing subscriber — confirm.
const SYNTHETIC_USER_SPAN_ID_BASE: u64 = 1 << 63;
/// Next synthetic user span id; incremented by `start_user_span`.
static USER_SPAN_SEQ: AtomicU64 = AtomicU64::new(SYNTHETIC_USER_SPAN_ID_BASE);
/// Value reported as `thread_name` on every synthetic user span event.
const SYNTHETIC_USER_SPAN_TRACK_NAME: &str = "python";
333
334pub(crate) fn set_synthetic_trace_event_sender(sender: mpsc::SyncSender<TraceEvent>) {
337 *SYNTHETIC_TRACE_EVENT_SENDER
338 .lock()
339 .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned") = Some(sender);
340}
341
342pub(crate) fn emit_trace_event(event: TraceEvent) -> bool {
345 let sender = SYNTHETIC_TRACE_EVENT_SENDER
346 .lock()
347 .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned")
348 .clone();
349 match sender {
350 Some(sender) => sender.try_send(event).is_ok(),
351 None => false,
352 }
353}
354
355pub fn start_user_span(
357 name: &'static str,
358 target: &'static str,
359 fields: impl IntoIterator<Item = (&'static str, FieldValue)>,
360) -> u64 {
361 if SYNTHETIC_TRACE_EVENT_SENDER
362 .lock()
363 .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned")
364 .is_none()
365 {
366 return 0;
367 }
368
369 let id = USER_SPAN_SEQ.fetch_add(1, Ordering::Relaxed);
370
371 let fields = fields.into_iter().collect::<TraceFields>();
372
373 let _ = emit_trace_event(TraceEvent::NewSpan {
374 id,
375 name,
376 target,
377 level: tracing::Level::INFO,
378 fields,
379 timestamp: SystemTime::now(),
380 parent_id: None,
381 thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
382 file: None,
383 line: None,
384 });
385
386 let _ = emit_trace_event(TraceEvent::SpanEnter {
387 id,
388 timestamp: SystemTime::now(),
389 thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
390 });
391
392 id
393}
394
395pub fn end_user_span(id: u64) {
397 if id == 0 {
398 return;
399 }
400
401 let _ = emit_trace_event(TraceEvent::SpanExit {
402 id,
403 timestamp: SystemTime::now(),
404 thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
405 });
406
407 let _ = emit_trace_event(TraceEvent::SpanClose {
408 id,
409 timestamp: SystemTime::now(),
410 });
411}
412
/// Routing state for entity events.
enum EntityEventState {
    /// No dispatcher registered yet; events are collected here.
    Buffering(Vec<EntityEvent>),
    /// A dispatcher is registered; events are forwarded to it directly.
    Dispatching(Box<dyn EntityEventDispatcher>),
}
421
/// Payload delivered through [`notify_actor_created`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct ActorEvent {
    /// Identifier of the actor.
    pub id: u64,
    /// Presumably when the actor was created.
    pub timestamp: SystemTime,
    /// Presumably the id of the mesh the actor belongs to.
    pub mesh_id: u64,
    /// Presumably the actor's rank within its mesh.
    pub rank: u64,
    pub full_name: String,
    pub display_name: Option<String>,
}

/// Reports an actor event: forwarded to the registered entity dispatcher, or
/// buffered if none is registered yet.
pub fn notify_actor_created(event: ActorEvent) {
    dispatch_or_buffer(EntityEvent::Actor(event));
}
446
/// Payload delivered through [`notify_mesh_created`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct MeshEvent {
    /// Identifier of the mesh.
    pub id: u64,
    /// Presumably when the mesh was created.
    pub timestamp: SystemTime,
    pub class: String,
    pub given_name: String,
    pub full_name: String,
    /// Presumably the mesh shape serialized as JSON.
    pub shape_json: String,
    /// Presumably set when the mesh was derived from another mesh.
    pub parent_mesh_id: Option<u64>,
    pub parent_view_json: Option<String>,
}

/// Reports a mesh event: forwarded to the registered entity dispatcher, or
/// buffered if none is registered yet.
pub fn notify_mesh_created(event: MeshEvent) {
    dispatch_or_buffer(EntityEvent::Mesh(event));
}
475
/// Payload delivered through [`notify_actor_status_changed`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct ActorStatusEvent {
    /// Identifier of this status event; presumably produced by
    /// `generate_actor_status_event_id`.
    pub id: u64,
    pub timestamp: SystemTime,
    /// Presumably the id of the actor whose status changed.
    pub actor_id: u64,
    pub new_status: String,
    pub reason: Option<String>,
}

/// Reports an actor status event: forwarded to the registered entity
/// dispatcher, or buffered if none is registered yet.
pub fn notify_actor_status_changed(event: ActorStatusEvent) {
    dispatch_or_buffer(EntityEvent::ActorStatus(event));
}
498
/// Payload delivered through [`notify_sent_message`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct SentMessageEvent {
    pub timestamp: SystemTime,
    /// Presumably the id of the actor that sent the message.
    pub sender_actor_id: u64,
    /// Presumably the id of the actor mesh the message was sent to.
    pub actor_mesh_id: u64,
    pub view_json: String,
    pub shape_json: String,
}

/// Reports a sent-message event: forwarded to the registered entity
/// dispatcher, or buffered if none is registered yet.
pub fn notify_sent_message(event: SentMessageEvent) {
    dispatch_or_buffer(EntityEvent::SentMessage(event));
}
526
/// Payload delivered through [`notify_message`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct MessageEvent {
    pub timestamp: SystemTime,
    /// Identifier of the message; presumably produced by `generate_message_id`.
    pub id: u64,
    pub from_actor_id: u64,
    pub to_actor_id: u64,
    pub endpoint: Option<String>,
    pub port_id: Option<u64>,
}

/// Reports a message event: forwarded to the registered entity dispatcher,
/// or buffered if none is registered yet.
pub fn notify_message(event: MessageEvent) {
    dispatch_or_buffer(EntityEvent::Message(event));
}
547
/// Payload delivered through [`notify_message_status`].
///
/// NOTE(review): the field descriptions are inferred from the field names —
/// confirm against the code that produces these events.
#[derive(Debug, Clone)]
pub struct MessageStatusEvent {
    pub timestamp: SystemTime,
    /// Identifier of this status event; presumably produced by
    /// `generate_status_event_id`.
    pub id: u64,
    /// Presumably the id of the message this status refers to.
    pub message_id: u64,
    pub status: String,
}

/// Reports a message status event: forwarded to the registered entity
/// dispatcher, or buffered if none is registered yet.
pub fn notify_message_status(event: MessageStatusEvent) {
    dispatch_or_buffer(EntityEvent::MessageStatus(event));
}
564
565static ACTOR_STATUS_SEQ: AtomicU64 = AtomicU64::new(1);
566
567pub fn generate_actor_status_event_id(actor_id: u64) -> u64 {
572 let seq = ACTOR_STATUS_SEQ.fetch_add(1, Ordering::Relaxed);
573 hash_to_u64(&(actor_id, seq))
574}
575
576static SEND_SEQ: AtomicU64 = AtomicU64::new(1);
577
578pub fn generate_sent_message_id(sender_actor_id: u64) -> u64 {
580 let seq = SEND_SEQ.fetch_add(1, Ordering::Relaxed);
581 hash_to_u64(&(sender_actor_id, seq))
582}
583
584static RECV_MSG_SEQ: AtomicU64 = AtomicU64::new(1);
585
586pub fn generate_message_id(to_actor_id: u64) -> u64 {
591 let seq = RECV_MSG_SEQ.fetch_add(1, Ordering::Relaxed);
592 hash_to_u64(&(to_actor_id, seq))
593}
594
595static STATUS_EVENT_SEQ: AtomicU64 = AtomicU64::new(1);
596
597pub fn generate_status_event_id(message_id: u64) -> u64 {
602 let seq = STATUS_EVENT_SEQ.fetch_add(1, Ordering::Relaxed);
603 hash_to_u64(&(message_id, seq))
604}
605
/// Any entity event that can be buffered or handed to an
/// [`EntityEventDispatcher`]; each variant wraps the payload accepted by the
/// corresponding `notify_*` function.
#[derive(Debug, Clone)]
pub enum EntityEvent {
    /// Sent by `notify_actor_created`.
    Actor(ActorEvent),
    /// Sent by `notify_mesh_created`.
    Mesh(MeshEvent),
    /// Sent by `notify_actor_status_changed`.
    ActorStatus(ActorStatusEvent),
    /// Sent by `notify_sent_message`.
    SentMessage(SentMessageEvent),
    /// Sent by `notify_message`.
    Message(MessageEvent),
    /// Sent by `notify_message_status`.
    MessageStatus(MessageStatusEvent),
}
626
/// Receiver of entity events, registered through `set_entity_dispatcher`.
///
/// `dispatch` is invoked while the global entity event state lock is held,
/// so implementations must not call back into the `notify_*` functions or
/// `set_entity_dispatcher` from inside `dispatch`.
pub trait EntityEventDispatcher: Send + Sync {
    /// Handles one event. An `Err` is logged by the caller and otherwise ignored.
    fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error>;
}
662
663fn dispatch_or_buffer(event: EntityEvent) {
665 if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
666 match &mut *state {
667 EntityEventState::Dispatching(d) => {
668 if let Err(e) = d.dispatch(event) {
669 tracing::error!("failed to dispatch entity event: {:?}", e);
670 }
671 }
672 EntityEventState::Buffering(buf) => {
673 const MAX_BUFFERED_EVENTS: usize = 1000;
675 if buf.len() < MAX_BUFFERED_EVENTS {
676 buf.push(event);
677 if buf.len() == MAX_BUFFERED_EVENTS {
678 tracing::warn!(
679 "entity event buffer full ({MAX_BUFFERED_EVENTS}); \
680 dropping further events until a dispatcher is registered"
681 );
682 }
683 }
684 }
685 }
686 }
687}
688
689pub fn set_entity_dispatcher(dispatcher: Box<dyn EntityEventDispatcher>) {
697 if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
698 let buffered =
700 match std::mem::replace(&mut *state, EntityEventState::Dispatching(dispatcher)) {
701 EntityEventState::Buffering(buf) => buf,
702 EntityEventState::Dispatching(_) => Vec::new(),
703 };
704 for event in buffered {
705 if let EntityEventState::Dispatching(d) = &*state
706 && let Err(e) = d.dispatch(event)
707 {
708 tracing::error!("failed to dispatch buffered entity event: {:?}", e);
709 }
710 }
711 }
712}
713
714pub fn register_sink(sink: Box<dyn TraceEventSink>) {
731 let sender = &SINK_CONTROL_CHANNEL.0;
732 if let Err(e) = sender.send(DispatcherControl::AddSink(sink)) {
733 eprintln!("[telemetry] failed to register sink: {}", e);
734 }
735}
736
737pub(crate) fn take_sink_control_receiver() -> Option<mpsc::Receiver<DispatcherControl>> {
740 SINK_CONTROL_CHANNEL.1.lock().unwrap().take()
741}
742
/// Returns the process-wide [`Recorder`], creating it on first use.
pub fn recorder() -> &'static Recorder {
    // Function-local static: the recorder is only reachable through this accessor.
    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
    RECORDER.get_or_init(Recorder::new)
}
749
750pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
753 *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
754}
755
/// Builds an `opentelemetry::KeyValue` from a key expression and a value
/// expression.
///
/// The key goes through `Key::new` and the value through `Value::from`, so
/// the value's type selects the attribute kind (for example a string literal
/// becomes `Value::String`, `25` becomes `Value::I64`, `1.1` becomes
/// `Value::F64`).
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Builds a reference to an array of `opentelemetry::KeyValue` from
/// `key => value` pairs, converting each pair with `key_value!`.
///
/// A trailing comma is accepted, and an empty invocation yields an empty array.
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$($crate::key_value!($k, $v),)*]
    };
}
789
/// Resolution in which a [`Timer`] reports durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    Millis,
    Micros,
    Nanos,
}

impl TimeUnit {
    /// Returns the short unit suffix: `"ms"`, `"us"` or `"ns"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Millis => "ms",
            Self::Micros => "us",
            Self::Nanos => "ns",
        }
    }
}
806pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
807
808impl<'a> Timer {
809 pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
810 Timer(data, unit)
811 }
812 pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
813 TimerGuard {
814 data: self,
815 pairs,
816 start: Instant::now(),
817 }
818 }
819
820 pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
821 let dur = match self.1 {
822 TimeUnit::Millis => dur.as_millis(),
823 TimeUnit::Micros => dur.as_micros(),
824 TimeUnit::Nanos => dur.as_nanos(),
825 } as u64;
826
827 self.0.record(dur, pairs);
828 }
829}
830pub struct TimerGuard<'a> {
831 data: &'static Timer,
832 pairs: &'a [opentelemetry::KeyValue],
833 start: Instant,
834}
835
836impl Drop for TimerGuard<'_> {
837 fn drop(&mut self) {
838 let now = Instant::now();
839 let dur = now.duration_since(self.start);
840 self.data.record(dur, self.pairs);
841 }
842}
843
/// Declares a lazily initialized `pub static` [`Timer`](crate::Timer) named `$name`.
///
/// The underlying `u64` histogram is obtained from the meter for the calling
/// module (`module_path!()`), is named `"{key}.{unit}"` where `unit` is
/// `$unit.as_str()`, and has that same unit string set as its unit.
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
876
/// Declares a lazily initialized `pub static` `u64` counter named `$name`,
/// built from the meter for the calling module under the metric name `$key`.
///
/// The counter type is named through `$crate::opentelemetry`, so callers do
/// not need their own `opentelemetry` dependency in scope.
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
907
/// Declares a lazily initialized `pub static` `i64` up/down counter named
/// `$name`, built from the meter for the calling module under the metric
/// name `$key`.
///
/// The counter type is named through `$crate::opentelemetry`, so callers do
/// not need their own `opentelemetry` dependency in scope.
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
942
/// Declares a lazily initialized `pub static` `f64` gauge named `$name`,
/// built from the meter for the calling module under the metric name `$key`.
///
/// The gauge type is named through `$crate::opentelemetry`, so callers do
/// not need their own `opentelemetry` dependency in scope.
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Declares a lazily initialized `pub static` observable `f64` gauge named
/// `$name`, built from the meter for the calling module under the metric
/// name `$key`, with `$cb` registered as its callback.
///
/// The gauge type is named through `$crate::opentelemetry`, so callers do
/// not need their own `opentelemetry` dependency in scope.
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Declares a lazily initialized `pub static` `f64` histogram named `$name`,
/// built from the meter for the calling module under the metric name `$key`.
///
/// Every path goes through `$crate`, so the macro works regardless of the
/// name under which the caller imports this crate and without the caller
/// having its own `opentelemetry` dependency in scope.
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
1038
/// Holds the non-blocking log writer and its worker guard once logging
/// initialization has created them. Only the first `set` takes effect.
// NOTE(review): presumably stored in a static so the `WorkerGuard` is never
// dropped while the process runs — confirm against tracing_appender docs.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
1041
/// Event formatter that writes a bracketed prefix and then delegates to a
/// glog formatter using local time.
struct PrefixedFormatter {
    formatter: Glog<LocalTime>,
    /// Name of the environment variable whose value is used as the prefix,
    /// if any.
    prefix_env_var: Option<String>,
}

impl PrefixedFormatter {
    /// Creates a formatter whose prefix is read from the environment
    /// variable named by `prefix_env_var`.
    fn new(prefix_env_var: Option<String>) -> Self {
        let formatter = Glog::default().with_timer(LocalTime::default());
        Self {
            formatter,
            prefix_env_var,
        }
    }
}
1057
1058impl<S, N> FormatEvent<S, N> for PrefixedFormatter
1059where
1060 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
1061 N: for<'a> FormatFields<'a> + 'static,
1062{
1063 fn format_event(
1064 &self,
1065 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
1066 mut writer: Writer<'_>,
1067 event: &tracing::Event<'_>,
1068 ) -> std::fmt::Result {
1069 let prefix: String = if self.prefix_env_var.is_some() {
1070 std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
1071 } else {
1072 "".to_string()
1073 };
1074
1075 if prefix.is_empty() {
1076 write!(writer, "[-]")?;
1077 } else {
1078 write!(writer, "[{}]", prefix)?;
1079 }
1080
1081 self.formatter.format_event(ctx, writer, event)
1082 }
1083}
1084
/// Initializes logging for this process with `clock` as the telemetry clock
/// and no log prefix.
///
/// Equivalent to `initialize_logging_with_log_prefix(clock, None)`.
pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
    initialize_logging_with_log_prefix(clock, None);
}
1096
/// Initializes logging with the [`DefaultTelemetryClock`]; a convenience
/// entry point for tests.
pub fn initialize_logging_for_test() {
    initialize_logging(DefaultTelemetryClock {});
}
1101
/// Initializes logging with `clock` as the telemetry clock.
///
/// `prefix_env_var` optionally names an environment variable whose value is
/// written as a prefix on log lines. The test handle produced by the
/// underlying implementation is discarded.
pub fn initialize_logging_with_log_prefix(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
) {
    let _ = initialize_logging_with_log_prefix_impl(clock, prefix_env_var, false);
}
1122
/// Initializes logging like `initialize_logging_with_log_prefix`, but asks
/// for a mock Scuba client and returns a handle for inspecting what was
/// captured.
///
/// On builds without `fbcode_build` on Linux the mock request is ignored and
/// the returned handle never has samples.
pub fn initialize_logging_with_log_prefix_mock_scuba(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
) -> Box<dyn TelemetryTestHandle> {
    initialize_logging_with_log_prefix_impl(clock, prefix_env_var, true)
}
1129
/// Shared implementation behind the public `initialize_logging*` functions.
///
/// Always swaps in `clock` as the telemetry clock. A tracing subscriber is
/// only installed when no global dispatcher has been set yet. Depending on
/// the `USE_UNIFIED_LAYER` setting, the subscriber is either a single trace
/// event dispatcher feeding a list of sinks, or a stack of individual
/// `tracing_subscriber` layers. The body is split by `cfg` into an
/// `fbcode_build` + Linux variant and a variant for every other build.
///
/// Returns a handle backed by the mock Scuba client when one was created
/// (`mock_scuba` on the fbcode/Linux build), and an empty handle otherwise.
fn initialize_logging_with_log_prefix_impl(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
    mock_scuba: bool,
) -> Box<dyn TelemetryTestHandle> {
    // `mock_scuba` is only read on the fbcode/Linux build; mark it as used elsewhere.
    #[cfg(not(all(fbcode_build, target_os = "linux")))]
    let _ = mock_scuba;

    let use_unified = hyperactor_config::global::get(USE_UNIFIED_LAYER);
    let should_install_subscriber = !tracing::dispatcher::has_been_set();

    // The clock is replaced even when a subscriber is already installed.
    swap_telemetry_clock(clock);
    if !should_install_subscriber {
        tracing::debug!("logging already initialized for this process");
    }
    // Default file log level per environment; in the layered path the
    // MONARCH_FILE_LOG_LEVEL setting can override it.
    let file_log_level = match env::Env::current() {
        env::Env::Local => LOG_LEVEL_INFO,
        env::Env::MastEmulator => LOG_LEVEL_INFO,
        env::Env::Mast => LOG_LEVEL_INFO,
        env::Env::Test => LOG_LEVEL_DEBUG,
    };

    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    #[cfg(all(fbcode_build, target_os = "linux"))]
    {
        // Set only when a mock Scuba client is created below.
        let mut mock_scuba_client: Option<crate::meta::scuba_utils::MockScubaClient> = None;

        if should_install_subscriber {
            if use_unified {
                // Unified path: one dispatcher layer fanning out to sinks.
                let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();
                sinks.push(Box::new(sinks::glog::GlogSink::new(
                    writer(),
                    prefix_env_var.clone(),
                    file_log_level,
                )));

                let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

                if sqlite_enabled {
                    match create_sqlite_sink() {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            // NOTE(review): emitted before `try_init` below installs the
                            // subscriber, so this warning may not be recorded anywhere —
                            // confirm whether that is intended.
                            tracing::warn!("failed to create SqliteSink: {}", e);
                        }
                    }
                }

                if hyperactor_config::global::get(sinks::perfetto::PERFETTO_TRACE_MODE)
                    != sinks::perfetto::PerfettoTraceMode::Off
                {
                    let exec_id = env::execution_id();
                    let process_name = std::env::var("HYPERACTOR_PROCESS_NAME")
                        .unwrap_or_else(|_| "client".to_string());
                    match sinks::perfetto::PerfettoFileSink::new(
                        sinks::perfetto::default_trace_dir(),
                        &exec_id,
                        &process_name,
                    ) {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            tracing::warn!("failed to create PerfettoFileSink: {}", e);
                        }
                    }
                }

                {
                    if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                        use crate::meta;
                        use crate::meta::scuba_utils::LOG_ENTER_EXIT;

                        if mock_scuba {
                            // Keep a clone of the mock client so the returned test
                            // handle can read what the sink recorded.
                            let tracing_client = meta::scuba_utils::MockScubaClient::new();

                            sinks.push(Box::new(
                                meta::scuba_sink::ScubaSink::with_client(
                                    tracing_client.clone(),
                                    match meta::tracing_resource().get(&LOG_ENTER_EXIT) {
                                        Some(Value::Bool(enabled)) => enabled,
                                        _ => false,
                                    },
                                )
                                .with_target_filter(crate::config::get_tracing_targets()),
                            ));

                            mock_scuba_client = Some(tracing_client);
                        } else {
                            sinks.push(Box::new(
                                meta::scuba_sink::ScubaSink::new(meta::tracing_resource())
                                    .with_target_filter(crate::config::get_tracing_targets()),
                            ));
                        }
                    }
                }

                let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);
                // Grab the sender before the dispatcher is moved into the registry.
                let synthetic_sender = dispatcher.sender();

                if let Err(err) = Registry::default()
                    .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                        Some(recorder().layer())
                    } else {
                        None
                    })
                    .with(dispatcher)
                    .try_init()
                {
                    tracing::debug!("logging already initialized for this process: {}", err);
                } else {
                    // Only expose the synthetic sender when this dispatcher is
                    // actually the installed subscriber.
                    set_synthetic_trace_event_sender(synthetic_sender);
                }
            } else {
                // Layered path: file layer plus optional sqlite, recorder and
                // Scuba/OTEL layers.
                let (non_blocking, guard) =
                    tracing_appender::non_blocking::NonBlockingBuilder::default()
                        .lossy(false)
                        .finish(writer());
                let writer_guard = Arc::new((non_blocking, guard));
                let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

                let file_layer = fmt::Layer::default()
                    .with_writer(writer_guard.0.clone())
                    .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                    .fmt_fields(GlogFields::default().compact())
                    .with_ansi(false)
                    .with_filter(
                        Targets::new()
                            .with_default(LevelFilter::from_level({
                                // Configured level if present and parseable,
                                // otherwise the per-environment default.
                                let log_level_str = hyperactor_config::global::try_get_cloned(
                                    MONARCH_FILE_LOG_LEVEL,
                                )
                                .unwrap_or_else(|| file_log_level.to_string());
                                tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                    tracing::Level::from_str(file_log_level)
                                        .expect("Invalid default log level")
                                })
                            }))
                            .with_target("opentelemetry", LevelFilter::OFF),
                    );

                let registry = Registry::default()
                    .with(if hyperactor_config::global::get(ENABLE_SQLITE_TRACING) {
                        Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
                    } else {
                        None
                    })
                    .with(file_layer)
                    .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                        Some(recorder().layer())
                    } else {
                        None
                    });

                if mock_scuba {
                    let tracing_client = crate::meta::scuba_utils::MockScubaClient::new();

                    let scuba_layer =
                        crate::meta::tracing_layer_with_client(tracing_client.clone());

                    if let Err(err) = registry.with(scuba_layer).try_init() {
                        tracing::debug!("logging already initialized for this process: {}", err);
                    }

                    mock_scuba_client = Some(tracing_client);
                } else if let Err(err) = registry
                    .with(if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                        Some(otel::tracing_layer())
                    } else {
                        None
                    })
                    .try_init()
                {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }
            }
        }
        // The execution event below is emitted on every call, whether or not a
        // subscriber was installed by this call.
        let exec_id = env::execution_id();
        let process_name =
            std::env::var("HYPERACTOR_PROCESS_NAME").unwrap_or_else(|_| "client".to_string());

        tracing::info!(
            target: "execution",
            execution_id = exec_id,
            environment = %env::Env::current(),
            args = ?std::env::args(),
            build_mode = build_info::BuildInfo::get_build_mode(),
            compiler = build_info::BuildInfo::get_compiler(),
            compiler_version = build_info::BuildInfo::get_compiler_version(),
            buck_rule = build_info::BuildInfo::get_rule(),
            package_name = build_info::BuildInfo::get_package_name(),
            package_release = build_info::BuildInfo::get_package_release(),
            upstream_revision = build_info::BuildInfo::get_upstream_revision(),
            revision = build_info::BuildInfo::get_revision(),
            process_name = process_name,
            "logging_initialized",
        );
        // Same information as the tracing event above, sent through the
        // `meta` module's execution event logger.
        meta::log_execution_event(
            &exec_id,
            &env::Env::current().to_string(),
            std::env::args().collect(),
            build_info::BuildInfo::get_build_mode(),
            build_info::BuildInfo::get_compiler(),
            build_info::BuildInfo::get_compiler_version(),
            build_info::BuildInfo::get_rule(),
            build_info::BuildInfo::get_package_name(),
            build_info::BuildInfo::get_package_release(),
            build_info::BuildInfo::get_upstream_revision(),
            build_info::BuildInfo::get_revision(),
            &process_name,
        );

        if hyperactor_config::global::get(ENABLE_OTEL_METRICS) {
            otel::init_metrics();
        }

        if let Some(tracing_client) = mock_scuba_client {
            Box::new(MockScubaHandle { tracing_client })
        } else {
            Box::new(EmptyTestHandle)
        }
    }
    #[cfg(not(all(fbcode_build, target_os = "linux")))]
    {
        // Built unconditionally, but only used when a subscriber is installed below.
        let registry =
            Registry::default().with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                Some(recorder().layer())
            } else {
                None
            });

        if should_install_subscriber {
            if use_unified {
                // Unified path: one dispatcher layer fanning out to sinks.
                let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();

                let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

                if sqlite_enabled {
                    match create_sqlite_sink() {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            // NOTE(review): emitted before `try_init` below installs the
                            // subscriber, so this warning may not be recorded anywhere —
                            // confirm whether that is intended.
                            tracing::warn!("failed to create SqliteSink: {}", e);
                        }
                    }
                }

                sinks.push(Box::new(sinks::glog::GlogSink::new(
                    writer(),
                    prefix_env_var.clone(),
                    file_log_level,
                )));

                if let Some(log_sink) = otlp::otlp_log_sink() {
                    sinks.push(log_sink);
                }

                let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);
                // Grab the sender before the dispatcher is moved into the registry.
                let synthetic_sender = dispatcher.sender();

                if let Err(err) = registry.with(dispatcher).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                } else {
                    // Only expose the synthetic sender when this dispatcher is
                    // actually the installed subscriber.
                    set_synthetic_trace_event_sender(synthetic_sender);
                }
            } else {
                // Layered path: recorder layer (if enabled) plus the file layer.
                let (non_blocking, guard) =
                    tracing_appender::non_blocking::NonBlockingBuilder::default()
                        .lossy(false)
                        .finish(writer());
                let writer_guard = Arc::new((non_blocking, guard));
                let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

                let file_layer = fmt::Layer::default()
                    .with_writer(writer_guard.0.clone())
                    .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                    .fmt_fields(GlogFields::default().compact())
                    .with_ansi(false)
                    .with_filter(
                        Targets::new()
                            .with_default(LevelFilter::from_level({
                                // Configured level if present and parseable,
                                // otherwise the per-environment default.
                                let log_level_str = hyperactor_config::global::try_get_cloned(
                                    MONARCH_FILE_LOG_LEVEL,
                                )
                                .unwrap_or_else(|| file_log_level.to_string());
                                tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                    tracing::Level::from_str(file_log_level)
                                        .expect("Invalid default log level")
                                })
                            }))
                            .with_target("opentelemetry", LevelFilter::OFF),
                    );

                if let Err(err) = registry.with(file_layer).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }
            }
        }

        // Unlike the fbcode/Linux variant, metrics are initialized unconditionally here.
        otel::init_metrics();

        Box::new(EmptyTestHandle)
    }
}
1444
1445fn create_sqlite_sink() -> anyhow::Result<sinks::sqlite::SqliteSink> {
1446 let (db_path, _) = log_file_path(env::Env::current(), Some("traces"))
1447 .expect("failed to determine trace db path");
1448 let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id());
1449
1450 sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)
1451}
1452
/// Creates an error-level tracing span that always carries the field
/// `skip_record = $crate::skip_record`.
///
/// Each arm forwards to `::tracing::error_span!`. The arms cover every
/// combination of an optional `target:`, an optional `parent:`, the span
/// name, and optional trailing fields. Arms that take trailing fields but no
/// explicit target pass `module_path!()` as the target; the arms that take
/// only a name (with or without `parent:`) pass no target and leave the
/// choice to `error_span!`.
#[macro_export]
macro_rules! context_span {
    // target + parent + name + fields
    (target: $target:expr, parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // target + parent + name
    (target: $target:expr, parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // parent + name + fields
    (parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // parent + name
    (parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // target + name + fields
    (target: $target:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // target + name
    (target: $target:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    // name + fields
    ($name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    // name only
    ($name:expr) => {
        ::tracing::error_span!(
            $name,
            skip_record = $crate::skip_record,
        )
    };
}
1530
1531pub mod env {
1532 pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
1534 pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
1535 pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
1536
1537 pub fn execution_id() -> String {
1544 let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
1545 let username = crate::username();
1547 let now = {
1548 let now = std::time::SystemTime::now();
1549 let datetime: chrono::DateTime<chrono::Local> = now.into();
1550 datetime.format("%b-%d_%H:%M").to_string()
1551 };
1552 let random_number: u16 = (rand::random::<u32>() % 1000) as u16;
1553 let execution_id = format!("{}_{}_{}", username, now, random_number);
1554 execution_id
1555 });
1556 unsafe {
1559 std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
1560 }
1561 id
1562 }
1563
1564 #[derive(PartialEq)]
1565 pub enum Env {
1566 Local,
1567 Mast,
1568 MastEmulator,
1569 Test,
1570 }
1571
1572 impl std::fmt::Display for Env {
1573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574 write!(
1575 f,
1576 "{}",
1577 match self {
1578 Self::Local => crate::ENV_VALUE_LOCAL,
1579 Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
1580 Self::Mast => crate::ENV_VALUE_MAST,
1581 Self::Test => crate::ENV_VALUE_TEST,
1582 }
1583 )
1584 }
1585 }
1586
1587 impl Env {
1588 #[cfg(test)]
1589 pub fn current() -> Self {
1590 Self::Test
1591 }
1592
1593 #[cfg(not(test))]
1594 pub fn current() -> Self {
1595 match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
1596 crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
1598 _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
1599 true => Self::Mast,
1600 false => Self::Local,
1601 },
1602 }
1603 }
1604 }
1605}
1606
#[cfg(test)]
mod test {
    use opentelemetry::*;
    // Makes `hyperactor_telemetry::...` paths resolve to this crate inside the
    // tests. NOTE(review): presumably needed for macros that name the crate
    // literally rather than through `$crate` — confirm.
    extern crate self as hyperactor_telemetry;
    use super::*;

    /// `key_value!` picks the attribute kind from the value's type.
    #[test]
    fn infer_kv_pair_types() {
        assert_eq!(
            key_value!("str", "str"),
            KeyValue::new(Key::new("str"), Value::String("str".into()))
        );
        assert_eq!(
            key_value!("str", 25),
            KeyValue::new(Key::new("str"), Value::I64(25))
        );
        assert_eq!(
            key_value!("str", 1.1),
            KeyValue::new(Key::new("str"), Value::F64(1.1))
        );
    }
    /// `kv_pairs!` expands to a slice of the same values `key_value!` builds.
    #[test]
    fn kv_pair_slices() {
        assert_eq!(
            kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
            &[
                key_value!("1", "1"),
                key_value!("2", 2),
                key_value!("3", 3.0),
            ],
        );
    }

    /// Smoke test: declaring gauges and recording values compiles and runs.
    /// It makes no assertions about the recorded values.
    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // Recording with no attributes is also accepted.
        TEST_GAUGE.record(50.0, &[]);
    }
}