hyperactor_telemetry/sinks/
glog.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Glog-formatted text sink for trace events.
10//! Replicates the behavior of the fmt::Layer with glog formatting.
11
12use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use tracing_core::LevelFilter;
19use tracing_subscriber::filter::Targets;
20
21use crate::config::MONARCH_FILE_LOG_LEVEL;
22use crate::trace_dispatcher::FieldValue;
23use crate::trace_dispatcher::TraceEvent;
24use crate::trace_dispatcher::TraceEventSink;
25use crate::trace_dispatcher::TraceFields;
26use crate::trace_dispatcher::get_field;
27
/// Intended upper bound, in bytes, for one formatted log line.
///
/// `GlogSink::new` limits line content to
/// `MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE` bytes.
const MAX_LINE_SIZE: usize = 4 * 1024;

/// Bytes set aside for the `...[truncated N chars]` suffix and trailing newline.
///
/// `LimitedBuffer::new` adds this to the content limit when pre-allocating.
const TRUNCATION_SUFFIX_RESERVE: usize = 32;
30
31/// A string buffer that limits writes to a maximum size.
32/// Once the limit is reached, further writes are silently ignored and
33/// truncated chars are tracked for reporting.
34struct LimitedBuffer {
35    buffer: String,
36    /// Max bytes for content (excluding truncation suffix and newline).
37    limit: usize,
38    truncated_chars: usize,
39}
40
41impl LimitedBuffer {
42    fn new(limit: usize) -> Self {
43        Self {
44            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
45            limit,
46            truncated_chars: 0,
47        }
48    }
49
50    fn clear(&mut self) {
51        self.buffer.clear();
52        self.truncated_chars = 0;
53    }
54
55    /// Write truncation suffix + add a newline
56    fn finish_line(&mut self) {
57        if self.truncated_chars > 0 {
58            use std::fmt::Write;
59            let _ = write!(
60                &mut self.buffer,
61                "...[truncated {} chars]",
62                self.truncated_chars
63            );
64        }
65        self.buffer.push('\n');
66    }
67
68    fn as_bytes(&self) -> &[u8] {
69        self.buffer.as_bytes()
70    }
71}
72
73impl FmtWrite for LimitedBuffer {
74    fn write_str(&mut self, s: &str) -> std::fmt::Result {
75        let remaining = self.limit.saturating_sub(self.buffer.len());
76        if remaining == 0 {
77            self.truncated_chars += s.chars().count();
78            return Ok(());
79        }
80        if s.len() <= remaining {
81            self.buffer.push_str(s);
82        } else {
83            let mut truncate_at = remaining;
84            while truncate_at > 0 && !s.is_char_boundary(truncate_at) {
85                truncate_at -= 1;
86            }
87            self.buffer.push_str(&s[..truncate_at]);
88            self.truncated_chars += s[truncate_at..].chars().count();
89        }
90        Ok(())
91    }
92}
93
94/// Glog sink that writes events in glog format to a file.
95/// This replaces the fmt::Layer that was previously used for text logging.
96///
97/// This only logs Events, not Spans (matching old fmt::Layer behavior).
98pub struct GlogSink {
99    writer: Box<dyn Write + Send>,
100    prefix: Option<String>,
101    /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs
102    active_spans: HashMap<u64, (String, TraceFields, Option<u64>)>,
103    targets: Targets,
104    /// Reusable buffer for formatting log lines to ensure atomic writes.
105    /// We build the entire line in this buffer, then write it atomically to avoid
106    /// interleaving with other threads writing to the same fd (e.g., stderr).
107    line_buffer: LimitedBuffer,
108}
109
110impl GlogSink {
111    /// Create a new glog sink with the given writer.
112    ///
113    /// # Arguments
114    /// * `writer` - Writer to write log events to (used directly without buffering)
115    /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl)
116    /// * `file_log_level` - Minimum log level to capture (e.g., "info", "debug")
117    pub fn new(
118        writer: Box<dyn Write + Send>,
119        prefix_env_var: Option<String>,
120        file_log_level: &str,
121    ) -> Self {
122        let prefix = if let Some(ref env_var_name) = prefix_env_var {
123            std::env::var(env_var_name).ok()
124        } else {
125            None
126        };
127
128        Self {
129            writer,
130            prefix,
131            active_spans: HashMap::new(),
132            targets: Targets::new()
133                .with_default(LevelFilter::from_level({
134                    let log_level_str =
135                        hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
136                            .unwrap_or_else(|| file_log_level.to_string());
137                    tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
138                        tracing::Level::from_str(file_log_level).expect("Invalid default log level")
139                    })
140                }))
141                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
142            line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
143        }
144    }
145
146    fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
147        self.line_buffer.clear();
148
149        let timestamp_str = match event {
150            TraceEvent::Event { timestamp, .. } => {
151                let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
152                datetime.format("%m%d %H:%M:%S%.6f").to_string()
153            }
154            // write_event is only called for Events, but keep this for safety
155            _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
156        };
157
158        let prefix_str = if let Some(ref p) = self.prefix {
159            format!("[{}]", p)
160        } else {
161            "[-]".to_string()
162        };
163
164        match event {
165            TraceEvent::Event {
166                level,
167                fields,
168                parent_span,
169                thread_id,
170                file,
171                line,
172                ..
173            } => {
174                let level_char = match *level {
175                    tracing::Level::ERROR => 'E',
176                    tracing::Level::WARN => 'W',
177                    tracing::Level::INFO => 'I',
178                    tracing::Level::DEBUG => 'D',
179                    tracing::Level::TRACE => 'T',
180                };
181
182                // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value
183                write!(
184                    &mut self.line_buffer,
185                    "{}{}{} {} ",
186                    prefix_str, level_char, timestamp_str, thread_id
187                )?;
188
189                if let (Some(f), Some(l)) = (file, line) {
190                    write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
191                } else {
192                    write!(&mut self.line_buffer, "unknown:0] ")?;
193                }
194
195                if let Some(parent_id) = parent_span {
196                    self.write_span_context(*parent_id)?;
197                }
198
199                if let Some(v) = get_field(fields, "message") {
200                    match v {
201                        FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
202                        FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
203                        _ => write!(&mut self.line_buffer, "event")?,
204                    }
205                } else {
206                    write!(&mut self.line_buffer, "event")?;
207                }
208
209                for (k, v) in fields.iter() {
210                    if *k != "message" {
211                        write!(&mut self.line_buffer, ", {}:", k)?;
212                        match v {
213                            FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
214                            FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
215                            FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
216                            FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
217                            FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
218                            FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
219                        }
220                    }
221                }
222
223                self.line_buffer.finish_line();
224
225                self.writer.write_all(self.line_buffer.as_bytes())?;
226            }
227
228            // write_event should only be called for Events, but handle gracefully
229            _ => {
230                self.line_buffer.clear();
231                write!(
232                    &mut self.line_buffer,
233                    "{}I{} - unknown:0] unexpected event type",
234                    prefix_str, timestamp_str
235                )?;
236                self.line_buffer.finish_line();
237                self.writer.write_all(self.line_buffer.as_bytes())?;
238            }
239        }
240
241        Ok(())
242    }
243
244    /// Writes span context into line_buffer: "[outer{field:value}, inner{field:value}] "
245    fn write_span_context(&mut self, span_id: u64) -> Result<()> {
246        let mut span_ids = Vec::new();
247        let mut current_id = Some(span_id);
248
249        while let Some(id) = current_id {
250            if let Some((_, _, parent_id)) = self.active_spans.get(&id) {
251                span_ids.push(id);
252                current_id = *parent_id;
253            } else {
254                break;
255            }
256        }
257        if span_ids.is_empty() {
258            return Ok(());
259        }
260
261        write!(&mut self.line_buffer, "[")?;
262
263        for (i, id) in span_ids.iter().rev().enumerate() {
264            if i > 0 {
265                write!(&mut self.line_buffer, ", ")?;
266            }
267
268            if let Some((name, fields, _)) = self.active_spans.get(id) {
269                write!(&mut self.line_buffer, "{}", name)?;
270                if !fields.is_empty() {
271                    write!(&mut self.line_buffer, "{{")?;
272
273                    let mut first = true;
274                    for (k, v) in fields.iter() {
275                        if !first {
276                            write!(&mut self.line_buffer, ", ")?;
277                        }
278                        first = false;
279                        write!(&mut self.line_buffer, "{}:", k)?;
280
281                        match v {
282                            FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
283                            FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
284                            FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
285                            FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
286                            FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
287                            FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
288                        }
289                    }
290
291                    write!(&mut self.line_buffer, "}}")?;
292                }
293            }
294        }
295
296        write!(&mut self.line_buffer, "] ")?;
297        Ok(())
298    }
299}
300
301impl TraceEventSink for GlogSink {
302    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
303        match event {
304            // Track span lifecycle for context display (must happen even if we don't export spans)
305            TraceEvent::NewSpan {
306                id,
307                name,
308                fields,
309                parent_id,
310                ..
311            } => {
312                self.active_spans
313                    .insert(*id, (name.to_string(), fields.clone(), *parent_id));
314            }
315            TraceEvent::SpanClose { id, .. } => {
316                self.active_spans.remove(id);
317            }
318            TraceEvent::Event { .. } => {
319                self.write_event(event)?;
320            }
321            _ => {}
322        }
323        Ok(())
324    }
325
326    fn flush(&mut self) -> Result<(), anyhow::Error> {
327        self.writer.flush()?;
328        Ok(())
329    }
330
331    fn name(&self) -> &str {
332        "GlogSink"
333    }
334
335    fn target_filter(&self) -> Option<&Targets> {
336        Some(&self.targets)
337    }
338}
339
#[cfg(test)]
mod test {
    use super::*;

    /// Writes each of `parts` into `buf`, finishes the line, and returns the
    /// resulting text.
    fn render_into(buf: &mut LimitedBuffer, parts: &[&str]) -> String {
        for part in parts {
            write!(buf, "{}", part).unwrap();
        }
        buf.finish_line();
        std::str::from_utf8(buf.as_bytes()).unwrap().to_owned()
    }

    /// Same as `render_into`, using a fresh buffer with the given limit.
    fn render(limit: usize, parts: &[&str]) -> String {
        render_into(&mut LimitedBuffer::new(limit), parts)
    }

    #[test]
    fn test_limited_buffer_truncation() {
        let output = render(
            20,
            &["Hello, this is a very long message that exceeds the limit"],
        );

        assert_eq!(output, "Hello, this is a ver...[truncated 37 chars]\n");
    }

    #[test]
    fn test_limited_buffer_no_truncation() {
        let output = render(50, &["Short message"]);

        assert_eq!(output, "Short message\n");
    }

    #[test]
    fn test_limited_buffer_exact_fit() {
        // Content that fills the limit exactly is not reported as truncated.
        assert_eq!(render(5, &["hello"]), "hello\n");
    }

    #[test]
    fn test_limited_buffer_multibyte_boundary() {
        // 'é' is two bytes and would straddle the 5-byte limit, so the cut
        // backs off to the preceding char boundary instead of splitting it.
        assert_eq!(render(5, &["abcdé"]), "abcd...[truncated 1 chars]\n");
    }

    #[test]
    fn test_limited_buffer_writes_after_limit_are_counted() {
        assert_eq!(render(3, &["abc", "def"]), "abc...[truncated 3 chars]\n");
    }

    #[test]
    fn test_limited_buffer_reuse_after_clear() {
        let mut buf = LimitedBuffer::new(3);
        assert_eq!(
            render_into(&mut buf, &["abcdef"]),
            "abc...[truncated 3 chars]\n"
        );

        // `clear` must reset both the content and the truncation counter.
        buf.clear();
        assert_eq!(render_into(&mut buf, &["ok"]), "ok\n");
    }
}
371}