1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::HandleClient;
28use hyperactor::Handler;
29use hyperactor::Instance;
30use hyperactor::Named;
31use hyperactor::OncePortRef;
32use hyperactor::RefClient;
33use hyperactor::Unbind;
34use hyperactor::channel;
35use hyperactor::channel::ChannelAddr;
36use hyperactor::channel::ChannelRx;
37use hyperactor::channel::ChannelTransport;
38use hyperactor::channel::ChannelTx;
39use hyperactor::channel::Rx;
40use hyperactor::channel::Tx;
41use hyperactor::channel::TxStatus;
42use hyperactor::clock::Clock;
43use hyperactor::clock::RealClock;
44use hyperactor::config::CONFIG;
45use hyperactor::config::ConfigAttr;
46use hyperactor::data::Serialized;
47use hyperactor::declare_attrs;
48use hyperactor_telemetry::env;
49use hyperactor_telemetry::log_file_path;
50use serde::Deserialize;
51use serde::Serialize;
52use tokio::io;
53use tokio::io::AsyncRead;
54use tokio::io::AsyncReadExt;
55use tokio::io::AsyncWriteExt;
56use tokio::sync::Mutex;
57use tokio::sync::Notify;
58use tokio::sync::RwLock;
59use tokio::sync::watch::Receiver;
60use tokio::task::JoinHandle;
61use tracing::Level;
62
63use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
64use crate::shortuuid::ShortUuid;
65
// Submodule declared here; its contents live in a separate file.
mod line_prefixing_writer;

/// Default aggregation window, in seconds, that `LogClientActor` starts with.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Maximum length, in bytes, of a forwarded log line. `tee` truncates longer
/// lines and appends a `... [TRUNCATED]` marker.
const MAX_LINE_SIZE: usize = 4 * 1024;
70
declare_attrs! {
    // Log read buffer size; overridable through the
    // `HYPERACTOR_READ_LOG_BUFFER` environment variable (default 100).
    // NOTE(review): no reader of this attribute is visible in this part of
    // the file — confirm its exact meaning against its users.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        py_name: None,
    })
    pub attr READ_LOG_BUFFER: usize = 100;

    // When true, log files are created even if the current environment is
    // `Env::Local` (see `get_unique_local_log_destination`). Overridable
    // through `HYPERACTOR_FORCE_FILE_LOG`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        py_name: None,
    })
    pub attr FORCE_FILE_LOG: bool = false;

    // When true, `StreamFwder::start` prefixes every forwarded line with the
    // local rank. Overridable through `HYPERACTOR_PREFIX_WITH_RANK`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        py_name: None,
    })
    pub attr PREFIX_WITH_RANK: bool = true;
}
95
/// Computes the Levenshtein (edit) distance between `left` and `right`.
///
/// The distance is the minimum number of single-character insertions,
/// deletions and substitutions needed to turn one string into the other.
/// Characters are Unicode scalar values (`char`), not bytes.
///
/// Only two rows of the dynamic-programming table are kept alive, so the
/// extra memory is proportional to `right`'s length rather than to the
/// product of both lengths.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    // Turning an empty string into the other one takes one insertion per char.
    if left_chars.is_empty() {
        return right_chars.len();
    }
    if right_chars.is_empty() {
        return left_chars.len();
    }

    // `previous[j]` is the distance between the already processed prefix of
    // `left` and the first `j` characters of `right`.
    let mut previous: Vec<usize> = (0..=right_chars.len()).collect();
    let mut current: Vec<usize> = vec![0; right_chars.len() + 1];

    for (i, &left_char) in left_chars.iter().enumerate() {
        // Deleting the first `i + 1` characters of `left` yields the empty prefix.
        current[0] = i + 1;

        for (j, &right_char) in right_chars.iter().enumerate() {
            let substitution = previous[j] + usize::from(left_char != right_char);
            let deletion = previous[j + 1] + 1;
            let insertion = current[j] + 1;
            current[j + 1] = substitution.min(deletion).min(insertion);
        }

        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last completed row lives in `previous`.
    previous[right_chars.len()]
}
145
146fn normalized_edit_distance(left: &str, right: &str) -> f64 {
148 let distance = levenshtein_distance(left, right) as f64;
149 let max_len = std::cmp::max(left.len(), right.len()) as f64;
150
151 if max_len == 0.0 {
152 0.0 } else {
154 distance / max_len
155 }
156}
157
/// A representative log line together with the number of similar lines that
/// were folded into it.
#[derive(Debug, Clone)]
struct LogLine {
    /// Text of the first line that created this entry.
    content: String,
    /// How many lines (including the first one) were matched to this entry.
    pub count: u64,
}

impl LogLine {
    /// Creates an entry for a line that has been seen exactly once.
    fn new(content: String) -> Self {
        LogLine { count: 1, content }
    }
}

impl fmt::Display for LogLine {
    /// Renders the entry as a yellow `[N similar log lines]` tag followed by
    /// the line content.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogLine { content, count } = self;
        write!(
            f,
            "\x1b[33m[{} similar log lines]\x1b[0m {}",
            count, content
        )
    }
}
180
181#[derive(Debug, Clone)]
182struct Aggregator {
185 lines: Vec<LogLine>,
186 start_time: SystemTime,
187 similarity_threshold: f64, }
189
190impl Aggregator {
191 fn new() -> Self {
192 Self::new_with_threshold(0.15)
194 }
195
196 fn new_with_threshold(threshold: f64) -> Self {
197 Aggregator {
198 lines: vec![],
199 start_time: RealClock.system_time_now(),
200 similarity_threshold: threshold,
201 }
202 }
203
204 fn reset(&mut self) {
205 self.lines.clear();
206 self.start_time = RealClock.system_time_now();
207 }
208
209 fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
210 let mut best_match_idx = None;
212 let mut best_similarity = f64::MAX;
213
214 for (idx, existing_line) in self.lines.iter().enumerate() {
215 let distance = normalized_edit_distance(&existing_line.content, line);
216
217 if distance < best_similarity && distance < self.similarity_threshold {
219 best_match_idx = Some(idx);
220 best_similarity = distance;
221 }
222 }
223
224 if let Some(idx) = best_match_idx {
226 self.lines[idx].count += 1;
227 } else {
228 self.lines.push(LogLine::new(line.to_string()));
230 }
231
232 Ok(())
233 }
234
235 fn is_empty(&self) -> bool {
236 self.lines.is_empty()
237 }
238}
239
240fn format_system_time(time: SystemTime) -> String {
242 let datetime: DateTime<Local> = time.into();
243 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
244}
245
246impl fmt::Display for Aggregator {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 let start_time_str = format_system_time(self.start_time);
250
251 let current_time = RealClock.system_time_now();
253 let end_time_str = format_system_time(current_time);
254
255 writeln!(
257 f,
258 "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
259 start_time_str
260 )?;
261
262 for line in self.lines.iter() {
264 writeln!(f, "{}", line)?;
265 }
266 writeln!(
267 f,
268 "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
269 end_time_str
270 )?;
271 Ok(())
272 }
273}
274
/// Messages exchanged on the log channel and handled by `LogClientActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of log output produced by one process.
    Log {
        /// Host name of the process that produced the output.
        hostname: String,
        /// Process id reported by the sender.
        pid: u32,
        /// Whether the output belongs to stdout or stderr.
        output_target: OutputTarget,
        /// Serialized log content; `deserialize_message_lines` accepts either
        /// a `Vec<Vec<u8>>` or a `String`.
        payload: Serialized,
    },

    /// Request to flush buffered log output.
    Flush {
        /// `None` for a periodic flush; `Some(version)` when the flush is part
        /// of the synchronized flush identified by `version`.
        sync_version: Option<u64>,
    },
}
306
/// Control messages handled by `LogClientActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Changes the aggregation mode.
    SetAggregate {
        /// Aggregation window in seconds; `None` makes the client print
        /// every line individually instead of aggregating.
        aggregate_window_sec: Option<u64>,
    },

    /// Starts a synchronized flush across processes.
    StartSyncFlush {
        /// Number of flush acknowledgements to wait for.
        expected_procs: usize,
        /// Port that is signalled once every expected acknowledgement arrived.
        reply: OncePortRef<()>,
        /// Port that receives the version number assigned to this flush.
        version: OncePortRef<u64>,
    },
}
334
/// Sink that forwards captured process output somewhere else.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Sends a batch of lines (`payload`, one byte vector per line) that was
    /// read from the stream identified by `target`.
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Asks the receiving side to flush what it has buffered.
    fn flush(&mut self) -> anyhow::Result<()>;
}
345
/// The standard stream a piece of log output belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
354
/// Identifies one of a child's output streams.
// NOTE(review): this type is not used in the visible part of the file —
// confirm its role against its users.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// The child's standard output.
    ChildStdout,
    /// The child's standard error.
    ChildStderr,
}
362
363pub struct LocalLogSender {
365 hostname: String,
366 pid: u32,
367 tx: ChannelTx<LogMessage>,
368 status: Receiver<TxStatus>,
369}
370
371impl LocalLogSender {
372 fn new(log_channel: ChannelAddr, pid: u32) -> Result<Self, anyhow::Error> {
373 let tx = channel::dial::<LogMessage>(log_channel)?;
374 let status = tx.status().clone();
375
376 let hostname = hostname::get()
377 .unwrap_or_else(|_| "unknown_host".into())
378 .into_string()
379 .unwrap_or("unknown_host".to_string());
380 Ok(Self {
381 hostname,
382 pid,
383 tx,
384 status,
385 })
386 }
387}
388
389#[async_trait]
390impl LogSender for LocalLogSender {
391 fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
392 if TxStatus::Active == *self.status.borrow() {
393 self.tx.post(LogMessage::Log {
395 hostname: self.hostname.clone(),
396 pid: self.pid,
397 output_target: target,
398 payload: Serialized::serialize(&payload)?,
399 });
400 }
401
402 Ok(())
403 }
404
405 fn flush(&mut self) -> anyhow::Result<()> {
406 if TxStatus::Active == *self.status.borrow() {
408 self.tx.post(LogMessage::Flush { sync_version: None });
410 }
411 Ok(())
412 }
413}
414
/// Batch of complete log lines sent from `tee` to a `file_monitor_task`.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    /// Lines to append to the log file; each is written followed by `\n`.
    lines: Vec<String>,
}
420
/// Owns the two background tasks that append forwarded stdout and stderr
/// lines to log files, and the channel addresses used to reach them.
pub struct FileAppender {
    /// Address of the channel feeding the stdout file task.
    stdout_addr: ChannelAddr,
    /// Address of the channel feeding the stderr file task.
    stderr_addr: ChannelAddr,
    // The handle is stored but not read anywhere in the visible code.
    #[allow(dead_code)] stdout_task: JoinHandle<()>,
    // The handle is stored but not read anywhere in the visible code.
    #[allow(dead_code)]
    stderr_task: JoinHandle<()>,
    /// Signalled on drop to ask both tasks to stop.
    stop: Arc<Notify>,
}
431
432impl fmt::Debug for FileAppender {
433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434 f.debug_struct("FileMonitor")
435 .field("stdout_addr", &self.stdout_addr)
436 .field("stderr_addr", &self.stderr_addr)
437 .finish()
438 }
439}
440
441impl FileAppender {
442 pub fn new() -> Option<Self> {
445 let stop = Arc::new(Notify::new());
446 let file_name_tag = hostname::get()
448 .unwrap_or_else(|_| "unknown_host".into())
449 .into_string()
450 .unwrap_or("unknown_host".to_string());
451
452 let (stdout_path, stdout_writer) =
454 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
455 Some(writer) => writer,
456 None => {
457 tracing::warn!("failed to create stdout file");
458 return None;
459 }
460 };
461 let (stdout_addr, stdout_rx) = {
462 let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
463 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
464 Ok((addr, rx)) => (addr, rx),
465 Err(e) => {
466 tracing::warn!("failed to serve stdout channel: {}", e);
467 return None;
468 }
469 }
470 };
471 let stdout_stop = stop.clone();
472 let stdout_task = tokio::spawn(file_monitor_task(
473 stdout_rx,
474 stdout_writer,
475 OutputTarget::Stdout,
476 stdout_stop,
477 ));
478
479 let (stderr_path, stderr_writer) =
481 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
482 Some(writer) => writer,
483 None => {
484 tracing::warn!("failed to create stderr file");
485 return None;
486 }
487 };
488 let (stderr_addr, stderr_rx) = {
489 let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
490 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
491 Ok((addr, rx)) => (addr, rx),
492 Err(e) => {
493 tracing::warn!("failed to serve stderr channel: {}", e);
494 return None;
495 }
496 }
497 };
498 let stderr_stop = stop.clone();
499 let stderr_task = tokio::spawn(file_monitor_task(
500 stderr_rx,
501 stderr_writer,
502 OutputTarget::Stderr,
503 stderr_stop,
504 ));
505
506 tracing::debug!(
507 "FileAppender: created for stdout {} stderr {} ",
508 stdout_path.display(),
509 stderr_path.display()
510 );
511
512 Some(Self {
513 stdout_addr,
514 stderr_addr,
515 stdout_task,
516 stderr_task,
517 stop,
518 })
519 }
520
521 pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
523 match target {
524 OutputTarget::Stdout => self.stdout_addr.clone(),
525 OutputTarget::Stderr => self.stderr_addr.clone(),
526 }
527 }
528}
529
impl Drop for FileAppender {
    /// Signals the file tasks to stop when the appender goes away.
    fn drop(&mut self) {
        // `notify_waiters` only reaches tasks that are waiting on the signal
        // at this moment; it does not store a permit for later.
        self.stop.notify_waiters();
        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
    }
}
537
538async fn file_monitor_task(
540 mut rx: ChannelRx<FileMonitorMessage>,
541 mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
542 target: OutputTarget,
543 stop: Arc<Notify>,
544) {
545 loop {
546 tokio::select! {
547 msg = rx.recv() => {
548 match msg {
549 Ok(msg) => {
550 for line in &msg.lines {
552 if let Err(e) = writer.write_all(line.as_bytes()).await {
553 tracing::warn!("FileMonitor: failed to write line to file: {}", e);
554 continue;
555 }
556 if let Err(e) = writer.write_all(b"\n").await {
557 tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
558 }
559 }
560 if let Err(e) = writer.flush().await {
561 tracing::warn!("FileMonitor: failed to flush file: {}", e);
562 }
563 }
564 Err(e) => {
565 tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
567 break;
568 }
569 }
570 }
571 _ = stop.notified() => {
572 tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
573 break;
574 }
575 }
576 }
577
578 if let Err(e) = writer.flush().await {
580 tracing::warn!("FileMonitor: failed final flush: {}", e);
581 }
582 tracing::debug!("FileMonitor task for {:?} exiting", target);
583}
584
585fn create_unique_file_writer(
586 file_name_tag: &str,
587 output_target: OutputTarget,
588 env: env::Env,
589) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
590 let suffix = match output_target {
591 OutputTarget::Stderr => "stderr",
592 OutputTarget::Stdout => "stdout",
593 };
594 let (path, filename) = log_file_path(env, None)?;
595 let path = Path::new(&path);
596 let mut full_path = PathBuf::from(path);
597
598 let uuid = ShortUuid::generate();
599
600 full_path.push(format!(
601 "{}_{}_{}.{}",
602 filename, file_name_tag, uuid, suffix
603 ));
604 let file = std::fs::OpenOptions::new()
605 .create(true)
606 .append(true)
607 .open(full_path.clone())?;
608 let tokio_file = tokio::fs::File::from_std(file);
609 Ok((full_path, Box::new(tokio_file)))
611}
612
613fn get_unique_local_log_destination(
614 file_name_tag: &str,
615 output_target: OutputTarget,
616) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
617 let env: env::Env = env::Env::current();
618 if env == env::Env::Local && !hyperactor::config::global::get(FORCE_FILE_LOG) {
619 tracing::debug!("not creating log file because of env type");
620 None
621 } else {
622 match create_unique_file_writer(file_name_tag, output_target, env) {
623 Ok((a, b)) => Some((a, b)),
624 Err(e) => {
625 tracing::warn!("failed to create unique file writer: {}", e);
626 None
627 }
628 }
629 }
630}
631
/// Returns an async writer for this process's own stdout or stderr,
/// depending on `target`.
fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
    match target {
        OutputTarget::Stdout => Box::new(tokio::io::stdout()),
        OutputTarget::Stderr => Box::new(tokio::io::stderr()),
    }
}
640
641async fn tee(
644 mut reader: impl AsyncRead + Unpin + Send + 'static,
645 mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
646 log_sender: Option<Box<dyn LogSender + Send>>,
647 file_monitor_addr: Option<ChannelAddr>,
648 target: OutputTarget,
649 prefix: Option<String>,
650 stop: Arc<Notify>,
651 recent_lines_buf: RotatingLineBuffer,
652) -> Result<(), io::Error> {
653 let mut buf = [0u8; 8192];
654 let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
655 let mut log_sender = log_sender;
656
657 let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
659 file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
660 Ok(tx) => Some(tx),
661 Err(e) => {
662 tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
663 None
664 }
665 });
666
667 loop {
668 tokio::select! {
669 read_result = reader.read(&mut buf) => {
670 match read_result {
671 Ok(n) => {
672 if n == 0 {
673 tracing::debug!("EOF reached in tee");
675 break;
676 }
677
678 if let Err(e) = std_writer.write_all(&buf[..n]).await {
680 tracing::warn!("error writing to std: {}", e);
681 }
682
683 let mut completed_lines = Vec::new();
685
686 for &byte in &buf[..n] {
687 if byte == b'\n' {
688 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
690
691 if line.len() > MAX_LINE_SIZE {
694 let mut truncate_at = MAX_LINE_SIZE;
695 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
696 truncate_at -= 1;
697 }
698 line.truncate(truncate_at);
699 line.push_str("... [TRUNCATED]");
700 }
701
702 let final_line = if let Some(ref p) = prefix {
704 format!("[{}] {}", p, line)
705 } else {
706 line
707 };
708
709 completed_lines.push(final_line);
710 line_buffer.clear();
711 } else {
712 line_buffer.push(byte);
713 }
714 }
715
716 if !completed_lines.is_empty() {
718 if let Some(ref mut sender) = log_sender {
719 let bytes: Vec<Vec<u8>> = completed_lines.iter()
720 .map(|s| s.as_bytes().to_vec())
721 .collect();
722 if let Err(e) = sender.send(target, bytes) {
723 tracing::warn!("error sending to log_sender: {}", e);
724 }
725 }
726
727 if let Some(ref mut tx) = file_monitor_tx {
729 let msg = FileMonitorMessage {
730 lines: completed_lines,
731 };
732 tx.post(msg);
734 }
735 }
736
737 recent_lines_buf.try_add_data(&buf, n);
738 },
739 Err(e) => {
740 tracing::debug!("read error in tee: {}", e);
741 return Err(e);
742 }
743 }
744 },
745 _ = stop.notified() => {
746 tracing::debug!("stop signal received in tee");
747 break;
748 }
749 }
750 }
751
752 std_writer.flush().await?;
753
754 if !line_buffer.is_empty() {
756 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
757 if line.len() > MAX_LINE_SIZE {
760 let mut truncate_at = MAX_LINE_SIZE;
761 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
762 truncate_at -= 1;
763 }
764 line.truncate(truncate_at);
765 line.push_str("... [TRUNCATED]");
766 }
767 let final_line = if let Some(ref p) = prefix {
768 format!("[{}] {}", p, line)
769 } else {
770 line
771 };
772
773 let final_lines = vec![final_line];
774
775 if let Some(ref mut sender) = log_sender {
777 let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
778 let _ = sender.send(target, bytes);
779 }
780
781 if let Some(ref mut tx) = file_monitor_tx {
783 let msg = FileMonitorMessage { lines: final_lines };
784 tx.post(msg);
785 }
786 }
787
788 if let Some(ref mut sender) = log_sender {
790 let _ = sender.flush();
791 }
792
793 Ok(())
794}
795
796#[derive(Debug, Clone)]
797struct RotatingLineBuffer {
798 recent_lines: Arc<RwLock<VecDeque<String>>>,
799 max_buffer_size: usize,
800}
801
802impl RotatingLineBuffer {
803 fn try_add_data(&self, buf: &[u8], buf_end: usize) {
804 let data_str = String::from_utf8_lossy(&buf[..buf_end]);
805 let lines: Vec<&str> = data_str.lines().collect();
806
807 if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
808 for line in lines {
809 if !line.is_empty() {
810 recent_lines_guard.push_back(line.to_string());
811 if recent_lines_guard.len() > self.max_buffer_size {
812 recent_lines_guard.pop_front();
813 }
814 }
815 }
816 } else {
817 tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
818 }
819 }
820
821 async fn peek(&self) -> Vec<String> {
822 let lines = self.recent_lines.read().await;
823 let start_idx = if lines.len() > self.max_buffer_size {
824 lines.len() - self.max_buffer_size
825 } else {
826 0
827 };
828
829 lines.range(start_idx..).cloned().collect()
830 }
831}
832
833pub struct StreamFwder {
835 teer: JoinHandle<Result<(), io::Error>>,
836 recent_lines_buf: RotatingLineBuffer,
838 stop: Arc<Notify>,
840}
841
842impl StreamFwder {
843 pub fn start(
850 reader: impl AsyncRead + Unpin + Send + 'static,
851 file_monitor_addr: Option<ChannelAddr>,
852 target: OutputTarget,
853 max_buffer_size: usize,
854 log_channel: Option<ChannelAddr>,
855 pid: u32,
856 local_rank: usize,
857 ) -> Self {
858 let prefix = match hyperactor::config::global::get(PREFIX_WITH_RANK) {
859 true => Some(local_rank.to_string()),
860 false => None,
861 };
862 let std_writer = std_writer(target);
863
864 Self::start_with_writer(
865 reader,
866 std_writer,
867 file_monitor_addr,
868 target,
869 max_buffer_size,
870 log_channel,
871 pid,
872 prefix,
873 )
874 }
875
876 fn start_with_writer(
878 reader: impl AsyncRead + Unpin + Send + 'static,
879 std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
880 file_monitor_addr: Option<ChannelAddr>,
881 target: OutputTarget,
882 max_buffer_size: usize,
883 log_channel: Option<ChannelAddr>,
884 pid: u32,
885 prefix: Option<String>,
886 ) -> Self {
887 debug_assert!(
893 file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
894 "StreamFwder started with no sinks and no tail"
895 );
896
897 let stop = Arc::new(Notify::new());
898 let recent_lines_buf = RotatingLineBuffer {
899 recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
900 max_buffer_size,
901 ))),
902 max_buffer_size,
903 };
904
905 let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
906 match LocalLogSender::new(addr, pid) {
907 Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
908 Err(e) => {
909 tracing::error!("failed to create log sender: {}", e);
910 None
911 }
912 }
913 } else {
914 None
915 };
916
917 let teer_stop = stop.clone();
918 let recent_line_buf_clone = recent_lines_buf.clone();
919 let teer = tokio::spawn(async move {
920 tee(
921 reader,
922 std_writer,
923 log_sender,
924 file_monitor_addr,
925 target,
926 prefix,
927 teer_stop,
928 recent_line_buf_clone,
929 )
930 .await
931 });
932
933 StreamFwder {
934 teer,
935 recent_lines_buf,
936 stop,
937 }
938 }
939
940 pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
941 self.stop.notify_waiters();
942
943 let lines = self.peek().await;
944 let teer_result = self.teer.await;
945
946 let result: Result<(), anyhow::Error> = match teer_result {
947 Ok(inner) => inner.map_err(anyhow::Error::from),
948 Err(e) => Err(e.into()),
949 };
950
951 (lines, result)
952 }
953
954 pub async fn peek(&self) -> Vec<String> {
957 self.recent_lines_buf.peek().await
958 }
959}
960
/// Messages handled by `LogForwardActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Receive one message from the log channel and process it; the handler
    /// re-sends this message to itself to keep the loop going.
    Forward {},

    /// Enables or disables forwarding of log lines to the client.
    SetMode { stream_to_client: bool },

    /// Injects a flush carrying the given sync version into the log channel.
    ForceSyncFlush { version: u64 },
}
984
/// Actor that receives `LogMessage`s from a served log channel and relays
/// them to the `LogClientActor`.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogForwardMessage {cast = true}],
)]
pub struct LogForwardActor {
    /// Receiving half of the served log channel.
    rx: ChannelRx<LogMessage>,
    /// Sender used by the actor itself to put `Flush` messages on the log channel.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    /// Earliest time at which the next periodic flush may be scheduled.
    next_flush_deadline: SystemTime,
    /// The client actor that log lines and sync-flush acks are sent to.
    logging_client_ref: ActorRef<LogClientActor>,
    /// When false, received log lines are dropped instead of forwarded.
    stream_to_client: bool,
}
998
999#[async_trait]
1000impl Actor for LogForwardActor {
1001 type Params = ActorRef<LogClientActor>;
1002
1003 async fn new(logging_client_ref: Self::Params) -> Result<Self> {
1004 let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1005 Ok(channel) => channel.parse()?,
1006 Err(err) => {
1007 tracing::debug!(
1008 "log forwarder actor failed to read env var {}: {}",
1009 BOOTSTRAP_LOG_CHANNEL,
1010 err
1011 );
1012 ChannelAddr::any(ChannelTransport::Unix)
1014 }
1015 };
1016 tracing::info!(
1017 "log forwarder {} serve at {}",
1018 std::process::id(),
1019 log_channel
1020 );
1021
1022 let rx = match channel::serve(log_channel.clone()) {
1023 Ok((_, rx)) => rx,
1024 Err(err) => {
1025 tracing::error!(
1028 "log forwarder actor failed to bootstrap on given channel {}: {}",
1029 log_channel,
1030 err
1031 );
1032 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1033 }
1034 };
1035
1036 let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1038 let now = RealClock.system_time_now();
1039
1040 Ok(Self {
1041 rx,
1042 flush_tx,
1043 next_flush_deadline: now,
1044 logging_client_ref,
1045 stream_to_client: true,
1046 })
1047 }
1048
1049 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1050 this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1051
1052 self.flush_tx
1054 .lock()
1055 .await
1056 .send(LogMessage::Flush { sync_version: None })
1057 .await?;
1058 Ok(())
1059 }
1060}
1061
1062#[async_trait]
1063#[hyperactor::forward(LogForwardMessage)]
1064impl LogForwardMessageHandler for LogForwardActor {
1065 async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
1066 match self.rx.recv().await {
1067 Ok(LogMessage::Flush { sync_version }) => {
1068 let now = RealClock.system_time_now();
1069 match sync_version {
1070 None => {
1071 let delay = Duration::from_secs(1);
1073 if now >= self.next_flush_deadline {
1074 self.next_flush_deadline = now + delay;
1075 let flush_tx = self.flush_tx.clone();
1076 tokio::spawn(async move {
1077 RealClock.sleep(delay).await;
1078 if let Err(e) = flush_tx
1079 .lock()
1080 .await
1081 .send(LogMessage::Flush { sync_version: None })
1082 .await
1083 {
1084 tracing::error!("failed to send flush message: {}", e);
1085 }
1086 });
1087 }
1088 }
1089 version => {
1090 self.logging_client_ref.flush(ctx, version).await?;
1091 }
1092 }
1093 }
1094 Ok(LogMessage::Log {
1095 hostname,
1096 pid,
1097 output_target,
1098 payload,
1099 }) => {
1100 if self.stream_to_client {
1101 self.logging_client_ref
1102 .log(ctx, hostname, pid, output_target, payload)
1103 .await?;
1104 }
1105 }
1106 Err(e) => {
1107 return Err(e.into());
1108 }
1109 }
1110
1111 ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1113
1114 Ok(())
1115 }
1116
1117 async fn set_mode(
1118 &mut self,
1119 _ctx: &Context<Self>,
1120 stream_to_client: bool,
1121 ) -> Result<(), anyhow::Error> {
1122 self.stream_to_client = stream_to_client;
1123 Ok(())
1124 }
1125
1126 async fn force_sync_flush(
1127 &mut self,
1128 _cx: &Context<Self>,
1129 version: u64,
1130 ) -> Result<(), anyhow::Error> {
1131 self.flush_tx
1132 .lock()
1133 .await
1134 .send(LogMessage::Flush {
1135 sync_version: Some(version),
1136 })
1137 .await
1138 .map_err(anyhow::Error::from)
1139 }
1140}
1141
1142fn deserialize_message_lines(
1144 serialized_message: &hyperactor::data::Serialized,
1145) -> Result<Vec<Vec<String>>> {
1146 if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1148 let mut result = Vec::new();
1149 for bytes in message_bytes {
1150 let message_str = String::from_utf8(bytes)?;
1151 let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1152 result.push(lines);
1153 }
1154 return Ok(result);
1155 }
1156
1157 if let Ok(message) = serialized_message.deserialized::<String>() {
1159 let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1160 return Ok(vec![lines]);
1161 }
1162
1163 anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1165}
1166
/// Actor that prints forwarded log lines, optionally aggregating similar
/// lines over a time window, and coordinates synchronized flushes.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogMessage, LogClientMessage],
)]
pub struct LogClientActor {
    /// Aggregation window in seconds; `None` prints every line immediately.
    aggregate_window_sec: Option<u64>,
    /// One aggregator per output stream.
    aggregators: HashMap<OutputTarget, Aggregator>,
    /// When output was last printed or flushed.
    last_flush_time: SystemTime,
    /// Deadline of the flush that is currently scheduled, if any.
    next_flush_deadline: Option<SystemTime>,

    /// Version of the most recently started synchronized flush.
    current_flush_version: u64,
    /// Port to signal once the ongoing synchronized flush completes.
    current_flush_port: Option<OncePortRef<()>>,
    /// Acknowledgements still missing for the ongoing synchronized flush.
    current_unflushed_procs: usize,
}
1184
1185impl LogClientActor {
1186 fn print_aggregators(&mut self) {
1187 for (output_target, aggregator) in self.aggregators.iter_mut() {
1188 if aggregator.is_empty() {
1189 continue;
1190 }
1191 match output_target {
1192 OutputTarget::Stdout => {
1193 println!("{}", aggregator);
1194 }
1195 OutputTarget::Stderr => {
1196 eprintln!("{}", aggregator);
1197 }
1198 }
1199
1200 aggregator.reset();
1202 }
1203 }
1204
1205 fn print_log_line(hostname: &str, pid: u32, output_target: OutputTarget, line: String) {
1206 let message = format!("[{} {}] {}", hostname, pid, line);
1207
1208 #[cfg(test)]
1209 crate::logging::test_tap::push(&message);
1210
1211 match output_target {
1212 OutputTarget::Stdout => println!("{}", message),
1213 OutputTarget::Stderr => eprintln!("{}", message),
1214 }
1215 }
1216
1217 fn flush_internal(&mut self) {
1218 self.print_aggregators();
1219 self.last_flush_time = RealClock.system_time_now();
1220 self.next_flush_deadline = None;
1221 }
1222}
1223
1224#[async_trait]
1225impl Actor for LogClientActor {
1226 type Params = ();
1228
1229 async fn new(_: ()) -> Result<Self, anyhow::Error> {
1230 let mut aggregators = HashMap::new();
1232 aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1233 aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1234
1235 Ok(Self {
1236 aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1237 aggregators,
1238 last_flush_time: RealClock.system_time_now(),
1239 next_flush_deadline: None,
1240 current_flush_version: 0,
1241 current_flush_port: None,
1242 current_unflushed_procs: 0,
1243 })
1244 }
1245}
1246
impl Drop for LogClientActor {
    /// Prints whatever is still held in the aggregators so that it is not
    /// lost when the actor goes away.
    fn drop(&mut self) {
        self.print_aggregators();
    }
}
1253
1254#[async_trait]
1255#[hyperactor::forward(LogMessage)]
1256impl LogMessageHandler for LogClientActor {
1257 async fn log(
1258 &mut self,
1259 cx: &Context<Self>,
1260 hostname: String,
1261 pid: u32,
1262 output_target: OutputTarget,
1263 payload: Serialized,
1264 ) -> Result<(), anyhow::Error> {
1265 let message_line_groups = deserialize_message_lines(&payload)?;
1267 let hostname = hostname.as_str();
1268
1269 let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1270 match self.aggregate_window_sec {
1271 None => {
1272 for line in message_lines {
1273 Self::print_log_line(hostname, pid, output_target, line);
1274 }
1275 self.last_flush_time = RealClock.system_time_now();
1276 }
1277 Some(window) => {
1278 for line in message_lines {
1279 if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1280 if let Err(e) = aggregator.add_line(&line) {
1281 tracing::error!("error adding log line: {}", e);
1282 Self::print_log_line(hostname, pid, output_target, line);
1284 }
1285 } else {
1286 tracing::error!("unknown output target: {:?}", output_target);
1287 Self::print_log_line(hostname, pid, output_target, line);
1289 }
1290 }
1291
1292 let new_deadline = self.last_flush_time + Duration::from_secs(window);
1293 let now = RealClock.system_time_now();
1294 if new_deadline <= now {
1295 self.flush_internal();
1296 } else {
1297 let delay = new_deadline.duration_since(now)?;
1298 match self.next_flush_deadline {
1299 None => {
1300 self.next_flush_deadline = Some(new_deadline);
1301 cx.self_message_with_delay(
1302 LogMessage::Flush { sync_version: None },
1303 delay,
1304 )?;
1305 }
1306 Some(deadline) => {
1307 if new_deadline < deadline {
1309 self.next_flush_deadline = Some(new_deadline);
1311 cx.self_message_with_delay(
1312 LogMessage::Flush { sync_version: None },
1313 delay,
1314 )?;
1315 }
1316 }
1317 }
1318 }
1319 }
1320 }
1321
1322 Ok(())
1323 }
1324
1325 async fn flush(
1326 &mut self,
1327 cx: &Context<Self>,
1328 sync_version: Option<u64>,
1329 ) -> Result<(), anyhow::Error> {
1330 match sync_version {
1331 None => {
1332 self.flush_internal();
1333 }
1334 Some(version) => {
1335 if version != self.current_flush_version {
1336 tracing::error!(
1337 "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1338 version,
1339 self.current_flush_version
1340 );
1341 return Ok(());
1342 }
1343
1344 if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1345 anyhow::bail!("found no ongoing flush request");
1347 }
1348 self.current_unflushed_procs -= 1;
1349
1350 tracing::debug!(
1351 "ack sync flush: version {}; remaining procs: {}",
1352 self.current_flush_version,
1353 self.current_unflushed_procs
1354 );
1355
1356 if self.current_unflushed_procs == 0 {
1357 self.flush_internal();
1358 let reply = self.current_flush_port.take().unwrap();
1359 self.current_flush_port = None;
1360 reply.send(cx, ()).map_err(anyhow::Error::from)?;
1361 }
1362 }
1363 }
1364
1365 Ok(())
1366 }
1367}
1368
1369#[async_trait]
1370#[hyperactor::forward(LogClientMessage)]
1371impl LogClientMessageHandler for LogClientActor {
1372 async fn set_aggregate(
1373 &mut self,
1374 _cx: &Context<Self>,
1375 aggregate_window_sec: Option<u64>,
1376 ) -> Result<(), anyhow::Error> {
1377 if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1378 self.print_aggregators();
1380 }
1381 self.aggregate_window_sec = aggregate_window_sec;
1382 Ok(())
1383 }
1384
1385 async fn start_sync_flush(
1386 &mut self,
1387 cx: &Context<Self>,
1388 expected_procs_flushed: usize,
1389 reply: OncePortRef<()>,
1390 version: OncePortRef<u64>,
1391 ) -> Result<(), anyhow::Error> {
1392 if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1393 tracing::warn!(
1394 "found unfinished ongoing flush: version {}; {} unflushed procs",
1395 self.current_flush_version,
1396 self.current_unflushed_procs,
1397 );
1398 }
1399
1400 self.current_flush_version += 1;
1401 tracing::debug!(
1402 "start sync flush with version {}",
1403 self.current_flush_version
1404 );
1405 self.current_flush_port = Some(reply.clone());
1406 self.current_unflushed_procs = expected_procs_flushed;
1407 version
1408 .send(cx, self.current_flush_version)
1409 .map_err(anyhow::Error::from)?;
1410 Ok(())
1411 }
1412}
1413
/// Test-only tap: a write-once sender/receiver pair stored in statics so
/// that code under test can `push` strings and a test can later `drain` them.
#[cfg(test)]
pub mod test_tap {
    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    /// Sending half; set at most once.
    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    /// Receiving half; set at most once, guarded by a mutex for draining.
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Registers the sender. If one is already registered, `tx` is dropped.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.get_or_init(|| tx);
    }

    /// Registers the receiver. If one is already registered, `rx` is dropped.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.get_or_init(|| Mutex::new(rx));
    }

    /// Sends a copy of `s` to the tap; a no-op when no sender is installed.
    /// Send failures are ignored.
    pub fn push(s: &str) {
        let Some(sender) = TAP.get() else {
            return;
        };
        let _ = sender.send(s.to_owned());
    }

    /// Returns everything currently queued on the receiver without waiting.
    /// Returns an empty vector when no receiver is registered.
    pub fn drain() -> Vec<String> {
        let Some(shared_rx) = RX.get() else {
            return Vec::new();
        };
        let mut receiver = shared_rx.lock().unwrap();
        // Stop at the first failed `try_recv`, whatever the reason.
        let drained: Vec<String> = std::iter::from_fn(|| receiver.try_recv().ok()).collect();
        drained
    }
}
1454
1455#[cfg(test)]
1456mod tests {
1457
1458 use std::sync::Arc;
1459 use std::sync::Mutex;
1460
1461 use hyperactor::channel;
1462 use hyperactor::channel::ChannelAddr;
1463 use hyperactor::channel::ChannelTx;
1464 use hyperactor::channel::Tx;
1465 use hyperactor::id;
1466 use hyperactor::mailbox::BoxedMailboxSender;
1467 use hyperactor::mailbox::DialMailboxRouter;
1468 use hyperactor::mailbox::MailboxServer;
1469 use hyperactor::proc::Proc;
1470 use tokio::io::AsyncSeek;
1471 use tokio::io::AsyncSeekExt;
1472 use tokio::io::AsyncWriteExt;
1473 use tokio::io::SeekFrom;
1474 use tokio::sync::mpsc;
1475
1476 use super::*;
1477
/// Outcome of a single `process_file_content` pass.
#[derive(Debug)]
struct FileProcessingResult {
    /// Complete lines found in this pass, stored without the `\n` terminator.
    lines: Vec<Vec<u8>>,
    /// Byte offset at which the next pass should resume reading.
    new_position: u64,
    /// Bytes read after the last newline, i.e. the start of a line that has
    /// not been terminated yet.
    incomplete_line_buffer: Vec<u8>,
}
1488
/// Reads newline-delimited content from `reader`, starting at
/// `current_position`, and splits it into lines.
///
/// * `current_position`: offset where the previous pass stopped.
/// * `file_size`: size the caller observed for the file; only used to detect
///   "nothing new" and truncation before reading.
/// * `existing_line_buffer`: unterminated bytes carried over from the
///   previous pass; they are prepended to the first line found here.
/// * `max_buffer_size`: the pass returns early as soon as this many lines
///   have been collected.
///
/// Empty lines are skipped. Lines longer than `MAX_LINE_SIZE` bytes are cut
/// to that length and suffixed with `... [TRUNCATED]`.
///
/// # Errors
///
/// Propagates any seek or read error from `reader`.
async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
    reader: &mut R,
    current_position: u64,
    file_size: u64,
    existing_line_buffer: Vec<u8>,
    max_buffer_size: usize,
) -> Result<FileProcessingResult> {
    // Nothing new since the last pass: hand the carried-over bytes back untouched.
    if current_position == file_size {
        return Ok(FileProcessingResult {
            lines: Vec::new(),
            new_position: current_position,
            incomplete_line_buffer: existing_line_buffer,
        });
    }

    // A position past the end means the file shrank; restart from offset 0.
    // NOTE(review): `existing_line_buffer` is still used as a prefix after a
    // truncation reset, so stale bytes may be glued onto the first new line.
    // Confirm whether that is intended.
    let actual_position = if current_position > file_size {
        tracing::warn!(
            "File appears to have been truncated (position {} > file size {}), resetting to start",
            current_position,
            file_size
        );
        reader.seek(SeekFrom::Start(0)).await?;
        0
    } else {
        reader.seek(SeekFrom::Start(current_position)).await?;
        current_position
    };

    let mut buf = vec![0u8; 128 * 1024];
    let mut line_buffer = existing_line_buffer;
    let mut lines = Vec::with_capacity(max_buffer_size);
    // Bytes of fully scanned chunks; the chunk being scanned is not included yet.
    let mut processed_bytes = 0u64;

    loop {
        let bytes_read = reader.read(&mut buf).await?;
        // A zero-length read marks end of input.
        if bytes_read == 0 {
            break;
        }

        let chunk = &buf[..bytes_read];

        // `start` is the offset in `chunk` of the first byte not yet consumed.
        let mut start = 0;
        while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
            let absolute_pos = start + newline_pos;

            line_buffer.extend_from_slice(&chunk[start..absolute_pos]);

            // Blank lines are dropped, not reported.
            if !line_buffer.is_empty() {
                if line_buffer.len() > MAX_LINE_SIZE {
                    line_buffer.truncate(MAX_LINE_SIZE);
                    line_buffer.extend_from_slice(b"... [TRUNCATED]");
                }

                // Move the finished line out and leave a fresh buffer behind.
                let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
                lines.push(line_data);
            }

            // Skip past the newline itself.
            start = absolute_pos + 1;

            if lines.len() >= max_buffer_size {
                // Resume point is just after the last consumed newline; the
                // rest of this chunk is left for the next pass to re-read.
                let new_position = actual_position + processed_bytes + start as u64;

                return Ok(FileProcessingResult {
                    lines,
                    new_position,
                    incomplete_line_buffer: Vec::new(),
                });
            }
        }

        // Must happen after the inner loop: the early return above relies on
        // `processed_bytes` excluding the current chunk.
        processed_bytes += bytes_read as u64;

        // Keep the unterminated tail of the chunk for the next read or pass.
        if start < chunk.len() {
            line_buffer.extend_from_slice(&chunk[start..]);
        }
    }

    let new_position = actual_position + processed_bytes;

    Ok(FileProcessingResult {
        lines,
        new_position,
        incomplete_line_buffer: line_buffer,
    })
}
1584
1585 #[tokio::test]
1586 async fn test_forwarding_log_to_client() {
1587 let router = DialMailboxRouter::new();
1589 let (proc_addr, client_rx) =
1590 channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1591 let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
1592 proc.clone().serve(client_rx);
1593 router.bind(id!(client[0]).into(), proc_addr.clone());
1594 let (client, _handle) = proc.instance("client").unwrap();
1595
1596 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1598 unsafe {
1600 std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1601 }
1602 let log_client: ActorRef<LogClientActor> =
1603 proc.spawn("log_client", ()).await.unwrap().bind();
1604 let log_forwarder: ActorRef<LogForwardActor> = proc
1605 .spawn("log_forwarder", log_client)
1606 .await
1607 .unwrap()
1608 .bind();
1609
1610 let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
1612 tx.post(LogMessage::Log {
1613 hostname: "my_host".into(),
1614 pid: 1,
1615 output_target: OutputTarget::Stderr,
1616 payload: Serialized::serialize(&"will not stream".to_string()).unwrap(),
1617 });
1618
1619 log_forwarder.set_mode(&client, true).await.unwrap();
1621 tx.post(LogMessage::Log {
1622 hostname: "my_host".into(),
1623 pid: 1,
1624 output_target: OutputTarget::Stderr,
1625 payload: Serialized::serialize(&"will stream".to_string()).unwrap(),
1626 });
1627
1628 }
1630
/// Checks `deserialize_message_lines` on string payloads, byte-vector
/// payloads, and a byte payload that is not valid UTF-8.
#[test]
fn test_deserialize_message_lines_string() {
    // Serializes `text` as a `String` and splits it back into lines.
    let lines_of_text = |text: &str| {
        let encoded = Serialized::serialize(&text.to_string()).unwrap();
        deserialize_message_lines(&encoded).unwrap()
    };

    // Multi-line string.
    assert_eq!(
        lines_of_text("Line 1\nLine 2\nLine 3"),
        vec![vec!["Line 1", "Line 2", "Line 3"]]
    );

    // Vector of byte buffers, including non-ASCII UTF-8.
    let byte_messages = vec![
        "Hello\nWorld".as_bytes().to_vec(),
        "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
    ];
    let encoded_bytes = Serialized::serialize(&byte_messages).unwrap();
    let decoded_bytes = deserialize_message_lines(&encoded_bytes).unwrap();
    assert_eq!(
        decoded_bytes,
        vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
    );

    // Single line without a newline.
    assert_eq!(
        lines_of_text("Single line message"),
        vec![vec!["Single line message"]]
    );

    // Only newlines.
    assert_eq!(lines_of_text("\n\n"), vec![vec!["", ""]]);

    // Bytes that are not valid UTF-8 must be rejected.
    let bad_bytes = vec![vec![0xFF, 0xFE, 0xFD]];
    let encoded_bad = Serialized::serialize(&bad_bytes).unwrap();
    let outcome = deserialize_message_lines(&encoded_bad);
    assert!(
        outcome.is_err(),
        "Expected deserialization to fail with invalid UTF-8 bytes"
    );
}
/// Test double for `LogSender` that records what it is asked to do.
#[allow(dead_code)]
struct MockLogSender {
    /// Channel on which each sent line is reported with its output target.
    log_sender: mpsc::UnboundedSender<(OutputTarget, String)>,
    /// Set to `true` once `flush` has been called.
    flush_called: Arc<Mutex<bool>>,
}
1686
1687 impl MockLogSender {
1688 #[allow(dead_code)]
1689 fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
1690 Self {
1691 log_sender,
1692 flush_called: Arc::new(Mutex::new(false)),
1693 }
1694 }
1695 }
1696
1697 #[async_trait]
1698 impl LogSender for MockLogSender {
1699 fn send(
1700 &mut self,
1701 output_target: OutputTarget,
1702 payload: Vec<Vec<u8>>,
1703 ) -> anyhow::Result<()> {
1704 let lines: Vec<String> = payload
1706 .iter()
1707 .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1708 .collect();
1709
1710 for line in lines {
1711 self.log_sender
1712 .send((output_target, line))
1713 .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1714 }
1715 Ok(())
1716 }
1717
1718 fn flush(&mut self) -> anyhow::Result<()> {
1719 let mut flush_called = self.flush_called.lock().unwrap();
1721 *flush_called = true;
1722
1723 Ok(())
1726 }
1727 }
1728
/// Spot-checks `normalized_edit_distance` on identical, disjoint, near-equal
/// and empty inputs.
#[test]
fn test_string_similarity() {
    // Pairs with an exact expected distance.
    let exact_cases = [
        ("hello", "hello", 0.0),
        ("hello", "i'mdiff", 1.0),
        ("", "", 0.0),
        ("hello", "", 1.0),
    ];
    for (left, right, expected) in exact_cases {
        assert_eq!(normalized_edit_distance(left, right), expected);
    }

    // Pairs that differ by one character must score below one half.
    for (left, right) in [("hello", "helo"), ("hello", "hello!")] {
        assert!(normalized_edit_distance(left, right) < 0.5);
    }
}
1745
/// The first line added to a fresh aggregator is stored with a count of one.
#[test]
fn test_add_line_to_empty_aggregator() {
    let mut aggregator = Aggregator::new();

    let outcome = aggregator.add_line("ERROR 404 not found");
    assert!(outcome.is_ok());

    assert_eq!(aggregator.lines.len(), 1);
    let stored = &aggregator.lines[0];
    assert_eq!(stored.content, "ERROR 404 not found");
    assert_eq!(stored.count, 1);
}
1756
/// With a 0.2 threshold, near-identical lines fold into one entry while a
/// clearly different line opens a new entry.
#[test]
fn test_add_line_merges_with_similar_line() {
    let mut aggregator = Aggregator::new_with_threshold(0.2);

    // First line creates the first entry.
    aggregator.add_line("ERROR 404 timeout").unwrap();
    assert_eq!(aggregator.lines.len(), 1);

    // A similar line is merged into it.
    aggregator.add_line("ERROR 500 timeout").unwrap();
    assert_eq!(aggregator.lines.len(), 1);
    assert_eq!(aggregator.lines[0].count, 2);

    // A dissimilar line opens a second entry.
    aggregator
        .add_line("WARNING database connection failed")
        .unwrap();
    assert_eq!(aggregator.lines.len(), 2);

    // A line similar to the second entry is merged into that one.
    aggregator
        .add_line("WARNING database connection timed out")
        .unwrap();
    assert_eq!(aggregator.lines.len(), 2);
    assert_eq!(aggregator.lines[1].count, 2);
}
1783
/// Three long warning lines that differ only in a timestamp fragment and a
/// file path collapse into a single aggregated entry.
#[test]
fn test_aggregation_of_similar_log_lines() {
    let mut aggregator = Aggregator::new_with_threshold(0.2);

    let warnings = [
        "[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.",
        "[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.",
        "[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.",
    ];
    for warning in warnings {
        aggregator.add_line(warning).unwrap();
    }

    // Everything was merged into one entry that counts all three lines.
    assert_eq!(aggregator.lines.len(), 1);
    assert_eq!(aggregator.lines[0].count, 3);
}
1799
1800 #[tokio::test]
1801 async fn test_stream_fwd_creation() {
1802 hyperactor_telemetry::initialize_logging_for_test();
1803
1804 let (mut writer, reader) = tokio::io::duplex(1024);
1805 let (log_channel, mut rx) =
1806 channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1807
1808 let temp_file = tempfile::NamedTempFile::new().unwrap();
1810 let temp_path = temp_file.path().to_path_buf();
1811
1812 let file_writer = tokio::fs::OpenOptions::new()
1814 .create(true)
1815 .write(true)
1816 .append(true)
1817 .open(&temp_path)
1818 .await
1819 .unwrap();
1820
1821 let file_monitor = FileAppender::new();
1823 let file_monitor_addr = file_monitor
1824 .as_ref()
1825 .map(|fm| fm.addr_for(OutputTarget::Stdout));
1826
1827 let monitor = StreamFwder::start_with_writer(
1828 reader,
1829 Box::new(file_writer),
1830 file_monitor_addr,
1831 OutputTarget::Stdout,
1832 3, Some(log_channel),
1834 12345, None, );
1837
1838 RealClock.sleep(Duration::from_millis(500)).await;
1840
1841 writer.write_all(b"Initial log line\n").await.unwrap();
1843 writer.flush().await.unwrap();
1844
1845 for i in 1..=5 {
1847 writer
1848 .write_all(format!("New log line {}\n", i).as_bytes())
1849 .await
1850 .unwrap();
1851 }
1852 writer.flush().await.unwrap();
1853
1854 RealClock.sleep(Duration::from_millis(500)).await;
1856
1857 let timeout = Duration::from_secs(1);
1859 let _ = RealClock
1860 .timeout(timeout, rx.recv())
1861 .await
1862 .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));
1863
1864 RealClock.sleep(Duration::from_millis(200)).await;
1866
1867 let (recent_lines, _result) = monitor.abort().await;
1868
1869 assert!(
1870 recent_lines.len() >= 3,
1871 "Expected buffer with at least 3 lines, got {} lines: {:?}",
1872 recent_lines.len(),
1873 recent_lines
1874 );
1875
1876 let file_contents = std::fs::read_to_string(&temp_path).unwrap();
1877 assert!(
1878 file_contents.contains("Initial log line"),
1879 "Expected temp file to contain 'Initial log line', got: {:?}",
1880 file_contents
1881 );
1882 assert!(
1883 file_contents.contains("New log line 1"),
1884 "Expected temp file to contain 'New log line 1', got: {:?}",
1885 file_contents
1886 );
1887 assert!(
1888 file_contents.contains("New log line 5"),
1889 "Expected temp file to contain 'New log line 5', got: {:?}",
1890 file_contents
1891 );
1892 }
1893
/// A low threshold keeps similar lines apart; a high threshold merges even
/// loosely related ones.
#[test]
fn test_aggregator_custom_threshold() {
    // Strict: 0.05 is too tight for these two lines to merge.
    let mut strict = Aggregator::new_with_threshold(0.05);
    for line in ["ERROR 404", "ERROR 500"] {
        strict.add_line(line).unwrap();
    }
    assert_eq!(strict.lines.len(), 2);

    // Lenient: 0.8 merges lines that share little text.
    let mut lenient = Aggregator::new_with_threshold(0.8);
    for line in ["ERROR 404", "WARNING 200"] {
        lenient.add_line(line).unwrap();
    }
    assert_eq!(lenient.lines.len(), 1);
    assert_eq!(lenient.lines[0].count, 2);
}
1909
/// The formatted timestamp contains date and time separators and is longer
/// than ten characters.
#[test]
fn test_format_system_time() {
    let epoch_offset = Duration::from_secs(1_609_459_200);
    let rendered = format_system_time(SystemTime::UNIX_EPOCH + epoch_offset);

    assert!(rendered.contains('-'));
    assert!(rendered.contains(':'));
    assert!(rendered.len() > 10);
}
1920
/// The `Display` output of an aggregator shows a header, the merge count,
/// the line text and the `>>>` / `<<<` markers.
#[test]
fn test_aggregator_display_formatting() {
    let mut aggregator = Aggregator::new();
    // The same line twice gives a single entry with count 2.
    for _ in 0..2 {
        aggregator.add_line("Test error message").unwrap();
    }

    let rendered = aggregator.to_string();

    for fragment in [
        "Aggregated Logs",
        "[2 similar log lines]",
        "Test error message",
        ">>>",
        "<<<",
    ] {
        assert!(rendered.contains(fragment));
    }
}
1935
1936 #[tokio::test]
1937 async fn test_local_log_sender_inactive_status() {
1938 let (log_channel, _) =
1939 channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1940 let mut sender = LocalLogSender::new(log_channel, 12345).unwrap();
1941
1942 let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1946 assert!(result.is_ok());
1947
1948 let result = sender.flush();
1949 assert!(result.is_ok());
1950 }
1951
/// Table-driven checks of `levenshtein_distance`: empty inputs, identical
/// inputs, and single insertions, deletions and substitutions.
#[test]
fn test_levenshtein_distance_edge_cases() {
    let cases = [
        // Empty strings.
        ("", "", 0),
        ("", "hello", 5),
        ("hello", "", 5),
        // Identical strings.
        ("hello", "hello", 0),
        // One edit each: deletion, insertion, substitution, accented char.
        ("hello", "helo", 1),
        ("helo", "hello", 1),
        ("hello", "hallo", 1),
        ("café", "cafe", 1),
    ];

    for (left, right, expected) in cases {
        assert_eq!(levenshtein_distance(left, right), expected);
    }
}
1970
/// `normalized_edit_distance` yields 0 for two empty strings, 1 when exactly
/// one side is empty, and a value within `[0, 1]` for unrelated words.
#[test]
fn test_normalized_edit_distance_edge_cases() {
    for (left, right, expected) in [("", "", 0.0), ("hello", "", 1.0), ("", "hello", 1.0)] {
        assert_eq!(normalized_edit_distance(left, right), expected);
    }

    let score = normalized_edit_distance("completely", "different");
    assert!((0.0..=1.0).contains(&score));
}
1984
/// Edge cases for `deserialize_message_lines`: an empty string yields one
/// message with no lines, and a trailing newline adds no empty final line.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn test_deserialize_message_lines_edge_cases() {
    // Empty message.
    let empty_message = "".to_string();
    let serialized = Serialized::serialize(&empty_message).unwrap();
    let result = deserialize_message_lines(&serialized).unwrap();
    assert_eq!(result, vec![vec![] as Vec<String>]);

    // Message ending in a newline.
    let trailing_newline = "line1\nline2\n".to_string();
    let serialized = Serialized::serialize(&trailing_newline).unwrap();
    let result = deserialize_message_lines(&serialized).unwrap();
    assert_eq!(result, vec![vec!["line1", "line2"]]);
}
1999
/// Both `OutputTarget` variants survive a JSON round trip unchanged.
#[test]
fn test_output_target_serialization() {
    let stdout_json = serde_json::to_string(&OutputTarget::Stdout).unwrap();
    let stderr_json = serde_json::to_string(&OutputTarget::Stderr).unwrap();

    let stdout_back = serde_json::from_str::<OutputTarget>(&stdout_json).unwrap();
    let stderr_back = serde_json::from_str::<OutputTarget>(&stderr_json).unwrap();

    assert_eq!(stdout_back, OutputTarget::Stdout);
    assert_eq!(stderr_back, OutputTarget::Stderr);
}
2012
/// A `LogLine` renders its count prefix and its text, both for the initial
/// count of one and after the count has been raised.
#[test]
fn test_log_line_display_formatting() {
    // Freshly created line: count is 1.
    let single = LogLine::new("Test message".to_string());
    let single_text = single.to_string();
    assert!(single_text.contains("[1 similar log lines]"));
    assert!(single_text.contains("Test message"));

    // Same text with the count bumped by hand.
    let mut repeated = LogLine::new("Test message".to_string());
    repeated.count = 5;
    let repeated_text = repeated.to_string();
    assert!(repeated_text.contains("[5 similar log lines]"));
    assert!(repeated_text.contains("Test message"));
}
2029
/// Wraps `data` in an in-memory cursor positioned at offset 0, used as a
/// stand-in for a file in the tests.
fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
    use std::io::Cursor;

    Cursor::new(data)
}
2034
2035 #[tokio::test]
2036 async fn test_process_file_content_basic() {
2037 let data = b"line1\nline2\nline3\n".to_vec();
2038 let mut reader = create_mock_reader(data.clone());
2039 let max_buf_size = 10;
2040
2041 let result =
2042 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2043 .await
2044 .unwrap();
2045
2046 assert_eq!(result.lines.len(), 3);
2047 assert_eq!(result.lines[0], b"line1");
2048 assert_eq!(result.lines[1], b"line2");
2049 assert_eq!(result.lines[2], b"line3");
2050 assert_eq!(result.new_position, data.len() as u64);
2051 assert!(result.incomplete_line_buffer.is_empty());
2052 }
2053
2054 #[tokio::test]
2055 async fn test_process_file_content_incomplete_line() {
2056 let data = b"line1\nline2\npartial".to_vec();
2057 let mut reader = create_mock_reader(data.clone());
2058 let max_buf_size = 10;
2059
2060 let result =
2061 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2062 .await
2063 .unwrap();
2064
2065 assert_eq!(result.lines.len(), 2);
2066 assert_eq!(result.lines[0], b"line1");
2067 assert_eq!(result.lines[1], b"line2");
2068 assert_eq!(result.new_position, data.len() as u64);
2069 assert_eq!(result.incomplete_line_buffer, b"partial");
2070 }
2071
2072 #[tokio::test]
2073 async fn test_process_file_content_with_existing_buffer() {
2074 let data = b"omplete\nline2\nline3\n".to_vec();
2075 let mut reader = create_mock_reader(data.clone());
2076 let existing_buffer = b"inc".to_vec();
2077 let max_buf_size = 10;
2078
2079 let result = process_file_content(
2080 &mut reader,
2081 0,
2082 data.len() as u64,
2083 existing_buffer,
2084 max_buf_size,
2085 )
2086 .await
2087 .unwrap();
2088
2089 assert_eq!(result.lines.len(), 3);
2090 assert_eq!(result.lines[0], b"incomplete");
2091 assert_eq!(result.lines[1], b"line2");
2092 assert_eq!(result.lines[2], b"line3");
2093 assert_eq!(result.new_position, data.len() as u64);
2094 assert!(result.incomplete_line_buffer.is_empty());
2095 }
2096
2097 #[tokio::test]
2098 async fn test_process_file_content_empty_file() {
2099 let data = Vec::new();
2100 let mut reader = create_mock_reader(data.clone());
2101 let max_buf_size = 10;
2102
2103 let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2104 .await
2105 .unwrap();
2106
2107 assert!(result.lines.is_empty());
2108 assert_eq!(result.new_position, 0);
2109 assert!(result.incomplete_line_buffer.is_empty());
2110 }
2111
2112 #[tokio::test]
2113 async fn test_process_file_content_only_newlines() {
2114 let data = b"\n\n\n".to_vec();
2115 let mut reader = create_mock_reader(data.clone());
2116 let max_buf_size = 10;
2117
2118 let result =
2119 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2120 .await
2121 .unwrap();
2122
2123 assert!(result.lines.is_empty());
2125 assert_eq!(result.new_position, data.len() as u64);
2126 assert!(result.incomplete_line_buffer.is_empty());
2127 }
2128
2129 #[tokio::test]
2130 async fn test_process_file_content_no_newlines() {
2131 let data = b"no newlines here".to_vec();
2132 let mut reader = create_mock_reader(data.clone());
2133 let max_buf_size = 10;
2134
2135 let result =
2136 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2137 .await
2138 .unwrap();
2139
2140 assert!(result.lines.is_empty());
2141 assert_eq!(result.new_position, data.len() as u64);
2142 assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2143 }
2144
2145 #[tokio::test]
2146 async fn test_process_file_content_file_truncation() {
2147 let data = b"line1\nline2\n".to_vec();
2148 let mut reader = create_mock_reader(data.clone());
2149
2150 let result = process_file_content(
2152 &mut reader,
2153 100, data.len() as u64,
2155 Vec::new(),
2156 10, )
2158 .await
2159 .unwrap();
2160
2161 assert_eq!(result.lines.len(), 2);
2163 assert_eq!(result.lines[0], b"line1");
2164 assert_eq!(result.lines[1], b"line2");
2165 assert_eq!(result.new_position, data.len() as u64);
2166 assert!(result.incomplete_line_buffer.is_empty());
2167 }
2168
2169 #[tokio::test]
2170 async fn test_process_file_content_seek_to_position() {
2171 let data = b"line1\nline2\nline3\n".to_vec();
2172 let mut reader = create_mock_reader(data.clone());
2173
2174 let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2176 .await
2177 .unwrap();
2178
2179 assert_eq!(result.lines.len(), 2);
2180 assert_eq!(result.lines[0], b"line2");
2181 assert_eq!(result.lines[1], b"line3");
2182 assert_eq!(result.new_position, data.len() as u64);
2183 assert!(result.incomplete_line_buffer.is_empty());
2184 }
2185
2186 #[tokio::test]
2187 async fn test_process_file_content_position_equals_file_size() {
2188 let data = b"line1\nline2\n".to_vec();
2189 let mut reader = create_mock_reader(data.clone());
2190
2191 let result = process_file_content(
2193 &mut reader,
2194 data.len() as u64,
2195 data.len() as u64,
2196 Vec::new(),
2197 10,
2198 )
2199 .await
2200 .unwrap();
2201
2202 assert!(
2204 result.lines.is_empty(),
2205 "Expected empty line got {:?}",
2206 result.lines
2207 );
2208 assert_eq!(result.new_position, data.len() as u64);
2209 assert!(result.incomplete_line_buffer.is_empty());
2210 }
2211
2212 #[tokio::test]
2213 async fn test_process_file_content_large_line_truncation() {
2214 let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2216 let data = format!("{}\nline2\n", large_line).into_bytes();
2217 let mut reader = create_mock_reader(data.clone());
2218
2219 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2220 .await
2221 .unwrap();
2222
2223 assert_eq!(result.lines.len(), 2);
2224
2225 assert_eq!(
2227 result.lines[0].len(),
2228 MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2229 );
2230 assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2231
2232 assert_eq!(result.lines[1], b"line2");
2234
2235 assert_eq!(result.new_position, data.len() as u64);
2236 assert!(result.incomplete_line_buffer.is_empty());
2237 }
2238
2239 #[tokio::test]
2240 async fn test_process_file_content_mixed_line_endings() {
2241 let data = b"line1\nline2\r\nline3\n".to_vec();
2242 let mut reader = create_mock_reader(data.clone());
2243
2244 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2245 .await
2246 .unwrap();
2247
2248 assert_eq!(result.lines.len(), 3);
2249 assert_eq!(result.lines[0], b"line1");
2250 assert_eq!(result.lines[1], b"line2\r"); assert_eq!(result.lines[2], b"line3");
2252 assert_eq!(result.new_position, data.len() as u64);
2253 assert!(result.incomplete_line_buffer.is_empty());
2254 }
2255
2256 #[tokio::test]
2257 async fn test_process_file_content_existing_buffer_with_truncation() {
2258 let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2260 let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2261 let mut reader = create_mock_reader(data.clone());
2262
2263 let result = process_file_content(
2264 &mut reader,
2265 0,
2266 data.len() as u64,
2267 existing_buffer.into_bytes(),
2268 10,
2269 )
2270 .await
2271 .unwrap();
2272
2273 assert_eq!(result.lines.len(), 2);
2274
2275 assert_eq!(
2277 result.lines[0].len(),
2278 MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2279 );
2280 assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2281
2282 assert_eq!(result.lines[1], b"line2");
2284
2285 assert_eq!(result.new_position, data.len() as u64);
2286 assert!(result.incomplete_line_buffer.is_empty());
2287 }
2288
2289 #[tokio::test]
2290 async fn test_process_file_content_single_character_lines() {
2291 let data = b"a\nb\nc\n".to_vec();
2292 let mut reader = create_mock_reader(data.clone());
2293
2294 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2295 .await
2296 .unwrap();
2297
2298 assert_eq!(result.lines.len(), 3);
2299 assert_eq!(result.lines[0], b"a");
2300 assert_eq!(result.lines[1], b"b");
2301 assert_eq!(result.lines[2], b"c");
2302 assert_eq!(result.new_position, data.len() as u64);
2303 assert!(result.incomplete_line_buffer.is_empty());
2304 }
2305
2306 #[tokio::test]
2307 async fn test_process_file_content_binary_data() {
2308 let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2309 let mut reader = create_mock_reader(data.clone());
2310
2311 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2312 .await
2313 .unwrap();
2314
2315 assert_eq!(result.lines.len(), 2);
2316 assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2317 assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2318 assert_eq!(result.new_position, data.len() as u64);
2319 assert!(result.incomplete_line_buffer.is_empty());
2320 }
2321
2322 #[tokio::test]
2323 async fn test_process_file_content_resume_after_max_buffer_size() {
2324 let data = b"line 1\nline 2\nline 3\n".to_vec();
2326 let mut reader = create_mock_reader(data.clone());
2327 let max_buffer_size = 2; let result1 = process_file_content(
2331 &mut reader,
2332 0, data.len() as u64,
2334 Vec::new(), max_buffer_size,
2336 )
2337 .await
2338 .unwrap();
2339
2340 assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2342 assert_eq!(result1.lines[0], b"line 1");
2343 assert_eq!(result1.lines[1], b"line 2");
2344 assert!(result1.incomplete_line_buffer.is_empty());
2345
2346 let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2348 assert_eq!(result1.new_position, expected_position_after_first_call);
2349
2350 let mut reader2 = create_mock_reader(data.clone());
2352 let result2 = process_file_content(
2353 &mut reader2,
2354 result1.new_position, data.len() as u64,
2356 result1.incomplete_line_buffer, max_buffer_size,
2358 )
2359 .await
2360 .unwrap();
2361
2362 assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2364 assert_eq!(result2.lines[0], b"line 3");
2365 assert!(result2.incomplete_line_buffer.is_empty());
2366 assert_eq!(result2.new_position, data.len() as u64);
2367 }
2368
2369 #[tokio::test]
2370 async fn test_utf8_truncation() {
2371 hyperactor_telemetry::initialize_logging_for_test();
2375
2376 let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
2378 long_line.push('🦀'); long_line.push('\n');
2380
2381 let (mut writer, reader) = tokio::io::duplex(8192);
2383
2384 let monitor = StreamFwder::start_with_writer(
2386 reader,
2387 Box::new(tokio::io::sink()), None, OutputTarget::Stdout,
2390 1, None, 12345, None, );
2395
2396 writer.write_all(long_line.as_bytes()).await.unwrap();
2398 drop(writer); let (_lines, result) = monitor.abort().await;
2402 result.expect("Should complete without panic despite UTF-8 truncation");
2403 }
2404}