Skip to main content

hyperactor_mesh/
logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::Endpoint as _;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::OncePortRef;
32use hyperactor::ProcAddr;
33use hyperactor::RefClient;
34use hyperactor::Unbind;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::channel::ChannelRx;
38use hyperactor::channel::ChannelTransport;
39use hyperactor::channel::ChannelTx;
40use hyperactor::channel::Rx;
41use hyperactor::channel::Tx;
42use hyperactor::channel::TxStatus;
43use hyperactor_config::CONFIG;
44use hyperactor_config::ConfigAttr;
45use hyperactor_config::Flattrs;
46use hyperactor_config::attrs::declare_attrs;
47use hyperactor_telemetry::env;
48use hyperactor_telemetry::log_file_path;
49use serde::Deserialize;
50use serde::Serialize;
51use tokio::io;
52use tokio::io::AsyncRead;
53use tokio::io::AsyncReadExt;
54use tokio::io::AsyncWriteExt;
55use tokio::sync::Mutex;
56use tokio::sync::Notify;
57use tokio::sync::RwLock;
58use tokio::sync::watch::Receiver;
59use tokio::task::JoinHandle;
60use tracing::Level;
61use typeuri::Named;
62
63use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
64use crate::shortuuid::ShortUuid;
65
66mod line_prefixing_writer;
67
/// Default log-aggregation window, in seconds.
/// NOTE(review): presumably the default for `LogClientMessage::SetAggregate`'s
/// `aggregate_window_sec`; it is not referenced in this part of the file — confirm.
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Maximum size, in bytes, of a single forwarded line. `tee` truncates longer
/// lines on a UTF-8 char boundary and appends `... [TRUNCATED]`.
const MAX_LINE_SIZE: usize = 4 * 1024;

declare_attrs! {
    /// Maximum number of lines to batch before flushing to client
    /// This means that stdout/err reader will be paused after reading `HYPERACTOR_READ_LOG_BUFFER` lines.
    /// After pause lines will be flushed and reading will resume.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        Some("read_log_buffer".to_string()),
    ))
    pub attr READ_LOG_BUFFER: usize = 100;

    // Read by `get_unique_local_log_destination`: in the `Local` env, log
    // files are only created when this is set.
    /// If enabled, local logs are also written to a file and aggregated
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        Some("force_file_log".to_string()),
    ))
    pub attr FORCE_FILE_LOG: bool = false;

    // Read by `StreamFwder::start`: when true, every forwarded line is
    // prefixed with `[<local_rank>] `.
    /// Prefixes logs with rank
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        Some("prefix_with_rank".to_string()),
    ))
    pub attr PREFIX_WITH_RANK: bool = true;
}
95
/// Calculate the Levenshtein (edit) distance between two strings.
///
/// The distance is measured in Unicode scalar values (`char`s), not bytes. It
/// is the minimum number of single-character insertions, deletions and
/// substitutions needed to turn `left` into `right`.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    // Handle edge cases
    if left_chars.is_empty() {
        return right_chars.len();
    }
    if right_chars.is_empty() {
        return left_chars.len();
    }

    // Row `i + 1` of the DP table depends only on row `i`, so keep two rows
    // instead of the full (left + 1) x (right + 1) matrix.
    // `prev[j]` is the distance between the first `i` chars of `left` and the
    // first `j` chars of `right`.
    let mut prev: Vec<usize> = (0..=right_chars.len()).collect();
    let mut curr = vec![0; right_chars.len() + 1];

    for (i, &lc) in left_chars.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &rc) in right_chars.iter().enumerate() {
            let substitution = prev[j] + usize::from(lc != rc);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr[j + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    // After the final swap, `prev` holds the last row.
    prev[right_chars.len()]
}

/// Calculate the normalized edit distance between two strings (0.0 to 1.0).
///
/// The Levenshtein distance is divided by the length of the longer string,
/// with both measured in `char`s. `levenshtein_distance` counts chars, so the
/// denominator must count chars too. Using byte length (`str::len`) would
/// shrink the ratio for non-ASCII text, making such lines look more similar
/// than they are.
fn normalized_edit_distance(left: &str, right: &str) -> f64 {
    let max_len = left.chars().count().max(right.chars().count());
    if max_len == 0 {
        // Both strings are empty, so they're identical
        return 0.0;
    }
    levenshtein_distance(left, right) as f64 / max_len as f64
}
157
/// One distinct log line plus the number of similar lines folded into it.
#[derive(Debug, Clone)]
struct LogLine {
    content: String,
    pub count: u64,
}

impl LogLine {
    /// Start a new entry for `content`, counting it once.
    fn new(content: String) -> Self {
        LogLine { count: 1, content }
    }
}

impl fmt::Display for LogLine {
    /// Render as a yellow `[N similar log lines]` tag followed by the content.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogLine { content, count } = self;
        write!(f, "\x1b[33m[{} similar log lines]\x1b[0m {}", count, content)
    }
}
180
181#[derive(Debug, Clone)]
182/// Aggregator is a struct that holds a list of LogLines and a start time.
183/// It can aggregate new log lines to existing ones if they are "similar" based on edit distance.
184struct Aggregator {
185    lines: Vec<LogLine>,
186    start_time: SystemTime,
187    similarity_threshold: f64, // Threshold for considering two strings similar (0.0 to 1.0)
188}
189
190impl Aggregator {
191    fn new() -> Self {
192        // Default threshold: strings with normalized edit distance < 0.15 are considered similar
193        Self::new_with_threshold(0.15)
194    }
195
196    fn new_with_threshold(threshold: f64) -> Self {
197        Aggregator {
198            lines: vec![],
199            start_time: std::time::SystemTime::now(),
200            similarity_threshold: threshold,
201        }
202    }
203
204    fn reset(&mut self) {
205        self.lines.clear();
206        self.start_time = std::time::SystemTime::now();
207    }
208
209    fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
210        // Find the most similar existing line
211        let mut best_match_idx = None;
212        let mut best_similarity = f64::MAX;
213
214        for (idx, existing_line) in self.lines.iter().enumerate() {
215            let distance = normalized_edit_distance(&existing_line.content, line);
216
217            // If this line is more similar than our current best match
218            if distance < best_similarity && distance < self.similarity_threshold {
219                best_match_idx = Some(idx);
220                best_similarity = distance;
221            }
222        }
223
224        // If we found a similar enough line, increment its count
225        if let Some(idx) = best_match_idx {
226            self.lines[idx].count += 1;
227        } else {
228            // Otherwise, add a new line
229            self.lines.push(LogLine::new(line.to_string()));
230        }
231
232        Ok(())
233    }
234
235    fn is_empty(&self) -> bool {
236        self.lines.is_empty()
237    }
238}
239
240// Helper function to format SystemTime
241fn format_system_time(time: SystemTime) -> String {
242    let datetime: DateTime<Local> = time.into();
243    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
244}
245
246impl fmt::Display for Aggregator {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        // Format the start time
249        let start_time_str = format_system_time(self.start_time);
250
251        // Get and format the current time
252        let current_time = std::time::SystemTime::now();
253        let end_time_str = format_system_time(current_time);
254
255        // Write the header with formatted time window
256        writeln!(
257            f,
258            "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
259            start_time_str
260        )?;
261
262        // Write each log line
263        for line in self.lines.iter() {
264            writeln!(f, "{}", line)?;
265        }
266        writeln!(
267            f,
268            "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
269            end_time_str
270        )?;
271        Ok(())
272    }
273}
274
/// Messages that can be sent to the LogClientActor remotely.
///
/// NOTE(review): these values are serialized and sent between processes.
/// Depending on the wire format, reordering variants or fields may break
/// compatibility between peers built from different revisions — confirm
/// before changing the layout.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of captured output lines from one proc.
    Log {
        /// The hostname of the process that generated the log
        hostname: String,
        /// String representation of the ProcAddr that generated the log
        proc_id: String,
        /// The target output stream (stdout or stderr)
        output_target: OutputTarget,
        /// The log payload. `LocalLogSender` serializes a `Vec<Vec<u8>>` here;
        /// as `tee` calls it, there is one entry per line.
        payload: wirevalue::Any,
    },

    /// Flush the log
    Flush {
        /// Indicate if the current flush is synced or non-synced.
        /// If synced, a version number is available. Otherwise, none.
        sync_version: Option<u64>,
    },
}
306
/// Messages that can be sent to the LogClient locally.
///
/// NOTE(review): this enum derives `Serialize`/`Deserialize`; reordering
/// variants may affect its encoded form — confirm before changing the layout.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
#[expect(
    clippy::large_enum_variant,
    reason = "actor message enum with Handler/HandleClient/RefClient derives; boxing fields ripples into client/handler call sites and may require derive-macro changes — separate diff"
)]
pub enum LogClientMessage {
    /// Configure log aggregation (enable with a window, or disable).
    SetAggregate {
        /// The time window in seconds to aggregate logs. If None, aggregation is disabled.
        aggregate_window_sec: Option<u64>,
    },

    /// Synchronously flush all the logs from all the procs. This is for client to call.
    StartSyncFlush {
        /// Expect these many procs to ack the flush message.
        expected_procs: usize,
        /// Return once we have received the acks from all the procs
        reply: OncePortRef<()>,
        /// Return to the caller the current flush version
        version: OncePortRef<u64>,
    },
}
338
/// Trait for sending logs
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Send a batch of log lines for `target`, as raw bytes. `tee` passes one
    /// `Vec<u8>` per line; the trait itself does not enforce that.
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Request that the log channel be flushed.
    ///
    /// NOTE(review): callers should not assume this waits for an
    /// acknowledgement. `LocalLogSender::flush` in this file only posts
    /// `LogMessage::Flush { sync_version: None }` and returns. Confirm the
    /// intended contract for other implementations.
    fn flush(&mut self) -> anyhow::Result<()>;
}
349
/// Represents the target output stream (stdout or stderr)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output stream
    Stdout,
    /// Standard error stream
    Stderr,
}

/// Identifies a standard stream of a child process (per the variant names).
/// NOTE(review): not referenced in this part of the file; confirm how it
/// relates to `OutputTarget`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// Standard output stream (of the child)
    ChildStdout,
    /// Standard error stream (of the child)
    ChildStderr,
}
366
367/// Write the log to a local unix channel so some actors can listen to it and stream the log back.
368pub struct LocalLogSender {
369    hostname: String,
370    proc_id: String,
371    tx: ChannelTx<LogMessage>,
372    status: Receiver<TxStatus>,
373}
374
375impl LocalLogSender {
376    fn new(log_channel: ChannelAddr, proc_id: &ProcAddr) -> Result<Self, anyhow::Error> {
377        let tx = channel::dial::<LogMessage>(log_channel)?;
378        let status = tx.status().clone();
379
380        let hostname = hostname::get()
381            .unwrap_or_else(|_| "unknown_host".into())
382            .into_string()
383            .unwrap_or("unknown_host".to_string());
384        Ok(Self {
385            hostname,
386            proc_id: proc_id.to_string(),
387            tx,
388            status,
389        })
390    }
391}
392
393#[async_trait]
394impl LogSender for LocalLogSender {
395    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
396        if TxStatus::Active == *self.status.borrow() {
397            self.tx.post(LogMessage::Log {
398                hostname: self.hostname.clone(),
399                proc_id: self.proc_id.clone(),
400                output_target: target,
401                payload: wirevalue::Any::serialize(&payload)?,
402            });
403        }
404
405        Ok(())
406    }
407
408    fn flush(&mut self) -> anyhow::Result<()> {
409        // send will make sure message is delivered
410        if TxStatus::Active == *self.status.borrow() {
411            self.tx.post(LogMessage::Flush { sync_version: None });
412        }
413        Ok(())
414    }
415}
416
/// Message sent to a `FileAppender` writer task (`file_monitor_task`).
///
/// It carries a batch of finalized lines produced by `tee`. The lines do not
/// include their trailing `\n`; the writer task appends one after each line.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    lines: Vec<String>,
}
wirevalue::register_type!(FileMonitorMessage);
423
424/// File appender, coordinates write access to a file via a channel.
425pub struct FileAppender {
426    stdout_addr: ChannelAddr,
427    stderr_addr: ChannelAddr,
428    #[allow(dead_code)] // Tasks are self terminating
429    stdout_task: JoinHandle<()>,
430    #[allow(dead_code)]
431    stderr_task: JoinHandle<()>,
432    stop: Arc<Notify>,
433}
434
435impl fmt::Debug for FileAppender {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        f.debug_struct("FileMonitor")
438            .field("stdout_addr", &self.stdout_addr)
439            .field("stderr_addr", &self.stderr_addr)
440            .finish()
441    }
442}
443
444impl FileAppender {
445    /// Create a new FileAppender with aggregated log files for stdout and stderr
446    /// Returns None if file creation fails
447    pub fn new() -> Option<Self> {
448        let stop = Arc::new(Notify::new());
449        // TODO make it configurable
450        let file_name_tag = hostname::get()
451            .unwrap_or_else(|_| "unknown_host".into())
452            .into_string()
453            .unwrap_or("unknown_host".to_string());
454
455        // Create stdout file and task
456        let (stdout_path, stdout_writer) =
457            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
458                Some(writer) => writer,
459                None => {
460                    tracing::warn!("failed to create stdout file");
461                    return None;
462                }
463            };
464        let (stdout_addr, stdout_rx) = {
465            let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
466            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
467                Ok((addr, rx)) => (addr, rx),
468                Err(e) => {
469                    tracing::warn!("failed to serve stdout channel: {}", e);
470                    return None;
471                }
472            }
473        };
474        let stdout_stop = stop.clone();
475        let stdout_task = tokio::spawn(file_monitor_task(
476            stdout_rx,
477            stdout_writer,
478            OutputTarget::Stdout,
479            stdout_stop,
480        ));
481
482        // Create stderr file and task
483        let (stderr_path, stderr_writer) =
484            match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
485                Some(writer) => writer,
486                None => {
487                    tracing::warn!("failed to create stderr file");
488                    return None;
489                }
490            };
491        let (stderr_addr, stderr_rx) = {
492            let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
493            match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
494                Ok((addr, rx)) => (addr, rx),
495                Err(e) => {
496                    tracing::warn!("failed to serve stderr channel: {}", e);
497                    return None;
498                }
499            }
500        };
501        let stderr_stop = stop.clone();
502        let stderr_task = tokio::spawn(file_monitor_task(
503            stderr_rx,
504            stderr_writer,
505            OutputTarget::Stderr,
506            stderr_stop,
507        ));
508
509        tracing::debug!(
510            "FileAppender: created for stdout {} stderr {} ",
511            stdout_path.display(),
512            stderr_path.display()
513        );
514
515        Some(Self {
516            stdout_addr,
517            stderr_addr,
518            stdout_task,
519            stderr_task,
520            stop,
521        })
522    }
523
524    /// Get a channel address for the specified output target
525    pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
526        match target {
527            OutputTarget::Stdout => self.stdout_addr.clone(),
528            OutputTarget::Stderr => self.stderr_addr.clone(),
529        }
530    }
531}
532
533impl Drop for FileAppender {
534    fn drop(&mut self) {
535        // Trigger stop signal to notify tasks to exit
536        self.stop.notify_waiters();
537        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
538    }
539}
540
541/// Task that receives lines from StreamFwds and writes them to the aggregated file
542async fn file_monitor_task(
543    mut rx: ChannelRx<FileMonitorMessage>,
544    mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
545    target: OutputTarget,
546    stop: Arc<Notify>,
547) {
548    loop {
549        tokio::select! {
550            msg = rx.recv() => {
551                match msg {
552                    Ok(msg) => {
553                        // Write lines to aggregated file
554                        for line in &msg.lines {
555                            if let Err(e) = writer.write_all(line.as_bytes()).await {
556                                tracing::warn!("FileMonitor: failed to write line to file: {}", e);
557                                continue;
558                            }
559                            if let Err(e) = writer.write_all(b"\n").await {
560                                tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
561                            }
562                        }
563                        if let Err(e) = writer.flush().await {
564                            tracing::warn!("FileMonitor: failed to flush file: {}", e);
565                        }
566                    }
567                    Err(e) => {
568                        // Channel error
569                        tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
570                        break;
571                    }
572                }
573            }
574            _ = stop.notified() => {
575                tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
576                break;
577            }
578        }
579    }
580
581    // Graceful shutdown: flush one last time
582    if let Err(e) = writer.flush().await {
583        tracing::warn!("FileMonitor: failed final flush: {}", e);
584    }
585    tracing::debug!("FileMonitor task for {:?} exiting", target);
586}
587
588fn create_unique_file_writer(
589    file_name_tag: &str,
590    output_target: OutputTarget,
591    env: env::Env,
592) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
593    let suffix = match output_target {
594        OutputTarget::Stderr => "stderr",
595        OutputTarget::Stdout => "stdout",
596    };
597    let (path, filename) = log_file_path(env, None)?;
598    let path = Path::new(&path);
599    let mut full_path = PathBuf::from(path);
600
601    let uuid = ShortUuid::generate();
602
603    full_path.push(format!(
604        "{}_{}_{}.{}",
605        filename, file_name_tag, uuid, suffix
606    ));
607    let file = std::fs::OpenOptions::new()
608        .create(true)
609        .append(true)
610        .open(full_path.clone())?;
611    let tokio_file = tokio::fs::File::from_std(file);
612    // TODO: should we buffer this?
613    Ok((full_path, Box::new(tokio_file)))
614}
615
616fn get_unique_local_log_destination(
617    file_name_tag: &str,
618    output_target: OutputTarget,
619) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
620    let env: env::Env = env::Env::current();
621    if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
622        tracing::debug!("not creating log file because of env type");
623        None
624    } else {
625        match create_unique_file_writer(file_name_tag, output_target, env) {
626            Ok((a, b)) => Some((a, b)),
627            Err(e) => {
628                tracing::warn!("failed to create unique file writer: {}", e);
629                None
630            }
631        }
632    }
633}
634
635/// Create a writer for stdout or stderr
636fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
637    // Return the appropriate standard output or error writer
638    match target {
639        OutputTarget::Stdout => Box::new(tokio::io::stdout()),
640        OutputTarget::Stderr => Box::new(tokio::io::stderr()),
641    }
642}
643
644/// Copy bytes from `reader` to `writer`, forward to log_sender, and forward to FileMonitor.
645/// The same formatted lines go to both log_sender and file_monitor.
646async fn tee(
647    mut reader: impl AsyncRead + Unpin + Send + 'static,
648    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
649    log_sender: Option<Box<dyn LogSender + Send>>,
650    file_monitor_addr: Option<ChannelAddr>,
651    target: OutputTarget,
652    prefix: Option<String>,
653    stop: Arc<Notify>,
654    recent_lines_buf: RotatingLineBuffer,
655) -> Result<(), io::Error> {
656    let mut buf = [0u8; 8192];
657    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
658    let mut log_sender = log_sender;
659
660    // Dial the file monitor channel if provided
661    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
662        file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
663            Ok(tx) => Some(tx),
664            Err(e) => {
665                tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
666                None
667            }
668        });
669
670    loop {
671        tokio::select! {
672            read_result = reader.read(&mut buf) => {
673                match read_result {
674                    Ok(n) => {
675                        if n == 0 {
676                            // EOF reached
677                            tracing::debug!("EOF reached in tee");
678                            break;
679                        }
680
681                        // Write to console
682                        if let Err(e) = std_writer.write_all(&buf[..n]).await {
683                            tracing::warn!("error writing to std: {}", e);
684                        }
685
686                        // Process bytes into lines for log_sender and FileMonitor
687                        let mut completed_lines = Vec::new();
688
689                        for &byte in &buf[..n] {
690                            if byte == b'\n' {
691                                // Complete line found
692                                let mut line = String::from_utf8_lossy(&line_buffer).to_string();
693
694                                // Truncate if too long, respecting UTF-8 boundaries
695                                // (multi-byte chars like emojis can be up to 4 bytes)
696                                if line.len() > MAX_LINE_SIZE {
697                                    let mut truncate_at = MAX_LINE_SIZE;
698                                    while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
699                                        truncate_at -= 1;
700                                    }
701                                    line.truncate(truncate_at);
702                                    line.push_str("... [TRUNCATED]");
703                                }
704
705                                // Prepend with prefix if configured
706                                let final_line = if let Some(ref p) = prefix {
707                                    format!("[{}] {}", p, line)
708                                } else {
709                                    line
710                                };
711
712                                completed_lines.push(final_line);
713                                line_buffer.clear();
714                            } else {
715                                line_buffer.push(byte);
716                            }
717                        }
718
719                        // Send completed lines to both log_sender and FileAppender
720                        if !completed_lines.is_empty() {
721                            if let Some(ref mut sender) = log_sender {
722                                let bytes: Vec<Vec<u8>> = completed_lines.iter()
723                                    .map(|s| s.as_bytes().to_vec())
724                                    .collect();
725                                if let Err(e) = sender.send(target, bytes) {
726                                    tracing::warn!("error sending to log_sender: {}", e);
727                                }
728                            }
729
730                            // Send to FileMonitor via hyperactor channel
731                            if let Some(ref mut tx) = file_monitor_tx {
732                                let msg = FileMonitorMessage {
733                                    lines: completed_lines,
734                                };
735                                // Use post() to avoid blocking
736                                tx.post(msg);
737                            }
738                        }
739
740                        recent_lines_buf.try_add_data(&buf, n);
741                    },
742                    Err(e) => {
743                        tracing::debug!("read error in tee: {}", e);
744                        return Err(e);
745                    }
746                }
747            },
748            _ = stop.notified() => {
749                tracing::debug!("stop signal received in tee");
750                break;
751            }
752        }
753    }
754
755    std_writer.flush().await?;
756
757    // Send any remaining partial line
758    if !line_buffer.is_empty() {
759        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
760        // Truncate if too long, respecting UTF-8 boundaries
761        // (multi-byte chars like emojis can be up to 4 bytes)
762        if line.len() > MAX_LINE_SIZE {
763            let mut truncate_at = MAX_LINE_SIZE;
764            while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
765                truncate_at -= 1;
766            }
767            line.truncate(truncate_at);
768            line.push_str("... [TRUNCATED]");
769        }
770        let final_line = if let Some(ref p) = prefix {
771            format!("[{}] {}", p, line)
772        } else {
773            line
774        };
775
776        let final_lines = vec![final_line];
777
778        // Send to log_sender
779        if let Some(ref mut sender) = log_sender {
780            let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
781            let _ = sender.send(target, bytes);
782        }
783
784        // Send to FileMonitor
785        if let Some(ref mut tx) = file_monitor_tx {
786            let msg = FileMonitorMessage { lines: final_lines };
787            tx.post(msg);
788        }
789    }
790
791    // Flush log_sender
792    if let Some(ref mut sender) = log_sender {
793        let _ = sender.flush();
794    }
795
796    Ok(())
797}
798
799#[derive(Debug, Clone)]
800struct RotatingLineBuffer {
801    recent_lines: Arc<RwLock<VecDeque<String>>>,
802    max_buffer_size: usize,
803}
804
805impl RotatingLineBuffer {
806    fn try_add_data(&self, buf: &[u8], buf_end: usize) {
807        let data_str = String::from_utf8_lossy(&buf[..buf_end]);
808        let lines: Vec<&str> = data_str.lines().collect();
809
810        if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
811            for line in lines {
812                if !line.is_empty() {
813                    recent_lines_guard.push_back(line.to_string());
814                    if recent_lines_guard.len() > self.max_buffer_size {
815                        recent_lines_guard.pop_front();
816                    }
817                }
818            }
819        } else {
820            tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
821        }
822    }
823
824    async fn peek(&self) -> Vec<String> {
825        let lines = self.recent_lines.read().await;
826        let start_idx = if lines.len() > self.max_buffer_size {
827            lines.len() - self.max_buffer_size
828        } else {
829            0
830        };
831
832        lines.range(start_idx..).cloned().collect()
833    }
834}
835
/// Given a stream forwards data to the provided channel.
///
/// In practice this is a piped child stdout/stderr. It is mirrored to this
/// process's own stdout/stderr and, when configured, to a log channel and a
/// file appender, while a tail of recent lines is kept for `peek`.
pub struct StreamFwder {
    /// Background task running `tee` for this stream.
    teer: JoinHandle<Result<(), io::Error>>,
    /// Shared buffer for peek functionality
    recent_lines_buf: RotatingLineBuffer,
    /// Shutdown signal to stop the monitoring loop
    stop: Arc<Notify>,
}
844
845impl StreamFwder {
846    /// Create a new StreamFwder instance, and start monitoring the provided path.
847    /// Once started Monitor will
848    /// - forward logs to log_sender
849    /// - forward logs to file_monitor (if available)
850    /// - pipe reader to target
851    /// - And capture last `max_buffer_size` which can be used to inspect file contents via `peek`.
852    pub fn start(
853        reader: impl AsyncRead + Unpin + Send + 'static,
854        file_monitor_addr: Option<ChannelAddr>,
855        target: OutputTarget,
856        max_buffer_size: usize,
857        log_channel: Option<ChannelAddr>,
858        proc_id: &ProcAddr,
859        local_rank: usize,
860    ) -> Self {
861        let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
862            true => Some(local_rank.to_string()),
863            false => None,
864        };
865        let std_writer = std_writer(target);
866
867        Self::start_with_writer(
868            reader,
869            std_writer,
870            file_monitor_addr,
871            target,
872            max_buffer_size,
873            log_channel,
874            proc_id,
875            prefix,
876        )
877    }
878
879    /// Create a new StreamFwder instance with a custom writer (used in tests).
880    fn start_with_writer(
881        reader: impl AsyncRead + Unpin + Send + 'static,
882        std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
883        file_monitor_addr: Option<ChannelAddr>,
884        target: OutputTarget,
885        max_buffer_size: usize,
886        log_channel: Option<ChannelAddr>,
887        proc_id: &ProcAddr,
888        prefix: Option<String>,
889    ) -> Self {
890        // Sanity: when there is no file sink, no log forwarding, and
891        // `tail_size == 0`, the child should have **inherited** stdio
892        // and no `StreamFwder` should exist. In that case console
893        // mirroring happens via inheritance, not via `StreamFwder`.
894        // If we hit this, we piped unnecessarily.
895        debug_assert!(
896            file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
897            "StreamFwder started with no sinks and no tail"
898        );
899
900        let stop = Arc::new(Notify::new());
901        let recent_lines_buf = RotatingLineBuffer {
902            recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
903                max_buffer_size,
904            ))),
905            max_buffer_size,
906        };
907
908        let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
909            match LocalLogSender::new(addr, proc_id) {
910                Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
911                Err(e) => {
912                    tracing::error!("failed to create log sender: {}", e);
913                    None
914                }
915            }
916        } else {
917            None
918        };
919
920        let teer_stop = stop.clone();
921        let recent_line_buf_clone = recent_lines_buf.clone();
922        let teer = tokio::spawn(async move {
923            tee(
924                reader,
925                std_writer,
926                log_sender,
927                file_monitor_addr,
928                target,
929                prefix,
930                teer_stop,
931                recent_line_buf_clone,
932            )
933            .await
934        });
935
936        StreamFwder {
937            teer,
938            recent_lines_buf,
939            stop,
940        }
941    }
942
943    pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
944        self.stop.notify_waiters();
945
946        let lines = self.peek().await;
947        let teer_result = self.teer.await;
948
949        let result: Result<(), anyhow::Error> = match teer_result {
950            Ok(inner) => inner.map_err(anyhow::Error::from),
951            Err(e) => Err(e.into()),
952        };
953
954        (lines, result)
955    }
956
957    /// Inspect the latest `max_buffer` lines read from the file being monitored
958    /// Returns lines in chronological order (oldest first)
959    pub async fn peek(&self) -> Vec<String> {
960        self.recent_lines_buf.peek().await
961    }
962}
963
/// Messages that can be sent to the `LogForwardActor`.
///
/// NOTE(review): variant order is kept as-is; it may be significant for the
/// serialized wire format — confirm before reordering.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Receive the next message sent by the parent process over the log
    /// channel and forward it to the client. The handler re-posts this message
    /// to the actor itself, so it drives the forwarder's receive loop.
    Forward {},

    /// Set whether `Log` messages are streamed back to the client.
    SetMode { stream_to_client: bool },

    /// Inject a flush marker carrying `version` into the log channel. When the
    /// forwarder later receives it, it acknowledges the client's sync flush
    /// with that version.
    ForceSyncFlush { version: u64 },
}
987
/// A log forwarder that receives the log from its parent process and forwards it back to the client.
#[hyperactor::export(LogForwardMessage { cast = true })]
#[hyperactor::spawnable]
pub struct LogForwardActor {
    // Receiving side of the log channel this actor serves.
    rx: ChannelRx<LogMessage>,
    // Sender connected to this actor's log channel; used to inject `Flush`
    // messages into the same queue that `rx` drains.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    // Earliest time at which another periodic (unversioned) flush may be scheduled.
    next_flush_deadline: SystemTime,
    // Client actor that log lines and flush acknowledgements are sent to.
    logging_client_ref: ActorRef<LogClientActor>,
    // Whether received `Log` messages are forwarded to the client (set via `SetMode`).
    stream_to_client: bool,
}
998
999#[async_trait]
1000impl Actor for LogForwardActor {
1001    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1002        this.set_system();
1003        this.post_after(this, LogForwardMessage::Forward {}, Duration::from_secs(0));
1004
1005        // Make sure we start the flush loop periodically so the log channel will not deadlock.
1006        self.flush_tx
1007            .lock()
1008            .await
1009            .send(LogMessage::Flush { sync_version: None })
1010            .await?;
1011        Ok(())
1012    }
1013}
1014
1015#[async_trait]
1016impl hyperactor::RemoteSpawn for LogForwardActor {
1017    type Params = ActorRef<LogClientActor>;
1018
1019    async fn new(logging_client_ref: Self::Params, _environment: Flattrs) -> Result<Self> {
1020        let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1021            Ok(channel) => channel.parse()?,
1022            Err(err) => {
1023                tracing::debug!(
1024                    "log forwarder actor failed to read env var {}: {}",
1025                    BOOTSTRAP_LOG_CHANNEL,
1026                    err
1027                );
1028                // TODO: an empty channel to serve
1029                ChannelAddr::any(ChannelTransport::Unix)
1030            }
1031        };
1032        tracing::info!(
1033            "log forwarder {} serve at {}",
1034            std::process::id(),
1035            log_channel
1036        );
1037
1038        let rx = match channel::serve(log_channel.clone()) {
1039            Ok((_, rx)) => rx,
1040            Err(err) => {
1041                // This can happen if we are not spanwed on a separate process like local.
1042                // For local mesh, log streaming anyway is not needed.
1043                tracing::error!(
1044                    "log forwarder actor failed to bootstrap on given channel {}: {}",
1045                    log_channel,
1046                    err
1047                );
1048                channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1049            }
1050        };
1051
1052        // Dial the same channel to send flush message to drain the log queue.
1053        let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1054        let now = std::time::SystemTime::now();
1055
1056        Ok(Self {
1057            rx,
1058            flush_tx,
1059            next_flush_deadline: now,
1060            logging_client_ref,
1061            stream_to_client: true,
1062        })
1063    }
1064}
1065
1066#[async_trait]
1067#[hyperactor::handle(LogForwardMessage)]
1068impl LogForwardMessageHandler for LogForwardActor {
1069    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
1070        match self.rx.recv().await {
1071            Ok(LogMessage::Flush { sync_version }) => {
1072                let now = std::time::SystemTime::now();
1073                match sync_version {
1074                    None => {
1075                        // Schedule another flush to keep the log channel from deadlocking.
1076                        let delay = Duration::from_secs(1);
1077                        if now >= self.next_flush_deadline {
1078                            self.next_flush_deadline = now + delay;
1079                            let flush_tx = self.flush_tx.clone();
1080                            tokio::spawn(async move {
1081                                tokio::time::sleep(delay).await;
1082                                if let Err(e) = flush_tx
1083                                    .lock()
1084                                    .await
1085                                    .send(LogMessage::Flush { sync_version: None })
1086                                    .await
1087                                {
1088                                    tracing::error!("failed to send flush message: {}", e);
1089                                }
1090                            });
1091                        }
1092                    }
1093                    version => {
1094                        self.logging_client_ref.flush(ctx, version).await?;
1095                    }
1096                }
1097            }
1098            Ok(LogMessage::Log {
1099                hostname,
1100                proc_id,
1101                output_target,
1102                payload,
1103            }) => {
1104                if self.stream_to_client {
1105                    self.logging_client_ref
1106                        .log(ctx, hostname, proc_id, output_target, payload)
1107                        .await?;
1108                }
1109            }
1110            Err(e) => {
1111                return Err(e.into());
1112            }
1113        }
1114
1115        // This is not ideal as we are using raw tx/rx.
1116        ctx.post_after(ctx, LogForwardMessage::Forward {}, Duration::from_secs(0));
1117
1118        Ok(())
1119    }
1120
1121    async fn set_mode(
1122        &mut self,
1123        _ctx: &Context<Self>,
1124        stream_to_client: bool,
1125    ) -> Result<(), anyhow::Error> {
1126        self.stream_to_client = stream_to_client;
1127        Ok(())
1128    }
1129
1130    async fn force_sync_flush(
1131        &mut self,
1132        _cx: &Context<Self>,
1133        version: u64,
1134    ) -> Result<(), anyhow::Error> {
1135        self.flush_tx
1136            .lock()
1137            .await
1138            .send(LogMessage::Flush {
1139                sync_version: Some(version),
1140            })
1141            .await
1142            .map_err(anyhow::Error::from)
1143    }
1144}
1145
1146/// Deserialize a serialized message and split it into UTF-8 lines
1147fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1148    // Try to deserialize as Vec<Vec<u8>> first (multiple byte arrays)
1149    if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1150        let mut result = Vec::new();
1151        for bytes in message_bytes {
1152            let message_str = String::from_utf8(bytes)?;
1153            let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1154            result.push(lines);
1155        }
1156        return Ok(result);
1157    }
1158
1159    // If that fails, try to deserialize as String and wrap in Vec<Vec<String>>
1160    if let Ok(message) = serialized_message.deserialized::<String>() {
1161        let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1162        return Ok(vec![lines]);
1163    }
1164
1165    // If both fail, return an error
1166    anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1167}
1168
/// A client to receive logs from remote processes
/// and print them locally, optionally buffering lines in per-stream
/// aggregators that are flushed periodically or on a sync-flush barrier.
#[derive(Debug)]
#[hyperactor::export(LogMessage, LogClientMessage)]
#[hyperactor::spawnable]
pub struct LogClientActor {
    // Aggregation window in seconds; `None` prints each line immediately.
    aggregate_window_sec: Option<u64>,
    // One aggregator per output stream (stdout / stderr).
    aggregators: HashMap<OutputTarget, Aggregator>,
    // When output was last flushed; the next aggregation window is measured from here.
    last_flush_time: SystemTime,
    // Deadline of the currently scheduled unversioned `Flush`, if any.
    next_flush_deadline: Option<SystemTime>,

    // For flush sync barrier
    // Version number of the most recently started sync flush.
    current_flush_version: u64,
    // Port to reply on once every expected proc has acknowledged the current sync flush.
    current_flush_port: Option<OncePortRef<()>>,
    // Number of procs that have not yet acknowledged the current sync flush.
    current_unflushed_procs: usize,
}
1184
1185impl Default for LogClientActor {
1186    fn default() -> Self {
1187        // Initialize aggregators
1188        let mut aggregators = HashMap::new();
1189        aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1190        aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1191
1192        Self {
1193            aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1194            aggregators,
1195            last_flush_time: std::time::SystemTime::now(),
1196            next_flush_deadline: None,
1197            current_flush_version: 0,
1198            current_flush_port: None,
1199            current_unflushed_procs: 0,
1200        }
1201    }
1202}
1203
#[async_trait]
impl Actor for LogClientActor {
    /// Mark this actor as a system actor; no other initialization is needed.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        Ok(())
    }
}
1211
1212impl LogClientActor {
1213    fn print_aggregators(&mut self) {
1214        for (output_target, aggregator) in self.aggregators.iter_mut() {
1215            if aggregator.is_empty() {
1216                continue;
1217            }
1218            match output_target {
1219                OutputTarget::Stdout => {
1220                    println!("{}", aggregator);
1221                }
1222                OutputTarget::Stderr => {
1223                    eprintln!("{}", aggregator);
1224                }
1225            }
1226
1227            // Reset the aggregator
1228            aggregator.reset();
1229        }
1230    }
1231
1232    fn print_log_line(hostname: &str, proc_id: &str, output_target: OutputTarget, line: String) {
1233        let message = format!("[{} {}] {}", hostname, proc_id, line);
1234
1235        #[cfg(test)]
1236        crate::logging::test_tap::push(&message);
1237
1238        match output_target {
1239            OutputTarget::Stdout => println!("{}", message),
1240            OutputTarget::Stderr => eprintln!("{}", message),
1241        }
1242    }
1243
1244    fn flush_internal(&mut self) {
1245        self.print_aggregators();
1246        self.last_flush_time = std::time::SystemTime::now();
1247        self.next_flush_deadline = None;
1248    }
1249}
1250
impl Drop for LogClientActor {
    /// Print whatever is still held in the aggregators so it is not lost
    /// when the actor goes away.
    fn drop(&mut self) {
        // Flush the remaining logs before shutting down
        self.print_aggregators();
    }
}
1257
1258#[async_trait]
1259#[hyperactor::handle(LogMessage)]
1260impl LogMessageHandler for LogClientActor {
1261    async fn log(
1262        &mut self,
1263        cx: &Context<Self>,
1264        hostname: String,
1265        proc_id: String,
1266        output_target: OutputTarget,
1267        payload: wirevalue::Any,
1268    ) -> Result<(), anyhow::Error> {
1269        // Deserialize the message and process line by line with UTF-8
1270        let message_line_groups = deserialize_message_lines(&payload)?;
1271        let hostname = hostname.as_str();
1272
1273        let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1274        match self.aggregate_window_sec {
1275            None => {
1276                for line in message_lines {
1277                    Self::print_log_line(hostname, &proc_id, output_target, line);
1278                }
1279                self.last_flush_time = std::time::SystemTime::now();
1280            }
1281            Some(window) => {
1282                for line in message_lines {
1283                    if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1284                        if let Err(e) = aggregator.add_line(&line) {
1285                            tracing::error!("error adding log line: {}", e);
1286                            // For the sake of completeness, flush the log lines.
1287                            Self::print_log_line(hostname, &proc_id, output_target, line);
1288                        }
1289                    } else {
1290                        tracing::error!("unknown output target: {:?}", output_target);
1291                        // For the sake of completeness, flush the log lines.
1292                        Self::print_log_line(hostname, &proc_id, output_target, line);
1293                    }
1294                }
1295
1296                let new_deadline = self.last_flush_time + Duration::from_secs(window);
1297                let now = std::time::SystemTime::now();
1298                if new_deadline <= now {
1299                    self.flush_internal();
1300                } else {
1301                    let delay = new_deadline.duration_since(now)?;
1302                    match self.next_flush_deadline {
1303                        None => {
1304                            self.next_flush_deadline = Some(new_deadline);
1305                            cx.post_after(cx, LogMessage::Flush { sync_version: None }, delay);
1306                        }
1307                        Some(deadline) => {
1308                            // Some early log lines have alrady triggered the flush.
1309                            if new_deadline < deadline {
1310                                // This can happen if the user has adjusted the aggregation window.
1311                                self.next_flush_deadline = Some(new_deadline);
1312                                cx.post_after(cx, LogMessage::Flush { sync_version: None }, delay);
1313                            }
1314                        }
1315                    }
1316                }
1317            }
1318        }
1319
1320        Ok(())
1321    }
1322
1323    async fn flush(
1324        &mut self,
1325        cx: &Context<Self>,
1326        sync_version: Option<u64>,
1327    ) -> Result<(), anyhow::Error> {
1328        match sync_version {
1329            None => {
1330                self.flush_internal();
1331            }
1332            Some(version) => {
1333                if version != self.current_flush_version {
1334                    tracing::error!(
1335                        "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1336                        version,
1337                        self.current_flush_version
1338                    );
1339                    return Ok(());
1340                }
1341
1342                if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1343                    // This is a serious issue; it's better to error out.
1344                    anyhow::bail!("found no ongoing flush request");
1345                }
1346                self.current_unflushed_procs -= 1;
1347
1348                tracing::debug!(
1349                    "ack sync flush: version {}; remaining procs: {}",
1350                    self.current_flush_version,
1351                    self.current_unflushed_procs
1352                );
1353
1354                if self.current_unflushed_procs == 0 {
1355                    self.flush_internal();
1356                    let reply = self.current_flush_port.take().unwrap();
1357                    self.current_flush_port = None;
1358                    reply.post(cx, ());
1359                }
1360            }
1361        }
1362
1363        Ok(())
1364    }
1365}
1366
1367#[async_trait]
1368#[hyperactor::handle(LogClientMessage)]
1369impl LogClientMessageHandler for LogClientActor {
1370    async fn set_aggregate(
1371        &mut self,
1372        _cx: &Context<Self>,
1373        aggregate_window_sec: Option<u64>,
1374    ) -> Result<(), anyhow::Error> {
1375        if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1376            // Make sure we flush whatever in the aggregators before disabling aggregation.
1377            self.print_aggregators();
1378        }
1379        self.aggregate_window_sec = aggregate_window_sec;
1380        Ok(())
1381    }
1382
1383    async fn start_sync_flush(
1384        &mut self,
1385        cx: &Context<Self>,
1386        expected_procs_flushed: usize,
1387        reply: OncePortRef<()>,
1388        version: OncePortRef<u64>,
1389    ) -> Result<(), anyhow::Error> {
1390        if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1391            tracing::warn!(
1392                "found unfinished ongoing flush: version {}; {} unflushed procs",
1393                self.current_flush_version,
1394                self.current_unflushed_procs,
1395            );
1396        }
1397
1398        self.current_flush_version += 1;
1399        tracing::debug!(
1400            "start sync flush with version {}",
1401            self.current_flush_version
1402        );
1403        self.current_flush_port = Some(reply.clone());
1404        self.current_unflushed_procs = expected_procs_flushed;
1405        version.post(cx, self.current_flush_version);
1406        Ok(())
1407    }
1408}
1409
1410#[cfg(test)]
1411pub mod test_tap {
1412    use std::sync::Mutex;
1413    use std::sync::OnceLock;
1414
1415    use tokio::sync::mpsc::UnboundedReceiver;
1416    use tokio::sync::mpsc::UnboundedSender;
1417
1418    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
1419    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();
1420
1421    // Called by tests to install the sender.
1422    pub fn install(tx: UnboundedSender<String>) {
1423        let _ = TAP.set(tx);
1424    }
1425
1426    // Called by tests to register the receiver so we can drain later.
1427    pub fn set_receiver(rx: UnboundedReceiver<String>) {
1428        let _ = RX.set(Mutex::new(rx));
1429    }
1430
1431    // Used by LogClientActor (under #[cfg(test)]) to push a line.
1432    pub fn push(s: &str) {
1433        if let Some(tx) = TAP.get() {
1434            let _ = tx.send(s.to_string());
1435        }
1436    }
1437
1438    // Tests call this to collect everything observed so far.
1439    pub fn drain() -> Vec<String> {
1440        let mut out = Vec::new();
1441        if let Some(rx) = RX.get() {
1442            let mut rx = rx.lock().unwrap();
1443            while let Ok(line) = rx.try_recv() {
1444                out.push(line);
1445            }
1446        }
1447        out
1448    }
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453
1454    use std::sync::Arc;
1455    use std::sync::Mutex;
1456
1457    use hyperactor::ProcAddr;
1458    use hyperactor::RemoteSpawn;
1459    use hyperactor::channel;
1460    use hyperactor::channel::ChannelAddr;
1461    use hyperactor::channel::ChannelTx;
1462    use hyperactor::channel::Tx;
1463    use hyperactor::mailbox::BoxedMailboxSender;
1464    use hyperactor::mailbox::DialMailboxRouter;
1465    use hyperactor::mailbox::MailboxServer;
1466    use hyperactor::proc::Proc;
1467    use hyperactor::testing::ids::test_proc_id;
1468    use tokio::io::AsyncSeek;
1469    use tokio::io::AsyncSeekExt;
1470    use tokio::io::AsyncWriteExt;
1471    use tokio::io::SeekFrom;
1472    use tokio::sync::mpsc;
1473
1474    use super::*;
1475
1476    /// Result of processing file content
1477    #[derive(Debug)]
1478    struct FileProcessingResult {
1479        /// Complete lines found during processing
1480        lines: Vec<Vec<u8>>,
1481        /// Updated position in the file after processing
1482        new_position: u64,
1483        /// Any remaining incomplete line data, buffered for subsequent reads
1484        incomplete_line_buffer: Vec<u8>,
1485    }
1486
1487    /// Process new file content from a given position, extracting complete lines
1488    /// This function is extracted to enable easier unit testing without file system dependencies
1489    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
1490        reader: &mut R,
1491        current_position: u64,
1492        file_size: u64,
1493        existing_line_buffer: Vec<u8>,
1494        max_buffer_size: usize,
1495    ) -> Result<FileProcessingResult> {
1496        // If position equals file size, we're at the end
1497        if current_position == file_size {
1498            return Ok(FileProcessingResult {
1499                lines: Vec::new(),
1500                new_position: current_position,
1501                incomplete_line_buffer: existing_line_buffer,
1502            });
1503        }
1504
1505        // Handle potential file truncation/rotation
1506        let actual_position = if current_position > file_size {
1507            tracing::warn!(
1508                "File appears to have been truncated (position {} > file size {}), resetting to start",
1509                current_position,
1510                file_size
1511            );
1512            reader.seek(SeekFrom::Start(0)).await?;
1513            0
1514        } else {
1515            // current_position < file_size
1516            reader.seek(SeekFrom::Start(current_position)).await?;
1517            current_position
1518        };
1519
1520        let mut buf = vec![0u8; 128 * 1024];
1521        let mut line_buffer = existing_line_buffer;
1522        let mut lines = Vec::with_capacity(max_buffer_size);
1523        let mut processed_bytes = 0u64;
1524
1525        loop {
1526            let bytes_read = reader.read(&mut buf).await?;
1527            if bytes_read == 0 {
1528                break;
1529            }
1530
1531            let chunk = &buf[..bytes_read];
1532
1533            let mut start = 0;
1534            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
1535                let absolute_pos = start + newline_pos;
1536
1537                line_buffer.extend_from_slice(&chunk[start..absolute_pos]);
1538
1539                if !line_buffer.is_empty() {
1540                    if line_buffer.len() > MAX_LINE_SIZE {
1541                        line_buffer.truncate(MAX_LINE_SIZE);
1542                        line_buffer.extend_from_slice(b"... [TRUNCATED]");
1543                    }
1544
1545                    let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
1546                    lines.push(line_data);
1547                }
1548
1549                start = absolute_pos + 1;
1550
1551                // Check if we've reached the max buffer size after adding each line
1552                if lines.len() >= max_buffer_size {
1553                    // We've processed up to and including the current newline
1554                    // The new position is where we should start reading next time
1555                    let new_position = actual_position + processed_bytes + start as u64;
1556
1557                    // Don't save remaining data - we'll re-read it from the new position
1558                    return Ok(FileProcessingResult {
1559                        lines,
1560                        new_position,
1561                        incomplete_line_buffer: Vec::new(),
1562                    });
1563                }
1564            }
1565
1566            // Only add bytes to processed_bytes if we've fully processed this chunk
1567            processed_bytes += bytes_read as u64;
1568
1569            if start < chunk.len() {
1570                line_buffer.extend_from_slice(&chunk[start..]);
1571            }
1572        }
1573
1574        let new_position = actual_position + processed_bytes;
1575
1576        Ok(FileProcessingResult {
1577            lines,
1578            new_position,
1579            incomplete_line_buffer: line_buffer,
1580        })
1581    }
1582
1583    #[tokio::test]
1584    async fn test_forwarding_log_to_client() {
1585        // Setup the basics
1586        let router = DialMailboxRouter::new();
1587        let (proc_addr, client_rx) =
1588            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1589        let proc = Proc::configured(
1590            test_proc_id("client_0"),
1591            BoxedMailboxSender::new(router.clone()),
1592        );
1593        proc.clone().serve(client_rx);
1594        let proc_ref: ProcAddr = test_proc_id("client_0");
1595        router.bind(proc_ref, proc_addr.clone());
1596        let (client, _handle) = proc.client("client").unwrap();
1597
1598        // Spin up both the forwarder and the client
1599        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1600        // SAFETY: Unit test
1601        unsafe {
1602            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1603        }
1604        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
1605        let log_client: ActorRef<LogClientActor> =
1606            proc.spawn("log_client", log_client_actor).unwrap().bind();
1607        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
1608            .await
1609            .unwrap();
1610        let log_forwarder: ActorRef<LogForwardActor> = proc
1611            .spawn("log_forwarder", log_forwarder_actor)
1612            .unwrap()
1613            .bind();
1614
1615        // Write some logs that will not be streamed
1616        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
1617        tx.post(LogMessage::Log {
1618            hostname: "my_host".into(),
1619            proc_id: "test_proc".into(),
1620            output_target: OutputTarget::Stderr,
1621            payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
1622        });
1623
1624        // Turn on streaming
1625        log_forwarder.set_mode(&client, true).await.unwrap();
1626        tx.post(LogMessage::Log {
1627            hostname: "my_host".into(),
1628            proc_id: "test_proc".into(),
1629            output_target: OutputTarget::Stderr,
1630            payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
1631        });
1632
1633        // TODO: it is hard to test out anything meaningful here as the client flushes to stdout.
1634    }
1635
1636    #[test]
1637    fn test_deserialize_message_lines_string() {
1638        // Test deserializing a String message with multiple lines
1639        let message = "Line 1\nLine 2\nLine 3".to_string();
1640        let serialized = wirevalue::Any::serialize(&message).unwrap();
1641
1642        let result = deserialize_message_lines(&serialized).unwrap();
1643        assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);
1644
1645        // Test deserializing a Vec<Vec<u8>> message with UTF-8 content
1646        let message_bytes = vec![
1647            "Hello\nWorld".as_bytes().to_vec(),
1648            "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
1649        ];
1650        let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();
1651
1652        let result = deserialize_message_lines(&serialized).unwrap();
1653        assert_eq!(
1654            result,
1655            vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
1656        );
1657
1658        // Test deserializing a single line message
1659        let message = "Single line message".to_string();
1660        let serialized = wirevalue::Any::serialize(&message).unwrap();
1661
1662        let result = deserialize_message_lines(&serialized).unwrap();
1663
1664        assert_eq!(result, vec![vec!["Single line message"]]);
1665
1666        // Test deserializing an empty lines
1667        let message = "\n\n".to_string();
1668        let serialized = wirevalue::Any::serialize(&message).unwrap();
1669
1670        let result = deserialize_message_lines(&serialized).unwrap();
1671
1672        assert_eq!(result, vec![vec!["", ""]]);
1673
1674        // Test error handling for invalid UTF-8 bytes
1675        let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; // Invalid UTF-8 sequence in Vec<Vec<u8>>
1676        let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();
1677
1678        let result = deserialize_message_lines(&serialized);
1679
1680        // The function should fail when trying to convert invalid UTF-8 bytes to String
1681        assert!(
1682            result.is_err(),
1683            "Expected deserialization to fail with invalid UTF-8 bytes"
1684        );
1685    }
1686    #[allow(dead_code)]
1687    struct MockLogSender {
1688        log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, // (output_target, content)
1689        flush_called: Arc<Mutex<bool>>,                            // Track if flush was called
1690    }
1691
1692    impl MockLogSender {
1693        #[allow(dead_code)]
1694        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
1695            Self {
1696                log_sender,
1697                flush_called: Arc::new(Mutex::new(false)),
1698            }
1699        }
1700    }
1701
1702    #[async_trait]
1703    impl LogSender for MockLogSender {
1704        fn send(
1705            &mut self,
1706            output_target: OutputTarget,
1707            payload: Vec<Vec<u8>>,
1708        ) -> anyhow::Result<()> {
1709            // For testing purposes, convert to string if it's valid UTF-8
1710            let lines: Vec<String> = payload
1711                .iter()
1712                .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1713                .collect();
1714
1715            for line in lines {
1716                self.log_sender
1717                    .send((output_target, line))
1718                    .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1719            }
1720            Ok(())
1721        }
1722
1723        fn flush(&mut self) -> anyhow::Result<()> {
1724            // Mark that flush was called
1725            let mut flush_called = self.flush_called.lock().unwrap();
1726            *flush_called = true;
1727
1728            // For testing purposes, just return Ok
1729            // In a real implementation, this would wait for all messages to be delivered
1730            Ok(())
1731        }
1732    }
1733
1734    #[test]
1735    fn test_string_similarity() {
1736        // Test exact match
1737        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1738
1739        // Test completely different strings
1740        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1741
1742        // Test similar strings
1743        assert!(normalized_edit_distance("hello", "helo") < 0.5);
1744        assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1745
1746        // Test empty strings
1747        assert_eq!(normalized_edit_distance("", ""), 0.0);
1748        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1749    }
1750
1751    #[test]
1752    fn test_add_line_to_empty_aggregator() {
1753        let mut aggregator = Aggregator::new();
1754        let result = aggregator.add_line("ERROR 404 not found");
1755
1756        assert!(result.is_ok());
1757        assert_eq!(aggregator.lines.len(), 1);
1758        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1759        assert_eq!(aggregator.lines[0].count, 1);
1760    }
1761
1762    #[test]
1763    fn test_add_line_merges_with_similar_line() {
1764        let mut aggregator = Aggregator::new_with_threshold(0.2);
1765
1766        // Add first line
1767        aggregator.add_line("ERROR 404 timeout").unwrap();
1768        assert_eq!(aggregator.lines.len(), 1);
1769
1770        // Add second line that should merge (similar enough)
1771        aggregator.add_line("ERROR 500 timeout").unwrap();
1772        assert_eq!(aggregator.lines.len(), 1); // Should still be 1 line after merge
1773        assert_eq!(aggregator.lines[0].count, 2);
1774
1775        // Add third line that's too different
1776        aggregator
1777            .add_line("WARNING database connection failed")
1778            .unwrap();
1779        assert_eq!(aggregator.lines.len(), 2); // Should be 2 lines now
1780
1781        // Add fourth line similar to third
1782        aggregator
1783            .add_line("WARNING database connection timed out")
1784            .unwrap();
1785        assert_eq!(aggregator.lines.len(), 2); // Should still be 2 lines
1786        assert_eq!(aggregator.lines[1].count, 2); // Second group has 2 lines
1787    }
1788
1789    #[test]
1790    fn test_aggregation_of_similar_log_lines() {
1791        let mut aggregator = Aggregator::new_with_threshold(0.2);
1792
1793        // Add the provided log lines with small differences
1794        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
1795        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
1796        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
1797
1798        // Check that we have only one aggregated line due to similarity
1799        assert_eq!(aggregator.lines.len(), 1);
1800
1801        // Check that the count is 3
1802        assert_eq!(aggregator.lines[0].count, 3);
1803    }
1804
1805    #[tokio::test]
1806    async fn test_stream_fwd_creation() {
1807        hyperactor_telemetry::initialize_logging_for_test();
1808
1809        let (mut writer, reader) = tokio::io::duplex(1024);
1810        let (log_channel, mut rx) =
1811            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1812
1813        // Create a temporary file for testing the writer
1814        let temp_file = tempfile::NamedTempFile::new().unwrap();
1815        let temp_path = temp_file.path().to_path_buf();
1816
1817        // Create file writer that writes to the temp file (using tokio for async compatibility)
1818        let file_writer = tokio::fs::OpenOptions::new()
1819            .create(true)
1820            .write(true)
1821            .append(true)
1822            .open(&temp_path)
1823            .await
1824            .unwrap();
1825
1826        // Create FileMonitor and get address for stdout
1827        let file_monitor = FileAppender::new();
1828        let file_monitor_addr = file_monitor
1829            .as_ref()
1830            .map(|fm| fm.addr_for(OutputTarget::Stdout));
1831
1832        let the_test_proc_id = test_proc_id("testproc_0");
1833        let monitor = StreamFwder::start_with_writer(
1834            reader,
1835            Box::new(file_writer),
1836            file_monitor_addr,
1837            OutputTarget::Stdout,
1838            3, // max_buffer_size
1839            Some(log_channel),
1840            &the_test_proc_id,
1841            None, // no prefix
1842        );
1843
1844        // Wait a bit for set up to be done
1845        tokio::time::sleep(Duration::from_millis(500)).await;
1846
1847        // Write initial content through the input writer
1848        writer.write_all(b"Initial log line\n").await.unwrap();
1849        writer.flush().await.unwrap();
1850
1851        // Write more content
1852        for i in 1..=5 {
1853            writer
1854                .write_all(format!("New log line {}\n", i).as_bytes())
1855                .await
1856                .unwrap();
1857        }
1858        writer.flush().await.unwrap();
1859
1860        // Wait a bit for the file to be written and the watcher to detect changes
1861        tokio::time::sleep(Duration::from_millis(500)).await;
1862
1863        // Wait until log sender gets message
1864        let timeout = Duration::from_secs(1);
1865        let _ = tokio::time::timeout(timeout, rx.recv())
1866            .await
1867            .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));
1868
1869        // Wait a bit more for all lines to be processed
1870        tokio::time::sleep(Duration::from_millis(200)).await;
1871
1872        let (recent_lines, _result) = monitor.abort().await;
1873
1874        assert!(
1875            recent_lines.len() >= 3,
1876            "Expected buffer with at least 3 lines, got {} lines: {:?}",
1877            recent_lines.len(),
1878            recent_lines
1879        );
1880
1881        let file_contents = std::fs::read_to_string(&temp_path).unwrap();
1882        assert!(
1883            file_contents.contains("Initial log line"),
1884            "Expected temp file to contain 'Initial log line', got: {:?}",
1885            file_contents
1886        );
1887        assert!(
1888            file_contents.contains("New log line 1"),
1889            "Expected temp file to contain 'New log line 1', got: {:?}",
1890            file_contents
1891        );
1892        assert!(
1893            file_contents.contains("New log line 5"),
1894            "Expected temp file to contain 'New log line 5', got: {:?}",
1895            file_contents
1896        );
1897    }
1898
1899    #[test]
1900    fn test_aggregator_custom_threshold() {
1901        // Test with very strict threshold (0.05)
1902        let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1903        strict_aggregator.add_line("ERROR 404").unwrap();
1904        strict_aggregator.add_line("ERROR 500").unwrap(); // Should not merge due to strict threshold
1905        assert_eq!(strict_aggregator.lines.len(), 2);
1906
1907        // Test with very lenient threshold (0.8)
1908        let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1909        lenient_aggregator.add_line("ERROR 404").unwrap();
1910        lenient_aggregator.add_line("WARNING 200").unwrap(); // Should merge due to lenient threshold
1911        assert_eq!(lenient_aggregator.lines.len(), 1);
1912        assert_eq!(lenient_aggregator.lines[0].count, 2);
1913    }
1914
1915    #[test]
1916    fn test_format_system_time() {
1917        let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); // 2021-01-01 00:00:00 UTC
1918        let formatted = format_system_time(test_time);
1919
1920        // Just verify it's a reasonable format (contains date and time components)
1921        assert!(formatted.contains("-"));
1922        assert!(formatted.contains(":"));
1923        assert!(formatted.len() > 10); // Should be reasonable length
1924    }
1925
1926    #[test]
1927    fn test_aggregator_display_formatting() {
1928        let mut aggregator = Aggregator::new();
1929        aggregator.add_line("Test error message").unwrap();
1930        aggregator.add_line("Test error message").unwrap(); // Should merge
1931
1932        let display_string = format!("{}", aggregator);
1933
1934        // Verify the output contains expected elements
1935        assert!(display_string.contains("Aggregated Logs"));
1936        assert!(display_string.contains("[2 similar log lines]"));
1937        assert!(display_string.contains("Test error message"));
1938        assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1939    }
1940
1941    #[tokio::test]
1942    async fn test_local_log_sender_inactive_status() {
1943        let (log_channel, _) =
1944            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1945        let test_proc_id = test_proc_id("testproc_0");
1946        let mut sender = LocalLogSender::new(log_channel, &test_proc_id).unwrap();
1947
1948        // This test verifies that the sender handles inactive status gracefully
1949        // In a real scenario, the channel would be closed, but for testing we just
1950        // verify the send/flush methods don't panic
1951        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1952        assert!(result.is_ok());
1953
1954        let result = sender.flush();
1955        assert!(result.is_ok());
1956    }
1957
1958    #[test]
1959    fn test_levenshtein_distance_edge_cases() {
1960        // Test with empty strings
1961        assert_eq!(levenshtein_distance("", ""), 0);
1962        assert_eq!(levenshtein_distance("", "hello"), 5);
1963        assert_eq!(levenshtein_distance("hello", ""), 5);
1964
1965        // Test with identical strings
1966        assert_eq!(levenshtein_distance("hello", "hello"), 0);
1967
1968        // Test with single character differences
1969        assert_eq!(levenshtein_distance("hello", "helo"), 1); // deletion
1970        assert_eq!(levenshtein_distance("helo", "hello"), 1); // insertion
1971        assert_eq!(levenshtein_distance("hello", "hallo"), 1); // substitution
1972
1973        // Test with unicode characters
1974        assert_eq!(levenshtein_distance("café", "cafe"), 1);
1975    }
1976
1977    #[test]
1978    fn test_normalized_edit_distance_edge_cases() {
1979        // Test with empty strings
1980        assert_eq!(normalized_edit_distance("", ""), 0.0);
1981
1982        // Test normalization
1983        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1984        assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1985
1986        // Test that result is always between 0.0 and 1.0
1987        let distance = normalized_edit_distance("completely", "different");
1988        assert!((0.0..=1.0).contains(&distance));
1989    }
1990
1991    #[tokio::test]
1992    async fn test_deserialize_message_lines_edge_cases() {
1993        // Test with empty string
1994        let empty_message = "".to_string();
1995        let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
1996        let result = deserialize_message_lines(&serialized).unwrap();
1997        assert_eq!(result, vec![vec![] as Vec<String>]);
1998
1999        // Test with trailing newline
2000        let trailing_newline = "line1\nline2\n".to_string();
2001        let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
2002        let result = deserialize_message_lines(&serialized).unwrap();
2003        assert_eq!(result, vec![vec!["line1", "line2"]]);
2004    }
2005
2006    #[test]
2007    fn test_output_target_serialization() {
2008        // Test that OutputTarget can be serialized and deserialized
2009        let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2010        let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2011
2012        let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2013        let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2014
2015        assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2016        assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2017    }
2018
2019    #[test]
2020    fn test_log_line_display_formatting() {
2021        let log_line = LogLine::new("Test message".to_string());
2022        let display_string = format!("{}", log_line);
2023
2024        assert!(display_string.contains("[1 similar log lines]"));
2025        assert!(display_string.contains("Test message"));
2026
2027        // Test with higher count
2028        let mut log_line_multi = LogLine::new("Test message".to_string());
2029        log_line_multi.count = 5;
2030        let display_string_multi = format!("{}", log_line_multi);
2031
2032        assert!(display_string_multi.contains("[5 similar log lines]"));
2033        assert!(display_string_multi.contains("Test message"));
2034    }
2035
2036    // Mock reader for testing process_file_content using std::io::Cursor
2037    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
2038        std::io::Cursor::new(data)
2039    }
2040
2041    #[tokio::test]
2042    async fn test_process_file_content_basic() {
2043        let data = b"line1\nline2\nline3\n".to_vec();
2044        let mut reader = create_mock_reader(data.clone());
2045        let max_buf_size = 10;
2046
2047        let result =
2048            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2049                .await
2050                .unwrap();
2051
2052        assert_eq!(result.lines.len(), 3);
2053        assert_eq!(result.lines[0], b"line1");
2054        assert_eq!(result.lines[1], b"line2");
2055        assert_eq!(result.lines[2], b"line3");
2056        assert_eq!(result.new_position, data.len() as u64);
2057        assert!(result.incomplete_line_buffer.is_empty());
2058    }
2059
2060    #[tokio::test]
2061    async fn test_process_file_content_incomplete_line() {
2062        let data = b"line1\nline2\npartial".to_vec();
2063        let mut reader = create_mock_reader(data.clone());
2064        let max_buf_size = 10;
2065
2066        let result =
2067            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2068                .await
2069                .unwrap();
2070
2071        assert_eq!(result.lines.len(), 2);
2072        assert_eq!(result.lines[0], b"line1");
2073        assert_eq!(result.lines[1], b"line2");
2074        assert_eq!(result.new_position, data.len() as u64);
2075        assert_eq!(result.incomplete_line_buffer, b"partial");
2076    }
2077
2078    #[tokio::test]
2079    async fn test_process_file_content_with_existing_buffer() {
2080        let data = b"omplete\nline2\nline3\n".to_vec();
2081        let mut reader = create_mock_reader(data.clone());
2082        let existing_buffer = b"inc".to_vec();
2083        let max_buf_size = 10;
2084
2085        let result = process_file_content(
2086            &mut reader,
2087            0,
2088            data.len() as u64,
2089            existing_buffer,
2090            max_buf_size,
2091        )
2092        .await
2093        .unwrap();
2094
2095        assert_eq!(result.lines.len(), 3);
2096        assert_eq!(result.lines[0], b"incomplete");
2097        assert_eq!(result.lines[1], b"line2");
2098        assert_eq!(result.lines[2], b"line3");
2099        assert_eq!(result.new_position, data.len() as u64);
2100        assert!(result.incomplete_line_buffer.is_empty());
2101    }
2102
2103    #[tokio::test]
2104    async fn test_process_file_content_empty_file() {
2105        let data = Vec::new();
2106        let mut reader = create_mock_reader(data.clone());
2107        let max_buf_size = 10;
2108
2109        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2110            .await
2111            .unwrap();
2112
2113        assert!(result.lines.is_empty());
2114        assert_eq!(result.new_position, 0);
2115        assert!(result.incomplete_line_buffer.is_empty());
2116    }
2117
2118    #[tokio::test]
2119    async fn test_process_file_content_only_newlines() {
2120        let data = b"\n\n\n".to_vec();
2121        let mut reader = create_mock_reader(data.clone());
2122        let max_buf_size = 10;
2123
2124        let result =
2125            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2126                .await
2127                .unwrap();
2128
2129        // Empty lines should not be added (the function skips empty line_buffer)
2130        assert!(result.lines.is_empty());
2131        assert_eq!(result.new_position, data.len() as u64);
2132        assert!(result.incomplete_line_buffer.is_empty());
2133    }
2134
2135    #[tokio::test]
2136    async fn test_process_file_content_no_newlines() {
2137        let data = b"no newlines here".to_vec();
2138        let mut reader = create_mock_reader(data.clone());
2139        let max_buf_size = 10;
2140
2141        let result =
2142            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2143                .await
2144                .unwrap();
2145
2146        assert!(result.lines.is_empty());
2147        assert_eq!(result.new_position, data.len() as u64);
2148        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2149    }
2150
2151    #[tokio::test]
2152    async fn test_process_file_content_file_truncation() {
2153        let data = b"line1\nline2\n".to_vec();
2154        let mut reader = create_mock_reader(data.clone());
2155
2156        // Simulate current position being beyond file size (file was truncated)
2157        let result = process_file_content(
2158            &mut reader,
2159            100, // position beyond file size
2160            data.len() as u64,
2161            Vec::new(),
2162            10, // max_buf_size
2163        )
2164        .await
2165        .unwrap();
2166
2167        // Should reset to beginning and read all lines
2168        assert_eq!(result.lines.len(), 2);
2169        assert_eq!(result.lines[0], b"line1");
2170        assert_eq!(result.lines[1], b"line2");
2171        assert_eq!(result.new_position, data.len() as u64);
2172        assert!(result.incomplete_line_buffer.is_empty());
2173    }
2174
2175    #[tokio::test]
2176    async fn test_process_file_content_seek_to_position() {
2177        let data = b"line1\nline2\nline3\n".to_vec();
2178        let mut reader = create_mock_reader(data.clone());
2179
2180        // Start reading from position 6 (after "line1\n")
2181        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2182            .await
2183            .unwrap();
2184
2185        assert_eq!(result.lines.len(), 2);
2186        assert_eq!(result.lines[0], b"line2");
2187        assert_eq!(result.lines[1], b"line3");
2188        assert_eq!(result.new_position, data.len() as u64);
2189        assert!(result.incomplete_line_buffer.is_empty());
2190    }
2191
2192    #[tokio::test]
2193    async fn test_process_file_content_position_equals_file_size() {
2194        let data = b"line1\nline2\n".to_vec();
2195        let mut reader = create_mock_reader(data.clone());
2196
2197        // Start reading from end of file
2198        let result = process_file_content(
2199            &mut reader,
2200            data.len() as u64,
2201            data.len() as u64,
2202            Vec::new(),
2203            10,
2204        )
2205        .await
2206        .unwrap();
2207
2208        // Should not read anything new
2209        assert!(
2210            result.lines.is_empty(),
2211            "Expected empty line got {:?}",
2212            result.lines
2213        );
2214        assert_eq!(result.new_position, data.len() as u64);
2215        assert!(result.incomplete_line_buffer.is_empty());
2216    }
2217
2218    #[tokio::test]
2219    async fn test_process_file_content_large_line_truncation() {
2220        // Create a line longer than MAX_LINE_SIZE
2221        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2222        let data = format!("{}\nline2\n", large_line).into_bytes();
2223        let mut reader = create_mock_reader(data.clone());
2224
2225        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2226            .await
2227            .unwrap();
2228
2229        assert_eq!(result.lines.len(), 2);
2230
2231        // First line should be truncated
2232        assert_eq!(
2233            result.lines[0].len(),
2234            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2235        );
2236        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2237
2238        // Second line should be normal
2239        assert_eq!(result.lines[1], b"line2");
2240
2241        assert_eq!(result.new_position, data.len() as u64);
2242        assert!(result.incomplete_line_buffer.is_empty());
2243    }
2244
2245    #[tokio::test]
2246    async fn test_process_file_content_mixed_line_endings() {
2247        let data = b"line1\nline2\r\nline3\n".to_vec();
2248        let mut reader = create_mock_reader(data.clone());
2249
2250        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2251            .await
2252            .unwrap();
2253
2254        assert_eq!(result.lines.len(), 3);
2255        assert_eq!(result.lines[0], b"line1");
2256        assert_eq!(result.lines[1], b"line2\r"); // \r is preserved
2257        assert_eq!(result.lines[2], b"line3");
2258        assert_eq!(result.new_position, data.len() as u64);
2259        assert!(result.incomplete_line_buffer.is_empty());
2260    }
2261
2262    #[tokio::test]
2263    async fn test_process_file_content_existing_buffer_with_truncation() {
2264        // Create a scenario where existing buffer + new data creates a line that needs truncation
2265        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2266        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2267        let mut reader = create_mock_reader(data.clone());
2268
2269        let result = process_file_content(
2270            &mut reader,
2271            0,
2272            data.len() as u64,
2273            existing_buffer.into_bytes(),
2274            10,
2275        )
2276        .await
2277        .unwrap();
2278
2279        assert_eq!(result.lines.len(), 2);
2280
2281        // First line should be truncated (existing buffer + new data)
2282        assert_eq!(
2283            result.lines[0].len(),
2284            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2285        );
2286        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2287
2288        // Second line should be normal
2289        assert_eq!(result.lines[1], b"line2");
2290
2291        assert_eq!(result.new_position, data.len() as u64);
2292        assert!(result.incomplete_line_buffer.is_empty());
2293    }
2294
2295    #[tokio::test]
2296    async fn test_process_file_content_single_character_lines() {
2297        let data = b"a\nb\nc\n".to_vec();
2298        let mut reader = create_mock_reader(data.clone());
2299
2300        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2301            .await
2302            .unwrap();
2303
2304        assert_eq!(result.lines.len(), 3);
2305        assert_eq!(result.lines[0], b"a");
2306        assert_eq!(result.lines[1], b"b");
2307        assert_eq!(result.lines[2], b"c");
2308        assert_eq!(result.new_position, data.len() as u64);
2309        assert!(result.incomplete_line_buffer.is_empty());
2310    }
2311
2312    #[tokio::test]
2313    async fn test_process_file_content_binary_data() {
2314        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2315        let mut reader = create_mock_reader(data.clone());
2316
2317        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2318            .await
2319            .unwrap();
2320
2321        assert_eq!(result.lines.len(), 2);
2322        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2323        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2324        assert_eq!(result.new_position, data.len() as u64);
2325        assert!(result.incomplete_line_buffer.is_empty());
2326    }
2327
2328    #[tokio::test]
2329    async fn test_process_file_content_resume_after_max_buffer_size() {
2330        // Test data: 3 lines as specified in the example
2331        let data = b"line 1\nline 2\nline 3\n".to_vec();
2332        let mut reader = create_mock_reader(data.clone());
2333        let max_buffer_size = 2; // Limit to 2 lines per call
2334
2335        // First call: should return first 2 lines
2336        let result1 = process_file_content(
2337            &mut reader,
2338            0, // start from beginning
2339            data.len() as u64,
2340            Vec::new(), // no existing buffer
2341            max_buffer_size,
2342        )
2343        .await
2344        .unwrap();
2345
2346        // Verify first call results
2347        assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2348        assert_eq!(result1.lines[0], b"line 1");
2349        assert_eq!(result1.lines[1], b"line 2");
2350        assert!(result1.incomplete_line_buffer.is_empty());
2351
2352        // The position should be after "line 1\nline 2\n" (14 bytes)
2353        let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2354        assert_eq!(result1.new_position, expected_position_after_first_call);
2355
2356        // Second call: resume from where first call left off
2357        let mut reader2 = create_mock_reader(data.clone());
2358        let result2 = process_file_content(
2359            &mut reader2,
2360            result1.new_position, // resume from previous position
2361            data.len() as u64,
2362            result1.incomplete_line_buffer, // pass any incomplete buffer (should be empty)
2363            max_buffer_size,
2364        )
2365        .await
2366        .unwrap();
2367
2368        // Verify second call results
2369        assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2370        assert_eq!(result2.lines[0], b"line 3");
2371        assert!(result2.incomplete_line_buffer.is_empty());
2372        assert_eq!(result2.new_position, data.len() as u64);
2373    }
2374
    #[tokio::test]
    async fn test_utf8_truncation() {
        // Test that StreamFwder doesn't panic when truncating lines
        // with multi-byte chars.

        hyperactor_telemetry::initialize_logging_for_test();

        // Build a line longer than MAX_LINE_SIZE whose 4-byte emoji occupies bytes
        // MAX_LINE_SIZE-1 .. MAX_LINE_SIZE+3, so a byte-based cut at MAX_LINE_SIZE
        // (assumed to match process_file_content's truncation) splits the character.
        let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
        long_line.push('🦀'); // 4-byte emoji - truncation will land in the middle
        long_line.push('\n');

        // Create IO streams. The 8192-byte duplex buffer exceeds the line length
        // (MAX_LINE_SIZE + 4 bytes), so write_all below completes without the
        // forwarder having to drain the pipe first.
        let (mut writer, reader) = tokio::io::duplex(8192);

        // Start StreamFwder
        let test_proc_id = test_proc_id("testproc_0");
        let monitor = StreamFwder::start_with_writer(
            reader,
            Box::new(tokio::io::sink()), // discard output
            None,                        // no file monitor needed
            OutputTarget::Stdout,
            1,    // tail buffer of 1 (need at least one sink)
            None, // no log channel
            &test_proc_id,
            None, // no prefix
        );

        // Write the problematic line
        writer.write_all(long_line.as_bytes()).await.unwrap();
        drop(writer); // Close to signal EOF

        // Wait for completion - should NOT panic.
        // NOTE(review): `abort` is called immediately after EOF; whether the forwarder
        // has already processed (and truncated) the long line at that point is not
        // visible here. Confirm the truncation path is actually exercised.
        let (_lines, result) = monitor.abort().await;
        result.expect("Should complete without panic despite UTF-8 truncation");
    }
2411}