hyperactor_telemetry/sinks/
glog.rs1use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use tracing_core::LevelFilter;
19use tracing_subscriber::filter::Targets;
20
21use crate::config::MONARCH_FILE_LOG_LEVEL;
22use crate::trace_dispatcher::FieldValue;
23use crate::trace_dispatcher::TraceEvent;
24use crate::trace_dispatcher::TraceEventSink;
25use crate::trace_dispatcher::TraceFields;
26use crate::trace_dispatcher::get_field;
27
/// Upper bound, in bytes, used to size the per-line buffer of the sink.
const MAX_LINE_SIZE: usize = 4096;
/// Extra bytes set aside for the `...[truncated N chars]` suffix and the trailing newline.
const TRUNCATION_SUFFIX_RESERVE: usize = 32;

/// A reusable line buffer that stops accepting content after `limit` bytes.
///
/// Content written through [`std::fmt::Write`] is kept until the byte limit is
/// reached; everything after that point is dropped and only counted, so that
/// [`LimitedBuffer::finish_line`] can report how many characters were lost.
#[derive(Debug)]
struct LimitedBuffer {
    /// The text accumulated for the current line.
    buffer: String,
    /// Maximum number of content bytes kept (the truncation suffix is not counted).
    limit: usize,
    /// Number of characters dropped since the last `clear`; non-zero means the
    /// line has been truncated and no further content is accepted.
    truncated_chars: usize,
}

impl LimitedBuffer {
    /// Creates an empty buffer that keeps at most `limit` bytes of content.
    ///
    /// Capacity for the truncation suffix is reserved up front so that
    /// `finish_line` normally does not need to reallocate.
    fn new(limit: usize) -> Self {
        Self {
            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
            limit,
            truncated_chars: 0,
        }
    }

    /// Resets the buffer for the next line, keeping the allocation.
    fn clear(&mut self) {
        self.buffer.clear();
        self.truncated_chars = 0;
    }

    /// Terminates the current line.
    ///
    /// Appends `...[truncated N chars]` when content was dropped, then a newline.
    /// Both are written straight into the underlying `String`, bypassing `limit`.
    fn finish_line(&mut self) {
        if self.truncated_chars > 0 {
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = write!(
                &mut self.buffer,
                "...[truncated {} chars]",
                self.truncated_chars
            );
        }
        self.buffer.push('\n');
    }

    /// Returns the bytes accumulated so far.
    fn as_bytes(&self) -> &[u8] {
        self.buffer.as_bytes()
    }
}

impl FmtWrite for LimitedBuffer {
    /// Appends `s`, keeping only what fits within `limit` bytes.
    ///
    /// Never returns an error: overflow is recorded in `truncated_chars` instead.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        // Once anything has been dropped the line is closed. Without this, a
        // truncation that backed off to a char boundary would leave a few free
        // bytes, and a later short write would be appended after missing text.
        let remaining = if self.truncated_chars > 0 {
            0
        } else {
            self.limit.saturating_sub(self.buffer.len())
        };

        if s.len() <= remaining {
            self.buffer.push_str(s);
            return Ok(());
        }

        // Back off to a UTF-8 boundary so a multi-byte character is never split.
        let mut cut = remaining;
        while cut > 0 && !s.is_char_boundary(cut) {
            cut -= 1;
        }
        let (kept, dropped) = s.split_at(cut);
        self.buffer.push_str(kept);
        self.truncated_chars += dropped.chars().count();
        Ok(())
    }
}
93
94pub struct GlogSink {
99 writer: Box<dyn Write + Send>,
100 prefix: Option<String>,
101 active_spans: HashMap<u64, (String, TraceFields, Option<u64>)>,
103 targets: Targets,
104 line_buffer: LimitedBuffer,
108}
109
110impl GlogSink {
111 pub fn new(
118 writer: Box<dyn Write + Send>,
119 prefix_env_var: Option<String>,
120 file_log_level: &str,
121 ) -> Self {
122 let prefix = if let Some(ref env_var_name) = prefix_env_var {
123 std::env::var(env_var_name).ok()
124 } else {
125 None
126 };
127
128 Self {
129 writer,
130 prefix,
131 active_spans: HashMap::new(),
132 targets: Targets::new()
133 .with_default(LevelFilter::from_level({
134 let log_level_str =
135 hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
136 .unwrap_or_else(|| file_log_level.to_string());
137 tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
138 tracing::Level::from_str(file_log_level).expect("Invalid default log level")
139 })
140 }))
141 .with_target("opentelemetry", LevelFilter::OFF), line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
143 }
144 }
145
146 fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
147 self.line_buffer.clear();
148
149 let timestamp_str = match event {
150 TraceEvent::Event { timestamp, .. } => {
151 let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
152 datetime.format("%m%d %H:%M:%S%.6f").to_string()
153 }
154 _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
156 };
157
158 let prefix_str = if let Some(ref p) = self.prefix {
159 format!("[{}]", p)
160 } else {
161 "[-]".to_string()
162 };
163
164 match event {
165 TraceEvent::Event {
166 level,
167 fields,
168 parent_span,
169 thread_id,
170 file,
171 line,
172 ..
173 } => {
174 let level_char = match *level {
175 tracing::Level::ERROR => 'E',
176 tracing::Level::WARN => 'W',
177 tracing::Level::INFO => 'I',
178 tracing::Level::DEBUG => 'D',
179 tracing::Level::TRACE => 'T',
180 };
181
182 write!(
184 &mut self.line_buffer,
185 "{}{}{} {} ",
186 prefix_str, level_char, timestamp_str, thread_id
187 )?;
188
189 if let (Some(f), Some(l)) = (file, line) {
190 write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
191 } else {
192 write!(&mut self.line_buffer, "unknown:0] ")?;
193 }
194
195 if let Some(parent_id) = parent_span {
196 self.write_span_context(*parent_id)?;
197 }
198
199 if let Some(v) = get_field(fields, "message") {
200 match v {
201 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
202 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
203 _ => write!(&mut self.line_buffer, "event")?,
204 }
205 } else {
206 write!(&mut self.line_buffer, "event")?;
207 }
208
209 for (k, v) in fields.iter() {
210 if *k != "message" {
211 write!(&mut self.line_buffer, ", {}:", k)?;
212 match v {
213 FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
214 FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
215 FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
216 FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
217 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
218 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
219 }
220 }
221 }
222
223 self.line_buffer.finish_line();
224
225 self.writer.write_all(self.line_buffer.as_bytes())?;
226 }
227
228 _ => {
230 self.line_buffer.clear();
231 write!(
232 &mut self.line_buffer,
233 "{}I{} - unknown:0] unexpected event type",
234 prefix_str, timestamp_str
235 )?;
236 self.line_buffer.finish_line();
237 self.writer.write_all(self.line_buffer.as_bytes())?;
238 }
239 }
240
241 Ok(())
242 }
243
244 fn write_span_context(&mut self, span_id: u64) -> Result<()> {
246 let mut span_ids = Vec::new();
247 let mut current_id = Some(span_id);
248
249 while let Some(id) = current_id {
250 if let Some((_, _, parent_id)) = self.active_spans.get(&id) {
251 span_ids.push(id);
252 current_id = *parent_id;
253 } else {
254 break;
255 }
256 }
257 if span_ids.is_empty() {
258 return Ok(());
259 }
260
261 write!(&mut self.line_buffer, "[")?;
262
263 for (i, id) in span_ids.iter().rev().enumerate() {
264 if i > 0 {
265 write!(&mut self.line_buffer, ", ")?;
266 }
267
268 if let Some((name, fields, _)) = self.active_spans.get(id) {
269 write!(&mut self.line_buffer, "{}", name)?;
270 if !fields.is_empty() {
271 write!(&mut self.line_buffer, "{{")?;
272
273 let mut first = true;
274 for (k, v) in fields.iter() {
275 if !first {
276 write!(&mut self.line_buffer, ", ")?;
277 }
278 first = false;
279 write!(&mut self.line_buffer, "{}:", k)?;
280
281 match v {
282 FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
283 FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
284 FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
285 FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
286 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
287 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
288 }
289 }
290
291 write!(&mut self.line_buffer, "}}")?;
292 }
293 }
294 }
295
296 write!(&mut self.line_buffer, "] ")?;
297 Ok(())
298 }
299}
300
301impl TraceEventSink for GlogSink {
302 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
303 match event {
304 TraceEvent::NewSpan {
306 id,
307 name,
308 fields,
309 parent_id,
310 ..
311 } => {
312 self.active_spans
313 .insert(*id, (name.to_string(), fields.clone(), *parent_id));
314 }
315 TraceEvent::SpanClose { id, .. } => {
316 self.active_spans.remove(id);
317 }
318 TraceEvent::Event { .. } => {
319 self.write_event(event)?;
320 }
321 _ => {}
322 }
323 Ok(())
324 }
325
326 fn flush(&mut self) -> Result<(), anyhow::Error> {
327 self.writer.flush()?;
328 Ok(())
329 }
330
331 fn name(&self) -> &str {
332 "GlogSink"
333 }
334
335 fn target_filter(&self) -> Option<&Targets> {
336 Some(&self.targets)
337 }
338}
339
#[cfg(test)]
mod test {
    use super::*;

    /// Writes `text` into a fresh buffer with the given limit, finishes the
    /// line, and returns the resulting contents.
    fn render(limit: usize, text: &str) -> String {
        let mut line = LimitedBuffer::new(limit);
        write!(&mut line, "{}", text).unwrap();
        line.finish_line();
        std::str::from_utf8(line.as_bytes()).unwrap().to_owned()
    }

    /// Content past the limit is dropped and reported in the suffix.
    #[test]
    fn test_limited_buffer_truncation() {
        let rendered = render(
            20,
            "Hello, this is a very long message that exceeds the limit",
        );

        assert_eq!(rendered, "Hello, this is a ver...[truncated 37 chars]\n");
    }

    /// Content within the limit is emitted unchanged, followed by a newline.
    #[test]
    fn test_limited_buffer_no_truncation() {
        let rendered = render(50, "Short message");

        assert_eq!(rendered, "Short message\n");
    }
}
371}