Skip to main content

hyperactor_telemetry/sinks/
glog.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Glog-formatted text sink for trace events.
10//! Replicates the behavior of the fmt::Layer with glog formatting.
11
12use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use tracing_core::LevelFilter;
19use tracing_subscriber::filter::Targets;
20
21use crate::config::MONARCH_FILE_LOG_LEVEL;
22use crate::trace_dispatcher::FieldValue;
23use crate::trace_dispatcher::TraceEvent;
24use crate::trace_dispatcher::TraceEventSink;
25use crate::trace_dispatcher::TraceFields;
26use crate::trace_dispatcher::get_field;
27
28const MAX_LINE_SIZE: usize = 65536;
29const TRUNCATION_SUFFIX_RESERVE: usize = 32;
30
31/// A string buffer that limits writes to a maximum size.
32/// Once the limit is reached, further writes are silently ignored and
33/// truncated chars are tracked for reporting.
34struct LimitedBuffer {
35    buffer: String,
36    /// Max bytes for content (excluding truncation suffix and newline).
37    limit: usize,
38    truncated_chars: usize,
39}
40
41impl LimitedBuffer {
42    fn new(limit: usize) -> Self {
43        Self {
44            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
45            limit,
46            truncated_chars: 0,
47        }
48    }
49
50    fn clear(&mut self) {
51        self.buffer.clear();
52        self.truncated_chars = 0;
53    }
54
55    /// Write truncation suffix + add a newline
56    fn finish_line(&mut self) {
57        if self.truncated_chars > 0 {
58            use std::fmt::Write;
59            let _ = write!(
60                &mut self.buffer,
61                "...[truncated {} chars]",
62                self.truncated_chars
63            );
64        }
65        self.buffer.push('\n');
66    }
67
68    fn as_bytes(&self) -> &[u8] {
69        self.buffer.as_bytes()
70    }
71}
72
73impl FmtWrite for LimitedBuffer {
74    fn write_str(&mut self, s: &str) -> std::fmt::Result {
75        let remaining = self.limit.saturating_sub(self.buffer.len());
76        if remaining == 0 {
77            self.truncated_chars += s.chars().count();
78            return Ok(());
79        }
80        if s.len() <= remaining {
81            self.buffer.push_str(s);
82        } else {
83            let mut truncate_at = remaining;
84            while truncate_at > 0 && !s.is_char_boundary(truncate_at) {
85                truncate_at -= 1;
86            }
87            self.buffer.push_str(&s[..truncate_at]);
88            self.truncated_chars += s[truncate_at..].chars().count();
89        }
90        Ok(())
91    }
92}
93
94/// Glog sink that writes events in glog format to a file.
95/// This replaces the fmt::Layer that was previously used for text logging.
96///
97/// This only logs Events, not Spans (matching old fmt::Layer behavior).
98pub struct GlogSink {
99    writer: Box<dyn Write + Send>,
100    prefix: Option<String>,
101    /// Track active spans by ID with (name, fields, parent_id) to show span context in event logs
102    active_spans: HashMap<u64, (String, TraceFields, Option<u64>)>,
103    targets: Targets,
104    /// Reusable buffer for formatting log lines to ensure atomic writes.
105    /// We build the entire line in this buffer, then write it atomically to avoid
106    /// interleaving with other threads writing to the same fd (e.g., stderr).
107    line_buffer: LimitedBuffer,
108}
109
110impl GlogSink {
111    /// Create a new glog sink with the given writer.
112    ///
113    /// # Arguments
114    /// * `writer` - Writer to write log events to (used directly without buffering)
115    /// * `prefix_env_var` - Optional environment variable name to read prefix from (matching old impl)
116    /// * `file_log_level` - Minimum log level to capture (e.g., "info", "debug")
117    pub fn new(
118        writer: Box<dyn Write + Send>,
119        prefix_env_var: Option<String>,
120        file_log_level: &str,
121    ) -> Self {
122        let prefix = if let Some(ref env_var_name) = prefix_env_var {
123            std::env::var(env_var_name).ok()
124        } else {
125            None
126        };
127
128        Self {
129            writer,
130            prefix,
131            active_spans: HashMap::new(),
132            targets: Targets::new()
133                .with_default(LevelFilter::from_level({
134                    let log_level_str =
135                        hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
136                            .unwrap_or_else(|| file_log_level.to_string());
137                    tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
138                        tracing::Level::from_str(file_log_level).expect("Invalid default log level")
139                    })
140                }))
141                .with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
142            line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
143        }
144    }
145
146    fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
147        self.line_buffer.clear();
148
149        let timestamp_str = match event {
150            TraceEvent::Event { timestamp, .. } => {
151                let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
152                datetime.format("%m%d %H:%M:%S%.6f").to_string()
153            }
154            // write_event is only called for Events, but keep this for safety
155            _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
156        };
157
158        let prefix_str = if let Some(ref p) = self.prefix {
159            format!("[{}]", p)
160        } else {
161            "[-]".to_string()
162        };
163
164        match event {
165            TraceEvent::Event {
166                level,
167                fields,
168                parent_span,
169                thread_id,
170                file,
171                line,
172                ..
173            } => {
174                let level_char = match *level {
175                    tracing::Level::ERROR => 'E',
176                    tracing::Level::WARN => 'W',
177                    tracing::Level::INFO => 'I',
178                    tracing::Level::DEBUG => 'D',
179                    tracing::Level::TRACE => 'T',
180                };
181
182                // [prefix]LMMDD HH:MM:SS.ffffff thread_id file:line] message, key:value, key:value
183                write!(
184                    &mut self.line_buffer,
185                    "{}{}{} {} ",
186                    prefix_str, level_char, timestamp_str, thread_id
187                )?;
188
189                if let (Some(f), Some(l)) = (file, line) {
190                    write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
191                } else {
192                    write!(&mut self.line_buffer, "unknown:0] ")?;
193                }
194
195                // Render subject as a prefix. Check event fields first,
196                // then fall back to the enclosing span chain.
197                if let Some(subject) = get_field(fields, crate::SUBJECT_KEY) {
198                    Self::write_subject(&mut self.line_buffer, subject)?;
199                } else if let Some(parent_id) = parent_span {
200                    self.write_span_context(*parent_id)?;
201                }
202
203                if let Some(v) = get_field(fields, "message") {
204                    match v {
205                        FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
206                        FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
207                        _ => write!(&mut self.line_buffer, "event")?,
208                    }
209                } else {
210                    write!(&mut self.line_buffer, "event")?;
211                }
212
213                let max_key_len = fields
214                    .iter()
215                    .filter(|(k, _)| *k != "message" && *k != crate::SUBJECT_KEY)
216                    .map(|(k, _)| k.len())
217                    .max()
218                    .unwrap_or(0);
219
220                for (k, v) in fields.iter() {
221                    if *k != "message" && *k != crate::SUBJECT_KEY {
222                        let pad = max_key_len - k.len() + 1;
223                        write!(&mut self.line_buffer, "\n    {k}:{:pad$}", "")?;
224                        match v {
225                            FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
226                            FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
227                            FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
228                            FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
229                            FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
230                            FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
231                        }
232                    }
233                }
234
235                self.line_buffer.finish_line();
236
237                self.writer.write_all(self.line_buffer.as_bytes())?;
238            }
239
240            // write_event should only be called for Events, but handle gracefully
241            _ => {
242                self.line_buffer.clear();
243                write!(
244                    &mut self.line_buffer,
245                    "{}I{} - unknown:0] unexpected event type",
246                    prefix_str, timestamp_str
247                )?;
248                self.line_buffer.finish_line();
249                self.writer.write_all(self.line_buffer.as_bytes())?;
250            }
251        }
252
253        Ok(())
254    }
255
256    fn write_subject(buf: &mut LimitedBuffer, value: &FieldValue) -> Result<()> {
257        match value {
258            FieldValue::Str(s) => write!(buf, "{} ", s)?,
259            FieldValue::Debug(s) => write!(buf, "{} ", s)?,
260            _ => {}
261        }
262        Ok(())
263    }
264
265    /// Walks the span chain looking for a `subject` field. If found,
266    /// renders it as a prefix (e.g., `<actor id> `). If no subject is
267    /// found, no span context is rendered.
268    fn write_span_context(&mut self, span_id: u64) -> Result<()> {
269        let mut current_id = Some(span_id);
270        while let Some(id) = current_id {
271            if let Some((_, fields, parent_id)) = self.active_spans.get(&id) {
272                if let Some(subject) = get_field(fields, crate::SUBJECT_KEY) {
273                    Self::write_subject(&mut self.line_buffer, subject)?;
274                    return Ok(());
275                }
276                current_id = *parent_id;
277            } else {
278                break;
279            }
280        }
281        Ok(())
282    }
283}
284
285impl TraceEventSink for GlogSink {
286    fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
287        match event {
288            // Track span lifecycle for context display (must happen even if we don't export spans)
289            TraceEvent::NewSpan {
290                id,
291                name,
292                fields,
293                parent_id,
294                ..
295            } => {
296                self.active_spans
297                    .insert(*id, (name.to_string(), fields.clone(), *parent_id));
298            }
299            TraceEvent::SpanClose { id, .. } => {
300                self.active_spans.remove(id);
301            }
302            TraceEvent::Event { .. } => {
303                self.write_event(event)?;
304            }
305            _ => {}
306        }
307        Ok(())
308    }
309
310    fn flush(&mut self) -> Result<(), anyhow::Error> {
311        self.writer.flush()?;
312        Ok(())
313    }
314
315    fn name(&self) -> &str {
316        "GlogSink"
317    }
318
319    fn target_filter(&self) -> Option<&Targets> {
320        Some(&self.targets)
321    }
322}
323
#[cfg(test)]
mod test {
    use super::*;

    /// Finishes the current line and returns the buffer contents as an
    /// owned string.
    fn finished(buf: &mut LimitedBuffer) -> String {
        buf.finish_line();
        std::str::from_utf8(buf.as_bytes()).unwrap().to_string()
    }

    /// A write longer than the limit is cut and the dropped characters are
    /// reported in the suffix.
    #[test]
    fn test_limited_buffer_truncation() {
        let mut buf = LimitedBuffer::new(20);

        write!(
            &mut buf,
            "Hello, this is a very long message that exceeds the limit"
        )
        .unwrap();

        assert_eq!(
            finished(&mut buf),
            "Hello, this is a ver...[truncated 37 chars]\n"
        );
    }

    /// A write that fits is passed through unchanged, with no suffix.
    #[test]
    fn test_limited_buffer_no_truncation() {
        let mut buf = LimitedBuffer::new(50);

        write!(&mut buf, "Short message").unwrap();

        assert_eq!(finished(&mut buf), "Short message\n");
    }

    /// The cut never lands inside a multi-byte character: the 3-byte '€'
    /// does not fit into the 2 bytes left, so it is dropped as a whole.
    #[test]
    fn test_limited_buffer_truncates_on_char_boundary() {
        let mut buf = LimitedBuffer::new(4);

        write!(&mut buf, "ab€").unwrap();

        assert_eq!(finished(&mut buf), "ab...[truncated 1 chars]\n");
    }

    /// `clear` resets both the content and the truncation counter.
    #[test]
    fn test_limited_buffer_clear_resets_truncation() {
        let mut buf = LimitedBuffer::new(2);

        write!(&mut buf, "abcdef").unwrap();
        assert_eq!(finished(&mut buf), "ab...[truncated 4 chars]\n");

        buf.clear();
        write!(&mut buf, "ok").unwrap();

        assert_eq!(finished(&mut buf), "ok\n");
    }
}
355}