hyperactor_telemetry/sinks/
glog.rs1use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use tracing_core::LevelFilter;
19use tracing_subscriber::filter::Targets;
20
21use crate::config::MONARCH_FILE_LOG_LEVEL;
22use crate::trace_dispatcher::FieldValue;
23use crate::trace_dispatcher::TraceEvent;
24use crate::trace_dispatcher::TraceEventSink;
25use crate::trace_dispatcher::TraceFields;
26use crate::trace_dispatcher::get_field;
27
/// Target maximum size, in bytes, of one rendered log line.
const MAX_LINE_SIZE: usize = 65536;
/// Bytes held back from the content limit for the `...[truncated N chars]`
/// marker and trailing newline appended by `LimitedBuffer::finish_line`.
const TRUNCATION_SUFFIX_RESERVE: usize = 32;

/// A reusable `String` buffer that accepts formatted writes up to a byte
/// `limit` and drops (but counts) everything beyond it.
///
/// Once any input has been dropped, every later write is dropped as well, so
/// the kept text is always a contiguous prefix of everything written since the
/// last `clear`. `finish_line` then appends a truncation marker (when needed)
/// and a newline.
struct LimitedBuffer {
    /// Accumulated, possibly truncated, line contents.
    buffer: String,
    /// Maximum number of content bytes to keep (the suffix is not counted).
    limit: usize,
    /// Number of `char`s discarded since the last `clear`.
    truncated_chars: usize,
}

impl LimitedBuffer {
    /// Creates an empty buffer that keeps at most `limit` bytes of content.
    ///
    /// Capacity for the content plus the truncation suffix is reserved up
    /// front to avoid reallocating in the common case.
    fn new(limit: usize) -> Self {
        Self {
            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
            limit,
            truncated_chars: 0,
        }
    }

    /// Resets the buffer for a new line while keeping its allocation.
    fn clear(&mut self) {
        self.buffer.clear();
        self.truncated_chars = 0;
    }

    /// Terminates the current line: appends `...[truncated N chars]` if any
    /// input was dropped, then a `'\n'`.
    fn finish_line(&mut self) {
        if self.truncated_chars > 0 {
            // Formatting into a `String` cannot fail.
            let _ = write!(
                &mut self.buffer,
                "...[truncated {} chars]",
                self.truncated_chars
            );
        }
        self.buffer.push('\n');
    }

    /// Returns the current contents as UTF-8 bytes.
    fn as_bytes(&self) -> &[u8] {
        self.buffer.as_bytes()
    }
}

impl FmtWrite for LimitedBuffer {
    /// Appends `s` if it fits within the limit. Otherwise it keeps the longest
    /// prefix ending on a `char` boundary and counts the dropped chars.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let remaining = self.limit.saturating_sub(self.buffer.len());
        // Once truncation has started, drop everything that follows. Without
        // this guard, cutting a multi-byte char can leave a few spare bytes
        // that a later short fragment would fill, emitting text out of order.
        if self.truncated_chars > 0 || remaining == 0 {
            self.truncated_chars += s.chars().count();
            return Ok(());
        }
        if s.len() <= remaining {
            self.buffer.push_str(s);
            return Ok(());
        }
        // `remaining < s.len()` here, and index 0 is always a boundary, so
        // this loop terminates.
        let mut cut = remaining;
        while !s.is_char_boundary(cut) {
            cut -= 1;
        }
        let (kept, dropped) = s.split_at(cut);
        self.buffer.push_str(kept);
        self.truncated_chars += dropped.chars().count();
        Ok(())
    }
}
93
94pub struct GlogSink {
99 writer: Box<dyn Write + Send>,
100 prefix: Option<String>,
101 active_spans: HashMap<u64, (String, TraceFields, Option<u64>)>,
103 targets: Targets,
104 line_buffer: LimitedBuffer,
108}
109
110impl GlogSink {
111 pub fn new(
118 writer: Box<dyn Write + Send>,
119 prefix_env_var: Option<String>,
120 file_log_level: &str,
121 ) -> Self {
122 let prefix = if let Some(ref env_var_name) = prefix_env_var {
123 std::env::var(env_var_name).ok()
124 } else {
125 None
126 };
127
128 Self {
129 writer,
130 prefix,
131 active_spans: HashMap::new(),
132 targets: Targets::new()
133 .with_default(LevelFilter::from_level({
134 let log_level_str =
135 hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
136 .unwrap_or_else(|| file_log_level.to_string());
137 tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
138 tracing::Level::from_str(file_log_level).expect("Invalid default log level")
139 })
140 }))
141 .with_target("opentelemetry", LevelFilter::OFF), line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
143 }
144 }
145
146 fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
147 self.line_buffer.clear();
148
149 let timestamp_str = match event {
150 TraceEvent::Event { timestamp, .. } => {
151 let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
152 datetime.format("%m%d %H:%M:%S%.6f").to_string()
153 }
154 _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
156 };
157
158 let prefix_str = if let Some(ref p) = self.prefix {
159 format!("[{}]", p)
160 } else {
161 "[-]".to_string()
162 };
163
164 match event {
165 TraceEvent::Event {
166 level,
167 fields,
168 parent_span,
169 thread_id,
170 file,
171 line,
172 ..
173 } => {
174 let level_char = match *level {
175 tracing::Level::ERROR => 'E',
176 tracing::Level::WARN => 'W',
177 tracing::Level::INFO => 'I',
178 tracing::Level::DEBUG => 'D',
179 tracing::Level::TRACE => 'T',
180 };
181
182 write!(
184 &mut self.line_buffer,
185 "{}{}{} {} ",
186 prefix_str, level_char, timestamp_str, thread_id
187 )?;
188
189 if let (Some(f), Some(l)) = (file, line) {
190 write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
191 } else {
192 write!(&mut self.line_buffer, "unknown:0] ")?;
193 }
194
195 if let Some(subject) = get_field(fields, crate::SUBJECT_KEY) {
198 Self::write_subject(&mut self.line_buffer, subject)?;
199 } else if let Some(parent_id) = parent_span {
200 self.write_span_context(*parent_id)?;
201 }
202
203 if let Some(v) = get_field(fields, "message") {
204 match v {
205 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
206 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
207 _ => write!(&mut self.line_buffer, "event")?,
208 }
209 } else {
210 write!(&mut self.line_buffer, "event")?;
211 }
212
213 let max_key_len = fields
214 .iter()
215 .filter(|(k, _)| *k != "message" && *k != crate::SUBJECT_KEY)
216 .map(|(k, _)| k.len())
217 .max()
218 .unwrap_or(0);
219
220 for (k, v) in fields.iter() {
221 if *k != "message" && *k != crate::SUBJECT_KEY {
222 let pad = max_key_len - k.len() + 1;
223 write!(&mut self.line_buffer, "\n {k}:{:pad$}", "")?;
224 match v {
225 FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
226 FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
227 FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
228 FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
229 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
230 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
231 }
232 }
233 }
234
235 self.line_buffer.finish_line();
236
237 self.writer.write_all(self.line_buffer.as_bytes())?;
238 }
239
240 _ => {
242 self.line_buffer.clear();
243 write!(
244 &mut self.line_buffer,
245 "{}I{} - unknown:0] unexpected event type",
246 prefix_str, timestamp_str
247 )?;
248 self.line_buffer.finish_line();
249 self.writer.write_all(self.line_buffer.as_bytes())?;
250 }
251 }
252
253 Ok(())
254 }
255
256 fn write_subject(buf: &mut LimitedBuffer, value: &FieldValue) -> Result<()> {
257 match value {
258 FieldValue::Str(s) => write!(buf, "{} ", s)?,
259 FieldValue::Debug(s) => write!(buf, "{} ", s)?,
260 _ => {}
261 }
262 Ok(())
263 }
264
265 fn write_span_context(&mut self, span_id: u64) -> Result<()> {
269 let mut current_id = Some(span_id);
270 while let Some(id) = current_id {
271 if let Some((_, fields, parent_id)) = self.active_spans.get(&id) {
272 if let Some(subject) = get_field(fields, crate::SUBJECT_KEY) {
273 Self::write_subject(&mut self.line_buffer, subject)?;
274 return Ok(());
275 }
276 current_id = *parent_id;
277 } else {
278 break;
279 }
280 }
281 Ok(())
282 }
283}
284
285impl TraceEventSink for GlogSink {
286 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
287 match event {
288 TraceEvent::NewSpan {
290 id,
291 name,
292 fields,
293 parent_id,
294 ..
295 } => {
296 self.active_spans
297 .insert(*id, (name.to_string(), fields.clone(), *parent_id));
298 }
299 TraceEvent::SpanClose { id, .. } => {
300 self.active_spans.remove(id);
301 }
302 TraceEvent::Event { .. } => {
303 self.write_event(event)?;
304 }
305 _ => {}
306 }
307 Ok(())
308 }
309
310 fn flush(&mut self) -> Result<(), anyhow::Error> {
311 self.writer.flush()?;
312 Ok(())
313 }
314
315 fn name(&self) -> &str {
316 "GlogSink"
317 }
318
319 fn target_filter(&self) -> Option<&Targets> {
320 Some(&self.targets)
321 }
322}
323
#[cfg(test)]
mod test {
    use super::*;

    /// Writes `text` into a fresh `LimitedBuffer` capped at `limit` bytes,
    /// finishes the line, and returns the rendered output as a `String`.
    fn render(limit: usize, text: &str) -> String {
        let mut buf = LimitedBuffer::new(limit);
        buf.write_str(text).unwrap();
        buf.finish_line();
        String::from_utf8(buf.as_bytes().to_vec()).unwrap()
    }

    #[test]
    fn test_limited_buffer_truncation() {
        let rendered = render(
            20,
            "Hello, this is a very long message that exceeds the limit",
        );
        assert_eq!(rendered, "Hello, this is a ver...[truncated 37 chars]\n");
    }

    #[test]
    fn test_limited_buffer_no_truncation() {
        let rendered = render(50, "Short message");
        assert_eq!(rendered, "Short message\n");
    }
}