# Stream Forwarders (bootstrap/host side)

The bootstrap side is responsible for capturing each child's stdout/stderr, teeing them locally, and shipping structured log lines to the child's in-band `LogForwardActor` over a hyperactor channel.

## What they are

`StreamFwder` is a lightweight controller owned by the `Host`'s `BootstrapProcManager`. Each controller spawns a background tee task that does the actual I/O. The struct itself only holds control/diagnostic handles.

```rust
pub struct StreamFwder {
    /// Join handle for the spawned tee task (child stdout/stderr reader).
    teer: JoinHandle<Result<(), io::Error>>,
    /// Circular buffer for recent lines (peek/diagnostics).
    recent_lines_buf: RotatingLineBuffer,
    /// Shutdown signal observed by the tee task.
    stop: Arc<Notify>,
}
```

The background task owns the OS pipe reader and does the heavy lifting:

- reads child stdout/stderr,
- tees to bootstrap's own stdout/stderr,
- optionally appends to per-child files via `FileAppender`,
- batches lines and uses `LocalLogSender` (wrapping `Tx`) to send `LogMessage::Log`,
- on EOF it calls `flush()` once (posts `Flush(None)`) as a final nudge (periodic heartbeats are generated by the child's `LogForwardActor`).

## Components & wiring (at a glance)

```
Host
 └─ BootstrapProcManager
     └─ BootstrapProcHandle { proc_id, pipes, ... }
         ├─ StreamFwder(stdout) ──(spawned task → tee, FileAppender?, LocalLogSender)
         └─ StreamFwder(stderr) ──(spawned task → tee, FileAppender?, LocalLogSender)
```

`BootstrapProcHandle` is the per-child record that owns the two `StreamFwder` controllers. Each `StreamFwder` spawns exactly one background "tee" task and keeps its own control/diagnostic handles (e.g., `recent_lines_buf`, `stop`).

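The controller/worker split described above can be illustrated with a minimal sketch using only the standard library. Everything here is hypothetical: `MiniFwder`, its fields, and its methods are stand-ins for illustration, a thread replaces the tokio task, and an `AtomicBool` replaces the real shutdown signal. It is not the actual `StreamFwder` implementation.

```rust
use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `StreamFwder`: it holds only control and
/// diagnostic handles, while the worker owns the reader.
pub struct MiniFwder {
    worker: JoinHandle<std::io::Result<()>>,
    recent: Arc<Mutex<VecDeque<String>>>,
    stop: Arc<AtomicBool>,
}

impl MiniFwder {
    /// Spawn exactly one background worker that owns `reader` and keeps the
    /// last `keep` lines in a shared rolling window.
    pub fn start<R: Read + Send + 'static>(reader: R, keep: usize) -> Self {
        assert!(keep > 0, "rolling window must hold at least one line");
        let recent = Arc::new(Mutex::new(VecDeque::with_capacity(keep)));
        let stop = Arc::new(AtomicBool::new(false));
        let (recent_w, stop_w) = (recent.clone(), stop.clone());
        let worker = thread::spawn(move || -> std::io::Result<()> {
            for line in BufReader::new(reader).lines() {
                if stop_w.load(Ordering::Relaxed) {
                    break; // cooperative shutdown requested by the controller
                }
                let line = line?;
                let mut buf = recent_w.lock().unwrap();
                if buf.len() == keep {
                    buf.pop_front(); // drop the oldest line
                }
                buf.push_back(line);
            }
            Ok(())
        });
        MiniFwder { worker, recent, stop }
    }

    /// Ask the worker to stop at its next line boundary.
    pub fn request_stop(&self) {
        self.stop.store(true, Ordering::Relaxed);
    }

    /// Snapshot of the rolling window without touching the worker.
    pub fn peek(&self) -> Vec<String> {
        self.recent.lock().unwrap().iter().cloned().collect()
    }

    /// Wait for the worker to reach EOF (or stop), then return the window.
    pub fn finish(self) -> std::io::Result<Vec<String>> {
        self.worker.join().expect("worker panicked")?;
        let lines: Vec<String> = self.recent.lock().unwrap().iter().cloned().collect();
        Ok(lines)
    }
}
```

The point of the split is that callers can peek at recent output or request shutdown without ever holding the pipe themselves.
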
## Wiring & message flow

```
[host / bootstrap proc]                      [child / remote proc]

StreamFwder ── LocalLogSender ──► (unix) ──► LogForwardActor (serve)
(stdout / stderr)                   ^                 ^
  |                                 |                 └─ dials same channel for in-band Flush
  └─ FileAppender (optional)        |
                             BOOTSTRAP_LOG_CHANNEL (env)
```

- `BootstrapProcManager::spawn` chooses a **per-child** Unix channel address and exports it to the child via `BOOTSTRAP_LOG_CHANNEL`.
- The child's `LogForwardActor` serves that address; the parent's `LocalLogSender` dials it.
- The `LogForwardActor` also self-dials the same address to post `Flush` control markers **in-band**, guaranteeing ordering w.r.t. data.
- If `BOOTSTRAP_LOG_CHANNEL` is absent (tests/local), the `LogForwardActor` falls back to `ChannelAddr::any(ChannelTransport::Unix)`; in that mode no parent is connected and streaming is effectively disabled.

### Ordering guarantees

- Per stream (stdout or stderr) and per child only.
- No cross-stream order guarantee (stdout vs stderr may interleave).

## Responsibilities (`StreamFwder`, bootstrap side)

- Read bytes from each child's stdout/stderr pipes reliably.
- Split into lines (preserve non-UTF8 as raw bytes; normalize `\r\n`).
- Tee to:
  - bootstrap's own stdout/stderr (for local visibility),
  - optional `FileAppender` (durable local files, rotation),
  - and `LocalLogSender` → `LogMessage::Log { … payload: Vec<Vec<u8>> }`.
- On EOF, call `flush()` once (posts `Flush(None)`). Periodic `Flush(None)` heartbeats are issued by the child's `LogForwardActor`.
- Backpressure: apply bounded batching; never buffer without bound.

## Read & batch algorithm (outline)

- `tokio::io::BufReader` around each pipe fd.
- `read_until(b'\n')` (or manual chunking) → line frames (strip `\n`, normalize `\r\n`).
- Batch up to `MAX_LINES` or `MAX_BYTES` or `MAX_DEADLINE` (whichever comes first).
- Emit one `LogMessage::Log` with payload = `Vec<Vec<u8>>` (lines).
- (Periodic `Flush(None)` heartbeats are issued by the child's `LogForwardActor`, not here.)

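The framing and batching steps in the outline can be sketched as follows. This is a synchronous, standard-library-only illustration under stated assumptions: `frame_lines` and `batch` are hypothetical helper names, the `MAX_LINES` and `MAX_BYTES` values are made up for the example (the document does not give the real ones), and the `MAX_DEADLINE` timer is omitted because it needs an async runtime.

```rust
/// Illustrative limits only; the real values are not stated in this document.
const MAX_LINES: usize = 3;
const MAX_BYTES: usize = 16;

/// Split `chunk` into complete lines, carrying any unterminated tail over in
/// `partial` for the next call. The trailing `\n` is stripped and a preceding
/// `\r` is dropped; bytes are kept raw, with no UTF-8 validation.
pub fn frame_lines(partial: &mut Vec<u8>, chunk: &[u8]) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    for &b in chunk {
        if b == b'\n' {
            if partial.last() == Some(&b'\r') {
                partial.pop(); // normalize "\r\n" to a bare line
            }
            out.push(std::mem::take(partial));
        } else {
            partial.push(b);
        }
    }
    out
}

/// Group lines into batches bounded by line count and total byte size.
/// A single line larger than `MAX_BYTES` still travels alone in its own batch.
pub fn batch(lines: Vec<Vec<u8>>) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut cur: Vec<Vec<u8>> = Vec::new();
    let mut cur_bytes = 0;
    for line in lines {
        let full = cur.len() == MAX_LINES || cur_bytes + line.len() > MAX_BYTES;
        if full && !cur.is_empty() {
            batches.push(std::mem::take(&mut cur));
            cur_bytes = 0;
        }
        cur_bytes += line.len();
        cur.push(line);
    }
    if !cur.is_empty() {
        batches.push(cur);
    }
    batches
}
```

Each inner `Vec<Vec<u8>>` corresponds to the payload of one `LogMessage::Log`. Because both limits are fixed, memory held by a pending batch stays bounded regardless of how fast the child writes.
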
## Where `LocalLogSender` comes from (bootstrap side)

- `BootstrapProcManager::spawn` chooses a per-child Unix channel address and exports it to the child via `BOOTSTRAP_LOG_CHANNEL`. The child's `LogForwardActor` serves that address (see its `new()`), and the bootstrap dials it locally.
- When wiring a child's streams, the bootstrap builds a `LocalLogSender` and passes it into the spawned `tee` task from `StreamFwder::start_with_writer`. The sender and all I/O live inside the spawned task; the `StreamFwder` controller only keeps control/diagnostic handles (`teer`, `recent_lines_buf`, `stop`).

```rust
// logging.rs :: StreamFwder::start_with_writer(..) (abridged)
let log_sender = match LocalLogSender::new(log_channel, pid) {
    Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
    Err(e) => {
        tracing::error!("failed to create log sender: {}", e);
        None
    }
};

let teer_stop = stop.clone();
let recent_line_buf_clone = recent_lines_buf.clone();
let teer = tokio::spawn(async move {
    tee(
        reader,            // child's pipe (stdout / stderr)
        std_writer,        // bootstrap's own stdout/stderr writer
        log_sender,        // in-band sender to child LogForwardActor
        file_monitor_addr, // optional FileAppender's channel
        target,            // OutputTarget::{Stdout, Stderr}
        prefix,            // optional rank prefix
        teer_stop,         // shutdown signal
        recent_line_buf_clone,
    ).await
});

StreamFwder { teer, recent_lines_buf, stop }
```

### What `LocalLogSender` actually does

- `LocalLogSender::new(log_channel, pid)` dials the child's served channel (`channel::dial::<LogMessage>`), holds a `ChannelTx<LogMessage>` and a `Receiver<TxStatus>` for liveness.
- `send(..)` checks `TxStatus::Active` and uses `tx.post(..)` (non-blocking) to emit:

  ```rust
  LogMessage::Log { hostname, pid, output_target, payload: wirevalue::Any::serialize(&Vec<Vec<u8>>) }
  ```

- `flush()` also checks status and uses `tx.post(..)` to emit:

  ```rust
  LogMessage::Flush { sync_version: None }
  ```

  (non-blocking "liveness" ping; no await).

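The "check status, then post without blocking" behavior, and the fact that `Log` and `Flush` share one channel, can be sketched with the standard library. All names here are hypothetical: `SketchSender` and `Msg` only mimic the shape of `LocalLogSender` and `LogMessage`, an `AtomicBool` stands in for the `TxStatus` receiver, `std::sync::mpsc` stands in for the hyperactor channel, and the `Option<u64>` type of `sync_version` is an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical, simplified mirror of the two message kinds.
#[derive(Debug, PartialEq)]
pub enum Msg {
    Log { pid: u32, payload: Vec<Vec<u8>> },
    Flush { sync_version: Option<u64> }, // field type is an assumption
}

/// Hypothetical stand-in for `LocalLogSender`.
pub struct SketchSender {
    pid: u32,
    tx: Sender<Msg>,
    active: Arc<AtomicBool>, // stands in for the `TxStatus` liveness receiver
}

impl SketchSender {
    /// Returns the sender, the receiving end, and a handle to flip liveness.
    pub fn new(pid: u32) -> (Self, Receiver<Msg>, Arc<AtomicBool>) {
        let (tx, rx) = channel();
        let active = Arc::new(AtomicBool::new(true));
        let sender = SketchSender { pid, tx, active: active.clone() };
        (sender, rx, active)
    }

    /// Refuse to post when the channel is not active; otherwise enqueue
    /// without blocking (an unbounded mpsc `send` never waits).
    fn post(&self, msg: Msg) -> Result<(), String> {
        if !self.active.load(Ordering::Acquire) {
            return Err("channel is not active".to_string());
        }
        self.tx.send(msg).map_err(|e| e.to_string())
    }

    pub fn send(&self, payload: Vec<Vec<u8>>) -> Result<(), String> {
        self.post(Msg::Log { pid: self.pid, payload })
    }

    pub fn flush(&self) -> Result<(), String> {
        self.post(Msg::Flush { sync_version: None })
    }
}
```

Because both message kinds go through the same queue, a `Flush` posted after a `Log` is always observed after it by the receiver, which is the in-band ordering property the design relies on.
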
### What the `tee` task forwards (`tee(..)`)

- Reads bytes from the child pipe, writes through to the bootstrap's std writer, frames on `\n`, applies `MAX_LINE_SIZE` truncation and optional rank prefix, then:
  - sends batched lines to `LocalLogSender::send(target, Vec<Vec<u8>>)` (if present),
  - posts the same lines to `FileAppender` via its channel (if present),
  - maintains a rotating in-memory window via `recent_lines_buf.try_add_data(..)`.
- On EOF, it flushes the std writer and calls `log_sender.flush()` once to push a `Flush(None)`.

> Note on heartbeats & ordering: periodic `Flush(None)` heartbeats are driven by `LogForwardActor` (see its `init()` and the rescheduling in `forward()`), not by the stream forwarder. The stream forwarder only issues a `flush()` at EOF. In-band ordering is preserved because both `Log` and `Flush` travel on the same channel that the child's `LogForwardActor` serves.

### Source-level view: `tee()`

```rust
// logging.rs (abridged, annotated)
async fn tee(
    mut reader: impl AsyncRead + Unpin + Send + 'static,  // child's pipe (stdout/stderr)
    mut std_writer: Box<dyn AsyncWrite + Send + Unpin>,   // bootstrap's own stdout/stderr
    mut log_sender: Option<Box<dyn LogSender + Send>>,    // in-band sender to child forwarder
    file_monitor_addr: Option<ChannelAddr>,               // optional FileAppender channel
    target: OutputTarget,                                 // Stdout | Stderr
    prefix: Option<String>,                               // optional rank prefix
    stop: Arc<Notify>,                                    // shutdown signal
    recent_lines_buf: RotatingLineBuffer,                 // rolling diagnostics buffer
) -> Result<(), io::Error> {
    let mut buf = [0u8; 8192];
    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);

    // If a file-aggregator is active, dial its channel once.
    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
        file_monitor_addr.and_then(|addr| channel::dial(addr).ok());

    loop {
        tokio::select! {
            // --- main IO path: read child pipe in chunks ---
            read_result = reader.read(&mut buf) => {
                let n = match read_result {
                    Ok(n) => n,
                    Err(e) => {
                        tracing::debug!("read error in tee: {e}");
                        return Err(e);
                    }
                };
                if n == 0 {
                    tracing::debug!("EOF reached in tee");
                    break; // -> flush and drain partials below
                }

                // Always tee raw bytes to the bootstrap's local stdout/stderr for immediacy.
                if let Err(e) = std_writer.write_all(&buf[..n]).await {
                    tracing::warn!("error writing to std: {e}");
                }

                // Frame by '\n' -> completed UTF-8 lines (lossy), truncate, prefix if configured.
                let mut completed_lines = Vec::new();
                for &byte in &buf[..n] {
                    if byte == b'\n' {
                        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
                        if line.len() > MAX_LINE_SIZE {
                            line.truncate(MAX_LINE_SIZE);
                            line.push_str("... [TRUNCATED]");
                        }
                        let final_line = match &prefix {
                            Some(p) => format!("[{p}] {line}"),
                            None => line,
                        };
                        completed_lines.push(final_line);
                        line_buffer.clear();
                    } else {
                        line_buffer.push(byte);
                    }
                }

                // Forward completed lines to the in-band log channel and file aggregator (if any).
                if !completed_lines.is_empty() {
                    if let Some(sender) = log_sender.as_mut() {
                        let bytes: Vec<Vec<u8>> = completed_lines.iter()
                            .map(|s| s.as_bytes().to_vec())
                            .collect();
                        if let Err(e) = sender.send(target, bytes) {
                            tracing::warn!("error sending to log_sender: {e}");
                        }
                    }
                    if let Some(tx) = file_monitor_tx.as_mut() {
                        tx.post(FileMonitorMessage { lines: completed_lines });
                    }
                }

                // Keep a small rolling window for debugging/peek().
                recent_lines_buf.try_add_data(&buf, n);
            }

            // --- cooperative shutdown ---
            _ = stop.notified() => {
                tracing::debug!("stop signal received in tee");
                break;
            }
        }
    }

    // Ensure local std writer is flushed.
    std_writer.flush().await?;

    // If there's a final partial line, forward it through both paths.
    if !line_buffer.is_empty() {
        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
        if line.len() > MAX_LINE_SIZE {
            line.truncate(MAX_LINE_SIZE);
            line.push_str("... [TRUNCATED]");
        }
        let final_line = match &prefix {
            Some(p) => format!("[{p}] {line}"),
            None => line,
        };
        if let Some(sender) = log_sender.as_mut() {
            let _ = sender.send(target, vec![final_line.as_bytes().to_vec()]);
        }
        if let Some(tx) = file_monitor_tx.as_mut() {
            tx.post(FileMonitorMessage { lines: vec![final_line] });
        }
    }

    // Nudge the in-band channel once on exit to avoid downstream starvation.
    if let Some(sender) = log_sender.as_mut() {
        let _ = sender.flush(); // posts LogMessage::Flush { sync_version: None }
    }

    Ok(())
}
```
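
The per-line step in the excerpt (lossy decode, truncate, prefix) can be isolated into a small helper. The sketch below is not taken from `logging.rs`: `finish_line` is a hypothetical name and the `MAX_LINE_SIZE` value is made up. It also makes one deliberate change, which is to back the cut point off to a character boundary, because `String::truncate` panics when the requested length falls inside a multi-byte character.

```rust
/// Illustrative limit only; the real `MAX_LINE_SIZE` is not stated here.
const MAX_LINE_SIZE: usize = 8;

/// Hypothetical helper: decode one framed line lossily, truncate it to at
/// most `MAX_LINE_SIZE` bytes, and apply an optional rank prefix.
pub fn finish_line(raw: &[u8], prefix: Option<&str>) -> String {
    let mut line = String::from_utf8_lossy(raw).into_owned();
    if line.len() > MAX_LINE_SIZE {
        // Walk back to the nearest char boundary so `truncate` cannot panic.
        // Index 0 is always a boundary, so this loop terminates.
        let mut cut = MAX_LINE_SIZE;
        while !line.is_char_boundary(cut) {
            cut -= 1;
        }
        line.truncate(cut);
        line.push_str("... [TRUNCATED]");
    }
    match prefix {
        Some(p) => format!("[{p}] {line}"),
        None => line,
    }
}
```

With these illustrative values, `finish_line(b"0123456789", Some("3"))` yields `[3] 01234567... [TRUNCATED]`, and a line whose eighth byte falls inside a two-byte character is cut one byte earlier instead of panicking.
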