1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::HandleClient;
28use hyperactor::Handler;
29use hyperactor::Instance;
30use hyperactor::OncePortRef;
31use hyperactor::RefClient;
32use hyperactor::Unbind;
33use hyperactor::channel;
34use hyperactor::channel::ChannelAddr;
35use hyperactor::channel::ChannelRx;
36use hyperactor::channel::ChannelTransport;
37use hyperactor::channel::ChannelTx;
38use hyperactor::channel::Rx;
39use hyperactor::channel::Tx;
40use hyperactor::channel::TxStatus;
41use hyperactor::clock::Clock;
42use hyperactor::clock::RealClock;
43use hyperactor_config::CONFIG;
44use hyperactor_config::ConfigAttr;
45use hyperactor_config::attrs::declare_attrs;
46use hyperactor_telemetry::env;
47use hyperactor_telemetry::log_file_path;
48use serde::Deserialize;
49use serde::Serialize;
50use tokio::io;
51use tokio::io::AsyncRead;
52use tokio::io::AsyncReadExt;
53use tokio::io::AsyncWriteExt;
54use tokio::sync::Mutex;
55use tokio::sync::Notify;
56use tokio::sync::RwLock;
57use tokio::sync::watch::Receiver;
58use tokio::task::JoinHandle;
59use tracing::Level;
60use typeuri::Named;
61
62use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
63use crate::shortuuid::ShortUuid;
64
65mod line_prefixing_writer;
66
/// Default aggregation window, in seconds, that a fresh `LogClientActor` starts with.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Forwarded lines longer than this many bytes are truncated (with a
/// `... [TRUNCATED]` marker) before being sent to log sinks.
const MAX_LINE_SIZE: usize = 4096;
69
declare_attrs! {
    /// Log-read buffer size setting (default 100).
    /// NOTE(review): not referenced in the visible part of this file; presumably
    /// passed as `max_buffer_size` to `StreamFwder::start` — confirm at call sites.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        py_name: Some("read_log_buffer".to_string()),
    })
    pub attr READ_LOG_BUFFER: usize = 100;

    /// When true, per-host stdout/stderr log files are created even when the
    /// telemetry environment is `Env::Local` (see `get_unique_local_log_destination`).
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        py_name: Some("force_file_log".to_string()),
    })
    pub attr FORCE_FILE_LOG: bool = false;

    /// When true, `StreamFwder::start` prefixes every forwarded line with the
    /// child's local rank, rendered as `[<rank>] <line>`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        py_name: Some("prefix_with_rank".to_string()),
    })
    pub attr PREFIX_WITH_RANK: bool = true;
}
94
/// Computes the Levenshtein (insert / delete / substitute) distance between two
/// strings, counted in `char`s rather than bytes.
///
/// Only two rolling rows are kept, so memory is O(min(len)) instead of the full
/// O(n·m) matrix. For lines near `MAX_LINE_SIZE` (~4K chars), a full matrix of
/// `usize` would exceed 100 MB per comparison.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    // Identical lines are the common case when aggregating repeated logs.
    if left == right {
        return 0;
    }

    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    // The distance is symmetric, so put the shorter string on the inner axis to
    // keep the rows as small as possible.
    let (outer, inner) = if left_chars.len() >= right_chars.len() {
        (left_chars.as_slice(), right_chars.as_slice())
    } else {
        (right_chars.as_slice(), left_chars.as_slice())
    };
    if inner.is_empty() {
        return outer.len();
    }

    // `prev[j]` is the distance between the first `i` outer chars and the first
    // `j` inner chars; `curr` is the row being filled for `i + 1`.
    let mut prev: Vec<usize> = (0..=inner.len()).collect();
    let mut curr = vec![0; inner.len() + 1];
    for (i, &outer_char) in outer.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &inner_char) in inner.iter().enumerate() {
            let substitution = prev[j] + usize::from(outer_char != inner_char);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr[j + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut prev, &mut curr);
    }
    prev[inner.len()]
}

/// Levenshtein distance divided by the longer string's length, with both
/// measured in `char`s. The result lies in `[0.0, 1.0]` and is `0.0` for two
/// empty strings.
///
/// Both sides count chars so the ratio stays consistent for non-ASCII text.
/// Dividing a char-based distance by a byte length would under-report how
/// different multi-byte lines are.
fn normalized_edit_distance(left: &str, right: &str) -> f64 {
    let max_len = left.chars().count().max(right.chars().count());
    if max_len == 0 {
        return 0.0;
    }
    levenshtein_distance(left, right) as f64 / max_len as f64
}
156
/// A representative log line together with how many lines were folded into it.
#[derive(Debug, Clone)]
struct LogLine {
    /// Text of the first line seen for this group.
    content: String,
    /// Number of lines (including the first) counted in this group.
    pub count: u64,
}

impl LogLine {
    /// Starts a new group containing only `content`.
    fn new(content: String) -> Self {
        LogLine { count: 1, content }
    }
}

impl fmt::Display for LogLine {
    /// Renders as `[N similar log lines] <content>`, with the count in yellow.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogLine { content, count } = self;
        write!(f, "\x1b[33m[{} similar log lines]\x1b[0m {}", count, content)
    }
}
179
180#[derive(Debug, Clone)]
181struct Aggregator {
184 lines: Vec<LogLine>,
185 start_time: SystemTime,
186 similarity_threshold: f64, }
188
189impl Aggregator {
190 fn new() -> Self {
191 Self::new_with_threshold(0.15)
193 }
194
195 fn new_with_threshold(threshold: f64) -> Self {
196 Aggregator {
197 lines: vec![],
198 start_time: RealClock.system_time_now(),
199 similarity_threshold: threshold,
200 }
201 }
202
203 fn reset(&mut self) {
204 self.lines.clear();
205 self.start_time = RealClock.system_time_now();
206 }
207
208 fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
209 let mut best_match_idx = None;
211 let mut best_similarity = f64::MAX;
212
213 for (idx, existing_line) in self.lines.iter().enumerate() {
214 let distance = normalized_edit_distance(&existing_line.content, line);
215
216 if distance < best_similarity && distance < self.similarity_threshold {
218 best_match_idx = Some(idx);
219 best_similarity = distance;
220 }
221 }
222
223 if let Some(idx) = best_match_idx {
225 self.lines[idx].count += 1;
226 } else {
227 self.lines.push(LogLine::new(line.to_string()));
229 }
230
231 Ok(())
232 }
233
234 fn is_empty(&self) -> bool {
235 self.lines.is_empty()
236 }
237}
238
239fn format_system_time(time: SystemTime) -> String {
241 let datetime: DateTime<Local> = time.into();
242 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
243}
244
245impl fmt::Display for Aggregator {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 let start_time_str = format_system_time(self.start_time);
249
250 let current_time = RealClock.system_time_now();
252 let end_time_str = format_system_time(current_time);
253
254 writeln!(
256 f,
257 "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
258 start_time_str
259 )?;
260
261 for line in self.lines.iter() {
263 writeln!(f, "{}", line)?;
264 }
265 writeln!(
266 f,
267 "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
268 end_time_str
269 )?;
270 Ok(())
271 }
272}
273
/// Messages carried on a process's local log channel. They are posted by
/// [`LocalLogSender`], received by [`LogForwardActor`], and handled by
/// [`LogClientActor`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of output lines from one process.
    Log {
        /// Host that produced the lines ("unknown_host" if it could not be determined).
        hostname: String,
        /// Pid supplied to `LocalLogSender::new` by the sender.
        pid: u32,
        /// Stream the lines were captured from.
        output_target: OutputTarget,
        /// Serialized lines. `LocalLogSender` sends a `Vec<Vec<u8>>` with one
        /// entry per line; `deserialize_message_lines` also accepts a `String`.
        payload: wirevalue::Any,
    },

    /// A flush request.
    Flush {
        /// `None` for an unversioned flush. `Some(version)` is the sync-flush
        /// marker injected by `LogForwardMessage::ForceSyncFlush`.
        sync_version: Option<u64>,
    },
}
305
/// Control messages for [`LogClientActor`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Sets the aggregation window.
    SetAggregate {
        /// Window length in seconds, or `None` to print lines as they arrive.
        aggregate_window_sec: Option<u64>,
    },

    /// Begins a synchronous flush across several processes.
    StartSyncFlush {
        /// Number of `Flush { sync_version: Some(_) }` acknowledgements to wait for.
        expected_procs: usize,
        /// Receives `()` once every expected process has acknowledged.
        reply: OncePortRef<()>,
        /// Receives the version number assigned to this flush.
        version: OncePortRef<u64>,
    },
}
333
/// Sink for lines captured from a child process's stdout/stderr.
// NOTE(review): the trait has no async methods, so `#[async_trait]` appears unnecessary.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Ships a batch of lines for `target`; `tee` passes one `Vec<u8>` per line.
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Requests that previously sent lines be flushed downstream.
    fn flush(&mut self) -> anyhow::Result<()>;
}
344
/// Which standard stream a line was written to (and should be printed to).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
353
/// Identifies one of a child process's output streams.
/// NOTE(review): not referenced in the visible part of this file — confirm usage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// The child's standard output.
    ChildStdout,
    /// The child's standard error.
    ChildStderr,
}
361
362pub struct LocalLogSender {
364 hostname: String,
365 pid: u32,
366 tx: ChannelTx<LogMessage>,
367 status: Receiver<TxStatus>,
368}
369
370impl LocalLogSender {
371 fn new(log_channel: ChannelAddr, pid: u32) -> Result<Self, anyhow::Error> {
372 let tx = channel::dial::<LogMessage>(log_channel)?;
373 let status = tx.status().clone();
374
375 let hostname = hostname::get()
376 .unwrap_or_else(|_| "unknown_host".into())
377 .into_string()
378 .unwrap_or("unknown_host".to_string());
379 Ok(Self {
380 hostname,
381 pid,
382 tx,
383 status,
384 })
385 }
386}
387
388#[async_trait]
389impl LogSender for LocalLogSender {
390 fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
391 if TxStatus::Active == *self.status.borrow() {
392 self.tx.post(LogMessage::Log {
394 hostname: self.hostname.clone(),
395 pid: self.pid,
396 output_target: target,
397 payload: wirevalue::Any::serialize(&payload)?,
398 });
399 }
400
401 Ok(())
402 }
403
404 fn flush(&mut self) -> anyhow::Result<()> {
405 if TxStatus::Active == *self.status.borrow() {
407 self.tx.post(LogMessage::Flush { sync_version: None });
409 }
410 Ok(())
411 }
412}
413
/// A batch of already-formatted lines sent from `tee` to a `file_monitor_task`,
/// which appends each one to its file followed by `\n`.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    lines: Vec<String>,
}
wirevalue::register_type!(FileMonitorMessage);
420
/// Owns two background tasks that append forwarded stdout/stderr lines to
/// per-host log files. Each task is fed through its own Unix-socket channel.
pub struct FileAppender {
    /// Address served for stdout lines (returned by `addr_for`).
    stdout_addr: ChannelAddr,
    /// Address served for stderr lines (returned by `addr_for`).
    stderr_addr: ChannelAddr,
    // The task handles are only held, never read; hence `dead_code` is allowed.
    #[allow(dead_code)]
    stdout_task: JoinHandle<()>,
    #[allow(dead_code)]
    stderr_task: JoinHandle<()>,
    /// Shared with both tasks and notified on drop so they flush and exit.
    stop: Arc<Notify>,
}
431
432impl fmt::Debug for FileAppender {
433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434 f.debug_struct("FileMonitor")
435 .field("stdout_addr", &self.stdout_addr)
436 .field("stderr_addr", &self.stderr_addr)
437 .finish()
438 }
439}
440
441impl FileAppender {
442 pub fn new() -> Option<Self> {
445 let stop = Arc::new(Notify::new());
446 let file_name_tag = hostname::get()
448 .unwrap_or_else(|_| "unknown_host".into())
449 .into_string()
450 .unwrap_or("unknown_host".to_string());
451
452 let (stdout_path, stdout_writer) =
454 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
455 Some(writer) => writer,
456 None => {
457 tracing::warn!("failed to create stdout file");
458 return None;
459 }
460 };
461 let (stdout_addr, stdout_rx) = {
462 let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
463 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
464 Ok((addr, rx)) => (addr, rx),
465 Err(e) => {
466 tracing::warn!("failed to serve stdout channel: {}", e);
467 return None;
468 }
469 }
470 };
471 let stdout_stop = stop.clone();
472 let stdout_task = tokio::spawn(file_monitor_task(
473 stdout_rx,
474 stdout_writer,
475 OutputTarget::Stdout,
476 stdout_stop,
477 ));
478
479 let (stderr_path, stderr_writer) =
481 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
482 Some(writer) => writer,
483 None => {
484 tracing::warn!("failed to create stderr file");
485 return None;
486 }
487 };
488 let (stderr_addr, stderr_rx) = {
489 let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
490 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
491 Ok((addr, rx)) => (addr, rx),
492 Err(e) => {
493 tracing::warn!("failed to serve stderr channel: {}", e);
494 return None;
495 }
496 }
497 };
498 let stderr_stop = stop.clone();
499 let stderr_task = tokio::spawn(file_monitor_task(
500 stderr_rx,
501 stderr_writer,
502 OutputTarget::Stderr,
503 stderr_stop,
504 ));
505
506 tracing::debug!(
507 "FileAppender: created for stdout {} stderr {} ",
508 stdout_path.display(),
509 stderr_path.display()
510 );
511
512 Some(Self {
513 stdout_addr,
514 stderr_addr,
515 stdout_task,
516 stderr_task,
517 stop,
518 })
519 }
520
521 pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
523 match target {
524 OutputTarget::Stdout => self.stdout_addr.clone(),
525 OutputTarget::Stderr => self.stderr_addr.clone(),
526 }
527 }
528}
529
impl Drop for FileAppender {
    /// Signals both monitor tasks to stop. The tasks flush their files and exit
    /// on their own; the `JoinHandle`s are not awaited here.
    fn drop(&mut self) {
        // NOTE(review): `notify_waiters` stores no permit, so a task that has not
        // yet created its `stop.notified()` future misses this signal and runs
        // until its channel errors — confirm that is acceptable.
        self.stop.notify_waiters();
        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
    }
}
537
538async fn file_monitor_task(
540 mut rx: ChannelRx<FileMonitorMessage>,
541 mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
542 target: OutputTarget,
543 stop: Arc<Notify>,
544) {
545 loop {
546 tokio::select! {
547 msg = rx.recv() => {
548 match msg {
549 Ok(msg) => {
550 for line in &msg.lines {
552 if let Err(e) = writer.write_all(line.as_bytes()).await {
553 tracing::warn!("FileMonitor: failed to write line to file: {}", e);
554 continue;
555 }
556 if let Err(e) = writer.write_all(b"\n").await {
557 tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
558 }
559 }
560 if let Err(e) = writer.flush().await {
561 tracing::warn!("FileMonitor: failed to flush file: {}", e);
562 }
563 }
564 Err(e) => {
565 tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
567 break;
568 }
569 }
570 }
571 _ = stop.notified() => {
572 tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
573 break;
574 }
575 }
576 }
577
578 if let Err(e) = writer.flush().await {
580 tracing::warn!("FileMonitor: failed final flush: {}", e);
581 }
582 tracing::debug!("FileMonitor task for {:?} exiting", target);
583}
584
585fn create_unique_file_writer(
586 file_name_tag: &str,
587 output_target: OutputTarget,
588 env: env::Env,
589) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
590 let suffix = match output_target {
591 OutputTarget::Stderr => "stderr",
592 OutputTarget::Stdout => "stdout",
593 };
594 let (path, filename) = log_file_path(env, None)?;
595 let path = Path::new(&path);
596 let mut full_path = PathBuf::from(path);
597
598 let uuid = ShortUuid::generate();
599
600 full_path.push(format!(
601 "{}_{}_{}.{}",
602 filename, file_name_tag, uuid, suffix
603 ));
604 let file = std::fs::OpenOptions::new()
605 .create(true)
606 .append(true)
607 .open(full_path.clone())?;
608 let tokio_file = tokio::fs::File::from_std(file);
609 Ok((full_path, Box::new(tokio_file)))
611}
612
613fn get_unique_local_log_destination(
614 file_name_tag: &str,
615 output_target: OutputTarget,
616) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
617 let env: env::Env = env::Env::current();
618 if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
619 tracing::debug!("not creating log file because of env type");
620 None
621 } else {
622 match create_unique_file_writer(file_name_tag, output_target, env) {
623 Ok((a, b)) => Some((a, b)),
624 Err(e) => {
625 tracing::warn!("failed to create unique file writer: {}", e);
626 None
627 }
628 }
629 }
630}
631
632fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
634 match target {
636 OutputTarget::Stdout => Box::new(tokio::io::stdout()),
637 OutputTarget::Stderr => Box::new(tokio::io::stderr()),
638 }
639}
640
641async fn tee(
644 mut reader: impl AsyncRead + Unpin + Send + 'static,
645 mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
646 log_sender: Option<Box<dyn LogSender + Send>>,
647 file_monitor_addr: Option<ChannelAddr>,
648 target: OutputTarget,
649 prefix: Option<String>,
650 stop: Arc<Notify>,
651 recent_lines_buf: RotatingLineBuffer,
652) -> Result<(), io::Error> {
653 let mut buf = [0u8; 8192];
654 let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
655 let mut log_sender = log_sender;
656
657 let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
659 file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
660 Ok(tx) => Some(tx),
661 Err(e) => {
662 tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
663 None
664 }
665 });
666
667 loop {
668 tokio::select! {
669 read_result = reader.read(&mut buf) => {
670 match read_result {
671 Ok(n) => {
672 if n == 0 {
673 tracing::debug!("EOF reached in tee");
675 break;
676 }
677
678 if let Err(e) = std_writer.write_all(&buf[..n]).await {
680 tracing::warn!("error writing to std: {}", e);
681 }
682
683 let mut completed_lines = Vec::new();
685
686 for &byte in &buf[..n] {
687 if byte == b'\n' {
688 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
690
691 if line.len() > MAX_LINE_SIZE {
694 let mut truncate_at = MAX_LINE_SIZE;
695 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
696 truncate_at -= 1;
697 }
698 line.truncate(truncate_at);
699 line.push_str("... [TRUNCATED]");
700 }
701
702 let final_line = if let Some(ref p) = prefix {
704 format!("[{}] {}", p, line)
705 } else {
706 line
707 };
708
709 completed_lines.push(final_line);
710 line_buffer.clear();
711 } else {
712 line_buffer.push(byte);
713 }
714 }
715
716 if !completed_lines.is_empty() {
718 if let Some(ref mut sender) = log_sender {
719 let bytes: Vec<Vec<u8>> = completed_lines.iter()
720 .map(|s| s.as_bytes().to_vec())
721 .collect();
722 if let Err(e) = sender.send(target, bytes) {
723 tracing::warn!("error sending to log_sender: {}", e);
724 }
725 }
726
727 if let Some(ref mut tx) = file_monitor_tx {
729 let msg = FileMonitorMessage {
730 lines: completed_lines,
731 };
732 tx.post(msg);
734 }
735 }
736
737 recent_lines_buf.try_add_data(&buf, n);
738 },
739 Err(e) => {
740 tracing::debug!("read error in tee: {}", e);
741 return Err(e);
742 }
743 }
744 },
745 _ = stop.notified() => {
746 tracing::debug!("stop signal received in tee");
747 break;
748 }
749 }
750 }
751
752 std_writer.flush().await?;
753
754 if !line_buffer.is_empty() {
756 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
757 if line.len() > MAX_LINE_SIZE {
760 let mut truncate_at = MAX_LINE_SIZE;
761 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
762 truncate_at -= 1;
763 }
764 line.truncate(truncate_at);
765 line.push_str("... [TRUNCATED]");
766 }
767 let final_line = if let Some(ref p) = prefix {
768 format!("[{}] {}", p, line)
769 } else {
770 line
771 };
772
773 let final_lines = vec![final_line];
774
775 if let Some(ref mut sender) = log_sender {
777 let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
778 let _ = sender.send(target, bytes);
779 }
780
781 if let Some(ref mut tx) = file_monitor_tx {
783 let msg = FileMonitorMessage { lines: final_lines };
784 tx.post(msg);
785 }
786 }
787
788 if let Some(ref mut sender) = log_sender {
790 let _ = sender.flush();
791 }
792
793 Ok(())
794}
795
796#[derive(Debug, Clone)]
797struct RotatingLineBuffer {
798 recent_lines: Arc<RwLock<VecDeque<String>>>,
799 max_buffer_size: usize,
800}
801
802impl RotatingLineBuffer {
803 fn try_add_data(&self, buf: &[u8], buf_end: usize) {
804 let data_str = String::from_utf8_lossy(&buf[..buf_end]);
805 let lines: Vec<&str> = data_str.lines().collect();
806
807 if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
808 for line in lines {
809 if !line.is_empty() {
810 recent_lines_guard.push_back(line.to_string());
811 if recent_lines_guard.len() > self.max_buffer_size {
812 recent_lines_guard.pop_front();
813 }
814 }
815 }
816 } else {
817 tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
818 }
819 }
820
821 async fn peek(&self) -> Vec<String> {
822 let lines = self.recent_lines.read().await;
823 let start_idx = if lines.len() > self.max_buffer_size {
824 lines.len() - self.max_buffer_size
825 } else {
826 0
827 };
828
829 lines.range(start_idx..).cloned().collect()
830 }
831}
832
/// Handle to a background task that tees a child's output stream to this
/// process's own stdout/stderr, to optional log/file sinks, and to a
/// recent-lines tail.
pub struct StreamFwder {
    /// The spawned `tee` task.
    teer: JoinHandle<Result<(), io::Error>>,
    /// Tail of recent lines, shared with `teer`.
    recent_lines_buf: RotatingLineBuffer,
    /// Notified by `abort` to make `teer` stop reading.
    stop: Arc<Notify>,
}
841
842impl StreamFwder {
843 pub fn start(
850 reader: impl AsyncRead + Unpin + Send + 'static,
851 file_monitor_addr: Option<ChannelAddr>,
852 target: OutputTarget,
853 max_buffer_size: usize,
854 log_channel: Option<ChannelAddr>,
855 pid: u32,
856 local_rank: usize,
857 ) -> Self {
858 let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
859 true => Some(local_rank.to_string()),
860 false => None,
861 };
862 let std_writer = std_writer(target);
863
864 Self::start_with_writer(
865 reader,
866 std_writer,
867 file_monitor_addr,
868 target,
869 max_buffer_size,
870 log_channel,
871 pid,
872 prefix,
873 )
874 }
875
876 fn start_with_writer(
878 reader: impl AsyncRead + Unpin + Send + 'static,
879 std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
880 file_monitor_addr: Option<ChannelAddr>,
881 target: OutputTarget,
882 max_buffer_size: usize,
883 log_channel: Option<ChannelAddr>,
884 pid: u32,
885 prefix: Option<String>,
886 ) -> Self {
887 debug_assert!(
893 file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
894 "StreamFwder started with no sinks and no tail"
895 );
896
897 let stop = Arc::new(Notify::new());
898 let recent_lines_buf = RotatingLineBuffer {
899 recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
900 max_buffer_size,
901 ))),
902 max_buffer_size,
903 };
904
905 let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
906 match LocalLogSender::new(addr, pid) {
907 Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
908 Err(e) => {
909 tracing::error!("failed to create log sender: {}", e);
910 None
911 }
912 }
913 } else {
914 None
915 };
916
917 let teer_stop = stop.clone();
918 let recent_line_buf_clone = recent_lines_buf.clone();
919 let teer = tokio::spawn(async move {
920 tee(
921 reader,
922 std_writer,
923 log_sender,
924 file_monitor_addr,
925 target,
926 prefix,
927 teer_stop,
928 recent_line_buf_clone,
929 )
930 .await
931 });
932
933 StreamFwder {
934 teer,
935 recent_lines_buf,
936 stop,
937 }
938 }
939
940 pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
941 self.stop.notify_waiters();
942
943 let lines = self.peek().await;
944 let teer_result = self.teer.await;
945
946 let result: Result<(), anyhow::Error> = match teer_result {
947 Ok(inner) => inner.map_err(anyhow::Error::from),
948 Err(e) => Err(e.into()),
949 };
950
951 (lines, result)
952 }
953
954 pub async fn peek(&self) -> Vec<String> {
957 self.recent_lines_buf.peek().await
958 }
959}
960
/// Messages handled by [`LogForwardActor`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Processes one message from the log channel. The handler re-sends this
    /// message to itself to keep the loop running.
    Forward {},

    /// Enables or disables forwarding `Log` messages to the client.
    SetMode { stream_to_client: bool },

    /// Injects `Flush { sync_version: Some(version) }` into this actor's own log channel.
    ForceSyncFlush { version: u64 },
}
984
/// Per-process actor that drains the local log channel and forwards its
/// messages to a [`LogClientActor`].
#[hyperactor::export(
    spawn = true,
    handlers = [LogForwardMessage {cast = true}],
)]
pub struct LogForwardActor {
    /// Receiving end of the local log channel.
    rx: ChannelRx<LogMessage>,
    /// Sender dialed to the log channel, intended to loop `Flush` messages back into `rx`.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    /// Earliest time at which another delayed periodic `Flush` may be scheduled.
    next_flush_deadline: SystemTime,
    /// Client that receives forwarded lines and sync-flush acknowledgements.
    logging_client_ref: ActorRef<LogClientActor>,
    /// Whether `Log` messages are forwarded to the client (initially true).
    stream_to_client: bool,
}
997
#[async_trait]
impl Actor for LogForwardActor {
    /// Starts the self-rescheduling `Forward` loop and seeds the periodic flush chain.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Begin processing the log channel immediately.
        this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        // Seed the channel with one unversioned `Flush`. The `None` arm of
        // `forward` re-arms another one roughly once per second after that.
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush { sync_version: None })
            .await?;
        Ok(())
    }
}
1012
1013#[async_trait]
1014impl hyperactor::RemoteSpawn for LogForwardActor {
1015 type Params = ActorRef<LogClientActor>;
1016
1017 async fn new(logging_client_ref: Self::Params) -> Result<Self> {
1018 let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1019 Ok(channel) => channel.parse()?,
1020 Err(err) => {
1021 tracing::debug!(
1022 "log forwarder actor failed to read env var {}: {}",
1023 BOOTSTRAP_LOG_CHANNEL,
1024 err
1025 );
1026 ChannelAddr::any(ChannelTransport::Unix)
1028 }
1029 };
1030 tracing::info!(
1031 "log forwarder {} serve at {}",
1032 std::process::id(),
1033 log_channel
1034 );
1035
1036 let rx = match channel::serve(log_channel.clone()) {
1037 Ok((_, rx)) => rx,
1038 Err(err) => {
1039 tracing::error!(
1042 "log forwarder actor failed to bootstrap on given channel {}: {}",
1043 log_channel,
1044 err
1045 );
1046 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1047 }
1048 };
1049
1050 let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1052 let now = RealClock.system_time_now();
1053
1054 Ok(Self {
1055 rx,
1056 flush_tx,
1057 next_flush_deadline: now,
1058 logging_client_ref,
1059 stream_to_client: true,
1060 })
1061 }
1062}
1063
#[async_trait]
#[hyperactor::forward(LogForwardMessage)]
impl LogForwardMessageHandler for LogForwardActor {
    /// Receives one message from the log channel, handles it, and re-sends
    /// `Forward` to itself so the loop continues.
    ///
    /// NOTE(review): this awaits `rx.recv()` inside a handler. The periodic
    /// `Flush` messages appear to exist so this await returns regularly and
    /// other messages (`SetMode`, `ForceSyncFlush`) get a turn — confirm.
    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
        match self.rx.recv().await {
            Ok(LogMessage::Flush { sync_version }) => {
                let now = RealClock.system_time_now();
                match sync_version {
                    // Unversioned flush: not sent to the client. It only
                    // re-arms one delayed `Flush` into our own channel, at most
                    // once per `delay`.
                    None => {
                        let delay = Duration::from_secs(1);
                        if now >= self.next_flush_deadline {
                            self.next_flush_deadline = now + delay;
                            let flush_tx = self.flush_tx.clone();
                            tokio::spawn(async move {
                                RealClock.sleep(delay).await;
                                if let Err(e) = flush_tx
                                    .lock()
                                    .await
                                    .send(LogMessage::Flush { sync_version: None })
                                    .await
                                {
                                    tracing::error!("failed to send flush message: {}", e);
                                }
                            });
                        }
                    }
                    // Sync-flush marker: everything before it in the channel has
                    // been forwarded, so acknowledge `version` to the client.
                    version => {
                        self.logging_client_ref.flush(ctx, version).await?;
                    }
                }
            }
            Ok(LogMessage::Log {
                hostname,
                pid,
                output_target,
                payload,
            }) => {
                // Forward only while streaming to the client is enabled.
                if self.stream_to_client {
                    self.logging_client_ref
                        .log(ctx, hostname, pid, output_target, payload)
                        .await?;
                }
            }
            // A channel error ends the loop: `Forward` is not rescheduled.
            Err(e) => {
                return Err(e.into());
            }
        }

        ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        Ok(())
    }

    /// Sets whether received `Log` messages are forwarded to the client.
    async fn set_mode(
        &mut self,
        _ctx: &Context<Self>,
        stream_to_client: bool,
    ) -> Result<(), anyhow::Error> {
        self.stream_to_client = stream_to_client;
        Ok(())
    }

    /// Injects a versioned `Flush` into the log channel. Because it is queued
    /// behind pending lines, `forward` acknowledges it only after those lines.
    async fn force_sync_flush(
        &mut self,
        _cx: &Context<Self>,
        version: u64,
    ) -> Result<(), anyhow::Error> {
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush {
                sync_version: Some(version),
            })
            .await
            .map_err(anyhow::Error::from)
    }
}
1143
1144fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1146 if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1148 let mut result = Vec::new();
1149 for bytes in message_bytes {
1150 let message_str = String::from_utf8(bytes)?;
1151 let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1152 result.push(lines);
1153 }
1154 return Ok(result);
1155 }
1156
1157 if let Ok(message) = serialized_message.deserialized::<String>() {
1159 let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1160 return Ok(vec![lines]);
1161 }
1162
1163 anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1165}
1166
/// Client-side actor that prints log lines received from forwarders.
/// It can optionally aggregate similar lines over a time window.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogMessage, LogClientMessage],
)]
pub struct LogClientActor {
    /// Aggregation window in seconds; `None` prints lines immediately.
    aggregate_window_sec: Option<u64>,
    /// One aggregator per output target.
    aggregators: HashMap<OutputTarget, Aggregator>,
    /// Last time aggregators were flushed or lines were printed directly.
    last_flush_time: SystemTime,
    /// Deadline of the earliest self-scheduled flush still pending, if any.
    next_flush_deadline: Option<SystemTime>,

    /// Version number of the most recently started sync flush.
    current_flush_version: u64,
    /// Reply port for the in-progress sync flush.
    current_flush_port: Option<OncePortRef<()>>,
    /// Processes that have not yet acknowledged the in-progress sync flush.
    current_unflushed_procs: usize,
}
1184
1185impl Default for LogClientActor {
1186 fn default() -> Self {
1187 let mut aggregators = HashMap::new();
1189 aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1190 aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1191
1192 Self {
1193 aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1194 aggregators,
1195 last_flush_time: RealClock.system_time_now(),
1196 next_flush_deadline: None,
1197 current_flush_version: 0,
1198 current_flush_port: None,
1199 current_unflushed_procs: 0,
1200 }
1201 }
1202}
1203
1204impl LogClientActor {
1205 fn print_aggregators(&mut self) {
1206 for (output_target, aggregator) in self.aggregators.iter_mut() {
1207 if aggregator.is_empty() {
1208 continue;
1209 }
1210 match output_target {
1211 OutputTarget::Stdout => {
1212 println!("{}", aggregator);
1213 }
1214 OutputTarget::Stderr => {
1215 eprintln!("{}", aggregator);
1216 }
1217 }
1218
1219 aggregator.reset();
1221 }
1222 }
1223
1224 fn print_log_line(hostname: &str, pid: u32, output_target: OutputTarget, line: String) {
1225 let message = format!("[{} {}] {}", hostname, pid, line);
1226
1227 #[cfg(test)]
1228 crate::logging::test_tap::push(&message);
1229
1230 match output_target {
1231 OutputTarget::Stdout => println!("{}", message),
1232 OutputTarget::Stderr => eprintln!("{}", message),
1233 }
1234 }
1235
1236 fn flush_internal(&mut self) {
1237 self.print_aggregators();
1238 self.last_flush_time = RealClock.system_time_now();
1239 self.next_flush_deadline = None;
1240 }
1241}
1242
// No custom lifecycle hooks; the trait's default behavior is used.
#[async_trait]
impl Actor for LogClientActor {}

impl Drop for LogClientActor {
    /// Prints any lines still held by the aggregators so they are not lost on shutdown.
    fn drop(&mut self) {
        self.print_aggregators();
    }
}
1252
1253#[async_trait]
1254#[hyperactor::forward(LogMessage)]
1255impl LogMessageHandler for LogClientActor {
1256 async fn log(
1257 &mut self,
1258 cx: &Context<Self>,
1259 hostname: String,
1260 pid: u32,
1261 output_target: OutputTarget,
1262 payload: wirevalue::Any,
1263 ) -> Result<(), anyhow::Error> {
1264 let message_line_groups = deserialize_message_lines(&payload)?;
1266 let hostname = hostname.as_str();
1267
1268 let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1269 match self.aggregate_window_sec {
1270 None => {
1271 for line in message_lines {
1272 Self::print_log_line(hostname, pid, output_target, line);
1273 }
1274 self.last_flush_time = RealClock.system_time_now();
1275 }
1276 Some(window) => {
1277 for line in message_lines {
1278 if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1279 if let Err(e) = aggregator.add_line(&line) {
1280 tracing::error!("error adding log line: {}", e);
1281 Self::print_log_line(hostname, pid, output_target, line);
1283 }
1284 } else {
1285 tracing::error!("unknown output target: {:?}", output_target);
1286 Self::print_log_line(hostname, pid, output_target, line);
1288 }
1289 }
1290
1291 let new_deadline = self.last_flush_time + Duration::from_secs(window);
1292 let now = RealClock.system_time_now();
1293 if new_deadline <= now {
1294 self.flush_internal();
1295 } else {
1296 let delay = new_deadline.duration_since(now)?;
1297 match self.next_flush_deadline {
1298 None => {
1299 self.next_flush_deadline = Some(new_deadline);
1300 cx.self_message_with_delay(
1301 LogMessage::Flush { sync_version: None },
1302 delay,
1303 )?;
1304 }
1305 Some(deadline) => {
1306 if new_deadline < deadline {
1308 self.next_flush_deadline = Some(new_deadline);
1310 cx.self_message_with_delay(
1311 LogMessage::Flush { sync_version: None },
1312 delay,
1313 )?;
1314 }
1315 }
1316 }
1317 }
1318 }
1319 }
1320
1321 Ok(())
1322 }
1323
1324 async fn flush(
1325 &mut self,
1326 cx: &Context<Self>,
1327 sync_version: Option<u64>,
1328 ) -> Result<(), anyhow::Error> {
1329 match sync_version {
1330 None => {
1331 self.flush_internal();
1332 }
1333 Some(version) => {
1334 if version != self.current_flush_version {
1335 tracing::error!(
1336 "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1337 version,
1338 self.current_flush_version
1339 );
1340 return Ok(());
1341 }
1342
1343 if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1344 anyhow::bail!("found no ongoing flush request");
1346 }
1347 self.current_unflushed_procs -= 1;
1348
1349 tracing::debug!(
1350 "ack sync flush: version {}; remaining procs: {}",
1351 self.current_flush_version,
1352 self.current_unflushed_procs
1353 );
1354
1355 if self.current_unflushed_procs == 0 {
1356 self.flush_internal();
1357 let reply = self.current_flush_port.take().unwrap();
1358 self.current_flush_port = None;
1359 reply.send(cx, ()).map_err(anyhow::Error::from)?;
1360 }
1361 }
1362 }
1363
1364 Ok(())
1365 }
1366}
1367
1368#[async_trait]
1369#[hyperactor::forward(LogClientMessage)]
1370impl LogClientMessageHandler for LogClientActor {
1371 async fn set_aggregate(
1372 &mut self,
1373 _cx: &Context<Self>,
1374 aggregate_window_sec: Option<u64>,
1375 ) -> Result<(), anyhow::Error> {
1376 if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1377 self.print_aggregators();
1379 }
1380 self.aggregate_window_sec = aggregate_window_sec;
1381 Ok(())
1382 }
1383
1384 async fn start_sync_flush(
1385 &mut self,
1386 cx: &Context<Self>,
1387 expected_procs_flushed: usize,
1388 reply: OncePortRef<()>,
1389 version: OncePortRef<u64>,
1390 ) -> Result<(), anyhow::Error> {
1391 if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1392 tracing::warn!(
1393 "found unfinished ongoing flush: version {}; {} unflushed procs",
1394 self.current_flush_version,
1395 self.current_unflushed_procs,
1396 );
1397 }
1398
1399 self.current_flush_version += 1;
1400 tracing::debug!(
1401 "start sync flush with version {}",
1402 self.current_flush_version
1403 );
1404 self.current_flush_port = Some(reply.clone());
1405 self.current_unflushed_procs = expected_procs_flushed;
1406 version
1407 .send(cx, self.current_flush_version)
1408 .map_err(anyhow::Error::from)?;
1409 Ok(())
1410 }
1411}
1412
#[cfg(test)]
pub mod test_tap {
    //! Test-only, process-global string channel.
    //!
    //! Code under test calls [`push`]; tests read back everything pushed so far
    //! with [`drain`]. The sending and receiving halves are registered separately
    //! via [`install`] and [`set_receiver`]. Each half can be registered only once
    //! per process; later registrations are silently ignored.

    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Register the sending half. The first registration wins; `tx` is dropped
    /// if a sender is already registered.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.get_or_init(|| tx);
    }

    /// Register the receiving half. The first registration wins; `rx` is dropped
    /// if a receiver is already registered.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.get_or_init(|| Mutex::new(rx));
    }

    /// Send a copy of `s` through the tap. Does nothing if no sender has been
    /// registered.
    pub fn push(s: &str) {
        let Some(sender) = TAP.get() else {
            return;
        };
        // `send` only fails once the receiver is gone; that is deliberately ignored.
        let _ = sender.send(s.to_owned());
    }

    /// Take every string currently queued, without waiting for more.
    ///
    /// Returns an empty vector if no receiver has been registered.
    ///
    /// # Panics
    ///
    /// Panics if the receiver's mutex is poisoned.
    pub fn drain() -> Vec<String> {
        let Some(shared_rx) = RX.get() else {
            return Vec::new();
        };
        let mut receiver = shared_rx.lock().unwrap();
        std::iter::from_fn(|| receiver.try_recv().ok()).collect()
    }
}
1453
#[cfg(test)]
mod tests {

    use std::sync::Arc;
    use std::sync::Mutex;

    use hyperactor::RemoteSpawn;
    use hyperactor::channel;
    use hyperactor::channel::ChannelAddr;
    use hyperactor::channel::ChannelTx;
    use hyperactor::channel::Tx;
    use hyperactor::id;
    use hyperactor::mailbox::BoxedMailboxSender;
    use hyperactor::mailbox::DialMailboxRouter;
    use hyperactor::mailbox::MailboxServer;
    use hyperactor::proc::Proc;
    use tokio::io::AsyncSeek;
    use tokio::io::AsyncSeekExt;
    use tokio::io::AsyncWriteExt;
    use tokio::io::SeekFrom;
    use tokio::sync::mpsc;

    use super::*;

    /// Suffix that [`process_file_content`] appends to lines longer than
    /// `MAX_LINE_SIZE` bytes after cutting them down to `MAX_LINE_SIZE`.
    const TRUNCATION_MARKER: &[u8] = b"... [TRUNCATED]";

    /// Outcome of one incremental pass of [`process_file_content`].
    #[derive(Debug)]
    struct FileProcessingResult {
        /// Complete, non-empty lines read in this pass, without their `\n`.
        lines: Vec<Vec<u8>>,
        /// File offset from which the next pass should resume.
        new_position: u64,
        /// Bytes after the last newline seen; prepend them on the next pass.
        incomplete_line_buffer: Vec<u8>,
    }

    /// Read bytes from `reader` starting at `current_position` and split them
    /// into lines.
    ///
    /// * If `current_position == file_size`, nothing is read and the inputs are
    ///   returned unchanged.
    /// * If `current_position > file_size`, the file is assumed to have been
    ///   truncated: a warning is logged and reading restarts at offset 0.
    /// * `existing_line_buffer` is prepended to the first line read.
    /// * Empty lines are dropped. Lines longer than `MAX_LINE_SIZE` bytes are cut
    ///   to `MAX_LINE_SIZE` and get [`TRUNCATION_MARKER`] appended.
    /// * Once `max_buffer_size` lines are collected, reading stops and
    ///   `new_position` points just past the last consumed newline.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from seeking or reading.
    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
        current_position: u64,
        file_size: u64,
        existing_line_buffer: Vec<u8>,
        max_buffer_size: usize,
    ) -> Result<FileProcessingResult> {
        if current_position == file_size {
            return Ok(FileProcessingResult {
                lines: Vec::new(),
                new_position: current_position,
                incomplete_line_buffer: existing_line_buffer,
            });
        }

        let actual_position = if current_position > file_size {
            tracing::warn!(
                "File appears to have been truncated (position {} > file size {}), resetting to start",
                current_position,
                file_size
            );
            0
        } else {
            current_position
        };
        reader.seek(SeekFrom::Start(actual_position)).await?;

        let mut buf = vec![0u8; 128 * 1024];
        let mut line_buffer = existing_line_buffer;
        let mut lines = Vec::with_capacity(max_buffer_size);
        // Bytes fully consumed from previous reads (excludes the current chunk).
        let mut processed_bytes = 0u64;

        loop {
            let bytes_read = reader.read(&mut buf).await?;
            if bytes_read == 0 {
                break;
            }
            let chunk = &buf[..bytes_read];

            let mut start = 0;
            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
                let line_end = start + newline_pos;
                line_buffer.extend_from_slice(&chunk[start..line_end]);

                // Blank lines are skipped entirely.
                if !line_buffer.is_empty() {
                    if line_buffer.len() > MAX_LINE_SIZE {
                        line_buffer.truncate(MAX_LINE_SIZE);
                        line_buffer.extend_from_slice(TRUNCATION_MARKER);
                    }
                    lines.push(std::mem::replace(
                        &mut line_buffer,
                        Vec::with_capacity(2048),
                    ));
                }

                start = line_end + 1;

                if lines.len() >= max_buffer_size {
                    // `line_buffer` is empty right after a newline, so the next
                    // pass resumes exactly after this newline with no carry-over.
                    return Ok(FileProcessingResult {
                        lines,
                        new_position: actual_position + processed_bytes + start as u64,
                        incomplete_line_buffer: Vec::new(),
                    });
                }
            }

            processed_bytes += bytes_read as u64;
            // Carry any bytes after the last newline into the next chunk/pass.
            line_buffer.extend_from_slice(&chunk[start..]);
        }

        Ok(FileProcessingResult {
            lines,
            new_position: actual_position + processed_bytes,
            incomplete_line_buffer: line_buffer,
        })
    }

    /// Smoke test wiring a `LogForwardActor` to a `LogClientActor` in one proc.
    ///
    /// NOTE(review): two log messages are posted (one before and one after
    /// enabling streaming with `set_mode`), but nothing asserts what the client
    /// receives. The test only fails if actor setup or `set_mode` fails.
    #[tokio::test]
    async fn test_forwarding_log_to_client() {
        let router = DialMailboxRouter::new();
        let (proc_addr, client_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
        proc.clone().serve(client_rx);
        router.bind(id!(client[0]).into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: `set_var` is unsafe because another thread reading the process
        // environment at the same time is undefined behavior.
        // NOTE(review): other tests in this binary may run concurrently on other
        // threads; confirm none of them read the environment while this runs.
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }
        let log_client_actor = LogClientActor::new(()).await.unwrap();
        let log_client: ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap();
        let log_forwarder: ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            pid: 1,
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
        });

        log_forwarder.set_mode(&client, true).await.unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            pid: 1,
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
        });
    }

    #[test]
    fn test_deserialize_message_lines_string() {
        // A single string is split on newlines.
        let message = "Line 1\nLine 2\nLine 3".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);

        // A list of byte buffers yields one list of lines per buffer.
        let message_bytes = vec![
            "Hello\nWorld".as_bytes().to_vec(),
            "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
        ];
        let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(
            result,
            vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
        );

        let message = "Single line message".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();

        assert_eq!(result, vec![vec!["Single line message"]]);

        let message = "\n\n".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();

        assert_eq!(result, vec![vec!["", ""]]);

        // The explicit `u8` element type matters: without it the literals fall
        // back to `i32`, a different payload type, and the test would not be
        // exercising invalid UTF-8 at all.
        let invalid_utf8_bytes: Vec<Vec<u8>> = vec![vec![0xFF, 0xFE, 0xFD]];
        let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();

        let result = deserialize_message_lines(&serialized);

        assert!(
            result.is_err(),
            "Expected deserialization to fail with invalid UTF-8 bytes"
        );
    }

    /// `LogSender` test double: forwards each payload line over `log_sender`
    /// and records whether `flush` was called.
    // Not referenced by any test in this module at the moment; kept for ad-hoc use.
    #[allow(dead_code)]
    struct MockLogSender {
        log_sender: mpsc::UnboundedSender<(OutputTarget, String)>,
        flush_called: Arc<Mutex<bool>>,
    }

    impl MockLogSender {
        // See the note on the struct: currently unused.
        #[allow(dead_code)]
        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
            Self {
                log_sender,
                flush_called: Arc::new(Mutex::new(false)),
            }
        }
    }

    #[async_trait]
    impl LogSender for MockLogSender {
        /// Decode each payload entry lossily as UTF-8, strip trailing `\n`s,
        /// and forward it tagged with `output_target`.
        fn send(
            &mut self,
            output_target: OutputTarget,
            payload: Vec<Vec<u8>>,
        ) -> anyhow::Result<()> {
            for line in payload
                .iter()
                .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
            {
                self.log_sender
                    .send((output_target, line))
                    .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
            }
            Ok(())
        }

        /// Record that a flush happened; nothing is buffered, so nothing else to do.
        fn flush(&mut self) -> anyhow::Result<()> {
            *self.flush_called.lock().unwrap() = true;
            Ok(())
        }
    }

    #[test]
    fn test_string_similarity() {
        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);

        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);

        assert!(normalized_edit_distance("hello", "helo") < 0.5);
        assert!(normalized_edit_distance("hello", "hello!") < 0.5);

        assert_eq!(normalized_edit_distance("", ""), 0.0);
        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
    }

    #[test]
    fn test_add_line_to_empty_aggregator() {
        let mut aggregator = Aggregator::new();
        let result = aggregator.add_line("ERROR 404 not found");

        assert!(result.is_ok());
        assert_eq!(aggregator.lines.len(), 1);
        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
        assert_eq!(aggregator.lines[0].count, 1);
    }

    #[test]
    fn test_add_line_merges_with_similar_line() {
        let mut aggregator = Aggregator::new_with_threshold(0.2);

        aggregator.add_line("ERROR 404 timeout").unwrap();
        assert_eq!(aggregator.lines.len(), 1);

        // Differs only in the status code: merges into the first group.
        aggregator.add_line("ERROR 500 timeout").unwrap();
        assert_eq!(aggregator.lines.len(), 1);
        assert_eq!(aggregator.lines[0].count, 2);

        // Unrelated content: starts a second group.
        aggregator
            .add_line("WARNING database connection failed")
            .unwrap();
        assert_eq!(aggregator.lines.len(), 2);

        // Close to the WARNING line: merges into the second group.
        aggregator
            .add_line("WARNING database connection timed out")
            .unwrap();
        assert_eq!(aggregator.lines.len(), 2);
        assert_eq!(aggregator.lines[1].count, 2);
    }

    #[test]
    fn test_aggregation_of_similar_log_lines() {
        let mut aggregator = Aggregator::new_with_threshold(0.2);

        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();

        assert_eq!(aggregator.lines.len(), 1);

        assert_eq!(aggregator.lines[0].count, 3);
    }

    #[tokio::test]
    async fn test_stream_fwd_creation() {
        hyperactor_telemetry::initialize_logging_for_test();

        let (mut writer, reader) = tokio::io::duplex(1024);
        let (log_channel, mut rx) =
            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();

        // Bound to a name so the temp file lives until the end of the test.
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();

        let file_writer = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .append(true)
            .open(&temp_path)
            .await
            .unwrap();

        let file_monitor = FileAppender::new();
        let file_monitor_addr = file_monitor
            .as_ref()
            .map(|fm| fm.addr_for(OutputTarget::Stdout));

        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(file_writer),
            file_monitor_addr,
            OutputTarget::Stdout,
            3,
            Some(log_channel),
            12345,
            None,
        );

        RealClock.sleep(Duration::from_millis(500)).await;

        writer.write_all(b"Initial log line\n").await.unwrap();
        writer.flush().await.unwrap();

        for i in 1..=5 {
            writer
                .write_all(format!("New log line {}\n", i).as_bytes())
                .await
                .unwrap();
        }
        writer.flush().await.unwrap();

        RealClock.sleep(Duration::from_millis(500)).await;

        // At least one message must reach the log channel.
        let timeout = Duration::from_secs(1);
        let _ = RealClock
            .timeout(timeout, rx.recv())
            .await
            .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));

        RealClock.sleep(Duration::from_millis(200)).await;

        let (recent_lines, _result) = monitor.abort().await;

        assert!(
            recent_lines.len() >= 3,
            "Expected buffer with at least 3 lines, got {} lines: {:?}",
            recent_lines.len(),
            recent_lines
        );

        let file_contents = std::fs::read_to_string(&temp_path).unwrap();
        assert!(
            file_contents.contains("Initial log line"),
            "Expected temp file to contain 'Initial log line', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 1"),
            "Expected temp file to contain 'New log line 1', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 5"),
            "Expected temp file to contain 'New log line 5', got: {:?}",
            file_contents
        );
    }

    #[test]
    fn test_aggregator_custom_threshold() {
        // Strict threshold: lines differing in the status code stay separate.
        let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
        strict_aggregator.add_line("ERROR 404").unwrap();
        strict_aggregator.add_line("ERROR 500").unwrap();
        assert_eq!(strict_aggregator.lines.len(), 2);

        // Lenient threshold: even fairly different lines are merged.
        let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
        lenient_aggregator.add_line("ERROR 404").unwrap();
        lenient_aggregator.add_line("WARNING 200").unwrap();
        assert_eq!(lenient_aggregator.lines.len(), 1);
        assert_eq!(lenient_aggregator.lines[0].count, 2);
    }

    #[test]
    fn test_format_system_time() {
        // 1_609_459_200 s after the Unix epoch is 2021-01-01T00:00:00Z.
        let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_609_459_200);
        let formatted = format_system_time(test_time);

        // Only loose shape checks; the exact rendered string is not pinned here.
        assert!(formatted.contains("-"));
        assert!(formatted.contains(":"));
        assert!(formatted.len() > 10);
    }

    #[test]
    fn test_aggregator_display_formatting() {
        let mut aggregator = Aggregator::new();
        aggregator.add_line("Test error message").unwrap();
        aggregator.add_line("Test error message").unwrap();
        let display_string = format!("{}", aggregator);

        assert!(display_string.contains("Aggregated Logs"));
        assert!(display_string.contains("[2 similar log lines]"));
        assert!(display_string.contains("Test error message"));
        assert!(display_string.contains(">>>") && display_string.contains("<<<"));
    }

    #[tokio::test]
    async fn test_local_log_sender_inactive_status() {
        // The served receiver is not bound (`_`), so it is dropped immediately;
        // send/flush must still report success.
        let (log_channel, _) =
            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let mut sender = LocalLogSender::new(log_channel, 12345).unwrap();

        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
        assert!(result.is_ok());

        let result = sender.flush();
        assert!(result.is_ok());
    }

    #[test]
    fn test_levenshtein_distance_edge_cases() {
        assert_eq!(levenshtein_distance("", ""), 0);
        assert_eq!(levenshtein_distance("", "hello"), 5);
        assert_eq!(levenshtein_distance("hello", ""), 5);

        assert_eq!(levenshtein_distance("hello", "hello"), 0);

        // Single deletion, insertion and substitution.
        assert_eq!(levenshtein_distance("hello", "helo"), 1);
        assert_eq!(levenshtein_distance("helo", "hello"), 1);
        assert_eq!(levenshtein_distance("hello", "hallo"), 1);
        // Non-ASCII: a byte-wise comparison would give 2 here.
        assert_eq!(levenshtein_distance("café", "cafe"), 1);
    }

    #[test]
    fn test_normalized_edit_distance_edge_cases() {
        assert_eq!(normalized_edit_distance("", ""), 0.0);

        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
        assert_eq!(normalized_edit_distance("", "hello"), 1.0);

        let distance = normalized_edit_distance("completely", "different");
        assert!((0.0..=1.0).contains(&distance));
    }

    // Purely synchronous: no runtime needed.
    #[test]
    fn test_deserialize_message_lines_edge_cases() {
        let empty_message = "".to_string();
        let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(result, vec![vec![] as Vec<String>]);

        let trailing_newline = "line1\nline2\n".to_string();
        let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(result, vec![vec!["line1", "line2"]]);
    }

    #[test]
    fn test_output_target_serialization() {
        let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
        let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();

        let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
        let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();

        assert_eq!(stdout_deserialized, OutputTarget::Stdout);
        assert_eq!(stderr_deserialized, OutputTarget::Stderr);
    }

    #[test]
    fn test_log_line_display_formatting() {
        let log_line = LogLine::new("Test message".to_string());
        let display_string = format!("{}", log_line);

        assert!(display_string.contains("[1 similar log lines]"));
        assert!(display_string.contains("Test message"));

        let mut log_line_multi = LogLine::new("Test message".to_string());
        log_line_multi.count = 5;
        let display_string_multi = format!("{}", log_line_multi);

        assert!(display_string_multi.contains("[5 similar log lines]"));
        assert!(display_string_multi.contains("Test message"));
    }

    /// In-memory reader/seeker for driving [`process_file_content`].
    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
        std::io::Cursor::new(data)
    }

    #[tokio::test]
    async fn test_process_file_content_basic() {
        let data = b"line1\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"line1");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.lines[2], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_incomplete_line() {
        let data = b"line1\nline2\npartial".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], b"line1");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.new_position, data.len() as u64);
        assert_eq!(result.incomplete_line_buffer, b"partial");
    }

    #[tokio::test]
    async fn test_process_file_content_with_existing_buffer() {
        let data = b"omplete\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let existing_buffer = b"inc".to_vec();
        let max_buf_size = 10;

        let result = process_file_content(
            &mut reader,
            0,
            data.len() as u64,
            existing_buffer,
            max_buf_size,
        )
        .await
        .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"incomplete");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.lines[2], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_empty_file() {
        let data = Vec::new();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
            .await
            .unwrap();

        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, 0);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_only_newlines() {
        let data = b"\n\n\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        // Blank lines are dropped, but their bytes are still consumed.
        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_no_newlines() {
        let data = b"no newlines here".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, data.len() as u64);
        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
    }

    #[tokio::test]
    async fn test_process_file_content_file_truncation() {
        let data = b"line1\nline2\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        // A saved position past the end of the file simulates truncation, so
        // reading restarts from offset 0.
        let result = process_file_content(
            &mut reader,
            100,
            data.len() as u64,
            Vec::new(),
            10,
        )
        .await
        .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], b"line1");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_seek_to_position() {
        let data = b"line1\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        // Offset 6 is the first byte after "line1\n".
        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], b"line2");
        assert_eq!(result.lines[1], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_position_equals_file_size() {
        let data = b"line1\nline2\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(
            &mut reader,
            data.len() as u64,
            data.len() as u64,
            Vec::new(),
            10,
        )
        .await
        .unwrap();

        assert!(
            result.lines.is_empty(),
            "Expected empty line got {:?}",
            result.lines
        );
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_large_line_truncation() {
        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
        let data = format!("{}\nline2\n", large_line).into_bytes();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);

        assert_eq!(
            result.lines[0].len(),
            MAX_LINE_SIZE + TRUNCATION_MARKER.len()
        );
        assert!(result.lines[0].ends_with(TRUNCATION_MARKER));

        assert_eq!(result.lines[1], b"line2");

        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_mixed_line_endings() {
        let data = b"line1\nline2\r\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"line1");
        // Only `\n` terminates a line; a preceding `\r` is kept as content.
        assert_eq!(result.lines[1], b"line2\r");
        assert_eq!(result.lines[2], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_existing_buffer_with_truncation() {
        // Carry-over (MAX_LINE_SIZE - 100) + 200 new bytes exceeds MAX_LINE_SIZE.
        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(
            &mut reader,
            0,
            data.len() as u64,
            existing_buffer.into_bytes(),
            10,
        )
        .await
        .unwrap();

        assert_eq!(result.lines.len(), 2);

        assert_eq!(
            result.lines[0].len(),
            MAX_LINE_SIZE + TRUNCATION_MARKER.len()
        );
        assert!(result.lines[0].ends_with(TRUNCATION_MARKER));

        assert_eq!(result.lines[1], b"line2");

        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_single_character_lines() {
        let data = b"a\nb\nc\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"a");
        assert_eq!(result.lines[1], b"b");
        assert_eq!(result.lines[2], b"c");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_binary_data() {
        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_process_file_content_resume_after_max_buffer_size() {
        let data = b"line 1\nline 2\nline 3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        // Smaller than the number of lines, so the first pass stops early.
        let max_buffer_size = 2;

        let result1 = process_file_content(
            &mut reader,
            0,
            data.len() as u64,
            Vec::new(),
            max_buffer_size,
        )
        .await
        .unwrap();

        assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
        assert_eq!(result1.lines[0], b"line 1");
        assert_eq!(result1.lines[1], b"line 2");
        assert!(result1.incomplete_line_buffer.is_empty());

        // The pass must stop right after the second newline.
        let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
        assert_eq!(result1.new_position, expected_position_after_first_call);

        // Resume from the reported position with a fresh reader.
        let mut reader2 = create_mock_reader(data.clone());
        let result2 = process_file_content(
            &mut reader2,
            result1.new_position,
            data.len() as u64,
            result1.incomplete_line_buffer,
            max_buffer_size,
        )
        .await
        .unwrap();

        assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
        assert_eq!(result2.lines[0], b"line 3");
        assert!(result2.incomplete_line_buffer.is_empty());
        assert_eq!(result2.new_position, data.len() as u64);
    }

    #[tokio::test]
    async fn test_utf8_truncation() {
        hyperactor_telemetry::initialize_logging_for_test();

        // The 4-byte crab starts at byte MAX_LINE_SIZE - 1, so a cut at
        // MAX_LINE_SIZE bytes lands inside a multi-byte UTF-8 sequence.
        let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
        long_line.push('🦀');
        long_line.push('\n');

        let (mut writer, reader) = tokio::io::duplex(8192);

        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(tokio::io::sink()),
            None,
            OutputTarget::Stdout,
            1,
            None,
            12345,
            None,
        );

        writer.write_all(long_line.as_bytes()).await.unwrap();
        // Close the write half so the forwarder's reader reaches EOF.
        drop(writer);

        let (_lines, result) = monitor.abort().await;
        result.expect("Should complete without panic despite UTF-8 truncation");
    }
}