Skip to main content

hyperactor_telemetry/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(internal_features)]
10#![feature(assert_matches)]
11#![feature(sync_unsafe_cell)]
12#![feature(mpmc_channel)]
13#![feature(cfg_version)]
14#![feature(formatting_options)]
15#![feature(thread_id_value)]
16#![recursion_limit = "256"]
17
/// Environment variable carrying the job name; consulted for environment detection.
pub const MAST_HPC_JOB_NAME_ENV: &str = "MAST_HPC_JOB_NAME";

// Names of the log levels used by this crate.
const LOG_LEVEL_INFO: &str = "info";
const LOG_LEVEL_DEBUG: &str = "debug";

// Names of span fields inspected by this crate.
const SPAN_FIELD_RECORDING: &str = "recording";
// May be unreferenced in some build configurations; keep the name available without a warning.
#[allow(dead_code)]
const SPAN_FIELD_RECORDER: &str = "recorder";

/// Well-known tracing field name for the log subject.
///
/// A span that carries this field identifies the entity (actor, proc, etc.)
/// to which log events inside that span pertain.
pub const SUBJECT_KEY: &str = "subject";

// Textual names of the supported runtime environments.
const ENV_VALUE_LOCAL: &str = "local";
const ENV_VALUE_MAST_EMULATOR: &str = "mast_emulator";
const ENV_VALUE_MAST: &str = "mast";
const ENV_VALUE_TEST: &str = "test";
// May be unreferenced in some build configurations; keep the name available without a warning.
#[allow(dead_code)]
const ENV_VALUE_LOCAL_MAST_SIMULATOR: &str = "local_mast_simulator";
42
/// A marker field used to indicate that a span should not be recorded as
/// individual start/end span events; rather the span is purely used to
/// provide context for child events.
///
/// Note that the mechanism for skipping span recording uses the precise
/// name "skip_record", thus it must be used as a naked identifier:
/// ```ignore
/// use hyperactor_telemetry::skip_record;
///
/// tracing::span!(..., skip_record);
/// ```
// The lowercase name is deliberate: the span field must be spelled exactly `skip_record`.
#[allow(non_upper_case_globals)]
pub const skip_record: bool = true;
57
58mod config;
59pub mod in_memory_reader;
60#[cfg(all(fbcode_build, target_os = "linux"))]
61mod meta;
62mod otel;
63pub(crate) mod otlp;
64mod pool;
65mod rate_limit;
66pub mod recorder;
67pub mod sinks;
68mod spool;
69pub mod sqlite;
70pub mod task;
71pub mod trace;
72pub mod trace_dispatcher;
73
74// Re-export key types for external sink implementations
75use std::collections::hash_map::DefaultHasher;
76use std::hash::Hash;
77use std::hash::Hasher;
78use std::io::Write;
79use std::str::FromStr;
80use std::sync::Arc;
81use std::sync::Mutex;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::Ordering;
84use std::sync::mpsc;
85use std::time::Instant;
86use std::time::SystemTime;
87
88use lazy_static::lazy_static;
89pub use opentelemetry;
90pub use opentelemetry::Key;
91pub use opentelemetry::KeyValue;
92pub use opentelemetry::Value;
93pub use opentelemetry::global::meter;
94pub use trace_dispatcher::DispatcherControl;
95pub use trace_dispatcher::FieldValue;
96pub use trace_dispatcher::TraceEvent;
97pub use trace_dispatcher::TraceEventSink;
98use trace_dispatcher::TraceFields;
99pub use tracing;
100pub use tracing::Level;
101use tracing_appender::non_blocking::NonBlocking;
102use tracing_appender::non_blocking::WorkerGuard;
103use tracing_appender::rolling::RollingFileAppender;
104use tracing_glog::Glog;
105use tracing_glog::GlogFields;
106use tracing_glog::LocalTime;
107use tracing_subscriber::Layer;
108use tracing_subscriber::filter::LevelFilter;
109use tracing_subscriber::filter::Targets;
110use tracing_subscriber::fmt;
111use tracing_subscriber::fmt::FormatEvent;
112use tracing_subscriber::fmt::FormatFields;
113use tracing_subscriber::fmt::format::Writer;
114use tracing_subscriber::registry::LookupSpan;
115
116#[cfg(all(fbcode_build, target_os = "linux"))]
117use crate::config::ENABLE_OTEL_METRICS;
118#[cfg(all(fbcode_build, target_os = "linux"))]
119use crate::config::ENABLE_OTEL_TRACING;
120use crate::config::ENABLE_RECORDER_TRACING;
121use crate::config::ENABLE_SQLITE_TRACING;
122use crate::config::MONARCH_FILE_LOG_LEVEL;
123use crate::config::MONARCH_LOG_SUFFIX;
124use crate::config::USE_UNIFIED_LAYER;
125use crate::recorder::Recorder;
126#[cfg(all(fbcode_build, target_os = "linux"))]
127use crate::sqlite::get_reloadable_sqlite_layer;
128
/// Reduce any [`Hash`] value to a `u64` using the standard library's [`DefaultHasher`].
///
/// A freshly constructed hasher is used on every call, so within one build equal
/// inputs always produce equal outputs.
pub fn hash_to_u64(value: &impl Hash) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
135
136#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
137pub struct TelemetrySample {
138    fields: Vec<(String, String)>,
139}
140
141impl TelemetrySample {
142    pub fn get_string(&self, key: &str) -> Option<&str> {
143        for (k, v) in &self.fields {
144            if k == key {
145                return Some(v.as_str());
146            }
147        }
148        None
149    }
150}
151
#[cfg(all(fbcode_build, target_os = "linux"))]
impl From<crate::meta::sample_buffer::Sample> for TelemetrySample {
    /// Convert a buffered sample, keeping only its string-valued entries.
    ///
    /// Entries holding any other kind of value are dropped.
    fn from(sample: crate::meta::sample_buffer::Sample) -> Self {
        use crate::meta::sample_buffer::SampleValue;

        let fields = sample
            .0
            .into_iter()
            .filter_map(|(key, value)| match value {
                SampleValue::String(s) => Some((key.to_string(), s.to_string())),
                _ => None,
            })
            .collect();
        Self { fields }
    }
}
164
165#[cfg(not(all(fbcode_build, target_os = "linux")))]
166impl TelemetrySample {
167    pub fn new() -> Self {
168        Self { fields: Vec::new() }
169    }
170}
171
172pub trait TelemetryTestHandle {
173    fn get_tracing_samples(&self) -> Vec<TelemetrySample>;
174}
175
/// Test handle backed by a mock Scuba client (available only in fbcode Linux builds).
#[cfg(all(fbcode_build, target_os = "linux"))]
struct MockScubaHandle {
    tracing_client: crate::meta::scuba_utils::MockScubaClient,
}

#[cfg(all(fbcode_build, target_os = "linux"))]
impl TelemetryTestHandle for MockScubaHandle {
    /// Fetch every sample from the mock client and convert it into a [`TelemetrySample`].
    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
        let raw_samples = self.tracing_client.get_samples();
        raw_samples
            .into_iter()
            .map(TelemetrySample::from)
            .collect()
    }
}
191
192struct EmptyTestHandle;
193
194impl TelemetryTestHandle for EmptyTestHandle {
195    fn get_tracing_samples(&self) -> Vec<TelemetrySample> {
196        vec![]
197    }
198}
199
200pub trait TelemetryClock {
201    fn now(&self) -> tokio::time::Instant;
202    fn system_time_now(&self) -> std::time::SystemTime;
203}
204
205pub struct DefaultTelemetryClock {}
206
207impl TelemetryClock for DefaultTelemetryClock {
208    fn now(&self) -> tokio::time::Instant {
209        tokio::time::Instant::now()
210    }
211
212    fn system_time_now(&self) -> std::time::SystemTime {
213        std::time::SystemTime::now()
214    }
215}
216
217pub fn username() -> String {
218    let env = env::Env::current();
219    match env {
220        env::Env::Mast => {
221            std::env::var("MAST_JOB_OWNER_UNIXNAME").unwrap_or_else(|_| "mast_owner".to_string())
222        }
223        _ => whoami::username(),
224    }
225}
226
227// Given an environment, determine the log file path to write to.
228// If a suffix is provided, it will be prepended with "_" and then appended to file name
229pub fn log_file_path(
230    env: env::Env,
231    suffix: Option<&str>,
232) -> Result<(String, String), anyhow::Error> {
233    let suffix = suffix
234        .map(|s| {
235            if s.is_empty() {
236                String::new()
237            } else {
238                format!("_{}", s)
239            }
240        })
241        .unwrap_or_default();
242    match env {
243        env::Env::Local | env::Env::MastEmulator => {
244            let username = if whoami::username().is_empty() {
245                "monarch".to_string()
246            } else {
247                whoami::username()
248            };
249            Ok((
250                format!("/tmp/{}", username),
251                format!("monarch_log{}", suffix),
252            ))
253        }
254        env::Env::Mast => Ok((
255            "/logs/".to_string(),
256            format!("dedicated_log_monarch{}", suffix),
257        )),
258        _ => Err(anyhow::anyhow!(
259            "file writer unsupported for environment {}",
260            env
261        )),
262    }
263}
264
265fn try_create_appender(
266    path: &str,
267    filename: &str,
268    create_dir: bool,
269) -> Result<RollingFileAppender, Box<dyn std::error::Error>> {
270    if create_dir {
271        std::fs::create_dir_all(path)?;
272    }
273    Ok(RollingFileAppender::builder()
274        .filename_prefix(filename)
275        .filename_suffix("log")
276        .build(path)?)
277}
278
279fn writer() -> Box<dyn Write + Send> {
280    match env::Env::current() {
281        env::Env::Test => Box::new(std::io::stderr()),
282        env::Env::Local | env::Env::MastEmulator | env::Env::Mast => {
283            let suffix = hyperactor_config::global::try_get_cloned(MONARCH_LOG_SUFFIX);
284            let (path, filename) = log_file_path(env::Env::current(), suffix.as_deref()).unwrap();
285            match try_create_appender(&path, &filename, true) {
286                Ok(file_appender) => Box::new(file_appender),
287                Err(e) => {
288                    eprintln!(
289                        "unable to create log file in {}: {}. Falling back to stderr",
290                        path, e
291                    );
292                    Box::new(std::io::stderr())
293                }
294            }
295        }
296    }
297}
298
299lazy_static! {
300    static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
301        Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {})));
302    /// Global sender into the active `TraceEventDispatcher` queue.
303    ///
304    /// This is for `TraceEvent`s synthesized outside normal `tracing` subscriber callbacks,
305    /// such as Python user spans. Once telemetry initializes and constructs the dispatcher,
306    /// we stash its sender here so those synthetic events flow through the same sink fan-out
307    /// path as native Rust tracing events.
308    static ref SYNTHETIC_TRACE_EVENT_SENDER: Mutex<Option<mpsc::SyncSender<TraceEvent>>> =
309        Mutex::new(None);
310    /// Global control channel for sink registration.
311    /// Created upfront so sinks can be registered at any time (before or after telemetry init).
312    /// The receiver is taken once when the TraceEventDispatcher is created.
313    static ref SINK_CONTROL_CHANNEL: (
314        mpsc::Sender<DispatcherControl>,
315        Mutex<Option<mpsc::Receiver<DispatcherControl>>>
316    ) = {
317        let (sender, receiver) = mpsc::channel();
318        (sender, Mutex::new(Some(receiver)))
319    };
320    /// Global unified entity event dispatcher with pre-registration buffering.
321    /// Events emitted before a dispatcher is registered are buffered and replayed
322    /// when `set_entity_dispatcher` is called. This ensures bootstrap actors
323    /// (e.g., HostAgent and ProcAgent) are captured even though they are spawned before the
324    /// telemetry system is initialized.
325    static ref ENTITY_EVENT_STATE: Mutex<EntityEventState> = Mutex::new(
326        EntityEventState::Buffering(Vec::new())
327    );
328}
329
/// First ID handed to synthetic user spans: the top bit of a `u64` is set.
const SYNTHETIC_USER_SPAN_ID_BASE: u64 = 1u64 << 63;
/// Next synthetic user span ID to allocate; starts at [`SYNTHETIC_USER_SPAN_ID_BASE`].
static USER_SPAN_SEQ: AtomicU64 = AtomicU64::new(SYNTHETIC_USER_SPAN_ID_BASE);
/// Thread/track name attached to every synthetic user span event.
const SYNTHETIC_USER_SPAN_TRACK_NAME: &str = "python";
333
334/// Install the sender for the active dispatcher so synthesized events can join the
335/// same pipeline as events captured from native `tracing` callbacks.
336pub(crate) fn set_synthetic_trace_event_sender(sender: mpsc::SyncSender<TraceEvent>) {
337    *SYNTHETIC_TRACE_EVENT_SENDER
338        .lock()
339        .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned") = Some(sender);
340}
341
342/// Sends a synthesized trace event to the active dispatcher queue.
343/// Returns `true` if sent successfully.
344pub(crate) fn emit_trace_event(event: TraceEvent) -> bool {
345    let sender = SYNTHETIC_TRACE_EVENT_SENDER
346        .lock()
347        .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned")
348        .clone();
349    match sender {
350        Some(sender) => sender.try_send(event).is_ok(),
351        None => false,
352    }
353}
354
355/// Begins a user-defined span and returns its id. Returns 0 if the dispatcher is not initialized.
356pub fn start_user_span(
357    name: &'static str,
358    target: &'static str,
359    fields: impl IntoIterator<Item = (&'static str, FieldValue)>,
360) -> u64 {
361    if SYNTHETIC_TRACE_EVENT_SENDER
362        .lock()
363        .expect("SYNTHETIC_TRACE_EVENT_SENDER mutex should not be poisoned")
364        .is_none()
365    {
366        return 0;
367    }
368
369    let id = USER_SPAN_SEQ.fetch_add(1, Ordering::Relaxed);
370
371    let fields = fields.into_iter().collect::<TraceFields>();
372
373    let _ = emit_trace_event(TraceEvent::NewSpan {
374        id,
375        name,
376        target,
377        level: tracing::Level::INFO,
378        fields,
379        timestamp: SystemTime::now(),
380        parent_id: None,
381        thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
382        file: None,
383        line: None,
384    });
385
386    let _ = emit_trace_event(TraceEvent::SpanEnter {
387        id,
388        timestamp: SystemTime::now(),
389        thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
390    });
391
392    id
393}
394
395/// Ends a user-defined span previously started with [`start_user_span`].
396pub fn end_user_span(id: u64) {
397    if id == 0 {
398        return;
399    }
400
401    let _ = emit_trace_event(TraceEvent::SpanExit {
402        id,
403        timestamp: SystemTime::now(),
404        thread_name: SYNTHETIC_USER_SPAN_TRACK_NAME,
405    });
406
407    let _ = emit_trace_event(TraceEvent::SpanClose {
408        id,
409        timestamp: SystemTime::now(),
410    });
411}
412
413/// State machine for the entity event dispatcher.
414/// Starts in `Buffering`, collecting events until a dispatcher is registered.
415/// Transitions to `Dispatching` on `set_entity_dispatcher`, replaying buffered
416/// events and dropping the buffer so it cannot accumulate further.
417enum EntityEventState {
418    Buffering(Vec<EntityEvent>),
419    Dispatching(Box<dyn EntityEventDispatcher>),
420}
421
/// Payload describing a newly created actor.
///
/// Handed to `EntityEventDispatcher` implementations, wrapped in `EntityEvent::Actor`,
/// when an actor is spawned.
#[derive(Debug, Clone)]
pub struct ActorEvent {
    /// Unique identifier of the actor, hashed from its ActorAddr.
    pub id: u64,
    /// When the actor was created.
    pub timestamp: SystemTime,
    /// Identifier of the mesh this actor belongs to, hashed from the actor name.
    pub mesh_id: u64,
    /// Position of this actor within the mesh shape.
    pub rank: u64,
    /// Fully qualified hierarchical name of the actor.
    pub full_name: String,
    /// Optional user-facing name.
    pub display_name: Option<String>,
}
439
440/// Notify the registered dispatcher that an actor was created.
441/// If no dispatcher is registered yet, the event is buffered and will be
442/// replayed when `set_entity_dispatcher` is called.
443pub fn notify_actor_created(event: ActorEvent) {
444    dispatch_or_buffer(EntityEvent::Actor(event));
445}
446
/// Payload describing a newly created mesh.
///
/// Handed to `EntityEventDispatcher` implementations, wrapped in `EntityEvent::Mesh`,
/// when a mesh is spawned.
#[derive(Debug, Clone)]
pub struct MeshEvent {
    /// Unique (hashed) identifier of the mesh.
    pub id: u64,
    /// When the mesh was created.
    pub timestamp: SystemTime,
    /// Mesh class, e.g. "Proc", "Host", "Python<SomeUserDefinedActor>".
    pub class: String,
    /// Name supplied by the user for this mesh.
    pub given_name: String,
    /// Full hierarchical name, as shown in supervision events.
    pub full_name: String,
    /// Mesh shape, serialized from `ndslice::Extent`.
    pub shape_json: String,
    /// Identifier of the parent mesh; `None` for a root mesh.
    pub parent_mesh_id: Option<u64>,
    /// Region of the parent over which this mesh was spawned, serialized from `ndslice::Region`.
    pub parent_view_json: Option<String>,
}
468
469/// Notify the registered dispatcher that a mesh was created.
470/// If no dispatcher is registered yet, the event is buffered and will be
471/// replayed when `set_entity_dispatcher` is called.
472pub fn notify_mesh_created(event: MeshEvent) {
473    dispatch_or_buffer(EntityEvent::Mesh(event));
474}
475
/// Payload describing a change in an actor's status.
///
/// Handed to `EntityEventDispatcher` implementations, wrapped in `EntityEvent::ActorStatus`.
#[derive(Debug, Clone)]
pub struct ActorStatusEvent {
    /// Unique identifier of this status event.
    pub id: u64,
    /// When the status changed.
    pub timestamp: SystemTime,
    /// Identifier of the actor whose status changed.
    pub actor_id: u64,
    /// The new status, e.g. "Created", "Idle", "Failed".
    pub new_status: String,
    /// Optional explanation of the change, such as error details for "Failed".
    pub reason: Option<String>,
}
491
492/// Notify the registered dispatcher that an actor changed status.
493/// If no dispatcher is registered yet, the event is buffered and will be
494/// replayed when `set_entity_dispatcher` is called.
495pub fn notify_actor_status_changed(event: ActorStatusEvent) {
496    dispatch_or_buffer(EntityEvent::ActorStatus(event));
497}
498
/// Payload describing a message sent to an actor mesh.
///
/// Emitted from `cast_with_selection` in `actor_mesh.rs`, which every Python send method
/// (`call`, `call_one`, `broadcast` and `choose`) goes through.
#[derive(Debug, Clone)]
pub struct SentMessageEvent {
    /// When the message was sent.
    pub timestamp: SystemTime,
    /// Hash of the sending actor's `ActorAddr`.
    pub sender_actor_id: u64,
    /// Hash of the target actor mesh's name.
    pub actor_mesh_id: u64,
    /// The targeted view (slice) of the actor mesh, serialized from `ndslice::Region`.
    ///
    /// It covers every dimension for full-mesh sends (call, broadcast), omits collapsed
    /// dimensions for sliced sends (call_one), and is a scalar (0-dim) region for choose.
    pub view_json: String,
    /// Shape of that view, serialized from `ndslice::Shape` (obtained from the view region
    /// via `Region::into::<Shape>`).
    pub shape_json: String,
}
519
520/// Notify the registered dispatcher that a message was sent.
521/// If no dispatcher is registered yet, the event is buffered and will be
522/// replayed when `set_entity_dispatcher` is called.
523pub fn notify_sent_message(event: SentMessageEvent) {
524    dispatch_or_buffer(EntityEvent::SentMessage(event));
525}
526
/// Payload describing a received message, from the receiver's point of view.
#[derive(Debug, Clone)]
pub struct MessageEvent {
    /// When the message was received.
    pub timestamp: SystemTime,
    /// Unique identifier of this received message.
    pub id: u64,
    /// Hash of the sender's ActorAddr.
    pub from_actor_id: u64,
    /// Hash of the receiver's ActorAddr.
    pub to_actor_id: u64,
    /// Endpoint name, when the message targets a specific actor endpoint.
    pub endpoint: Option<String>,
    /// Destination port ID, if any.
    pub port_id: Option<u64>,
}
542
543/// Notify the registered dispatcher that a message was received.
544pub fn notify_message(event: MessageEvent) {
545    dispatch_or_buffer(EntityEvent::Message(event));
546}
547
/// Payload describing a status change of a received message.
#[derive(Debug, Clone)]
pub struct MessageStatusEvent {
    /// When the status changed.
    pub timestamp: SystemTime,
    /// Unique identifier of this status event.
    pub id: u64,
    /// The message whose status changed; refers to `MessageEvent::id`.
    pub message_id: u64,
    /// The new status: "queued", "active", or "complete".
    pub status: String,
}
559
560/// Notify the registered dispatcher that a message changed status.
561pub fn notify_message_status(event: MessageStatusEvent) {
562    dispatch_or_buffer(EntityEvent::MessageStatus(event));
563}
564
565static ACTOR_STATUS_SEQ: AtomicU64 = AtomicU64::new(1);
566
567/// Generate a globally unique ActorStatusEvent ID.
568///
569/// Combines the actor's unique ID with a process-local sequence number,
570/// then hashes the pair to produce an ID that is unique across processes.
571pub fn generate_actor_status_event_id(actor_id: u64) -> u64 {
572    let seq = ACTOR_STATUS_SEQ.fetch_add(1, Ordering::Relaxed);
573    hash_to_u64(&(actor_id, seq))
574}
575
576static SEND_SEQ: AtomicU64 = AtomicU64::new(1);
577
578/// Generate a globally unique SentMessage ID.
579pub fn generate_sent_message_id(sender_actor_id: u64) -> u64 {
580    let seq = SEND_SEQ.fetch_add(1, Ordering::Relaxed);
581    hash_to_u64(&(sender_actor_id, seq))
582}
583
584static RECV_MSG_SEQ: AtomicU64 = AtomicU64::new(1);
585
586/// Generate a unique received-message ID (cross-process unique).
587///
588/// Hashes (to_actor_id, seq) following the same pattern as
589/// `generate_sent_message_id`.
590pub fn generate_message_id(to_actor_id: u64) -> u64 {
591    let seq = RECV_MSG_SEQ.fetch_add(1, Ordering::Relaxed);
592    hash_to_u64(&(to_actor_id, seq))
593}
594
595static STATUS_EVENT_SEQ: AtomicU64 = AtomicU64::new(1);
596
597/// Generate a unique message-status-event ID (cross-process unique).
598///
599/// Hashes (message_id, seq) following the same pattern as
600/// `generate_sent_message_id`.
601pub fn generate_status_event_id(message_id: u64) -> u64 {
602    let seq = STATUS_EVENT_SEQ.fetch_add(1, Ordering::Relaxed);
603    hash_to_u64(&(message_id, seq))
604}
605
606/// Unified event enum for all entity lifecycle events.
607///
608/// This enum wraps all entity events (actors, meshes, and future event types)
609/// into a single type. This enables a single sink to handle all entity events,
610/// simplifying the registration and notification infrastructure.
611#[derive(Debug, Clone)]
612pub enum EntityEvent {
613    /// An actor was created.
614    Actor(ActorEvent),
615    /// A mesh was created.
616    Mesh(MeshEvent),
617    /// An actor changed status.
618    ActorStatus(ActorStatusEvent),
619    /// A message was sent.
620    SentMessage(SentMessageEvent),
621    /// A message was received.
622    Message(MessageEvent),
623    /// A received message changed status.
624    MessageStatus(MessageStatusEvent),
625}
626
627/// Trait for dispatchers that receive unified entity events.
628///
629/// This is the preferred way to receive entity lifecycle events. Implement this
630/// trait and register with `set_entity_dispatcher` to receive notifications for
631/// all entity types (actors, meshes, etc.) through a single callback.
632///
633/// The dispatcher pattern routes events to appropriate handlers based on the
634/// event type (Actor, Mesh, etc.), distinguishing this from TraceEventSink
635/// which handles tracing spans and events.
636///
637/// # Example
638/// ```ignore
639/// use hyperactor_telemetry::{set_entity_dispatcher, EntityEventDispatcher, EntityEvent};
640///
641/// struct MyEntityDispatcher;
642/// impl EntityEventDispatcher for MyEntityDispatcher {
643///     fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error> {
644///         match event {
645///             EntityEvent::Actor(actor) => println!("Actor: {}", actor.full_name),
646///             EntityEvent::Mesh(mesh) => println!("Mesh: {}", mesh.full_name),
647///             EntityEvent::ActorStatus(status) => println!("Status: {}", status.new_status),
648///             EntityEvent::SentMessage(msg) => println!("Sent: {}", msg.id),
649///             EntityEvent::Message(msg) => println!("Recv: {}", msg.id),
650///             EntityEvent::MessageStatus(s) => println!("Status: {}", s.status),
651///         }
652///         Ok(())
653///     }
654/// }
655///
656/// set_entity_dispatcher(Box::new(MyEntityDispatcher));
657/// ```
658pub trait EntityEventDispatcher: Send + Sync {
659    /// Dispatch an entity event to the appropriate handler.
660    fn dispatch(&self, event: EntityEvent) -> Result<(), anyhow::Error>;
661}
662
663/// Dispatch an entity event to the registered dispatcher, or buffer it if none is set.
664fn dispatch_or_buffer(event: EntityEvent) {
665    if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
666        match &mut *state {
667            EntityEventState::Dispatching(d) => {
668                if let Err(e) = d.dispatch(event) {
669                    tracing::error!("failed to dispatch entity event: {:?}", e);
670                }
671            }
672            EntityEventState::Buffering(buf) => {
673                // TODO: Disable buffer cap once dispatcher is enabled by default.
674                const MAX_BUFFERED_EVENTS: usize = 1000;
675                if buf.len() < MAX_BUFFERED_EVENTS {
676                    buf.push(event);
677                    if buf.len() == MAX_BUFFERED_EVENTS {
678                        tracing::warn!(
679                            "entity event buffer full ({MAX_BUFFERED_EVENTS}); \
680                             dropping further events until a dispatcher is registered"
681                        );
682                    }
683                }
684            }
685        }
686    }
687}
688
689/// Set the dispatcher to receive all entity events.
690///
691/// Any events that were emitted before this call are replayed to the new dispatcher
692/// in order. All subsequent events are dispatched directly.
693///
694/// Note: Only one dispatcher is supported. Setting a new dispatcher replaces any
695/// previously set dispatcher.
696pub fn set_entity_dispatcher(dispatcher: Box<dyn EntityEventDispatcher>) {
697    if let Ok(mut state) = ENTITY_EVENT_STATE.lock() {
698        // Take buffered events if transitioning from Buffering; empty vec otherwise.
699        let buffered =
700            match std::mem::replace(&mut *state, EntityEventState::Dispatching(dispatcher)) {
701                EntityEventState::Buffering(buf) => buf,
702                EntityEventState::Dispatching(_) => Vec::new(),
703            };
704        for event in buffered {
705            if let EntityEventState::Dispatching(d) = &*state
706                && let Err(e) = d.dispatch(event)
707            {
708                tracing::error!("failed to dispatch buffered entity event: {:?}", e);
709            }
710        }
711    }
712}
713
714/// Register a sink to receive trace events.
715/// This can be called at any time - before or after telemetry initialization.
716/// The sink will receive all trace events on the background worker thread.
717///
718/// # Example
719/// ```ignore
720/// use hyperactor_telemetry::{register_sink, TraceEventSink, TraceEvent};
721///
722/// struct MySink;
723/// impl TraceEventSink for MySink {
724///     fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> { Ok(()) }
725///     fn flush(&mut self) -> Result<(), anyhow::Error> { Ok(()) }
726/// }
727///
728/// register_sink(Box::new(MySink));
729/// ```
730pub fn register_sink(sink: Box<dyn TraceEventSink>) {
731    let sender = &SINK_CONTROL_CHANNEL.0;
732    if let Err(e) = sender.send(DispatcherControl::AddSink(sink)) {
733        eprintln!("[telemetry] failed to register sink: {}", e);
734    }
735}
736
737/// Take the control receiver for use by the TraceEventDispatcher.
738/// This can only be called once; subsequent calls return None.
739pub(crate) fn take_sink_control_receiver() -> Option<mpsc::Receiver<DispatcherControl>> {
740    SINK_CONTROL_CHANNEL.1.lock().unwrap().take()
741}
742
743/// The recorder singleton that is configured as a layer in the the default tracing
744/// subscriber, as configured by `initialize_logging`.
745pub fn recorder() -> &'static Recorder {
746    static RECORDER: std::sync::OnceLock<Recorder> = std::sync::OnceLock::new();
747    RECORDER.get_or_init(Recorder::new)
748}
749
750/// Hotswap the telemetry clock at runtime. This allows changing the clock implementation
751/// after initialization, which is useful for testing or switching between real and simulated time.
752pub fn swap_telemetry_clock(clock: impl TelemetryClock + Send + 'static) {
753    *TELEMETRY_CLOCK.lock().unwrap() = Box::new(clock);
754}
755
/// Create an OpenTelemetry `KeyValue` attribute from a key and a value.
///
/// The resulting pairs can be stored and reused many times. OpenTelemetry applies these
/// attributes when counters and histograms are updated, and each distinct attribute set is
/// tracked separately. For example, `MY_COUNTER.add(42, &[key_value!("key", "value")])` and
/// `MY_COUNTER.add(42, &[key_value!("key", "other_value")])` bump two separate counters.
#[macro_export]
macro_rules! key_value {
    ($key:expr, $val:expr) => {
        $crate::opentelemetry::KeyValue::new(
            $crate::opentelemetry::Key::new($key),
            $crate::opentelemetry::Value::from($val),
        )
    };
}
/// Build a borrowed slice of OpenTelemetry attributes using `key => value` syntax.
///
/// Each pair expands to `key_value!(key, value)`; a trailing comma is accepted.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// assert_eq!(
///     kv_pairs!("1" => "1", "2" => 2, "3" => 3.0),
///     &[
///         key_value!("1", "1"),
///         key_value!("2", 2),
///         key_value!("3", 3.0),
///     ],
/// );
/// # }
/// ```
#[macro_export]
macro_rules! kv_pairs {
    ($($k:expr => $v:expr),* $(,)?) => {
        &[$($crate::key_value!($k, $v)),*]
    };
}
789
/// Resolution at which a `Timer` records durations.
#[derive(Debug, Clone, Copy)]
pub enum TimeUnit {
    /// Milliseconds (`"ms"`).
    Millis,
    /// Microseconds (`"us"`).
    Micros,
    /// Nanoseconds (`"ns"`).
    Nanos,
}

impl TimeUnit {
    /// Short suffix naming this unit: `"ms"`, `"us"` or `"ns"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Millis => "ms",
            Self::Micros => "us",
            Self::Nanos => "ns",
        }
    }
}
806pub struct Timer(opentelemetry::metrics::Histogram<u64>, TimeUnit);
807
808impl<'a> Timer {
809    pub fn new(data: opentelemetry::metrics::Histogram<u64>, unit: TimeUnit) -> Self {
810        Timer(data, unit)
811    }
812    pub fn start(&'static self, pairs: &'a [opentelemetry::KeyValue]) -> TimerGuard<'a> {
813        TimerGuard {
814            data: self,
815            pairs,
816            start: Instant::now(),
817        }
818    }
819
820    pub fn record(&'static self, dur: std::time::Duration, pairs: &'a [opentelemetry::KeyValue]) {
821        let dur = match self.1 {
822            TimeUnit::Millis => dur.as_millis(),
823            TimeUnit::Micros => dur.as_micros(),
824            TimeUnit::Nanos => dur.as_nanos(),
825        } as u64;
826
827        self.0.record(dur, pairs);
828    }
829}
830pub struct TimerGuard<'a> {
831    data: &'static Timer,
832    pairs: &'a [opentelemetry::KeyValue],
833    start: Instant,
834}
835
836impl Drop for TimerGuard<'_> {
837    fn drop(&mut self) {
838        let now = Instant::now();
839        let dur = now.duration_since(self.start);
840        self.data.record(dur, self.pairs);
841    }
842}
843
/// Create a thread-safe static timer that can be used to measure durations.
///
/// The macro declares a lazily initialized [`Timer`] backed by a `u64` histogram named
/// `"<key>.<unit>"`, with its unit set to the unit suffix. Supported units are
/// `TimeUnit::Millis` ("ms"), `TimeUnit::Micros` ("us") and `TimeUnit::Nanos` ("ns").
///
/// The guard returned by `start` must be bound to a named variable. `let _ = ...` drops it
/// immediately, so nothing meaningful is measured.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_timer!(REQUEST_TIMER, "request_processing_time", hyperactor_telemetry::TimeUnit::Millis);
///
/// {
///     let _guard = REQUEST_TIMER.start(kv_pairs!("endpoint" => "/api/users", "method" => "GET"));
///     // do something expensive
/// }
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_timer {
    ($name:ident, $key:expr, $unit:path) => {
        #[doc = "a global histogram timer named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<$crate::Timer> = std::sync::LazyLock::new(|| {
            $crate::Timer::new(
                $crate::meter(module_path!())
                    .u64_histogram(format!("{}.{}", $key, $unit.as_str()))
                    .with_unit($unit.as_str())
                    .build(),
                $unit,
            )
        });
    };
}
876
/// Create a thread safe static `u64` counter that can be incremented.
///
/// Counters are monotonic (`Counter<u64>::add` takes a `u64`). Use
/// `declare_static_up_down_counter!` for values that must also go down.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_counter!(REQUESTS_RECEIVED, "requests_received");
///
/// REQUESTS_RECEIVED.add(40, kv_pairs!("path" => url.path, "proto" => url.proto));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<opentelemetry::metrics::Counter<u64>> =
            std::sync::LazyLock::new(|| $crate::meter(module_path!()).u64_counter($key).build());
    };
}
907
/// Create a thread safe static `i64` up/down counter that can be incremented or decremented.
///
/// Pass a positive value to `add` to increment and a negative one to decrement.
/// This is useful to avoid creating temporary counters.
/// You can safely create counters with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// declare_static_up_down_counter!(ACTIVE_REQUESTS, "active_requests");
///
/// ACTIVE_REQUESTS.add(1, kv_pairs!("path" => "/request/1", "proto" => "https"));
/// // ... handle the request ...
/// ACTIVE_REQUESTS.add(-1, kv_pairs!("path" => "/request/1", "proto" => "https"));
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_up_down_counter {
    ($name:ident, $key:expr) => {
        #[doc = "a global up down counter named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<opentelemetry::metrics::UpDownCounter<i64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .i64_up_down_counter($key)
                    .build()
            });
    };
}
942
/// Declare a process-wide `f64` gauge whose current value is set explicitly with `record`.
///
/// Declaring it as a lazily-initialized `static` avoids building a temporary gauge at
/// every call site. Gauges declared with the same name are joined by the underlying
/// runtime, and recording into them is thread safe.
///
/// Example:
/// ```
/// struct System {
///     pub memory_usage: f64,
///     pub cpu_usage: f64,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let system = System{memory_usage: 512.5, cpu_usage: 25.0};
/// declare_static_gauge!(MEMORY_USAGE, "memory_usage");
///
/// MEMORY_USAGE.record(system.memory_usage, kv_pairs!("unit" => "MB", "process" => "hyperactor"))
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_gauge {
    ($name:ident, $key:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<opentelemetry::metrics::Gauge<f64>> =
            std::sync::LazyLock::new(|| {
                $crate::meter(module_path!())
                    .f64_gauge($key)
                    .build()
            });
    };
}
/// Declare a process-wide `f64` observable gauge whose value is produced by a callback.
///
/// The callback is registered when the static is first initialized. It is invoked
/// whenever the metrics system observes the gauge, which makes this suitable for values
/// that have to be computed or fetched on demand rather than pushed explicitly.
///
/// Example:
/// ```
/// # #[macro_use] extern crate hyperactor_telemetry;
///
/// # fn main() {
/// declare_observable_gauge!(MEMORY_USAGE_GAUGE, "memory_usage", |observer| {
///     // Simulate getting memory usage - this could be any complex operation
///     observer.observe(512.0, &[]);
/// });
///
/// // The gauge will be automatically updated when observed
/// # }
/// ```
#[macro_export]
macro_rules! declare_observable_gauge {
    ($name:ident, $key:expr, $cb:expr) => {
        #[doc = "a global gauge named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<
            opentelemetry::metrics::ObservableGauge<f64>,
        > = std::sync::LazyLock::new(|| {
            $crate::meter(module_path!())
                .f64_observable_gauge($key)
                .with_callback($cb)
                .build()
        });
    };
}
/// Create a thread safe static `f64` histogram for recording value distributions.
///
/// This is useful to avoid creating temporary histograms.
/// You can safely create histograms with the same name. They will be joined by the underlying
/// runtime and are thread safe.
///
/// Example:
/// ```
/// struct Url {
///     pub path: String,
///     pub proto: String,
/// }
///
/// # #[macro_use] extern crate hyperactor_telemetry;
/// # fn main() {
/// # let url = Url{path: "/request/1".into(), proto: "https".into()};
/// declare_static_histogram!(REQUEST_LATENCY, "request_latency");
///
/// REQUEST_LATENCY.record(40.0, kv_pairs!("path" => url.path, "proto" => url.proto));
///
/// # }
/// ```
#[macro_export]
macro_rules! declare_static_histogram {
    ($name:ident, $key:expr) => {
        #[doc = "a global histogram named: "]
        #[doc = $key]
        pub static $name: std::sync::LazyLock<opentelemetry::metrics::Histogram<f64>> =
            std::sync::LazyLock::new(|| {
                // `$crate` resolves to this crate at any call site. That includes this crate
                // itself and dependents that rename it, where `hyperactor_telemetry::` would not resolve.
                $crate::meter(module_path!())
                    .f64_histogram($key)
                    .build()
            });
    };
}
1038
/// Non-blocking log writer and its `WorkerGuard`, installed by the non-unified logging
/// path of `initialize_logging_with_log_prefix_impl`.
///
/// Storing them in a `static` means they are never dropped, so the guard stays alive
/// for the remainder of the process. Only the first `set` succeeds; later
/// initializations ignore the error.
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
    std::sync::OnceLock::new();
1041
1042/// A custom formatter that prepends prefix from env_var to log messages.
1043struct PrefixedFormatter {
1044    formatter: Glog<LocalTime>,
1045    prefix_env_var: Option<String>,
1046}
1047
1048impl PrefixedFormatter {
1049    fn new(prefix_env_var: Option<String>) -> Self {
1050        let formatter = Glog::default().with_timer(LocalTime::default());
1051        Self {
1052            formatter,
1053            prefix_env_var,
1054        }
1055    }
1056}
1057
1058impl<S, N> FormatEvent<S, N> for PrefixedFormatter
1059where
1060    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
1061    N: for<'a> FormatFields<'a> + 'static,
1062{
1063    fn format_event(
1064        &self,
1065        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
1066        mut writer: Writer<'_>,
1067        event: &tracing::Event<'_>,
1068    ) -> std::fmt::Result {
1069        let prefix: String = if self.prefix_env_var.is_some() {
1070            std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
1071        } else {
1072            "".to_string()
1073        };
1074
1075        if prefix.is_empty() {
1076            write!(writer, "[-]")?;
1077        } else {
1078            write!(writer, "[{}]", prefix)?;
1079        }
1080
1081        self.formatter.format_event(ctx, writer, event)
1082    }
1083}
1084
1085/// Set up logging based on the given execution environment. We specialize logging based on how the
1086/// logs are consumed. The destination scuba table is specialized based on the execution environment.
1087/// mast -> monarch_tracing/prod
1088/// devserver -> monarch_tracing/local
1089/// unit test  -> monarch_tracing/test
1090/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
1091/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
1092/// to get this behavior.
1093pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
1094    initialize_logging_with_log_prefix(clock, None);
1095}
1096
1097/// testing
1098pub fn initialize_logging_for_test() {
1099    initialize_logging(DefaultTelemetryClock {});
1100}
1101
1102/// Set up logging based on the given execution environment. We specialize logging based on how the
1103/// logs are consumed. The destination scuba table is specialized based on the execution environment.
1104/// mast -> monarch_tracing/prod
1105/// devserver -> monarch_tracing/local
1106/// unit test  -> monarch_tracing/test
1107/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
1108/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
1109/// to get this behavior.
1110///
1111/// tracing logs will be prefixed with the given prefix and routed to:
1112/// test -> stderr
1113/// local -> /tmp/monarch_log.log
1114/// mast -> /logs/dedicated_monarch_logs.log
1115/// Additionally, is MONARCH_STDERR_LOG sets logs level, then logs will be routed to stderr as well.
1116pub fn initialize_logging_with_log_prefix(
1117    clock: impl TelemetryClock + Send + 'static,
1118    prefix_env_var: Option<String>,
1119) {
1120    let _ = initialize_logging_with_log_prefix_impl(clock, prefix_env_var, false);
1121}
1122
1123pub fn initialize_logging_with_log_prefix_mock_scuba(
1124    clock: impl TelemetryClock + Send + 'static,
1125    prefix_env_var: Option<String>,
1126) -> Box<dyn TelemetryTestHandle> {
1127    initialize_logging_with_log_prefix_impl(clock, prefix_env_var, true)
1128}
1129
/// Shared implementation behind `initialize_logging_with_log_prefix` and
/// `initialize_logging_with_log_prefix_mock_scuba`.
///
/// Always swaps in `clock` via `swap_telemetry_clock`. A tracing subscriber is only
/// built and installed when no global dispatcher has been set yet
/// (`tracing::dispatcher::has_been_set()`). Otherwise a debug message is emitted and the
/// existing subscriber is left in place.
///
/// What gets installed depends on the build and on config values:
/// - With `USE_UNIFIED_LAYER`, events go through a single `TraceEventDispatcher` that fans
///   out to sinks: glog, optional SQLite, and build-specific perfetto/scuba/OTLP sinks.
/// - Without it, individual `tracing_subscriber` layers are stacked instead.
/// - `ENABLE_SQLITE_TRACING` and `ENABLE_RECORDER_TRACING` toggle their sinks/layers.
///   On fbcode Linux builds `ENABLE_OTEL_TRACING` does too.
///
/// On fbcode Linux builds this also:
/// - emits a `logging_initialized` event (even if the subscriber already existed);
/// - logs the execution via `meta::log_execution_event`;
/// - initializes OTEL metrics when `ENABLE_OTEL_METRICS` is set.
///
/// On other builds OTEL metrics are initialized unconditionally.
///
/// `mock_scuba` only has an effect on fbcode Linux builds. There, when a scuba sink/layer
/// is installed, a `MockScubaClient` replaces the real client, and the returned handle is a
/// `MockScubaHandle` wrapping it. In every other case an `EmptyTestHandle` is returned.
fn initialize_logging_with_log_prefix_impl(
    clock: impl TelemetryClock + Send + 'static,
    prefix_env_var: Option<String>,
    mock_scuba: bool,
) -> Box<dyn TelemetryTestHandle> {
    // `mock_scuba` is only read by the fbcode Linux branch below; this silences the
    // unused-variable warning on other builds.
    #[cfg(not(all(fbcode_build, target_os = "linux")))]
    let _ = mock_scuba;

    let use_unified = hyperactor_config::global::get(USE_UNIFIED_LAYER);
    // Checked once, up front; every branch below keys off this value.
    let should_install_subscriber = !tracing::dispatcher::has_been_set();

    // The clock is swapped even when a subscriber is already installed.
    swap_telemetry_clock(clock);
    if !should_install_subscriber {
        tracing::debug!("logging already initialized for this process");
    }
    // Default file log level per environment. On the non-unified paths below it can be
    // overridden by MONARCH_FILE_LOG_LEVEL.
    let file_log_level = match env::Env::current() {
        env::Env::Local => LOG_LEVEL_INFO,
        env::Env::MastEmulator => LOG_LEVEL_INFO,
        env::Env::Mast => LOG_LEVEL_INFO,
        env::Env::Test => LOG_LEVEL_DEBUG,
    };

    use tracing_subscriber::Registry;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    #[cfg(all(fbcode_build, target_os = "linux"))]
    {
        // Set when a mock scuba client is installed, so it can be handed back to the caller.
        let mut mock_scuba_client: Option<crate::meta::scuba_utils::MockScubaClient> = None;

        if should_install_subscriber {
            if use_unified {
                // Unified path: one TraceEventDispatcher fans events out to these sinks.
                let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();
                sinks.push(Box::new(sinks::glog::GlogSink::new(
                    writer(),
                    prefix_env_var.clone(),
                    file_log_level,
                )));

                let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

                // Failure to create the SQLite sink is logged and otherwise ignored.
                if sqlite_enabled {
                    match create_sqlite_sink() {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            tracing::warn!("failed to create SqliteSink: {}", e);
                        }
                    }
                }

                // Perfetto trace files are written unless the mode is `Off`.
                if hyperactor_config::global::get(sinks::perfetto::PERFETTO_TRACE_MODE)
                    != sinks::perfetto::PerfettoTraceMode::Off
                {
                    let exec_id = env::execution_id();
                    let process_name = std::env::var("HYPERACTOR_PROCESS_NAME")
                        .unwrap_or_else(|_| "client".to_string());
                    match sinks::perfetto::PerfettoFileSink::new(
                        sinks::perfetto::default_trace_dir(),
                        &exec_id,
                        &process_name,
                    ) {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            tracing::warn!("failed to create PerfettoFileSink: {}", e);
                        }
                    }
                }

                {
                    // Scuba sink, either backed by a mock client (tests) or the real one.
                    if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                        use crate::meta;
                        use crate::meta::scuba_utils::LOG_ENTER_EXIT;

                        if mock_scuba {
                            let tracing_client = meta::scuba_utils::MockScubaClient::new();

                            sinks.push(Box::new(
                                meta::scuba_sink::ScubaSink::with_client(
                                    tracing_client.clone(),
                                    // Enter/exit logging is on only if the resource explicitly
                                    // carries `LOG_ENTER_EXIT = true`.
                                    match meta::tracing_resource().get(&LOG_ENTER_EXIT) {
                                        Some(Value::Bool(enabled)) => enabled,
                                        _ => false,
                                    },
                                )
                                .with_target_filter(crate::config::get_tracing_targets()),
                            ));

                            mock_scuba_client = Some(tracing_client);
                        } else {
                            sinks.push(Box::new(
                                meta::scuba_sink::ScubaSink::new(meta::tracing_resource())
                                    .with_target_filter(crate::config::get_tracing_targets()),
                            ));
                        }
                    }
                }

                let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);
                // Taken before `dispatcher` is moved into the registry below.
                let synthetic_sender = dispatcher.sender();

                if let Err(err) = Registry::default()
                    .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                        Some(recorder().layer())
                    } else {
                        None
                    })
                    .with(dispatcher)
                    .try_init()
                {
                    tracing::debug!("logging already initialized for this process: {}", err);
                } else {
                    // Only published once our dispatcher is actually the global subscriber.
                    set_synthetic_trace_event_sender(synthetic_sender);
                }
            } else {
                // For file_layer, use NonBlocking
                let (non_blocking, guard) =
                    tracing_appender::non_blocking::NonBlockingBuilder::default()
                        .lossy(false)
                        .finish(writer());
                let writer_guard = Arc::new((non_blocking, guard));
                // Keeps the writer and its WorkerGuard alive in a static; only the first
                // initialization's value is stored.
                let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

                let file_layer = fmt::Layer::default()
                    .with_writer(writer_guard.0.clone())
                    .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                    .fmt_fields(GlogFields::default().compact())
                    .with_ansi(false)
                    .with_filter(
                        Targets::new()
                            // MONARCH_FILE_LOG_LEVEL overrides the per-environment default;
                            // an unparsable value falls back to that default.
                            .with_default(LevelFilter::from_level({
                                let log_level_str = hyperactor_config::global::try_get_cloned(
                                    MONARCH_FILE_LOG_LEVEL,
                                )
                                .unwrap_or_else(|| file_log_level.to_string());
                                tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                    tracing::Level::from_str(file_log_level)
                                        .expect("Invalid default log level")
                                })
                            }))
                            .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
                    );

                let registry = Registry::default()
                    .with(if hyperactor_config::global::get(ENABLE_SQLITE_TRACING) {
                        // TODO: get_reloadable_sqlite_layer currently still returns None,
                        // and some additional work is required to make it work.
                        // NOTE(review): if that TODO is still accurate, this `expect` panics
                        // whenever ENABLE_SQLITE_TRACING is set on this path — confirm.
                        Some(get_reloadable_sqlite_layer().expect("failed to create sqlite layer"))
                    } else {
                        None
                    })
                    .with(file_layer)
                    .with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                        Some(recorder().layer())
                    } else {
                        None
                    });

                // With `mock_scuba` the mock scuba layer is installed regardless of
                // ENABLE_OTEL_TRACING; otherwise the OTEL layer is gated on that flag.
                if mock_scuba {
                    let tracing_client = crate::meta::scuba_utils::MockScubaClient::new();

                    let scuba_layer =
                        crate::meta::tracing_layer_with_client(tracing_client.clone());

                    if let Err(err) = registry.with(scuba_layer).try_init() {
                        tracing::debug!("logging already initialized for this process: {}", err);
                    }

                    mock_scuba_client = Some(tracing_client);
                } else if let Err(err) = registry
                    .with(if hyperactor_config::global::get(ENABLE_OTEL_TRACING) {
                        Some(otel::tracing_layer())
                    } else {
                        None
                    })
                    .try_init()
                {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }
            }
        }
        // Everything below runs even when a subscriber was already installed.
        let exec_id = env::execution_id();
        let process_name =
            std::env::var("HYPERACTOR_PROCESS_NAME").unwrap_or_else(|_| "client".to_string());

        // setting target to "execution" will prevent the monarch_tracing scuba client from logging this
        tracing::info!(
            target: "execution",
            execution_id = exec_id,
            environment = %env::Env::current(),
            args = ?std::env::args(),
            build_mode = build_info::BuildInfo::get_build_mode(),
            compiler = build_info::BuildInfo::get_compiler(),
            compiler_version = build_info::BuildInfo::get_compiler_version(),
            buck_rule = build_info::BuildInfo::get_rule(),
            package_name = build_info::BuildInfo::get_package_name(),
            package_release = build_info::BuildInfo::get_package_release(),
            upstream_revision = build_info::BuildInfo::get_upstream_revision(),
            revision = build_info::BuildInfo::get_revision(),
            process_name = process_name,
            "logging_initialized",
        );
        // here we have the monarch_executions scuba client log
        meta::log_execution_event(
            &exec_id,
            &env::Env::current().to_string(),
            std::env::args().collect(),
            build_info::BuildInfo::get_build_mode(),
            build_info::BuildInfo::get_compiler(),
            build_info::BuildInfo::get_compiler_version(),
            build_info::BuildInfo::get_rule(),
            build_info::BuildInfo::get_package_name(),
            build_info::BuildInfo::get_package_release(),
            build_info::BuildInfo::get_upstream_revision(),
            build_info::BuildInfo::get_revision(),
            &process_name,
        );

        if hyperactor_config::global::get(ENABLE_OTEL_METRICS) {
            otel::init_metrics();
        }

        // Hand the mock back to the caller only if one was actually installed.
        if let Some(tracing_client) = mock_scuba_client {
            Box::new(MockScubaHandle { tracing_client })
        } else {
            Box::new(EmptyTestHandle)
        }
    }
    #[cfg(not(all(fbcode_build, target_os = "linux")))]
    {
        // Base registry, with the recorder layer if enabled. It is built unconditionally
        // but only installed when `should_install_subscriber` is true.
        let registry =
            Registry::default().with(if hyperactor_config::global::get(ENABLE_RECORDER_TRACING) {
                Some(recorder().layer())
            } else {
                None
            });

        if should_install_subscriber {
            if use_unified {
                // Unified path: one TraceEventDispatcher fans events out to these sinks.
                let mut sinks: Vec<Box<dyn trace_dispatcher::TraceEventSink>> = Vec::new();

                let sqlite_enabled = hyperactor_config::global::get(ENABLE_SQLITE_TRACING);

                // Failure to create the SQLite sink is logged and otherwise ignored.
                if sqlite_enabled {
                    match create_sqlite_sink() {
                        Ok(sink) => {
                            sinks.push(Box::new(sink));
                        }
                        Err(e) => {
                            tracing::warn!("failed to create SqliteSink: {}", e);
                        }
                    }
                }

                sinks.push(Box::new(sinks::glog::GlogSink::new(
                    writer(),
                    prefix_env_var.clone(),
                    file_log_level,
                )));

                // OTLP log sink, added only when `otlp_log_sink()` provides one.
                if let Some(log_sink) = otlp::otlp_log_sink() {
                    sinks.push(log_sink);
                }

                let dispatcher = trace_dispatcher::TraceEventDispatcher::new(sinks);
                // Taken before `dispatcher` is moved into the registry below.
                let synthetic_sender = dispatcher.sender();

                if let Err(err) = registry.with(dispatcher).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                } else {
                    // Only published once our dispatcher is actually the global subscriber.
                    set_synthetic_trace_event_sender(synthetic_sender);
                }
            } else {
                // Non-unified path: a NonBlocking file layer only. ENABLE_SQLITE_TRACING is
                // not consulted on this path.
                let (non_blocking, guard) =
                    tracing_appender::non_blocking::NonBlockingBuilder::default()
                        .lossy(false)
                        .finish(writer());
                let writer_guard = Arc::new((non_blocking, guard));
                // Keeps the writer and its WorkerGuard alive in a static; only the first
                // initialization's value is stored.
                let _ = FILE_WRITER_GUARD.set(writer_guard.clone());

                let file_layer = fmt::Layer::default()
                    .with_writer(writer_guard.0.clone())
                    .event_format(PrefixedFormatter::new(prefix_env_var.clone()))
                    .fmt_fields(GlogFields::default().compact())
                    .with_ansi(false)
                    .with_filter(
                        Targets::new()
                            // MONARCH_FILE_LOG_LEVEL overrides the per-environment default;
                            // an unparsable value falls back to that default.
                            .with_default(LevelFilter::from_level({
                                let log_level_str = hyperactor_config::global::try_get_cloned(
                                    MONARCH_FILE_LOG_LEVEL,
                                )
                                .unwrap_or_else(|| file_log_level.to_string());
                                tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
                                    tracing::Level::from_str(file_log_level)
                                        .expect("Invalid default log level")
                                })
                            }))
                            .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
                    );

                if let Err(err) = registry.with(file_layer).try_init() {
                    tracing::debug!("logging already initialized for this process: {}", err);
                }
            }
        }

        // Unlike the fbcode branch, this is not gated on ENABLE_OTEL_METRICS.
        otel::init_metrics();

        Box::new(EmptyTestHandle)
    }
}
1444
1445fn create_sqlite_sink() -> anyhow::Result<sinks::sqlite::SqliteSink> {
1446    let (db_path, _) = log_file_path(env::Env::current(), Some("traces"))
1447        .expect("failed to determine trace db path");
1448    let db_file = format!("{}/hyperactor_trace_{}.db", db_path, std::process::id());
1449
1450    sinks::sqlite::SqliteSink::new_with_file(&db_file, 100)
1451}
1452
/// Create an ERROR-level span that carries the `skip_record` marker field.
///
/// Such spans exist only to attach context to the events emitted inside them; the span
/// itself is never recorded as separate start/end events. `target:` and `parent:` are
/// optional. When `target:` is omitted, the caller's `module_path!()` is used.
///
/// Example:
/// ```ignore
/// use hyperactor_telemetry::context_span;
///
/// let span = context_span!("my_context", field1 = value1, field2 = value2);
/// let _guard = span.enter();
/// // ... do work that will be logged with this context
/// ```
#[macro_export]
macro_rules! context_span {
    // Canonical form with an explicit parent; the parent-taking arms below delegate here.
    (target: $target:expr, parent: $parent:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            parent: $parent,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    (target: $target:expr, parent: $parent:expr, $name:expr) => {
        $crate::context_span!(target: $target, parent: $parent, $name,)
    };
    (parent: $parent:expr, $name:expr, $($field:tt)*) => {
        $crate::context_span!(target: module_path!(), parent: $parent, $name, $($field)*)
    };
    (parent: $parent:expr, $name:expr) => {
        $crate::context_span!(target: module_path!(), parent: $parent, $name,)
    };
    // Canonical form without a parent; the remaining arms delegate here.
    (target: $target:expr, $name:expr, $($field:tt)*) => {
        ::tracing::error_span!(
            target: $target,
            $name,
            skip_record = $crate::skip_record,
            $($field)*
        )
    };
    (target: $target:expr, $name:expr) => {
        $crate::context_span!(target: $target, $name,)
    };
    ($name:expr, $($field:tt)*) => {
        $crate::context_span!(target: module_path!(), $name, $($field)*)
    };
    ($name:expr) => {
        $crate::context_span!(target: module_path!(), $name,)
    };
}
1530
1531pub mod env {
1532    /// Env var name set when monarch launches subprocesses to forward the execution context
1533    pub const HYPERACTOR_EXECUTION_ID_ENV: &str = "HYPERACTOR_EXECUTION_ID";
1534    pub const OTEL_EXPORTER: &str = "HYPERACTOR_OTEL_EXPORTER";
1535    pub const MAST_ENVIRONMENT: &str = "MAST_ENVIRONMENT";
1536
1537    /// Forward or generate a uuid for this execution. When running in production on mast, this is provided to
1538    /// us via the MAST_HPC_JOB_NAME env var. Subprocesses should either forward the MAST_HPC_JOB_NAME
1539    /// variable, or set the "MONARCH_EXECUTION_ID" var for subprocesses launched by this process.
1540    /// We keep these env vars separate so that other applications that depend on the MAST_HPC_JOB_NAME existing
1541    /// to understand their environment do not get confused and think they are running on mast when we are doing
1542    ///  local testing.
1543    pub fn execution_id() -> String {
1544        let id = std::env::var(HYPERACTOR_EXECUTION_ID_ENV).unwrap_or_else(|_| {
1545            // not able to find an existing id so generate a unique one: username + current_time + random number.
1546            let username = crate::username();
1547            let now = {
1548                let now = std::time::SystemTime::now();
1549                let datetime: chrono::DateTime<chrono::Local> = now.into();
1550                datetime.format("%b-%d_%H:%M").to_string()
1551            };
1552            let random_number: u16 = (rand::random::<u32>() % 1000) as u16;
1553            let execution_id = format!("{}_{}_{}", username, now, random_number);
1554            execution_id
1555        });
1556        // Safety: Can be unsound if there are multiple threads
1557        // reading and writing the environment.
1558        unsafe {
1559            std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, id.clone());
1560        }
1561        id
1562    }
1563
1564    #[derive(PartialEq)]
1565    pub enum Env {
1566        Local,
1567        Mast,
1568        MastEmulator,
1569        Test,
1570    }
1571
1572    impl std::fmt::Display for Env {
1573        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574            write!(
1575                f,
1576                "{}",
1577                match self {
1578                    Self::Local => crate::ENV_VALUE_LOCAL,
1579                    Self::MastEmulator => crate::ENV_VALUE_MAST_EMULATOR,
1580                    Self::Mast => crate::ENV_VALUE_MAST,
1581                    Self::Test => crate::ENV_VALUE_TEST,
1582                }
1583            )
1584        }
1585    }
1586
1587    impl Env {
1588        #[cfg(test)]
1589        pub fn current() -> Self {
1590            Self::Test
1591        }
1592
1593        #[cfg(not(test))]
1594        pub fn current() -> Self {
1595            match std::env::var(MAST_ENVIRONMENT).unwrap_or_default().as_str() {
1596                // Constant from https://fburl.com/fhysd3fd
1597                crate::ENV_VALUE_LOCAL_MAST_SIMULATOR => Self::MastEmulator,
1598                _ => match std::env::var(crate::MAST_HPC_JOB_NAME_ENV).is_ok() {
1599                    true => Self::Mast,
1600                    false => Self::Local,
1601                },
1602            }
1603        }
1604    }
1605}
1606
#[cfg(test)]
mod test {
    use opentelemetry::*;
    extern crate self as hyperactor_telemetry;
    use super::*;

    /// `key_value!` picks the OpenTelemetry value variant from the literal's type.
    #[test]
    fn infer_kv_pair_types() {
        let cases = [
            (
                key_value!("str", "str"),
                KeyValue::new(Key::new("str"), Value::String("str".into())),
            ),
            (
                key_value!("str", 25),
                KeyValue::new(Key::new("str"), Value::I64(25)),
            ),
            (
                key_value!("str", 1.1),
                KeyValue::new(Key::new("str"), Value::F64(1.1)),
            ),
        ];
        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// `kv_pairs!` expands to the same slice as the individual `key_value!` calls.
    #[test]
    fn kv_pair_slices() {
        let expected = [
            key_value!("1", "1"),
            key_value!("2", 2),
            key_value!("3", 3.0),
        ];
        assert_eq!(kv_pairs!("1" => "1", "2" => 2, "3" => 3.0), &expected);
    }

    /// Statics declared with `declare_static_gauge!` accept records with and without attributes.
    #[test]
    fn test_static_gauge() {
        declare_static_gauge!(TEST_GAUGE, "test_gauge");
        declare_static_gauge!(MEMORY_GAUGE, "memory_usage");

        // This shouldn't actually log to scribe/scuba in the test environment.
        TEST_GAUGE.record(42.5, kv_pairs!("component" => "test", "unit" => "MB"));
        MEMORY_GAUGE.record(512.0, kv_pairs!("type" => "heap", "process" => "test"));

        // An empty attribute set is accepted as well.
        TEST_GAUGE.record(50.0, &[]);
    }
}