hyperactor_mesh/
logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::HandleClient;
28use hyperactor::Handler;
29use hyperactor::Instance;
30use hyperactor::Named;
31use hyperactor::OncePortRef;
32use hyperactor::RefClient;
33use hyperactor::Unbind;
34use hyperactor::channel;
35use hyperactor::channel::ChannelAddr;
36use hyperactor::channel::ChannelRx;
37use hyperactor::channel::ChannelTransport;
38use hyperactor::channel::ChannelTx;
39use hyperactor::channel::Rx;
40use hyperactor::channel::Tx;
41use hyperactor::channel::TxStatus;
42use hyperactor::clock::Clock;
43use hyperactor::clock::RealClock;
44use hyperactor::config::CONFIG;
45use hyperactor::config::ConfigAttr;
46use hyperactor::data::Serialized;
47use hyperactor::declare_attrs;
48use hyperactor_telemetry::env;
49use hyperactor_telemetry::log_file_path;
50use serde::Deserialize;
51use serde::Serialize;
52use tokio::io;
53use tokio::io::AsyncRead;
54use tokio::io::AsyncReadExt;
55use tokio::io::AsyncWriteExt;
56use tokio::sync::Mutex;
57use tokio::sync::Notify;
58use tokio::sync::RwLock;
59use tokio::sync::watch::Receiver;
60use tokio::task::JoinHandle;
61use tracing::Level;
62
63use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
64use crate::shortuuid::ShortUuid;
65
66mod line_prefixing_writer;
67
/// Default time window, in seconds, for log aggregation.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Captured lines longer than this many bytes are truncated before being forwarded.
const MAX_LINE_SIZE: usize = 4096;
70
declare_attrs! {
    /// Maximum number of lines to batch before flushing to the client.
    /// The stdout/stderr reader pauses after reading `HYPERACTOR_READ_LOG_BUFFER`
    /// lines; once those lines have been flushed, reading resumes.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        py_name: None,
    })
    pub attr READ_LOG_BUFFER: usize = 100;

    /// If enabled, stdout/stderr are also written to aggregated log files when
    /// running in the `Local` environment, where file creation is otherwise
    /// skipped by `get_unique_local_log_destination`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        py_name: None,
    })
    pub attr FORCE_FILE_LOG: bool = false;

    /// Prefix each forwarded log line with the process's local rank,
    /// rendered as `[rank] line`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        py_name: None,
    })
    pub attr PREFIX_WITH_RANK: bool = true;
}
95
/// Calculate the Levenshtein distance between two strings.
///
/// Returns the minimum number of single-character insertions, deletions and
/// substitutions needed to turn `left` into `right`. Characters are Unicode
/// scalar values (`char`s), not bytes.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    // Edge cases: transforming to/from the empty string costs its length.
    if left_chars.is_empty() {
        return right_chars.len();
    }
    if right_chars.is_empty() {
        return left_chars.len();
    }

    // Each DP row depends only on the previous one, so two rows of
    // `right_len + 1` cells replace the full (left_len+1) x (right_len+1) matrix.
    let mut prev: Vec<usize> = (0..=right_chars.len()).collect();
    let mut curr = vec![0usize; right_chars.len() + 1];

    for (i, &lc) in left_chars.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &rc) in right_chars.iter().enumerate() {
            let cost = usize::from(lc != rc);
            curr[j + 1] = (prev[j + 1] + 1) // deletion
                .min(curr[j] + 1) // insertion
                .min(prev[j] + cost); // substitution
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    // After the final swap, `prev` holds the last computed row.
    prev[right_chars.len()]
}

/// Calculate the normalized edit distance between two strings (0.0 to 1.0).
///
/// The Levenshtein distance is divided by the length of the longer string.
/// Both quantities are measured in `char`s, so the ratio is meaningful for
/// non-ASCII text (dividing by the byte length would shrink it for multi-byte
/// characters). Two empty strings are identical and yield 0.0.
fn normalized_edit_distance(left: &str, right: &str) -> f64 {
    let max_len = left.chars().count().max(right.chars().count());
    if max_len == 0 {
        return 0.0;
    }
    levenshtein_distance(left, right) as f64 / max_len as f64
}
157
/// One distinct log line plus the number of similar lines folded into it.
#[derive(Debug, Clone)]
struct LogLine {
    content: String,
    pub count: u64,
}

impl LogLine {
    /// Start tracking `content`, counting this first occurrence.
    fn new(content: String) -> Self {
        LogLine { count: 1, content }
    }
}

impl fmt::Display for LogLine {
    /// Render as an ANSI-colored `[N similar log lines]` tag followed by the content.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogLine { content, count } = self;
        write!(f, "\x1b[33m[{} similar log lines]\x1b[0m {}", count, content)
    }
}
180
181#[derive(Debug, Clone)]
182/// Aggregator is a struct that holds a list of LogLines and a start time.
183/// It can aggregate new log lines to existing ones if they are "similar" based on edit distance.
184struct Aggregator {
185    lines: Vec<LogLine>,
186    start_time: SystemTime,
187    similarity_threshold: f64, // Threshold for considering two strings similar (0.0 to 1.0)
188}
189
190impl Aggregator {
191    fn new() -> Self {
192        // Default threshold: strings with normalized edit distance < 0.15 are considered similar
193        Self::new_with_threshold(0.15)
194    }
195
196    fn new_with_threshold(threshold: f64) -> Self {
197        Aggregator {
198            lines: vec![],
199            start_time: RealClock.system_time_now(),
200            similarity_threshold: threshold,
201        }
202    }
203
204    fn reset(&mut self) {
205        self.lines.clear();
206        self.start_time = RealClock.system_time_now();
207    }
208
209    fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
210        // Find the most similar existing line
211        let mut best_match_idx = None;
212        let mut best_similarity = f64::MAX;
213
214        for (idx, existing_line) in self.lines.iter().enumerate() {
215            let distance = normalized_edit_distance(&existing_line.content, line);
216
217            // If this line is more similar than our current best match
218            if distance < best_similarity && distance < self.similarity_threshold {
219                best_match_idx = Some(idx);
220                best_similarity = distance;
221            }
222        }
223
224        // If we found a similar enough line, increment its count
225        if let Some(idx) = best_match_idx {
226            self.lines[idx].count += 1;
227        } else {
228            // Otherwise, add a new line
229            self.lines.push(LogLine::new(line.to_string()));
230        }
231
232        Ok(())
233    }
234
235    fn is_empty(&self) -> bool {
236        self.lines.is_empty()
237    }
238}
239
240// Helper function to format SystemTime
241fn format_system_time(time: SystemTime) -> String {
242    let datetime: DateTime<Local> = time.into();
243    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
244}
245
246impl fmt::Display for Aggregator {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        // Format the start time
249        let start_time_str = format_system_time(self.start_time);
250
251        // Get and format the current time
252        let current_time = RealClock.system_time_now();
253        let end_time_str = format_system_time(current_time);
254
255        // Write the header with formatted time window
256        writeln!(
257            f,
258            "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
259            start_time_str
260        )?;
261
262        // Write each log line
263        for line in self.lines.iter() {
264            writeln!(f, "{}", line)?;
265        }
266        writeln!(
267            f,
268            "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
269            end_time_str
270        )?;
271        Ok(())
272    }
273}
274
/// Messages that can be sent to the LogClientActor remotely.
///
/// `LocalLogSender` sends `Log` for each batch of captured lines and
/// `Flush { sync_version: None }` from its `LogSender::flush`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of log lines captured from a single output stream of one process.
    Log {
        /// The hostname of the process that generated the log
        hostname: String,
        /// The pid of the process that generated the log
        pid: u32,
        /// The target output stream (stdout or stderr)
        output_target: OutputTarget,
        /// The log payload. `LocalLogSender` serializes a `Vec<Vec<u8>>`
        /// here, one byte vector per log line.
        payload: Serialized,
    },

    /// Flush the log
    Flush {
        /// Indicates whether the current flush is synced or non-synced.
        /// If synced, a version number is available; otherwise `None`.
        sync_version: Option<u64>,
    },
}
306
/// Messages that can be sent to the LogClient locally.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Enable, disable, or change the log aggregation window.
    SetAggregate {
        /// The time window in seconds to aggregate logs. If None, aggregation is disabled.
        aggregate_window_sec: Option<u64>,
    },

    /// Synchronously flush all the logs from all the procs. This is for the client to call.
    StartSyncFlush {
        /// Expect this many procs to acknowledge the flush message.
        expected_procs: usize,
        /// Replied to once acks from all the procs have been received.
        reply: OncePortRef<()>,
        /// Receives the current flush version, returned to the caller.
        version: OncePortRef<u64>,
    },
}
334
/// Trait for sending captured log lines to a log sink.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Send a batch of log lines captured from the `target` stream.
    ///
    /// `tee` passes one byte vector per line (without the trailing newline).
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Request a flush of the log channel.
    ///
    /// This may return before the flush has been delivered or acknowledged.
    /// `LocalLogSender`, for example, posts a `Flush` message and returns
    /// immediately without waiting for a reply.
    fn flush(&mut self) -> anyhow::Result<()>;
}
345
/// Represents the target output stream (stdout or stderr).
///
/// Also selects which of this process's writers `std_writer` returns and
/// which aggregated file `FileAppender::addr_for` routes lines to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output stream
    Stdout,
    /// Standard error stream
    Stderr,
}
354
/// Identifies one of a child process's output streams.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// The child process's standard output stream
    ChildStdout,
    /// The child process's standard error stream
    ChildStderr,
}
362
363/// Write the log to a local unix channel so some actors can listen to it and stream the log back.
364pub struct LocalLogSender {
365    hostname: String,
366    pid: u32,
367    tx: ChannelTx<LogMessage>,
368    status: Receiver<TxStatus>,
369}
370
371impl LocalLogSender {
372    fn new(log_channel: ChannelAddr, pid: u32) -> Result<Self, anyhow::Error> {
373        let tx = channel::dial::<LogMessage>(log_channel)?;
374        let status = tx.status().clone();
375
376        let hostname = hostname::get()
377            .unwrap_or_else(|_| "unknown_host".into())
378            .into_string()
379            .unwrap_or("unknown_host".to_string());
380        Ok(Self {
381            hostname,
382            pid,
383            tx,
384            status,
385        })
386    }
387}
388
389#[async_trait]
390impl LogSender for LocalLogSender {
391    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
392        if TxStatus::Active == *self.status.borrow() {
393            // Do not use tx.send, it will block the allocator as the child process state is unknown.
394            self.tx.post(LogMessage::Log {
395                hostname: self.hostname.clone(),
396                pid: self.pid,
397                output_target: target,
398                payload: Serialized::serialize(&payload)?,
399            });
400        }
401
402        Ok(())
403    }
404
405    fn flush(&mut self) -> anyhow::Result<()> {
406        // send will make sure message is delivered
407        if TxStatus::Active == *self.status.borrow() {
408            // Do not use tx.send, it will block the allocator as the child process state is unknown.
409            self.tx.post(LogMessage::Flush { sync_version: None });
410        }
411        Ok(())
412    }
413}
414
/// Message sent to FileMonitor.
///
/// Carries a batch of already-formatted lines without trailing newlines;
/// `file_monitor_task` writes each one followed by `\n`.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    // Lines to append, in order.
    lines: Vec<String>,
}
420
421/// File appender, coordinates write access to a file via a channel.
422pub struct FileAppender {
423    stdout_addr: ChannelAddr,
424    stderr_addr: ChannelAddr,
425    #[allow(dead_code)] // Tasks are self terminating
426    stdout_task: JoinHandle<()>,
427    #[allow(dead_code)]
428    stderr_task: JoinHandle<()>,
429    stop: Arc<Notify>,
430}
431
432impl fmt::Debug for FileAppender {
433    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434        f.debug_struct("FileMonitor")
435            .field("stdout_addr", &self.stdout_addr)
436            .field("stderr_addr", &self.stderr_addr)
437            .finish()
438    }
439}
440
441impl FileAppender {
442    /// Create a new FileAppender with aggregated log files for stdout and stderr
443    /// Returns None if file creation fails
444    pub fn new() -> Option<Self> {
445        let stop = Arc::new(Notify::new());
446        // TODO make it configurable
447        let file_name_tag = hostname::get()
448            .unwrap_or_else(|_| "unknown_host".into())
449            .into_string()
450            .unwrap_or("unknown_host".to_string());
451
452        // Create stdout file and task
453        let (stdout_path, stdout_writer) =
454            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
455                Some(writer) => writer,
456                None => {
457                    tracing::warn!("failed to create stdout file");
458                    return None;
459                }
460            };
461        let (stdout_addr, stdout_rx) = {
462            let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
463            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
464                Ok((addr, rx)) => (addr, rx),
465                Err(e) => {
466                    tracing::warn!("failed to serve stdout channel: {}", e);
467                    return None;
468                }
469            }
470        };
471        let stdout_stop = stop.clone();
472        let stdout_task = tokio::spawn(file_monitor_task(
473            stdout_rx,
474            stdout_writer,
475            OutputTarget::Stdout,
476            stdout_stop,
477        ));
478
479        // Create stderr file and task
480        let (stderr_path, stderr_writer) =
481            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
482                Some(writer) => writer,
483                None => {
484                    tracing::warn!("failed to create stderr file");
485                    return None;
486                }
487            };
488        let (stderr_addr, stderr_rx) = {
489            let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
490            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
491                Ok((addr, rx)) => (addr, rx),
492                Err(e) => {
493                    tracing::warn!("failed to serve stderr channel: {}", e);
494                    return None;
495                }
496            }
497        };
498        let stderr_stop = stop.clone();
499        let stderr_task = tokio::spawn(file_monitor_task(
500            stderr_rx,
501            stderr_writer,
502            OutputTarget::Stderr,
503            stderr_stop,
504        ));
505
506        tracing::debug!(
507            "FileAppender: created for stdout {} stderr {} ",
508            stdout_path.display(),
509            stderr_path.display()
510        );
511
512        Some(Self {
513            stdout_addr,
514            stderr_addr,
515            stdout_task,
516            stderr_task,
517            stop,
518        })
519    }
520
521    /// Get a channel address for the specified output target
522    pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
523        match target {
524            OutputTarget::Stdout => self.stdout_addr.clone(),
525            OutputTarget::Stderr => self.stderr_addr.clone(),
526        }
527    }
528}
529
530impl Drop for FileAppender {
531    fn drop(&mut self) {
532        // Trigger stop signal to notify tasks to exit
533        self.stop.notify_waiters();
534        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
535    }
536}
537
538/// Task that receives lines from StreamFwds and writes them to the aggregated file
539async fn file_monitor_task(
540    mut rx: ChannelRx<FileMonitorMessage>,
541    mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
542    target: OutputTarget,
543    stop: Arc<Notify>,
544) {
545    loop {
546        tokio::select! {
547            msg = rx.recv() => {
548                match msg {
549                    Ok(msg) => {
550                        // Write lines to aggregated file
551                        for line in &msg.lines {
552                            if let Err(e) = writer.write_all(line.as_bytes()).await {
553                                tracing::warn!("FileMonitor: failed to write line to file: {}", e);
554                                continue;
555                            }
556                            if let Err(e) = writer.write_all(b"\n").await {
557                                tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
558                            }
559                        }
560                        if let Err(e) = writer.flush().await {
561                            tracing::warn!("FileMonitor: failed to flush file: {}", e);
562                        }
563                    }
564                    Err(e) => {
565                        // Channel error
566                        tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
567                        break;
568                    }
569                }
570            }
571            _ = stop.notified() => {
572                tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
573                break;
574            }
575        }
576    }
577
578    // Graceful shutdown: flush one last time
579    if let Err(e) = writer.flush().await {
580        tracing::warn!("FileMonitor: failed final flush: {}", e);
581    }
582    tracing::debug!("FileMonitor task for {:?} exiting", target);
583}
584
585fn create_unique_file_writer(
586    file_name_tag: &str,
587    output_target: OutputTarget,
588    env: env::Env,
589) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
590    let suffix = match output_target {
591        OutputTarget::Stderr => "stderr",
592        OutputTarget::Stdout => "stdout",
593    };
594    let (path, filename) = log_file_path(env, None)?;
595    let path = Path::new(&path);
596    let mut full_path = PathBuf::from(path);
597
598    let uuid = ShortUuid::generate();
599
600    full_path.push(format!(
601        "{}_{}_{}.{}",
602        filename, file_name_tag, uuid, suffix
603    ));
604    let file = std::fs::OpenOptions::new()
605        .create(true)
606        .append(true)
607        .open(full_path.clone())?;
608    let tokio_file = tokio::fs::File::from_std(file);
609    // TODO: should we buffer this?
610    Ok((full_path, Box::new(tokio_file)))
611}
612
613fn get_unique_local_log_destination(
614    file_name_tag: &str,
615    output_target: OutputTarget,
616) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
617    let env: env::Env = env::Env::current();
618    if env == env::Env::Local && !hyperactor::config::global::get(FORCE_FILE_LOG) {
619        tracing::debug!("not creating log file because of env type");
620        None
621    } else {
622        match create_unique_file_writer(file_name_tag, output_target, env) {
623            Ok((a, b)) => Some((a, b)),
624            Err(e) => {
625                tracing::warn!("failed to create unique file writer: {}", e);
626                None
627            }
628        }
629    }
630}
631
632/// Create a writer for stdout or stderr
633fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
634    // Return the appropriate standard output or error writer
635    match target {
636        OutputTarget::Stdout => Box::new(tokio::io::stdout()),
637        OutputTarget::Stderr => Box::new(tokio::io::stderr()),
638    }
639}
640
641/// Copy bytes from `reader` to `writer`, forward to log_sender, and forward to FileMonitor.
642/// The same formatted lines go to both log_sender and file_monitor.
643async fn tee(
644    mut reader: impl AsyncRead + Unpin + Send + 'static,
645    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
646    log_sender: Option<Box<dyn LogSender + Send>>,
647    file_monitor_addr: Option<ChannelAddr>,
648    target: OutputTarget,
649    prefix: Option<String>,
650    stop: Arc<Notify>,
651    recent_lines_buf: RotatingLineBuffer,
652) -> Result<(), io::Error> {
653    let mut buf = [0u8; 8192];
654    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
655    let mut log_sender = log_sender;
656
657    // Dial the file monitor channel if provided
658    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
659        file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
660            Ok(tx) => Some(tx),
661            Err(e) => {
662                tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
663                None
664            }
665        });
666
667    loop {
668        tokio::select! {
669            read_result = reader.read(&mut buf) => {
670                match read_result {
671                    Ok(n) => {
672                        if n == 0 {
673                            // EOF reached
674                            tracing::debug!("EOF reached in tee");
675                            break;
676                        }
677
678                        // Write to console
679                        if let Err(e) = std_writer.write_all(&buf[..n]).await {
680                            tracing::warn!("error writing to std: {}", e);
681                        }
682
683                        // Process bytes into lines for log_sender and FileMonitor
684                        let mut completed_lines = Vec::new();
685
686                        for &byte in &buf[..n] {
687                            if byte == b'\n' {
688                                // Complete line found
689                                let mut line = String::from_utf8_lossy(&line_buffer).to_string();
690
691                                // Truncate if too long, respecting UTF-8 boundaries
692                                // (multi-byte chars like emojis can be up to 4 bytes)
693                                if line.len() > MAX_LINE_SIZE {
694                                    let mut truncate_at = MAX_LINE_SIZE;
695                                    while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
696                                        truncate_at -= 1;
697                                    }
698                                    line.truncate(truncate_at);
699                                    line.push_str("... [TRUNCATED]");
700                                }
701
702                                // Prepend with prefix if configured
703                                let final_line = if let Some(ref p) = prefix {
704                                    format!("[{}] {}", p, line)
705                                } else {
706                                    line
707                                };
708
709                                completed_lines.push(final_line);
710                                line_buffer.clear();
711                            } else {
712                                line_buffer.push(byte);
713                            }
714                        }
715
716                        // Send completed lines to both log_sender and FileAppender
717                        if !completed_lines.is_empty() {
718                            if let Some(ref mut sender) = log_sender {
719                                let bytes: Vec<Vec<u8>> = completed_lines.iter()
720                                    .map(|s| s.as_bytes().to_vec())
721                                    .collect();
722                                if let Err(e) = sender.send(target, bytes) {
723                                    tracing::warn!("error sending to log_sender: {}", e);
724                                }
725                            }
726
727                            // Send to FileMonitor via hyperactor channel
728                            if let Some(ref mut tx) = file_monitor_tx {
729                                let msg = FileMonitorMessage {
730                                    lines: completed_lines,
731                                };
732                                // Use post() to avoid blocking
733                                tx.post(msg);
734                            }
735                        }
736
737                        recent_lines_buf.try_add_data(&buf, n);
738                    },
739                    Err(e) => {
740                        tracing::debug!("read error in tee: {}", e);
741                        return Err(e);
742                    }
743                }
744            },
745            _ = stop.notified() => {
746                tracing::debug!("stop signal received in tee");
747                break;
748            }
749        }
750    }
751
752    std_writer.flush().await?;
753
754    // Send any remaining partial line
755    if !line_buffer.is_empty() {
756        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
757        // Truncate if too long, respecting UTF-8 boundaries
758        // (multi-byte chars like emojis can be up to 4 bytes)
759        if line.len() > MAX_LINE_SIZE {
760            let mut truncate_at = MAX_LINE_SIZE;
761            while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
762                truncate_at -= 1;
763            }
764            line.truncate(truncate_at);
765            line.push_str("... [TRUNCATED]");
766        }
767        let final_line = if let Some(ref p) = prefix {
768            format!("[{}] {}", p, line)
769        } else {
770            line
771        };
772
773        let final_lines = vec![final_line];
774
775        // Send to log_sender
776        if let Some(ref mut sender) = log_sender {
777            let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
778            let _ = sender.send(target, bytes);
779        }
780
781        // Send to FileMonitor
782        if let Some(ref mut tx) = file_monitor_tx {
783            let msg = FileMonitorMessage { lines: final_lines };
784            tx.post(msg);
785        }
786    }
787
788    // Flush log_sender
789    if let Some(ref mut sender) = log_sender {
790        let _ = sender.flush();
791    }
792
793    Ok(())
794}
795
796#[derive(Debug, Clone)]
797struct RotatingLineBuffer {
798    recent_lines: Arc<RwLock<VecDeque<String>>>,
799    max_buffer_size: usize,
800}
801
802impl RotatingLineBuffer {
803    fn try_add_data(&self, buf: &[u8], buf_end: usize) {
804        let data_str = String::from_utf8_lossy(&buf[..buf_end]);
805        let lines: Vec<&str> = data_str.lines().collect();
806
807        if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
808            for line in lines {
809                if !line.is_empty() {
810                    recent_lines_guard.push_back(line.to_string());
811                    if recent_lines_guard.len() > self.max_buffer_size {
812                        recent_lines_guard.pop_front();
813                    }
814                }
815            }
816        } else {
817            tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
818        }
819    }
820
821    async fn peek(&self) -> Vec<String> {
822        let lines = self.recent_lines.read().await;
823        let start_idx = if lines.len() > self.max_buffer_size {
824            lines.len() - self.max_buffer_size
825        } else {
826            0
827        };
828
829        lines.range(start_idx..).cloned().collect()
830    }
831}
832
/// Forwards an output stream: mirrors it to a writer (this process's stdout/stderr
/// via `start`) and fans its lines out to the configured log sinks.
pub struct StreamFwder {
    // Background task running the tee loop; yields its final I/O result.
    teer: JoinHandle<Result<(), io::Error>>,
    // Shared buffer for peek functionality
    recent_lines_buf: RotatingLineBuffer,
    // Shutdown signal to stop the monitoring loop
    stop: Arc<Notify>,
}
841
842impl StreamFwder {
843    /// Create a new StreamFwder instance, and start monitoring the provided path.
844    /// Once started Monitor will
845    /// - forward logs to log_sender
846    /// - forward logs to file_monitor (if available)
847    /// - pipe reader to target
848    /// - And capture last `max_buffer_size` which can be used to inspect file contents via `peek`.
849    pub fn start(
850        reader: impl AsyncRead + Unpin + Send + 'static,
851        file_monitor_addr: Option<ChannelAddr>,
852        target: OutputTarget,
853        max_buffer_size: usize,
854        log_channel: Option<ChannelAddr>,
855        pid: u32,
856        local_rank: usize,
857    ) -> Self {
858        let prefix = match hyperactor::config::global::get(PREFIX_WITH_RANK) {
859            true => Some(local_rank.to_string()),
860            false => None,
861        };
862        let std_writer = std_writer(target);
863
864        Self::start_with_writer(
865            reader,
866            std_writer,
867            file_monitor_addr,
868            target,
869            max_buffer_size,
870            log_channel,
871            pid,
872            prefix,
873        )
874    }
875
876    /// Create a new StreamFwder instance with a custom writer (used in tests).
877    fn start_with_writer(
878        reader: impl AsyncRead + Unpin + Send + 'static,
879        std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
880        file_monitor_addr: Option<ChannelAddr>,
881        target: OutputTarget,
882        max_buffer_size: usize,
883        log_channel: Option<ChannelAddr>,
884        pid: u32,
885        prefix: Option<String>,
886    ) -> Self {
887        // Sanity: when there is no file sink, no log forwarding, and
888        // `tail_size == 0`, the child should have **inherited** stdio
889        // and no `StreamFwder` should exist. In that case console
890        // mirroring happens via inheritance, not via `StreamFwder`.
891        // If we hit this, we piped unnecessarily.
892        debug_assert!(
893            file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
894            "StreamFwder started with no sinks and no tail"
895        );
896
897        let stop = Arc::new(Notify::new());
898        let recent_lines_buf = RotatingLineBuffer {
899            recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
900                max_buffer_size,
901            ))),
902            max_buffer_size,
903        };
904
905        let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
906            match LocalLogSender::new(addr, pid) {
907                Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
908                Err(e) => {
909                    tracing::error!("failed to create log sender: {}", e);
910                    None
911                }
912            }
913        } else {
914            None
915        };
916
917        let teer_stop = stop.clone();
918        let recent_line_buf_clone = recent_lines_buf.clone();
919        let teer = tokio::spawn(async move {
920            tee(
921                reader,
922                std_writer,
923                log_sender,
924                file_monitor_addr,
925                target,
926                prefix,
927                teer_stop,
928                recent_line_buf_clone,
929            )
930            .await
931        });
932
933        StreamFwder {
934            teer,
935            recent_lines_buf,
936            stop,
937        }
938    }
939
940    pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
941        self.stop.notify_waiters();
942
943        let lines = self.peek().await;
944        let teer_result = self.teer.await;
945
946        let result: Result<(), anyhow::Error> = match teer_result {
947            Ok(inner) => inner.map_err(anyhow::Error::from),
948            Err(e) => Err(e.into()),
949        };
950
951        (lines, result)
952    }
953
954    /// Inspect the latest `max_buffer` lines read from the file being monitored
955    /// Returns lines in chronological order (oldest first)
956    pub async fn peek(&self) -> Vec<String> {
957        self.recent_lines_buf.peek().await
958    }
959}
960
/// Messages handled by [`LogForwardActor`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Receive the next message from the forwarder's log channel and relay it
    /// to the client. The handler sends this message to itself again after
    /// every receive, so a single `Forward` keeps the relay loop running.
    Forward {},

    /// Set whether `Log` messages read from the log channel are streamed back
    /// to the client (`true`) or dropped (`false`).
    SetMode { stream_to_client: bool },

    /// Inject a flush marker tagged with `version` into the log channel. When
    /// the forwarder reads the marker back, it acknowledges `version` to the
    /// client (presumably after all log messages queued ahead of it, assuming
    /// the channel delivers in order).
    ForceSyncFlush { version: u64 },
}
984
/// A log forwarder that receives log messages from its parent process over a
/// channel and forwards them back to the client.
///
/// The channel address is read from the `BOOTSTRAP_LOG_CHANNEL` environment
/// variable when the actor is created.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogForwardMessage {cast = true}],
)]
pub struct LogForwardActor {
    /// Receiving end of the log channel this forwarder serves.
    rx: ChannelRx<LogMessage>,
    /// Sender dialed in `new`, used to inject `Flush` messages into the log
    /// channel (both the periodic heartbeat and versioned sync flushes).
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    /// Earliest time at which the next periodic (unversioned) flush may be
    /// scheduled; used to avoid scheduling overlapping heartbeat flushes.
    next_flush_deadline: SystemTime,
    /// Client actor that receives forwarded log lines and flush acknowledgements.
    logging_client_ref: ActorRef<LogClientActor>,
    /// Whether `Log` messages are relayed to the client; updated by `SetMode`.
    stream_to_client: bool,
}
998
999#[async_trait]
1000impl Actor for LogForwardActor {
1001    type Params = ActorRef<LogClientActor>;
1002
1003    async fn new(logging_client_ref: Self::Params) -> Result<Self> {
1004        let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1005            Ok(channel) => channel.parse()?,
1006            Err(err) => {
1007                tracing::debug!(
1008                    "log forwarder actor failed to read env var {}: {}",
1009                    BOOTSTRAP_LOG_CHANNEL,
1010                    err
1011                );
1012                // TODO: an empty channel to serve
1013                ChannelAddr::any(ChannelTransport::Unix)
1014            }
1015        };
1016        tracing::info!(
1017            "log forwarder {} serve at {}",
1018            std::process::id(),
1019            log_channel
1020        );
1021
1022        let rx = match channel::serve(log_channel.clone()) {
1023            Ok((_, rx)) => rx,
1024            Err(err) => {
1025                // This can happen if we are not spanwed on a separate process like local.
1026                // For local mesh, log streaming anyway is not needed.
1027                tracing::error!(
1028                    "log forwarder actor failed to bootstrap on given channel {}: {}",
1029                    log_channel,
1030                    err
1031                );
1032                channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1033            }
1034        };
1035
1036        // Dial the same channel to send flush message to drain the log queue.
1037        let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1038        let now = RealClock.system_time_now();
1039
1040        Ok(Self {
1041            rx,
1042            flush_tx,
1043            next_flush_deadline: now,
1044            logging_client_ref,
1045            stream_to_client: true,
1046        })
1047    }
1048
1049    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1050        this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1051
1052        // Make sure we start the flush loop periodically so the log channel will not deadlock.
1053        self.flush_tx
1054            .lock()
1055            .await
1056            .send(LogMessage::Flush { sync_version: None })
1057            .await?;
1058        Ok(())
1059    }
1060}
1061
1062#[async_trait]
1063#[hyperactor::forward(LogForwardMessage)]
1064impl LogForwardMessageHandler for LogForwardActor {
1065    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
1066        match self.rx.recv().await {
1067            Ok(LogMessage::Flush { sync_version }) => {
1068                let now = RealClock.system_time_now();
1069                match sync_version {
1070                    None => {
1071                        // Schedule another flush to keep the log channel from deadlocking.
1072                        let delay = Duration::from_secs(1);
1073                        if now >= self.next_flush_deadline {
1074                            self.next_flush_deadline = now + delay;
1075                            let flush_tx = self.flush_tx.clone();
1076                            tokio::spawn(async move {
1077                                RealClock.sleep(delay).await;
1078                                if let Err(e) = flush_tx
1079                                    .lock()
1080                                    .await
1081                                    .send(LogMessage::Flush { sync_version: None })
1082                                    .await
1083                                {
1084                                    tracing::error!("failed to send flush message: {}", e);
1085                                }
1086                            });
1087                        }
1088                    }
1089                    version => {
1090                        self.logging_client_ref.flush(ctx, version).await?;
1091                    }
1092                }
1093            }
1094            Ok(LogMessage::Log {
1095                hostname,
1096                pid,
1097                output_target,
1098                payload,
1099            }) => {
1100                if self.stream_to_client {
1101                    self.logging_client_ref
1102                        .log(ctx, hostname, pid, output_target, payload)
1103                        .await?;
1104                }
1105            }
1106            Err(e) => {
1107                return Err(e.into());
1108            }
1109        }
1110
1111        // This is not ideal as we are using raw tx/rx.
1112        ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;
1113
1114        Ok(())
1115    }
1116
1117    async fn set_mode(
1118        &mut self,
1119        _ctx: &Context<Self>,
1120        stream_to_client: bool,
1121    ) -> Result<(), anyhow::Error> {
1122        self.stream_to_client = stream_to_client;
1123        Ok(())
1124    }
1125
1126    async fn force_sync_flush(
1127        &mut self,
1128        _cx: &Context<Self>,
1129        version: u64,
1130    ) -> Result<(), anyhow::Error> {
1131        self.flush_tx
1132            .lock()
1133            .await
1134            .send(LogMessage::Flush {
1135                sync_version: Some(version),
1136            })
1137            .await
1138            .map_err(anyhow::Error::from)
1139    }
1140}
1141
1142/// Deserialize a serialized message and split it into UTF-8 lines
1143fn deserialize_message_lines(
1144    serialized_message: &hyperactor::data::Serialized,
1145) -> Result<Vec<Vec<String>>> {
1146    // Try to deserialize as Vec<Vec<u8>> first (multiple byte arrays)
1147    if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1148        let mut result = Vec::new();
1149        for bytes in message_bytes {
1150            let message_str = String::from_utf8(bytes)?;
1151            let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1152            result.push(lines);
1153        }
1154        return Ok(result);
1155    }
1156
1157    // If that fails, try to deserialize as String and wrap in Vec<Vec<String>>
1158    if let Ok(message) = serialized_message.deserialized::<String>() {
1159        let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1160        return Ok(vec![lines]);
1161    }
1162
1163    // If both fail, return an error
1164    anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1165}
1166
/// A client to receive logs from remote processes
///
/// Incoming lines are either printed immediately or, while an aggregation
/// window is set, folded into per-stream aggregators and printed on flush.
/// The actor also tracks versioned "sync flush" barriers started through
/// `start_sync_flush`.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogMessage, LogClientMessage],
)]
pub struct LogClientActor {
    /// Aggregation window in seconds; `None` prints every line as it arrives.
    aggregate_window_sec: Option<u64>,
    /// One aggregator per output stream (stdout and stderr), created in `new`.
    aggregators: HashMap<OutputTarget, Aggregator>,
    /// When aggregated output was last flushed, or a line last printed directly.
    last_flush_time: SystemTime,
    /// Deadline of the pending delayed self-flush, if one has been scheduled.
    next_flush_deadline: Option<SystemTime>,

    // For flush sync barrier
    /// Version of the most recently started sync flush.
    current_flush_version: u64,
    /// Reply port of the sync flush in progress, if any.
    current_flush_port: Option<OncePortRef<()>>,
    /// Number of procs that still have to acknowledge the current sync flush.
    current_unflushed_procs: usize,
}
1184
1185impl LogClientActor {
1186    fn print_aggregators(&mut self) {
1187        for (output_target, aggregator) in self.aggregators.iter_mut() {
1188            if aggregator.is_empty() {
1189                continue;
1190            }
1191            match output_target {
1192                OutputTarget::Stdout => {
1193                    println!("{}", aggregator);
1194                }
1195                OutputTarget::Stderr => {
1196                    eprintln!("{}", aggregator);
1197                }
1198            }
1199
1200            // Reset the aggregator
1201            aggregator.reset();
1202        }
1203    }
1204
1205    fn print_log_line(hostname: &str, pid: u32, output_target: OutputTarget, line: String) {
1206        let message = format!("[{} {}] {}", hostname, pid, line);
1207
1208        #[cfg(test)]
1209        crate::logging::test_tap::push(&message);
1210
1211        match output_target {
1212            OutputTarget::Stdout => println!("{}", message),
1213            OutputTarget::Stderr => eprintln!("{}", message),
1214        }
1215    }
1216
1217    fn flush_internal(&mut self) {
1218        self.print_aggregators();
1219        self.last_flush_time = RealClock.system_time_now();
1220        self.next_flush_deadline = None;
1221    }
1222}
1223
1224#[async_trait]
1225impl Actor for LogClientActor {
1226    /// The aggregation window in seconds.
1227    type Params = ();
1228
1229    async fn new(_: ()) -> Result<Self, anyhow::Error> {
1230        // Initialize aggregators
1231        let mut aggregators = HashMap::new();
1232        aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1233        aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1234
1235        Ok(Self {
1236            aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1237            aggregators,
1238            last_flush_time: RealClock.system_time_now(),
1239            next_flush_deadline: None,
1240            current_flush_version: 0,
1241            current_flush_port: None,
1242            current_unflushed_procs: 0,
1243        })
1244    }
1245}
1246
1247impl Drop for LogClientActor {
1248    fn drop(&mut self) {
1249        // Flush the remaining logs before shutting down
1250        self.print_aggregators();
1251    }
1252}
1253
1254#[async_trait]
1255#[hyperactor::forward(LogMessage)]
1256impl LogMessageHandler for LogClientActor {
1257    async fn log(
1258        &mut self,
1259        cx: &Context<Self>,
1260        hostname: String,
1261        pid: u32,
1262        output_target: OutputTarget,
1263        payload: Serialized,
1264    ) -> Result<(), anyhow::Error> {
1265        // Deserialize the message and process line by line with UTF-8
1266        let message_line_groups = deserialize_message_lines(&payload)?;
1267        let hostname = hostname.as_str();
1268
1269        let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1270        match self.aggregate_window_sec {
1271            None => {
1272                for line in message_lines {
1273                    Self::print_log_line(hostname, pid, output_target, line);
1274                }
1275                self.last_flush_time = RealClock.system_time_now();
1276            }
1277            Some(window) => {
1278                for line in message_lines {
1279                    if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1280                        if let Err(e) = aggregator.add_line(&line) {
1281                            tracing::error!("error adding log line: {}", e);
1282                            // For the sake of completeness, flush the log lines.
1283                            Self::print_log_line(hostname, pid, output_target, line);
1284                        }
1285                    } else {
1286                        tracing::error!("unknown output target: {:?}", output_target);
1287                        // For the sake of completeness, flush the log lines.
1288                        Self::print_log_line(hostname, pid, output_target, line);
1289                    }
1290                }
1291
1292                let new_deadline = self.last_flush_time + Duration::from_secs(window);
1293                let now = RealClock.system_time_now();
1294                if new_deadline <= now {
1295                    self.flush_internal();
1296                } else {
1297                    let delay = new_deadline.duration_since(now)?;
1298                    match self.next_flush_deadline {
1299                        None => {
1300                            self.next_flush_deadline = Some(new_deadline);
1301                            cx.self_message_with_delay(
1302                                LogMessage::Flush { sync_version: None },
1303                                delay,
1304                            )?;
1305                        }
1306                        Some(deadline) => {
1307                            // Some early log lines have alrady triggered the flush.
1308                            if new_deadline < deadline {
1309                                // This can happen if the user has adjusted the aggregation window.
1310                                self.next_flush_deadline = Some(new_deadline);
1311                                cx.self_message_with_delay(
1312                                    LogMessage::Flush { sync_version: None },
1313                                    delay,
1314                                )?;
1315                            }
1316                        }
1317                    }
1318                }
1319            }
1320        }
1321
1322        Ok(())
1323    }
1324
1325    async fn flush(
1326        &mut self,
1327        cx: &Context<Self>,
1328        sync_version: Option<u64>,
1329    ) -> Result<(), anyhow::Error> {
1330        match sync_version {
1331            None => {
1332                self.flush_internal();
1333            }
1334            Some(version) => {
1335                if version != self.current_flush_version {
1336                    tracing::error!(
1337                        "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1338                        version,
1339                        self.current_flush_version
1340                    );
1341                    return Ok(());
1342                }
1343
1344                if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1345                    // This is a serious issue; it's better to error out.
1346                    anyhow::bail!("found no ongoing flush request");
1347                }
1348                self.current_unflushed_procs -= 1;
1349
1350                tracing::debug!(
1351                    "ack sync flush: version {}; remaining procs: {}",
1352                    self.current_flush_version,
1353                    self.current_unflushed_procs
1354                );
1355
1356                if self.current_unflushed_procs == 0 {
1357                    self.flush_internal();
1358                    let reply = self.current_flush_port.take().unwrap();
1359                    self.current_flush_port = None;
1360                    reply.send(cx, ()).map_err(anyhow::Error::from)?;
1361                }
1362            }
1363        }
1364
1365        Ok(())
1366    }
1367}
1368
1369#[async_trait]
1370#[hyperactor::forward(LogClientMessage)]
1371impl LogClientMessageHandler for LogClientActor {
1372    async fn set_aggregate(
1373        &mut self,
1374        _cx: &Context<Self>,
1375        aggregate_window_sec: Option<u64>,
1376    ) -> Result<(), anyhow::Error> {
1377        if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1378            // Make sure we flush whatever in the aggregators before disabling aggregation.
1379            self.print_aggregators();
1380        }
1381        self.aggregate_window_sec = aggregate_window_sec;
1382        Ok(())
1383    }
1384
1385    async fn start_sync_flush(
1386        &mut self,
1387        cx: &Context<Self>,
1388        expected_procs_flushed: usize,
1389        reply: OncePortRef<()>,
1390        version: OncePortRef<u64>,
1391    ) -> Result<(), anyhow::Error> {
1392        if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1393            tracing::warn!(
1394                "found unfinished ongoing flush: version {}; {} unflushed procs",
1395                self.current_flush_version,
1396                self.current_unflushed_procs,
1397            );
1398        }
1399
1400        self.current_flush_version += 1;
1401        tracing::debug!(
1402            "start sync flush with version {}",
1403            self.current_flush_version
1404        );
1405        self.current_flush_port = Some(reply.clone());
1406        self.current_unflushed_procs = expected_procs_flushed;
1407        version
1408            .send(cx, self.current_flush_version)
1409            .map_err(anyhow::Error::from)?;
1410        Ok(())
1411    }
1412}
1413
#[cfg(test)]
pub mod test_tap {
    //! Test-only tap that mirrors the lines printed by `LogClientActor` into
    //! a channel, so tests can inspect them.

    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Install the process-wide sender. Only the first call takes effect;
    /// later calls are silently ignored.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.set(tx);
    }

    /// Register the receiver that `drain` reads from. Only the first call
    /// takes effect; later calls are silently ignored.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.set(Mutex::new(rx));
    }

    /// Mirror one line into the tap. A no-op until `install` has been called;
    /// send failures are ignored.
    pub fn push(s: &str) {
        let Some(tx) = TAP.get() else {
            return;
        };
        let _ = tx.send(s.to_owned());
    }

    /// Collect every line currently queued on the registered receiver, without
    /// waiting. Returns an empty vector if no receiver is registered.
    pub fn drain() -> Vec<String> {
        RX.get()
            .map(|rx| {
                let mut rx = rx.lock().unwrap();
                std::iter::from_fn(|| rx.try_recv().ok()).collect()
            })
            .unwrap_or_default()
    }
}
1454
1455#[cfg(test)]
1456mod tests {
1457
1458    use std::sync::Arc;
1459    use std::sync::Mutex;
1460
1461    use hyperactor::channel;
1462    use hyperactor::channel::ChannelAddr;
1463    use hyperactor::channel::ChannelTx;
1464    use hyperactor::channel::Tx;
1465    use hyperactor::id;
1466    use hyperactor::mailbox::BoxedMailboxSender;
1467    use hyperactor::mailbox::DialMailboxRouter;
1468    use hyperactor::mailbox::MailboxServer;
1469    use hyperactor::proc::Proc;
1470    use tokio::io::AsyncSeek;
1471    use tokio::io::AsyncSeekExt;
1472    use tokio::io::AsyncWriteExt;
1473    use tokio::io::SeekFrom;
1474    use tokio::sync::mpsc;
1475
1476    use super::*;
1477
1478    /// Result of processing file content
1479    #[derive(Debug)]
1480    struct FileProcessingResult {
1481        /// Complete lines found during processing
1482        lines: Vec<Vec<u8>>,
1483        /// Updated position in the file after processing
1484        new_position: u64,
1485        /// Any remaining incomplete line data, buffered for subsequent reads
1486        incomplete_line_buffer: Vec<u8>,
1487    }
1488
1489    /// Process new file content from a given position, extracting complete lines
1490    /// This function is extracted to enable easier unit testing without file system dependencies
1491    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
1492        reader: &mut R,
1493        current_position: u64,
1494        file_size: u64,
1495        existing_line_buffer: Vec<u8>,
1496        max_buffer_size: usize,
1497    ) -> Result<FileProcessingResult> {
1498        // If position equals file size, we're at the end
1499        if current_position == file_size {
1500            return Ok(FileProcessingResult {
1501                lines: Vec::new(),
1502                new_position: current_position,
1503                incomplete_line_buffer: existing_line_buffer,
1504            });
1505        }
1506
1507        // Handle potential file truncation/rotation
1508        let actual_position = if current_position > file_size {
1509            tracing::warn!(
1510                "File appears to have been truncated (position {} > file size {}), resetting to start",
1511                current_position,
1512                file_size
1513            );
1514            reader.seek(SeekFrom::Start(0)).await?;
1515            0
1516        } else {
1517            // current_position < file_size
1518            reader.seek(SeekFrom::Start(current_position)).await?;
1519            current_position
1520        };
1521
1522        let mut buf = vec![0u8; 128 * 1024];
1523        let mut line_buffer = existing_line_buffer;
1524        let mut lines = Vec::with_capacity(max_buffer_size);
1525        let mut processed_bytes = 0u64;
1526
1527        loop {
1528            let bytes_read = reader.read(&mut buf).await?;
1529            if bytes_read == 0 {
1530                break;
1531            }
1532
1533            let chunk = &buf[..bytes_read];
1534
1535            let mut start = 0;
1536            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
1537                let absolute_pos = start + newline_pos;
1538
1539                line_buffer.extend_from_slice(&chunk[start..absolute_pos]);
1540
1541                if !line_buffer.is_empty() {
1542                    if line_buffer.len() > MAX_LINE_SIZE {
1543                        line_buffer.truncate(MAX_LINE_SIZE);
1544                        line_buffer.extend_from_slice(b"... [TRUNCATED]");
1545                    }
1546
1547                    let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
1548                    lines.push(line_data);
1549                }
1550
1551                start = absolute_pos + 1;
1552
1553                // Check if we've reached the max buffer size after adding each line
1554                if lines.len() >= max_buffer_size {
1555                    // We've processed up to and including the current newline
1556                    // The new position is where we should start reading next time
1557                    let new_position = actual_position + processed_bytes + start as u64;
1558
1559                    // Don't save remaining data - we'll re-read it from the new position
1560                    return Ok(FileProcessingResult {
1561                        lines,
1562                        new_position,
1563                        incomplete_line_buffer: Vec::new(),
1564                    });
1565                }
1566            }
1567
1568            // Only add bytes to processed_bytes if we've fully processed this chunk
1569            processed_bytes += bytes_read as u64;
1570
1571            if start < chunk.len() {
1572                line_buffer.extend_from_slice(&chunk[start..]);
1573            }
1574        }
1575
1576        let new_position = actual_position + processed_bytes;
1577
1578        Ok(FileProcessingResult {
1579            lines,
1580            new_position,
1581            incomplete_line_buffer: line_buffer,
1582        })
1583    }
1584
1585    #[tokio::test]
1586    async fn test_forwarding_log_to_client() {
1587        // Setup the basics
1588        let router = DialMailboxRouter::new();
1589        let (proc_addr, client_rx) =
1590            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1591        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
1592        proc.clone().serve(client_rx);
1593        router.bind(id!(client[0]).into(), proc_addr.clone());
1594        let (client, _handle) = proc.instance("client").unwrap();
1595
1596        // Spin up both the forwarder and the client
1597        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1598        // SAFETY: Unit test
1599        unsafe {
1600            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1601        }
1602        let log_client: ActorRef<LogClientActor> =
1603            proc.spawn("log_client", ()).await.unwrap().bind();
1604        let log_forwarder: ActorRef<LogForwardActor> = proc
1605            .spawn("log_forwarder", log_client)
1606            .await
1607            .unwrap()
1608            .bind();
1609
1610        // Write some logs that will not be streamed
1611        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
1612        tx.post(LogMessage::Log {
1613            hostname: "my_host".into(),
1614            pid: 1,
1615            output_target: OutputTarget::Stderr,
1616            payload: Serialized::serialize(&"will not stream".to_string()).unwrap(),
1617        });
1618
1619        // Turn on streaming
1620        log_forwarder.set_mode(&client, true).await.unwrap();
1621        tx.post(LogMessage::Log {
1622            hostname: "my_host".into(),
1623            pid: 1,
1624            output_target: OutputTarget::Stderr,
1625            payload: Serialized::serialize(&"will stream".to_string()).unwrap(),
1626        });
1627
1628        // TODO: it is hard to test out anything meaningful here as the client flushes to stdout.
1629    }
1630
1631    #[test]
1632    fn test_deserialize_message_lines_string() {
1633        // Test deserializing a String message with multiple lines
1634        let message = "Line 1\nLine 2\nLine 3".to_string();
1635        let serialized = Serialized::serialize(&message).unwrap();
1636
1637        let result = deserialize_message_lines(&serialized).unwrap();
1638        assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);
1639
1640        // Test deserializing a Vec<Vec<u8>> message with UTF-8 content
1641        let message_bytes = vec![
1642            "Hello\nWorld".as_bytes().to_vec(),
1643            "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
1644        ];
1645        let serialized = Serialized::serialize(&message_bytes).unwrap();
1646
1647        let result = deserialize_message_lines(&serialized).unwrap();
1648        assert_eq!(
1649            result,
1650            vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
1651        );
1652
1653        // Test deserializing a single line message
1654        let message = "Single line message".to_string();
1655        let serialized = Serialized::serialize(&message).unwrap();
1656
1657        let result = deserialize_message_lines(&serialized).unwrap();
1658
1659        assert_eq!(result, vec![vec!["Single line message"]]);
1660
1661        // Test deserializing an empty lines
1662        let message = "\n\n".to_string();
1663        let serialized = Serialized::serialize(&message).unwrap();
1664
1665        let result = deserialize_message_lines(&serialized).unwrap();
1666
1667        assert_eq!(result, vec![vec!["", ""]]);
1668
1669        // Test error handling for invalid UTF-8 bytes
1670        let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; // Invalid UTF-8 sequence in Vec<Vec<u8>>
1671        let serialized = Serialized::serialize(&invalid_utf8_bytes).unwrap();
1672
1673        let result = deserialize_message_lines(&serialized);
1674
1675        // The function should fail when trying to convert invalid UTF-8 bytes to String
1676        assert!(
1677            result.is_err(),
1678            "Expected deserialization to fail with invalid UTF-8 bytes"
1679        );
1680    }
1681    #[allow(dead_code)]
1682    struct MockLogSender {
1683        log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, // (output_target, content)
1684        flush_called: Arc<Mutex<bool>>,                            // Track if flush was called
1685    }
1686
1687    impl MockLogSender {
1688        #[allow(dead_code)]
1689        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
1690            Self {
1691                log_sender,
1692                flush_called: Arc::new(Mutex::new(false)),
1693            }
1694        }
1695    }
1696
1697    #[async_trait]
1698    impl LogSender for MockLogSender {
1699        fn send(
1700            &mut self,
1701            output_target: OutputTarget,
1702            payload: Vec<Vec<u8>>,
1703        ) -> anyhow::Result<()> {
1704            // For testing purposes, convert to string if it's valid UTF-8
1705            let lines: Vec<String> = payload
1706                .iter()
1707                .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1708                .collect();
1709
1710            for line in lines {
1711                self.log_sender
1712                    .send((output_target, line))
1713                    .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1714            }
1715            Ok(())
1716        }
1717
1718        fn flush(&mut self) -> anyhow::Result<()> {
1719            // Mark that flush was called
1720            let mut flush_called = self.flush_called.lock().unwrap();
1721            *flush_called = true;
1722
1723            // For testing purposes, just return Ok
1724            // In a real implementation, this would wait for all messages to be delivered
1725            Ok(())
1726        }
1727    }
1728
1729    #[test]
1730    fn test_string_similarity() {
1731        // Test exact match
1732        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1733
1734        // Test completely different strings
1735        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1736
1737        // Test similar strings
1738        assert!(normalized_edit_distance("hello", "helo") < 0.5);
1739        assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1740
1741        // Test empty strings
1742        assert_eq!(normalized_edit_distance("", ""), 0.0);
1743        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1744    }
1745
1746    #[test]
1747    fn test_add_line_to_empty_aggregator() {
1748        let mut aggregator = Aggregator::new();
1749        let result = aggregator.add_line("ERROR 404 not found");
1750
1751        assert!(result.is_ok());
1752        assert_eq!(aggregator.lines.len(), 1);
1753        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1754        assert_eq!(aggregator.lines[0].count, 1);
1755    }
1756
1757    #[test]
1758    fn test_add_line_merges_with_similar_line() {
1759        let mut aggregator = Aggregator::new_with_threshold(0.2);
1760
1761        // Add first line
1762        aggregator.add_line("ERROR 404 timeout").unwrap();
1763        assert_eq!(aggregator.lines.len(), 1);
1764
1765        // Add second line that should merge (similar enough)
1766        aggregator.add_line("ERROR 500 timeout").unwrap();
1767        assert_eq!(aggregator.lines.len(), 1); // Should still be 1 line after merge
1768        assert_eq!(aggregator.lines[0].count, 2);
1769
1770        // Add third line that's too different
1771        aggregator
1772            .add_line("WARNING database connection failed")
1773            .unwrap();
1774        assert_eq!(aggregator.lines.len(), 2); // Should be 2 lines now
1775
1776        // Add fourth line similar to third
1777        aggregator
1778            .add_line("WARNING database connection timed out")
1779            .unwrap();
1780        assert_eq!(aggregator.lines.len(), 2); // Should still be 2 lines
1781        assert_eq!(aggregator.lines[1].count, 2); // Second group has 2 lines
1782    }
1783
1784    #[test]
1785    fn test_aggregation_of_similar_log_lines() {
1786        let mut aggregator = Aggregator::new_with_threshold(0.2);
1787
1788        // Add the provided log lines with small differences
1789        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
1790        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
1791        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
1792
1793        // Check that we have only one aggregated line due to similarity
1794        assert_eq!(aggregator.lines.len(), 1);
1795
1796        // Check that the count is 3
1797        assert_eq!(aggregator.lines[0].count, 3);
1798    }
1799
1800    #[tokio::test]
1801    async fn test_stream_fwd_creation() {
1802        hyperactor_telemetry::initialize_logging_for_test();
1803
1804        let (mut writer, reader) = tokio::io::duplex(1024);
1805        let (log_channel, mut rx) =
1806            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1807
1808        // Create a temporary file for testing the writer
1809        let temp_file = tempfile::NamedTempFile::new().unwrap();
1810        let temp_path = temp_file.path().to_path_buf();
1811
1812        // Create file writer that writes to the temp file (using tokio for async compatibility)
1813        let file_writer = tokio::fs::OpenOptions::new()
1814            .create(true)
1815            .write(true)
1816            .append(true)
1817            .open(&temp_path)
1818            .await
1819            .unwrap();
1820
1821        // Create FileMonitor and get address for stdout
1822        let file_monitor = FileAppender::new();
1823        let file_monitor_addr = file_monitor
1824            .as_ref()
1825            .map(|fm| fm.addr_for(OutputTarget::Stdout));
1826
1827        let monitor = StreamFwder::start_with_writer(
1828            reader,
1829            Box::new(file_writer),
1830            file_monitor_addr,
1831            OutputTarget::Stdout,
1832            3, // max_buffer_size
1833            Some(log_channel),
1834            12345, // pid
1835            None,  // no prefix
1836        );
1837
1838        // Wait a bit for set up to be done
1839        RealClock.sleep(Duration::from_millis(500)).await;
1840
1841        // Write initial content through the input writer
1842        writer.write_all(b"Initial log line\n").await.unwrap();
1843        writer.flush().await.unwrap();
1844
1845        // Write more content
1846        for i in 1..=5 {
1847            writer
1848                .write_all(format!("New log line {}\n", i).as_bytes())
1849                .await
1850                .unwrap();
1851        }
1852        writer.flush().await.unwrap();
1853
1854        // Wait a bit for the file to be written and the watcher to detect changes
1855        RealClock.sleep(Duration::from_millis(500)).await;
1856
1857        // Wait until log sender gets message
1858        let timeout = Duration::from_secs(1);
1859        let _ = RealClock
1860            .timeout(timeout, rx.recv())
1861            .await
1862            .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));
1863
1864        // Wait a bit more for all lines to be processed
1865        RealClock.sleep(Duration::from_millis(200)).await;
1866
1867        let (recent_lines, _result) = monitor.abort().await;
1868
1869        assert!(
1870            recent_lines.len() >= 3,
1871            "Expected buffer with at least 3 lines, got {} lines: {:?}",
1872            recent_lines.len(),
1873            recent_lines
1874        );
1875
1876        let file_contents = std::fs::read_to_string(&temp_path).unwrap();
1877        assert!(
1878            file_contents.contains("Initial log line"),
1879            "Expected temp file to contain 'Initial log line', got: {:?}",
1880            file_contents
1881        );
1882        assert!(
1883            file_contents.contains("New log line 1"),
1884            "Expected temp file to contain 'New log line 1', got: {:?}",
1885            file_contents
1886        );
1887        assert!(
1888            file_contents.contains("New log line 5"),
1889            "Expected temp file to contain 'New log line 5', got: {:?}",
1890            file_contents
1891        );
1892    }
1893
1894    #[test]
1895    fn test_aggregator_custom_threshold() {
1896        // Test with very strict threshold (0.05)
1897        let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1898        strict_aggregator.add_line("ERROR 404").unwrap();
1899        strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold
1900        assert_eq!(strict_aggregator.lines.len(), 2);
1901
1902        // Test with very lenient threshold (0.8)
1903        let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1904        lenient_aggregator.add_line("ERROR 404").unwrap();
1905        lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold
1906        assert_eq!(lenient_aggregator.lines.len(), 1);
1907        assert_eq!(lenient_aggregator.lines[0].count, 2);
1908    }
1909
1910    #[test]
1911    fn test_format_system_time() {
1912        let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC
1913        let formatted = format_system_time(test_time);
1914
1915        // Just verify it's a reasonable format (contains date and time components)
1916        assert!(formatted.contains("-"));
1917        assert!(formatted.contains(":"));
1918        assert!(formatted.len() > 10); // Should be reasonable length
1919    }
1920
1921    #[test]
1922    fn test_aggregator_display_formatting() {
1923        let mut aggregator = Aggregator::new();
1924        aggregator.add_line("Test error message").unwrap();
1925        aggregator.add_line("Test error message").unwrap(); // Should merge
1926
1927        let display_string = format!("{}", aggregator);
1928
1929        // Verify the output contains expected elements
1930        assert!(display_string.contains("Aggregated Logs"));
1931        assert!(display_string.contains("[2 similar log lines]"));
1932        assert!(display_string.contains("Test error message"));
1933        assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1934    }
1935
1936    #[tokio::test]
1937    async fn test_local_log_sender_inactive_status() {
1938        let (log_channel, _) =
1939            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1940        let mut sender = LocalLogSender::new(log_channel, 12345).unwrap();
1941
1942        // This test verifies that the sender handles inactive status gracefully
1943        // In a real scenario, the channel would be closed, but for testing we just
1944        // verify the send/flush methods don't panic
1945        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1946        assert!(result.is_ok());
1947
1948        let result = sender.flush();
1949        assert!(result.is_ok());
1950    }
1951
1952    #[test]
1953    fn test_levenshtein_distance_edge_cases() {
1954        // Test with empty strings
1955        assert_eq!(levenshtein_distance("", ""), 0);
1956        assert_eq!(levenshtein_distance("", "hello"), 5);
1957        assert_eq!(levenshtein_distance("hello", ""), 5);
1958
1959        // Test with identical strings
1960        assert_eq!(levenshtein_distance("hello", "hello"), 0);
1961
1962        // Test with single character differences
1963        assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion
1964        assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion
1965        assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution
1966
1967        // Test with unicode characters
1968        assert_eq!(levenshtein_distance("café", "cafe"), 1);
1969    }
1970
1971    #[test]
1972    fn test_normalized_edit_distance_edge_cases() {
1973        // Test with empty strings
1974        assert_eq!(normalized_edit_distance("", ""), 0.0);
1975
1976        // Test normalization
1977        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1978        assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1979
1980        // Test that result is always between 0.0 and 1.0
1981        let distance = normalized_edit_distance("completely", "different");
1982        assert!(distance >= 0.0 && distance <= 1.0);
1983    }
1984
1985    #[tokio::test]
1986    async fn test_deserialize_message_lines_edge_cases() {
1987        // Test with empty string
1988        let empty_message = "".to_string();
1989        let serialized = Serialized::serialize(&empty_message).unwrap();
1990        let result = deserialize_message_lines(&serialized).unwrap();
1991        assert_eq!(result, vec![vec![] as Vec<String>]);
1992
1993        // Test with trailing newline
1994        let trailing_newline = "line1\nline2\n".to_string();
1995        let serialized = Serialized::serialize(&trailing_newline).unwrap();
1996        let result = deserialize_message_lines(&serialized).unwrap();
1997        assert_eq!(result, vec![vec!["line1", "line2"]]);
1998    }
1999
2000    #[test]
2001    fn test_output_target_serialization() {
2002        // Test that OutputTarget can be serialized and deserialized
2003        let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2004        let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2005
2006        let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2007        let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2008
2009        assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2010        assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2011    }
2012
2013    #[test]
2014    fn test_log_line_display_formatting() {
2015        let log_line = LogLine::new("Test message".to_string());
2016        let display_string = format!("{}", log_line);
2017
2018        assert!(display_string.contains("[1 similar log lines]"));
2019        assert!(display_string.contains("Test message"));
2020
2021        // Test with higher count
2022        let mut log_line_multi = LogLine::new("Test message".to_string());
2023        log_line_multi.count = 5;
2024        let display_string_multi = format!("{}", log_line_multi);
2025
2026        assert!(display_string_multi.contains("[5 similar log lines]"));
2027        assert!(display_string_multi.contains("Test message"));
2028    }
2029
2030    // Mock reader for testing process_file_content using std::io::Cursor
2031    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
2032        std::io::Cursor::new(data)
2033    }
2034
2035    #[tokio::test]
2036    async fn test_process_file_content_basic() {
2037        let data = b"line1\nline2\nline3\n".to_vec();
2038        let mut reader = create_mock_reader(data.clone());
2039        let max_buf_size = 10;
2040
2041        let result =
2042            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2043                .await
2044                .unwrap();
2045
2046        assert_eq!(result.lines.len(), 3);
2047        assert_eq!(result.lines[0], b"line1");
2048        assert_eq!(result.lines[1], b"line2");
2049        assert_eq!(result.lines[2], b"line3");
2050        assert_eq!(result.new_position, data.len() as u64);
2051        assert!(result.incomplete_line_buffer.is_empty());
2052    }
2053
2054    #[tokio::test]
2055    async fn test_process_file_content_incomplete_line() {
2056        let data = b"line1\nline2\npartial".to_vec();
2057        let mut reader = create_mock_reader(data.clone());
2058        let max_buf_size = 10;
2059
2060        let result =
2061            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2062                .await
2063                .unwrap();
2064
2065        assert_eq!(result.lines.len(), 2);
2066        assert_eq!(result.lines[0], b"line1");
2067        assert_eq!(result.lines[1], b"line2");
2068        assert_eq!(result.new_position, data.len() as u64);
2069        assert_eq!(result.incomplete_line_buffer, b"partial");
2070    }
2071
2072    #[tokio::test]
2073    async fn test_process_file_content_with_existing_buffer() {
2074        let data = b"omplete\nline2\nline3\n".to_vec();
2075        let mut reader = create_mock_reader(data.clone());
2076        let existing_buffer = b"inc".to_vec();
2077        let max_buf_size = 10;
2078
2079        let result = process_file_content(
2080            &mut reader,
2081            0,
2082            data.len() as u64,
2083            existing_buffer,
2084            max_buf_size,
2085        )
2086        .await
2087        .unwrap();
2088
2089        assert_eq!(result.lines.len(), 3);
2090        assert_eq!(result.lines[0], b"incomplete");
2091        assert_eq!(result.lines[1], b"line2");
2092        assert_eq!(result.lines[2], b"line3");
2093        assert_eq!(result.new_position, data.len() as u64);
2094        assert!(result.incomplete_line_buffer.is_empty());
2095    }
2096
2097    #[tokio::test]
2098    async fn test_process_file_content_empty_file() {
2099        let data = Vec::new();
2100        let mut reader = create_mock_reader(data.clone());
2101        let max_buf_size = 10;
2102
2103        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2104            .await
2105            .unwrap();
2106
2107        assert!(result.lines.is_empty());
2108        assert_eq!(result.new_position, 0);
2109        assert!(result.incomplete_line_buffer.is_empty());
2110    }
2111
2112    #[tokio::test]
2113    async fn test_process_file_content_only_newlines() {
2114        let data = b"\n\n\n".to_vec();
2115        let mut reader = create_mock_reader(data.clone());
2116        let max_buf_size = 10;
2117
2118        let result =
2119            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2120                .await
2121                .unwrap();
2122
2123        // Empty lines should not be added (the function skips empty line_buffer)
2124        assert!(result.lines.is_empty());
2125        assert_eq!(result.new_position, data.len() as u64);
2126        assert!(result.incomplete_line_buffer.is_empty());
2127    }
2128
2129    #[tokio::test]
2130    async fn test_process_file_content_no_newlines() {
2131        let data = b"no newlines here".to_vec();
2132        let mut reader = create_mock_reader(data.clone());
2133        let max_buf_size = 10;
2134
2135        let result =
2136            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2137                .await
2138                .unwrap();
2139
2140        assert!(result.lines.is_empty());
2141        assert_eq!(result.new_position, data.len() as u64);
2142        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2143    }
2144
2145    #[tokio::test]
2146    async fn test_process_file_content_file_truncation() {
2147        let data = b"line1\nline2\n".to_vec();
2148        let mut reader = create_mock_reader(data.clone());
2149
2150        // Simulate current position being beyond file size (file was truncated)
2151        let result = process_file_content(
2152            &mut reader,
2153            100, // position beyond file size
2154            data.len() as u64,
2155            Vec::new(),
2156            10, // max_buf_size
2157        )
2158        .await
2159        .unwrap();
2160
2161        // Should reset to beginning and read all lines
2162        assert_eq!(result.lines.len(), 2);
2163        assert_eq!(result.lines[0], b"line1");
2164        assert_eq!(result.lines[1], b"line2");
2165        assert_eq!(result.new_position, data.len() as u64);
2166        assert!(result.incomplete_line_buffer.is_empty());
2167    }
2168
2169    #[tokio::test]
2170    async fn test_process_file_content_seek_to_position() {
2171        let data = b"line1\nline2\nline3\n".to_vec();
2172        let mut reader = create_mock_reader(data.clone());
2173
2174        // Start reading from position 6 (after "line1\n")
2175        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2176            .await
2177            .unwrap();
2178
2179        assert_eq!(result.lines.len(), 2);
2180        assert_eq!(result.lines[0], b"line2");
2181        assert_eq!(result.lines[1], b"line3");
2182        assert_eq!(result.new_position, data.len() as u64);
2183        assert!(result.incomplete_line_buffer.is_empty());
2184    }
2185
2186    #[tokio::test]
2187    async fn test_process_file_content_position_equals_file_size() {
2188        let data = b"line1\nline2\n".to_vec();
2189        let mut reader = create_mock_reader(data.clone());
2190
2191        // Start reading from end of file
2192        let result = process_file_content(
2193            &mut reader,
2194            data.len() as u64,
2195            data.len() as u64,
2196            Vec::new(),
2197            10,
2198        )
2199        .await
2200        .unwrap();
2201
2202        // Should not read anything new
2203        assert!(
2204            result.lines.is_empty(),
2205            "Expected empty line got {:?}",
2206            result.lines
2207        );
2208        assert_eq!(result.new_position, data.len() as u64);
2209        assert!(result.incomplete_line_buffer.is_empty());
2210    }
2211
2212    #[tokio::test]
2213    async fn test_process_file_content_large_line_truncation() {
2214        // Create a line longer than MAX_LINE_SIZE
2215        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2216        let data = format!("{}\nline2\n", large_line).into_bytes();
2217        let mut reader = create_mock_reader(data.clone());
2218
2219        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2220            .await
2221            .unwrap();
2222
2223        assert_eq!(result.lines.len(), 2);
2224
2225        // First line should be truncated
2226        assert_eq!(
2227            result.lines[0].len(),
2228            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2229        );
2230        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2231
2232        // Second line should be normal
2233        assert_eq!(result.lines[1], b"line2");
2234
2235        assert_eq!(result.new_position, data.len() as u64);
2236        assert!(result.incomplete_line_buffer.is_empty());
2237    }
2238
2239    #[tokio::test]
2240    async fn test_process_file_content_mixed_line_endings() {
2241        let data = b"line1\nline2\r\nline3\n".to_vec();
2242        let mut reader = create_mock_reader(data.clone());
2243
2244        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2245            .await
2246            .unwrap();
2247
2248        assert_eq!(result.lines.len(), 3);
2249        assert_eq!(result.lines[0], b"line1");
2250        assert_eq!(result.lines[1], b"line2\r"); // \r is preserved
2251        assert_eq!(result.lines[2], b"line3");
2252        assert_eq!(result.new_position, data.len() as u64);
2253        assert!(result.incomplete_line_buffer.is_empty());
2254    }
2255
2256    #[tokio::test]
2257    async fn test_process_file_content_existing_buffer_with_truncation() {
2258        // Create a scenario where existing buffer + new data creates a line that needs truncation
2259        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2260        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2261        let mut reader = create_mock_reader(data.clone());
2262
2263        let result = process_file_content(
2264            &mut reader,
2265            0,
2266            data.len() as u64,
2267            existing_buffer.into_bytes(),
2268            10,
2269        )
2270        .await
2271        .unwrap();
2272
2273        assert_eq!(result.lines.len(), 2);
2274
2275        // First line should be truncated (existing buffer + new data)
2276        assert_eq!(
2277            result.lines[0].len(),
2278            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2279        );
2280        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2281
2282        // Second line should be normal
2283        assert_eq!(result.lines[1], b"line2");
2284
2285        assert_eq!(result.new_position, data.len() as u64);
2286        assert!(result.incomplete_line_buffer.is_empty());
2287    }
2288
2289    #[tokio::test]
2290    async fn test_process_file_content_single_character_lines() {
2291        let data = b"a\nb\nc\n".to_vec();
2292        let mut reader = create_mock_reader(data.clone());
2293
2294        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2295            .await
2296            .unwrap();
2297
2298        assert_eq!(result.lines.len(), 3);
2299        assert_eq!(result.lines[0], b"a");
2300        assert_eq!(result.lines[1], b"b");
2301        assert_eq!(result.lines[2], b"c");
2302        assert_eq!(result.new_position, data.len() as u64);
2303        assert!(result.incomplete_line_buffer.is_empty());
2304    }
2305
2306    #[tokio::test]
2307    async fn test_process_file_content_binary_data() {
2308        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2309        let mut reader = create_mock_reader(data.clone());
2310
2311        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2312            .await
2313            .unwrap();
2314
2315        assert_eq!(result.lines.len(), 2);
2316        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2317        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2318        assert_eq!(result.new_position, data.len() as u64);
2319        assert!(result.incomplete_line_buffer.is_empty());
2320    }
2321
2322    #[tokio::test]
2323    async fn test_process_file_content_resume_after_max_buffer_size() {
2324        // Test data: 3 lines as specified in the example
2325        let data = b"line 1\nline 2\nline 3\n".to_vec();
2326        let mut reader = create_mock_reader(data.clone());
2327        let max_buffer_size = 2; // Limit to 2 lines per call
2328
2329        // First call: should return first 2 lines
2330        let result1 = process_file_content(
2331            &mut reader,
2332            0, // start from beginning
2333            data.len() as u64,
2334            Vec::new(), // no existing buffer
2335            max_buffer_size,
2336        )
2337        .await
2338        .unwrap();
2339
2340        // Verify first call results
2341        assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2342        assert_eq!(result1.lines[0], b"line 1");
2343        assert_eq!(result1.lines[1], b"line 2");
2344        assert!(result1.incomplete_line_buffer.is_empty());
2345
2346        // The position should be after "line 1\nline 2\n" (14 bytes)
2347        let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2348        assert_eq!(result1.new_position, expected_position_after_first_call);
2349
2350        // Second call: resume from where first call left off
2351        let mut reader2 = create_mock_reader(data.clone());
2352        let result2 = process_file_content(
2353            &mut reader2,
2354            result1.new_position, // resume from previous position
2355            data.len() as u64,
2356            result1.incomplete_line_buffer, // pass any incomplete buffer (should be empty)
2357            max_buffer_size,
2358        )
2359        .await
2360        .unwrap();
2361
2362        // Verify second call results
2363        assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2364        assert_eq!(result2.lines[0], b"line 3");
2365        assert!(result2.incomplete_line_buffer.is_empty());
2366        assert_eq!(result2.new_position, data.len() as u64);
2367    }
2368
2369    #[tokio::test]
2370    async fn test_utf8_truncation() {
2371        // Test that StreamFwder doesn't panic when truncating lines
2372        // with multi-byte chars.
2373
2374        hyperactor_telemetry::initialize_logging_for_test();
2375
2376        // Create a line longer than MAX_LINE_SIZE with an emoji at the boundary
2377        let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
2378        long_line.push('🦀'); // 4-byte emoji - truncation will land in the middle
2379        long_line.push('\n');
2380
2381        // Create IO streams
2382        let (mut writer, reader) = tokio::io::duplex(8192);
2383
2384        // Start StreamFwder
2385        let monitor = StreamFwder::start_with_writer(
2386            reader,
2387            Box::new(tokio::io::sink()), // discard output
2388            None,                        // no file monitor needed
2389            OutputTarget::Stdout,
2390            1,     // tail buffer of 1 (need at least one sink)
2391            None,  // no log channel
2392            12345, // pid
2393            None,  // no prefix
2394        );
2395
2396        // Write the problematic line
2397        writer.write_all(long_line.as_bytes()).await.unwrap();
2398        drop(writer); // Close to signal EOF
2399
2400        // Wait for completion - should NOT panic
2401        let (_lines, result) = monitor.abort().await;
2402        result.expect("Should complete without panic despite UTF-8 truncation");
2403    }
2404}