1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::fmt;
12use std::path::Path;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use anyhow::Result;
19use async_trait::async_trait;
20use chrono::DateTime;
21use chrono::Local;
22use hostname;
23use hyperactor::Actor;
24use hyperactor::ActorRef;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::Endpoint as _;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::OncePortRef;
32use hyperactor::ProcAddr;
33use hyperactor::RefClient;
34use hyperactor::Unbind;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::channel::ChannelRx;
38use hyperactor::channel::ChannelTransport;
39use hyperactor::channel::ChannelTx;
40use hyperactor::channel::Rx;
41use hyperactor::channel::Tx;
42use hyperactor::channel::TxStatus;
43use hyperactor_config::CONFIG;
44use hyperactor_config::ConfigAttr;
45use hyperactor_config::Flattrs;
46use hyperactor_config::attrs::declare_attrs;
47use hyperactor_telemetry::env;
48use hyperactor_telemetry::log_file_path;
49use serde::Deserialize;
50use serde::Serialize;
51use tokio::io;
52use tokio::io::AsyncRead;
53use tokio::io::AsyncReadExt;
54use tokio::io::AsyncWriteExt;
55use tokio::sync::Mutex;
56use tokio::sync::Notify;
57use tokio::sync::RwLock;
58use tokio::sync::watch::Receiver;
59use tokio::task::JoinHandle;
60use tracing::Level;
61use typeuri::Named;
62
63use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
64use crate::shortuuid::ShortUuid;
65
66mod line_prefixing_writer;
67
68pub(crate) const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
69const MAX_LINE_SIZE: usize = 4 * 1024;
70
71declare_attrs! {
72 @meta(CONFIG = ConfigAttr::new(
76 Some("HYPERACTOR_READ_LOG_BUFFER".to_string()),
77 Some("read_log_buffer".to_string()),
78 ))
79 pub attr READ_LOG_BUFFER: usize = 100;
80
81 @meta(CONFIG = ConfigAttr::new(
83 Some("HYPERACTOR_FORCE_FILE_LOG".to_string()),
84 Some("force_file_log".to_string()),
85 ))
86 pub attr FORCE_FILE_LOG: bool = false;
87
88 @meta(CONFIG = ConfigAttr::new(
90 Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()),
91 Some("prefix_with_rank".to_string()),
92 ))
93 pub attr PREFIX_WITH_RANK: bool = true;
94}
95
96fn levenshtein_distance(left: &str, right: &str) -> usize {
98 let left_chars: Vec<char> = left.chars().collect();
99 let right_chars: Vec<char> = right.chars().collect();
100
101 let left_len = left_chars.len();
102 let right_len = right_chars.len();
103
104 if left_len == 0 {
106 return right_len;
107 }
108 if right_len == 0 {
109 return left_len;
110 }
111
112 let mut matrix = vec![vec![0; right_len + 1]; left_len + 1];
114
115 for (i, row) in matrix.iter_mut().enumerate().take(left_len + 1) {
117 row[0] = i;
118 }
119 for (j, cell) in matrix[0].iter_mut().enumerate().take(right_len + 1) {
120 *cell = j;
121 }
122
123 for i in 1..=left_len {
125 for j in 1..=right_len {
126 let cost = if left_chars[i - 1] == right_chars[j - 1] {
127 0
128 } else {
129 1
130 };
131
132 matrix[i][j] = std::cmp::min(
133 std::cmp::min(
134 matrix[i - 1][j] + 1, matrix[i][j - 1] + 1, ),
137 matrix[i - 1][j - 1] + cost, );
139 }
140 }
141
142 matrix[left_len][right_len]
144}
145
146fn normalized_edit_distance(left: &str, right: &str) -> f64 {
148 let distance = levenshtein_distance(left, right) as f64;
149 let max_len = std::cmp::max(left.len(), right.len()) as f64;
150
151 if max_len == 0.0 {
152 0.0 } else {
154 distance / max_len
155 }
156}
157
158#[derive(Debug, Clone)]
159struct LogLine {
161 content: String,
162 pub count: u64,
163}
164
165impl LogLine {
166 fn new(content: String) -> Self {
167 Self { content, count: 1 }
168 }
169}
170
171impl fmt::Display for LogLine {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(
174 f,
175 "\x1b[33m[{} similar log lines]\x1b[0m {}",
176 self.count, self.content
177 )
178 }
179}
180
181#[derive(Debug, Clone)]
182struct Aggregator {
185 lines: Vec<LogLine>,
186 start_time: SystemTime,
187 similarity_threshold: f64, }
189
190impl Aggregator {
191 fn new() -> Self {
192 Self::new_with_threshold(0.15)
194 }
195
196 fn new_with_threshold(threshold: f64) -> Self {
197 Aggregator {
198 lines: vec![],
199 start_time: std::time::SystemTime::now(),
200 similarity_threshold: threshold,
201 }
202 }
203
204 fn reset(&mut self) {
205 self.lines.clear();
206 self.start_time = std::time::SystemTime::now();
207 }
208
209 fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
210 let mut best_match_idx = None;
212 let mut best_similarity = f64::MAX;
213
214 for (idx, existing_line) in self.lines.iter().enumerate() {
215 let distance = normalized_edit_distance(&existing_line.content, line);
216
217 if distance < best_similarity && distance < self.similarity_threshold {
219 best_match_idx = Some(idx);
220 best_similarity = distance;
221 }
222 }
223
224 if let Some(idx) = best_match_idx {
226 self.lines[idx].count += 1;
227 } else {
228 self.lines.push(LogLine::new(line.to_string()));
230 }
231
232 Ok(())
233 }
234
235 fn is_empty(&self) -> bool {
236 self.lines.is_empty()
237 }
238}
239
240fn format_system_time(time: SystemTime) -> String {
242 let datetime: DateTime<Local> = time.into();
243 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
244}
245
246impl fmt::Display for Aggregator {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 let start_time_str = format_system_time(self.start_time);
250
251 let current_time = std::time::SystemTime::now();
253 let end_time_str = format_system_time(current_time);
254
255 writeln!(
257 f,
258 "\x1b[36m>>> Aggregated Logs ({}) >>>\x1b[0m",
259 start_time_str
260 )?;
261
262 for line in self.lines.iter() {
264 writeln!(f, "{}", line)?;
265 }
266 writeln!(
267 f,
268 "\x1b[36m<<< Aggregated Logs ({}) <<<\x1b[0m",
269 end_time_str
270 )?;
271 Ok(())
272 }
273}
274
275#[derive(
277 Debug,
278 Clone,
279 Serialize,
280 Deserialize,
281 Named,
282 Handler,
283 HandleClient,
284 RefClient
285)]
286pub enum LogMessage {
287 Log {
289 hostname: String,
291 proc_id: String,
293 output_target: OutputTarget,
295 payload: wirevalue::Any,
297 },
298
299 Flush {
301 sync_version: Option<u64>,
304 },
305}
306
307#[derive(
309 Debug,
310 Clone,
311 Serialize,
312 Deserialize,
313 Named,
314 Handler,
315 HandleClient,
316 RefClient
317)]
318#[expect(
319 clippy::large_enum_variant,
320 reason = "actor message enum with Handler/HandleClient/RefClient derives; boxing fields ripples into client/handler call sites and may require derive-macro changes — separate diff"
321)]
322pub enum LogClientMessage {
323 SetAggregate {
324 aggregate_window_sec: Option<u64>,
326 },
327
328 StartSyncFlush {
330 expected_procs: usize,
332 reply: OncePortRef<()>,
334 version: OncePortRef<u64>,
336 },
337}
338
339#[async_trait]
341pub trait LogSender: Send + Sync {
342 fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()>;
344
345 fn flush(&mut self) -> anyhow::Result<()>;
348}
349
350#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
352pub enum OutputTarget {
353 Stdout,
355 Stderr,
357}
358
359#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
360pub enum Stream {
361 ChildStdout,
363 ChildStderr,
365}
366
367pub struct LocalLogSender {
369 hostname: String,
370 proc_id: String,
371 tx: ChannelTx<LogMessage>,
372 status: Receiver<TxStatus>,
373}
374
375impl LocalLogSender {
376 fn new(log_channel: ChannelAddr, proc_id: &ProcAddr) -> Result<Self, anyhow::Error> {
377 let tx = channel::dial::<LogMessage>(log_channel)?;
378 let status = tx.status().clone();
379
380 let hostname = hostname::get()
381 .unwrap_or_else(|_| "unknown_host".into())
382 .into_string()
383 .unwrap_or("unknown_host".to_string());
384 Ok(Self {
385 hostname,
386 proc_id: proc_id.to_string(),
387 tx,
388 status,
389 })
390 }
391}
392
393#[async_trait]
394impl LogSender for LocalLogSender {
395 fn send(&mut self, target: OutputTarget, payload: Vec<Vec<u8>>) -> anyhow::Result<()> {
396 if TxStatus::Active == *self.status.borrow() {
397 self.tx.post(LogMessage::Log {
398 hostname: self.hostname.clone(),
399 proc_id: self.proc_id.clone(),
400 output_target: target,
401 payload: wirevalue::Any::serialize(&payload)?,
402 });
403 }
404
405 Ok(())
406 }
407
408 fn flush(&mut self) -> anyhow::Result<()> {
409 if TxStatus::Active == *self.status.borrow() {
411 self.tx.post(LogMessage::Flush { sync_version: None });
412 }
413 Ok(())
414 }
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize, Named)]
419pub struct FileMonitorMessage {
420 lines: Vec<String>,
421}
422wirevalue::register_type!(FileMonitorMessage);
423
424pub struct FileAppender {
426 stdout_addr: ChannelAddr,
427 stderr_addr: ChannelAddr,
428 #[allow(dead_code)] stdout_task: JoinHandle<()>,
430 #[allow(dead_code)]
431 stderr_task: JoinHandle<()>,
432 stop: Arc<Notify>,
433}
434
435impl fmt::Debug for FileAppender {
436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437 f.debug_struct("FileMonitor")
438 .field("stdout_addr", &self.stdout_addr)
439 .field("stderr_addr", &self.stderr_addr)
440 .finish()
441 }
442}
443
444impl FileAppender {
445 pub fn new() -> Option<Self> {
448 let stop = Arc::new(Notify::new());
449 let file_name_tag = hostname::get()
451 .unwrap_or_else(|_| "unknown_host".into())
452 .into_string()
453 .unwrap_or("unknown_host".to_string());
454
455 let (stdout_path, stdout_writer) =
457 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stdout) {
458 Some(writer) => writer,
459 None => {
460 tracing::warn!("failed to create stdout file");
461 return None;
462 }
463 };
464 let (stdout_addr, stdout_rx) = {
465 let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered();
466 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
467 Ok((addr, rx)) => (addr, rx),
468 Err(e) => {
469 tracing::warn!("failed to serve stdout channel: {}", e);
470 return None;
471 }
472 }
473 };
474 let stdout_stop = stop.clone();
475 let stdout_task = tokio::spawn(file_monitor_task(
476 stdout_rx,
477 stdout_writer,
478 OutputTarget::Stdout,
479 stdout_stop,
480 ));
481
482 let (stderr_path, stderr_writer) =
484 match get_unique_local_log_destination(&file_name_tag, OutputTarget::Stderr) {
485 Some(writer) => writer,
486 None => {
487 tracing::warn!("failed to create stderr file");
488 return None;
489 }
490 };
491 let (stderr_addr, stderr_rx) = {
492 let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered();
493 match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) {
494 Ok((addr, rx)) => (addr, rx),
495 Err(e) => {
496 tracing::warn!("failed to serve stderr channel: {}", e);
497 return None;
498 }
499 }
500 };
501 let stderr_stop = stop.clone();
502 let stderr_task = tokio::spawn(file_monitor_task(
503 stderr_rx,
504 stderr_writer,
505 OutputTarget::Stderr,
506 stderr_stop,
507 ));
508
509 tracing::debug!(
510 "FileAppender: created for stdout {} stderr {} ",
511 stdout_path.display(),
512 stderr_path.display()
513 );
514
515 Some(Self {
516 stdout_addr,
517 stderr_addr,
518 stdout_task,
519 stderr_task,
520 stop,
521 })
522 }
523
524 pub fn addr_for(&self, target: OutputTarget) -> ChannelAddr {
526 match target {
527 OutputTarget::Stdout => self.stdout_addr.clone(),
528 OutputTarget::Stderr => self.stderr_addr.clone(),
529 }
530 }
531}
532
533impl Drop for FileAppender {
534 fn drop(&mut self) {
535 self.stop.notify_waiters();
537 tracing::debug!("FileMonitor: dropping, stop signal sent, tasks will flush and exit");
538 }
539}
540
541async fn file_monitor_task(
543 mut rx: ChannelRx<FileMonitorMessage>,
544 mut writer: Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
545 target: OutputTarget,
546 stop: Arc<Notify>,
547) {
548 loop {
549 tokio::select! {
550 msg = rx.recv() => {
551 match msg {
552 Ok(msg) => {
553 for line in &msg.lines {
555 if let Err(e) = writer.write_all(line.as_bytes()).await {
556 tracing::warn!("FileMonitor: failed to write line to file: {}", e);
557 continue;
558 }
559 if let Err(e) = writer.write_all(b"\n").await {
560 tracing::warn!("FileMonitor: failed to write newline to file: {}", e);
561 }
562 }
563 if let Err(e) = writer.flush().await {
564 tracing::warn!("FileMonitor: failed to flush file: {}", e);
565 }
566 }
567 Err(e) => {
568 tracing::debug!("FileMonitor task for {:?}: channel error: {}", target, e);
570 break;
571 }
572 }
573 }
574 _ = stop.notified() => {
575 tracing::debug!("FileMonitor task for {:?}: stop signal received", target);
576 break;
577 }
578 }
579 }
580
581 if let Err(e) = writer.flush().await {
583 tracing::warn!("FileMonitor: failed final flush: {}", e);
584 }
585 tracing::debug!("FileMonitor task for {:?} exiting", target);
586}
587
588fn create_unique_file_writer(
589 file_name_tag: &str,
590 output_target: OutputTarget,
591 env: env::Env,
592) -> Result<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
593 let suffix = match output_target {
594 OutputTarget::Stderr => "stderr",
595 OutputTarget::Stdout => "stdout",
596 };
597 let (path, filename) = log_file_path(env, None)?;
598 let path = Path::new(&path);
599 let mut full_path = PathBuf::from(path);
600
601 let uuid = ShortUuid::generate();
602
603 full_path.push(format!(
604 "{}_{}_{}.{}",
605 filename, file_name_tag, uuid, suffix
606 ));
607 let file = std::fs::OpenOptions::new()
608 .create(true)
609 .append(true)
610 .open(full_path.clone())?;
611 let tokio_file = tokio::fs::File::from_std(file);
612 Ok((full_path, Box::new(tokio_file)))
614}
615
616fn get_unique_local_log_destination(
617 file_name_tag: &str,
618 output_target: OutputTarget,
619) -> Option<(PathBuf, Box<dyn io::AsyncWrite + Send + Unpin + 'static>)> {
620 let env: env::Env = env::Env::current();
621 if env == env::Env::Local && !hyperactor_config::global::get(FORCE_FILE_LOG) {
622 tracing::debug!("not creating log file because of env type");
623 None
624 } else {
625 match create_unique_file_writer(file_name_tag, output_target, env) {
626 Ok((a, b)) => Some((a, b)),
627 Err(e) => {
628 tracing::warn!("failed to create unique file writer: {}", e);
629 None
630 }
631 }
632 }
633}
634
635fn std_writer(target: OutputTarget) -> Box<dyn io::AsyncWrite + Send + Unpin> {
637 match target {
639 OutputTarget::Stdout => Box::new(tokio::io::stdout()),
640 OutputTarget::Stderr => Box::new(tokio::io::stderr()),
641 }
642}
643
644async fn tee(
647 mut reader: impl AsyncRead + Unpin + Send + 'static,
648 mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
649 log_sender: Option<Box<dyn LogSender + Send>>,
650 file_monitor_addr: Option<ChannelAddr>,
651 target: OutputTarget,
652 prefix: Option<String>,
653 stop: Arc<Notify>,
654 recent_lines_buf: RotatingLineBuffer,
655) -> Result<(), io::Error> {
656 let mut buf = [0u8; 8192];
657 let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
658 let mut log_sender = log_sender;
659
660 let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
662 file_monitor_addr.and_then(|addr| match channel::dial(addr.clone()) {
663 Ok(tx) => Some(tx),
664 Err(e) => {
665 tracing::warn!("Failed to dial file monitor channel {}: {}", addr, e);
666 None
667 }
668 });
669
670 loop {
671 tokio::select! {
672 read_result = reader.read(&mut buf) => {
673 match read_result {
674 Ok(n) => {
675 if n == 0 {
676 tracing::debug!("EOF reached in tee");
678 break;
679 }
680
681 if let Err(e) = std_writer.write_all(&buf[..n]).await {
683 tracing::warn!("error writing to std: {}", e);
684 }
685
686 let mut completed_lines = Vec::new();
688
689 for &byte in &buf[..n] {
690 if byte == b'\n' {
691 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
693
694 if line.len() > MAX_LINE_SIZE {
697 let mut truncate_at = MAX_LINE_SIZE;
698 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
699 truncate_at -= 1;
700 }
701 line.truncate(truncate_at);
702 line.push_str("... [TRUNCATED]");
703 }
704
705 let final_line = if let Some(ref p) = prefix {
707 format!("[{}] {}", p, line)
708 } else {
709 line
710 };
711
712 completed_lines.push(final_line);
713 line_buffer.clear();
714 } else {
715 line_buffer.push(byte);
716 }
717 }
718
719 if !completed_lines.is_empty() {
721 if let Some(ref mut sender) = log_sender {
722 let bytes: Vec<Vec<u8>> = completed_lines.iter()
723 .map(|s| s.as_bytes().to_vec())
724 .collect();
725 if let Err(e) = sender.send(target, bytes) {
726 tracing::warn!("error sending to log_sender: {}", e);
727 }
728 }
729
730 if let Some(ref mut tx) = file_monitor_tx {
732 let msg = FileMonitorMessage {
733 lines: completed_lines,
734 };
735 tx.post(msg);
737 }
738 }
739
740 recent_lines_buf.try_add_data(&buf, n);
741 },
742 Err(e) => {
743 tracing::debug!("read error in tee: {}", e);
744 return Err(e);
745 }
746 }
747 },
748 _ = stop.notified() => {
749 tracing::debug!("stop signal received in tee");
750 break;
751 }
752 }
753 }
754
755 std_writer.flush().await?;
756
757 if !line_buffer.is_empty() {
759 let mut line = String::from_utf8_lossy(&line_buffer).to_string();
760 if line.len() > MAX_LINE_SIZE {
763 let mut truncate_at = MAX_LINE_SIZE;
764 while truncate_at > 0 && !line.is_char_boundary(truncate_at) {
765 truncate_at -= 1;
766 }
767 line.truncate(truncate_at);
768 line.push_str("... [TRUNCATED]");
769 }
770 let final_line = if let Some(ref p) = prefix {
771 format!("[{}] {}", p, line)
772 } else {
773 line
774 };
775
776 let final_lines = vec![final_line];
777
778 if let Some(ref mut sender) = log_sender {
780 let bytes: Vec<Vec<u8>> = final_lines.iter().map(|s| s.as_bytes().to_vec()).collect();
781 let _ = sender.send(target, bytes);
782 }
783
784 if let Some(ref mut tx) = file_monitor_tx {
786 let msg = FileMonitorMessage { lines: final_lines };
787 tx.post(msg);
788 }
789 }
790
791 if let Some(ref mut sender) = log_sender {
793 let _ = sender.flush();
794 }
795
796 Ok(())
797}
798
799#[derive(Debug, Clone)]
800struct RotatingLineBuffer {
801 recent_lines: Arc<RwLock<VecDeque<String>>>,
802 max_buffer_size: usize,
803}
804
805impl RotatingLineBuffer {
806 fn try_add_data(&self, buf: &[u8], buf_end: usize) {
807 let data_str = String::from_utf8_lossy(&buf[..buf_end]);
808 let lines: Vec<&str> = data_str.lines().collect();
809
810 if let Ok(mut recent_lines_guard) = self.recent_lines.try_write() {
811 for line in lines {
812 if !line.is_empty() {
813 recent_lines_guard.push_back(line.to_string());
814 if recent_lines_guard.len() > self.max_buffer_size {
815 recent_lines_guard.pop_front();
816 }
817 }
818 }
819 } else {
820 tracing::debug!("Failed to acquire write lock on recent_lines buffer in tee");
821 }
822 }
823
824 async fn peek(&self) -> Vec<String> {
825 let lines = self.recent_lines.read().await;
826 let start_idx = if lines.len() > self.max_buffer_size {
827 lines.len() - self.max_buffer_size
828 } else {
829 0
830 };
831
832 lines.range(start_idx..).cloned().collect()
833 }
834}
835
836pub struct StreamFwder {
838 teer: JoinHandle<Result<(), io::Error>>,
839 recent_lines_buf: RotatingLineBuffer,
841 stop: Arc<Notify>,
843}
844
845impl StreamFwder {
846 pub fn start(
853 reader: impl AsyncRead + Unpin + Send + 'static,
854 file_monitor_addr: Option<ChannelAddr>,
855 target: OutputTarget,
856 max_buffer_size: usize,
857 log_channel: Option<ChannelAddr>,
858 proc_id: &ProcAddr,
859 local_rank: usize,
860 ) -> Self {
861 let prefix = match hyperactor_config::global::get(PREFIX_WITH_RANK) {
862 true => Some(local_rank.to_string()),
863 false => None,
864 };
865 let std_writer = std_writer(target);
866
867 Self::start_with_writer(
868 reader,
869 std_writer,
870 file_monitor_addr,
871 target,
872 max_buffer_size,
873 log_channel,
874 proc_id,
875 prefix,
876 )
877 }
878
879 fn start_with_writer(
881 reader: impl AsyncRead + Unpin + Send + 'static,
882 std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,
883 file_monitor_addr: Option<ChannelAddr>,
884 target: OutputTarget,
885 max_buffer_size: usize,
886 log_channel: Option<ChannelAddr>,
887 proc_id: &ProcAddr,
888 prefix: Option<String>,
889 ) -> Self {
890 debug_assert!(
896 file_monitor_addr.is_some() || max_buffer_size > 0 || log_channel.is_some(),
897 "StreamFwder started with no sinks and no tail"
898 );
899
900 let stop = Arc::new(Notify::new());
901 let recent_lines_buf = RotatingLineBuffer {
902 recent_lines: Arc::new(RwLock::new(VecDeque::<String>::with_capacity(
903 max_buffer_size,
904 ))),
905 max_buffer_size,
906 };
907
908 let log_sender: Option<Box<dyn LogSender + Send>> = if let Some(addr) = log_channel {
909 match LocalLogSender::new(addr, proc_id) {
910 Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
911 Err(e) => {
912 tracing::error!("failed to create log sender: {}", e);
913 None
914 }
915 }
916 } else {
917 None
918 };
919
920 let teer_stop = stop.clone();
921 let recent_line_buf_clone = recent_lines_buf.clone();
922 let teer = tokio::spawn(async move {
923 tee(
924 reader,
925 std_writer,
926 log_sender,
927 file_monitor_addr,
928 target,
929 prefix,
930 teer_stop,
931 recent_line_buf_clone,
932 )
933 .await
934 });
935
936 StreamFwder {
937 teer,
938 recent_lines_buf,
939 stop,
940 }
941 }
942
943 pub async fn abort(self) -> (Vec<String>, Result<(), anyhow::Error>) {
944 self.stop.notify_waiters();
945
946 let lines = self.peek().await;
947 let teer_result = self.teer.await;
948
949 let result: Result<(), anyhow::Error> = match teer_result {
950 Ok(inner) => inner.map_err(anyhow::Error::from),
951 Err(e) => Err(e.into()),
952 };
953
954 (lines, result)
955 }
956
957 pub async fn peek(&self) -> Vec<String> {
960 self.recent_lines_buf.peek().await
961 }
962}
963
964#[derive(
966 Debug,
967 Clone,
968 Serialize,
969 Deserialize,
970 Named,
971 Handler,
972 HandleClient,
973 RefClient,
974 Bind,
975 Unbind
976)]
977pub enum LogForwardMessage {
978 Forward {},
980
981 SetMode { stream_to_client: bool },
983
984 ForceSyncFlush { version: u64 },
986}
987
988#[hyperactor::export(LogForwardMessage { cast = true })]
990#[hyperactor::spawnable]
991pub struct LogForwardActor {
992 rx: ChannelRx<LogMessage>,
993 flush_tx: Arc<Mutex<ChannelTx<LogMessage>>>,
994 next_flush_deadline: SystemTime,
995 logging_client_ref: ActorRef<LogClientActor>,
996 stream_to_client: bool,
997}
998
999#[async_trait]
1000impl Actor for LogForwardActor {
1001 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1002 this.set_system();
1003 this.post_after(this, LogForwardMessage::Forward {}, Duration::from_secs(0));
1004
1005 self.flush_tx
1007 .lock()
1008 .await
1009 .send(LogMessage::Flush { sync_version: None })
1010 .await?;
1011 Ok(())
1012 }
1013}
1014
1015#[async_trait]
1016impl hyperactor::RemoteSpawn for LogForwardActor {
1017 type Params = ActorRef<LogClientActor>;
1018
1019 async fn new(logging_client_ref: Self::Params, _environment: Flattrs) -> Result<Self> {
1020 let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
1021 Ok(channel) => channel.parse()?,
1022 Err(err) => {
1023 tracing::debug!(
1024 "log forwarder actor failed to read env var {}: {}",
1025 BOOTSTRAP_LOG_CHANNEL,
1026 err
1027 );
1028 ChannelAddr::any(ChannelTransport::Unix)
1030 }
1031 };
1032 tracing::info!(
1033 "log forwarder {} serve at {}",
1034 std::process::id(),
1035 log_channel
1036 );
1037
1038 let rx = match channel::serve(log_channel.clone()) {
1039 Ok((_, rx)) => rx,
1040 Err(err) => {
1041 tracing::error!(
1044 "log forwarder actor failed to bootstrap on given channel {}: {}",
1045 log_channel,
1046 err
1047 );
1048 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1
1049 }
1050 };
1051
1052 let flush_tx = Arc::new(Mutex::new(channel::dial::<LogMessage>(log_channel)?));
1054 let now = std::time::SystemTime::now();
1055
1056 Ok(Self {
1057 rx,
1058 flush_tx,
1059 next_flush_deadline: now,
1060 logging_client_ref,
1061 stream_to_client: true,
1062 })
1063 }
1064}
1065
1066#[async_trait]
1067#[hyperactor::handle(LogForwardMessage)]
1068impl LogForwardMessageHandler for LogForwardActor {
1069 async fn forward(&mut self, ctx: &Context<Self>) -> Result<(), anyhow::Error> {
1070 match self.rx.recv().await {
1071 Ok(LogMessage::Flush { sync_version }) => {
1072 let now = std::time::SystemTime::now();
1073 match sync_version {
1074 None => {
1075 let delay = Duration::from_secs(1);
1077 if now >= self.next_flush_deadline {
1078 self.next_flush_deadline = now + delay;
1079 let flush_tx = self.flush_tx.clone();
1080 tokio::spawn(async move {
1081 tokio::time::sleep(delay).await;
1082 if let Err(e) = flush_tx
1083 .lock()
1084 .await
1085 .send(LogMessage::Flush { sync_version: None })
1086 .await
1087 {
1088 tracing::error!("failed to send flush message: {}", e);
1089 }
1090 });
1091 }
1092 }
1093 version => {
1094 self.logging_client_ref.flush(ctx, version).await?;
1095 }
1096 }
1097 }
1098 Ok(LogMessage::Log {
1099 hostname,
1100 proc_id,
1101 output_target,
1102 payload,
1103 }) => {
1104 if self.stream_to_client {
1105 self.logging_client_ref
1106 .log(ctx, hostname, proc_id, output_target, payload)
1107 .await?;
1108 }
1109 }
1110 Err(e) => {
1111 return Err(e.into());
1112 }
1113 }
1114
1115 ctx.post_after(ctx, LogForwardMessage::Forward {}, Duration::from_secs(0));
1117
1118 Ok(())
1119 }
1120
1121 async fn set_mode(
1122 &mut self,
1123 _ctx: &Context<Self>,
1124 stream_to_client: bool,
1125 ) -> Result<(), anyhow::Error> {
1126 self.stream_to_client = stream_to_client;
1127 Ok(())
1128 }
1129
1130 async fn force_sync_flush(
1131 &mut self,
1132 _cx: &Context<Self>,
1133 version: u64,
1134 ) -> Result<(), anyhow::Error> {
1135 self.flush_tx
1136 .lock()
1137 .await
1138 .send(LogMessage::Flush {
1139 sync_version: Some(version),
1140 })
1141 .await
1142 .map_err(anyhow::Error::from)
1143 }
1144}
1145
1146fn deserialize_message_lines(serialized_message: &wirevalue::Any) -> Result<Vec<Vec<String>>> {
1148 if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
1150 let mut result = Vec::new();
1151 for bytes in message_bytes {
1152 let message_str = String::from_utf8(bytes)?;
1153 let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
1154 result.push(lines);
1155 }
1156 return Ok(result);
1157 }
1158
1159 if let Ok(message) = serialized_message.deserialized::<String>() {
1161 let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
1162 return Ok(vec![lines]);
1163 }
1164
1165 anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
1167}
1168
1169#[derive(Debug)]
1171#[hyperactor::export(LogMessage, LogClientMessage)]
1172#[hyperactor::spawnable]
1173pub struct LogClientActor {
1174 aggregate_window_sec: Option<u64>,
1175 aggregators: HashMap<OutputTarget, Aggregator>,
1176 last_flush_time: SystemTime,
1177 next_flush_deadline: Option<SystemTime>,
1178
1179 current_flush_version: u64,
1181 current_flush_port: Option<OncePortRef<()>>,
1182 current_unflushed_procs: usize,
1183}
1184
1185impl Default for LogClientActor {
1186 fn default() -> Self {
1187 let mut aggregators = HashMap::new();
1189 aggregators.insert(OutputTarget::Stderr, Aggregator::new());
1190 aggregators.insert(OutputTarget::Stdout, Aggregator::new());
1191
1192 Self {
1193 aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC),
1194 aggregators,
1195 last_flush_time: std::time::SystemTime::now(),
1196 next_flush_deadline: None,
1197 current_flush_version: 0,
1198 current_flush_port: None,
1199 current_unflushed_procs: 0,
1200 }
1201 }
1202}
1203
1204#[async_trait]
1205impl Actor for LogClientActor {
1206 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1207 this.set_system();
1208 Ok(())
1209 }
1210}
1211
1212impl LogClientActor {
1213 fn print_aggregators(&mut self) {
1214 for (output_target, aggregator) in self.aggregators.iter_mut() {
1215 if aggregator.is_empty() {
1216 continue;
1217 }
1218 match output_target {
1219 OutputTarget::Stdout => {
1220 println!("{}", aggregator);
1221 }
1222 OutputTarget::Stderr => {
1223 eprintln!("{}", aggregator);
1224 }
1225 }
1226
1227 aggregator.reset();
1229 }
1230 }
1231
1232 fn print_log_line(hostname: &str, proc_id: &str, output_target: OutputTarget, line: String) {
1233 let message = format!("[{} {}] {}", hostname, proc_id, line);
1234
1235 #[cfg(test)]
1236 crate::logging::test_tap::push(&message);
1237
1238 match output_target {
1239 OutputTarget::Stdout => println!("{}", message),
1240 OutputTarget::Stderr => eprintln!("{}", message),
1241 }
1242 }
1243
1244 fn flush_internal(&mut self) {
1245 self.print_aggregators();
1246 self.last_flush_time = std::time::SystemTime::now();
1247 self.next_flush_deadline = None;
1248 }
1249}
1250
1251impl Drop for LogClientActor {
1252 fn drop(&mut self) {
1253 self.print_aggregators();
1255 }
1256}
1257
1258#[async_trait]
1259#[hyperactor::handle(LogMessage)]
1260impl LogMessageHandler for LogClientActor {
1261 async fn log(
1262 &mut self,
1263 cx: &Context<Self>,
1264 hostname: String,
1265 proc_id: String,
1266 output_target: OutputTarget,
1267 payload: wirevalue::Any,
1268 ) -> Result<(), anyhow::Error> {
1269 let message_line_groups = deserialize_message_lines(&payload)?;
1271 let hostname = hostname.as_str();
1272
1273 let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
1274 match self.aggregate_window_sec {
1275 None => {
1276 for line in message_lines {
1277 Self::print_log_line(hostname, &proc_id, output_target, line);
1278 }
1279 self.last_flush_time = std::time::SystemTime::now();
1280 }
1281 Some(window) => {
1282 for line in message_lines {
1283 if let Some(aggregator) = self.aggregators.get_mut(&output_target) {
1284 if let Err(e) = aggregator.add_line(&line) {
1285 tracing::error!("error adding log line: {}", e);
1286 Self::print_log_line(hostname, &proc_id, output_target, line);
1288 }
1289 } else {
1290 tracing::error!("unknown output target: {:?}", output_target);
1291 Self::print_log_line(hostname, &proc_id, output_target, line);
1293 }
1294 }
1295
1296 let new_deadline = self.last_flush_time + Duration::from_secs(window);
1297 let now = std::time::SystemTime::now();
1298 if new_deadline <= now {
1299 self.flush_internal();
1300 } else {
1301 let delay = new_deadline.duration_since(now)?;
1302 match self.next_flush_deadline {
1303 None => {
1304 self.next_flush_deadline = Some(new_deadline);
1305 cx.post_after(cx, LogMessage::Flush { sync_version: None }, delay);
1306 }
1307 Some(deadline) => {
1308 if new_deadline < deadline {
1310 self.next_flush_deadline = Some(new_deadline);
1312 cx.post_after(cx, LogMessage::Flush { sync_version: None }, delay);
1313 }
1314 }
1315 }
1316 }
1317 }
1318 }
1319
1320 Ok(())
1321 }
1322
1323 async fn flush(
1324 &mut self,
1325 cx: &Context<Self>,
1326 sync_version: Option<u64>,
1327 ) -> Result<(), anyhow::Error> {
1328 match sync_version {
1329 None => {
1330 self.flush_internal();
1331 }
1332 Some(version) => {
1333 if version != self.current_flush_version {
1334 tracing::error!(
1335 "found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
1336 version,
1337 self.current_flush_version
1338 );
1339 return Ok(());
1340 }
1341
1342 if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
1343 anyhow::bail!("found no ongoing flush request");
1345 }
1346 self.current_unflushed_procs -= 1;
1347
1348 tracing::debug!(
1349 "ack sync flush: version {}; remaining procs: {}",
1350 self.current_flush_version,
1351 self.current_unflushed_procs
1352 );
1353
1354 if self.current_unflushed_procs == 0 {
1355 self.flush_internal();
1356 let reply = self.current_flush_port.take().unwrap();
1357 self.current_flush_port = None;
1358 reply.post(cx, ());
1359 }
1360 }
1361 }
1362
1363 Ok(())
1364 }
1365}
1366
1367#[async_trait]
1368#[hyperactor::handle(LogClientMessage)]
1369impl LogClientMessageHandler for LogClientActor {
1370 async fn set_aggregate(
1371 &mut self,
1372 _cx: &Context<Self>,
1373 aggregate_window_sec: Option<u64>,
1374 ) -> Result<(), anyhow::Error> {
1375 if self.aggregate_window_sec.is_some() && aggregate_window_sec.is_none() {
1376 self.print_aggregators();
1378 }
1379 self.aggregate_window_sec = aggregate_window_sec;
1380 Ok(())
1381 }
1382
1383 async fn start_sync_flush(
1384 &mut self,
1385 cx: &Context<Self>,
1386 expected_procs_flushed: usize,
1387 reply: OncePortRef<()>,
1388 version: OncePortRef<u64>,
1389 ) -> Result<(), anyhow::Error> {
1390 if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
1391 tracing::warn!(
1392 "found unfinished ongoing flush: version {}; {} unflushed procs",
1393 self.current_flush_version,
1394 self.current_unflushed_procs,
1395 );
1396 }
1397
1398 self.current_flush_version += 1;
1399 tracing::debug!(
1400 "start sync flush with version {}",
1401 self.current_flush_version
1402 );
1403 self.current_flush_port = Some(reply.clone());
1404 self.current_unflushed_procs = expected_procs_flushed;
1405 version.post(cx, self.current_flush_version);
1406 Ok(())
1407 }
1408}
1409
1410#[cfg(test)]
1411pub mod test_tap {
1412 use std::sync::Mutex;
1413 use std::sync::OnceLock;
1414
1415 use tokio::sync::mpsc::UnboundedReceiver;
1416 use tokio::sync::mpsc::UnboundedSender;
1417
1418 static TAP: OnceLock<UnboundedSender<String>> = OnceLock::new();
1419 static RX: OnceLock<Mutex<UnboundedReceiver<String>>> = OnceLock::new();
1420
1421 pub fn install(tx: UnboundedSender<String>) {
1423 let _ = TAP.set(tx);
1424 }
1425
1426 pub fn set_receiver(rx: UnboundedReceiver<String>) {
1428 let _ = RX.set(Mutex::new(rx));
1429 }
1430
1431 pub fn push(s: &str) {
1433 if let Some(tx) = TAP.get() {
1434 let _ = tx.send(s.to_string());
1435 }
1436 }
1437
1438 pub fn drain() -> Vec<String> {
1440 let mut out = Vec::new();
1441 if let Some(rx) = RX.get() {
1442 let mut rx = rx.lock().unwrap();
1443 while let Ok(line) = rx.try_recv() {
1444 out.push(line);
1445 }
1446 }
1447 out
1448 }
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453
1454 use std::sync::Arc;
1455 use std::sync::Mutex;
1456
1457 use hyperactor::ProcAddr;
1458 use hyperactor::RemoteSpawn;
1459 use hyperactor::channel;
1460 use hyperactor::channel::ChannelAddr;
1461 use hyperactor::channel::ChannelTx;
1462 use hyperactor::channel::Tx;
1463 use hyperactor::mailbox::BoxedMailboxSender;
1464 use hyperactor::mailbox::DialMailboxRouter;
1465 use hyperactor::mailbox::MailboxServer;
1466 use hyperactor::proc::Proc;
1467 use hyperactor::testing::ids::test_proc_id;
1468 use tokio::io::AsyncSeek;
1469 use tokio::io::AsyncSeekExt;
1470 use tokio::io::AsyncWriteExt;
1471 use tokio::io::SeekFrom;
1472 use tokio::sync::mpsc;
1473
1474 use super::*;
1475
1476 #[derive(Debug)]
1478 struct FileProcessingResult {
1479 lines: Vec<Vec<u8>>,
1481 new_position: u64,
1483 incomplete_line_buffer: Vec<u8>,
1485 }
1486
1487 async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
1490 reader: &mut R,
1491 current_position: u64,
1492 file_size: u64,
1493 existing_line_buffer: Vec<u8>,
1494 max_buffer_size: usize,
1495 ) -> Result<FileProcessingResult> {
1496 if current_position == file_size {
1498 return Ok(FileProcessingResult {
1499 lines: Vec::new(),
1500 new_position: current_position,
1501 incomplete_line_buffer: existing_line_buffer,
1502 });
1503 }
1504
1505 let actual_position = if current_position > file_size {
1507 tracing::warn!(
1508 "File appears to have been truncated (position {} > file size {}), resetting to start",
1509 current_position,
1510 file_size
1511 );
1512 reader.seek(SeekFrom::Start(0)).await?;
1513 0
1514 } else {
1515 reader.seek(SeekFrom::Start(current_position)).await?;
1517 current_position
1518 };
1519
1520 let mut buf = vec![0u8; 128 * 1024];
1521 let mut line_buffer = existing_line_buffer;
1522 let mut lines = Vec::with_capacity(max_buffer_size);
1523 let mut processed_bytes = 0u64;
1524
1525 loop {
1526 let bytes_read = reader.read(&mut buf).await?;
1527 if bytes_read == 0 {
1528 break;
1529 }
1530
1531 let chunk = &buf[..bytes_read];
1532
1533 let mut start = 0;
1534 while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
1535 let absolute_pos = start + newline_pos;
1536
1537 line_buffer.extend_from_slice(&chunk[start..absolute_pos]);
1538
1539 if !line_buffer.is_empty() {
1540 if line_buffer.len() > MAX_LINE_SIZE {
1541 line_buffer.truncate(MAX_LINE_SIZE);
1542 line_buffer.extend_from_slice(b"... [TRUNCATED]");
1543 }
1544
1545 let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
1546 lines.push(line_data);
1547 }
1548
1549 start = absolute_pos + 1;
1550
1551 if lines.len() >= max_buffer_size {
1553 let new_position = actual_position + processed_bytes + start as u64;
1556
1557 return Ok(FileProcessingResult {
1559 lines,
1560 new_position,
1561 incomplete_line_buffer: Vec::new(),
1562 });
1563 }
1564 }
1565
1566 processed_bytes += bytes_read as u64;
1568
1569 if start < chunk.len() {
1570 line_buffer.extend_from_slice(&chunk[start..]);
1571 }
1572 }
1573
1574 let new_position = actual_position + processed_bytes;
1575
1576 Ok(FileProcessingResult {
1577 lines,
1578 new_position,
1579 incomplete_line_buffer: line_buffer,
1580 })
1581 }
1582
1583 #[tokio::test]
1584 async fn test_forwarding_log_to_client() {
1585 let router = DialMailboxRouter::new();
1587 let (proc_addr, client_rx) =
1588 channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1589 let proc = Proc::configured(
1590 test_proc_id("client_0"),
1591 BoxedMailboxSender::new(router.clone()),
1592 );
1593 proc.clone().serve(client_rx);
1594 let proc_ref: ProcAddr = test_proc_id("client_0");
1595 router.bind(proc_ref, proc_addr.clone());
1596 let (client, _handle) = proc.client("client").unwrap();
1597
1598 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
1600 unsafe {
1602 std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
1603 }
1604 let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
1605 let log_client: ActorRef<LogClientActor> =
1606 proc.spawn("log_client", log_client_actor).unwrap().bind();
1607 let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
1608 .await
1609 .unwrap();
1610 let log_forwarder: ActorRef<LogForwardActor> = proc
1611 .spawn("log_forwarder", log_forwarder_actor)
1612 .unwrap()
1613 .bind();
1614
1615 let tx: ChannelTx<LogMessage> = channel::dial(log_channel).unwrap();
1617 tx.post(LogMessage::Log {
1618 hostname: "my_host".into(),
1619 proc_id: "test_proc".into(),
1620 output_target: OutputTarget::Stderr,
1621 payload: wirevalue::Any::serialize(&"will not stream".to_string()).unwrap(),
1622 });
1623
1624 log_forwarder.set_mode(&client, true).await.unwrap();
1626 tx.post(LogMessage::Log {
1627 hostname: "my_host".into(),
1628 proc_id: "test_proc".into(),
1629 output_target: OutputTarget::Stderr,
1630 payload: wirevalue::Any::serialize(&"will stream".to_string()).unwrap(),
1631 });
1632
1633 }
1635
1636 #[test]
1637 fn test_deserialize_message_lines_string() {
1638 let message = "Line 1\nLine 2\nLine 3".to_string();
1640 let serialized = wirevalue::Any::serialize(&message).unwrap();
1641
1642 let result = deserialize_message_lines(&serialized).unwrap();
1643 assert_eq!(result, vec![vec!["Line 1", "Line 2", "Line 3"]]);
1644
1645 let message_bytes = vec![
1647 "Hello\nWorld".as_bytes().to_vec(),
1648 "UTF-8 \u{1F980}\nTest".as_bytes().to_vec(),
1649 ];
1650 let serialized = wirevalue::Any::serialize(&message_bytes).unwrap();
1651
1652 let result = deserialize_message_lines(&serialized).unwrap();
1653 assert_eq!(
1654 result,
1655 vec![vec!["Hello", "World"], vec!["UTF-8 \u{1F980}", "Test"]]
1656 );
1657
1658 let message = "Single line message".to_string();
1660 let serialized = wirevalue::Any::serialize(&message).unwrap();
1661
1662 let result = deserialize_message_lines(&serialized).unwrap();
1663
1664 assert_eq!(result, vec![vec!["Single line message"]]);
1665
1666 let message = "\n\n".to_string();
1668 let serialized = wirevalue::Any::serialize(&message).unwrap();
1669
1670 let result = deserialize_message_lines(&serialized).unwrap();
1671
1672 assert_eq!(result, vec![vec!["", ""]]);
1673
1674 let invalid_utf8_bytes = vec![vec![0xFF, 0xFE, 0xFD]]; let serialized = wirevalue::Any::serialize(&invalid_utf8_bytes).unwrap();
1677
1678 let result = deserialize_message_lines(&serialized);
1679
1680 assert!(
1682 result.is_err(),
1683 "Expected deserialization to fail with invalid UTF-8 bytes"
1684 );
1685 }
1686 #[allow(dead_code)]
1687 struct MockLogSender {
1688 log_sender: mpsc::UnboundedSender<(OutputTarget, String)>, flush_called: Arc<Mutex<bool>>, }
1691
1692 impl MockLogSender {
1693 #[allow(dead_code)]
1694 fn new(log_sender: mpsc::UnboundedSender<(OutputTarget, String)>) -> Self {
1695 Self {
1696 log_sender,
1697 flush_called: Arc::new(Mutex::new(false)),
1698 }
1699 }
1700 }
1701
1702 #[async_trait]
1703 impl LogSender for MockLogSender {
1704 fn send(
1705 &mut self,
1706 output_target: OutputTarget,
1707 payload: Vec<Vec<u8>>,
1708 ) -> anyhow::Result<()> {
1709 let lines: Vec<String> = payload
1711 .iter()
1712 .map(|b| String::from_utf8_lossy(b).trim_end_matches('\n').to_owned())
1713 .collect();
1714
1715 for line in lines {
1716 self.log_sender
1717 .send((output_target, line))
1718 .map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))?;
1719 }
1720 Ok(())
1721 }
1722
1723 fn flush(&mut self) -> anyhow::Result<()> {
1724 let mut flush_called = self.flush_called.lock().unwrap();
1726 *flush_called = true;
1727
1728 Ok(())
1731 }
1732 }
1733
1734 #[test]
1735 fn test_string_similarity() {
1736 assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
1738
1739 assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
1741
1742 assert!(normalized_edit_distance("hello", "helo") < 0.5);
1744 assert!(normalized_edit_distance("hello", "hello!") < 0.5);
1745
1746 assert_eq!(normalized_edit_distance("", ""), 0.0);
1748 assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1749 }
1750
1751 #[test]
1752 fn test_add_line_to_empty_aggregator() {
1753 let mut aggregator = Aggregator::new();
1754 let result = aggregator.add_line("ERROR 404 not found");
1755
1756 assert!(result.is_ok());
1757 assert_eq!(aggregator.lines.len(), 1);
1758 assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
1759 assert_eq!(aggregator.lines[0].count, 1);
1760 }
1761
1762 #[test]
1763 fn test_add_line_merges_with_similar_line() {
1764 let mut aggregator = Aggregator::new_with_threshold(0.2);
1765
1766 aggregator.add_line("ERROR 404 timeout").unwrap();
1768 assert_eq!(aggregator.lines.len(), 1);
1769
1770 aggregator.add_line("ERROR 500 timeout").unwrap();
1772 assert_eq!(aggregator.lines.len(), 1); assert_eq!(aggregator.lines[0].count, 2);
1774
1775 aggregator
1777 .add_line("WARNING database connection failed")
1778 .unwrap();
1779 assert_eq!(aggregator.lines.len(), 2); aggregator
1783 .add_line("WARNING database connection timed out")
1784 .unwrap();
1785 assert_eq!(aggregator.lines.len(), 2); assert_eq!(aggregator.lines[1].count, 2); }
1788
1789 #[test]
1790 fn test_aggregation_of_similar_log_lines() {
1791 let mut aggregator = Aggregator::new_with_threshold(0.2);
1792
1793 aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
1795 aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
1796 aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
1797
1798 assert_eq!(aggregator.lines.len(), 1);
1800
1801 assert_eq!(aggregator.lines[0].count, 3);
1803 }
1804
1805 #[tokio::test]
1806 async fn test_stream_fwd_creation() {
1807 hyperactor_telemetry::initialize_logging_for_test();
1808
1809 let (mut writer, reader) = tokio::io::duplex(1024);
1810 let (log_channel, mut rx) =
1811 channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1812
1813 let temp_file = tempfile::NamedTempFile::new().unwrap();
1815 let temp_path = temp_file.path().to_path_buf();
1816
1817 let file_writer = tokio::fs::OpenOptions::new()
1819 .create(true)
1820 .write(true)
1821 .append(true)
1822 .open(&temp_path)
1823 .await
1824 .unwrap();
1825
1826 let file_monitor = FileAppender::new();
1828 let file_monitor_addr = file_monitor
1829 .as_ref()
1830 .map(|fm| fm.addr_for(OutputTarget::Stdout));
1831
1832 let the_test_proc_id = test_proc_id("testproc_0");
1833 let monitor = StreamFwder::start_with_writer(
1834 reader,
1835 Box::new(file_writer),
1836 file_monitor_addr,
1837 OutputTarget::Stdout,
1838 3, Some(log_channel),
1840 &the_test_proc_id,
1841 None, );
1843
1844 tokio::time::sleep(Duration::from_millis(500)).await;
1846
1847 writer.write_all(b"Initial log line\n").await.unwrap();
1849 writer.flush().await.unwrap();
1850
1851 for i in 1..=5 {
1853 writer
1854 .write_all(format!("New log line {}\n", i).as_bytes())
1855 .await
1856 .unwrap();
1857 }
1858 writer.flush().await.unwrap();
1859
1860 tokio::time::sleep(Duration::from_millis(500)).await;
1862
1863 let timeout = Duration::from_secs(1);
1865 let _ = tokio::time::timeout(timeout, rx.recv())
1866 .await
1867 .unwrap_or_else(|_| panic!("Did not get log message within {:?}", timeout));
1868
1869 tokio::time::sleep(Duration::from_millis(200)).await;
1871
1872 let (recent_lines, _result) = monitor.abort().await;
1873
1874 assert!(
1875 recent_lines.len() >= 3,
1876 "Expected buffer with at least 3 lines, got {} lines: {:?}",
1877 recent_lines.len(),
1878 recent_lines
1879 );
1880
1881 let file_contents = std::fs::read_to_string(&temp_path).unwrap();
1882 assert!(
1883 file_contents.contains("Initial log line"),
1884 "Expected temp file to contain 'Initial log line', got: {:?}",
1885 file_contents
1886 );
1887 assert!(
1888 file_contents.contains("New log line 1"),
1889 "Expected temp file to contain 'New log line 1', got: {:?}",
1890 file_contents
1891 );
1892 assert!(
1893 file_contents.contains("New log line 5"),
1894 "Expected temp file to contain 'New log line 5', got: {:?}",
1895 file_contents
1896 );
1897 }
1898
1899 #[test]
1900 fn test_aggregator_custom_threshold() {
1901 let mut strict_aggregator = Aggregator::new_with_threshold(0.05);
1903 strict_aggregator.add_line("ERROR 404").unwrap();
1904 strict_aggregator.add_line("ERROR 500").unwrap(); assert_eq!(strict_aggregator.lines.len(), 2);
1906
1907 let mut lenient_aggregator = Aggregator::new_with_threshold(0.8);
1909 lenient_aggregator.add_line("ERROR 404").unwrap();
1910 lenient_aggregator.add_line("WARNING 200").unwrap(); assert_eq!(lenient_aggregator.lines.len(), 1);
1912 assert_eq!(lenient_aggregator.lines[0].count, 2);
1913 }
1914
1915 #[test]
1916 fn test_format_system_time() {
1917 let test_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1609459200); let formatted = format_system_time(test_time);
1919
1920 assert!(formatted.contains("-"));
1922 assert!(formatted.contains(":"));
1923 assert!(formatted.len() > 10); }
1925
1926 #[test]
1927 fn test_aggregator_display_formatting() {
1928 let mut aggregator = Aggregator::new();
1929 aggregator.add_line("Test error message").unwrap();
1930 aggregator.add_line("Test error message").unwrap(); let display_string = format!("{}", aggregator);
1933
1934 assert!(display_string.contains("Aggregated Logs"));
1936 assert!(display_string.contains("[2 similar log lines]"));
1937 assert!(display_string.contains("Test error message"));
1938 assert!(display_string.contains(">>>") && display_string.contains("<<<"));
1939 }
1940
1941 #[tokio::test]
1942 async fn test_local_log_sender_inactive_status() {
1943 let (log_channel, _) =
1944 channel::serve::<LogMessage>(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
1945 let test_proc_id = test_proc_id("testproc_0");
1946 let mut sender = LocalLogSender::new(log_channel, &test_proc_id).unwrap();
1947
1948 let result = sender.send(OutputTarget::Stdout, vec![b"test".to_vec()]);
1952 assert!(result.is_ok());
1953
1954 let result = sender.flush();
1955 assert!(result.is_ok());
1956 }
1957
1958 #[test]
1959 fn test_levenshtein_distance_edge_cases() {
1960 assert_eq!(levenshtein_distance("", ""), 0);
1962 assert_eq!(levenshtein_distance("", "hello"), 5);
1963 assert_eq!(levenshtein_distance("hello", ""), 5);
1964
1965 assert_eq!(levenshtein_distance("hello", "hello"), 0);
1967
1968 assert_eq!(levenshtein_distance("hello", "helo"), 1); assert_eq!(levenshtein_distance("helo", "hello"), 1); assert_eq!(levenshtein_distance("hello", "hallo"), 1); assert_eq!(levenshtein_distance("café", "cafe"), 1);
1975 }
1976
1977 #[test]
1978 fn test_normalized_edit_distance_edge_cases() {
1979 assert_eq!(normalized_edit_distance("", ""), 0.0);
1981
1982 assert_eq!(normalized_edit_distance("hello", ""), 1.0);
1984 assert_eq!(normalized_edit_distance("", "hello"), 1.0);
1985
1986 let distance = normalized_edit_distance("completely", "different");
1988 assert!((0.0..=1.0).contains(&distance));
1989 }
1990
1991 #[tokio::test]
1992 async fn test_deserialize_message_lines_edge_cases() {
1993 let empty_message = "".to_string();
1995 let serialized = wirevalue::Any::serialize(&empty_message).unwrap();
1996 let result = deserialize_message_lines(&serialized).unwrap();
1997 assert_eq!(result, vec![vec![] as Vec<String>]);
1998
1999 let trailing_newline = "line1\nline2\n".to_string();
2001 let serialized = wirevalue::Any::serialize(&trailing_newline).unwrap();
2002 let result = deserialize_message_lines(&serialized).unwrap();
2003 assert_eq!(result, vec![vec!["line1", "line2"]]);
2004 }
2005
2006 #[test]
2007 fn test_output_target_serialization() {
2008 let stdout_serialized = serde_json::to_string(&OutputTarget::Stdout).unwrap();
2010 let stderr_serialized = serde_json::to_string(&OutputTarget::Stderr).unwrap();
2011
2012 let stdout_deserialized: OutputTarget = serde_json::from_str(&stdout_serialized).unwrap();
2013 let stderr_deserialized: OutputTarget = serde_json::from_str(&stderr_serialized).unwrap();
2014
2015 assert_eq!(stdout_deserialized, OutputTarget::Stdout);
2016 assert_eq!(stderr_deserialized, OutputTarget::Stderr);
2017 }
2018
2019 #[test]
2020 fn test_log_line_display_formatting() {
2021 let log_line = LogLine::new("Test message".to_string());
2022 let display_string = format!("{}", log_line);
2023
2024 assert!(display_string.contains("[1 similar log lines]"));
2025 assert!(display_string.contains("Test message"));
2026
2027 let mut log_line_multi = LogLine::new("Test message".to_string());
2029 log_line_multi.count = 5;
2030 let display_string_multi = format!("{}", log_line_multi);
2031
2032 assert!(display_string_multi.contains("[5 similar log lines]"));
2033 assert!(display_string_multi.contains("Test message"));
2034 }
2035
2036 fn create_mock_reader(data: Vec<u8>) -> std::io::Cursor<Vec<u8>> {
2038 std::io::Cursor::new(data)
2039 }
2040
2041 #[tokio::test]
2042 async fn test_process_file_content_basic() {
2043 let data = b"line1\nline2\nline3\n".to_vec();
2044 let mut reader = create_mock_reader(data.clone());
2045 let max_buf_size = 10;
2046
2047 let result =
2048 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2049 .await
2050 .unwrap();
2051
2052 assert_eq!(result.lines.len(), 3);
2053 assert_eq!(result.lines[0], b"line1");
2054 assert_eq!(result.lines[1], b"line2");
2055 assert_eq!(result.lines[2], b"line3");
2056 assert_eq!(result.new_position, data.len() as u64);
2057 assert!(result.incomplete_line_buffer.is_empty());
2058 }
2059
2060 #[tokio::test]
2061 async fn test_process_file_content_incomplete_line() {
2062 let data = b"line1\nline2\npartial".to_vec();
2063 let mut reader = create_mock_reader(data.clone());
2064 let max_buf_size = 10;
2065
2066 let result =
2067 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2068 .await
2069 .unwrap();
2070
2071 assert_eq!(result.lines.len(), 2);
2072 assert_eq!(result.lines[0], b"line1");
2073 assert_eq!(result.lines[1], b"line2");
2074 assert_eq!(result.new_position, data.len() as u64);
2075 assert_eq!(result.incomplete_line_buffer, b"partial");
2076 }
2077
2078 #[tokio::test]
2079 async fn test_process_file_content_with_existing_buffer() {
2080 let data = b"omplete\nline2\nline3\n".to_vec();
2081 let mut reader = create_mock_reader(data.clone());
2082 let existing_buffer = b"inc".to_vec();
2083 let max_buf_size = 10;
2084
2085 let result = process_file_content(
2086 &mut reader,
2087 0,
2088 data.len() as u64,
2089 existing_buffer,
2090 max_buf_size,
2091 )
2092 .await
2093 .unwrap();
2094
2095 assert_eq!(result.lines.len(), 3);
2096 assert_eq!(result.lines[0], b"incomplete");
2097 assert_eq!(result.lines[1], b"line2");
2098 assert_eq!(result.lines[2], b"line3");
2099 assert_eq!(result.new_position, data.len() as u64);
2100 assert!(result.incomplete_line_buffer.is_empty());
2101 }
2102
2103 #[tokio::test]
2104 async fn test_process_file_content_empty_file() {
2105 let data = Vec::new();
2106 let mut reader = create_mock_reader(data.clone());
2107 let max_buf_size = 10;
2108
2109 let result = process_file_content(&mut reader, 0, 0, Vec::new(), max_buf_size)
2110 .await
2111 .unwrap();
2112
2113 assert!(result.lines.is_empty());
2114 assert_eq!(result.new_position, 0);
2115 assert!(result.incomplete_line_buffer.is_empty());
2116 }
2117
2118 #[tokio::test]
2119 async fn test_process_file_content_only_newlines() {
2120 let data = b"\n\n\n".to_vec();
2121 let mut reader = create_mock_reader(data.clone());
2122 let max_buf_size = 10;
2123
2124 let result =
2125 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2126 .await
2127 .unwrap();
2128
2129 assert!(result.lines.is_empty());
2131 assert_eq!(result.new_position, data.len() as u64);
2132 assert!(result.incomplete_line_buffer.is_empty());
2133 }
2134
2135 #[tokio::test]
2136 async fn test_process_file_content_no_newlines() {
2137 let data = b"no newlines here".to_vec();
2138 let mut reader = create_mock_reader(data.clone());
2139 let max_buf_size = 10;
2140
2141 let result =
2142 process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), max_buf_size)
2143 .await
2144 .unwrap();
2145
2146 assert!(result.lines.is_empty());
2147 assert_eq!(result.new_position, data.len() as u64);
2148 assert_eq!(result.incomplete_line_buffer, b"no newlines here");
2149 }
2150
2151 #[tokio::test]
2152 async fn test_process_file_content_file_truncation() {
2153 let data = b"line1\nline2\n".to_vec();
2154 let mut reader = create_mock_reader(data.clone());
2155
2156 let result = process_file_content(
2158 &mut reader,
2159 100, data.len() as u64,
2161 Vec::new(),
2162 10, )
2164 .await
2165 .unwrap();
2166
2167 assert_eq!(result.lines.len(), 2);
2169 assert_eq!(result.lines[0], b"line1");
2170 assert_eq!(result.lines[1], b"line2");
2171 assert_eq!(result.new_position, data.len() as u64);
2172 assert!(result.incomplete_line_buffer.is_empty());
2173 }
2174
2175 #[tokio::test]
2176 async fn test_process_file_content_seek_to_position() {
2177 let data = b"line1\nline2\nline3\n".to_vec();
2178 let mut reader = create_mock_reader(data.clone());
2179
2180 let result = process_file_content(&mut reader, 6, data.len() as u64, Vec::new(), 10)
2182 .await
2183 .unwrap();
2184
2185 assert_eq!(result.lines.len(), 2);
2186 assert_eq!(result.lines[0], b"line2");
2187 assert_eq!(result.lines[1], b"line3");
2188 assert_eq!(result.new_position, data.len() as u64);
2189 assert!(result.incomplete_line_buffer.is_empty());
2190 }
2191
2192 #[tokio::test]
2193 async fn test_process_file_content_position_equals_file_size() {
2194 let data = b"line1\nline2\n".to_vec();
2195 let mut reader = create_mock_reader(data.clone());
2196
2197 let result = process_file_content(
2199 &mut reader,
2200 data.len() as u64,
2201 data.len() as u64,
2202 Vec::new(),
2203 10,
2204 )
2205 .await
2206 .unwrap();
2207
2208 assert!(
2210 result.lines.is_empty(),
2211 "Expected empty line got {:?}",
2212 result.lines
2213 );
2214 assert_eq!(result.new_position, data.len() as u64);
2215 assert!(result.incomplete_line_buffer.is_empty());
2216 }
2217
2218 #[tokio::test]
2219 async fn test_process_file_content_large_line_truncation() {
2220 let large_line = "x".repeat(MAX_LINE_SIZE + 1000);
2222 let data = format!("{}\nline2\n", large_line).into_bytes();
2223 let mut reader = create_mock_reader(data.clone());
2224
2225 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2226 .await
2227 .unwrap();
2228
2229 assert_eq!(result.lines.len(), 2);
2230
2231 assert_eq!(
2233 result.lines[0].len(),
2234 MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2235 );
2236 assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2237
2238 assert_eq!(result.lines[1], b"line2");
2240
2241 assert_eq!(result.new_position, data.len() as u64);
2242 assert!(result.incomplete_line_buffer.is_empty());
2243 }
2244
2245 #[tokio::test]
2246 async fn test_process_file_content_mixed_line_endings() {
2247 let data = b"line1\nline2\r\nline3\n".to_vec();
2248 let mut reader = create_mock_reader(data.clone());
2249
2250 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2251 .await
2252 .unwrap();
2253
2254 assert_eq!(result.lines.len(), 3);
2255 assert_eq!(result.lines[0], b"line1");
2256 assert_eq!(result.lines[1], b"line2\r"); assert_eq!(result.lines[2], b"line3");
2258 assert_eq!(result.new_position, data.len() as u64);
2259 assert!(result.incomplete_line_buffer.is_empty());
2260 }
2261
2262 #[tokio::test]
2263 async fn test_process_file_content_existing_buffer_with_truncation() {
2264 let existing_buffer = "x".repeat(MAX_LINE_SIZE - 100);
2266 let data = format!("{}\nline2\n", "y".repeat(200)).into_bytes();
2267 let mut reader = create_mock_reader(data.clone());
2268
2269 let result = process_file_content(
2270 &mut reader,
2271 0,
2272 data.len() as u64,
2273 existing_buffer.into_bytes(),
2274 10,
2275 )
2276 .await
2277 .unwrap();
2278
2279 assert_eq!(result.lines.len(), 2);
2280
2281 assert_eq!(
2283 result.lines[0].len(),
2284 MAX_LINE_SIZE + b"... [TRUNCATED]".len()
2285 );
2286 assert!(result.lines[0].ends_with(b"... [TRUNCATED]"));
2287
2288 assert_eq!(result.lines[1], b"line2");
2290
2291 assert_eq!(result.new_position, data.len() as u64);
2292 assert!(result.incomplete_line_buffer.is_empty());
2293 }
2294
2295 #[tokio::test]
2296 async fn test_process_file_content_single_character_lines() {
2297 let data = b"a\nb\nc\n".to_vec();
2298 let mut reader = create_mock_reader(data.clone());
2299
2300 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2301 .await
2302 .unwrap();
2303
2304 assert_eq!(result.lines.len(), 3);
2305 assert_eq!(result.lines[0], b"a");
2306 assert_eq!(result.lines[1], b"b");
2307 assert_eq!(result.lines[2], b"c");
2308 assert_eq!(result.new_position, data.len() as u64);
2309 assert!(result.incomplete_line_buffer.is_empty());
2310 }
2311
2312 #[tokio::test]
2313 async fn test_process_file_content_binary_data() {
2314 let data = vec![0x00, 0x01, 0x02, b'\n', 0xFF, 0xFE, b'\n'];
2315 let mut reader = create_mock_reader(data.clone());
2316
2317 let result = process_file_content(&mut reader, 0, data.len() as u64, Vec::new(), 10)
2318 .await
2319 .unwrap();
2320
2321 assert_eq!(result.lines.len(), 2);
2322 assert_eq!(result.lines[0], vec![0x00, 0x01, 0x02]);
2323 assert_eq!(result.lines[1], vec![0xFF, 0xFE]);
2324 assert_eq!(result.new_position, data.len() as u64);
2325 assert!(result.incomplete_line_buffer.is_empty());
2326 }
2327
2328 #[tokio::test]
2329 async fn test_process_file_content_resume_after_max_buffer_size() {
2330 let data = b"line 1\nline 2\nline 3\n".to_vec();
2332 let mut reader = create_mock_reader(data.clone());
2333 let max_buffer_size = 2; let result1 = process_file_content(
2337 &mut reader,
2338 0, data.len() as u64,
2340 Vec::new(), max_buffer_size,
2342 )
2343 .await
2344 .unwrap();
2345
2346 assert_eq!(result1.lines.len(), 2, "First call should return 2 lines");
2348 assert_eq!(result1.lines[0], b"line 1");
2349 assert_eq!(result1.lines[1], b"line 2");
2350 assert!(result1.incomplete_line_buffer.is_empty());
2351
2352 let expected_position_after_first_call = b"line 1\nline 2\n".len() as u64;
2354 assert_eq!(result1.new_position, expected_position_after_first_call);
2355
2356 let mut reader2 = create_mock_reader(data.clone());
2358 let result2 = process_file_content(
2359 &mut reader2,
2360 result1.new_position, data.len() as u64,
2362 result1.incomplete_line_buffer, max_buffer_size,
2364 )
2365 .await
2366 .unwrap();
2367
2368 assert_eq!(result2.lines.len(), 1, "Second call should return 1 line");
2370 assert_eq!(result2.lines[0], b"line 3");
2371 assert!(result2.incomplete_line_buffer.is_empty());
2372 assert_eq!(result2.new_position, data.len() as u64);
2373 }
2374
2375 #[tokio::test]
2376 async fn test_utf8_truncation() {
2377 hyperactor_telemetry::initialize_logging_for_test();
2381
2382 let mut long_line = "x".repeat(MAX_LINE_SIZE - 1);
2384 long_line.push('🦀'); long_line.push('\n');
2386
2387 let (mut writer, reader) = tokio::io::duplex(8192);
2389
2390 let test_proc_id = test_proc_id("testproc_0");
2392 let monitor = StreamFwder::start_with_writer(
2393 reader,
2394 Box::new(tokio::io::sink()), None, OutputTarget::Stdout,
2397 1, None, &test_proc_id,
2400 None, );
2402
2403 writer.write_all(long_line.as_bytes()).await.unwrap();
2405 drop(writer); let (_lines, result) = monitor.abort().await;
2409 result.expect("Should complete without panic despite UTF-8 truncation");
2410 }
2411}