1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::Bind;
25use hyperactor::Context;
26use hyperactor::HandleClient;
27use hyperactor::Handler;
28use hyperactor::Instance;
29use hyperactor::RefClient;
30use hyperactor::Unbind;
31use hyperactor::channel;
32use hyperactor::channel::ChannelAddr;
33use hyperactor::channel::ChannelRx;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::channel::ChannelTx;
36use hyperactor::channel::Rx;
37use hyperactor::channel::Tx;
38use hyperactor::channel::TxStatus;
39use hyperactor::reference as hyperactor_reference;
40use hyperactor_config::CONFIG;
41use hyperactor_config::ConfigAttr;
42use hyperactor_config::Flattrs;
43use hyperactor_config::attrs::declare_attrs;
44use hyperactor_telemetry::env;
45use hyperactor_telemetry::log_file_path;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::io;
49use tokio::io::AsyncRead;
50use tokio::io::AsyncReadExt;
51use tokio::io::AsyncWriteExt;
52use tokio::sync::Mutex;
53use tokio::sync::Notify;
54use tokio::sync::RwLock;
55use tokio::sync::watch::Receiver;
56use tokio::task::JoinHandle;
57use tracing::Level;
58use typeuri::Named;
59
60use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
61use crate::shortuuid::ShortUuid;
62
63mod line_prefixing_writer;
64
/// Default window, in seconds, over which similar log lines are aggregated
/// before being printed with a count (see `Aggregator` / `LogClientActor`).
pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
/// Lines longer than this (in bytes) are truncated in `tee`, with a
/// "... [TRUNCATED]" marker appended.
const MAX_LINE_SIZE: usize = 4 * 1024;
67
declare_attrs! {
    // Size of the log-read buffer.
    // Env var: HYPERACTOR_READ_LOG_BUFFER; config key: read_log_buffer.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
        Some("read_log_buffer".to_string()),
    ))
    pub attr READ_LOG_BUFFER: usize = 100;

    // Force writing logs to files even in a Local environment
    // (checked in get_unique_local_log_destination).
    // Env var: HYPERACTOR_FORCE_FILE_LOG; config key: force_file_log.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
        Some("force_file_log".to_string()),
    ))
    pub attr FORCE_FILE_LOG: bool = false;

    // Prefix each forwarded output line with the local rank
    // (checked in StreamFwder::start).
    // Env var: HYPERACTOR_PREFIX_WITH_RANK; config key: prefix_with_rank.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
        Some("prefix_with_rank".to_string()),
    ))
    pub attr PREFIX_WITH_RANK: bool = true;
}
92
/// Computes the Levenshtein (edit) distance between two strings, counted in
/// Unicode scalar values (`char`s), not bytes.
///
/// Uses the classic Wagner–Fischer dynamic program, but keeps only two rows
/// of the DP matrix at a time, so memory is O(right_len) instead of
/// O(left_len * right_len). Results are identical to the full-matrix form.
fn levenshtein_distance(left: &str, right: &str) -> usize {
    let left_chars: Vec<char> = left.chars().collect();
    let right_chars: Vec<char> = right.chars().collect();

    let left_len = left_chars.len();
    let right_len = right_chars.len();

    // An empty side degenerates to inserting every char of the other side.
    if left_len == 0 {
        return right_len;
    }
    if right_len == 0 {
        return left_len;
    }

    // prev[j] = distance(left[..i-1], right[..j]); curr is the row being built.
    let mut prev: Vec<usize> = (0..=right_len).collect();
    let mut curr: Vec<usize> = vec![0; right_len + 1];

    for i in 1..=left_len {
        curr[0] = i;
        for j in 1..=right_len {
            // Substitution is free when the chars match.
            let cost = usize::from(left_chars[i - 1] != right_chars[j - 1]);
            curr[j] = (prev[j] + 1) // deletion
                .min(curr[j - 1] + 1) // insertion
                .min(prev[j - 1] + cost); // substitution
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    // After the final swap, `prev` holds the last computed row.
    prev[right_len]
}
142
143fn normalized_edit_distance(left: &str, right: &str) -> f64 {
145 let distance = levenshtein_distance(left, right) as f64;
146 let max_len = std::cmp::max(left.len(), right.len()) as f64;
147
148 if max_len == 0.0 {
149 0.0 } else {
151 distance / max_len
152 }
153}
154
/// One distinct log line together with the number of similar lines that
/// have been merged into it by the aggregator.
#[derive(Debug, Clone)]
struct LogLine {
    content: String,
    pub count: u64,
}

impl LogLine {
    /// Wraps a freshly seen line with an initial count of one.
    fn new(content: String) -> Self {
        Self { content, count: 1 }
    }
}

impl fmt::Display for LogLine {
    /// Renders the count in a yellow ANSI badge followed by the line itself.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "\x1b[33m[{} similar log lines]\x1b[0m {}", self.count, self.content)
    }
}
177
/// Collects log lines over a time window, merging lines whose normalized
/// edit distance to an already-seen line is under a threshold, so repeated
/// output can be printed once with a count.
#[derive(Debug, Clone)]
struct Aggregator {
    // Distinct lines seen in the current window, each with a merge count.
    lines: Vec<LogLine>,
    // When the current aggregation window started.
    start_time: SystemTime,
    // Maximum normalized edit distance for two lines to be merged.
    similarity_threshold: f64, }
186
187impl Aggregator {
188 fn new() -> Self {
189 Self::new_with_threshold(0.15)
191 }
192
193 fn new_with_threshold(threshold: f64) -> Self {
194 Aggregator {
195 lines: vec![],
196 start_time: std::time::SystemTime::now(),
197 similarity_threshold: threshold,
198 }
199 }
200
201 fn reset(&mut self) {
202 self.lines.clear();
203 self.start_time = std::time::SystemTime::now();
204 }
205
206 fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
207 let mut best_match_idx = None;
209 let mut best_similarity = f64::MAX;
210
211 for (idx, existing_line) in self.lines.iter().enumerate() {
212 let distance = normalized_edit_distance(&existing_line.content, line);
213
214 if distance < best_similarity && distance < self.similarity_threshold {
216 best_match_idx = Some(idx);
217 best_similarity = distance;
218 }
219 }
220
221 if let Some(idx) = best_match_idx {
223 self.lines[idx].count += 1;
224 } else {
225 self.lines.push(LogLine::new(line.to_string()));
227 }
228
229 Ok(())
230 }
231
232 fn is_empty(&self) -> bool {
233 self.lines.is_empty()
234 }
235}
236
237fn format_system_time(time: SystemTime) -> String {
239 let datetime: DateTime<Local> = time.into();
240 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
241}
242
243impl fmt::Display for Aggregator {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 let start_time_str = format_system_time(self.start_time);
247
248 let current_time = std::time::SystemTime::now();
250 let end_time_str = format_system_time(current_time);
251
252 writeln!(
254 f,
255 "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
256 start_time_str
257 )?;
258
259 for line in self.lines.iter() {
261 writeln!(f, "{}", line)?;
262 }
263 writeln!(
264 f,
265 "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
266 end_time_str
267 )?;
268 Ok(())
269 }
270}
271
/// Messages carried on the per-proc log channel and handled by the
/// `LogClientActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogMessage {
    /// A batch of captured output lines from one proc.
    Log {
        /// Host that produced the output.
        hostname: String,
        /// Stringified proc id of the producer.
        proc_id: String,
        /// Whether the lines came from stdout or stderr.
        output_target: OutputTarget,
        /// Serialized line payload; decoded by `deserialize_message_lines`.
        payload: wirevalue::Any,
    },

    /// Request to flush aggregated output. `None` is a periodic/best-effort
    /// flush; `Some(v)` acknowledges round `v` of a synchronized flush.
    Flush {
        sync_version: Option<u64>,
    },
}
303
/// Control messages for the `LogClientActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient
)]
pub enum LogClientMessage {
    /// Sets the aggregation window in seconds; `None` disables aggregation
    /// (lines are then printed as they arrive).
    SetAggregate {
        aggregate_window_sec: Option<u64>,
    },

    /// Begins a synchronized flush round across `expected_procs` procs.
    StartSyncFlush {
        /// Number of procs whose flush acks complete the round.
        expected_procs: usize,
        /// Replied to (with `()`) once every proc has acked.
        reply: hyperactor_reference::OncePortRef<()>,
        /// Receives the version number assigned to this round.
        version: hyperactor_reference::OncePortRef<u64>,
    },
}
331
/// Abstraction over where completed log lines are forwarded from `tee`.
#[async_trait]
pub trait LogSender: Send + Sync {
    /// Sends a batch of lines (raw bytes, one entry per line) for `target`.
    fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;

    /// Asks the receiving side to flush whatever it has buffered.
    fn flush(&mut self) -> anyhow::Result<()>;
}
342
/// Which standard stream a log line belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum OutputTarget {
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
351
/// Identifies a child process's output stream.
/// NOTE(review): not referenced elsewhere in this file — presumably consumed
/// by other modules; confirm before removing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum Stream {
    /// The child's stdout.
    ChildStdout,
    /// The child's stderr.
    ChildStderr,
}
359
/// `LogSender` that posts lines over a hyperactor channel as `LogMessage`s,
/// tagged with this host's name and the producing proc's id.
pub struct LocalLogSender {
    hostname: String,
    proc_id: String,
    tx: ChannelTx<LogMessage>,
    // Watch on the channel status; messages are only posted while Active.
    status: Receiver<TxStatus>,
}
367
368impl LocalLogSender {
369 fn new(
370 log_channel: ChannelAddr,
371 proc_id: &hyperactor_reference::ProcId,
372 ) -> Result<Self, anyhow::Error> {
373 let tx = channel::dial::<LogMessage>(log_channel)?;
374 let status = tx.status().clone();
375
376 let hostname = hostname::get()
377 .unwrap_or_else(|_| "unknown_host".into())
378 .into_string()
379 .unwrap_or("unknown_host".to_string());
380 Ok(Self {
381 hostname,
382 proc_id: proc_id.to_string(),
383 tx,
384 status,
385 })
386 }
387}
388
389#[async_trait]
390impl LogSender for LocalLogSender {
391 fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
392 if TxStatus::Active == *self.status.borrow() {
393 self.tx.post(LogMessage::Log {
395 hostname: self.hostname.clone(),
396 proc_id: self.proc_id.clone(),
397 output_target: target,
398 payload: wirevalue::Any::serialize(&payload)?,
399 });
400 }
401
402 Ok(())
403 }
404
405 fn flush(&mut self) -> anyhow::Result<()> {
406 if TxStatus::Active == *self.status.borrow() {
408 self.tx.post(LogMessage::Flush { sync_version: None });
410 }
411 Ok(())
412 }
413}
414
/// Batch of completed lines sent from a `tee` task to a `file_monitor_task`
/// for appending to the per-stream log file.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub struct FileMonitorMessage {
    lines: Vec<String>,
}
wirevalue::register_type!(FileMonitorMessage);
421
/// Owns the per-stream log files and the background tasks that append
/// incoming `FileMonitorMessage`s to them. Dropping it signals both tasks
/// to flush and exit.
pub struct FileAppender {
    stdout_addr: ChannelAddr,
    stderr_addr: ChannelAddr,
    // Handles kept only to tie the task lifetimes to this struct; shutdown
    // goes through `stop`, not through these handles.
    #[allow(dead_code)] stdout_task: JoinHandle<()>,
    #[allow(dead_code)]
    stderr_task: JoinHandle<()>,
    stop: Arc<Notify>,
}
432
433impl fmt::Debug for FileAppender {
434 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435 f.debug_struct("FileMonitor")
436 .field("stdout_addr", &self.stdout_addr)
437 .field("stderr_addr", &self.stderr_addr)
438 .finish()
439 }
440}
441
442impl FileAppender {
443 pub fn new() -> Option<Self> {
446 let stop = Arc::new(Notify::new());
447 let file_name_tag = hostname::get()
449 .unwrap_or_else(|_| "unknown_host".into())
450 .into_string()
451 .unwrap_or("unknown_host".to_string());
452
453 let (stdout_path, stdout_writer) =
455 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
456 Some(writer) => writer,
457 None => {
458 tracing::warn!("failed to create stdout file");
459 return None;
460 }
461 };
462 let (stdout_addr, stdout_rx) = {
463 let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
464 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
465 Ok((addr, rx)) => (addr, rx),
466 Err(e) => {
467 tracing::warn!("failed to serve stdout channel: {}", e);
468 return None;
469 }
470 }
471 };
472 let stdout_stop = stop.clone();
473 let stdout_task = tokio::spawn(file_monitor_task(
474 stdout_rx,
475 stdout_writer,
476 OutputTarget::Stdout,
477 stdout_stop,
478 ));
479
480 let (stderr_path, stderr_writer) =
482 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
483 Some(writer) => writer,
484 None => {
485 tracing::warn!("failed to create stderr file");
486 return None;
487 }
488 };
489 let (stderr_addr, stderr_rx) = {
490 let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
491 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
492 Ok((addr, rx)) => (addr, rx),
493 Err(e) => {
494 tracing::warn!("failed to serve stderr channel: {}", e);
495 return None;
496 }
497 }
498 };
499 let stderr_stop = stop.clone();
500 let stderr_task = tokio::spawn(file_monitor_task(
501 stderr_rx,
502 stderr_writer,
503 OutputTarget::Stderr,
504 stderr_stop,
505 ));
506
507 tracing::debug!(
508 "FileAppender: created for stdout {} stderr {} ",
509 stdout_path.display(),
510 stderr_path.display()
511 );
512
513 Some(Self {
514 stdout_addr,
515 stderr_addr,
516 stdout_task,
517 stderr_task,
518 stop,
519 })
520 }
521
522 pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
524 match target {
525 OutputTarget::Stdout => self.stdout_addr.clone(),
526 OutputTarget::Stderr => self.stderr_addr.clone(),
527 }
528 }
529}
530
impl Drop for FileAppender {
    fn drop(&mut self) {
        // Wake both monitor tasks; each performs a final flush and exits.
        self.stop.notify_waiters();
        tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
    }
}
538
/// Background task: receives line batches on `rx` and appends them,
/// newline-separated, to `writer`, flushing after each batch. Exits on a
/// channel error or when `stop` is notified; flushes once more on exit.
async fn file_monitor_task(
    mut rx: ChannelRx<FileMonitorMessage>,
    mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
    target: OutputTarget,
    stop: Arc<Notify>,
) {
    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Ok(msg) => {
                        for line in &msg.lines {
                            // A failed line write skips its newline too but
                            // does not kill the task.
                            if let Err(e) = writer.write_all(line.as_bytes()).await {
                                tracing::warn!("FileMonitor: failed to write line to file: {}", e);
                                continue;
                            }
                            if let Err(e) = writer.write_all(b"\n").await {
                                tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
                            }
                        }
                        // Flush per batch so the file tracks output promptly.
                        if let Err(e) = writer.flush().await {
                            tracing::warn!("FileMonitor: failed to flush file: {}", e);
                        }
                    }
                    Err(e) => {
                        tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
                        break;
                    }
                }
            }
            _ = stop.notified() => {
                tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
                break;
            }
        }
    }

    // Final flush so buffered bytes are not lost on shutdown.
    if let Err(e) = writer.flush().await {
        tracing::warn!("FileMonitor: failed final flush: {}", e);
    }
    tracing::debug!("FileMonitor task for {:?} exiting", target);
}
585
586fn create_unique_file_writer(
587 file_name_tag: &str,
588 output_target: OutputTarget,
589 env: env::Env,
590) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
591 let suffix = match output_target {
592 OutputTarget::Stderr => "stderr",
593 OutputTarget::Stdout => "stdout",
594 };
595 let (path, filename) = log_file_path(env, None)?;
596 let path = Path::new(&path);
597 let mut full_path = PathBuf::from(path);
598
599 let uuid = ShortUuid::generate();
600
601 full_path.push(format!(
602 "{}_{}_{}.{}",
603 filename, file_name_tag, uuid, suffix
604 ));
605 let file = std::fs::OpenOptions::new()
606 .create(true)
607 .append(true)
608 .open(full_path.clone())?;
609 let tokio_file = tokio::fs::File::from_std(file);
610 Ok((full_path, Box::new(tokio_file)))
612}
613
614fn get_unique_local_log_destination(
615 file_name_tag: &str,
616 output_target: OutputTarget,
617) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
618 let env: env::Env = env::Env::current();
619 if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
620 tracing::debug!("not creating log file because of env type");
621 None
622 } else {
623 match create_unique_file_writer(file_name_tag, output_target, env) {
624 Ok((a, b)) => Some((a, b)),
625 Err(e) => {
626 tracing::warn!("failed to create unique file writer: {}", e);
627 None
628 }
629 }
630 }
631}
632
633fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
635 match target {
637 OutputTarget::Stdout => Box::new(tokio::io::stdout()),
638 OutputTarget::Stderr => Box::new(tokio::io::stderr()),
639 }
640}
641
642async fn tee(
645 mut reader: impl AsyncRead + Unpin + Send + 'static,
646 mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
647 log_sender: Option<Box<dyn LogSender + Send>>,
648 file_monitor_addr: Option<ChannelAddr>,
649 target: OutputTarget,
650 prefix: Option<String>,
651 stop: Arc<Notify>,
652 recent_lines_buf: RotatingLineBuffer,
653) -> Result<(), io::Error> {
654 let mut buf = [0u8; 8192];
655 let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
656 let mut log_sender = log_sender;
657
658 let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
660 file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
661 Ok(tx) => Some(tx),
662 Err(e) => {
663 tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
664 None
665 }
666 });
667
668 loop {
669 tokio::select! {
670 read_result = reader.read(&mut buf) => {
671 match read_result {
672 Ok(n) => {
673 if n == 0 {
674 tracing::debug!("EOF reached in tee");
676 break;
677 }
678
679 if let Err(e) = std_writer.write_all(&buf[..n]).await {
681 tracing::warn!("error writing to std: {}", e);
682 }
683
684 let mut completed_lines = Vec::new();
686
687 for &byte in &buf[..n] {
688 if byte == b'\n' {
689 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
691
692 if line.len() > MAX_LINE_SIZE {
695 let mut truncate_at = MAX_LINE_SIZE;
696 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
697 truncate_at -= 1;
698 }
699 line.truncate(truncate_at);
700 line.push_str("... [TRUNCATED]");
701 }
702
703 let final_line = if let Some(ref p) = prefix {
705 format!("[{}] {}", p, line)
706 } else {
707 line
708 };
709
710 completed_lines.push(final_line);
711 line_buffer.clear();
712 } else {
713 line_buffer.push(byte);
714 }
715 }
716
717 if !completed_lines.is_empty() {
719 if let Some(ref mut sender) = log_sender {
720 let bytes: Vec<Vec<u8>> = completed_lines.iter()
721 .map(|s| s.as_bytes().to_vec())
722 .collect();
723 if let Err(e) = sender.send(target, bytes) {
724 tracing::warn!("error sending to log_sender: {}", e);
725 }
726 }
727
728 if let Some(ref mut tx) = file_monitor_tx {
730 let msg = FileMonitorMessage {
731 lines: completed_lines,
732 };
733 tx.post(msg);
735 }
736 }
737
738 recent_lines_buf.try_add_data(&buf, n);
739 },
740 Err(e) => {
741 tracing::debug!("read error in tee: {}", e);
742 return Err(e);
743 }
744 }
745 },
746 _ = stop.notified() => {
747 tracing::debug!("stop signal received in tee");
748 break;
749 }
750 }
751 }
752
753 std_writer.flush().await?;
754
755 if !line_buffer.is_empty() {
757 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
758 if line.len() > MAX_LINE_SIZE {
761 let mut truncate_at = MAX_LINE_SIZE;
762 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
763 truncate_at -= 1;
764 }
765 line.truncate(truncate_at);
766 line.push_str("... [TRUNCATED]");
767 }
768 let final_line = if let Some(ref p) = prefix {
769 format!("[{}] {}", p, line)
770 } else {
771 line
772 };
773
774 let final_lines = vec![final_line];
775
776 if let Some(ref mut sender) = log_sender {
778 let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
779 let _ = sender.send(target, bytes);
780 }
781
782 if let Some(ref mut tx) = file_monitor_tx {
784 let msg = FileMonitorMessage { lines: final_lines };
785 tx.post(msg);
786 }
787 }
788
789 if let Some(ref mut sender) = log_sender {
791 let _ = sender.flush();
792 }
793
794 Ok(())
795}
796
/// Bounded in-memory tail of recent output lines, shared between the `tee`
/// task (writer) and `StreamFwder::peek` (reader).
#[derive(Debug, Clone)]
struct RotatingLineBuffer {
    recent_lines: Arc<RwLock<VecDeque<String>>>,
    // Maximum number of lines retained; older lines are evicted first.
    max_buffer_size: usize,
}
802
803impl RotatingLineBuffer {
804 fn try_add_data(&self, buf: &[u8], buf_end: usize) {
805 let data_str = String::from_utf8_lossy(&buf[..buf_end]);
806 let lines: Vec<&str> = data_str.lines().collect();
807
808 if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
809 for line in lines {
810 if !line.is_empty() {
811 recent_lines_guard.push_back(line.to_string());
812 if recent_lines_guard.len() > self.max_buffer_size {
813 recent_lines_guard.pop_front();
814 }
815 }
816 }
817 } else {
818 tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
819 }
820 }
821
822 async fn peek(&self) -> Vec<String> {
823 let lines = self.recent_lines.read().await;
824 let start_idx = if lines.len() > self.max_buffer_size {
825 lines.len() - self.max_buffer_size
826 } else {
827 0
828 };
829
830 lines.range(start_idx..).cloned().collect()
831 }
832}
833
/// Handle to a spawned `tee` task plus the rotating tail buffer it feeds.
pub struct StreamFwder {
    // The running tee task; joined in `abort`.
    teer: JoinHandle<Result<(), io::Error>>,
    recent_lines_buf: RotatingLineBuffer,
    // Signals the tee task to stop (used by `abort`).
    stop: Arc<Notify>,
}
842
impl StreamFwder {
    /// Spawns a forwarder for `reader` that mirrors output to this process's
    /// own stdout/stderr (per `target`), optionally prefixing each line with
    /// `local_rank` when the PREFIX_WITH_RANK config attr is set.
    pub fn start(
        reader: impl AsyncRead + Unpin + Send + 'static,
        file_monitor_addr: Option<ChannelAddr>,
        target: OutputTarget,
        max_buffer_size: usize,
        log_channel: Option<ChannelAddr>,
        proc_id: &hyperactor_reference::ProcId,
        local_rank: usize,
    ) -> Self {
        let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
            true => Some(local_rank.to_string()),
            false => None,
        };
        let std_writer = std_writer(target);

        Self::start_with_writer(
            reader,
            std_writer,
            file_monitor_addr,
            target,
            max_buffer_size,
            log_channel,
            proc_id,
            prefix,
        )
    }

    /// Core constructor: wires up the rotating tail buffer and the optional
    /// log-channel sender, then spawns the `tee` task.
    fn start_with_writer(
        reader: impl AsyncRead + Unpin + Send + 'static,
        std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
        file_monitor_addr: Option<ChannelAddr>,
        target: OutputTarget,
        max_buffer_size: usize,
        log_channel: Option<ChannelAddr>,
        proc_id: &hyperactor_reference::ProcId,
        prefix: Option<String>,
    ) -> Self {
        // A forwarder with no sinks and no tail would be pure overhead.
        debug_assert!(
            file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
            "StreamFwder started with no sinks and no tail"
        );

        let stop = Arc::new(Notify::new());
        let recent_lines_buf = RotatingLineBuffer {
            recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
                max_buffer_size,
            ))),
            max_buffer_size,
        };

        // A sender failure is logged but non-fatal: output still reaches
        // stdout/stderr and the file monitor.
        let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
            match LocalLogSender::new(addr, proc_id) {
                Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
                Err(e) => {
                    tracing::error!("failed to create log sender: {}", e);
                    None
                }
            }
        } else {
            None
        };

        let teer_stop = stop.clone();
        let recent_line_buf_clone = recent_lines_buf.clone();
        let teer = tokio::spawn(async move {
            tee(
                reader,
                std_writer,
                log_sender,
                file_monitor_addr,
                target,
                prefix,
                teer_stop,
                recent_line_buf_clone,
            )
            .await
        });

        StreamFwder {
            teer,
            recent_lines_buf,
            stop,
        }
    }

    /// Stops the tee task and waits for it, returning the buffered tail
    /// lines together with the task's outcome.
    pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
        self.stop.notify_waiters();

        // Snapshot the tail before joining the task.
        let lines = self.peek().await;
        let teer_result = self.teer.await;

        // Flatten join errors and tee I/O errors into one result.
        let result: Result<(), anyhow::Error> = match teer_result {
            Ok(inner) => inner.map_err(anyhow::Error::from),
            Err(e) => Err(e.into()),
        };

        (lines, result)
    }

    /// Returns the most recent buffered output lines, oldest first.
    pub async fn peek(&self) -> Vec<String> {
        self.recent_lines_buf.peek().await
    }
}
961
/// Control messages for the per-proc `LogForwardActor`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LogForwardMessage {
    /// Pull the next message off the local log channel; the handler
    /// re-sends this to itself to form a receive loop.
    Forward {},

    /// Enable or disable streaming received logs to the client actor.
    SetMode { stream_to_client: bool },

    /// Inject a versioned synchronized-flush marker into the channel.
    ForceSyncFlush { version: u64 },
}
985
/// Actor that serves a proc's bootstrap log channel and forwards received
/// `LogMessage`s to the `LogClientActor`.
#[hyperactor::export(
    spawn = true,
    handlers = [LogForwardMessage {cast = true}],
)]
pub struct LogForwardActor {
    // Receiving end of the proc's log channel.
    rx: ChannelRx<LogMessage>,
    // Tx back into the same channel, used to (re)inject Flush messages.
    flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
    // Earliest time at which the next periodic flush may be scheduled.
    next_flush_deadline: SystemTime,
    logging_client_ref: hyperactor_reference::ActorRef<LogClientActor>,
    // When false, received Log messages are dropped instead of forwarded.
    stream_to_client: bool,
}
998
#[async_trait]
impl Actor for LogForwardActor {
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        // Kick off the receive loop; `forward` re-schedules itself.
        this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        // Prime the channel with an initial best-effort flush.
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush { sync_version: None })
            .await?;
        Ok(())
    }
}
1014
#[async_trait]
impl hyperactor::RemoteSpawn for LogForwardActor {
    type Params = hyperactor_reference::ActorRef<LogClientActor>;

    /// Builds the forwarder: serves the log channel named by the
    /// BOOTSTRAP_LOG_CHANNEL env var and dials the same address for
    /// re-injecting Flush messages. Falls back to fresh Unix channels when
    /// the env var is absent or the configured address cannot be served,
    /// so the spawn itself does not fail.
    async fn new(logging_client_ref: Self::Params, _environment: Flattrs) -> Result<Self> {
        let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
            Ok(channel) => channel.parse()?,
            Err(err) => {
                tracing::debug!(
                    "log forwarder actor failed to read env var {}: {}",
                    BOOTSTRAP_LOG_CHANNEL,
                    err
                );
                // Fallback: a private Unix channel.
                ChannelAddr::any(ChannelTransport::Unix)
            }
        };
        tracing::info!(
            "log forwarder {} serve at {}",
            std::process::id(),
            log_channel
        );

        let rx = match channel::serve(log_channel.clone()) {
            Ok((_, rx)) => rx,
            Err(err) => {
                // Degrade to a fresh Unix channel rather than failing spawn.
                tracing::error!(
                    "log forwarder actor failed to bootstrap on given channel {}: {}",
                    log_channel,
                    err
                );
                channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
            }
        };

        let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
        let now = std::time::SystemTime::now();

        Ok(Self {
            rx,
            flush_tx,
            next_flush_deadline: now,
            logging_client_ref,
            stream_to_client: true,
        })
    }
}
1065
#[async_trait]
#[hyperactor::handle(LogForwardMessage)]
impl LogForwardMessageHandler for LogForwardActor {
    /// Receives one message from the log channel: forwards Log messages to
    /// the client (when streaming is on), handles Flush scheduling, then
    /// re-schedules itself to keep the receive loop going.
    async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
        match self.rx.recv().await {
            Ok(LogMessage::Flush { sync_version }) => {
                let now = std::time::SystemTime::now();
                match sync_version {
                    None => {
                        // Periodic flush: keep at most one delayed re-flush
                        // in flight per one-second window.
                        let delay = Duration::from_secs(1);
                        if now >= self.next_flush_deadline {
                            self.next_flush_deadline = now + delay;
                            let flush_tx = self.flush_tx.clone();
                            tokio::spawn(async move {
                                tokio::time::sleep(delay).await;
                                if let Err(e) = flush_tx
                                    .lock()
                                    .await
                                    .send(LogMessage::Flush { sync_version: None })
                                    .await
                                {
                                    tracing::error!("failed to send flush message: {}", e);
                                }
                            });
                        }
                    }
                    version => {
                        // Synchronized flush: ack to the client with version.
                        self.logging_client_ref.flush(ctx, version).await?;
                    }
                }
            }
            Ok(LogMessage::Log {
                hostname,
                proc_id,
                output_target,
                payload,
            }) => {
                if self.stream_to_client {
                    self.logging_client_ref
                        .log(ctx, hostname, proc_id, output_target, payload)
                        .await?;
                }
            }
            Err(e) => {
                return Err(e.into());
            }
        }

        // Tail-schedule the next receive iteration.
        ctx.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?;

        Ok(())
    }

    /// Toggles whether received Log messages are forwarded to the client.
    async fn set_mode(
        &mut self,
        _ctx: &Context<Self>,
        stream_to_client: bool,
    ) -> Result<(), anyhow::Error> {
        self.stream_to_client = stream_to_client;
        Ok(())
    }

    /// Injects a versioned Flush into the local channel; it will come back
    /// through `forward` and be acked to the client.
    async fn force_sync_flush(
        &mut self,
        _cx: &Context<Self>,
        version: u64,
    ) -> Result<(), anyhow::Error> {
        self.flush_tx
            .lock()
            .await
            .send(LogMessage::Flush {
                sync_version: Some(version),
            })
            .await
            .map_err(anyhow::Error::from)
    }
}
1145
1146fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1148 if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1150 let mut result = Vec::new();
1151 for bytes in message_bytes {
1152 let message_str = String::from_utf8(bytes)?;
1153 let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1154 result.push(lines);
1155 }
1156 return Ok(result);
1157 }
1158
1159 if let Ok(message) = serialized_message.deserialized::<String>() {
1161 let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1162 return Ok(vec![lines]);
1163 }
1164
1165 anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1167}
1168
/// Client-side actor: receives forwarded logs, optionally aggregates
/// similar lines per output stream, and prints them to this process's
/// stdout/stderr.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [LogMessage, LogClientMessage],
)]
pub struct LogClientActor {
    // Aggregation window in seconds; None disables aggregation entirely.
    aggregate_window_sec: Option<u64>,
    // One aggregator per output stream (stdout / stderr).
    aggregators: HashMap<OutputTarget, Aggregator>,
    // When aggregated output was last printed.
    last_flush_time: SystemTime,
    // Deadline of the currently scheduled delayed flush, if any.
    next_flush_deadline: Option<SystemTime>,

    // State of the current synchronized flush round (see start_sync_flush).
    current_flush_version: u64,
    current_flush_port: Option<hyperactor_reference::OncePortRef<()>>,
    current_unflushed_procs: usize,
}
1186
1187impl Default for LogClientActor {
1188 fn default() -> Self {
1189 let mut aggregators = HashMap::new();
1191 aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1192 aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1193
1194 Self {
1195 aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1196 aggregators,
1197 last_flush_time: std::time::SystemTime::now(),
1198 next_flush_deadline: None,
1199 current_flush_version: 0,
1200 current_flush_port: None,
1201 current_unflushed_procs: 0,
1202 }
1203 }
1204}
1205
#[async_trait]
impl Actor for LogClientActor {
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Mark this actor as a system actor; no other setup is needed.
        this.set_system();
        Ok(())
    }
}
1213
impl LogClientActor {
    /// Prints every non-empty aggregator to its corresponding stream, then
    /// resets it to start a fresh window.
    fn print_aggregators(&mut self) {
        for (output_target, aggregator) in self.aggregators.iter_mut() {
            if aggregator.is_empty() {
                continue;
            }
            match output_target {
                OutputTarget::Stdout => {
                    println!("{}", aggregator);
                }
                OutputTarget::Stderr => {
                    eprintln!("{}", aggregator);
                }
            }

            aggregator.reset();
        }
    }

    /// Prints one line, prefixed with host and proc id, to the requested
    /// stream. Under test, the line is also mirrored into the test tap.
    fn print_log_line(hostname: &str, proc_id: &str, output_target: OutputTarget, line: String) {
        let message = format!("[{} {}] {}", hostname, proc_id, line);

        #[cfg(test)]
        crate::logging::test_tap::push(&message);

        match output_target {
            OutputTarget::Stdout => println!("{}", message),
            OutputTarget::Stderr => eprintln!("{}", message),
        }
    }

    /// Flushes all aggregated output and resets the flush bookkeeping.
    fn flush_internal(&mut self) {
        self.print_aggregators();
        self.last_flush_time = std::time::SystemTime::now();
        self.next_flush_deadline = None;
    }
}
1252
impl Drop for LogClientActor {
    fn drop(&mut self) {
        // Don't lose aggregated-but-unprinted lines on shutdown.
        self.print_aggregators();
    }
}
1259
1260#[async_trait]
1261#[hyperactor::handle(LogMessage)]
1262impl LogMessageHandler for LogClientActor {
1263 async fn log(
1264 &mut self,
1265 cx: &Context<Self>,
1266 hostname: String,
1267 proc_id: String,
1268 output_target: OutputTarget,
1269 payload: wirevalue::Any,
1270 ) -> Result<(), anyhow::Error> {
1271 let message_line_groups = deserialize_message_lines(&payload)?;
1273 let hostname = hostname.as_str();
1274
1275 let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1276 match self.aggregate_window_sec {
1277 None => {
1278 for line in message_lines {
1279 Self::print_log_line(hostname, &proc_id, output_target, line);
1280 }
1281 self.last_flush_time = std::time::SystemTime::now();
1282 }
1283 Some(window) => {
1284 for line in message_lines {
1285 if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1286 if let Err(e) = aggregator.add_line(&line) {
1287 tracing::error!("error adding log line: {}", e);
1288 Self::print_log_line(hostname, &proc_id, output_target, line);
1290 }
1291 } else {
1292 tracing::error!("unknown output target: {:?}", output_target);
1293 Self::print_log_line(hostname, &proc_id, output_target, line);
1295 }
1296 }
1297
1298 let new_deadline = self.last_flush_time + Duration::from_secs(window);
1299 let now = std::time::SystemTime::now();
1300 if new_deadline <= now {
1301 self.flush_internal();
1302 } else {
1303 let delay = new_deadline.duration_since(now)?;
1304 match self.next_flush_deadline {
1305 None => {
1306 self.next_flush_deadline = Some(new_deadline);
1307 cx.self_message_with_delay(
1308 LogMessage::Flush { sync_version: None },
1309 delay,
1310 )?;
1311 }
1312 Some(deadline) => {
1313 if new_deadline < deadline {
1315 self.next_flush_deadline = Some(new_deadline);
1317 cx.self_message_with_delay(
1318 LogMessage::Flush { sync_version: None },
1319 delay,
1320 )?;
1321 }
1322 }
1323 }
1324 }
1325 }
1326 }
1327
1328 Ok(())
1329 }
1330
1331 async fn flush(
1332 &mut self,
1333 cx: &Context<Self>,
1334 sync_version: Option<u64>,
1335 ) -> Result<(), anyhow::Error> {
1336 match sync_version {
1337 None => {
1338 self.flush_internal();
1339 }
1340 Some(version) => {
1341 if version != self.current_flush_version {
1342 tracing::error!(
1343 "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1344 version,
1345 self.current_flush_version
1346 );
1347 return Ok(());
1348 }
1349
1350 if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1351 anyhow::bail!("found no ongoing flush request");
1353 }
1354 self.current_unflushed_procs -= 1;
1355
1356 tracing::debug!(
1357 "ack sync flush: version {}; remaining procs: {}",
1358 self.current_flush_version,
1359 self.current_unflushed_procs
1360 );
1361
1362 if self.current_unflushed_procs == 0 {
1363 self.flush_internal();
1364 let reply = self.current_flush_port.take().unwrap();
1365 self.current_flush_port = None;
1366 reply.send(cx, ()).map_err(anyhow::Error::from)?;
1367 }
1368 }
1369 }
1370
1371 Ok(())
1372 }
1373}
1374
#[async_trait]
#[hyperactor::handle(LogClientMessage)]
impl LogClientMessageHandler for LogClientActor {
    /// Updates the aggregation window. Turning aggregation off flushes any
    /// buffered aggregated lines first so they are not lost.
    async fn set_aggregate(
        &mut self,
        _cx: &Context<Self>,
        aggregate_window_sec: Option<u64>,
    ) -> Result<(), anyhow::Error> {
        if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
            self.print_aggregators();
        }
        self.aggregate_window_sec = aggregate_window_sec;
        Ok(())
    }

    /// Begins a new synchronized flush round: bumps the version, records the
    /// reply port and expected proc count, and sends the new version back.
    /// An unfinished previous round is logged and overwritten.
    async fn start_sync_flush(
        &mut self,
        cx: &Context<Self>,
        expected_procs_flushed: usize,
        reply: hyperactor_reference::OncePortRef<()>,
        version: hyperactor_reference::OncePortRef<u64>,
    ) -> Result<(), anyhow::Error> {
        if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
            tracing::warn!(
                "found unfinished ongoing flush: version {}; {} unflushed procs",
                self.current_flush_version,
                self.current_unflushed_procs,
            );
        }

        self.current_flush_version += 1;
        tracing::debug!(
            "start sync flush with version {}",
            self.current_flush_version
        );
        self.current_flush_port = Some(reply.clone());
        self.current_unflushed_procs = expected_procs_flushed;
        version
            .send(cx, self.current_flush_version)
            .map_err(anyhow::Error::from)?;
        Ok(())
    }
}
1419
#[cfg(test)]
pub mod test_tap {
    //! Test-only tap: lets tests observe log lines passed through `push`.
    use std::sync::Mutex;
    use std::sync::OnceLock;

    use tokio::sync::mpsc::UnboundedReceiver;
    use tokio::sync::mpsc::UnboundedSender;

    // Sender half, installed once per process.
    static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
    // Receiver half, mutex-guarded so `drain` can be called from any thread.
    static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();

    /// Install the tap sender; later installs are silently ignored.
    pub fn install(tx: UnboundedSender<String>) {
        let _ = TAP.set(tx);
    }

    /// Register the matching receiver; later calls are silently ignored.
    pub fn set_receiver(rx: UnboundedReceiver<String>) {
        let _ = RX.set(Mutex::new(rx));
    }

    /// Forward one line into the tap; a no-op when no tap is installed.
    pub fn push(s: &str) {
        let Some(tx) = TAP.get() else { return };
        let _ = tx.send(s.to_string());
    }

    /// Drain and return every line currently buffered in the tap.
    pub fn drain() -> Vec<String> {
        let mut drained = Vec::new();
        if let Some(cell) = RX.get() {
            let mut guard = cell.lock().unwrap();
            while let Ok(line) = guard.try_recv() {
                drained.push(line);
            }
        }
        drained
    }
}
1460
1461#[cfg(test)]
1462mod tests {
1463
1464 use std::sync::Arc;
1465 use std::sync::Mutex;
1466
1467 use hyperactor::RemoteSpawn;
1468 use hyperactor::channel;
1469 use hyperactor::channel::ChannelAddr;
1470 use hyperactor::channel::ChannelTx;
1471 use hyperactor::channel::Tx;
1472 use hyperactor::mailbox::BoxedMailboxSender;
1473 use hyperactor::mailbox::DialMailboxRouter;
1474 use hyperactor::mailbox::MailboxServer;
1475 use hyperactor::proc::Proc;
1476 use hyperactor::testing::ids::test_proc_id;
1477 use tokio::io::AsyncSeek;
1478 use tokio::io::AsyncSeekExt;
1479 use tokio::io::AsyncWriteExt;
1480 use tokio::io::SeekFrom;
1481 use tokio::sync::mpsc;
1482
1483 use super::*;
1484
    /// Outcome of one `process_file_content` pass over a log file.
    #[derive(Debug)]
    struct FileProcessingResult {
        // Complete lines found, without their trailing '\n'.
        lines: Vec<Vec<u8>>,
        // File offset at which the next pass should resume.
        new_position: u64,
        // Bytes of a trailing line not yet terminated by '\n'.
        incomplete_line_buffer: Vec<u8>,
    }
1495
    /// Read newly-appended complete lines from `reader`.
    ///
    /// Seeks to `current_position` (or back to 0 when the file shrank, i.e.
    /// was truncated), splits the data on `'\n'`, and returns at most
    /// `max_buffer_size` lines. A trailing line with no newline is carried
    /// over in `incomplete_line_buffer` for the next call; `new_position`
    /// is the offset the next call should resume from.
    ///
    /// Empty lines are dropped, and lines longer than `MAX_LINE_SIZE` are
    /// truncated with a "... [TRUNCATED]" suffix.
    async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
        current_position: u64,
        file_size: u64,
        existing_line_buffer: Vec<u8>,
        max_buffer_size: usize,
    ) -> Result<FileProcessingResult> {
        // Nothing new to read: return the carried-over buffer untouched.
        if current_position == file_size {
            return Ok(FileProcessingResult {
                lines: Vec::new(),
                new_position: current_position,
                incomplete_line_buffer: existing_line_buffer,
            });
        }

        let actual_position = if current_position > file_size {
            // The file is smaller than where we left off: it was truncated
            // (e.g. rotated); start over from the beginning.
            tracing::warn!(
                "File appears to have been truncated (position {} > file size {}), resetting to start",
                current_position,
                file_size
            );
            reader.seek(SeekFrom::Start(0)).await?;
            0
        } else {
            reader.seek(SeekFrom::Start(current_position)).await?;
            current_position
        };

        // 128 KiB scratch buffer per read call.
        let mut buf = vec![0u8; 128 * 1024];
        let mut line_buffer = existing_line_buffer;
        let mut lines = Vec::with_capacity(max_buffer_size);
        // Bytes consumed from fully-processed chunks (not the current one).
        let mut processed_bytes = 0u64;

        loop {
            let bytes_read = reader.read(&mut buf).await?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buf[..bytes_read];

            let mut start = 0;
            while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
                let absolute_pos = start + newline_pos;

                line_buffer.extend_from_slice(&chunk[start..absolute_pos]);

                // Empty lines are skipped entirely.
                if !line_buffer.is_empty() {
                    if line_buffer.len() > MAX_LINE_SIZE {
                        line_buffer.truncate(MAX_LINE_SIZE);
                        line_buffer.extend_from_slice(b"... [TRUNCATED]");
                    }

                    let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
                    lines.push(line_data);
                }

                start = absolute_pos + 1;

                // Batch full: resume position is just past the newline we
                // consumed; `line_buffer` is empty at this point.
                if lines.len() >= max_buffer_size {
                    let new_position = actual_position + processed_bytes + start as u64;

                    return Ok(FileProcessingResult {
                        lines,
                        new_position,
                        incomplete_line_buffer: Vec::new(),
                    });
                }
            }

            processed_bytes += bytes_read as u64;

            // Carry any unterminated tail of this chunk into the next read.
            if start < chunk.len() {
                line_buffer.extend_from_slice(&chunk[start..]);
            }
        }

        let new_position = actual_position + processed_bytes;

        Ok(FileProcessingResult {
            lines,
            new_position,
            incomplete_line_buffer: line_buffer,
        })
    }
1591
    /// Wires a `LogForwardActor` to a `LogClientActor` over the bootstrap
    /// log channel and verifies messages are only relayed once streaming
    /// mode is enabled.
    #[tokio::test]
    async fn test_forwarding_log_to_client() {
        // Build a proc with a dial-based router and serve its mailbox.
        let router = DialMailboxRouter::new();
        let (proc_addr, client_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::configured(
            test_proc_id("client_0"),
            BoxedMailboxSender::new(router.clone()),
        );
        proc.clone().serve(client_rx);
        router.bind(test_proc_id("client_0").into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: test-only env mutation; the forwarder discovers the log
        // channel through this variable at construction time.
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }
        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
        let log_client: hyperactor_reference::ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
            .await
            .unwrap();
        let log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Posted before streaming is enabled: should not be forwarded.
        let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            proc_id: "test_proc".into(),
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
        });

        // Enable streaming, then post a line that should be forwarded.
        log_forwarder.set_mode(&client, true).await.unwrap();
        tx.post(LogMessage::Log {
            hostname: "my_host".into(),
            proc_id: "test_proc".into(),
            output_target: OutputTarget::Stderr,
            payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
        });

    }
1643
1644 #[test]
1645 fn test_deserialize_message_lines_string() {
1646 let message = "Line 1\nLine 2\nLine 3".to_string();
1648 let serialized = wirevalue::Any::serialize(&message).unwrap();
1649
1650 let result = deserialize_message_lines(&serialized).unwrap();
1651 assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);
1652
1653 let message_bytes = vec![
1655 "Hello\nWorld".as_bytes().to_vec(),
1656 "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
1657 ];
1658 let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();
1659
1660 let result = deserialize_message_lines(&serialized).unwrap();
1661 assert_eq!(
1662 result,
1663 vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
1664 );
1665
1666 let message = "Single line message".to_string();
1668 let serialized = wirevalue::Any::serialize(&message).unwrap();
1669
1670 let result = deserialize_message_lines(&serialized).unwrap();
1671
1672 assert_eq!(result, vec![vec!["Single line message"]]);
1673
1674 let message = "\n\n".to_string();
1676 let serialized = wirevalue::Any::serialize(&message).unwrap();
1677
1678 let result = deserialize_message_lines(&serialized).unwrap();
1679
1680 assert_eq!(result, vec![vec!["", ""]]);
1681
1682 let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();
1685
1686 let result = deserialize_message_lines(&serialized);
1687
1688 assert!(
1690 result.is_err(),
1691 "Expected deserialization to fail with invalid UTF-8 bytes"
1692 );
1693 }
1694 #[allow(dead_code)]
1695 struct MockLogSender {
1696 log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, flush_called: Arc<Mutex<bool>>, }
1699
    impl MockLogSender {
        /// Build a sender whose `flush_called` flag starts out false.
        #[allow(dead_code)]
        fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
            Self {
                log_sender,
                flush_called: Arc::new(Mutex::new(false)),
            }
        }
    }
1709
1710 #[async_trait]
1711 impl LogSender for MockLogSender {
1712 fn send(
1713 &mut self,
1714 output_target: OutputTarget,
1715 payload: Vec<Vec<u8>>,
1716 ) -> anyhow::Result<()> {
1717 let lines: Vec<String> = payload
1719 .iter()
1720 .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1721 .collect();
1722
1723 for line in lines {
1724 self.log_sender
1725 .send((output_target, line))
1726 .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1727 }
1728 Ok(())
1729 }
1730
1731 fn flush(&mut self) -> anyhow::Result<()> {
1732 let mut flush_called = self.flush_called.lock().unwrap();
1734 *flush_called = true;
1735
1736 Ok(())
1739 }
1740 }
1741
1742 #[test]
1743 fn test_string_similarity() {
1744 assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1746
1747 assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1749
1750 assert!(normalized_edit_distance("hello", "helo") < 0.5);
1752 assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1753
1754 assert_eq!(normalized_edit_distance("", ""), 0.0);
1756 assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1757 }
1758
1759 #[test]
1760 fn test_add_line_to_empty_aggregator() {
1761 let mut aggregator = Aggregator::new();
1762 let result = aggregator.add_line("ERROR 404 not found");
1763
1764 assert!(result.is_ok());
1765 assert_eq!(aggregator.lines.len(), 1);
1766 assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1767 assert_eq!(aggregator.lines[0].count, 1);
1768 }
1769
1770 #[test]
1771 fn test_add_line_merges_with_similar_line() {
1772 let mut aggregator = Aggregator::new_with_threshold(0.2);
1773
1774 aggregator.add_line("ERROR 404 timeout").unwrap();
1776 assert_eq!(aggregator.lines.len(), 1);
1777
1778 aggregator.add_line("ERROR 500 timeout").unwrap();
1780 assert_eq!(aggregator.lines.len(), 1); assert_eq!(aggregator.lines[0].count, 2);
1782
1783 aggregator
1785 .add_line("WARNING database connection failed")
1786 .unwrap();
1787 assert_eq!(aggregator.lines.len(), 2); aggregator
1791 .add_line("WARNING database connection timed out")
1792 .unwrap();
1793 assert_eq!(aggregator.lines.len(), 2); assert_eq!(aggregator.lines[1].count, 2); }
1796
    /// Realistic log lines differing only in file paths/timestamps should
    /// collapse into a single aggregated entry.
    #[test]
    fn test_aggregation_of_similar_log_lines() {
        let mut aggregator = Aggregator::new_with_threshold(0.2);

        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();

        // All three merge into one entry with a count of 3.
        assert_eq!(aggregator.lines.len(), 1);

        assert_eq!(aggregator.lines[0].count, 3);
    }
1812
1813 #[tokio::test]
1814 async fn test_stream_fwd_creation() {
1815 hyperactor_telemetry::initialize_logging_for_test();
1816
1817 let (mut writer, reader) = tokio::io::duplex(1024);
1818 let (log_channel, mut rx) =
1819 channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1820
1821 let temp_file = tempfile::NamedTempFile::new().unwrap();
1823 let temp_path = temp_file.path().to_path_buf();
1824
1825 let file_writer = tokio::fs::OpenOptions::new()
1827 .create(true)
1828 .write(true)
1829 .append(true)
1830 .open(&temp_path)
1831 .await
1832 .unwrap();
1833
1834 let file_monitor = FileAppender::new();
1836 let file_monitor_addr = file_monitor
1837 .as_ref()
1838 .map(|fm| fm.addr_for(OutputTarget::Stdout));
1839
1840 let the_test_proc_id = test_proc_id("testproc_0");
1841 let monitor = StreamFwder::start_with_writer(
1842 reader,
1843 Box::new(file_writer),
1844 file_monitor_addr,
1845 OutputTarget::Stdout,
1846 3, Some(log_channel),
1848 &the_test_proc_id,
1849 None, );
1851
1852 tokio::time::sleep(Duration::from_millis(500)).await;
1854
1855 writer.write_all(b"Initial log line\n").await.unwrap();
1857 writer.flush().await.unwrap();
1858
1859 for i in 1..=5 {
1861 writer
1862 .write_all(format!("New log line {}\n", i).as_bytes())
1863 .await
1864 .unwrap();
1865 }
1866 writer.flush().await.unwrap();
1867
1868 tokio::time::sleep(Duration::from_millis(500)).await;
1870
1871 let timeout = Duration::from_secs(1);
1873 let _ = tokio::time::timeout(timeout, rx.recv())
1874 .await
1875 .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));
1876
1877 tokio::time::sleep(Duration::from_millis(200)).await;
1879
1880 let (recent_lines, _result) = monitor.abort().await;
1881
1882 assert!(
1883 recent_lines.len() >= 3,
1884 "Expected buffer with at least 3 lines, got {} lines: {:?}",
1885 recent_lines.len(),
1886 recent_lines
1887 );
1888
1889 let file_contents = std::fs::read_to_string(&temp_path).unwrap();
1890 assert!(
1891 file_contents.contains("Initial log line"),
1892 "Expected temp file to contain 'Initial log line', got: {:?}",
1893 file_contents
1894 );
1895 assert!(
1896 file_contents.contains("New log line 1"),
1897 "Expected temp file to contain 'New log line 1', got: {:?}",
1898 file_contents
1899 );
1900 assert!(
1901 file_contents.contains("New log line 5"),
1902 "Expected temp file to contain 'New log line 5', got: {:?}",
1903 file_contents
1904 );
1905 }
1906
1907 #[test]
1908 fn test_aggregator_custom_threshold() {
1909 let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1911 strict_aggregator.add_line("ERROR 404").unwrap();
1912 strict_aggregator.add_line("ERROR 500").unwrap(); assert_eq!(strict_aggregator.lines.len(), 2);
1914
1915 let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1917 lenient_aggregator.add_line("ERROR 404").unwrap();
1918 lenient_aggregator.add_line("WARNING 200").unwrap(); assert_eq!(lenient_aggregator.lines.len(), 1);
1920 assert_eq!(lenient_aggregator.lines[0].count, 2);
1921 }
1922
1923 #[test]
1924 fn test_format_system_time() {
1925 let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); let formatted = format_system_time(test_time);
1927
1928 assert!(formatted.contains("-"));
1930 assert!(formatted.contains(":"));
1931 assert!(formatted.len() > 10); }
1933
1934 #[test]
1935 fn test_aggregator_display_formatting() {
1936 let mut aggregator = Aggregator::new();
1937 aggregator.add_line("Test error message").unwrap();
1938 aggregator.add_line("Test error message").unwrap(); let display_string = format!("{}", aggregator);
1941
1942 assert!(display_string.contains("Aggregated Logs"));
1944 assert!(display_string.contains("[2 similar log lines]"));
1945 assert!(display_string.contains("Test error message"));
1946 assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1947 }
1948
    /// A `LocalLogSender` whose channel peer never connects must still
    /// accept sends and flushes without erroring (best-effort logging).
    #[tokio::test]
    async fn test_local_log_sender_inactive_status() {
        // The receiver half is dropped immediately.
        let (log_channel, _) =
            channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let test_proc_id = test_proc_id("testproc_0");
        let mut sender = LocalLogSender::new(log_channel, &test_proc_id).unwrap();

        let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
        assert!(result.is_ok());

        let result = sender.flush();
        assert!(result.is_ok());
    }
1965
1966 #[test]
1967 fn test_levenshtein_distance_edge_cases() {
1968 assert_eq!(levenshtein_distance("", ""), 0);
1970 assert_eq!(levenshtein_distance("", "hello"), 5);
1971 assert_eq!(levenshtein_distance("hello", ""), 5);
1972
1973 assert_eq!(levenshtein_distance("hello", "hello"), 0);
1975
1976 assert_eq!(levenshtein_distance("hello", "helo"), 1); assert_eq!(levenshtein_distance("helo", "hello"), 1); assert_eq!(levenshtein_distance("hello", "hallo"), 1); assert_eq!(levenshtein_distance("café", "cafe"), 1);
1983 }
1984
1985 #[test]
1986 fn test_normalized_edit_distance_edge_cases() {
1987 assert_eq!(normalized_edit_distance("", ""), 0.0);
1989
1990 assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1992 assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1993
1994 let distance = normalized_edit_distance("completely", "different");
1996 assert!(distance >= 0.0 && distance <= 1.0);
1997 }
1998
1999 #[tokio::test]
2000 async fn test_deserialize_message_lines_edge_cases() {
2001 let empty_message = "".to_string();
2003 let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
2004 let result = deserialize_message_lines(&serialized).unwrap();
2005 assert_eq!(result, vec![vec![] as Vec<String>]);
2006
2007 let trailing_newline = "line1\nline2\n".to_string();
2009 let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
2010 let result = deserialize_message_lines(&serialized).unwrap();
2011 assert_eq!(result, vec![vec!["line1", "line2"]]);
2012 }
2013
2014 #[test]
2015 fn test_output_target_serialization() {
2016 let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2018 let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2019
2020 let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2021 let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2022
2023 assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2024 assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2025 }
2026
2027 #[test]
2028 fn test_log_line_display_formatting() {
2029 let log_line = LogLine::new("Test message".to_string());
2030 let display_string = format!("{}", log_line);
2031
2032 assert!(display_string.contains("[1 similar log lines]"));
2033 assert!(display_string.contains("Test message"));
2034
2035 let mut log_line_multi = LogLine::new("Test message".to_string());
2037 log_line_multi.count = 5;
2038 let display_string_multi = format!("{}", log_line_multi);
2039
2040 assert!(display_string_multi.contains("[5 similar log lines]"));
2041 assert!(display_string_multi.contains("Test message"));
2042 }
2043
    /// Wrap raw bytes in an in-memory `Cursor`; the tests below pass it to
    /// `process_file_content`, which requires `AsyncRead + AsyncSeek`.
    fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
        std::io::Cursor::new(data)
    }
2048
    /// Happy path: three complete lines, no leftover buffer.
    #[tokio::test]
    async fn test_process_file_content_basic() {
        let data = b"line1\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"line1");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.lines[2], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2067
    /// An unterminated tail is returned via `incomplete_line_buffer`, not
    /// as a line; the position still advances over the consumed bytes.
    #[tokio::test]
    async fn test_process_file_content_incomplete_line() {
        let data = b"line1\nline2\npartial".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], b"line1");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.new_position, data.len() as u64);
        assert_eq!(result.incomplete_line_buffer, b"partial");
    }
2085
    /// A carried-over partial line ("inc") is completed by the next read
    /// ("omplete\n") into a single logical line.
    #[tokio::test]
    async fn test_process_file_content_with_existing_buffer() {
        let data = b"omplete\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let existing_buffer = b"inc".to_vec();
        let max_buf_size = 10;

        let result = process_file_content(
            &mut reader,
            0,
            data.len() as u64,
            existing_buffer,
            max_buf_size,
        )
        .await
        .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"incomplete");
        assert_eq!(result.lines[1], b"line2");
        assert_eq!(result.lines[2], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2110
    /// An empty file yields nothing and does not move the position.
    #[tokio::test]
    async fn test_process_file_content_empty_file() {
        let data = Vec::new();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
            .await
            .unwrap();

        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, 0);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2125
    /// Empty lines are deliberately dropped, so a newline-only file yields
    /// no lines — but the position still advances past them.
    #[tokio::test]
    async fn test_process_file_content_only_newlines() {
        let data = b"\n\n\n".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2142
    /// With no terminator at all, everything lands in the carry-over
    /// buffer and no lines are emitted.
    #[tokio::test]
    async fn test_process_file_content_no_newlines() {
        let data = b"no newlines here".to_vec();
        let mut reader = create_mock_reader(data.clone());
        let max_buf_size = 10;

        let result =
            process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
                .await
                .unwrap();

        assert!(result.lines.is_empty());
        assert_eq!(result.new_position, data.len() as u64);
        assert_eq!(result.incomplete_line_buffer, b"no newlines here");
    }
2158
2159 #[tokio::test]
2160 async fn test_process_file_content_file_truncation() {
2161 let data = b"line1\nline2\n".to_vec();
2162 let mut reader = create_mock_reader(data.clone());
2163
2164 let result = process_file_content(
2166 &mut reader,
2167 100, data.len() as u64,
2169 Vec::new(),
2170 10, )
2172 .await
2173 .unwrap();
2174
2175 assert_eq!(result.lines.len(), 2);
2177 assert_eq!(result.lines[0], b"line1");
2178 assert_eq!(result.lines[1], b"line2");
2179 assert_eq!(result.new_position, data.len() as u64);
2180 assert!(result.incomplete_line_buffer.is_empty());
2181 }
2182
    /// Reading resumes at the supplied offset (6 = just past "line1\n").
    #[tokio::test]
    async fn test_process_file_content_seek_to_position() {
        let data = b"line1\nline2\nline3\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], b"line2");
        assert_eq!(result.lines[1], b"line3");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2199
    /// Already at EOF: nothing is read and the position is unchanged.
    #[tokio::test]
    async fn test_process_file_content_position_equals_file_size() {
        let data = b"line1\nline2\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(
            &mut reader,
            data.len() as u64,
            data.len() as u64,
            Vec::new(),
            10,
        )
        .await
        .unwrap();

        assert!(
            result.lines.is_empty(),
            "Expected empty line got {:?}",
            result.lines
        );
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2225
    /// Lines longer than MAX_LINE_SIZE are cut and suffixed with
    /// "... [TRUNCATED]"; subsequent lines are unaffected.
    #[tokio::test]
    async fn test_process_file_content_large_line_truncation() {
        let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
        let data = format!("{}\nline2\n", large_line).into_bytes();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);

        assert_eq!(
            result.lines[0].len(),
            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
        );
        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));

        assert_eq!(result.lines[1], b"line2");

        // Position advances over the full, untruncated input.
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2252
2253 #[tokio::test]
2254 async fn test_process_file_content_mixed_line_endings() {
2255 let data = b"line1\nline2\r\nline3\n".to_vec();
2256 let mut reader = create_mock_reader(data.clone());
2257
2258 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2259 .await
2260 .unwrap();
2261
2262 assert_eq!(result.lines.len(), 3);
2263 assert_eq!(result.lines[0], b"line1");
2264 assert_eq!(result.lines[1], b"line2\r"); assert_eq!(result.lines[2], b"line3");
2266 assert_eq!(result.new_position, data.len() as u64);
2267 assert!(result.incomplete_line_buffer.is_empty());
2268 }
2269
    /// A carried-over buffer plus new bytes that together exceed
    /// MAX_LINE_SIZE is truncated like any other over-long line.
    #[tokio::test]
    async fn test_process_file_content_existing_buffer_with_truncation() {
        let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
        let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(
            &mut reader,
            0,
            data.len() as u64,
            existing_buffer.into_bytes(),
            10,
        )
        .await
        .unwrap();

        assert_eq!(result.lines.len(), 2);

        assert_eq!(
            result.lines[0].len(),
            MAX_LINE_SIZE + b"... [TRUNCATED]".len()
        );
        assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));

        assert_eq!(result.lines[1], b"line2");

        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2302
    /// Minimal one-byte lines are handled like any other.
    #[tokio::test]
    async fn test_process_file_content_single_character_lines() {
        let data = b"a\nb\nc\n".to_vec();
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 3);
        assert_eq!(result.lines[0], b"a");
        assert_eq!(result.lines[1], b"b");
        assert_eq!(result.lines[2], b"c");
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2319
    /// Splitting is byte-oriented: non-UTF-8 data passes through untouched.
    #[tokio::test]
    async fn test_process_file_content_binary_data() {
        let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
        let mut reader = create_mock_reader(data.clone());

        let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
            .await
            .unwrap();

        assert_eq!(result.lines.len(), 2);
        assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
        assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
        assert_eq!(result.new_position, data.len() as u64);
        assert!(result.incomplete_line_buffer.is_empty());
    }
2335
2336 #[tokio::test]
2337 async fn test_process_file_content_resume_after_max_buffer_size() {
2338 let data = b"line 1\nline 2\nline 3\n".to_vec();
2340 let mut reader = create_mock_reader(data.clone());
2341 let max_buffer_size = 2; let result1 = process_file_content(
2345 &mut reader,
2346 0, data.len() as u64,
2348 Vec::new(), max_buffer_size,
2350 )
2351 .await
2352 .unwrap();
2353
2354 assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2356 assert_eq!(result1.lines[0], b"line 1");
2357 assert_eq!(result1.lines[1], b"line 2");
2358 assert!(result1.incomplete_line_buffer.is_empty());
2359
2360 let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2362 assert_eq!(result1.new_position, expected_position_after_first_call);
2363
2364 let mut reader2 = create_mock_reader(data.clone());
2366 let result2 = process_file_content(
2367 &mut reader2,
2368 result1.new_position, data.len() as u64,
2370 result1.incomplete_line_buffer, max_buffer_size,
2372 )
2373 .await
2374 .unwrap();
2375
2376 assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2378 assert_eq!(result2.lines[0], b"line 3");
2379 assert!(result2.incomplete_line_buffer.is_empty());
2380 assert_eq!(result2.new_position, data.len() as u64);
2381 }
2382
2383 #[tokio::test]
2384 async fn test_utf8_truncation() {
2385 hyperactor_telemetry::initialize_logging_for_test();
2389
2390 let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
2392 long_line.push('🦀'); long_line.push('\n');
2394
2395 let (mut writer, reader) = tokio::io::duplex(8192);
2397
2398 let test_proc_id = test_proc_id("testproc_0");
2400 let monitor = StreamFwder::start_with_writer(
2401 reader,
2402 Box::new(tokio::io::sink()), None, OutputTarget::Stdout,
2405 1, None, &test_proc_id,
2408 None, );
2410
2411 writer.write_all(long_line.as_bytes()).await.unwrap();
2413 drop(writer); let (_lines, result) = monitor.abort().await;
2417 result.expect("Should complete without panic despite UTF-8 truncation");
2418 }
2419}