hyperactor_mesh/
logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::HandleClient;
28use hyperactor::Handler;
29use hyperactor::Instance;
30use hyperactor::OncePortRef;
31use hyperactor::RefClient;
32use hyperactor::Unbind;
33use hyperactor::channel;
34use hyperactor::channel::ChannelAddr;
35use hyperactor::channel::ChannelRx;
36use hyperactor::channel::ChannelTransport;
37use hyperactor::channel::ChannelTx;
38use hyperactor::channel::Rx;
39use hyperactor::channel::Tx;
40use hyperactor::channel::TxStatus;
41use hyperactor::clock::Clock;
42use hyperactor::clock::RealClock;
43use hyperactor_config::CONFIG;
44use hyperactor_config::ConfigAttr;
45use hyperactor_config::attrs::declare_attrs;
46use hyperactor_telemetry::env;
47use hyperactor_telemetry::log_file_path;
48use serde::Deserialize;
49use serde::Serialize;
50use tokio::io;
51use tokio::io::AsyncRead;
52use tokio::io::AsyncReadExt;
53use tokio::io::AsyncWriteExt;
54use tokio::sync::Mutex;
55use tokio::sync::Notify;
56use tokio::sync::RwLock;
57use tokio::sync::watch::Receiver;
58use tokio::task::JoinHandle;
59use tracing::Level;
60use typeuri::Named;
61
62use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
63use crate::shortuuid::ShortUuid;
64
65mod line_prefixing_writer;
66
/// Default log aggregation window, in seconds.
// NOTE(review): the consumer of this default is outside the visible part of
// the file — confirm where it is applied.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Maximum length, in bytes, of a single forwarded log line. `tee` truncates
/// longer lines and appends a `... [TRUNCATED]` marker.
const MAX_LINE_SIZE: usize = 4 * 1024;
69
// Configuration attributes for log forwarding. Each attribute carries a
// `ConfigAttr` naming the environment variable and the Python-facing name
// it is associated with, followed by its type and default value.
declare_attrs! {
    /// Maximum number of lines to batch before flushing to client
    /// This means that stdout/err reader will be paused after reading `HYPERACTOR_READ_LOG_BUFFER` lines.
    /// After pause lines will be flushed and reading will resume.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        py_name: Some("read_log_buffer".to_string()),
    })
    pub attr READ_LOG_BUFFER: usize = 100;

    /// If enabled, local logs are also written to a file and aggregated
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        py_name: Some("force_file_log".to_string()),
    })
    pub attr FORCE_FILE_LOG: bool = false;

    /// Prefixes logs with rank
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        py_name: Some("prefix_with_rank".to_string()),
    })
    pub attr PREFIX_WITH_RANK: bool = true;
}
94
/// Calculate the Levenshtein distance between two strings.
///
/// The distance is the minimum number of single-character insertions,
/// deletions, and substitutions needed to turn `left` into `right`. It is
/// counted in Unicode scalar values (`char`s), not bytes.
///
/// Only two rows of the dynamic-programming table are kept alive at a time,
/// so memory use is proportional to the length of `right` rather than to the
/// product of both lengths.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    // With one side empty, the answer is the length of the other side.
    if left_chars.is_empty() {
        return right_chars.len();
    }
    if right_chars.is_empty() {
        return left_chars.len();
    }

    // `prev[j]` is the distance between the already-processed prefix of
    // `left` and the first `j` chars of `right`. Row 0 is 0, 1, 2, ...
    let mut prev: Vec<usize> = (0..=right_chars.len()).collect();
    let mut curr: Vec<usize> = vec![0; right_chars.len() + 1];

    for (i, &left_char) in left_chars.iter().enumerate() {
        // Turning `i + 1` chars into the empty string costs `i + 1` deletions.
        curr[0] = i + 1;
        for (j, &right_char) in right_chars.iter().enumerate() {
            let cost = usize::from(left_char != right_char);
            curr[j + 1] = (prev[j + 1] + 1) // deletion
                .min(curr[j] + 1) // insertion
                .min(prev[j] + cost); // substitution
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    // After the final swap, `prev` holds the last completed row.
    prev[right_chars.len()]
}
144
145/// Calculate the normalized edit distance between two strings (0.0 to 1.0)
146fn normalized_edit_distance(left: &str, right: &str) -> f64 {
147    let distance = levenshtein_distance(left, right) as f64;
148    let max_len = std::cmp::max(left.len(), right.len()) as f64;
149
150    if max_len == 0.0 {
151        0.0 // Both strings are empty, so they're identical
152    } else {
153        distance / max_len
154    }
155}
156
/// A single log line together with the number of times it has been counted.
#[derive(Debug, Clone)]
struct LogLine {
    /// The text of the line.
    content: String,
    /// How many lines have been folded into this entry.
    pub count: u64,
}

impl LogLine {
    /// Wrap `content` as a line that has been seen exactly once.
    fn new(content: String) -> Self {
        LogLine { count: 1, content }
    }
}

impl fmt::Display for LogLine {
    /// Render as a yellow `[N similar log lines]` tag followed by the content.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogLine { content, count } = self;
        write!(f, "\x1b[33m[{} similar log lines]\x1b[0m {}", count, content)
    }
}
179
180#[derive(Debug, Clone)]
181/// Aggregator is a struct that holds a list of LogLines and a start time.
182/// It can aggregate new log lines to existing ones if they are "similar" based on edit distance.
183struct Aggregator {
184    lines: Vec<LogLine>,
185    start_time: SystemTime,
186    similarity_threshold: f64, // Threshold for considering two strings similar (0.0 to 1.0)
187}
188
189impl Aggregator {
190    fn new() -> Self {
191        // Default threshold: strings with normalized edit distance < 0.15 are considered similar
192        Self::new_with_threshold(0.15)
193    }
194
195    fn new_with_threshold(threshold: f64) -> Self {
196        Aggregator {
197            lines: vec![],
198            start_time: RealClock.system_time_now(),
199            similarity_threshold: threshold,
200        }
201    }
202
203    fn reset(&mut self) {
204        self.lines.clear();
205        self.start_time = RealClock.system_time_now();
206    }
207
208    fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
209        // Find the most similar existing line
210        let mut best_match_idx = None;
211        let mut best_similarity = f64::MAX;
212
213        for (idx, existing_line) in self.lines.iter().enumerate() {
214            let distance = normalized_edit_distance(&existing_line.content, line);
215
216            // If this line is more similar than our current best match
217            if distance < best_similarity && distance < self.similarity_threshold {
218                best_match_idx = Some(idx);
219                best_similarity = distance;
220            }
221        }
222
223        // If we found a similar enough line, increment its count
224        if let Some(idx) = best_match_idx {
225            self.lines[idx].count += 1;
226        } else {
227            // Otherwise, add a new line
228            self.lines.push(LogLine::new(line.to_string()));
229        }
230
231        Ok(())
232    }
233
234    fn is_empty(&self) -> bool {
235        self.lines.is_empty()
236    }
237}
238
239// Helper function to format SystemTime
240fn format_system_time(time: SystemTime) -> String {
241    let datetime: DateTime<Local> = time.into();
242    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
243}
244
245impl fmt::Display for Aggregator {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        // Format the start time
248        let start_time_str = format_system_time(self.start_time);
249
250        // Get and format the current time
251        let current_time = RealClock.system_time_now();
252        let end_time_str = format_system_time(current_time);
253
254        // Write the header with formatted time window
255        writeln!(
256            f,
257            "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
258            start_time_str
259        )?;
260
261        // Write each log line
262        for line in self.lines.iter() {
263            writeln!(f, "{}", line)?;
264        }
265        writeln!(
266            f,
267            "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
268            end_time_str
269        )?;
270        Ok(())
271    }
272}
273
/// Messages that can be sent to the LogClientActor remotely.
///
/// In this file, `LocalLogSender` posts both variants over its channel.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// Log details
    Log {
        /// The hostname of the process that generated the log
        hostname: String,
        /// The pid of the process that generated the log
        pid: u32,
        /// The target output stream (stdout or stderr)
        output_target: OutputTarget,
        /// The log payload as bytes.
        /// `LocalLogSender` serializes a `Vec<Vec<u8>>` here, one entry per line.
        payload: wirevalue::Any,
    },

    /// Flush the log
    Flush {
        /// Indicate if the current flush is synced or non-synced.
        /// If synced, a version number is available. Otherwise, none.
        /// `LocalLogSender::flush` always sends `None`.
        sync_version: Option<u64>,
    },
}
305
/// Messages that can be sent to the LogClient locally.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Configure log aggregation: set the window, or disable aggregation.
    SetAggregate {
        /// The time window in seconds to aggregate logs. If None, aggregation is disabled.
        aggregate_window_sec: Option<u64>,
    },

    /// Synchronously flush all the logs from all the procs. This is for client to call.
    StartSyncFlush {
        /// Expect these many procs to ack the flush message.
        expected_procs: usize,
        /// Return once we have received the acks from all the procs
        reply: OncePortRef<()>,
        /// Return to the caller the current flush version
        version: OncePortRef<u64>,
    },
}
333
/// Trait for sending logs.
///
/// Both methods are synchronous; `tee` calls them from its read loop.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Send a log payload in bytes.
    ///
    /// `target` says which stream the lines came from. `payload` holds one
    /// byte vector per line (that is how `tee` builds it).
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Flush the log channel, ensuring all messages are delivered
    /// Returns when the flush message has been acknowledged
    // NOTE(review): the only implementation visible in this file
    // (`LocalLogSender`) posts the flush message without waiting for an
    // acknowledgement — confirm which contract is intended.
    fn flush(&mut self) -> anyhow::Result<()>;
}
344
/// Represents the target output stream (stdout or stderr).
///
/// Used in this file to pick the console writer, the log-file suffix, and
/// the `FileAppender` channel for a forwarded stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output stream
    Stdout,
    /// Standard error stream
    Stderr,
}
353
/// Identifies one of a child's two output streams.
// NOTE(review): this enum is not referenced in the visible part of the file;
// presumably it distinguishes a child process's stdout/stderr — confirm
// against its users.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// Standard output stream
    ChildStdout,
    /// Standard error stream
    ChildStderr,
}
361
362/// Write the log to a local unix channel so some actors can listen to it and stream the log back.
363pub struct LocalLogSender {
364    hostname: String,
365    pid: u32,
366    tx: ChannelTx<LogMessage>,
367    status: Receiver<TxStatus>,
368}
369
370impl LocalLogSender {
371    fn new(log_channel: ChannelAddr, pid: u32) -> Result<Self, anyhow::Error> {
372        let tx = channel::dial::<LogMessage>(log_channel)?;
373        let status = tx.status().clone();
374
375        let hostname = hostname::get()
376            .unwrap_or_else(|_| "unknown_host".into())
377            .into_string()
378            .unwrap_or("unknown_host".to_string());
379        Ok(Self {
380            hostname,
381            pid,
382            tx,
383            status,
384        })
385    }
386}
387
388#[async_trait]
389impl LogSender for LocalLogSender {
390    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
391        if TxStatus::Active == *self.status.borrow() {
392            // Do not use tx.send, it will block the allocator as the child process state is unknown.
393            self.tx.post(LogMessage::Log {
394                hostname: self.hostname.clone(),
395                pid: self.pid,
396                output_target: target,
397                payload: wirevalue::Any::serialize(&payload)?,
398            });
399        }
400
401        Ok(())
402    }
403
404    fn flush(&mut self) -> anyhow::Result<()> {
405        // send will make sure message is delivered
406        if TxStatus::Active == *self.status.borrow() {
407            // Do not use tx.send, it will block the allocator as the child process state is unknown.
408            self.tx.post(LogMessage::Flush { sync_version: None });
409        }
410        Ok(())
411    }
412}
413
/// Message sent to FileMonitor: a batch of already-formatted log lines.
///
/// `tee` builds these, and `file_monitor_task` writes each line to its file
/// followed by a newline.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    /// Lines to append, without trailing newlines.
    lines: Vec<String>,
}
wirevalue::register_type!(FileMonitorMessage);
420
/// File appender, coordinates write access to a file via a channel.
///
/// Owns one writer task per output target. Each task receives
/// `FileMonitorMessage`s on its own channel and appends them to its file.
pub struct FileAppender {
    /// Address of the channel feeding the stdout file.
    stdout_addr: ChannelAddr,
    /// Address of the channel feeding the stderr file.
    stderr_addr: ChannelAddr,
    /// Handle of the task writing the stdout file.
    #[allow(dead_code)] // Tasks are self terminating
    stdout_task: JoinHandle<()>,
    /// Handle of the task writing the stderr file.
    #[allow(dead_code)]
    stderr_task: JoinHandle<()>,
    /// Shared stop signal; notified on drop so both tasks exit.
    stop: Arc<Notify>,
}
431
432impl fmt::Debug for FileAppender {
433    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434        f.debug_struct("FileMonitor")
435            .field("stdout_addr", &self.stdout_addr)
436            .field("stderr_addr", &self.stderr_addr)
437            .finish()
438    }
439}
440
441impl FileAppender {
442    /// Create a new FileAppender with aggregated log files for stdout and stderr
443    /// Returns None if file creation fails
444    pub fn new() -> Option<Self> {
445        let stop = Arc::new(Notify::new());
446        // TODO make it configurable
447        let file_name_tag = hostname::get()
448            .unwrap_or_else(|_| "unknown_host".into())
449            .into_string()
450            .unwrap_or("unknown_host".to_string());
451
452        // Create stdout file and task
453        let (stdout_path, stdout_writer) =
454            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
455                Some(writer) => writer,
456                None => {
457                    tracing::warn!("failed to create stdout file");
458                    return None;
459                }
460            };
461        let (stdout_addr, stdout_rx) = {
462            let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
463            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
464                Ok((addr, rx)) => (addr, rx),
465                Err(e) => {
466                    tracing::warn!("failed to serve stdout channel: {}", e);
467                    return None;
468                }
469            }
470        };
471        let stdout_stop = stop.clone();
472        let stdout_task = tokio::spawn(file_monitor_task(
473            stdout_rx,
474            stdout_writer,
475            OutputTarget::Stdout,
476            stdout_stop,
477        ));
478
479        // Create stderr file and task
480        let (stderr_path, stderr_writer) =
481            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
482                Some(writer) => writer,
483                None => {
484                    tracing::warn!("failed to create stderr file");
485                    return None;
486                }
487            };
488        let (stderr_addr, stderr_rx) = {
489            let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
490            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
491                Ok((addr, rx)) => (addr, rx),
492                Err(e) => {
493                    tracing::warn!("failed to serve stderr channel: {}", e);
494                    return None;
495                }
496            }
497        };
498        let stderr_stop = stop.clone();
499        let stderr_task = tokio::spawn(file_monitor_task(
500            stderr_rx,
501            stderr_writer,
502            OutputTarget::Stderr,
503            stderr_stop,
504        ));
505
506        tracing::debug!(
507            "FileAppender: created for stdout {} stderr {} ",
508            stdout_path.display(),
509            stderr_path.display()
510        );
511
512        Some(Self {
513            stdout_addr,
514            stderr_addr,
515            stdout_task,
516            stderr_task,
517            stop,
518        })
519    }
520
521    /// Get a channel address for the specified output target
522    pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
523        match target {
524            OutputTarget::Stdout => self.stdout_addr.clone(),
525            OutputTarget::Stderr => self.stderr_addr.clone(),
526        }
527    }
528}
529
impl Drop for FileAppender {
    /// Signal both writer tasks to stop. The tasks are not awaited here;
    /// they perform their own final flush before exiting.
    fn drop(&mut self) {
        // Trigger stop signal to notify tasks to exit
        // `notify_waiters` reaches every task waiting on the shared `Notify`.
        self.stop.notify_waiters();
        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
    }
}
537
538/// Task that receives lines from StreamFwds and writes them to the aggregated file
539async fn file_monitor_task(
540    mut rx: ChannelRx<FileMonitorMessage>,
541    mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
542    target: OutputTarget,
543    stop: Arc<Notify>,
544) {
545    loop {
546        tokio::select! {
547            msg = rx.recv() => {
548                match msg {
549                    Ok(msg) => {
550                        // Write lines to aggregated file
551                        for line in &msg.lines {
552                            if let Err(e) = writer.write_all(line.as_bytes()).await {
553                                tracing::warn!("FileMonitor: failed to write line to file: {}", e);
554                                continue;
555                            }
556                            if let Err(e) = writer.write_all(b"\n").await {
557                                tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
558                            }
559                        }
560                        if let Err(e) = writer.flush().await {
561                            tracing::warn!("FileMonitor: failed to flush file: {}", e);
562                        }
563                    }
564                    Err(e) => {
565                        // Channel error
566                        tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
567                        break;
568                    }
569                }
570            }
571            _ = stop.notified() => {
572                tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
573                break;
574            }
575        }
576    }
577
578    // Graceful shutdown: flush one last time
579    if let Err(e) = writer.flush().await {
580        tracing::warn!("FileMonitor: failed final flush: {}", e);
581    }
582    tracing::debug!("FileMonitor task for {:?} exiting", target);
583}
584
585fn create_unique_file_writer(
586    file_name_tag: &str,
587    output_target: OutputTarget,
588    env: env::Env,
589) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
590    let suffix = match output_target {
591        OutputTarget::Stderr => "stderr",
592        OutputTarget::Stdout => "stdout",
593    };
594    let (path, filename) = log_file_path(env, None)?;
595    let path = Path::new(&path);
596    let mut full_path = PathBuf::from(path);
597
598    let uuid = ShortUuid::generate();
599
600    full_path.push(format!(
601        "{}_{}_{}.{}",
602        filename, file_name_tag, uuid, suffix
603    ));
604    let file = std::fs::OpenOptions::new()
605        .create(true)
606        .append(true)
607        .open(full_path.clone())?;
608    let tokio_file = tokio::fs::File::from_std(file);
609    // TODO: should we buffer this?
610    Ok((full_path, Box::new(tokio_file)))
611}
612
613fn get_unique_local_log_destination(
614    file_name_tag: &str,
615    output_target: OutputTarget,
616) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
617    let env: env::Env = env::Env::current();
618    if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
619        tracing::debug!("not creating log file because of env type");
620        None
621    } else {
622        match create_unique_file_writer(file_name_tag, output_target, env) {
623            Ok((a, b)) => Some((a, b)),
624            Err(e) => {
625                tracing::warn!("failed to create unique file writer: {}", e);
626                None
627            }
628        }
629    }
630}
631
632/// Create a writer for stdout or stderr
633fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
634    // Return the appropriate standard output or error writer
635    match target {
636        OutputTarget::Stdout => Box::new(tokio::io::stdout()),
637        OutputTarget::Stderr => Box::new(tokio::io::stderr()),
638    }
639}
640
641/// Copy bytes from `reader` to `writer`, forward to log_sender, and forward to FileMonitor.
642/// The same formatted lines go to both log_sender and file_monitor.
643async fn tee(
644    mut reader: impl AsyncRead + Unpin + Send + 'static,
645    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
646    log_sender: Option<Box<dyn LogSender + Send>>,
647    file_monitor_addr: Option<ChannelAddr>,
648    target: OutputTarget,
649    prefix: Option<String>,
650    stop: Arc<Notify>,
651    recent_lines_buf: RotatingLineBuffer,
652) -> Result<(), io::Error> {
653    let mut buf = [0u8; 8192];
654    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
655    let mut log_sender = log_sender;
656
657    // Dial the file monitor channel if provided
658    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
659        file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
660            Ok(tx) => Some(tx),
661            Err(e) => {
662                tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
663                None
664            }
665        });
666
667    loop {
668        tokio::select! {
669            read_result = reader.read(&mut buf) => {
670                match read_result {
671                    Ok(n) => {
672                        if n == 0 {
673                            // EOF reached
674                            tracing::debug!("EOF reached in tee");
675                            break;
676                        }
677
678                        // Write to console
679                        if let Err(e) = std_writer.write_all(&buf[..n]).await {
680                            tracing::warn!("error writing to std: {}", e);
681                        }
682
683                        // Process bytes into lines for log_sender and FileMonitor
684                        let mut completed_lines = Vec::new();
685
686                        for &byte in &buf[..n] {
687                            if byte == b'\n' {
688                                // Complete line found
689                                let mut line = String::from_utf8_lossy(&line_buffer).to_string();
690
691                                // Truncate if too long, respecting UTF-8 boundaries
692                                // (multi-byte chars like emojis can be up to 4 bytes)
693                                if line.len() > MAX_LINE_SIZE {
694                                    let mut truncate_at = MAX_LINE_SIZE;
695                                    while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
696                                        truncate_at -= 1;
697                                    }
698                                    line.truncate(truncate_at);
699                                    line.push_str("... [TRUNCATED]");
700                                }
701
702                                // Prepend with prefix if configured
703                                let final_line = if let Some(ref p) = prefix {
704                                    format!("[{}] {}", p, line)
705                                } else {
706                                    line
707                                };
708
709                                completed_lines.push(final_line);
710                                line_buffer.clear();
711                            } else {
712                                line_buffer.push(byte);
713                            }
714                        }
715
716                        // Send completed lines to both log_sender and FileAppender
717                        if !completed_lines.is_empty() {
718                            if let Some(ref mut sender) = log_sender {
719                                let bytes: Vec<Vec<u8>> = completed_lines.iter()
720                                    .map(|s| s.as_bytes().to_vec())
721                                    .collect();
722                                if let Err(e) = sender.send(target, bytes) {
723                                    tracing::warn!("error sending to log_sender: {}", e);
724                                }
725                            }
726
727                            // Send to FileMonitor via hyperactor channel
728                            if let Some(ref mut tx) = file_monitor_tx {
729                                let msg = FileMonitorMessage {
730                                    lines: completed_lines,
731                                };
732                                // Use post() to avoid blocking
733                                tx.post(msg);
734                            }
735                        }
736
737                        recent_lines_buf.try_add_data(&buf, n);
738                    },
739                    Err(e) => {
740                        tracing::debug!("read error in tee: {}", e);
741                        return Err(e);
742                    }
743                }
744            },
745            _ = stop.notified() => {
746                tracing::debug!("stop signal received in tee");
747                break;
748            }
749        }
750    }
751
752    std_writer.flush().await?;
753
754    // Send any remaining partial line
755    if !line_buffer.is_empty() {
756        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
757        // Truncate if too long, respecting UTF-8 boundaries
758        // (multi-byte chars like emojis can be up to 4 bytes)
759        if line.len() > MAX_LINE_SIZE {
760            let mut truncate_at = MAX_LINE_SIZE;
761            while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
762                truncate_at -= 1;
763            }
764            line.truncate(truncate_at);
765            line.push_str("... [TRUNCATED]");
766        }
767        let final_line = if let Some(ref p) = prefix {
768            format!("[{}] {}", p, line)
769        } else {
770            line
771        };
772
773        let final_lines = vec![final_line];
774
775        // Send to log_sender
776        if let Some(ref mut sender) = log_sender {
777            let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
778            let _ = sender.send(target, bytes);
779        }
780
781        // Send to FileMonitor
782        if let Some(ref mut tx) = file_monitor_tx {
783            let msg = FileMonitorMessage { lines: final_lines };
784            tx.post(msg);
785        }
786    }
787
788    // Flush log_sender
789    if let Some(ref mut sender) = log_sender {
790        let _ = sender.flush();
791    }
792
793    Ok(())
794}
795
796#[derive(Debug, Clone)]
797struct RotatingLineBuffer {
798    recent_lines: Arc<RwLock<VecDeque<String>>>,
799    max_buffer_size: usize,
800}
801
802impl RotatingLineBuffer {
803    fn try_add_data(&self, buf: &[u8], buf_end: usize) {
804        let data_str = String::from_utf8_lossy(&buf[..buf_end]);
805        let lines: Vec<&str> = data_str.lines().collect();
806
807        if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
808            for line in lines {
809                if !line.is_empty() {
810                    recent_lines_guard.push_back(line.to_string());
811                    if recent_lines_guard.len() > self.max_buffer_size {
812                        recent_lines_guard.pop_front();
813                    }
814                }
815            }
816        } else {
817            tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
818        }
819    }
820
821    async fn peek(&self) -> Vec<String> {
822        let lines = self.recent_lines.read().await;
823        let start_idx = if lines.len() > self.max_buffer_size {
824            lines.len() - self.max_buffer_size
825        } else {
826            0
827        };
828
829        lines.range(start_idx..).cloned().collect()
830    }
831}
832
/// Given a stream forwards data to the provided channel.
///
/// Owns the background `tee` task that does the forwarding, plus the handles
/// needed to inspect its recent output and to stop it.
pub struct StreamFwder {
    /// Handle of the spawned `tee` task; resolves to the task's I/O result.
    teer: JoinHandle<Result<(), io::Error>>,
    // Shared buffer for peek functionality
    recent_lines_buf: RotatingLineBuffer,
    // Shutdown signal to stop the monitoring loop
    stop: Arc<Notify>,
}
841
842impl StreamFwder {
843    /// Create a new StreamFwder instance, and start monitoring the provided path.
844    /// Once started Monitor will
845    /// - forward logs to log_sender
846    /// - forward logs to file_monitor (if available)
847    /// - pipe reader to target
848    /// - And capture last `max_buffer_size` which can be used to inspect file contents via `peek`.
849    pub fn start(
850        reader: impl AsyncRead + Unpin + Send + 'static,
851        file_monitor_addr: Option<ChannelAddr>,
852        target: OutputTarget,
853        max_buffer_size: usize,
854        log_channel: Option<ChannelAddr>,
855        pid: u32,
856        local_rank: usize,
857    ) -> Self {
858        let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
859            true => Some(local_rank.to_string()),
860            false => None,
861        };
862        let std_writer = std_writer(target);
863
864        Self::start_with_writer(
865            reader,
866            std_writer,
867            file_monitor_addr,
868            target,
869            max_buffer_size,
870            log_channel,
871            pid,
872            prefix,
873        )
874    }
875
876    /// Create a new StreamFwder instance with a custom writer (used in tests).
877    fn start_with_writer(
878        reader: impl AsyncRead + Unpin + Send + 'static,
879        std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
880        file_monitor_addr: Option<ChannelAddr>,
881        target: OutputTarget,
882        max_buffer_size: usize,
883        log_channel: Option<ChannelAddr>,
884        pid: u32,
885        prefix: Option<String>,
886    ) -> Self {
887        // Sanity: when there is no file sink, no log forwarding, and
888        // `tail_size == 0`, the child should have **inherited** stdio
889        // and no `StreamFwder` should exist. In that case console
890        // mirroring happens via inheritance, not via `StreamFwder`.
891        // If we hit this, we piped unnecessarily.
892        debug_assert!(
893            file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
894            "StreamFwder started with no sinks and no tail"
895        );
896
897        let stop = Arc::new(Notify::new());
898        let recent_lines_buf = RotatingLineBuffer {
899            recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
900                max_buffer_size,
901            ))),
902            max_buffer_size,
903        };
904
905        let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
906            match LocalLogSender::new(addr, pid) {
907                Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
908                Err(e) => {
909                    tracing::error!("failed to create log sender: {}", e);
910                    None
911                }
912            }
913        } else {
914            None
915        };
916
917        let teer_stop = stop.clone();
918        let recent_line_buf_clone = recent_lines_buf.clone();
919        let teer = tokio::spawn(async move {
920            tee(
921                reader,
922                std_writer,
923                log_sender,
924                file_monitor_addr,
925                target,
926                prefix,
927                teer_stop,
928                recent_line_buf_clone,
929            )
930            .await
931        });
932
933        StreamFwder {
934            teer,
935            recent_lines_buf,
936            stop,
937        }
938    }
939
940    pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
941        self.stop.notify_waiters();
942
943        let lines = self.peek().await;
944        let teer_result = self.teer.await;
945
946        let result: Result<(), anyhow::Error> = match teer_result {
947            Ok(inner) => inner.map_err(anyhow::Error::from),
948            Err(e) => Err(e.into()),
949        };
950
951        (lines, result)
952    }
953
954    /// Inspect the latest `max_buffer` lines read from the file being monitored
955    /// Returns lines in chronological order (oldest first)
956    pub async fn peek(&self) -> Vec<String> {
957        self.recent_lines_buf.peek().await
958    }
959}
960
961/// Messages that can be sent to the LogForwarder
962#[derive(
963    Debug,
964    Clone,
965    Serialize,
966    Deserialize,
967    Named,
968    Handler,
969    HandleClient,
970    RefClient,
971    Bind,
972    Unbind
973)]
974pub enum LogForwardMessage {
975    /// Receive the log from the parent process and forward it to the client.
976    Forward {},
977
978    /// If to stream the log back to the client.
979    SetMode { stream_to_client: bool },
980
981    /// Flush the log with a version number.
982    ForceSyncFlush { version: u64 },
983}
984
985/// A log forwarder that receives the log from its parent process and forward it back to the client
986#[hyperactor::export(
987    spawn = true,
988    handlers = [LogForwardMessage {cast = true}],
989)]
990pub struct LogForwardActor {
991    rx: ChannelRx<LogMessage>,
992    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
993    next_flush_deadline: SystemTime,
994    logging_client_ref: ActorRef<LogClientActor>,
995    stream_to_client: bool,
996}
997
998#[async_trait]
999impl Actor for LogForwardActor {
1000    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1001        this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1002
1003        // Make sure we start the flush loop periodically so the log channel will not deadlock.
1004        self.flush_tx
1005            .lock()
1006            .await
1007            .send(LogMessage::Flush { sync_version: None })
1008            .await?;
1009        Ok(())
1010    }
1011}
1012
1013#[async_trait]
1014impl hyperactor::RemoteSpawn for LogForwardActor {
1015    type Params = ActorRef<LogClientActor>;
1016
1017    async fn new(logging_client_ref: Self::Params) -> Result<Self> {
1018        let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1019            Ok(channel) => channel.parse()?,
1020            Err(err) => {
1021                tracing::debug!(
1022                    "log forwarder actor failed to read env var {}: {}",
1023                    BOOTSTRAP_LOG_CHANNEL,
1024                    err
1025                );
1026                // TODO: an empty channel to serve
1027                ChannelAddr::any(ChannelTransport::Unix)
1028            }
1029        };
1030        tracing::info!(
1031            "log forwarder {} serve at {}",
1032            std::process::id(),
1033            log_channel
1034        );
1035
1036        let rx = match channel::serve(log_channel.clone()) {
1037            Ok((_, rx)) => rx,
1038            Err(err) => {
1039                // This can happen if we are not spanwed on a separate process like local.
1040                // For local mesh, log streaming anyway is not needed.
1041                tracing::error!(
1042                    "log forwarder actor failed to bootstrap on given channel {}: {}",
1043                    log_channel,
1044                    err
1045                );
1046                channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1047            }
1048        };
1049
1050        // Dial the same channel to send flush message to drain the log queue.
1051        let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1052        let now = RealClock.system_time_now();
1053
1054        Ok(Self {
1055            rx,
1056            flush_tx,
1057            next_flush_deadline: now,
1058            logging_client_ref,
1059            stream_to_client: true,
1060        })
1061    }
1062}
1063
1064#[async_trait]
1065#[hyperactor::forward(LogForwardMessage)]
1066impl LogForwardMessageHandler for LogForwardActor {
1067    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
1068        match self.rx.recv().await {
1069            Ok(LogMessage::Flush { sync_version }) => {
1070                let now = RealClock.system_time_now();
1071                match sync_version {
1072                    None => {
1073                        // Schedule another flush to keep the log channel from deadlocking.
1074                        let delay = Duration::from_secs(1);
1075                        if now >= self.next_flush_deadline {
1076                            self.next_flush_deadline = now + delay;
1077                            let flush_tx = self.flush_tx.clone();
1078                            tokio::spawn(async move {
1079                                RealClock.sleep(delay).await;
1080                                if let Err(e) = flush_tx
1081                                    .lock()
1082                                    .await
1083                                    .send(LogMessage::Flush { sync_version: None })
1084                                    .await
1085                                {
1086                                    tracing::error!("failed to send flush message: {}", e);
1087                                }
1088                            });
1089                        }
1090                    }
1091                    version => {
1092                        self.logging_client_ref.flush(ctx, version).await?;
1093                    }
1094                }
1095            }
1096            Ok(LogMessage::Log {
1097                hostname,
1098                pid,
1099                output_target,
1100                payload,
1101            }) => {
1102                if self.stream_to_client {
1103                    self.logging_client_ref
1104                        .log(ctx, hostname, pid, output_target, payload)
1105                        .await?;
1106                }
1107            }
1108            Err(e) => {
1109                return Err(e.into());
1110            }
1111        }
1112
1113        // This is not ideal as we are using raw tx/rx.
1114        ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1115
1116        Ok(())
1117    }
1118
1119    async fn set_mode(
1120        &mut self,
1121        _ctx: &Context<Self>,
1122        stream_to_client: bool,
1123    ) -> Result<(), anyhow::Error> {
1124        self.stream_to_client = stream_to_client;
1125        Ok(())
1126    }
1127
1128    async fn force_sync_flush(
1129        &mut self,
1130        _cx: &Context<Self>,
1131        version: u64,
1132    ) -> Result<(), anyhow::Error> {
1133        self.flush_tx
1134            .lock()
1135            .await
1136            .send(LogMessage::Flush {
1137                sync_version: Some(version),
1138            })
1139            .await
1140            .map_err(anyhow::Error::from)
1141    }
1142}
1143
1144/// Deserialize a serialized message and split it into UTF-8 lines
1145fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1146    // Try to deserialize as Vec<Vec<u8>> first (multiple byte arrays)
1147    if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1148        let mut result = Vec::new();
1149        for bytes in message_bytes {
1150            let message_str = String::from_utf8(bytes)?;
1151            let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1152            result.push(lines);
1153        }
1154        return Ok(result);
1155    }
1156
1157    // If that fails, try to deserialize as String and wrap in Vec<Vec<String>>
1158    if let Ok(message) = serialized_message.deserialized::<String>() {
1159        let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1160        return Ok(vec![lines]);
1161    }
1162
1163    // If both fail, return an error
1164    anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1165}
1166
1167/// A client to receive logs from remote processes
1168#[derive(Debug)]
1169#[hyperactor::export(
1170    spawn = true,
1171    handlers = [LogMessage, LogClientMessage],
1172)]
1173pub struct LogClientActor {
1174    aggregate_window_sec: Option<u64>,
1175    aggregators: HashMap<OutputTarget, Aggregator>,
1176    last_flush_time: SystemTime,
1177    next_flush_deadline: Option<SystemTime>,
1178
1179    // For flush sync barrier
1180    current_flush_version: u64,
1181    current_flush_port: Option<OncePortRef<()>>,
1182    current_unflushed_procs: usize,
1183}
1184
1185impl Default for LogClientActor {
1186    fn default() -> Self {
1187        // Initialize aggregators
1188        let mut aggregators = HashMap::new();
1189        aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1190        aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1191
1192        Self {
1193            aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1194            aggregators,
1195            last_flush_time: RealClock.system_time_now(),
1196            next_flush_deadline: None,
1197            current_flush_version: 0,
1198            current_flush_port: None,
1199            current_unflushed_procs: 0,
1200        }
1201    }
1202}
1203
1204impl LogClientActor {
1205    fn print_aggregators(&mut self) {
1206        for (output_target, aggregator) in self.aggregators.iter_mut() {
1207            if aggregator.is_empty() {
1208                continue;
1209            }
1210            match output_target {
1211                OutputTarget::Stdout => {
1212                    println!("{}", aggregator);
1213                }
1214                OutputTarget::Stderr => {
1215                    eprintln!("{}", aggregator);
1216                }
1217            }
1218
1219            // Reset the aggregator
1220            aggregator.reset();
1221        }
1222    }
1223
1224    fn print_log_line(hostname: &str, pid: u32, output_target: OutputTarget, line: String) {
1225        let message = format!("[{} {}] {}", hostname, pid, line);
1226
1227        #[cfg(test)]
1228        crate::logging::test_tap::push(&message);
1229
1230        match output_target {
1231            OutputTarget::Stdout => println!("{}", message),
1232            OutputTarget::Stderr => eprintln!("{}", message),
1233        }
1234    }
1235
1236    fn flush_internal(&mut self) {
1237        self.print_aggregators();
1238        self.last_flush_time = RealClock.system_time_now();
1239        self.next_flush_deadline = None;
1240    }
1241}
1242
1243#[async_trait]
1244impl Actor for LogClientActor {}
1245
1246impl Drop for LogClientActor {
1247    fn drop(&mut self) {
1248        // Flush the remaining logs before shutting down
1249        self.print_aggregators();
1250    }
1251}
1252
1253#[async_trait]
1254#[hyperactor::forward(LogMessage)]
1255impl LogMessageHandler for LogClientActor {
1256    async fn log(
1257        &mut self,
1258        cx: &Context<Self>,
1259        hostname: String,
1260        pid: u32,
1261        output_target: OutputTarget,
1262        payload: wirevalue::Any,
1263    ) -> Result<(), anyhow::Error> {
1264        // Deserialize the message and process line by line with UTF-8
1265        let message_line_groups = deserialize_message_lines(&payload)?;
1266        let hostname = hostname.as_str();
1267
1268        let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1269        match self.aggregate_window_sec {
1270            None => {
1271                for line in message_lines {
1272                    Self::print_log_line(hostname, pid, output_target, line);
1273                }
1274                self.last_flush_time = RealClock.system_time_now();
1275            }
1276            Some(window) => {
1277                for line in message_lines {
1278                    if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1279                        if let Err(e) = aggregator.add_line(&line) {
1280                            tracing::error!("error adding log line: {}", e);
1281                            // For the sake of completeness, flush the log lines.
1282                            Self::print_log_line(hostname, pid, output_target, line);
1283                        }
1284                    } else {
1285                        tracing::error!("unknown output target: {:?}", output_target);
1286                        // For the sake of completeness, flush the log lines.
1287                        Self::print_log_line(hostname, pid, output_target, line);
1288                    }
1289                }
1290
1291                let new_deadline = self.last_flush_time + Duration::from_secs(window);
1292                let now = RealClock.system_time_now();
1293                if new_deadline <= now {
1294                    self.flush_internal();
1295                } else {
1296                    let delay = new_deadline.duration_since(now)?;
1297                    match self.next_flush_deadline {
1298                        None => {
1299                            self.next_flush_deadline = Some(new_deadline);
1300                            cx.self_message_with_delay(
1301                                LogMessage::Flush { sync_version: None },
1302                                delay,
1303                            )?;
1304                        }
1305                        Some(deadline) => {
1306                            // Some early log lines have alrady triggered the flush.
1307                            if new_deadline < deadline {
1308                                // This can happen if the user has adjusted the aggregation window.
1309                                self.next_flush_deadline = Some(new_deadline);
1310                                cx.self_message_with_delay(
1311                                    LogMessage::Flush { sync_version: None },
1312                                    delay,
1313                                )?;
1314                            }
1315                        }
1316                    }
1317                }
1318            }
1319        }
1320
1321        Ok(())
1322    }
1323
1324    async fn flush(
1325        &mut self,
1326        cx: &Context<Self>,
1327        sync_version: Option<u64>,
1328    ) -> Result<(), anyhow::Error> {
1329        match sync_version {
1330            None => {
1331                self.flush_internal();
1332            }
1333            Some(version) => {
1334                if version != self.current_flush_version {
1335                    tracing::error!(
1336                        "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1337                        version,
1338                        self.current_flush_version
1339                    );
1340                    return Ok(());
1341                }
1342
1343                if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1344                    // This is a serious issue; it's better to error out.
1345                    anyhow::bail!("found no ongoing flush request");
1346                }
1347                self.current_unflushed_procs -= 1;
1348
1349                tracing::debug!(
1350                    "ack sync flush: version {}; remaining procs: {}",
1351                    self.current_flush_version,
1352                    self.current_unflushed_procs
1353                );
1354
1355                if self.current_unflushed_procs == 0 {
1356                    self.flush_internal();
1357                    let reply = self.current_flush_port.take().unwrap();
1358                    self.current_flush_port = None;
1359                    reply.send(cx, ()).map_err(anyhow::Error::from)?;
1360                }
1361            }
1362        }
1363
1364        Ok(())
1365    }
1366}
1367
1368#[async_trait]
1369#[hyperactor::forward(LogClientMessage)]
1370impl LogClientMessageHandler for LogClientActor {
1371    async fn set_aggregate(
1372        &mut self,
1373        _cx: &Context<Self>,
1374        aggregate_window_sec: Option<u64>,
1375    ) -> Result<(), anyhow::Error> {
1376        if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1377            // Make sure we flush whatever in the aggregators before disabling aggregation.
1378            self.print_aggregators();
1379        }
1380        self.aggregate_window_sec = aggregate_window_sec;
1381        Ok(())
1382    }
1383
1384    async fn start_sync_flush(
1385        &mut self,
1386        cx: &Context<Self>,
1387        expected_procs_flushed: usize,
1388        reply: OncePortRef<()>,
1389        version: OncePortRef<u64>,
1390    ) -> Result<(), anyhow::Error> {
1391        if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1392            tracing::warn!(
1393                "found unfinished ongoing flush: version {}; {} unflushed procs",
1394                self.current_flush_version,
1395                self.current_unflushed_procs,
1396            );
1397        }
1398
1399        self.current_flush_version += 1;
1400        tracing::debug!(
1401            "start sync flush with version {}",
1402            self.current_flush_version
1403        );
1404        self.current_flush_port = Some(reply.clone());
1405        self.current_unflushed_procs = expected_procs_flushed;
1406        version
1407            .send(cx, self.current_flush_version)
1408            .map_err(anyhow::Error::from)?;
1409        Ok(())
1410    }
1411}
1412
/// Test-only tap that lets tests observe the lines printed by
/// `LogClientActor::print_log_line`.
#[cfg(test)]
pub mod test_tap {
    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Install the sender side of the tap. Only the first call takes effect;
    /// later calls are ignored.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.set(tx);
    }

    /// Register the receiver side so that `drain` can read from it. Only the
    /// first call takes effect; later calls are ignored.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.set(Mutex::new(rx));
    }

    /// Record one line. A no-op when no sender is installed; send failures
    /// are ignored.
    pub fn push(s: &str) {
        let Some(tx) = TAP.get() else {
            return;
        };
        let _ = tx.send(s.to_owned());
    }

    /// Collect every line observed so far without blocking. Returns an empty
    /// vector when no receiver has been registered.
    pub fn drain() -> Vec<String> {
        let Some(shared_rx) = RX.get() else {
            return Vec::new();
        };
        let mut receiver = shared_rx.lock().unwrap();
        // Keep pulling until the channel reports empty (or disconnected).
        std::iter::from_fn(|| receiver.try_recv().ok()).collect()
    }
}
1453
1454#[cfg(test)]
1455mod tests {
1456
1457    use std::sync::Arc;
1458    use std::sync::Mutex;
1459
1460    use hyperactor::RemoteSpawn;
1461    use hyperactor::channel;
1462    use hyperactor::channel::ChannelAddr;
1463    use hyperactor::channel::ChannelTx;
1464    use hyperactor::channel::Tx;
1465    use hyperactor::id;
1466    use hyperactor::mailbox::BoxedMailboxSender;
1467    use hyperactor::mailbox::DialMailboxRouter;
1468    use hyperactor::mailbox::MailboxServer;
1469    use hyperactor::proc::Proc;
1470    use tokio::io::AsyncSeek;
1471    use tokio::io::AsyncSeekExt;
1472    use tokio::io::AsyncWriteExt;
1473    use tokio::io::SeekFrom;
1474    use tokio::sync::mpsc;
1475
1476    use super::*;
1477
1478    /// Result of processing file content
1479    #[derive(Debug)]
1480    struct FileProcessingResult {
1481        /// Complete lines found during processing
1482        lines: Vec<Vec<u8>>,
1483        /// Updated position in the file after processing
1484        new_position: u64,
1485        /// Any remaining incomplete line data, buffered for subsequent reads
1486        incomplete_line_buffer: Vec<u8>,
1487    }
1488
1489    /// Process new file content from a given position, extracting complete lines
1490    /// This function is extracted to enable easier unit testing without file system dependencies
1491    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
1492        reader: &mut R,
1493        current_position: u64,
1494        file_size: u64,
1495        existing_line_buffer: Vec<u8>,
1496        max_buffer_size: usize,
1497    ) -> Result<FileProcessingResult> {
1498        // If position equals file size, we're at the end
1499        if current_position == file_size {
1500            return Ok(FileProcessingResult {
1501                lines: Vec::new(),
1502                new_position: current_position,
1503                incomplete_line_buffer: existing_line_buffer,
1504            });
1505        }
1506
1507        // Handle potential file truncation/rotation
1508        let actual_position = if current_position > file_size {
1509            tracing::warn!(
1510                "File appears to have been truncated (position {} > file size {}), resetting to start",
1511                current_position,
1512                file_size
1513            );
1514            reader.seek(SeekFrom::Start(0)).await?;
1515            0
1516        } else {
1517            // current_position < file_size
1518            reader.seek(SeekFrom::Start(current_position)).await?;
1519            current_position
1520        };
1521
1522        let mut buf = vec![0u8; 128 * 1024];
1523        let mut line_buffer = existing_line_buffer;
1524        let mut lines = Vec::with_capacity(max_buffer_size);
1525        let mut processed_bytes = 0u64;
1526
1527        loop {
1528            let bytes_read = reader.read(&mut buf).await?;
1529            if bytes_read == 0 {
1530                break;
1531            }
1532
1533            let chunk = &buf[..bytes_read];
1534
1535            let mut start = 0;
1536            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
1537                let absolute_pos = start + newline_pos;
1538
1539                line_buffer.extend_from_slice(&chunk[start..absolute_pos]);
1540
1541                if !line_buffer.is_empty() {
1542                    if line_buffer.len() > MAX_LINE_SIZE {
1543                        line_buffer.truncate(MAX_LINE_SIZE);
1544                        line_buffer.extend_from_slice(b"... [TRUNCATED]");
1545                    }
1546
1547                    let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
1548                    lines.push(line_data);
1549                }
1550
1551                start = absolute_pos + 1;
1552
1553                // Check if we've reached the max buffer size after adding each line
1554                if lines.len() >= max_buffer_size {
1555                    // We've processed up to and including the current newline
1556                    // The new position is where we should start reading next time
1557                    let new_position = actual_position + processed_bytes + start as u64;
1558
1559                    // Don't save remaining data - we'll re-read it from the new position
1560                    return Ok(FileProcessingResult {
1561                        lines,
1562                        new_position,
1563                        incomplete_line_buffer: Vec::new(),
1564                    });
1565                }
1566            }
1567
1568            // Only add bytes to processed_bytes if we've fully processed this chunk
1569            processed_bytes += bytes_read as u64;
1570
1571            if start < chunk.len() {
1572                line_buffer.extend_from_slice(&chunk[start..]);
1573            }
1574        }
1575
1576        let new_position = actual_position + processed_bytes;
1577
1578        Ok(FileProcessingResult {
1579            lines,
1580            new_position,
1581            incomplete_line_buffer: line_buffer,
1582        })
1583    }
1584
1585    #[tokio::test]
1586    async fn test_forwarding_log_to_client() {
1587        // Setup the basics
1588        let router = DialMailboxRouter::new();
1589        let (proc_addr, client_rx) =
1590            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1591        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
1592        proc.clone().serve(client_rx);
1593        router.bind(id!(client[0]).into(), proc_addr.clone());
1594        let (client, _handle) = proc.instance("client").unwrap();
1595
1596        // Spin up both the forwarder and the client
1597        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1598        // SAFETY: Unit test
1599        unsafe {
1600            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1601        }
1602        let log_client_actor = LogClientActor::new(()).await.unwrap();
1603        let log_client: ActorRef<LogClientActor> =
1604            proc.spawn("log_client", log_client_actor).unwrap().bind();
1605        let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap();
1606        let log_forwarder: ActorRef<LogForwardActor> = proc
1607            .spawn("log_forwarder", log_forwarder_actor)
1608            .unwrap()
1609            .bind();
1610
1611        // Write some logs that will not be streamed
1612        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
1613        tx.post(LogMessage::Log {
1614            hostname: "my_host".into(),
1615            pid: 1,
1616            output_target: OutputTarget::Stderr,
1617            payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
1618        });
1619
1620        // Turn on streaming
1621        log_forwarder.set_mode(&client, true).await.unwrap();
1622        tx.post(LogMessage::Log {
1623            hostname: "my_host".into(),
1624            pid: 1,
1625            output_target: OutputTarget::Stderr,
1626            payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
1627        });
1628
1629        // TODO: it is hard to test out anything meaningful here as the client flushes to stdout.
1630    }
1631
1632    #[test]
1633    fn test_deserialize_message_lines_string() {
1634        // Test deserializing a String message with multiple lines
1635        let message = "Line 1\nLine 2\nLine 3".to_string();
1636        let serialized = wirevalue::Any::serialize(&message).unwrap();
1637
1638        let result = deserialize_message_lines(&serialized).unwrap();
1639        assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);
1640
1641        // Test deserializing a Vec<Vec<u8>> message with UTF-8 content
1642        let message_bytes = vec![
1643            "Hello\nWorld".as_bytes().to_vec(),
1644            "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
1645        ];
1646        let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();
1647
1648        let result = deserialize_message_lines(&serialized).unwrap();
1649        assert_eq!(
1650            result,
1651            vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
1652        );
1653
1654        // Test deserializing a single line message
1655        let message = "Single line message".to_string();
1656        let serialized = wirevalue::Any::serialize(&message).unwrap();
1657
1658        let result = deserialize_message_lines(&serialized).unwrap();
1659
1660        assert_eq!(result, vec![vec!["Single line message"]]);
1661
1662        // Test deserializing an empty lines
1663        let message = "\n\n".to_string();
1664        let serialized = wirevalue::Any::serialize(&message).unwrap();
1665
1666        let result = deserialize_message_lines(&serialized).unwrap();
1667
1668        assert_eq!(result, vec![vec!["", ""]]);
1669
1670        // Test error handling for invalid UTF-8 bytes
1671        let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; // Invalid UTF-8 sequence in Vec<Vec<u8>>
1672        let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();
1673
1674        let result = deserialize_message_lines(&serialized);
1675
1676        // The function should fail when trying to convert invalid UTF-8 bytes to String
1677        assert!(
1678            result.is_err(),
1679            "Expected deserialization to fail with invalid UTF-8 bytes"
1680        );
1681    }
1682    #[allow(dead_code)]
1683    struct MockLogSender {
1684        log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, // (output_target, content)
1685        flush_called: Arc<Mutex<bool>>,                            // Track if flush was called
1686    }
1687
1688    impl MockLogSender {
1689        #[allow(dead_code)]
1690        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
1691            Self {
1692                log_sender,
1693                flush_called: Arc::new(Mutex::new(false)),
1694            }
1695        }
1696    }
1697
1698    #[async_trait]
1699    impl LogSender for MockLogSender {
1700        fn send(
1701            &mut self,
1702            output_target: OutputTarget,
1703            payload: Vec<Vec<u8>>,
1704        ) -> anyhow::Result<()> {
1705            // For testing purposes, convert to string if it's valid UTF-8
1706            let lines: Vec<String> = payload
1707                .iter()
1708                .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1709                .collect();
1710
1711            for line in lines {
1712                self.log_sender
1713                    .send((output_target, line))
1714                    .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1715            }
1716            Ok(())
1717        }
1718
1719        fn flush(&mut self) -> anyhow::Result<()> {
1720            // Mark that flush was called
1721            let mut flush_called = self.flush_called.lock().unwrap();
1722            *flush_called = true;
1723
1724            // For testing purposes, just return Ok
1725            // In a real implementation, this would wait for all messages to be delivered
1726            Ok(())
1727        }
1728    }
1729
1730    #[test]
1731    fn test_string_similarity() {
1732        // Test exact match
1733        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1734
1735        // Test completely different strings
1736        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1737
1738        // Test similar strings
1739        assert!(normalized_edit_distance("hello", "helo") < 0.5);
1740        assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1741
1742        // Test empty strings
1743        assert_eq!(normalized_edit_distance("", ""), 0.0);
1744        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1745    }
1746
1747    #[test]
1748    fn test_add_line_to_empty_aggregator() {
1749        let mut aggregator = Aggregator::new();
1750        let result = aggregator.add_line("ERROR 404 not found");
1751
1752        assert!(result.is_ok());
1753        assert_eq!(aggregator.lines.len(), 1);
1754        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1755        assert_eq!(aggregator.lines[0].count, 1);
1756    }
1757
1758    #[test]
1759    fn test_add_line_merges_with_similar_line() {
1760        let mut aggregator = Aggregator::new_with_threshold(0.2);
1761
1762        // Add first line
1763        aggregator.add_line("ERROR 404 timeout").unwrap();
1764        assert_eq!(aggregator.lines.len(), 1);
1765
1766        // Add second line that should merge (similar enough)
1767        aggregator.add_line("ERROR 500 timeout").unwrap();
1768        assert_eq!(aggregator.lines.len(), 1); // Should still be 1 line after merge
1769        assert_eq!(aggregator.lines[0].count, 2);
1770
1771        // Add third line that's too different
1772        aggregator
1773            .add_line("WARNING database connection failed")
1774            .unwrap();
1775        assert_eq!(aggregator.lines.len(), 2); // Should be 2 lines now
1776
1777        // Add fourth line similar to third
1778        aggregator
1779            .add_line("WARNING database connection timed out")
1780            .unwrap();
1781        assert_eq!(aggregator.lines.len(), 2); // Should still be 2 lines
1782        assert_eq!(aggregator.lines[1].count, 2); // Second group has 2 lines
1783    }
1784
1785    #[test]
1786    fn test_aggregation_of_similar_log_lines() {
1787        let mut aggregator = Aggregator::new_with_threshold(0.2);
1788
1789        // Add the provided log lines with small differences
1790        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
1791        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
1792        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
1793
1794        // Check that we have only one aggregated line due to similarity
1795        assert_eq!(aggregator.lines.len(), 1);
1796
1797        // Check that the count is 3
1798        assert_eq!(aggregator.lines[0].count, 3);
1799    }
1800
    /// End-to-end check of `StreamFwder::start_with_writer`: lines written into
    /// one end of a duplex pipe must reach the file behind the supplied writer,
    /// produce at least one message on the log channel, and be present in the
    /// buffer returned by `abort`.
    #[tokio::test]
    async fn test_stream_fwd_creation() {
        hyperactor_telemetry::initialize_logging_for_test();

        // `writer` plays the role of the monitored process' output; `reader`
        // is what the forwarder consumes.
        let (mut writer, reader) = tokio::io::duplex(1024);
        let (log_channel, mut rx) =
            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();

        // Create a temporary file for testing the writer. `temp_file` is kept
        // bound for the rest of the test while the file is accessed by path.
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();

        // Create file writer that writes to the temp file (using tokio for async compatibility)
        let file_writer = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .append(true)
            .open(&temp_path)
            .await
            .unwrap();

        // Create a FileAppender and, when one is available, take its address
        // for stdout (the variable names still say "file_monitor").
        let file_monitor = FileAppender::new();
        let file_monitor_addr = file_monitor
            .as_ref()
            .map(|fm| fm.addr_for(OutputTarget::Stdout));

        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(file_writer),
            file_monitor_addr,
            OutputTarget::Stdout,
            3, // max_buffer_size
            Some(log_channel),
            12345, // pid
            None,  // no prefix
        );

        // Wait a bit for set up to be done
        RealClock.sleep(Duration::from_millis(500)).await;

        // Write initial content through the input writer
        writer.write_all(b"Initial log line\n").await.unwrap();
        writer.flush().await.unwrap();

        // Write more content: five numbered lines, flushed once at the end
        for i in 1..=5 {
            writer
                .write_all(format!("New log line {}\n", i).as_bytes())
                .await
                .unwrap();
        }
        writer.flush().await.unwrap();

        // Wait a bit for the file to be written and the watcher to detect changes
        RealClock.sleep(Duration::from_millis(500)).await;

        // Wait until log sender gets message.
        // NOTE(review): only the timeout is checked here; the value produced
        // by `rx.recv()` itself is discarded by `let _`.
        let timeout = Duration::from_secs(1);
        let _ = RealClock
            .timeout(timeout, rx.recv())
            .await
            .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));

        // Wait a bit more for all lines to be processed
        RealClock.sleep(Duration::from_millis(200)).await;

        // Stop the forwarder and collect the lines it still holds.
        let (recent_lines, _result) = monitor.abort().await;

        // Six lines were written and 3 was passed as max_buffer_size above.
        assert!(
            recent_lines.len() >= 3,
            "Expected buffer with at least 3 lines, got {} lines: {:?}",
            recent_lines.len(),
            recent_lines
        );

        // The first, second and last written lines must all be in the file.
        let file_contents = std::fs::read_to_string(&temp_path).unwrap();
        assert!(
            file_contents.contains("Initial log line"),
            "Expected temp file to contain 'Initial log line', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 1"),
            "Expected temp file to contain 'New log line 1', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 5"),
            "Expected temp file to contain 'New log line 5', got: {:?}",
            file_contents
        );
    }
1894
1895    #[test]
1896    fn test_aggregator_custom_threshold() {
1897        // Test with very strict threshold (0.05)
1898        let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1899        strict_aggregator.add_line("ERROR 404").unwrap();
1900        strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold
1901        assert_eq!(strict_aggregator.lines.len(), 2);
1902
1903        // Test with very lenient threshold (0.8)
1904        let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1905        lenient_aggregator.add_line("ERROR 404").unwrap();
1906        lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold
1907        assert_eq!(lenient_aggregator.lines.len(), 1);
1908        assert_eq!(lenient_aggregator.lines[0].count, 2);
1909    }
1910
1911    #[test]
1912    fn test_format_system_time() {
1913        let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC
1914        let formatted = format_system_time(test_time);
1915
1916        // Just verify it's a reasonable format (contains date and time components)
1917        assert!(formatted.contains("-"));
1918        assert!(formatted.contains(":"));
1919        assert!(formatted.len() > 10); // Should be reasonable length
1920    }
1921
1922    #[test]
1923    fn test_aggregator_display_formatting() {
1924        let mut aggregator = Aggregator::new();
1925        aggregator.add_line("Test error message").unwrap();
1926        aggregator.add_line("Test error message").unwrap(); // Should merge
1927
1928        let display_string = format!("{}", aggregator);
1929
1930        // Verify the output contains expected elements
1931        assert!(display_string.contains("Aggregated Logs"));
1932        assert!(display_string.contains("[2 similar log lines]"));
1933        assert!(display_string.contains("Test error message"));
1934        assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1935    }
1936
1937    #[tokio::test]
1938    async fn test_local_log_sender_inactive_status() {
1939        let (log_channel, _) =
1940            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1941        let mut sender = LocalLogSender::new(log_channel, 12345).unwrap();
1942
1943        // This test verifies that the sender handles inactive status gracefully
1944        // In a real scenario, the channel would be closed, but for testing we just
1945        // verify the send/flush methods don't panic
1946        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1947        assert!(result.is_ok());
1948
1949        let result = sender.flush();
1950        assert!(result.is_ok());
1951    }
1952
1953    #[test]
1954    fn test_levenshtein_distance_edge_cases() {
1955        // Test with empty strings
1956        assert_eq!(levenshtein_distance("", ""), 0);
1957        assert_eq!(levenshtein_distance("", "hello"), 5);
1958        assert_eq!(levenshtein_distance("hello", ""), 5);
1959
1960        // Test with identical strings
1961        assert_eq!(levenshtein_distance("hello", "hello"), 0);
1962
1963        // Test with single character differences
1964        assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion
1965        assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion
1966        assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution
1967
1968        // Test with unicode characters
1969        assert_eq!(levenshtein_distance("café", "cafe"), 1);
1970    }
1971
1972    #[test]
1973    fn test_normalized_edit_distance_edge_cases() {
1974        // Test with empty strings
1975        assert_eq!(normalized_edit_distance("", ""), 0.0);
1976
1977        // Test normalization
1978        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1979        assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1980
1981        // Test that result is always between 0.0 and 1.0
1982        let distance = normalized_edit_distance("completely", "different");
1983        assert!(distance >= 0.0 && distance <= 1.0);
1984    }
1985
1986    #[tokio::test]
1987    async fn test_deserialize_message_lines_edge_cases() {
1988        // Test with empty string
1989        let empty_message = "".to_string();
1990        let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
1991        let result = deserialize_message_lines(&serialized).unwrap();
1992        assert_eq!(result, vec![vec![] as Vec<String>]);
1993
1994        // Test with trailing newline
1995        let trailing_newline = "line1\nline2\n".to_string();
1996        let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
1997        let result = deserialize_message_lines(&serialized).unwrap();
1998        assert_eq!(result, vec![vec!["line1", "line2"]]);
1999    }
2000
2001    #[test]
2002    fn test_output_target_serialization() {
2003        // Test that OutputTarget can be serialized and deserialized
2004        let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2005        let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2006
2007        let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2008        let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2009
2010        assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2011        assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2012    }
2013
2014    #[test]
2015    fn test_log_line_display_formatting() {
2016        let log_line = LogLine::new("Test message".to_string());
2017        let display_string = format!("{}", log_line);
2018
2019        assert!(display_string.contains("[1 similar log lines]"));
2020        assert!(display_string.contains("Test message"));
2021
2022        // Test with higher count
2023        let mut log_line_multi = LogLine::new("Test message".to_string());
2024        log_line_multi.count = 5;
2025        let display_string_multi = format!("{}", log_line_multi);
2026
2027        assert!(display_string_multi.contains("[5 similar log lines]"));
2028        assert!(display_string_multi.contains("Test message"));
2029    }
2030
2031    // Mock reader for testing process_file_content using std::io::Cursor
2032    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
2033        std::io::Cursor::new(data)
2034    }
2035
2036    #[tokio::test]
2037    async fn test_process_file_content_basic() {
2038        let data = b"line1\nline2\nline3\n".to_vec();
2039        let mut reader = create_mock_reader(data.clone());
2040        let max_buf_size = 10;
2041
2042        let result =
2043            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2044                .await
2045                .unwrap();
2046
2047        assert_eq!(result.lines.len(), 3);
2048        assert_eq!(result.lines[0], b"line1");
2049        assert_eq!(result.lines[1], b"line2");
2050        assert_eq!(result.lines[2], b"line3");
2051        assert_eq!(result.new_position, data.len() as u64);
2052        assert!(result.incomplete_line_buffer.is_empty());
2053    }
2054
2055    #[tokio::test]
2056    async fn test_process_file_content_incomplete_line() {
2057        let data = b"line1\nline2\npartial".to_vec();
2058        let mut reader = create_mock_reader(data.clone());
2059        let max_buf_size = 10;
2060
2061        let result =
2062            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2063                .await
2064                .unwrap();
2065
2066        assert_eq!(result.lines.len(), 2);
2067        assert_eq!(result.lines[0], b"line1");
2068        assert_eq!(result.lines[1], b"line2");
2069        assert_eq!(result.new_position, data.len() as u64);
2070        assert_eq!(result.incomplete_line_buffer, b"partial");
2071    }
2072
2073    #[tokio::test]
2074    async fn test_process_file_content_with_existing_buffer() {
2075        let data = b"omplete\nline2\nline3\n".to_vec();
2076        let mut reader = create_mock_reader(data.clone());
2077        let existing_buffer = b"inc".to_vec();
2078        let max_buf_size = 10;
2079
2080        let result = process_file_content(
2081            &mut reader,
2082            0,
2083            data.len() as u64,
2084            existing_buffer,
2085            max_buf_size,
2086        )
2087        .await
2088        .unwrap();
2089
2090        assert_eq!(result.lines.len(), 3);
2091        assert_eq!(result.lines[0], b"incomplete");
2092        assert_eq!(result.lines[1], b"line2");
2093        assert_eq!(result.lines[2], b"line3");
2094        assert_eq!(result.new_position, data.len() as u64);
2095        assert!(result.incomplete_line_buffer.is_empty());
2096    }
2097
2098    #[tokio::test]
2099    async fn test_process_file_content_empty_file() {
2100        let data = Vec::new();
2101        let mut reader = create_mock_reader(data.clone());
2102        let max_buf_size = 10;
2103
2104        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2105            .await
2106            .unwrap();
2107
2108        assert!(result.lines.is_empty());
2109        assert_eq!(result.new_position, 0);
2110        assert!(result.incomplete_line_buffer.is_empty());
2111    }
2112
2113    #[tokio::test]
2114    async fn test_process_file_content_only_newlines() {
2115        let data = b"\n\n\n".to_vec();
2116        let mut reader = create_mock_reader(data.clone());
2117        let max_buf_size = 10;
2118
2119        let result =
2120            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2121                .await
2122                .unwrap();
2123
2124        // Empty lines should not be added (the function skips empty line_buffer)
2125        assert!(result.lines.is_empty());
2126        assert_eq!(result.new_position, data.len() as u64);
2127        assert!(result.incomplete_line_buffer.is_empty());
2128    }
2129
2130    #[tokio::test]
2131    async fn test_process_file_content_no_newlines() {
2132        let data = b"no newlines here".to_vec();
2133        let mut reader = create_mock_reader(data.clone());
2134        let max_buf_size = 10;
2135
2136        let result =
2137            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2138                .await
2139                .unwrap();
2140
2141        assert!(result.lines.is_empty());
2142        assert_eq!(result.new_position, data.len() as u64);
2143        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2144    }
2145
2146    #[tokio::test]
2147    async fn test_process_file_content_file_truncation() {
2148        let data = b"line1\nline2\n".to_vec();
2149        let mut reader = create_mock_reader(data.clone());
2150
2151        // Simulate current position being beyond file size (file was truncated)
2152        let result = process_file_content(
2153            &mut reader,
2154            100, // position beyond file size
2155            data.len() as u64,
2156            Vec::new(),
2157            10, // max_buf_size
2158        )
2159        .await
2160        .unwrap();
2161
2162        // Should reset to beginning and read all lines
2163        assert_eq!(result.lines.len(), 2);
2164        assert_eq!(result.lines[0], b"line1");
2165        assert_eq!(result.lines[1], b"line2");
2166        assert_eq!(result.new_position, data.len() as u64);
2167        assert!(result.incomplete_line_buffer.is_empty());
2168    }
2169
2170    #[tokio::test]
2171    async fn test_process_file_content_seek_to_position() {
2172        let data = b"line1\nline2\nline3\n".to_vec();
2173        let mut reader = create_mock_reader(data.clone());
2174
2175        // Start reading from position 6 (after "line1\n")
2176        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2177            .await
2178            .unwrap();
2179
2180        assert_eq!(result.lines.len(), 2);
2181        assert_eq!(result.lines[0], b"line2");
2182        assert_eq!(result.lines[1], b"line3");
2183        assert_eq!(result.new_position, data.len() as u64);
2184        assert!(result.incomplete_line_buffer.is_empty());
2185    }
2186
2187    #[tokio::test]
2188    async fn test_process_file_content_position_equals_file_size() {
2189        let data = b"line1\nline2\n".to_vec();
2190        let mut reader = create_mock_reader(data.clone());
2191
2192        // Start reading from end of file
2193        let result = process_file_content(
2194            &mut reader,
2195            data.len() as u64,
2196            data.len() as u64,
2197            Vec::new(),
2198            10,
2199        )
2200        .await
2201        .unwrap();
2202
2203        // Should not read anything new
2204        assert!(
2205            result.lines.is_empty(),
2206            "Expected empty line got {:?}",
2207            result.lines
2208        );
2209        assert_eq!(result.new_position, data.len() as u64);
2210        assert!(result.incomplete_line_buffer.is_empty());
2211    }
2212
2213    #[tokio::test]
2214    async fn test_process_file_content_large_line_truncation() {
2215        // Create a line longer than MAX_LINE_SIZE
2216        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2217        let data = format!("{}\nline2\n", large_line).into_bytes();
2218        let mut reader = create_mock_reader(data.clone());
2219
2220        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2221            .await
2222            .unwrap();
2223
2224        assert_eq!(result.lines.len(), 2);
2225
2226        // First line should be truncated
2227        assert_eq!(
2228            result.lines[0].len(),
2229            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2230        );
2231        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2232
2233        // Second line should be normal
2234        assert_eq!(result.lines[1], b"line2");
2235
2236        assert_eq!(result.new_position, data.len() as u64);
2237        assert!(result.incomplete_line_buffer.is_empty());
2238    }
2239
2240    #[tokio::test]
2241    async fn test_process_file_content_mixed_line_endings() {
2242        let data = b"line1\nline2\r\nline3\n".to_vec();
2243        let mut reader = create_mock_reader(data.clone());
2244
2245        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2246            .await
2247            .unwrap();
2248
2249        assert_eq!(result.lines.len(), 3);
2250        assert_eq!(result.lines[0], b"line1");
2251        assert_eq!(result.lines[1], b"line2\r"); // \r is preserved
2252        assert_eq!(result.lines[2], b"line3");
2253        assert_eq!(result.new_position, data.len() as u64);
2254        assert!(result.incomplete_line_buffer.is_empty());
2255    }
2256
2257    #[tokio::test]
2258    async fn test_process_file_content_existing_buffer_with_truncation() {
2259        // Create a scenario where existing buffer + new data creates a line that needs truncation
2260        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2261        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2262        let mut reader = create_mock_reader(data.clone());
2263
2264        let result = process_file_content(
2265            &mut reader,
2266            0,
2267            data.len() as u64,
2268            existing_buffer.into_bytes(),
2269            10,
2270        )
2271        .await
2272        .unwrap();
2273
2274        assert_eq!(result.lines.len(), 2);
2275
2276        // First line should be truncated (existing buffer + new data)
2277        assert_eq!(
2278            result.lines[0].len(),
2279            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2280        );
2281        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2282
2283        // Second line should be normal
2284        assert_eq!(result.lines[1], b"line2");
2285
2286        assert_eq!(result.new_position, data.len() as u64);
2287        assert!(result.incomplete_line_buffer.is_empty());
2288    }
2289
2290    #[tokio::test]
2291    async fn test_process_file_content_single_character_lines() {
2292        let data = b"a\nb\nc\n".to_vec();
2293        let mut reader = create_mock_reader(data.clone());
2294
2295        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2296            .await
2297            .unwrap();
2298
2299        assert_eq!(result.lines.len(), 3);
2300        assert_eq!(result.lines[0], b"a");
2301        assert_eq!(result.lines[1], b"b");
2302        assert_eq!(result.lines[2], b"c");
2303        assert_eq!(result.new_position, data.len() as u64);
2304        assert!(result.incomplete_line_buffer.is_empty());
2305    }
2306
2307    #[tokio::test]
2308    async fn test_process_file_content_binary_data() {
2309        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2310        let mut reader = create_mock_reader(data.clone());
2311
2312        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2313            .await
2314            .unwrap();
2315
2316        assert_eq!(result.lines.len(), 2);
2317        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2318        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2319        assert_eq!(result.new_position, data.len() as u64);
2320        assert!(result.incomplete_line_buffer.is_empty());
2321    }
2322
2323    #[tokio::test]
2324    async fn test_process_file_content_resume_after_max_buffer_size() {
2325        // Test data: 3 lines as specified in the example
2326        let data = b"line 1\nline 2\nline 3\n".to_vec();
2327        let mut reader = create_mock_reader(data.clone());
2328        let max_buffer_size = 2; // Limit to 2 lines per call
2329
2330        // First call: should return first 2 lines
2331        let result1 = process_file_content(
2332            &mut reader,
2333            0, // start from beginning
2334            data.len() as u64,
2335            Vec::new(), // no existing buffer
2336            max_buffer_size,
2337        )
2338        .await
2339        .unwrap();
2340
2341        // Verify first call results
2342        assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2343        assert_eq!(result1.lines[0], b"line 1");
2344        assert_eq!(result1.lines[1], b"line 2");
2345        assert!(result1.incomplete_line_buffer.is_empty());
2346
2347        // The position should be after "line 1\nline 2\n" (14 bytes)
2348        let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2349        assert_eq!(result1.new_position, expected_position_after_first_call);
2350
2351        // Second call: resume from where first call left off
2352        let mut reader2 = create_mock_reader(data.clone());
2353        let result2 = process_file_content(
2354            &mut reader2,
2355            result1.new_position, // resume from previous position
2356            data.len() as u64,
2357            result1.incomplete_line_buffer, // pass any incomplete buffer (should be empty)
2358            max_buffer_size,
2359        )
2360        .await
2361        .unwrap();
2362
2363        // Verify second call results
2364        assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2365        assert_eq!(result2.lines[0], b"line 3");
2366        assert!(result2.incomplete_line_buffer.is_empty());
2367        assert_eq!(result2.new_position, data.len() as u64);
2368    }
2369
    /// Regression-style check that `StreamFwder` completes without a panic
    /// when an over-long line would be cut in the middle of a multi-byte
    /// UTF-8 character.
    #[tokio::test]
    async fn test_utf8_truncation() {
        // Test that StreamFwder doesn't panic when truncating lines
        // with multi-byte chars.

        hyperactor_telemetry::initialize_logging_for_test();

        // Create a line longer than MAX_LINE_SIZE with an emoji at the boundary:
        // MAX_LINE_SIZE - 1 single-byte characters followed by a 4-byte one, so
        // byte offset MAX_LINE_SIZE lies inside the emoji.
        let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
        long_line.push('🦀'); // 4-byte emoji - truncation will land in the middle
        long_line.push('\n');

        // Create IO streams
        let (mut writer, reader) = tokio::io::duplex(8192);

        // Start StreamFwder
        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(tokio::io::sink()), // discard output
            None,                        // no file monitor needed
            OutputTarget::Stdout,
            1,     // tail buffer of 1 (need at least one sink)
            None,  // no log channel
            12345, // pid
            None,  // no prefix
        );

        // Write the problematic line
        writer.write_all(long_line.as_bytes()).await.unwrap();
        drop(writer); // Close to signal EOF

        // Wait for completion - should NOT panic. The buffered lines are not
        // inspected; only the returned result matters here.
        let (_lines, result) = monitor.abort().await;
        result.expect("Should complete without panic despite UTF-8 truncation");
    }
2405}