hyperactor_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15#![recursion_limit = "256"]
16
/// Name of the environment variable holding the job name; used for environment detection.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

// ---- Log level names ----

/// The `info` log level.
const LOG_LEVEL_INFO: &str = "info";
/// The `debug` log level.
const LOG_LEVEL_DEBUG: &str = "debug";

// ---- Span field names ----

/// Span field named `recording`.
const SPAN_FIELD_RECORDING: &str = "recording";
/// Span field named `recorder`.
// Allowed: this constant may have no readers in some configurations.
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

// ---- Environment names ----

/// Environment name for local runs.
const ENV_VALUE_LOCAL: &str = "local";
/// Environment name for the MAST emulator.
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
/// Environment name for MAST.
const ENV_VALUE_MAST: &str = "mast";
/// Environment name for tests.
const ENV_VALUE_TEST: &str = "test";
/// Environment name for the local MAST simulator.
// Allowed: this constant may have no readers in some configurations.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
36
/// A marker field used to indicate that a span should not be recorded as
/// individual start/end span events; rather the span is purely used to
/// provide context for child events.
///
/// Note that the mechanism for skipping span recording uses the precise
/// name "skip_record", thus it must be used as a naked identifier:
/// ```ignore
/// use hyperactor_telemetry::skip_record;
///
/// tracing::span!(..., skip_record);
/// ```
// Lower-case on purpose: the identifier itself is used as the span field name,
// so it has to be spelled exactly `skip_record`.
#[allow(non_upper_case_globals)]
pub const skip_record: bool = true;
51
52mod config;
53pub mod in_memory_reader;
54#[cfg(fbcode_build)]
55mod meta;
56mod otel;
57pub(crate) mod otlp;
58mod pool;
59mod rate_limit;
60pub mod recorder;
61pub mod sinks;
62mod spool;
63pub mod sqlite;
64pub mod task;
65pub mod trace;
66pub mod trace_dispatcher;
67
// Imports. The `pub use` lines further down re-export key types for external sink implementations.
69use std::collections::hash_map::DefaultHasher;
70use std::hash::Hash;
71use std::hash::Hasher;
72use std::io::Write;
73use std::str::FromStr;
74use std::sync::Arc;
75use std::sync::Mutex;
76use std::sync::atomic::AtomicU64;
77use std::sync::atomic::Ordering;
78use std::sync::mpsc;
79use std::time::Instant;
80use std::time::SystemTime;
81
82use lazy_static::lazy_static;
83pub use opentelemetry;
84pub use opentelemetry::Key;
85pub use opentelemetry::KeyValue;
86pub use opentelemetry::Value;
87pub use opentelemetry::global::meter;
88pub use trace_dispatcher::DispatcherControl;
89pub use trace_dispatcher::FieldValue;
90pub use trace_dispatcher::TraceEvent;
91pub use trace_dispatcher::TraceEventSink;
92pub use tracing;
93pub use tracing::Level;
94use tracing_appender::non_blocking::NonBlocking;
95use tracing_appender::non_blocking::WorkerGuard;
96use tracing_appender::rolling::RollingFileAppender;
97use tracing_glog::Glog;
98use tracing_glog::GlogFields;
99use tracing_glog::LocalTime;
100use tracing_subscriber::Layer;
101use tracing_subscriber::filter::LevelFilter;
102use tracing_subscriber::filter::Targets;
103use tracing_subscriber::fmt;
104use tracing_subscriber::fmt::FormatEvent;
105use tracing_subscriber::fmt::FormatFields;
106use tracing_subscriber::fmt::format::Writer;
107use tracing_subscriber::registry::LookupSpan;
108
109use crate::config::ENABLE_OTEL_METRICS;
110use crate::config::ENABLE_OTEL_TRACING;
111use crate::config::ENABLE_RECORDER_TRACING;
112use crate::config::ENABLE_SQLITE_TRACING;
113use crate::config::MONARCH_FILE_LOG_LEVEL;
114use crate::config::MONARCH_LOG_SUFFIX;
115use crate::config::USE_UNIFIED_LAYER;
116use crate::recorder::Recorder;
117use crate::sqlite::get_reloadable_sqlite_layer;
118
/// Hashes any `Hash` value into a `u64` using the standard library's `DefaultHasher`.
///
/// Every hasher built with `DefaultHasher::new()`/`default()` starts from the same
/// state, so equal values yield equal hashes. The algorithm itself is unspecified
/// by the standard library and may change between Rust releases.
pub fn hash_to_u64(value: &impl Hash) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(value, &mut state);
    state.finish()
}
125
126#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
127pub struct TelemetrySample {
128    fields: Vec<(String, String)>,
129}
130
131impl TelemetrySample {
132    pub fn get_string(&self, key: &str) -> Option<&str> {
133        for (k, v) in &self.fields {
134            if k == key {
135                return Some(v.as_str());
136            }
137        }
138        None
139    }
140}
141
#[cfg(fbcode_build)]
impl From<crate::meta::sample_buffer::Sample> for TelemetrySample {
    /// Keeps only the string-valued entries of `sample`, in their original order;
    /// entries holding any other kind of value are dropped.
    fn from(sample: crate::meta::sample_buffer::Sample) -> Self {
        use crate::meta::sample_buffer::SampleValue;

        let fields = sample
            .0
            .into_iter()
            .filter_map(|(key, value)| {
                if let SampleValue::String(s) = value {
                    Some((key.to_string(), s.to_string()))
                } else {
                    None
                }
            })
            .collect();
        Self { fields }
    }
}
154
155#[cfg(not(fbcode_build))]
156impl TelemetrySample {
157    pub fn new() -> Self {
158        Self { fields: Vec::new() }
159    }
160}
161
162pub trait TelemetryTestHandle {
163    fn get_tracing_samples(&self) -> Vec<TelemetrySample>;
164}
165
/// Test handle backed by a mock Scuba client (fbcode builds only).
#[cfg(fbcode_build)]
struct MockScubaHandle {
    tracing_client: crate::meta::scuba_utils::MockScubaClient,
}

#[cfg(fbcode_build)]
impl TelemetryTestHandle for MockScubaHandle {
    /// Converts every sample held by the mock client into a `TelemetrySample`.
    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
        let mut converted = Vec::new();
        for raw in self.tracing_client.get_samples() {
            converted.push(TelemetrySample::from(raw));
        }
        converted
    }
}
181
182struct EmptyTestHandle;
183
184impl TelemetryTestHandle for EmptyTestHandle {
185    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
186        vec![]
187    }
188}
189
190pub trait TelemetryClock {
191    fn now(&self) -> tokio::time::Instant;
192    fn system_time_now(&self) -> std::time::SystemTime;
193}
194
195pub struct DefaultTelemetryClock {}
196
197impl TelemetryClock for DefaultTelemetryClock {
198    fn now(&self) -> tokio::time::Instant {
199        tokio::time::Instant::now()
200    }
201
202    fn system_time_now(&self) -> std::time::SystemTime {
203        std::time::SystemTime::now()
204    }
205}
206
207pub fn username() -> String {
208    let env = env::Env::current();
209    match env {
210        env::Env::Mast => {
211            std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
212        }
213        _ => whoami::username(),
214    }
215}
216
217// Given an environment, determine the log file path to write to.
218// If a suffix is provided, it will be prepended with "_" and then appended to file name
219pub fn log_file_path(
220    env: env::Env,
221    suffix: Option<&str>,
222) -> Result<(String, String), anyhow::Error> {
223    let suffix = suffix
224        .map(|s| {
225            if s.is_empty() {
226                String::new()
227            } else {
228                format!("_{}", s)
229            }
230        })
231        .unwrap_or_default();
232    match env {
233        env::Env::Local | env::Env::MastEmulator => {
234            let username = if whoami::username().is_empty() {
235                "monarch".to_string()
236            } else {
237                whoami::username()
238            };
239            Ok((
240                format!("/tmp/{}", username),
241                format!("monarch_log{}", suffix),
242            ))
243        }
244        env::Env::Mast => Ok((
245            "/logs/".to_string(),
246            format!("dedicated_log_monarch{}", suffix),
247        )),
248        _ => Err(anyhow::anyhow!(
249            "file writer unsupported for environment {}",
250            env
251        )),
252    }
253}
254
255fn try_create_appender(
256    path: &str,
257    filename: &str,
258    create_dir: bool,
259) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
260    if create_dir {
261        std::fs::create_dir_all(path)?;
262    }
263    Ok(RollingFileAppender::builder()
264        .filename_prefix(filename)
265        .filename_suffix("log")
266        .build(path)?)
267}
268
269fn writer() -> Box<dyn Write + Send> {
270    match env::Env::current() {
271        env::Env::Test => Box::new(std::io::stderr()),
272        env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
273            let suffix = hyperactor_config::global::try_get_cloned(MONARCH_LOG_SUFFIX);
274            let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap();
275            match try_create_appender(&path, &filename, true) {
276                Ok(file_appender) => Box::new(file_appender),
277                Err(e) => {
278                    eprintln!(
279                        "unable to create log file in {}: {}. Falling back to stderr",
280                        path, e
281                    );
282                    Box::new(std::io::stderr())
283                }
284            }
285        }
286    }
287}
288
289lazy_static! {
290    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
291        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
292    /// Global control channel for sink registration.
293    /// Created upfront so sinks can be registered at any time (before or after telemetry init).
294    /// The receiver is taken once when the TraceEventDispatcher is created.
295    static ref SINK_CONTROL_CHANNEL: (
296        mpsc::Sender<DispatcherControl>,
297        Mutex<Option<mpsc::Receiver<DispatcherControl>>>
298    ) = {
299        let (sender, receiver) = mpsc::channel();
300        (sender, Mutex::new(Some(receiver)))
301    };
302    /// Global unified entity event dispatcher with pre-registration buffering.
303    /// Events emitted before a dispatcher is registered are buffered and replayed
304    /// when `set_entity_dispatcher` is called. This ensures bootstrap actors
305    /// (e.g., HostAgent and ProcAgent) are captured even though they are spawned before the
306    /// telemetry system is initialized.
307    static ref ENTITY_EVENT_STATE: Mutex<EntityEventState> = Mutex::new(
308        EntityEventState::Buffering(Vec::new())
309    );
310}
311
312/// State machine for the entity event dispatcher.
313/// Starts in `Buffering`, collecting events until a dispatcher is registered.
314/// Transitions to `Dispatching` on `set_entity_dispatcher`, replaying buffered
315/// events and dropping the buffer so it cannot accumulate further.
316enum EntityEventState {
317    Buffering(Vec<EntityEvent>),
318    Dispatching(Box<dyn EntityEventDispatcher>),
319}
320
/// Payload describing a newly created actor.
///
/// Delivered to `EntityEventDispatcher` implementations (as `EntityEvent::Actor`)
/// when an actor is spawned.
#[derive(Clone, Debug)]
pub struct ActorEvent {
    /// Unique identifier of the actor (a hash of its ActorId).
    pub id: u64,
    /// When the actor was created.
    pub timestamp: SystemTime,
    /// Identifier of the mesh the actor belongs to (a hash of the actor name).
    pub mesh_id: u64,
    /// The actor's rank, as an index into the mesh shape.
    pub rank: u64,
    /// Fully qualified, hierarchical actor name.
    pub full_name: String,
    /// Optional user-facing name.
    pub display_name: Option<String>,
}
338
339/// Notify the registered dispatcher that an actor was created.
340/// If no dispatcher is registered yet, the event is buffered and will be
341/// replayed when `set_entity_dispatcher` is called.
342pub fn notify_actor_created(event: ActorEvent) {
343    dispatch_or_buffer(EntityEvent::Actor(event));
344}
345
/// Payload describing a newly created mesh.
///
/// Delivered to `EntityEventDispatcher` implementations (as `EntityEvent::Mesh`)
/// when a mesh is spawned.
#[derive(Clone, Debug)]
pub struct MeshEvent {
    /// Unique (hashed) identifier of the mesh.
    pub id: u64,
    /// When the mesh was created.
    pub timestamp: SystemTime,
    /// Mesh class, e.g. "Proc", "Host" or "Python<SomeUserDefinedActor>".
    pub class: String,
    /// Name the user gave this mesh.
    pub given_name: String,
    /// Full hierarchical name, as it appears in supervision events.
    pub full_name: String,
    /// Mesh shape, serialized from `ndslice::Extent`.
    pub shape_json: String,
    /// Identifier of the parent mesh; `None` for root meshes.
    pub parent_mesh_id: Option<u64>,
    /// Region of the parent over which this mesh was spawned, serialized from `ndslice::Region`.
    pub parent_view_json: Option<String>,
}
367
368/// Notify the registered dispatcher that a mesh was created.
369/// If no dispatcher is registered yet, the event is buffered and will be
370/// replayed when `set_entity_dispatcher` is called.
371pub fn notify_mesh_created(event: MeshEvent) {
372    dispatch_or_buffer(EntityEvent::Mesh(event));
373}
374
/// Payload describing a change in an actor's status.
///
/// Delivered to `EntityEventDispatcher` implementations (as
/// `EntityEvent::ActorStatus`) whenever an actor changes status.
#[derive(Clone, Debug)]
pub struct ActorStatusEvent {
    /// Unique identifier of this status event.
    pub id: u64,
    /// When the status change happened.
    pub timestamp: SystemTime,
    /// Identifier of the actor whose status changed.
    pub actor_id: u64,
    /// The new status, e.g. "Created", "Idle" or "Failed".
    pub new_status: String,
    /// Why the status changed, e.g. error details for "Failed".
    pub reason: Option<String>,
}
390
391/// Notify the registered dispatcher that an actor changed status.
392/// If no dispatcher is registered yet, the event is buffered and will be
393/// replayed when `set_entity_dispatcher` is called.
394pub fn notify_actor_status_changed(event: ActorStatusEvent) {
395    dispatch_or_buffer(EntityEvent::ActorStatus(event));
396}
397
/// Payload describing a message sent to an actor mesh.
///
/// Emitted from `cast_with_selection` in `actor_mesh.rs`, the path shared by all
/// Python send methods: `call`, `call_one`, `broadcast` and `choose`.
#[derive(Clone, Debug)]
pub struct SentMessageEvent {
    /// When the message was sent.
    pub timestamp: SystemTime,
    /// Hash of the sending actor's `ActorId`.
    pub sender_actor_id: u64,
    /// Hash of the target actor mesh's name.
    pub actor_mesh_id: u64,
    /// The targeted view (slice) of the actor mesh, serialized from `ndslice::Region`.
    /// Full-mesh sends (call, broadcast) cover every dimension; sliced sends
    /// (call_one) omit collapsed dimensions; choose uses a scalar (0-dim) Region.
    pub view_json: String,
    /// Shape of that view, serialized from `ndslice::Shape` (obtained from the
    /// view Region via `Region::into::<Shape>`).
    pub shape_json: String,
}
418
419/// Notify the registered dispatcher that a message was sent.
420/// If no dispatcher is registered yet, the event is buffered and will be
421/// replayed when `set_entity_dispatcher` is called.
422pub fn notify_sent_message(event: SentMessageEvent) {
423    dispatch_or_buffer(EntityEvent::SentMessage(event));
424}
425
/// Payload describing a received message, seen from the receiver's side.
#[derive(Clone, Debug)]
pub struct MessageEvent {
    /// When the message was received.
    pub timestamp: SystemTime,
    /// Unique identifier of this received message.
    pub id: u64,
    /// Hash of the sender's ActorId.
    pub from_actor_id: u64,
    /// Hash of the receiver's ActorId.
    pub to_actor_id: u64,
    /// Endpoint name, when the message targets a specific actor endpoint.
    pub endpoint: Option<String>,
    /// Destination port ID.
    pub port_id: Option<u64>,
}
441
442/// Notify the registered dispatcher that a message was received.
443pub fn notify_message(event: MessageEvent) {
444    dispatch_or_buffer(EntityEvent::Message(event));
445}
446
/// Payload describing a status change of a received message.
#[derive(Clone, Debug)]
pub struct MessageStatusEvent {
    /// When the status changed.
    pub timestamp: SystemTime,
    /// Unique identifier of this status event.
    pub id: u64,
    /// The message whose status changed (refers to `MessageEvent::id`).
    pub message_id: u64,
    /// The new status: "queued", "active" or "complete".
    pub status: String,
}
458
459/// Notify the registered dispatcher that a message changed status.
460pub fn notify_message_status(event: MessageStatusEvent) {
461    dispatch_or_buffer(EntityEvent::MessageStatus(event));
462}
463
464static ACTOR_STATUS_SEQ: AtomicU64 = AtomicU64::new(1);
465
466/// Generate a globally unique ActorStatusEvent ID.
467///
468/// Combines the actor's unique ID with a process-local sequence number,
469/// then hashes the pair to produce an ID that is unique across processes.
470pub fn generate_actor_status_event_id(actor_id: u64) -> u64 {
471    let seq = ACTOR_STATUS_SEQ.fetch_add(1, Ordering::Relaxed);
472    hash_to_u64(&(actor_id, seq))
473}
474
475static SEND_SEQ: AtomicU64 = AtomicU64::new(1);
476
477/// Generate a globally unique SentMessage ID.
478pub fn generate_sent_message_id(sender_actor_id: u64) -> u64 {
479    let seq = SEND_SEQ.fetch_add(1, Ordering::Relaxed);
480    hash_to_u64(&(sender_actor_id, seq))
481}
482
483static RECV_MSG_SEQ: AtomicU64 = AtomicU64::new(1);
484
485/// Generate a unique received-message ID (cross-process unique).
486///
487/// Hashes (to_actor_id, seq) following the same pattern as
488/// `generate_sent_message_id`.
489pub fn generate_message_id(to_actor_id: u64) -> u64 {
490    let seq = RECV_MSG_SEQ.fetch_add(1, Ordering::Relaxed);
491    hash_to_u64(&(to_actor_id, seq))
492}
493
494static STATUS_EVENT_SEQ: AtomicU64 = AtomicU64::new(1);
495
496/// Generate a unique message-status-event ID (cross-process unique).
497///
498/// Hashes (message_id, seq) following the same pattern as
499/// `generate_sent_message_id`.
500pub fn generate_status_event_id(message_id: u64) -> u64 {
501    let seq = STATUS_EVENT_SEQ.fetch_add(1, Ordering::Relaxed);
502    hash_to_u64(&(message_id, seq))
503}
504
505/// Unified event enum for all entity lifecycle events.
506///
507/// This enum wraps all entity events (actors, meshes, and future event types)
508/// into a single type. This enables a single sink to handle all entity events,
509/// simplifying the registration and notification infrastructure.
510#[derive(Debug, Clone)]
511pub enum EntityEvent {
512    /// An actor was created.
513    Actor(ActorEvent),
514    /// A mesh was created.
515    Mesh(MeshEvent),
516    /// An actor changed status.
517    ActorStatus(ActorStatusEvent),
518    /// A message was sent.
519    SentMessage(SentMessageEvent),
520    /// A message was received.
521    Message(MessageEvent),
522    /// A received message changed status.
523    MessageStatus(MessageStatusEvent),
524}
525
526/// Trait for dispatchers that receive unified entity events.
527///
528/// This is the preferred way to receive entity lifecycle events. Implement this
529/// trait and register with `set_entity_dispatcher` to receive notifications for
530/// all entity types (actors, meshes, etc.) through a single callback.
531///
532/// The dispatcher pattern routes events to appropriate handlers based on the
533/// event type (Actor, Mesh, etc.), distinguishing this from TraceEventSink
534/// which handles tracing spans and events.
535///
536/// # Example
537/// ```ignore
538/// use hyperactor_telemetry::{set_entity_dispatcher, EntityEventDispatcher, EntityEvent};
539///
540/// struct MyEntityDispatcher;
541/// impl EntityEventDispatcher for MyEntityDispatcher {
542///     fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error> {
543///         match event {
544///             EntityEvent::Actor(actor) => println!("Actor: {}", actor.full_name),
545///             EntityEvent::Mesh(mesh) => println!("Mesh: {}", mesh.full_name),
546///             EntityEvent::ActorStatus(status) => println!("Status: {}", status.new_status),
547///             EntityEvent::SentMessage(msg) => println!("Sent: {}", msg.id),
548///             EntityEvent::Message(msg) => println!("Recv: {}", msg.id),
549///             EntityEvent::MessageStatus(s) => println!("Status: {}", s.status),
550///         }
551///         Ok(())
552///     }
553/// }
554///
555/// set_entity_dispatcher(Box::new(MyEntityDispatcher));
556/// ```
557pub trait EntityEventDispatcher: Send + Sync {
558    /// Dispatch an entity event to the appropriate handler.
559    fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error>;
560}
561
562/// Dispatch an entity event to the registered dispatcher, or buffer it if none is set.
563fn dispatch_or_buffer(event: EntityEvent) {
564    if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
565        match &mut *state {
566            EntityEventState::Dispatching(d) => {
567                if let Err(e) = d.dispatch(event) {
568                    tracing::error!("failed to dispatch entity event: {:?}", e);
569                }
570            }
571            EntityEventState::Buffering(buf) => {
572                // TODO: Disable buffer cap once dispatcher is enabled by default.
573                const MAX_BUFFERED_EVENTS: usize = 1000;
574                if buf.len() < MAX_BUFFERED_EVENTS {
575                    buf.push(event);
576                    if buf.len() == MAX_BUFFERED_EVENTS {
577                        tracing::warn!(
578                            "entity event buffer full ({MAX_BUFFERED_EVENTS}); \
579                             dropping further events until a dispatcher is registered"
580                        );
581                    }
582                }
583            }
584        }
585    }
586}
587
588/// Set the dispatcher to receive all entity events.
589///
590/// Any events that were emitted before this call are replayed to the new dispatcher
591/// in order. All subsequent events are dispatched directly.
592///
593/// Note: Only one dispatcher is supported. Setting a new dispatcher replaces any
594/// previously set dispatcher.
595pub fn set_entity_dispatcher(dispatcher: Box<dyn EntityEventDispatcher>) {
596    if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
597        // Take buffered events if transitioning from Buffering; empty vec otherwise.
598        let buffered =
599            match std::mem::replace(&mut *state, EntityEventState::Dispatching(dispatcher)) {
600                EntityEventState::Buffering(buf) => buf,
601                EntityEventState::Dispatching(_) => Vec::new(),
602            };
603        for event in buffered {
604            if let EntityEventState::Dispatching(d) = &*state {
605                if let Err(e) = d.dispatch(event) {
606                    tracing::error!("failed to dispatch buffered entity event: {:?}", e);
607                }
608            }
609        }
610    }
611}
612
613/// Register a sink to receive trace events.
614/// This can be called at any time - before or after telemetry initialization.
615/// The sink will receive all trace events on the background worker thread.
616///
617/// # Example
618/// ```ignore
619/// use hyperactor_telemetry::{register_sink, TraceEventSink, TraceEvent};
620///
621/// struct MySink;
622/// impl TraceEventSink for MySink {
623///     fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { Ok(()) }
624///     fn flush(&mut self) -> Result<(), anyhow::Error> { Ok(()) }
625/// }
626///
627/// register_sink(Box::new(MySink));
628/// ```
629pub fn register_sink(sink: Box<dyn TraceEventSink>) {
630    let sender = &SINK_CONTROL_CHANNEL.0;
631    if let Err(e) = sender.send(DispatcherControl::AddSink(sink)) {
632        eprintln!("[telemetry] failed to register sink: {}", e);
633    }
634}
635
636/// Take the control receiver for use by the TraceEventDispatcher.
637/// This can only be called once; subsequent calls return None.
638pub(crate) fn take_sink_control_receiver() -> Option<mpsc::Receiver<DispatcherControl>> {
639    SINK_CONTROL_CHANNEL.1.lock().unwrap().take()
640}
641
642/// The recorder singleton that is configured as a layer in the the default tracing
643/// subscriber, as configured by `initialize_logging`.
644pub fn recorder() -> &'static Recorder {
645    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
646    RECORDER.get_or_init(Recorder::new)
647}
648
649/// Hotswap the telemetry clock at runtime. This allows changing the clock implementation
650/// after initialization, which is useful for testing or switching between real and simulated time.
651pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
652    *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
653}
654
/// Builds an OpenTelemetry `KeyValue` attribute from a key and a value.
///
/// The resulting pairs can be stored and reused. OpenTelemetry attaches these
/// attributes whenever a counter or histogram is updated, and different values
/// identify different series: `MY_COUNTER.add(42, &[key_value!("key", "value")])`
/// and `MY_COUNTER.add(42, &[key_value!("key", "other_value")])` bump two
/// separate counters.
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Builds a borrowed slice of OpenTelemetry attributes using `key => value` syntax.
///
/// Each pair is expanded through `key_value!`; a trailing comma is accepted.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// assert_eq!(
///     kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
///     &[
///         key_value!("1", "1"),
///         key_value!("2", 2),
///         key_value!("3", 3.0),
///     ],
/// );
/// # }
/// ```
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$( $crate::key_value!($k, $v) ),*]
    };
}
688
/// Unit in which a `Timer` records durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    /// Milliseconds (`"ms"`).
    Millis,
    /// Microseconds (`"us"`).
    Micros,
    /// Nanoseconds (`"ns"`).
    Nanos,
}

impl TimeUnit {
    /// Short unit suffix, used by `declare_static_timer!` in the histogram name and unit.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Millis => "ms",
            Self::Micros => "us",
            Self::Nanos => "ns",
        }
    }
}
705pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
706
707impl<'a> Timer {
708    pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
709        Timer(data, unit)
710    }
711    pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
712        TimerGuard {
713            data: self,
714            pairs,
715            start: Instant::now(),
716        }
717    }
718
719    pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
720        let dur = match self.1 {
721            TimeUnit::Millis => dur.as_millis(),
722            TimeUnit::Micros => dur.as_micros(),
723            TimeUnit::Nanos => dur.as_nanos(),
724        } as u64;
725
726        self.0.record(dur, pairs);
727    }
728}
729pub struct TimerGuard<'a> {
730    data: &'static Timer,
731    pairs: &'a [opentelemetry::KeyValue],
732    start: Instant,
733}
734
735impl<'a> Drop for TimerGuard<'a> {
736    fn drop(&mut self) {
737        let now = Instant::now();
738        let dur = now.duration_since(self.start);
739        self.data.record(dur, self.pairs);
740    }
741}
742
/// Declares a lazily initialized, thread-safe static `Timer` named `$name` for
/// measuring durations.
///
/// The backing `u64` histogram is named `"{key}.{unit}"` (for example
/// `request_processing_time.ms`), and its unit is set to the same suffix. No
/// custom bucket boundaries are configured. Supported units are
/// `TimeUnit::Millis` ("ms"), `TimeUnit::Micros` ("us") and `TimeUnit::Nanos` ("ns").
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_timer!(REQUEST_TIMER, "request_processing_time", hyperactor_telemetry::TimeUnit::Millis);
///
/// {
///     let attrs = kv_pairs!("endpoint" => "/api/users", "method" => "GET");
///     // Keep the guard alive for the whole scope: `let _ = ...` would drop it
///     // (and record) immediately.
///     let _guard = REQUEST_TIMER.start(attrs);
///     // do something expensive
/// }
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
775
/// Declares a lazily initialized, thread-safe static `u64` counter named `$name`.
///
/// Counters only go up; use `declare_static_up_down_counter!` for values that can
/// also decrease. Declaring a static avoids creating temporary counters. Counters
/// with the same name may be declared in several places; the underlying runtime
/// joins them.
///
/// The type is spelled through this crate's `opentelemetry` re-export, so callers
/// do not need their own `opentelemetry` dependency.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_counter!(REQUESTS_RECEIVED, "requests_received");
///
/// REQUESTS_RECEIVED.add(40, kv_pairs!("path" => url.path, "proto" => url.proto));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
806
/// Declares a lazily initialized, thread-safe static `i64` up/down counter named `$name`.
///
/// Unlike `declare_static_counter!`, the value can be incremented or decremented
/// (pass a negative delta to `add`). Declaring a static avoids creating temporary
/// counters. Counters with the same name may be declared in several places; the
/// underlying runtime joins them.
///
/// The type is spelled through this crate's `opentelemetry` re-export, so callers
/// do not need their own `opentelemetry` dependency.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_up_down_counter!(ACTIVE_CONNECTIONS, "active_connections");
///
/// ACTIVE_CONNECTIONS.add(1, kv_pairs!("proto" => "https"));
/// // ... later, when the connection closes:
/// ACTIVE_CONNECTIONS.add(-1, kv_pairs!("proto" => "https"));
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            $crate::opentelemetry::metrics::UpDownCounter<i64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .i64_up_down_counter($key)
                .build()
        });
    };
}
841
/// Declares a lazily initialized, thread-safe static `f64` gauge named `$name`,
/// which can be set to a specific value.
///
/// Declaring a static avoids creating temporary gauges. Gauges with the same name
/// may be declared in several places; the underlying runtime joins them.
///
/// The type is spelled through this crate's `opentelemetry` re-export, so callers
/// do not need their own `opentelemetry` dependency.
///
/// Example:
/// ```
/// struct System {
///     pub memory_usage: f64,
///     pub cpu_usage: f64,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let system = System{memory_usage: 512.5, cpu_usage: 25.0};
/// declare_static_gauge!(MEMORY_USAGE, "memory_usage");
///
/// MEMORY_USAGE.record(system.memory_usage, kv_pairs!("unit" => "MB", "process" => "hyperactor"));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).f64_gauge($key).build());
    };
}
/// Declare a process-wide, lazily built observable `f64` gauge whose value is produced on
/// demand by a callback.
///
/// Use this for values that must be computed or fetched when they are read. The callback
/// runs whenever the metrics system observes the gauge.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
///
/// # fn main() {
/// declare_observable_gauge!(MEMORY_USAGE_GAUGE, "memory_usage", |observer| {
///     // Stand-in for an arbitrarily expensive measurement.
///     observer.observe(512.0, &[]);
/// });
///
/// // Values are reported automatically each time the gauge is observed.
/// # }
/// ```
#[macro_export]
macro_rules! declare_observable_gauge {
    ($static_name:ident, $metric_key:expr, $callback:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $metric_key]
        pub static $static_name: std::sync::LazyLock<
            opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($metric_key)
                .with_callback($callback)
                .build()
        });
    };
}
/// Create a thread safe static `f64` histogram that records a distribution of values
/// (for example, request latencies).
/// This is useful to avoid creating temporary histograms.
/// You can safely create histograms with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_histogram!(REQUEST_LATENCY, "request_latency");
///
/// REQUEST_LATENCY.record(40.0, kv_pairs!("path" => url.path, "proto" => url.proto))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                // Use `$crate` (not the crate's name) so the path resolves to this crate
                // wherever the macro is expanded: inside this crate itself, and for
                // dependents that rename it.
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
937
/// Process-wide holder for the non-blocking file writer and its [`WorkerGuard`].
///
/// The non-unified logging paths in `initialize_logging_with_log_prefix_impl` store their
/// writer here. `OnceLock::set` keeps only the first value (the result of `set` is ignored
/// there), so a later re-initialization does not replace it. Per the `tracing_appender` docs,
/// dropping a `WorkerGuard` flushes and shuts down the background writer; holding it in a
/// static keeps that writer alive for the life of the process.
/// NOTE(review): Rust statics are never dropped, so the guard's flush-on-drop does not run at
/// process exit — confirm buffered lines are flushed some other way.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
940
941/// A custom formatter that prepends prefix from env_var to log messages.
942struct PrefixedFormatter {
943    formatter: Glog<LocalTime>,
944    prefix_env_var: Option<String>,
945}
946
947impl PrefixedFormatter {
948    fn new(prefix_env_var: Option<String>) -> Self {
949        let formatter = Glog::default().with_timer(LocalTime::default());
950        Self {
951            formatter,
952            prefix_env_var,
953        }
954    }
955}
956
957impl<S, N> FormatEvent<S, N> for PrefixedFormatter
958where
959    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
960    N: for<'a> FormatFields<'a> + 'static,
961{
962    fn format_event(
963        &self,
964        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
965        mut writer: Writer<'_>,
966        event: &tracing::Event<'_>,
967    ) -> std::fmt::Result {
968        let prefix: String = if self.prefix_env_var.is_some() {
969            std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
970        } else {
971            "".to_string()
972        };
973
974        if prefix.is_empty() {
975            write!(writer, "[-]")?;
976        } else {
977            write!(writer, "[{}]", prefix)?;
978        }
979
980        self.formatter.format_event(ctx, writer, event)
981    }
982}
983
984/// Set up logging based on the given execution environment. We specialize logging based on how the
985/// logs are consumed. The destination scuba table is specialized based on the execution environment.
986/// mast -> monarch_tracing/prod
987/// devserver -> monarch_tracing/local
988/// unit test  -> monarch_tracing/test
989/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
990/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
991/// to get this behavior.
992pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
993    initialize_logging_with_log_prefix(clock, None);
994}
995
996/// testing
997pub fn initialize_logging_for_test() {
998    initialize_logging(DefaultTelemetryClock {});
999}
1000
1001/// Set up logging based on the given execution environment. We specialize logging based on how the
1002/// logs are consumed. The destination scuba table is specialized based on the execution environment.
1003/// mast -> monarch_tracing/prod
1004/// devserver -> monarch_tracing/local
1005/// unit test  -> monarch_tracing/test
1006/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
1007/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
1008/// to get this behavior.
1009///
1010/// tracing logs will be prefixed with the given prefix and routed to:
1011/// test -> stderr
1012/// local -> /tmp/monarch_log.log
1013/// mast -> /logs/dedicated_monarch_logs.log
1014/// Additionally, is MONARCH_STDERR_LOG sets logs level, then logs will be routed to stderr as well.
1015pub fn initialize_logging_with_log_prefix(
1016    clock: impl TelemetryClock + Send + 'static,
1017    prefix_env_var: Option<String>,
1018) {
1019    let _ = initialize_logging_with_log_prefix_impl(clock, prefix_env_var, false);
1020}
1021
1022pub fn initialize_logging_with_log_prefix_mock_scuba(
1023    clock: impl TelemetryClock + Send + 'static,
1024    prefix_env_var: Option<String>,
1025) -> Box<dyn TelemetryTestHandle> {
1026    initialize_logging_with_log_prefix_impl(clock, prefix_env_var, true)
1027}
1028
/// Shared implementation behind the public `initialize_logging*` entry points.
///
/// In order, this:
/// 1. Installs `clock` as the telemetry clock. Picks the default file log level for the
///    current `env::Env`: `debug` for `Env::Test`, `info` otherwise.
/// 2. Builds a global `tracing` subscriber and tries to install it.
///    - With `USE_UNIFIED_LAYER` enabled, a single `trace_dispatcher::TraceEventDispatcher`
///      fans events out to a list of sinks.
///    - Otherwise, a stack of `tracing_subscriber` layers is used.
///
///    If a global subscriber is already installed, that is logged at debug level and the
///    existing subscriber is kept.
/// 3. In fbcode builds, logs a `logging_initialized` event plus an execution event
///    describing this process. It then starts OTel metrics if `ENABLE_OTEL_METRICS` is set.
///    Non-fbcode builds start OTel metrics unconditionally.
///
/// `prefix_env_var` names the environment variable whose value prefixes each glog line.
///
/// Returns a `MockScubaHandle` only in fbcode builds, when `mock_scuba` is true and a mock
/// client was actually installed. In the unified path, that additionally requires
/// `ENABLE_OTEL_TRACING`. Every other case returns an `EmptyTestHandle`.
fn initialize_logging_with_log_prefix_impl(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
    mock_scuba: bool,
) -> Box<dyn TelemetryTestHandle> {
    let use_unified = hyperactor_config::global::get(USE_UNIFIED_LAYER);

    swap_telemetry_clock(clock);
    // Default verbosity for file output; tests get debug-level logs.
    let file_log_level = match env::Env::current() {
        env::Env::Local => LOG_LEVEL_INFO,
        env::Env::MastEmulator => LOG_LEVEL_INFO,
        env::Env::Mast => LOG_LEVEL_INFO,
        env::Env::Test => LOG_LEVEL_DEBUG,
    };

    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    #[cfg(fbcode_build)]
    {
        // Set only when a mock Scuba client is installed; turned into the returned handle.
        let mut mock_scuba_client: Option<crate::meta::scuba_utils::MockScubaClient> = None;

        if use_unified {
            // Unified path: one dispatcher layer fans events out to every sink below.
            let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();
            sinks.push(Box::new(sinks::glog::GlogSink::new(
                writer(),
                prefix_env_var.clone(),
                file_log_level,
            )));

            let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

            if sqlite_enabled {
                match create_sqlite_sink() {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        // NOTE(review): the global subscriber is only installed further
                        // below, so this warning is probably not recorded — confirm.
                        tracing::warn!("failed to create SqliteSink: {}", e);
                    }
                }
            }

            // Perfetto trace files are keyed by execution id and process name.
            if hyperactor_config::global::get(sinks::perfetto::PERFETTO_TRACE_MODE)
                != sinks::perfetto::PerfettoTraceMode::Off
            {
                let exec_id = env::execution_id();
                let process_name = std::env::var("HYPERACTOR_PROCESS_NAME")
                    .unwrap_or_else(|_| "client".to_string());
                match sinks::perfetto::PerfettoFileSink::new(
                    sinks::perfetto::default_trace_dir(),
                    &exec_id,
                    &process_name,
                ) {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        // NOTE(review): same as above — no subscriber is installed yet.
                        tracing::warn!("failed to create PerfettoFileSink: {}", e);
                    }
                }
            }

            {
                if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                    use crate::meta;
                    use crate::meta::scuba_utils::LOG_ENTER_EXIT;

                    // Scuba sink: a mock client when requested, otherwise the real one.
                    if mock_scuba {
                        let tracing_client = meta::scuba_utils::MockScubaClient::new();

                        sinks.push(Box::new(
                            meta::scuba_sink::ScubaSink::with_client(
                                tracing_client.clone(),
                                // Only an explicit boolean LOG_ENTER_EXIT attribute enables
                                // enter/exit logging; anything else counts as disabled.
                                match meta::tracing_resource().get(&LOG_ENTER_EXIT) {
                                    Some(Value::Bool(enabled)) => enabled,
                                    _ => false,
                                },
                            )
                            .with_target_filter(crate::config::get_tracing_targets()),
                        ));

                        mock_scuba_client = Some(tracing_client);
                    } else {
                        sinks.push(Box::new(
                            meta::scuba_sink::ScubaSink::new(meta::tracing_resource())
                                .with_target_filter(crate::config::get_tracing_targets()),
                        ));
                    }
                }
            }

            let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);

            // `Option<Layer>` is a no-op layer when `None`, so the recorder is optional.
            if let Err(err) = Registry::default()
                .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                    Some(recorder().layer())
                } else {
                    None
                })
                .with(dispatcher)
                .try_init()
            {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        } else {
            // Layered path: file layer, optional sqlite/recorder layers, then Scuba or OTel.
            // For file_layer, use NonBlocking
            let (non_blocking, guard) =
                tracing_appender::non_blocking::NonBlockingBuilder::default()
                    .lossy(false)
                    .finish(writer());
            let writer_guard = Arc::new((non_blocking, guard));
            // Keep the worker guard alive beyond this function; only the first value is kept.
            let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

            // MONARCH_FILE_LOG_LEVEL overrides the environment default; an unparseable
            // value falls back to the default level.
            let file_layer = fmt::Layer::default()
                .with_writer(writer_guard.0.clone())
                .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                .fmt_fields(GlogFields::default().compact())
                .with_ansi(false)
                .with_filter(
                    Targets::new()
                        .with_default(LevelFilter::from_level({
                            let log_level_str =
                                hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
                                    .unwrap_or_else(|| file_log_level.to_string());
                            tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                tracing::Level::from_str(file_log_level)
                                    .expect("Invalid default log level")
                            })
                        }))
                        .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
                );

            let registry = Registry::default()
                .with(if hyperactor_config::global::get(ENABLE_SQLITE_TRACING) {
                    // TODO: get_reloadable_sqlite_layer currently still returns None,
                    // and some additional work is required to make it work.
                    // NOTE(review): if it does return `None`, this `expect` panics whenever
                    // ENABLE_SQLITE_TRACING is set on this path — confirm.
                    Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
                } else {
                    None
                })
                .with(file_layer)
                .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                    Some(recorder().layer())
                } else {
                    None
                });

            // With a mock client, its Scuba layer replaces the OTel layer; the client is
            // returned to the caller through the handle below.
            if mock_scuba {
                let tracing_client = crate::meta::scuba_utils::MockScubaClient::new();

                let scuba_layer = crate::meta::tracing_layer_with_client(tracing_client.clone());

                if let Err(err) = registry.with(scuba_layer).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }

                mock_scuba_client = Some(tracing_client);
            } else if let Err(err) = registry
                .with(if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                    Some(otel::tracing_layer())
                } else {
                    None
                })
                .try_init()
            {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        }
        let exec_id = env::execution_id();
        let process_name =
            std::env::var("HYPERACTOR_PROCESS_NAME").unwrap_or_else(|_| "client".to_string());

        // setting target to "execution" will prevent the monarch_tracing scuba client from logging this
        tracing::info!(
            target: "execution",
            execution_id = exec_id,
            environment = %env::Env::current(),
            args = ?std::env::args(),
            build_mode = build_info::BuildInfo::get_build_mode(),
            compiler = build_info::BuildInfo::get_compiler(),
            compiler_version = build_info::BuildInfo::get_compiler_version(),
            buck_rule = build_info::BuildInfo::get_rule(),
            package_name = build_info::BuildInfo::get_package_name(),
            package_release = build_info::BuildInfo::get_package_release(),
            upstream_revision = build_info::BuildInfo::get_upstream_revision(),
            revision = build_info::BuildInfo::get_revision(),
            process_name = process_name,
            "logging_initialized",
        );
        // here we have the monarch_executions scuba client log
        meta::log_execution_event(
            &exec_id,
            &env::Env::current().to_string(),
            std::env::args().collect(),
            build_info::BuildInfo::get_build_mode(),
            build_info::BuildInfo::get_compiler(),
            build_info::BuildInfo::get_compiler_version(),
            build_info::BuildInfo::get_rule(),
            build_info::BuildInfo::get_package_name(),
            build_info::BuildInfo::get_package_release(),
            build_info::BuildInfo::get_upstream_revision(),
            build_info::BuildInfo::get_revision(),
            &process_name,
        );

        // Metrics export is opt-in in fbcode builds.
        if hyperactor_config::global::get(ENABLE_OTEL_METRICS) {
            otel::init_metrics();
        }

        if let Some(tracing_client) = mock_scuba_client {
            Box::new(MockScubaHandle { tracing_client })
        } else {
            Box::new(EmptyTestHandle)
        }
    }
    #[cfg(not(fbcode_build))]
    {
        // The optional recorder layer is shared by both paths below.
        let registry =
            Registry::default().with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                Some(recorder().layer())
            } else {
                None
            });

        if use_unified {
            // Unified path: optional sqlite, glog, and OTLP logs when `otlp_log_sink` yields one.
            let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();

            let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

            if sqlite_enabled {
                match create_sqlite_sink() {
                    Ok(sink) => {
                        sinks.push(Box::new(sink));
                    }
                    Err(e) => {
                        // NOTE(review): the global subscriber is only installed further
                        // below, so this warning is probably not recorded — confirm.
                        tracing::warn!("failed to create SqliteSink: {}", e);
                    }
                }
            }

            sinks.push(Box::new(sinks::glog::GlogSink::new(
                writer(),
                prefix_env_var.clone(),
                file_log_level,
            )));

            if let Some(log_sink) = otlp::otlp_log_sink() {
                sinks.push(log_sink);
            }

            let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);

            if let Err(err) = registry.with(dispatcher).try_init() {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        } else {
            // Layered path: only the glog file layer on top of the recorder layer.
            let (non_blocking, guard) =
                tracing_appender::non_blocking::NonBlockingBuilder::default()
                    .lossy(false)
                    .finish(writer());
            let writer_guard = Arc::new((non_blocking, guard));
            // Keep the worker guard alive beyond this function; only the first value is kept.
            let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

            // MONARCH_FILE_LOG_LEVEL overrides the environment default; an unparseable
            // value falls back to the default level.
            let file_layer = fmt::Layer::default()
                .with_writer(writer_guard.0.clone())
                .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                .fmt_fields(GlogFields::default().compact())
                .with_ansi(false)
                .with_filter(
                    Targets::new()
                        .with_default(LevelFilter::from_level({
                            let log_level_str =
                                hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
                                    .unwrap_or_else(|| file_log_level.to_string());
                            tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                tracing::Level::from_str(file_log_level)
                                    .expect("Invalid default log level")
                            })
                        }))
                        .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
                );

            if let Err(err) = registry.with(file_layer).try_init() {
                tracing::debug!("logging already initialized for this process: {}", err);
            }
        }

        // NOTE(review): unlike the fbcode path, ENABLE_OTEL_METRICS is not consulted here —
        // confirm this is intentional.
        otel::init_metrics();

        Box::new(EmptyTestHandle)
    }
}
1323
1324fn create_sqlite_sink() -> anyhow::Result<sinks::sqlite::SqliteSink> {
1325    let (db_path, _) = log_file_path(env::Env::current(), Some("traces"))
1326        .expect("failed to determine trace db path");
1327    let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id());
1328
1329    sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)
1330}
1331
/// Create a context span at ERROR level with skip_record enabled.
/// This is intended to create spans whose only purpose is to add context
/// to child events; the span itself is never independently recorded.
///
/// Every arm forwards to `tracing::error_span!`, adding a
/// `skip_record = $crate::skip_record` field before any caller-supplied fields.
/// Arms without an explicit `target:` use the invoking module's path, either by
/// passing `module_path!()` explicitly or through `error_span!`'s own default target.
///
/// Arms are matched top to bottom. Keep the `target:`/`parent:` forms ahead of the
/// bare `$name:expr` forms when editing.
///
/// Example:
/// ```ignore
/// use hyperactor_telemetry::context_span;
///
/// let span = context_span!("my_context", field1 = value1, field2 = value2);
/// let _guard = span.enter();
/// // ... do work that will be logged with this context
/// ```
#[macro_export]
macro_rules! context_span {
    (target: $target:expr, parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    (target: $target:expr, parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    (parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    (parent: $parent:expr, $name:expr) => {
        ::tracing::error_span!(
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    (target: $target:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    (target: $target:expr, $name:expr) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
        )
    };
    ($name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: module_path!(),
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    ($name:expr) => {
        ::tracing::error_span!(
            $name,
            skip_record = $crate::skip_record,
        )
    };
}
1409
1410pub mod env {
1411    use rand::RngCore;
1412
1413    /// Env var name set when monarch launches subprocesses to forward the execution context
1414    pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
1415    pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
1416    pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
1417
1418    /// Forward or generate a uuid for this execution. When running in production on mast, this is provided to
1419    /// us via the MAST_HPC_JOB_NAME env var. Subprocesses should either forward the MAST_HPC_JOB_NAME
1420    /// variable, or set the "MONARCH_EXECUTION_ID" var for subprocesses launched by this process.
1421    /// We keep these env vars separate so that other applications that depend on the MAST_HPC_JOB_NAME existing
1422    /// to understand their environment do not get confused and think they are running on mast when we are doing
1423    ///  local testing.
1424    pub fn execution_id() -> String {
1425        let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
1426            // not able to find an existing id so generate a unique one: username + current_time + random number.
1427            let username = crate::username();
1428            let now = {
1429                let now = std::time::SystemTime::now();
1430                let datetime: chrono::DateTime<chrono::Local> = now.into();
1431                datetime.format("%b-%d_%H:%M").to_string()
1432            };
1433            let random_number: u16 = (rand::rng().next_u32() % 1000) as u16;
1434            let execution_id = format!("{}_{}_{}", username, now, random_number);
1435            execution_id
1436        });
1437        // Safety: Can be unsound if there are multiple threads
1438        // reading and writing the environment.
1439        unsafe {
1440            std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
1441        }
1442        id
1443    }
1444
1445    /// Returns a URL for the execution trace, if available.
1446    #[cfg(fbcode_build)]
1447    pub async fn execution_url() -> anyhow::Result<Option<String>> {
1448        Ok(Some(
1449            crate::meta::scuba_tracing::url::get_samples_shorturl(&execution_id()).await?,
1450        ))
1451    }
1452    #[cfg(not(fbcode_build))]
1453    pub async fn execution_url() -> anyhow::Result<Option<String>> {
1454        Ok(None)
1455    }
1456
1457    #[derive(PartialEq)]
1458    pub enum Env {
1459        Local,
1460        Mast,
1461        MastEmulator,
1462        Test,
1463    }
1464
1465    impl std::fmt::Display for Env {
1466        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1467            write!(
1468                f,
1469                "{}",
1470                match self {
1471                    Self::Local => crate::ENV_VALUE_LOCAL,
1472                    Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
1473                    Self::Mast => crate::ENV_VALUE_MAST,
1474                    Self::Test => crate::ENV_VALUE_TEST,
1475                }
1476            )
1477        }
1478    }
1479
1480    impl Env {
1481        #[cfg(test)]
1482        pub fn current() -> Self {
1483            Self::Test
1484        }
1485
1486        #[cfg(not(test))]
1487        pub fn current() -> Self {
1488            match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
1489                // Constant from https://fburl.com/fhysd3fd
1490                crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
1491                _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
1492                    true => Self::Mast,
1493                    false => Self::Local,
1494                },
1495            }
1496        }
1497    }
1498}
1499
#[cfg(test)]
mod test {
    use opentelemetry::*;
    extern crate self as hyperactor_telemetry;
    use super::*;

    #[test]
    fn infer_kv_pair_types() {
        // String, integer and float literals must map onto the matching `Value` variants.
        let string_kv = key_value!("str", "str");
        let int_kv = key_value!("str", 25);
        let float_kv = key_value!("str", 1.1);

        assert_eq!(
            string_kv,
            KeyValue::new(Key::new("str"), Value::String("str".into()))
        );
        assert_eq!(int_kv, KeyValue::new(Key::new("str"), Value::I64(25)));
        assert_eq!(float_kv, KeyValue::new(Key::new("str"), Value::F64(1.1)));
    }

    #[test]
    fn kv_pair_slices() {
        // `kv_pairs!` should be equivalent to a slice of individual `key_value!`s.
        let expected = [
            key_value!("1", "1"),
            key_value!("2", 2),
            key_value!("3", 3.0),
        ];
        assert_eq!(kv_pairs!("1" => "1", "2" => 2, "3" => 3.0), &expected);
    }

    #[test]
    fn test_static_gauge() {
        // Two independently declared static gauges.
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // Recording with attributes; in the test environment nothing is shipped to
        // scribe/scuba.
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // Recording with no attributes at all.
        TEST_GAUGE.record(50.0, &[]);
    }
}