hyperactor_telemetry/sinks/
glog.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Glog-formatted text sink for trace events.
10//! Replicates the behavior of the fmt::Layer with glog formatting.
11
12use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use indexmap::IndexMap;
19use tracing_core::LevelFilter;
20use tracing_subscriber::filter::Targets;
21
22use crate::config::MONARCH_FILE_LOG_LEVEL;
23use crate::trace_dispatcher::FieldValue;
24use crate::trace_dispatcher::TraceEvent;
25use crate::trace_dispatcher::TraceEventSink;
26
/// Byte budget for one formatted log line.
const MAX_LINE_SIZE: usize = 4096;
/// Portion of the line budget set aside for the truncation suffix and the
/// trailing newline, which are appended after the content limit is applied.
const TRUNCATION_SUFFIX_RESERVE: usize = 32;

/// A string buffer that limits writes to a maximum size.
///
/// Content is accepted until `limit` bytes are stored. As soon as any part of
/// a write has to be dropped, every later write is dropped as well, so the
/// retained text is always a prefix of what was written. The number of
/// dropped chars is tracked for reporting.
struct LimitedBuffer {
    /// Accumulated line content (plus suffix/newline after `finish_line`).
    buffer: String,
    /// Max bytes for content (excluding truncation suffix and newline).
    limit: usize,
    /// Number of chars (not bytes) dropped since the last `clear`.
    truncated_chars: usize,
}

impl LimitedBuffer {
    /// Creates an empty buffer accepting at most `limit` bytes of content.
    ///
    /// Capacity for the truncation suffix is allocated up front so that
    /// `finish_line` normally does not need to grow the string.
    fn new(limit: usize) -> Self {
        Self {
            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
            limit,
            truncated_chars: 0,
        }
    }

    /// Resets the buffer for the next line, keeping the allocation.
    fn clear(&mut self) {
        self.buffer.clear();
        self.truncated_chars = 0;
    }

    /// Write truncation suffix (if anything was dropped) + add a newline.
    ///
    /// This appends to the underlying string directly, so the suffix and the
    /// newline are not subject to `limit`.
    fn finish_line(&mut self) {
        if self.truncated_chars > 0 {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(
                &mut self.buffer,
                "...[truncated {} chars]",
                self.truncated_chars
            );
        }
        self.buffer.push('\n');
    }

    /// Returns the bytes accumulated so far.
    fn as_bytes(&self) -> &[u8] {
        self.buffer.as_bytes()
    }
}

impl FmtWrite for LimitedBuffer {
    /// Appends `s`, keeping at most `limit` bytes of content in total.
    ///
    /// Never returns an error: text that does not fit is counted in
    /// `truncated_chars` instead of being reported as a failure.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        // Truncation is sticky. Without this check a short write could still
        // be appended after an earlier write was cut (the cut may stop below
        // `limit` when it backs off to a char boundary), leaving a gap in the
        // middle of the retained text.
        if self.truncated_chars > 0 {
            self.truncated_chars += s.chars().count();
            return Ok(());
        }

        let remaining = self.limit.saturating_sub(self.buffer.len());
        if s.len() <= remaining {
            self.buffer.push_str(s);
            return Ok(());
        }

        // `s` does not fit: keep the longest prefix that ends on a char
        // boundary. Index 0 is always a boundary, so the loop terminates
        // before `cut` could underflow.
        let mut cut = remaining;
        while !s.is_char_boundary(cut) {
            cut -= 1;
        }
        let (kept, dropped) = s.split_at(cut);
        self.buffer.push_str(kept);
        self.truncated_chars += dropped.chars().count();
        Ok(())
    }
}
92
93/// Glog sink that writes events in glog format to a file.
94/// This replaces the fmt::Layer that was previously used for text logging.
95///
96/// This only logs Events, not Spans (matching old fmt::Layer behavior).
97pub struct GlogSink {
98    writer: Box<dyn Write + Send>,
99    prefix: Option<String>,
100    /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs
101    active_spans: HashMap<u64, (String, IndexMap<String, FieldValue>, Option<u64>)>,
102    targets: Targets,
103    /// Reusable buffer for formatting log lines to ensure atomic writes.
104    /// We build the entire line in this buffer, then write it atomically to avoid
105    /// interleaving with other threads writing to the same fd (e.g., stderr).
106    line_buffer: LimitedBuffer,
107}
108
109impl GlogSink {
110    /// Create a new glog sink with the given writer.
111    ///
112    /// # Arguments
113    /// * `writer` - Writer to write log events to (used directly without buffering)
114    /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl)
115    /// * `file_log_level` - Minimum log level to capture (e.g., "info", "debug")
116    pub fn new(
117        writer: Box<dyn Write + Send>,
118        prefix_env_var: Option<String>,
119        file_log_level: &str,
120    ) -> Self {
121        let prefix = if let Some(ref env_var_name) = prefix_env_var {
122            std::env::var(env_var_name).ok()
123        } else {
124            None
125        };
126
127        Self {
128            writer,
129            prefix,
130            active_spans: HashMap::new(),
131            targets: Targets::new()
132                .with_default(LevelFilter::from_level({
133                    let log_level_str =
134                        hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
135                            .unwrap_or_else(|| file_log_level.to_string());
136                    tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
137                        tracing::Level::from_str(file_log_level).expect("Invalid default log level")
138                    })
139                }))
140                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
141            line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
142        }
143    }
144
145    fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
146        self.line_buffer.clear();
147
148        let timestamp_str = match event {
149            TraceEvent::Event { timestamp, .. } => {
150                let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
151                datetime.format("%m%d %H:%M:%S%.6f").to_string()
152            }
153            // write_event is only called for Events, but keep this for safety
154            _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
155        };
156
157        let prefix_str = if let Some(ref p) = self.prefix {
158            format!("[{}]", p)
159        } else {
160            "[-]".to_string()
161        };
162
163        match event {
164            TraceEvent::Event {
165                level,
166                fields,
167                parent_span,
168                thread_id,
169                file,
170                line,
171                ..
172            } => {
173                let level_char = match *level {
174                    tracing::Level::ERROR => 'E',
175                    tracing::Level::WARN => 'W',
176                    tracing::Level::INFO => 'I',
177                    tracing::Level::DEBUG => 'D',
178                    tracing::Level::TRACE => 'T',
179                };
180
181                // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value
182                write!(
183                    &mut self.line_buffer,
184                    "{}{}{} {} ",
185                    prefix_str, level_char, timestamp_str, thread_id
186                )?;
187
188                if let (Some(f), Some(l)) = (file, line) {
189                    write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
190                } else {
191                    write!(&mut self.line_buffer, "unknown:0] ")?;
192                }
193
194                if let Some(parent_id) = parent_span {
195                    self.write_span_context(*parent_id)?;
196                }
197
198                if let Some(v) = fields.get("message") {
199                    match v {
200                        FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
201                        FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
202                        _ => write!(&mut self.line_buffer, "event")?,
203                    }
204                } else {
205                    write!(&mut self.line_buffer, "event")?;
206                }
207
208                for (k, v) in fields.iter() {
209                    if k != "message" {
210                        write!(&mut self.line_buffer, ", {}:", k)?;
211                        match v {
212                            FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
213                            FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
214                            FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
215                            FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
216                            FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
217                            FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
218                        }
219                    }
220                }
221
222                self.line_buffer.finish_line();
223
224                self.writer.write_all(self.line_buffer.as_bytes())?;
225            }
226
227            // write_event should only be called for Events, but handle gracefully
228            _ => {
229                self.line_buffer.clear();
230                write!(
231                    &mut self.line_buffer,
232                    "{}I{} - unknown:0] unexpected event type",
233                    prefix_str, timestamp_str
234                )?;
235                self.line_buffer.finish_line();
236                self.writer.write_all(self.line_buffer.as_bytes())?;
237            }
238        }
239
240        Ok(())
241    }
242
243    /// Writes span context into line_buffer: "[outer{field:value}, inner{field:value}] "
244    fn write_span_context(&mut self, span_id: u64) -> Result<()> {
245        let mut span_ids = Vec::new();
246        let mut current_id = Some(span_id);
247
248        while let Some(id) = current_id {
249            if let Some((_, _, parent_id)) = self.active_spans.get(&id) {
250                span_ids.push(id);
251                current_id = *parent_id;
252            } else {
253                break;
254            }
255        }
256        if span_ids.is_empty() {
257            return Ok(());
258        }
259
260        write!(&mut self.line_buffer, "[")?;
261
262        for (i, id) in span_ids.iter().rev().enumerate() {
263            if i > 0 {
264                write!(&mut self.line_buffer, ", ")?;
265            }
266
267            if let Some((name, fields, _)) = self.active_spans.get(id) {
268                write!(&mut self.line_buffer, "{}", name)?;
269                if !fields.is_empty() {
270                    write!(&mut self.line_buffer, "{{")?;
271
272                    let mut first = true;
273                    for (k, v) in fields.iter() {
274                        if !first {
275                            write!(&mut self.line_buffer, ", ")?;
276                        }
277                        first = false;
278                        write!(&mut self.line_buffer, "{}:", k)?;
279
280                        match v {
281                            FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
282                            FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
283                            FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
284                            FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
285                            FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
286                            FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
287                        }
288                    }
289
290                    write!(&mut self.line_buffer, "}}")?;
291                }
292            }
293        }
294
295        write!(&mut self.line_buffer, "] ")?;
296        Ok(())
297    }
298}
299
300impl TraceEventSink for GlogSink {
301    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
302        match event {
303            // Track span lifecycle for context display (must happen even if we don't export spans)
304            TraceEvent::NewSpan {
305                id,
306                name,
307                fields,
308                parent_id,
309                ..
310            } => {
311                self.active_spans
312                    .insert(*id, (name.to_string(), fields.clone(), *parent_id));
313            }
314            TraceEvent::SpanClose { id, .. } => {
315                self.active_spans.remove(id);
316            }
317            TraceEvent::Event { .. } => {
318                self.write_event(event)?;
319            }
320            _ => {}
321        }
322        Ok(())
323    }
324
325    fn flush(&mut self) -> Result<(), anyhow::Error> {
326        self.writer.flush()?;
327        Ok(())
328    }
329
330    fn name(&self) -> &str {
331        "GlogSink"
332    }
333
334    fn target_filter(&self) -> Option<&Targets> {
335        Some(&self.targets)
336    }
337}
338
#[cfg(test)]
mod test {
    use super::*;

    /// Finishes the current line and returns the buffer content as text.
    fn finished(buf: &mut LimitedBuffer) -> String {
        buf.finish_line();
        std::str::from_utf8(buf.as_bytes()).unwrap().to_string()
    }

    #[test]
    fn test_limited_buffer_truncation() {
        let mut buf = LimitedBuffer::new(20);

        write!(
            &mut buf,
            "Hello, this is a very long message that exceeds the limit"
        )
        .unwrap();

        assert_eq!(
            finished(&mut buf),
            "Hello, this is a ver...[truncated 37 chars]\n"
        );
    }

    #[test]
    fn test_limited_buffer_no_truncation() {
        let mut buf = LimitedBuffer::new(50);

        write!(&mut buf, "Short message").unwrap();

        assert_eq!(finished(&mut buf), "Short message\n");
    }

    /// The cut must never land inside a multi-byte char.
    #[test]
    fn test_limited_buffer_truncates_on_char_boundary() {
        let mut buf = LimitedBuffer::new(5);

        // The two-byte char occupies bytes 4..6, so the cut backs off to 4.
        write!(&mut buf, "abcdé!").unwrap();

        assert_eq!(finished(&mut buf), "abcd...[truncated 2 chars]\n");
    }

    /// Writes arriving once the buffer is full are counted, not stored.
    #[test]
    fn test_limited_buffer_counts_writes_after_limit() {
        let mut buf = LimitedBuffer::new(4);

        write!(&mut buf, "abcd").unwrap();
        write!(&mut buf, "efg").unwrap();

        assert_eq!(finished(&mut buf), "abcd...[truncated 3 chars]\n");
    }

    /// `clear` must reset both the content and the truncation counter.
    #[test]
    fn test_limited_buffer_clear_resets_state() {
        let mut buf = LimitedBuffer::new(3);

        write!(&mut buf, "abcdef").unwrap();
        assert_eq!(finished(&mut buf), "abc...[truncated 3 chars]\n");

        buf.clear();
        write!(&mut buf, "ok").unwrap();

        assert_eq!(finished(&mut buf), "ok\n");
    }
}
370}