hyperactor_mesh/
logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::Bind;
25use hyperactor::Context;
26use hyperactor::HandleClient;
27use hyperactor::Handler;
28use hyperactor::Instance;
29use hyperactor::RefClient;
30use hyperactor::Unbind;
31use hyperactor::channel;
32use hyperactor::channel::ChannelAddr;
33use hyperactor::channel::ChannelRx;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::channel::ChannelTx;
36use hyperactor::channel::Rx;
37use hyperactor::channel::Tx;
38use hyperactor::channel::TxStatus;
39use hyperactor::reference as hyperactor_reference;
40use hyperactor_config::CONFIG;
41use hyperactor_config::ConfigAttr;
42use hyperactor_config::Flattrs;
43use hyperactor_config::attrs::declare_attrs;
44use hyperactor_telemetry::env;
45use hyperactor_telemetry::log_file_path;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::io;
49use tokio::io::AsyncRead;
50use tokio::io::AsyncReadExt;
51use tokio::io::AsyncWriteExt;
52use tokio::sync::Mutex;
53use tokio::sync::Notify;
54use tokio::sync::RwLock;
55use tokio::sync::watch::Receiver;
56use tokio::task::JoinHandle;
57use tracing::Level;
58use typeuri::Named;
59
60use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
61use crate::shortuuid::ShortUuid;
62
63mod line_prefixing_writer;
64
/// Default window (in seconds) over which similar log lines are aggregated.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Lines longer than this (in bytes) are truncated before forwarding (see `tee`).
const MAX_LINE_SIZE: usize = 4 * 1024;
67
declare_attrs! {
    /// Maximum number of lines to batch before flushing to client.
    /// This means that stdout/err reader will be paused after reading `HYPERACTOR_READ_LOG_BUFFER` lines.
    /// After the pause, buffered lines will be flushed and reading will resume.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        Some("read_log_buffer".to_string()),
    ))
    pub attr READ_LOG_BUFFER: usize = 100;

    /// If enabled, local logs are also written to a file and aggregated,
    /// even in a local environment (see `get_unique_local_log_destination`).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        Some("force_file_log".to_string()),
    ))
    pub attr FORCE_FILE_LOG: bool = false;

    /// Prefixes forwarded log lines with the local rank, as `[<rank>]`
    /// (applied in `StreamFwder::start`).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        Some("prefix_with_rank".to_string()),
    ))
    pub attr PREFIX_WITH_RANK: bool = true;
}
92
/// Calculate the Levenshtein (edit) distance between two strings.
///
/// Operates on Unicode scalar values (`char`s), not bytes, so a multi-byte
/// character counts as a single edit. Uses the classic dynamic-programming
/// recurrence but keeps only two rows at a time, reducing memory from
/// O(left_len * right_len) to O(right_len) while producing identical results.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    let left_len = left_chars.len();
    let right_len = right_chars.len();

    // Edge cases: distance to/from an empty string is the other's length.
    if left_len == 0 {
        return right_len;
    }
    if right_len == 0 {
        return left_len;
    }

    // prev[j] = distance between left[..i-1] and right[..j].
    // Initialized for the empty left prefix (row 0 of the full matrix).
    let mut prev: Vec<usize> = (0..=right_len).collect();
    let mut curr: Vec<usize> = vec![0; right_len + 1];

    for i in 1..=left_len {
        // Distance between left[..i] and the empty right prefix.
        curr[0] = i;
        for j in 1..=right_len {
            let cost = if left_chars[i - 1] == right_chars[j - 1] {
                0
            } else {
                1
            };

            curr[j] = (prev[j] + 1) // deletion
                .min(curr[j - 1] + 1) // insertion
                .min(prev[j - 1] + cost); // substitution
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    // After the final swap, `prev` holds the last computed row.
    prev[right_len]
}
142
143/// Calculate the normalized edit distance between two strings (0.0 to 1.0)
144fn normalized_edit_distance(left: &str, right: &str) -> f64 {
145    let distance = levenshtein_distance(left, right) as f64;
146    let max_len = std::cmp::max(left.len(), right.len()) as f64;
147
148    if max_len == 0.0 {
149        0.0 // Both strings are empty, so they're identical
150    } else {
151        distance / max_len
152    }
153}
154
#[derive(Debug, Clone)]
/// LogLine represents a single log line with its content and count
struct LogLine {
    // Representative text of this (possibly repeated) line.
    content: String,
    // How many similar lines have been folded into this one (starts at 1).
    pub count: u64,
}
161
162impl LogLine {
163    fn new(content: String) -> Self {
164        Self { content, count: 1 }
165    }
166}
167
168impl fmt::Display for LogLine {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        write!(
171            f,
172            "\x1b[33m[{} similar log lines]\x1b[0m {}",
173            self.count, self.content
174        )
175    }
176}
177
#[derive(Debug, Clone)]
/// Aggregator is a struct that holds a list of LogLines and a start time.
/// It can aggregate new log lines to existing ones if they are "similar" based on edit distance.
struct Aggregator {
    // Distinct (representative) lines seen in the current window.
    lines: Vec<LogLine>,
    // When the current aggregation window began; reset by `reset`.
    start_time: SystemTime,
    similarity_threshold: f64, // Threshold for considering two strings similar (0.0 to 1.0)
}
186
187impl Aggregator {
188    fn new() -> Self {
189        // Default threshold: strings with normalized edit distance < 0.15 are considered similar
190        Self::new_with_threshold(0.15)
191    }
192
193    fn new_with_threshold(threshold: f64) -> Self {
194        Aggregator {
195            lines: vec![],
196            start_time: std::time::SystemTime::now(),
197            similarity_threshold: threshold,
198        }
199    }
200
201    fn reset(&mut self) {
202        self.lines.clear();
203        self.start_time = std::time::SystemTime::now();
204    }
205
206    fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
207        // Find the most similar existing line
208        let mut best_match_idx = None;
209        let mut best_similarity = f64::MAX;
210
211        for (idx, existing_line) in self.lines.iter().enumerate() {
212            let distance = normalized_edit_distance(&existing_line.content, line);
213
214            // If this line is more similar than our current best match
215            if distance < best_similarity && distance < self.similarity_threshold {
216                best_match_idx = Some(idx);
217                best_similarity = distance;
218            }
219        }
220
221        // If we found a similar enough line, increment its count
222        if let Some(idx) = best_match_idx {
223            self.lines[idx].count += 1;
224        } else {
225            // Otherwise, add a new line
226            self.lines.push(LogLine::new(line.to_string()));
227        }
228
229        Ok(())
230    }
231
232    fn is_empty(&self) -> bool {
233        self.lines.is_empty()
234    }
235}
236
237// Helper function to format SystemTime
238fn format_system_time(time: SystemTime) -> String {
239    let datetime: DateTime<Local> = time.into();
240    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
241}
242
243impl fmt::Display for Aggregator {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        // Format the start time
246        let start_time_str = format_system_time(self.start_time);
247
248        // Get and format the current time
249        let current_time = std::time::SystemTime::now();
250        let end_time_str = format_system_time(current_time);
251
252        // Write the header with formatted time window
253        writeln!(
254            f,
255            "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
256            start_time_str
257        )?;
258
259        // Write each log line
260        for line in self.lines.iter() {
261            writeln!(f, "{}", line)?;
262        }
263        writeln!(
264            f,
265            "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
266            end_time_str
267        )?;
268        Ok(())
269    }
270}
271
/// Messages that can be sent to the LogClientActor remotely.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of log lines captured from a process's stdout or stderr.
    Log {
        /// The hostname of the process that generated the log
        hostname: String,
        /// String representation of the ProcId that generated the log
        proc_id: String,
        /// The target output stream (stdout or stderr)
        output_target: OutputTarget,
        /// The log payload as bytes: a serialized `Vec<Vec<u8>>`, one entry
        /// per formatted line (see `LocalLogSender::send`).
        payload: wirevalue::Any,
    },

    /// Flush the log
    Flush {
        /// Indicate if the current flush is synced or non-synced.
        /// If synced, a version number is available. Otherwise, none.
        sync_version: Option<u64>,
    },
}
303
/// Messages that can be sent to the LogClient locally.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Configure client-side aggregation of received log lines.
    SetAggregate {
        /// The time window in seconds to aggregate logs. If None, aggregation is disabled.
        aggregate_window_sec: Option<u64>,
    },

    /// Synchronously flush all the logs from all the procs. This is for client to call.
    StartSyncFlush {
        /// Expect these many procs to ack the flush message.
        expected_procs: usize,
        /// Return once we have received the acks from all the procs
        reply: hyperactor_reference::OncePortRef<()>,
        /// Return to the caller the current flush version
        version: hyperactor_reference::OncePortRef<u64>,
    },
}
331
/// Trait for sending logs
///
/// NOTE(review): both methods are synchronous even though the trait is
/// annotated `#[async_trait]` — presumably kept async-ready for future
/// implementations; confirm before removing the attribute.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Send a log payload in bytes: one `Vec<u8>` per formatted line.
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Flush the log channel, ensuring all messages are delivered
    /// Returns when the flush message has been acknowledged
    fn flush(&mut self) -> anyhow::Result<()>;
}
342
/// Represents the target output stream (stdout or stderr)
///
/// Tags forwarded log lines (see `LogMessage::Log`) and selects which
/// aggregated file / console stream a line is routed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output stream
    Stdout,
    /// Standard error stream
    Stderr,
}
351
/// Identifies which stream of a spawned child process output came from.
/// NOTE(review): not referenced elsewhere in this file — presumably
/// consumed by sibling modules; confirm before removing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// The child process's standard output stream
    ChildStdout,
    /// The child process's standard error stream
    ChildStderr,
}
359
/// Write the log to a local unix channel so some actors can listen to it and stream the log back.
pub struct LocalLogSender {
    // Cached hostname, resolved once at construction.
    hostname: String,
    // Stringified ProcId of the process whose output is being forwarded.
    proc_id: String,
    // Channel to the log listener; messages are `post`ed, never `send`ed.
    tx: ChannelTx<LogMessage>,
    // Watch on the tx status; messages are only posted while Active.
    status: Receiver<TxStatus>,
}
367
368impl LocalLogSender {
369    fn new(
370        log_channel: ChannelAddr,
371        proc_id: &hyperactor_reference::ProcId,
372    ) -> Result<Self, anyhow::Error> {
373        let tx = channel::dial::<LogMessage>(log_channel)?;
374        let status = tx.status().clone();
375
376        let hostname = hostname::get()
377            .unwrap_or_else(|_| "unknown_host".into())
378            .into_string()
379            .unwrap_or("unknown_host".to_string());
380        Ok(Self {
381            hostname,
382            proc_id: proc_id.to_string(),
383            tx,
384            status,
385        })
386    }
387}
388
389#[async_trait]
390impl LogSender for LocalLogSender {
391    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
392        if TxStatus::Active == *self.status.borrow() {
393            // Do not use tx.send, it will block the allocator as the child process state is unknown.
394            self.tx.post(LogMessage::Log {
395                hostname: self.hostname.clone(),
396                proc_id: self.proc_id.clone(),
397                output_target: target,
398                payload: wirevalue::Any::serialize(&payload)?,
399            });
400        }
401
402        Ok(())
403    }
404
405    fn flush(&mut self) -> anyhow::Result<()> {
406        // send will make sure message is delivered
407        if TxStatus::Active == *self.status.borrow() {
408            // Do not use tx.send, it will block the allocator as the child process state is unknown.
409            self.tx.post(LogMessage::Flush { sync_version: None });
410        }
411        Ok(())
412    }
413}
414
/// Message sent to FileMonitor
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    // Already-formatted log lines (prefix applied, truncated) to append to
    // the aggregated file; the monitor task writes each followed by '\n'.
    lines: Vec<String>,
}
wirevalue::register_type!(FileMonitorMessage);
421
/// File appender, coordinates write access to a file via a channel.
pub struct FileAppender {
    // Channel address writers dial to append stdout lines.
    stdout_addr: ChannelAddr,
    // Channel address writers dial to append stderr lines.
    stderr_addr: ChannelAddr,
    #[allow(dead_code)] // Tasks are self terminating
    stdout_task: JoinHandle<()>,
    #[allow(dead_code)]
    stderr_task: JoinHandle<()>,
    // Notified on drop so the writer tasks flush and exit.
    stop: Arc<Notify>,
}
432
433impl fmt::Debug for FileAppender {
434    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435        f.debug_struct("FileMonitor")
436            .field("stdout_addr", &self.stdout_addr)
437            .field("stderr_addr", &self.stderr_addr)
438            .finish()
439    }
440}
441
442impl FileAppender {
443    /// Create a new FileAppender with aggregated log files for stdout and stderr
444    /// Returns None if file creation fails
445    pub fn new() -> Option<Self> {
446        let stop = Arc::new(Notify::new());
447        // TODO make it configurable
448        let file_name_tag = hostname::get()
449            .unwrap_or_else(|_| "unknown_host".into())
450            .into_string()
451            .unwrap_or("unknown_host".to_string());
452
453        // Create stdout file and task
454        let (stdout_path, stdout_writer) =
455            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
456                Some(writer) => writer,
457                None => {
458                    tracing::warn!("failed to create stdout file");
459                    return None;
460                }
461            };
462        let (stdout_addr, stdout_rx) = {
463            let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
464            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
465                Ok((addr, rx)) => (addr, rx),
466                Err(e) => {
467                    tracing::warn!("failed to serve stdout channel: {}", e);
468                    return None;
469                }
470            }
471        };
472        let stdout_stop = stop.clone();
473        let stdout_task = tokio::spawn(file_monitor_task(
474            stdout_rx,
475            stdout_writer,
476            OutputTarget::Stdout,
477            stdout_stop,
478        ));
479
480        // Create stderr file and task
481        let (stderr_path, stderr_writer) =
482            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
483                Some(writer) => writer,
484                None => {
485                    tracing::warn!("failed to create stderr file");
486                    return None;
487                }
488            };
489        let (stderr_addr, stderr_rx) = {
490            let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
491            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
492                Ok((addr, rx)) => (addr, rx),
493                Err(e) => {
494                    tracing::warn!("failed to serve stderr channel: {}", e);
495                    return None;
496                }
497            }
498        };
499        let stderr_stop = stop.clone();
500        let stderr_task = tokio::spawn(file_monitor_task(
501            stderr_rx,
502            stderr_writer,
503            OutputTarget::Stderr,
504            stderr_stop,
505        ));
506
507        tracing::debug!(
508            "FileAppender: created for stdout {} stderr {} ",
509            stdout_path.display(),
510            stderr_path.display()
511        );
512
513        Some(Self {
514            stdout_addr,
515            stderr_addr,
516            stdout_task,
517            stderr_task,
518            stop,
519        })
520    }
521
522    /// Get a channel address for the specified output target
523    pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
524        match target {
525            OutputTarget::Stdout => self.stdout_addr.clone(),
526            OutputTarget::Stderr => self.stderr_addr.clone(),
527        }
528    }
529}
530
531impl Drop for FileAppender {
532    fn drop(&mut self) {
533        // Trigger stop signal to notify tasks to exit
534        self.stop.notify_waiters();
535        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
536    }
537}
538
/// Task that receives lines from StreamFwds and writes them to the aggregated file
///
/// Runs until either the channel errors out (all senders gone) or `stop`
/// is notified; in both cases a final flush is attempted before exit.
async fn file_monitor_task(
    mut rx: ChannelRx<FileMonitorMessage>,
    mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
    target: OutputTarget,
    stop: Arc<Notify>,
) {
    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Ok(msg) => {
                        // Write lines to aggregated file
                        for line in &msg.lines {
                            if let Err(e) = writer.write_all(line.as_bytes()).await {
                                tracing::warn!("FileMonitor: failed to write line to file: {}", e);
                                // Skip the newline for a line that failed to write.
                                continue;
                            }
                            if let Err(e) = writer.write_all(b"\n").await {
                                tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
                            }
                        }
                        // Flush once per received batch rather than per line.
                        if let Err(e) = writer.flush().await {
                            tracing::warn!("FileMonitor: failed to flush file: {}", e);
                        }
                    }
                    Err(e) => {
                        // Channel error
                        tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
                        break;
                    }
                }
            }
            _ = stop.notified() => {
                tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
                break;
            }
        }
    }

    // Graceful shutdown: flush one last time
    if let Err(e) = writer.flush().await {
        tracing::warn!("FileMonitor: failed final flush: {}", e);
    }
    tracing::debug!("FileMonitor task for {:?} exiting", target);
}
585
586fn create_unique_file_writer(
587    file_name_tag: &str,
588    output_target: OutputTarget,
589    env: env::Env,
590) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
591    let suffix = match output_target {
592        OutputTarget::Stderr => "stderr",
593        OutputTarget::Stdout => "stdout",
594    };
595    let (path, filename) = log_file_path(env, None)?;
596    let path = Path::new(&path);
597    let mut full_path = PathBuf::from(path);
598
599    let uuid = ShortUuid::generate();
600
601    full_path.push(format!(
602        "{}_{}_{}.{}",
603        filename, file_name_tag, uuid, suffix
604    ));
605    let file = std::fs::OpenOptions::new()
606        .create(true)
607        .append(true)
608        .open(full_path.clone())?;
609    let tokio_file = tokio::fs::File::from_std(file);
610    // TODO: should we buffer this?
611    Ok((full_path, Box::new(tokio_file)))
612}
613
614fn get_unique_local_log_destination(
615    file_name_tag: &str,
616    output_target: OutputTarget,
617) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
618    let env: env::Env = env::Env::current();
619    if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
620        tracing::debug!("not creating log file because of env type");
621        None
622    } else {
623        match create_unique_file_writer(file_name_tag, output_target, env) {
624            Ok((a, b)) => Some((a, b)),
625            Err(e) => {
626                tracing::warn!("failed to create unique file writer: {}", e);
627                None
628            }
629        }
630    }
631}
632
633/// Create a writer for stdout or stderr
634fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
635    // Return the appropriate standard output or error writer
636    match target {
637        OutputTarget::Stdout => Box::new(tokio::io::stdout()),
638        OutputTarget::Stderr => Box::new(tokio::io::stderr()),
639    }
640}
641
642/// Copy bytes from `reader` to `writer`, forward to log_sender, and forward to FileMonitor.
643/// The same formatted lines go to both log_sender and file_monitor.
644async fn tee(
645    mut reader: impl AsyncRead + Unpin + Send + 'static,
646    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
647    log_sender: Option<Box<dyn LogSender + Send>>,
648    file_monitor_addr: Option<ChannelAddr>,
649    target: OutputTarget,
650    prefix: Option<String>,
651    stop: Arc<Notify>,
652    recent_lines_buf: RotatingLineBuffer,
653) -> Result<(), io::Error> {
654    let mut buf = [0u8; 8192];
655    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
656    let mut log_sender = log_sender;
657
658    // Dial the file monitor channel if provided
659    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
660        file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
661            Ok(tx) => Some(tx),
662            Err(e) => {
663                tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
664                None
665            }
666        });
667
668    loop {
669        tokio::select! {
670            read_result = reader.read(&mut buf) => {
671                match read_result {
672                    Ok(n) => {
673                        if n == 0 {
674                            // EOF reached
675                            tracing::debug!("EOF reached in tee");
676                            break;
677                        }
678
679                        // Write to console
680                        if let Err(e) = std_writer.write_all(&buf[..n]).await {
681                            tracing::warn!("error writing to std: {}", e);
682                        }
683
684                        // Process bytes into lines for log_sender and FileMonitor
685                        let mut completed_lines = Vec::new();
686
687                        for &byte in &buf[..n] {
688                            if byte == b'\n' {
689                                // Complete line found
690                                let mut line = String::from_utf8_lossy(&line_buffer).to_string();
691
692                                // Truncate if too long, respecting UTF-8 boundaries
693                                // (multi-byte chars like emojis can be up to 4 bytes)
694                                if line.len() > MAX_LINE_SIZE {
695                                    let mut truncate_at = MAX_LINE_SIZE;
696                                    while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
697                                        truncate_at -= 1;
698                                    }
699                                    line.truncate(truncate_at);
700                                    line.push_str("... [TRUNCATED]");
701                                }
702
703                                // Prepend with prefix if configured
704                                let final_line = if let Some(ref p) = prefix {
705                                    format!("[{}] {}", p, line)
706                                } else {
707                                    line
708                                };
709
710                                completed_lines.push(final_line);
711                                line_buffer.clear();
712                            } else {
713                                line_buffer.push(byte);
714                            }
715                        }
716
717                        // Send completed lines to both log_sender and FileAppender
718                        if !completed_lines.is_empty() {
719                            if let Some(ref mut sender) = log_sender {
720                                let bytes: Vec<Vec<u8>> = completed_lines.iter()
721                                    .map(|s| s.as_bytes().to_vec())
722                                    .collect();
723                                if let Err(e) = sender.send(target, bytes) {
724                                    tracing::warn!("error sending to log_sender: {}", e);
725                                }
726                            }
727
728                            // Send to FileMonitor via hyperactor channel
729                            if let Some(ref mut tx) = file_monitor_tx {
730                                let msg = FileMonitorMessage {
731                                    lines: completed_lines,
732                                };
733                                // Use post() to avoid blocking
734                                tx.post(msg);
735                            }
736                        }
737
738                        recent_lines_buf.try_add_data(&buf, n);
739                    },
740                    Err(e) => {
741                        tracing::debug!("read error in tee: {}", e);
742                        return Err(e);
743                    }
744                }
745            },
746            _ = stop.notified() => {
747                tracing::debug!("stop signal received in tee");
748                break;
749            }
750        }
751    }
752
753    std_writer.flush().await?;
754
755    // Send any remaining partial line
756    if !line_buffer.is_empty() {
757        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
758        // Truncate if too long, respecting UTF-8 boundaries
759        // (multi-byte chars like emojis can be up to 4 bytes)
760        if line.len() > MAX_LINE_SIZE {
761            let mut truncate_at = MAX_LINE_SIZE;
762            while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
763                truncate_at -= 1;
764            }
765            line.truncate(truncate_at);
766            line.push_str("... [TRUNCATED]");
767        }
768        let final_line = if let Some(ref p) = prefix {
769            format!("[{}] {}", p, line)
770        } else {
771            line
772        };
773
774        let final_lines = vec![final_line];
775
776        // Send to log_sender
777        if let Some(ref mut sender) = log_sender {
778            let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
779            let _ = sender.send(target, bytes);
780        }
781
782        // Send to FileMonitor
783        if let Some(ref mut tx) = file_monitor_tx {
784            let msg = FileMonitorMessage { lines: final_lines };
785            tx.post(msg);
786        }
787    }
788
789    // Flush log_sender
790    if let Some(ref mut sender) = log_sender {
791        let _ = sender.flush();
792    }
793
794    Ok(())
795}
796
#[derive(Debug, Clone)]
// Bounded ring of the most recent output lines, shared between the `tee`
// task (writer) and `peek` callers (readers).
struct RotatingLineBuffer {
    // Newest lines at the back; trimmed to `max_buffer_size` on insert.
    recent_lines: Arc<RwLock<VecDeque<String>>>,
    // Maximum number of lines retained.
    max_buffer_size: usize,
}
802
803impl RotatingLineBuffer {
804    fn try_add_data(&self, buf: &[u8], buf_end: usize) {
805        let data_str = String::from_utf8_lossy(&buf[..buf_end]);
806        let lines: Vec<&str> = data_str.lines().collect();
807
808        if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
809            for line in lines {
810                if !line.is_empty() {
811                    recent_lines_guard.push_back(line.to_string());
812                    if recent_lines_guard.len() > self.max_buffer_size {
813                        recent_lines_guard.pop_front();
814                    }
815                }
816            }
817        } else {
818            tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
819        }
820    }
821
822    async fn peek(&self) -> Vec<String> {
823        let lines = self.recent_lines.read().await;
824        let start_idx = if lines.len() > self.max_buffer_size {
825            lines.len() - self.max_buffer_size
826        } else {
827            0
828        };
829
830        lines.range(start_idx..).cloned().collect()
831    }
832}
833
/// Given a stream forwards data to the provided channel.
pub struct StreamFwder {
    // Handle to the spawned `tee` task; awaited in `abort`.
    teer: JoinHandle<Result<(), io::Error>>,
    // Shared buffer for peek functionality
    recent_lines_buf: RotatingLineBuffer,
    // Shutdown signal to stop the monitoring loop
    stop: Arc<Notify>,
}
842
impl StreamFwder {
    /// Create a new `StreamFwder` and start forwarding from `reader`.
    ///
    /// Once started, the spawned task will:
    /// - forward log lines over `log_channel` (when provided),
    /// - forward log lines to the file monitor at `file_monitor_addr` (when provided),
    /// - pipe `reader` through to the `target` stream,
    /// - retain the last `max_buffer_size` lines for inspection via [`Self::peek`].
    ///
    /// `local_rank` is used as a per-line prefix when the
    /// `PREFIX_WITH_RANK` config attribute is enabled.
    pub fn start(
        reader: impl AsyncRead + Unpin + Send + 'static,
        file_monitor_addr: Option<ChannelAddr>,
        target: OutputTarget,
        max_buffer_size: usize,
        log_channel: Option<ChannelAddr>,
        proc_id: &hyperactor_reference::ProcId,
        local_rank: usize,
    ) -> Self {
        // Only prefix lines with the rank when configured to do so.
        let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
            true => Some(local_rank.to_string()),
            false => None,
        };
        let std_writer = std_writer(target);

        Self::start_with_writer(
            reader,
            std_writer,
            file_monitor_addr,
            target,
            max_buffer_size,
            log_channel,
            proc_id,
            prefix,
        )
    }

    /// Create a new StreamFwder instance with a custom writer (used in tests).
    ///
    /// Spawns the `tee` task that performs the actual forwarding; the
    /// returned handle owns the task handle, the tail buffer, and the
    /// stop signal.
    fn start_with_writer(
        reader: impl AsyncRead + Unpin + Send + 'static,
        std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
        file_monitor_addr: Option<ChannelAddr>,
        target: OutputTarget,
        max_buffer_size: usize,
        log_channel: Option<ChannelAddr>,
        proc_id: &hyperactor_reference::ProcId,
        prefix: Option<String>,
    ) -> Self {
        // Sanity: when there is no file sink, no log forwarding, and
        // `tail_size == 0`, the child should have **inherited** stdio
        // and no `StreamFwder` should exist. In that case console
        // mirroring happens via inheritance, not via `StreamFwder`.
        // If we hit this, we piped unnecessarily.
        debug_assert!(
            file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
            "StreamFwder started with no sinks and no tail"
        );

        let stop = Arc::new(Notify::new());
        // Ring buffer holding the most recent `max_buffer_size` lines.
        let recent_lines_buf = RotatingLineBuffer {
            recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
                max_buffer_size,
            ))),
            max_buffer_size,
        };

        // Best effort: if the log sender cannot be created, log the error
        // and continue without client forwarding rather than failing startup.
        let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
            match LocalLogSender::new(addr, proc_id) {
                Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
                Err(e) => {
                    tracing::error!("failed to create log sender: {}", e);
                    None
                }
            }
        } else {
            None
        };

        let teer_stop = stop.clone();
        let recent_line_buf_clone = recent_lines_buf.clone();
        let teer = tokio::spawn(async move {
            tee(
                reader,
                std_writer,
                log_sender,
                file_monitor_addr,
                target,
                prefix,
                teer_stop,
                recent_line_buf_clone,
            )
            .await
        });

        StreamFwder {
            teer,
            recent_lines_buf,
            stop,
        }
    }

    /// Signal the tee task to stop and wait for it to finish.
    ///
    /// Returns the buffered tail lines together with the task's result
    /// (either the tee's own error or a join error, if any).
    pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
        self.stop.notify_waiters();

        let lines = self.peek().await;
        let teer_result = self.teer.await;

        // Collapse join-level and task-level errors into a single result.
        let result: Result<(), anyhow::Error> = match teer_result {
            Ok(inner) => inner.map_err(anyhow::Error::from),
            Err(e) => Err(e.into()),
        };

        (lines, result)
    }

    /// Inspect the latest `max_buffer` lines read from the file being monitored
    /// Returns lines in chronological order (oldest first)
    pub async fn peek(&self) -> Vec<String> {
        self.recent_lines_buf.peek().await
    }
}
961
/// Messages that can be sent to the LogForwarder
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Receive the log from the parent process and forward it to the client.
    /// The actor re-sends this message to itself to form a receive loop.
    Forward {},

    /// Whether to stream the log back to the client.
    SetMode { stream_to_client: bool },

    /// Flush the log with a version number, used as a sync barrier ack.
    ForceSyncFlush { version: u64 },
}
985
/// A log forwarder that receives the log from its parent process and forward it back to the client
#[hyperactor::export(
    spawn = true,
    handlers = [LogForwardMessage {cast = true}],
)]
pub struct LogForwardActor {
    // Receiving side of the bootstrap log channel served by this actor.
    rx: ChannelRx<LogMessage>,
    // Sender dialed on the same channel; used to enqueue `Flush` messages
    // so the queue keeps draining.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    // Earliest time at which the next periodic flush may be scheduled.
    next_flush_deadline: SystemTime,
    // Client actor that ultimately prints the forwarded logs.
    logging_client_ref: hyperactor_reference::ActorRef<LogClientActor>,
    // When false, received log lines are dropped instead of forwarded.
    stream_to_client: bool,
}
998
#[async_trait]
impl Actor for LogForwardActor {
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        // Kick off the receive loop immediately.
        this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        // Make sure we start the flush loop periodically so the log channel will not deadlock.
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush { sync_version: None })
            .await?;
        Ok(())
    }
}
1014
#[async_trait]
impl hyperactor::RemoteSpawn for LogForwardActor {
    type Params = hyperactor_reference::ActorRef<LogClientActor>;

    /// Construct the forwarder: serve the bootstrap log channel named by
    /// the `BOOTSTRAP_LOG_CHANNEL` env var (falling back to a fresh unix
    /// channel when unset or unservable), and dial the same channel for
    /// enqueueing flush messages.
    async fn new(logging_client_ref: Self::Params, _environment: Flattrs) -> Result<Self> {
        let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
            Ok(channel) => channel.parse()?,
            Err(err) => {
                tracing::debug!(
                    "log forwarder actor failed to read env var {}: {}",
                    BOOTSTRAP_LOG_CHANNEL,
                    err
                );
                // TODO: an empty channel to serve
                ChannelAddr::any(ChannelTransport::Unix)
            }
        };
        tracing::info!(
            "log forwarder {} serve at {}",
            std::process::id(),
            log_channel
        );

        let rx = match channel::serve(log_channel.clone()) {
            Ok((_, rx)) => rx,
            Err(err) => {
                // This can happen if we are not spawned on a separate process like local.
                // For local mesh, log streaming anyway is not needed.
                tracing::error!(
                    "log forwarder actor failed to bootstrap on given channel {}: {}",
                    log_channel,
                    err
                );
                channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
            }
        };

        // Dial the same channel to send flush message to drain the log queue.
        let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
        let now = std::time::SystemTime::now();

        Ok(Self {
            rx,
            flush_tx,
            next_flush_deadline: now,
            logging_client_ref,
            stream_to_client: true,
        })
    }
}
1065
#[async_trait]
#[hyperactor::handle(LogForwardMessage)]
impl LogForwardMessageHandler for LogForwardActor {
    /// Pull one message off the raw log channel, handle it, and re-send
    /// `Forward` to ourselves to keep the receive loop going.
    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
        match self.rx.recv().await {
            Ok(LogMessage::Flush { sync_version }) => {
                let now = std::time::SystemTime::now();
                match sync_version {
                    None => {
                        // Schedule another flush to keep the log channel from deadlocking.
                        let delay = Duration::from_secs(1);
                        // Debounce: only schedule when the previous deadline passed.
                        if now >= self.next_flush_deadline {
                            self.next_flush_deadline = now + delay;
                            let flush_tx = self.flush_tx.clone();
                            tokio::spawn(async move {
                                tokio::time::sleep(delay).await;
                                if let Err(e) = flush_tx
                                    .lock()
                                    .await
                                    .send(LogMessage::Flush { sync_version: None })
                                    .await
                                {
                                    tracing::error!("failed to send flush message: {}", e);
                                }
                            });
                        }
                    }
                    version => {
                        // Versioned flush: ack the sync barrier at the client.
                        self.logging_client_ref.flush(ctx, version).await?;
                    }
                }
            }
            Ok(LogMessage::Log {
                hostname,
                proc_id,
                output_target,
                payload,
            }) => {
                // Drop the line silently when streaming is disabled.
                if self.stream_to_client {
                    self.logging_client_ref
                        .log(ctx, hostname, proc_id, output_target, payload)
                        .await?;
                }
            }
            Err(e) => {
                return Err(e.into());
            }
        }

        // This is not ideal as we are using raw tx/rx.
        ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        Ok(())
    }

    /// Toggle whether received log lines are forwarded to the client.
    async fn set_mode(
        &mut self,
        _ctx: &Context<Self>,
        stream_to_client: bool,
    ) -> Result<(), anyhow::Error> {
        self.stream_to_client = stream_to_client;
        Ok(())
    }

    /// Enqueue a versioned flush on the log channel so it drains behind
    /// any already-queued log messages before acking the barrier.
    async fn force_sync_flush(
        &mut self,
        _cx: &Context<Self>,
        version: u64,
    ) -> Result<(), anyhow::Error> {
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush {
                sync_version: Some(version),
            })
            .await
            .map_err(anyhow::Error::from)
    }
}
1145
1146/// Deserialize a serialized message and split it into UTF-8 lines
1147fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1148    // Try to deserialize as Vec<Vec<u8>> first (multiple byte arrays)
1149    if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1150        let mut result = Vec::new();
1151        for bytes in message_bytes {
1152            let message_str = String::from_utf8(bytes)?;
1153            let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1154            result.push(lines);
1155        }
1156        return Ok(result);
1157    }
1158
1159    // If that fails, try to deserialize as String and wrap in Vec<Vec<String>>
1160    if let Ok(message) = serialized_message.deserialized::<String>() {
1161        let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1162        return Ok(vec![lines]);
1163    }
1164
1165    // If both fail, return an error
1166    anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1167}
1168
/// A client to receive logs from remote processes
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogMessage, LogClientMessage],
)]
pub struct LogClientActor {
    // Aggregation window in seconds; `None` means print lines immediately.
    aggregate_window_sec: Option<u64>,
    // One aggregator per output stream (stdout/stderr).
    aggregators: HashMap<OutputTarget, Aggregator>,
    // When the aggregators were last flushed to the console.
    last_flush_time: SystemTime,
    // Deadline of the currently scheduled flush, if one is pending.
    next_flush_deadline: Option<SystemTime>,

    // For flush sync barrier
    // Version of the in-progress sync flush; acks for other versions are ignored.
    current_flush_version: u64,
    // Port to reply on once every proc has acked the current sync flush.
    current_flush_port: Option<hyperactor_reference::OncePortRef<()>>,
    // Number of procs that have not yet acked the current sync flush.
    current_unflushed_procs: usize,
}
1186
1187impl Default for LogClientActor {
1188    fn default() -> Self {
1189        // Initialize aggregators
1190        let mut aggregators = HashMap::new();
1191        aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1192        aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1193
1194        Self {
1195            aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1196            aggregators,
1197            last_flush_time: std::time::SystemTime::now(),
1198            next_flush_deadline: None,
1199            current_flush_version: 0,
1200            current_flush_port: None,
1201            current_unflushed_procs: 0,
1202        }
1203    }
1204}
1205
#[async_trait]
impl Actor for LogClientActor {
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Mark this actor as a system actor; no other startup work needed.
        this.set_system();
        Ok(())
    }
}
1213
1214impl LogClientActor {
1215    fn print_aggregators(&mut self) {
1216        for (output_target, aggregator) in self.aggregators.iter_mut() {
1217            if aggregator.is_empty() {
1218                continue;
1219            }
1220            match output_target {
1221                OutputTarget::Stdout => {
1222                    println!("{}", aggregator);
1223                }
1224                OutputTarget::Stderr => {
1225                    eprintln!("{}", aggregator);
1226                }
1227            }
1228
1229            // Reset the aggregator
1230            aggregator.reset();
1231        }
1232    }
1233
1234    fn print_log_line(hostname: &str, proc_id: &str, output_target: OutputTarget, line: String) {
1235        let message = format!("[{} {}] {}", hostname, proc_id, line);
1236
1237        #[cfg(test)]
1238        crate::logging::test_tap::push(&message);
1239
1240        match output_target {
1241            OutputTarget::Stdout => println!("{}", message),
1242            OutputTarget::Stderr => eprintln!("{}", message),
1243        }
1244    }
1245
1246    fn flush_internal(&mut self) {
1247        self.print_aggregators();
1248        self.last_flush_time = std::time::SystemTime::now();
1249        self.next_flush_deadline = None;
1250    }
1251}
1252
impl Drop for LogClientActor {
    fn drop(&mut self) {
        // Flush the remaining logs before shutting down
        // so no aggregated-but-unprinted lines are lost.
        self.print_aggregators();
    }
}
1259
1260#[async_trait]
1261#[hyperactor::handle(LogMessage)]
1262impl LogMessageHandler for LogClientActor {
1263    async fn log(
1264        &mut self,
1265        cx: &Context<Self>,
1266        hostname: String,
1267        proc_id: String,
1268        output_target: OutputTarget,
1269        payload: wirevalue::Any,
1270    ) -> Result<(), anyhow::Error> {
1271        // Deserialize the message and process line by line with UTF-8
1272        let message_line_groups = deserialize_message_lines(&payload)?;
1273        let hostname = hostname.as_str();
1274
1275        let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1276        match self.aggregate_window_sec {
1277            None => {
1278                for line in message_lines {
1279                    Self::print_log_line(hostname, &proc_id, output_target, line);
1280                }
1281                self.last_flush_time = std::time::SystemTime::now();
1282            }
1283            Some(window) => {
1284                for line in message_lines {
1285                    if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1286                        if let Err(e) = aggregator.add_line(&line) {
1287                            tracing::error!("error adding log line: {}", e);
1288                            // For the sake of completeness, flush the log lines.
1289                            Self::print_log_line(hostname, &proc_id, output_target, line);
1290                        }
1291                    } else {
1292                        tracing::error!("unknown output target: {:?}", output_target);
1293                        // For the sake of completeness, flush the log lines.
1294                        Self::print_log_line(hostname, &proc_id, output_target, line);
1295                    }
1296                }
1297
1298                let new_deadline = self.last_flush_time + Duration::from_secs(window);
1299                let now = std::time::SystemTime::now();
1300                if new_deadline <= now {
1301                    self.flush_internal();
1302                } else {
1303                    let delay = new_deadline.duration_since(now)?;
1304                    match self.next_flush_deadline {
1305                        None => {
1306                            self.next_flush_deadline = Some(new_deadline);
1307                            cx.self_message_with_delay(
1308                                LogMessage::Flush { sync_version: None },
1309                                delay,
1310                            )?;
1311                        }
1312                        Some(deadline) => {
1313                            // Some early log lines have alrady triggered the flush.
1314                            if new_deadline < deadline {
1315                                // This can happen if the user has adjusted the aggregation window.
1316                                self.next_flush_deadline = Some(new_deadline);
1317                                cx.self_message_with_delay(
1318                                    LogMessage::Flush { sync_version: None },
1319                                    delay,
1320                                )?;
1321                            }
1322                        }
1323                    }
1324                }
1325            }
1326        }
1327
1328        Ok(())
1329    }
1330
1331    async fn flush(
1332        &mut self,
1333        cx: &Context<Self>,
1334        sync_version: Option<u64>,
1335    ) -> Result<(), anyhow::Error> {
1336        match sync_version {
1337            None => {
1338                self.flush_internal();
1339            }
1340            Some(version) => {
1341                if version != self.current_flush_version {
1342                    tracing::error!(
1343                        "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1344                        version,
1345                        self.current_flush_version
1346                    );
1347                    return Ok(());
1348                }
1349
1350                if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1351                    // This is a serious issue; it's better to error out.
1352                    anyhow::bail!("found no ongoing flush request");
1353                }
1354                self.current_unflushed_procs -= 1;
1355
1356                tracing::debug!(
1357                    "ack sync flush: version {}; remaining procs: {}",
1358                    self.current_flush_version,
1359                    self.current_unflushed_procs
1360                );
1361
1362                if self.current_unflushed_procs == 0 {
1363                    self.flush_internal();
1364                    let reply = self.current_flush_port.take().unwrap();
1365                    self.current_flush_port = None;
1366                    reply.send(cx, ()).map_err(anyhow::Error::from)?;
1367                }
1368            }
1369        }
1370
1371        Ok(())
1372    }
1373}
1374
1375#[async_trait]
1376#[hyperactor::handle(LogClientMessage)]
1377impl LogClientMessageHandler for LogClientActor {
1378    async fn set_aggregate(
1379        &mut self,
1380        _cx: &Context<Self>,
1381        aggregate_window_sec: Option<u64>,
1382    ) -> Result<(), anyhow::Error> {
1383        if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1384            // Make sure we flush whatever in the aggregators before disabling aggregation.
1385            self.print_aggregators();
1386        }
1387        self.aggregate_window_sec = aggregate_window_sec;
1388        Ok(())
1389    }
1390
1391    async fn start_sync_flush(
1392        &mut self,
1393        cx: &Context<Self>,
1394        expected_procs_flushed: usize,
1395        reply: hyperactor_reference::OncePortRef<()>,
1396        version: hyperactor_reference::OncePortRef<u64>,
1397    ) -> Result<(), anyhow::Error> {
1398        if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1399            tracing::warn!(
1400                "found unfinished ongoing flush: version {}; {} unflushed procs",
1401                self.current_flush_version,
1402                self.current_unflushed_procs,
1403            );
1404        }
1405
1406        self.current_flush_version += 1;
1407        tracing::debug!(
1408            "start sync flush with version {}",
1409            self.current_flush_version
1410        );
1411        self.current_flush_port = Some(reply.clone());
1412        self.current_unflushed_procs = expected_procs_flushed;
1413        version
1414            .send(cx, self.current_flush_version)
1415            .map_err(anyhow::Error::from)?;
1416        Ok(())
1417    }
1418}
1419
#[cfg(test)]
pub mod test_tap {
    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    // Sender half installed by tests; `push` forwards lines here.
    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    // Receiver half registered by tests; `drain` pulls lines from here.
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Install the sender half of the tap. Subsequent calls are no-ops.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.set(tx);
    }

    /// Register the receiver half so `drain` can read observed lines later.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.set(Mutex::new(rx));
    }

    /// Push one observed line into the tap (no-op if no tap is installed).
    pub fn push(s: &str) {
        if let Some(tx) = TAP.get() {
            let _ = tx.send(s.to_string());
        }
    }

    /// Collect every line observed so far, in arrival order.
    pub fn drain() -> Vec<String> {
        match RX.get() {
            None => Vec::new(),
            Some(rx) => {
                let mut rx = rx.lock().unwrap();
                // Pull until the channel reports empty.
                std::iter::from_fn(|| rx.try_recv().ok()).collect()
            }
        }
    }
}
1460
1461#[cfg(test)]
1462mod tests {
1463
1464    use std::sync::Arc;
1465    use std::sync::Mutex;
1466
1467    use hyperactor::RemoteSpawn;
1468    use hyperactor::channel;
1469    use hyperactor::channel::ChannelAddr;
1470    use hyperactor::channel::ChannelTx;
1471    use hyperactor::channel::Tx;
1472    use hyperactor::mailbox::BoxedMailboxSender;
1473    use hyperactor::mailbox::DialMailboxRouter;
1474    use hyperactor::mailbox::MailboxServer;
1475    use hyperactor::proc::Proc;
1476    use hyperactor::testing::ids::test_proc_id;
1477    use tokio::io::AsyncSeek;
1478    use tokio::io::AsyncSeekExt;
1479    use tokio::io::AsyncWriteExt;
1480    use tokio::io::SeekFrom;
1481    use tokio::sync::mpsc;
1482
1483    use super::*;
1484
    /// Result of processing file content
    /// returned by `process_file_content`.
    #[derive(Debug)]
    struct FileProcessingResult {
        /// Complete lines found during processing
        lines: Vec<Vec<u8>>,
        /// Updated position in the file after processing
        new_position: u64,
        /// Any remaining incomplete line data, buffered for subsequent reads
        incomplete_line_buffer: Vec<u8>,
    }
1495
    /// Process new file content from a given position, extracting complete lines
    /// This function is extracted to enable easier unit testing without file system dependencies
    ///
    /// Reads from `current_position` up to EOF (or until `max_buffer_size`
    /// lines have been collected), splitting on `\n`. Lines longer than
    /// `MAX_LINE_SIZE` are truncated with a "[TRUNCATED]" marker. A
    /// `current_position` beyond `file_size` is treated as file
    /// truncation/rotation and reading restarts from the beginning.
    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
        current_position: u64,
        file_size: u64,
        existing_line_buffer: Vec<u8>,
        max_buffer_size: usize,
    ) -> Result<FileProcessingResult> {
        // If position equals file size, we're at the end
        if current_position == file_size {
            return Ok(FileProcessingResult {
                lines: Vec::new(),
                new_position: current_position,
                incomplete_line_buffer: existing_line_buffer,
            });
        }

        // Handle potential file truncation/rotation
        let actual_position = if current_position > file_size {
            tracing::warn!(
                "File appears to have been truncated (position {} > file size {}), resetting to start",
                current_position,
                file_size
            );
            reader.seek(SeekFrom::Start(0)).await?;
            0
        } else {
            // current_position < file_size
            reader.seek(SeekFrom::Start(current_position)).await?;
            current_position
        };

        let mut buf = vec![0u8; 128 * 1024];
        let mut line_buffer = existing_line_buffer;
        let mut lines = Vec::with_capacity(max_buffer_size);
        let mut processed_bytes = 0u64;

        loop {
            let bytes_read = reader.read(&mut buf).await?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buf[..bytes_read];

            // Walk the chunk newline by newline, accumulating partial
            // line data in `line_buffer` across chunk boundaries.
            let mut start = 0;
            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
                let absolute_pos = start + newline_pos;

                line_buffer.extend_from_slice(&chunk[start..absolute_pos]);

                if !line_buffer.is_empty() {
                    // Cap oversized lines and mark them as truncated.
                    if line_buffer.len() > MAX_LINE_SIZE {
                        line_buffer.truncate(MAX_LINE_SIZE);
                        line_buffer.extend_from_slice(b"... [TRUNCATED]");
                    }

                    let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
                    lines.push(line_data);
                }

                start = absolute_pos + 1;

                // Check if we've reached the max buffer size after adding each line
                if lines.len() >= max_buffer_size {
                    // We've processed up to and including the current newline
                    // The new position is where we should start reading next time
                    let new_position = actual_position + processed_bytes + start as u64;

                    // Don't save remaining data - we'll re-read it from the new position
                    return Ok(FileProcessingResult {
                        lines,
                        new_position,
                        incomplete_line_buffer: Vec::new(),
                    });
                }
            }

            // Only add bytes to processed_bytes if we've fully processed this chunk
            processed_bytes += bytes_read as u64;

            if start < chunk.len() {
                // Tail of the chunk with no trailing newline: keep it for
                // the next read.
                line_buffer.extend_from_slice(&chunk[start..]);
            }
        }

        let new_position = actual_position + processed_bytes;

        Ok(FileProcessingResult {
            lines,
            new_position,
            incomplete_line_buffer: line_buffer,
        })
    }
1591
    /// Smoke test: wire a `LogForwardActor` to a `LogClientActor` over a
    /// bootstrap log channel and post log messages through it.
    #[tokio::test]
    async fn test_forwarding_log_to_client() {
        // Setup the basics
        let router = DialMailboxRouter::new();
        let (proc_addr, client_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::configured(
            test_proc_id("client_0"),
            BoxedMailboxSender::new(router.clone()),
        );
        proc.clone().serve(client_rx);
        router.bind(test_proc_id("client_0").into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // Spin up both the forwarder and the client
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: Unit test
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }
        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
        let log_client: hyperactor_reference::ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
            .await
            .unwrap();
        let log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Write some logs that will not be streamed
        // NOTE(review): `stream_to_client` defaults to true in
        // `LogForwardActor::new`, so this first message may in fact be
        // streamed — confirm the intended default.
        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            proc_id: "test_proc".into(),
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
        });

        // Turn on streaming
        log_forwarder.set_mode(&client, true).await.unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            proc_id: "test_proc".into(),
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
        });

        // TODO: it is hard to test out anything meaningful here as the client flushes to stdout.
    }
1643
    /// Covers `deserialize_message_lines` for both supported payload
    /// shapes (String and Vec<Vec<u8>>), empty lines, and invalid UTF-8.
    #[test]
    fn test_deserialize_message_lines_string() {
        // Test deserializing a String message with multiple lines
        let message = "Line 1\nLine 2\nLine 3".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);

        // Test deserializing a Vec<Vec<u8>> message with UTF-8 content
        let message_bytes = vec![
            "Hello\nWorld".as_bytes().to_vec(),
            "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
        ];
        let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();
        assert_eq!(
            result,
            vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
        );

        // Test deserializing a single line message
        let message = "Single line message".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();

        assert_eq!(result, vec![vec!["Single line message"]]);

        // Test deserializing an empty lines
        let message = "\n\n".to_string();
        let serialized = wirevalue::Any::serialize(&message).unwrap();

        let result = deserialize_message_lines(&serialized).unwrap();

        assert_eq!(result, vec![vec!["", ""]]);

        // Test error handling for invalid UTF-8 bytes
        let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; // Invalid UTF-8 sequence in Vec<Vec<u8>>
        let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();

        let result = deserialize_message_lines(&serialized);

        // The function should fail when trying to convert invalid UTF-8 bytes to String
        assert!(
            result.is_err(),
            "Expected deserialization to fail with invalid UTF-8 bytes"
        );
    }
    /// Test double for `LogSender` that records sent lines on a channel
    /// and tracks whether `flush` was called.
    #[allow(dead_code)]
    struct MockLogSender {
        log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, // (output_target, content)
        flush_called: Arc<Mutex<bool>>,                            // Track if flush was called
    }
1699
    impl MockLogSender {
        /// Build a mock that forwards observed lines into `log_sender`.
        #[allow(dead_code)]
        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
            Self {
                log_sender,
                flush_called: Arc::new(Mutex::new(false)),
            }
        }
    }
1709
1710    #[async_trait]
1711    impl LogSender for MockLogSender {
1712        fn send(
1713            &mut self,
1714            output_target: OutputTarget,
1715            payload: Vec<Vec<u8>>,
1716        ) -> anyhow::Result<()> {
1717            // For testing purposes, convert to string if it's valid UTF-8
1718            let lines: Vec<String> = payload
1719                .iter()
1720                .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1721                .collect();
1722
1723            for line in lines {
1724                self.log_sender
1725                    .send((output_target, line))
1726                    .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1727            }
1728            Ok(())
1729        }
1730
1731        fn flush(&mut self) -> anyhow::Result<()> {
1732            // Mark that flush was called
1733            let mut flush_called = self.flush_called.lock().unwrap();
1734            *flush_called = true;
1735
1736            // For testing purposes, just return Ok
1737            // In a real implementation, this would wait for all messages to be delivered
1738            Ok(())
1739        }
1740    }
1741
1742    #[test]
1743    fn test_string_similarity() {
1744        // Test exact match
1745        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1746
1747        // Test completely different strings
1748        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1749
1750        // Test similar strings
1751        assert!(normalized_edit_distance("hello", "helo") < 0.5);
1752        assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1753
1754        // Test empty strings
1755        assert_eq!(normalized_edit_distance("", ""), 0.0);
1756        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1757    }
1758
1759    #[test]
1760    fn test_add_line_to_empty_aggregator() {
1761        let mut aggregator = Aggregator::new();
1762        let result = aggregator.add_line("ERROR 404 not found");
1763
1764        assert!(result.is_ok());
1765        assert_eq!(aggregator.lines.len(), 1);
1766        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1767        assert_eq!(aggregator.lines[0].count, 1);
1768    }
1769
1770    #[test]
1771    fn test_add_line_merges_with_similar_line() {
1772        let mut aggregator = Aggregator::new_with_threshold(0.2);
1773
1774        // Add first line
1775        aggregator.add_line("ERROR 404 timeout").unwrap();
1776        assert_eq!(aggregator.lines.len(), 1);
1777
1778        // Add second line that should merge (similar enough)
1779        aggregator.add_line("ERROR 500 timeout").unwrap();
1780        assert_eq!(aggregator.lines.len(), 1); // Should still be 1 line after merge
1781        assert_eq!(aggregator.lines[0].count, 2);
1782
1783        // Add third line that's too different
1784        aggregator
1785            .add_line("WARNING database connection failed")
1786            .unwrap();
1787        assert_eq!(aggregator.lines.len(), 2); // Should be 2 lines now
1788
1789        // Add fourth line similar to third
1790        aggregator
1791            .add_line("WARNING database connection timed out")
1792            .unwrap();
1793        assert_eq!(aggregator.lines.len(), 2); // Should still be 2 lines
1794        assert_eq!(aggregator.lines[1].count, 2); // Second group has 2 lines
1795    }
1796
1797    #[test]
1798    fn test_aggregation_of_similar_log_lines() {
1799        let mut aggregator = Aggregator::new_with_threshold(0.2);
1800
1801        // Add the provided log lines with small differences
1802        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
1803        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
1804        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
1805
1806        // Check that we have only one aggregated line due to similarity
1807        assert_eq!(aggregator.lines.len(), 1);
1808
1809        // Check that the count is 3
1810        assert_eq!(aggregator.lines[0].count, 3);
1811    }
1812
    /// End-to-end check of `StreamFwder`: bytes written into one half of a
    /// duplex pipe should (a) be kept in the tail buffer returned by
    /// `abort()`, (b) be appended to the file writer, and (c) produce at
    /// least one `LogMessage` on the served log channel.
    // NOTE(review): this test relies on fixed sleeps for setup/propagation
    // ordering, so statement order is load-bearing — do not reorder.
    #[tokio::test]
    async fn test_stream_fwd_creation() {
        hyperactor_telemetry::initialize_logging_for_test();

        let (mut writer, reader) = tokio::io::duplex(1024);
        let (log_channel, mut rx) =
            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();

        // Create a temporary file for testing the writer
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();

        // Create file writer that writes to the temp file (using tokio for async compatibility)
        let file_writer = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .append(true)
            .open(&temp_path)
            .await
            .unwrap();

        // Create FileMonitor and get address for stdout
        let file_monitor = FileAppender::new();
        let file_monitor_addr = file_monitor
            .as_ref()
            .map(|fm| fm.addr_for(OutputTarget::Stdout));

        let the_test_proc_id = test_proc_id("testproc_0");
        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(file_writer),
            file_monitor_addr,
            OutputTarget::Stdout,
            3, // max_buffer_size
            Some(log_channel),
            &the_test_proc_id,
            None, // no prefix
        );

        // Wait a bit for set up to be done
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Write initial content through the input writer
        writer.write_all(b"Initial log line\n").await.unwrap();
        writer.flush().await.unwrap();

        // Write more content
        for i in 1..=5 {
            writer
                .write_all(format!("New log line {}\n", i).as_bytes())
                .await
                .unwrap();
        }
        writer.flush().await.unwrap();

        // Wait a bit for the file to be written and the watcher to detect changes
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Wait until log sender gets message
        let timeout = Duration::from_secs(1);
        let _ = tokio::time::timeout(timeout, rx.recv())
            .await
            .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));

        // Wait a bit more for all lines to be processed
        tokio::time::sleep(Duration::from_millis(200)).await;

        // abort() returns the tail buffer of recently-seen lines.
        let (recent_lines, _result) = monitor.abort().await;

        assert!(
            recent_lines.len() >= 3,
            "Expected buffer with at least 3 lines, got {} lines: {:?}",
            recent_lines.len(),
            recent_lines
        );

        // The file writer must have persisted both the first and last lines.
        let file_contents = std::fs::read_to_string(&temp_path).unwrap();
        assert!(
            file_contents.contains("Initial log line"),
            "Expected temp file to contain 'Initial log line', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 1"),
            "Expected temp file to contain 'New log line 1', got: {:?}",
            file_contents
        );
        assert!(
            file_contents.contains("New log line 5"),
            "Expected temp file to contain 'New log line 5', got: {:?}",
            file_contents
        );
    }
1906
1907    #[test]
1908    fn test_aggregator_custom_threshold() {
1909        // Test with very strict threshold (0.05)
1910        let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1911        strict_aggregator.add_line("ERROR 404").unwrap();
1912        strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold
1913        assert_eq!(strict_aggregator.lines.len(), 2);
1914
1915        // Test with very lenient threshold (0.8)
1916        let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1917        lenient_aggregator.add_line("ERROR 404").unwrap();
1918        lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold
1919        assert_eq!(lenient_aggregator.lines.len(), 1);
1920        assert_eq!(lenient_aggregator.lines[0].count, 2);
1921    }
1922
1923    #[test]
1924    fn test_format_system_time() {
1925        let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC
1926        let formatted = format_system_time(test_time);
1927
1928        // Just verify it's a reasonable format (contains date and time components)
1929        assert!(formatted.contains("-"));
1930        assert!(formatted.contains(":"));
1931        assert!(formatted.len() > 10); // Should be reasonable length
1932    }
1933
1934    #[test]
1935    fn test_aggregator_display_formatting() {
1936        let mut aggregator = Aggregator::new();
1937        aggregator.add_line("Test error message").unwrap();
1938        aggregator.add_line("Test error message").unwrap(); // Should merge
1939
1940        let display_string = format!("{}", aggregator);
1941
1942        // Verify the output contains expected elements
1943        assert!(display_string.contains("Aggregated Logs"));
1944        assert!(display_string.contains("[2 similar log lines]"));
1945        assert!(display_string.contains("Test error message"));
1946        assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1947    }
1948
1949    #[tokio::test]
1950    async fn test_local_log_sender_inactive_status() {
1951        let (log_channel, _) =
1952            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1953        let test_proc_id = test_proc_id("testproc_0");
1954        let mut sender = LocalLogSender::new(log_channel, &test_proc_id).unwrap();
1955
1956        // This test verifies that the sender handles inactive status gracefully
1957        // In a real scenario, the channel would be closed, but for testing we just
1958        // verify the send/flush methods don't panic
1959        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1960        assert!(result.is_ok());
1961
1962        let result = sender.flush();
1963        assert!(result.is_ok());
1964    }
1965
1966    #[test]
1967    fn test_levenshtein_distance_edge_cases() {
1968        // Test with empty strings
1969        assert_eq!(levenshtein_distance("", ""), 0);
1970        assert_eq!(levenshtein_distance("", "hello"), 5);
1971        assert_eq!(levenshtein_distance("hello", ""), 5);
1972
1973        // Test with identical strings
1974        assert_eq!(levenshtein_distance("hello", "hello"), 0);
1975
1976        // Test with single character differences
1977        assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion
1978        assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion
1979        assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution
1980
1981        // Test with unicode characters
1982        assert_eq!(levenshtein_distance("café", "cafe"), 1);
1983    }
1984
1985    #[test]
1986    fn test_normalized_edit_distance_edge_cases() {
1987        // Test with empty strings
1988        assert_eq!(normalized_edit_distance("", ""), 0.0);
1989
1990        // Test normalization
1991        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1992        assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1993
1994        // Test that result is always between 0.0 and 1.0
1995        let distance = normalized_edit_distance("completely", "different");
1996        assert!(distance >= 0.0 && distance <= 1.0);
1997    }
1998
1999    #[tokio::test]
2000    async fn test_deserialize_message_lines_edge_cases() {
2001        // Test with empty string
2002        let empty_message = "".to_string();
2003        let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
2004        let result = deserialize_message_lines(&serialized).unwrap();
2005        assert_eq!(result, vec![vec![] as Vec<String>]);
2006
2007        // Test with trailing newline
2008        let trailing_newline = "line1\nline2\n".to_string();
2009        let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
2010        let result = deserialize_message_lines(&serialized).unwrap();
2011        assert_eq!(result, vec![vec!["line1", "line2"]]);
2012    }
2013
2014    #[test]
2015    fn test_output_target_serialization() {
2016        // Test that OutputTarget can be serialized and deserialized
2017        let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2018        let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2019
2020        let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2021        let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2022
2023        assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2024        assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2025    }
2026
2027    #[test]
2028    fn test_log_line_display_formatting() {
2029        let log_line = LogLine::new("Test message".to_string());
2030        let display_string = format!("{}", log_line);
2031
2032        assert!(display_string.contains("[1 similar log lines]"));
2033        assert!(display_string.contains("Test message"));
2034
2035        // Test with higher count
2036        let mut log_line_multi = LogLine::new("Test message".to_string());
2037        log_line_multi.count = 5;
2038        let display_string_multi = format!("{}", log_line_multi);
2039
2040        assert!(display_string_multi.contains("[5 similar log lines]"));
2041        assert!(display_string_multi.contains("Test message"));
2042    }
2043
    // Mock reader for testing process_file_content using std::io::Cursor
    /// Wraps raw bytes in an in-memory `Cursor` (which implements the async
    /// read traits via tokio's blanket impls for `std::io::Cursor` — TODO
    /// confirm which adapter `process_file_content` relies on) so tests can
    /// exercise file reading without touching the filesystem.
    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
        std::io::Cursor::new(data)
    }
2048
2049    #[tokio::test]
2050    async fn test_process_file_content_basic() {
2051        let data = b"line1\nline2\nline3\n".to_vec();
2052        let mut reader = create_mock_reader(data.clone());
2053        let max_buf_size = 10;
2054
2055        let result =
2056            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2057                .await
2058                .unwrap();
2059
2060        assert_eq!(result.lines.len(), 3);
2061        assert_eq!(result.lines[0], b"line1");
2062        assert_eq!(result.lines[1], b"line2");
2063        assert_eq!(result.lines[2], b"line3");
2064        assert_eq!(result.new_position, data.len() as u64);
2065        assert!(result.incomplete_line_buffer.is_empty());
2066    }
2067
2068    #[tokio::test]
2069    async fn test_process_file_content_incomplete_line() {
2070        let data = b"line1\nline2\npartial".to_vec();
2071        let mut reader = create_mock_reader(data.clone());
2072        let max_buf_size = 10;
2073
2074        let result =
2075            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2076                .await
2077                .unwrap();
2078
2079        assert_eq!(result.lines.len(), 2);
2080        assert_eq!(result.lines[0], b"line1");
2081        assert_eq!(result.lines[1], b"line2");
2082        assert_eq!(result.new_position, data.len() as u64);
2083        assert_eq!(result.incomplete_line_buffer, b"partial");
2084    }
2085
2086    #[tokio::test]
2087    async fn test_process_file_content_with_existing_buffer() {
2088        let data = b"omplete\nline2\nline3\n".to_vec();
2089        let mut reader = create_mock_reader(data.clone());
2090        let existing_buffer = b"inc".to_vec();
2091        let max_buf_size = 10;
2092
2093        let result = process_file_content(
2094            &mut reader,
2095            0,
2096            data.len() as u64,
2097            existing_buffer,
2098            max_buf_size,
2099        )
2100        .await
2101        .unwrap();
2102
2103        assert_eq!(result.lines.len(), 3);
2104        assert_eq!(result.lines[0], b"incomplete");
2105        assert_eq!(result.lines[1], b"line2");
2106        assert_eq!(result.lines[2], b"line3");
2107        assert_eq!(result.new_position, data.len() as u64);
2108        assert!(result.incomplete_line_buffer.is_empty());
2109    }
2110
2111    #[tokio::test]
2112    async fn test_process_file_content_empty_file() {
2113        let data = Vec::new();
2114        let mut reader = create_mock_reader(data.clone());
2115        let max_buf_size = 10;
2116
2117        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2118            .await
2119            .unwrap();
2120
2121        assert!(result.lines.is_empty());
2122        assert_eq!(result.new_position, 0);
2123        assert!(result.incomplete_line_buffer.is_empty());
2124    }
2125
2126    #[tokio::test]
2127    async fn test_process_file_content_only_newlines() {
2128        let data = b"\n\n\n".to_vec();
2129        let mut reader = create_mock_reader(data.clone());
2130        let max_buf_size = 10;
2131
2132        let result =
2133            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2134                .await
2135                .unwrap();
2136
2137        // Empty lines should not be added (the function skips empty line_buffer)
2138        assert!(result.lines.is_empty());
2139        assert_eq!(result.new_position, data.len() as u64);
2140        assert!(result.incomplete_line_buffer.is_empty());
2141    }
2142
2143    #[tokio::test]
2144    async fn test_process_file_content_no_newlines() {
2145        let data = b"no newlines here".to_vec();
2146        let mut reader = create_mock_reader(data.clone());
2147        let max_buf_size = 10;
2148
2149        let result =
2150            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2151                .await
2152                .unwrap();
2153
2154        assert!(result.lines.is_empty());
2155        assert_eq!(result.new_position, data.len() as u64);
2156        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2157    }
2158
2159    #[tokio::test]
2160    async fn test_process_file_content_file_truncation() {
2161        let data = b"line1\nline2\n".to_vec();
2162        let mut reader = create_mock_reader(data.clone());
2163
2164        // Simulate current position being beyond file size (file was truncated)
2165        let result = process_file_content(
2166            &mut reader,
2167            100, // position beyond file size
2168            data.len() as u64,
2169            Vec::new(),
2170            10, // max_buf_size
2171        )
2172        .await
2173        .unwrap();
2174
2175        // Should reset to beginning and read all lines
2176        assert_eq!(result.lines.len(), 2);
2177        assert_eq!(result.lines[0], b"line1");
2178        assert_eq!(result.lines[1], b"line2");
2179        assert_eq!(result.new_position, data.len() as u64);
2180        assert!(result.incomplete_line_buffer.is_empty());
2181    }
2182
2183    #[tokio::test]
2184    async fn test_process_file_content_seek_to_position() {
2185        let data = b"line1\nline2\nline3\n".to_vec();
2186        let mut reader = create_mock_reader(data.clone());
2187
2188        // Start reading from position 6 (after "line1\n")
2189        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2190            .await
2191            .unwrap();
2192
2193        assert_eq!(result.lines.len(), 2);
2194        assert_eq!(result.lines[0], b"line2");
2195        assert_eq!(result.lines[1], b"line3");
2196        assert_eq!(result.new_position, data.len() as u64);
2197        assert!(result.incomplete_line_buffer.is_empty());
2198    }
2199
2200    #[tokio::test]
2201    async fn test_process_file_content_position_equals_file_size() {
2202        let data = b"line1\nline2\n".to_vec();
2203        let mut reader = create_mock_reader(data.clone());
2204
2205        // Start reading from end of file
2206        let result = process_file_content(
2207            &mut reader,
2208            data.len() as u64,
2209            data.len() as u64,
2210            Vec::new(),
2211            10,
2212        )
2213        .await
2214        .unwrap();
2215
2216        // Should not read anything new
2217        assert!(
2218            result.lines.is_empty(),
2219            "Expected empty line got {:?}",
2220            result.lines
2221        );
2222        assert_eq!(result.new_position, data.len() as u64);
2223        assert!(result.incomplete_line_buffer.is_empty());
2224    }
2225
2226    #[tokio::test]
2227    async fn test_process_file_content_large_line_truncation() {
2228        // Create a line longer than MAX_LINE_SIZE
2229        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2230        let data = format!("{}\nline2\n", large_line).into_bytes();
2231        let mut reader = create_mock_reader(data.clone());
2232
2233        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2234            .await
2235            .unwrap();
2236
2237        assert_eq!(result.lines.len(), 2);
2238
2239        // First line should be truncated
2240        assert_eq!(
2241            result.lines[0].len(),
2242            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2243        );
2244        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2245
2246        // Second line should be normal
2247        assert_eq!(result.lines[1], b"line2");
2248
2249        assert_eq!(result.new_position, data.len() as u64);
2250        assert!(result.incomplete_line_buffer.is_empty());
2251    }
2252
2253    #[tokio::test]
2254    async fn test_process_file_content_mixed_line_endings() {
2255        let data = b"line1\nline2\r\nline3\n".to_vec();
2256        let mut reader = create_mock_reader(data.clone());
2257
2258        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2259            .await
2260            .unwrap();
2261
2262        assert_eq!(result.lines.len(), 3);
2263        assert_eq!(result.lines[0], b"line1");
2264        assert_eq!(result.lines[1], b"line2\r"); // \r is preserved
2265        assert_eq!(result.lines[2], b"line3");
2266        assert_eq!(result.new_position, data.len() as u64);
2267        assert!(result.incomplete_line_buffer.is_empty());
2268    }
2269
2270    #[tokio::test]
2271    async fn test_process_file_content_existing_buffer_with_truncation() {
2272        // Create a scenario where existing buffer + new data creates a line that needs truncation
2273        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2274        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2275        let mut reader = create_mock_reader(data.clone());
2276
2277        let result = process_file_content(
2278            &mut reader,
2279            0,
2280            data.len() as u64,
2281            existing_buffer.into_bytes(),
2282            10,
2283        )
2284        .await
2285        .unwrap();
2286
2287        assert_eq!(result.lines.len(), 2);
2288
2289        // First line should be truncated (existing buffer + new data)
2290        assert_eq!(
2291            result.lines[0].len(),
2292            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2293        );
2294        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2295
2296        // Second line should be normal
2297        assert_eq!(result.lines[1], b"line2");
2298
2299        assert_eq!(result.new_position, data.len() as u64);
2300        assert!(result.incomplete_line_buffer.is_empty());
2301    }
2302
2303    #[tokio::test]
2304    async fn test_process_file_content_single_character_lines() {
2305        let data = b"a\nb\nc\n".to_vec();
2306        let mut reader = create_mock_reader(data.clone());
2307
2308        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2309            .await
2310            .unwrap();
2311
2312        assert_eq!(result.lines.len(), 3);
2313        assert_eq!(result.lines[0], b"a");
2314        assert_eq!(result.lines[1], b"b");
2315        assert_eq!(result.lines[2], b"c");
2316        assert_eq!(result.new_position, data.len() as u64);
2317        assert!(result.incomplete_line_buffer.is_empty());
2318    }
2319
2320    #[tokio::test]
2321    async fn test_process_file_content_binary_data() {
2322        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2323        let mut reader = create_mock_reader(data.clone());
2324
2325        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2326            .await
2327            .unwrap();
2328
2329        assert_eq!(result.lines.len(), 2);
2330        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2331        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2332        assert_eq!(result.new_position, data.len() as u64);
2333        assert!(result.incomplete_line_buffer.is_empty());
2334    }
2335
2336    #[tokio::test]
2337    async fn test_process_file_content_resume_after_max_buffer_size() {
2338        // Test data: 3 lines as specified in the example
2339        let data = b"line 1\nline 2\nline 3\n".to_vec();
2340        let mut reader = create_mock_reader(data.clone());
2341        let max_buffer_size = 2; // Limit to 2 lines per call
2342
2343        // First call: should return first 2 lines
2344        let result1 = process_file_content(
2345            &mut reader,
2346            0, // start from beginning
2347            data.len() as u64,
2348            Vec::new(), // no existing buffer
2349            max_buffer_size,
2350        )
2351        .await
2352        .unwrap();
2353
2354        // Verify first call results
2355        assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2356        assert_eq!(result1.lines[0], b"line 1");
2357        assert_eq!(result1.lines[1], b"line 2");
2358        assert!(result1.incomplete_line_buffer.is_empty());
2359
2360        // The position should be after "line 1\nline 2\n" (14 bytes)
2361        let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2362        assert_eq!(result1.new_position, expected_position_after_first_call);
2363
2364        // Second call: resume from where first call left off
2365        let mut reader2 = create_mock_reader(data.clone());
2366        let result2 = process_file_content(
2367            &mut reader2,
2368            result1.new_position, // resume from previous position
2369            data.len() as u64,
2370            result1.incomplete_line_buffer, // pass any incomplete buffer (should be empty)
2371            max_buffer_size,
2372        )
2373        .await
2374        .unwrap();
2375
2376        // Verify second call results
2377        assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2378        assert_eq!(result2.lines[0], b"line 3");
2379        assert!(result2.incomplete_line_buffer.is_empty());
2380        assert_eq!(result2.new_position, data.len() as u64);
2381    }
2382
    /// Regression test: truncating an over-long line must not panic when the
    /// MAX_LINE_SIZE cut point falls inside a multi-byte UTF-8 character.
    #[tokio::test]
    async fn test_utf8_truncation() {
        // Test that StreamFwder doesn't panic when truncating lines
        // with multi-byte chars.

        hyperactor_telemetry::initialize_logging_for_test();

        // Create a line longer than MAX_LINE_SIZE with an emoji at the boundary
        let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
        long_line.push('🦀'); // 4-byte emoji - truncation will land in the middle
        long_line.push('\n');

        // Create IO streams
        let (mut writer, reader) = tokio::io::duplex(8192);

        // Start StreamFwder
        let test_proc_id = test_proc_id("testproc_0");
        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(tokio::io::sink()), // discard output
            None,                        // no file monitor needed
            OutputTarget::Stdout,
            1,    // tail buffer of 1 (need at least one sink)
            None, // no log channel
            &test_proc_id,
            None, // no prefix
        );

        // Write the problematic line
        writer.write_all(long_line.as_bytes()).await.unwrap();
        drop(writer); // Close to signal EOF

        // Wait for completion - should NOT panic
        let (_lines, result) = monitor.abort().await;
        result.expect("Should complete without panic despite UTF-8 truncation");
    }
2419}