hyperactor_telemetry/sinks/
glog.rs1use std::collections::HashMap;
13use std::fmt::Write as FmtWrite;
14use std::io::Write;
15use std::str::FromStr;
16
17use anyhow::Result;
18use indexmap::IndexMap;
19use tracing_core::LevelFilter;
20use tracing_subscriber::filter::Targets;
21
22use crate::config::MONARCH_FILE_LOG_LEVEL;
23use crate::trace_dispatcher::FieldValue;
24use crate::trace_dispatcher::TraceEvent;
25use crate::trace_dispatcher::TraceEventSink;
26
/// Size budget, in bytes, for one formatted log line: the content limit plus
/// the space reserved for the truncation notice.
const MAX_LINE_SIZE: usize = 4096;
/// Bytes reserved for the `...[truncated N chars]` notice appended to lines
/// that exceeded the content limit.
const TRUNCATION_SUFFIX_RESERVE: usize = 32;

/// A reusable string buffer that keeps at most `limit` bytes of what is
/// written to it and counts the characters it had to drop.
struct LimitedBuffer {
    /// Text kept so far (plus, after `finish_line`, the notice and newline).
    buffer: String,
    /// Maximum number of content bytes kept in `buffer`.
    limit: usize,
    /// Number of characters dropped since the last `clear`.
    truncated_chars: usize,
}

impl LimitedBuffer {
    /// Creates an empty buffer that keeps at most `limit` bytes of content.
    ///
    /// Capacity for the truncation notice is allocated up front so that
    /// `finish_line` normally does not need to grow the string.
    fn new(limit: usize) -> Self {
        Self {
            buffer: String::with_capacity(limit + TRUNCATION_SUFFIX_RESERVE),
            limit,
            truncated_chars: 0,
        }
    }

    /// Resets the buffer for the next line, keeping its allocation.
    fn clear(&mut self) {
        self.buffer.clear();
        self.truncated_chars = 0;
    }

    /// Terminates the current line: appends `...[truncated N chars]` if any
    /// characters were dropped, then a newline.
    fn finish_line(&mut self) {
        if self.truncated_chars > 0 {
            // Written straight into the inner `String`, bypassing the limit.
            // Formatting into a `String` cannot fail, so the result is ignored.
            let _ = write!(
                &mut self.buffer,
                "...[truncated {} chars]",
                self.truncated_chars
            );
        }
        self.buffer.push('\n');
    }

    /// Returns the buffered text as bytes.
    fn as_bytes(&self) -> &[u8] {
        self.buffer.as_bytes()
    }
}

impl FmtWrite for LimitedBuffer {
    /// Appends as much of `s` as fits within the limit and counts the rest as
    /// truncated characters. Never returns an error.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        // Once anything has been dropped, everything after it must be dropped
        // too. Cutting at a char boundary can leave the buffer a few bytes
        // short of `limit`; without this check a later short write would slip
        // into that gap and appear after text that is already missing.
        if self.truncated_chars > 0 {
            self.truncated_chars += s.chars().count();
            return Ok(());
        }

        let remaining = self.limit.saturating_sub(self.buffer.len());
        if s.len() <= remaining {
            self.buffer.push_str(s);
            return Ok(());
        }

        // Back off to the nearest char boundary so a multi-byte character is
        // never split.
        let mut cut = remaining;
        while cut > 0 && !s.is_char_boundary(cut) {
            cut -= 1;
        }
        let (kept, dropped) = s.split_at(cut);
        self.buffer.push_str(kept);
        self.truncated_chars += dropped.chars().count();
        Ok(())
    }
}
92
93pub struct GlogSink {
98 writer: Box<dyn Write + Send>,
99 prefix: Option<String>,
100 active_spans: HashMap<u64, (String, IndexMap<String, FieldValue>, Option<u64>)>,
102 targets: Targets,
103 line_buffer: LimitedBuffer,
107}
108
109impl GlogSink {
110 pub fn new(
117 writer: Box<dyn Write + Send>,
118 prefix_env_var: Option<String>,
119 file_log_level: &str,
120 ) -> Self {
121 let prefix = if let Some(ref env_var_name) = prefix_env_var {
122 std::env::var(env_var_name).ok()
123 } else {
124 None
125 };
126
127 Self {
128 writer,
129 prefix,
130 active_spans: HashMap::new(),
131 targets: Targets::new()
132 .with_default(LevelFilter::from_level({
133 let log_level_str =
134 hyperactor_config::global::try_get_cloned(MONARCH_FILE_LOG_LEVEL)
135 .unwrap_or_else(|| file_log_level.to_string());
136 tracing::Level::from_str(&log_level_str).unwrap_or_else(|_| {
137 tracing::Level::from_str(file_log_level).expect("Invalid default log level")
138 })
139 }))
140 .with_target("opentelemetry", LevelFilter::OFF), line_buffer: LimitedBuffer::new(MAX_LINE_SIZE - TRUNCATION_SUFFIX_RESERVE),
142 }
143 }
144
145 fn write_event(&mut self, event: &TraceEvent) -> Result<()> {
146 self.line_buffer.clear();
147
148 let timestamp_str = match event {
149 TraceEvent::Event { timestamp, .. } => {
150 let datetime: chrono::DateTime<chrono::Local> = (*timestamp).into();
151 datetime.format("%m%d %H:%M:%S%.6f").to_string()
152 }
153 _ => chrono::Local::now().format("%m%d %H:%M:%S%.6f").to_string(),
155 };
156
157 let prefix_str = if let Some(ref p) = self.prefix {
158 format!("[{}]", p)
159 } else {
160 "[-]".to_string()
161 };
162
163 match event {
164 TraceEvent::Event {
165 level,
166 fields,
167 parent_span,
168 thread_id,
169 file,
170 line,
171 ..
172 } => {
173 let level_char = match *level {
174 tracing::Level::ERROR => 'E',
175 tracing::Level::WARN => 'W',
176 tracing::Level::INFO => 'I',
177 tracing::Level::DEBUG => 'D',
178 tracing::Level::TRACE => 'T',
179 };
180
181 write!(
183 &mut self.line_buffer,
184 "{}{}{} {} ",
185 prefix_str, level_char, timestamp_str, thread_id
186 )?;
187
188 if let (Some(f), Some(l)) = (file, line) {
189 write!(&mut self.line_buffer, "{}:{}] ", f, l)?;
190 } else {
191 write!(&mut self.line_buffer, "unknown:0] ")?;
192 }
193
194 if let Some(parent_id) = parent_span {
195 self.write_span_context(*parent_id)?;
196 }
197
198 if let Some(v) = fields.get("message") {
199 match v {
200 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
201 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
202 _ => write!(&mut self.line_buffer, "event")?,
203 }
204 } else {
205 write!(&mut self.line_buffer, "event")?;
206 }
207
208 for (k, v) in fields.iter() {
209 if k != "message" {
210 write!(&mut self.line_buffer, ", {}:", k)?;
211 match v {
212 FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
213 FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
214 FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
215 FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
216 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
217 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
218 }
219 }
220 }
221
222 self.line_buffer.finish_line();
223
224 self.writer.write_all(self.line_buffer.as_bytes())?;
225 }
226
227 _ => {
229 self.line_buffer.clear();
230 write!(
231 &mut self.line_buffer,
232 "{}I{} - unknown:0] unexpected event type",
233 prefix_str, timestamp_str
234 )?;
235 self.line_buffer.finish_line();
236 self.writer.write_all(self.line_buffer.as_bytes())?;
237 }
238 }
239
240 Ok(())
241 }
242
243 fn write_span_context(&mut self, span_id: u64) -> Result<()> {
245 let mut span_ids = Vec::new();
246 let mut current_id = Some(span_id);
247
248 while let Some(id) = current_id {
249 if let Some((_, _, parent_id)) = self.active_spans.get(&id) {
250 span_ids.push(id);
251 current_id = *parent_id;
252 } else {
253 break;
254 }
255 }
256 if span_ids.is_empty() {
257 return Ok(());
258 }
259
260 write!(&mut self.line_buffer, "[")?;
261
262 for (i, id) in span_ids.iter().rev().enumerate() {
263 if i > 0 {
264 write!(&mut self.line_buffer, ", ")?;
265 }
266
267 if let Some((name, fields, _)) = self.active_spans.get(id) {
268 write!(&mut self.line_buffer, "{}", name)?;
269 if !fields.is_empty() {
270 write!(&mut self.line_buffer, "{{")?;
271
272 let mut first = true;
273 for (k, v) in fields.iter() {
274 if !first {
275 write!(&mut self.line_buffer, ", ")?;
276 }
277 first = false;
278 write!(&mut self.line_buffer, "{}:", k)?;
279
280 match v {
281 FieldValue::Bool(b) => write!(&mut self.line_buffer, "{}", b)?,
282 FieldValue::I64(i) => write!(&mut self.line_buffer, "{}", i)?,
283 FieldValue::U64(u) => write!(&mut self.line_buffer, "{}", u)?,
284 FieldValue::F64(f) => write!(&mut self.line_buffer, "{}", f)?,
285 FieldValue::Str(s) => write!(&mut self.line_buffer, "{}", s)?,
286 FieldValue::Debug(s) => write!(&mut self.line_buffer, "{}", s)?,
287 }
288 }
289
290 write!(&mut self.line_buffer, "}}")?;
291 }
292 }
293 }
294
295 write!(&mut self.line_buffer, "] ")?;
296 Ok(())
297 }
298}
299
300impl TraceEventSink for GlogSink {
301 fn consume(&mut self, event: &TraceEvent) -> Result<(), anyhow::Error> {
302 match event {
303 TraceEvent::NewSpan {
305 id,
306 name,
307 fields,
308 parent_id,
309 ..
310 } => {
311 self.active_spans
312 .insert(*id, (name.to_string(), fields.clone(), *parent_id));
313 }
314 TraceEvent::SpanClose { id, .. } => {
315 self.active_spans.remove(id);
316 }
317 TraceEvent::Event { .. } => {
318 self.write_event(event)?;
319 }
320 _ => {}
321 }
322 Ok(())
323 }
324
325 fn flush(&mut self) -> Result<(), anyhow::Error> {
326 self.writer.flush()?;
327 Ok(())
328 }
329
330 fn name(&self) -> &str {
331 "GlogSink"
332 }
333
334 fn target_filter(&self) -> Option<&Targets> {
335 Some(&self.targets)
336 }
337}
338
#[cfg(test)]
mod test {
    use super::*;

    /// Writes `text` into a fresh buffer limited to `limit` bytes, finishes
    /// the line, and returns what the buffer holds.
    fn render(limit: usize, text: &str) -> String {
        let mut buf = LimitedBuffer::new(limit);
        write!(&mut buf, "{}", text).unwrap();
        buf.finish_line();
        String::from_utf8(buf.as_bytes().to_vec()).unwrap()
    }

    #[test]
    fn test_limited_buffer_truncation() {
        let rendered = render(
            20,
            "Hello, this is a very long message that exceeds the limit",
        );

        // 20 bytes are kept; the remaining 37 characters are only counted.
        assert_eq!(rendered, "Hello, this is a ver...[truncated 37 chars]\n");
    }

    #[test]
    fn test_limited_buffer_no_truncation() {
        let rendered = render(50, "Short message");

        assert_eq!(rendered, "Short message\n");
    }
}