Stream Forwarders (bootstrap/host side)#
The bootstrap side is responsible for capturing each child’s stdout/stderr, teeing them locally, and shipping structured log lines to the child’s in-band LogForwardActor over a hyperactor channel.
What they are#
StreamFwder is a lightweight controller owned by the Host’s BootstrapProcManager. Each controller spawns a background tee task that does the actual I/O. The struct itself only holds control/diagnostic handles.
pub struct StreamFwder {
    /// Join handle for the spawned tee task (child stdout/stderr reader).
    teer: JoinHandle<Result<(), io::Error>>,
    /// Circular buffer for recent lines (peek/diagnostics).
    recent_lines_buf: RotatingLineBuffer,
    /// Shutdown signal observed by the tee task.
    stop: Arc<Notify>,
}
The background task owns the OS pipe reader and does the heavy lifting:
reads child stdout/stderr,
tees to bootstrap’s own stdout/stderr,
optionally appends to per-child files via FileAppender,
batches lines and uses LocalLogSender (wrapping Tx<LogMessage>) to send LogMessage::Log,
on EOF it calls flush() once (posts Flush(None)) as a final nudge (periodic heartbeats are generated by the child’s LogForwardActor).
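The controller/worker split described above can be sketched with plain threads. This is a minimal illustration under stated assumptions, not the actual implementation: `Forwarder`, `request_stop`, and `join` are hypothetical names, `std::thread` stands in for the tokio task, and an `AtomicBool` stands in for `Arc<Notify>`.

```rust
use std::io::{self, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical, thread-based stand-in for `StreamFwder`: the controller
/// holds only control handles, while the worker owns the reader.
pub struct Forwarder {
    worker: JoinHandle<io::Result<usize>>,
    stop: Arc<AtomicBool>,
}

impl Forwarder {
    /// Spawn a worker that drains `reader` until EOF or until stop is requested.
    pub fn start<R: Read + Send + 'static>(mut reader: R) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let worker_stop = Arc::clone(&stop);
        let worker = thread::spawn(move || {
            let mut buf = [0u8; 8192];
            let mut total = 0usize;
            // The flag is only checked between reads; a blocked read is not interrupted.
            while !worker_stop.load(Ordering::Acquire) {
                let n = reader.read(&mut buf)?;
                if n == 0 {
                    break; // EOF
                }
                total += n;
            }
            Ok(total)
        });
        Forwarder { worker, stop }
    }

    /// Ask the worker to exit at its next loop iteration.
    pub fn request_stop(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Wait for the worker and return the number of bytes it read.
    pub fn join(self) -> io::Result<usize> {
        self.worker
            .join()
            .unwrap_or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "worker panicked")))
    }
}
```

The point of the split is that dropping or inspecting the controller never touches the pipe itself; only the worker does I/O.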
Components & wiring (at a glance)#
Host<M=BootstrapProcManager>
  └─ BootstrapProcManager
       └─ BootstrapProcHandle { proc_id, pipes, ... }
            ├─ StreamFwder(stdout) ──(spawned task → tee, FileAppender?, LocalLogSender)
            └─ StreamFwder(stderr) ──(spawned task → tee, FileAppender?, LocalLogSender)
BootstrapProcHandle is the per-child record that owns the two StreamFwder controllers. Each StreamFwder spawns exactly one background “tee” task and keeps its own control/diagnostic handles (e.g., recent_lines_buf, stop).
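The text only says that `recent_lines_buf` is a circular buffer of recent lines shared between the controller and its task. A minimal sketch of how such a buffer might behave follows; `RecentLines`, `try_push`, and `peek` are hypothetical names, and the real `RotatingLineBuffer` API is not reproduced here.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `RotatingLineBuffer`: keeps the newest `capacity` lines.
/// Clones share the same storage, so a controller and its worker can each hold one.
#[derive(Clone)]
pub struct RecentLines {
    inner: Arc<Mutex<VecDeque<String>>>,
    capacity: usize,
}

impl RecentLines {
    pub fn new(capacity: usize) -> Self {
        RecentLines {
            inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
            capacity,
        }
    }

    /// Best-effort add: returns false (and drops the line) if the lock is
    /// contended or poisoned, so the hot I/O path never blocks on diagnostics.
    pub fn try_push(&self, line: &str) -> bool {
        let mut lines = match self.inner.try_lock() {
            Ok(guard) => guard,
            Err(_) => return false,
        };
        if self.capacity == 0 {
            return true; // nothing is retained
        }
        if lines.len() == self.capacity {
            lines.pop_front(); // evict the oldest line
        }
        lines.push_back(line.to_string());
        true
    }

    /// Snapshot of the retained lines, oldest first.
    pub fn peek(&self) -> Vec<String> {
        match self.inner.lock() {
            Ok(lines) => lines.iter().cloned().collect(),
            Err(_) => Vec::new(),
        }
    }
}
```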
Wiring & message flow#
[host / bootstrap proc]                      [child / remote proc]
StreamFwder ── LocalLogSender ──► (unix) ──► LogForwardActor (serve)
(stdout / stderr)                   ^        ^
  |                                 |        └─ dials same channel for in-band Flush
  └─ FileAppender (optional)        |
                          BOOTSTRAP_LOG_CHANNEL (env)
BootstrapProcManager::spawn chooses a per-child Unix channel address and exports it to the child via BOOTSTRAP_LOG_CHANNEL.
The child’s LogForwardActor serves that address; the parent’s LocalLogSender dials it.
The forwarder also self-dials the same address to post Flush control markers in-band, guaranteeing ordering w.r.t. data.
If BOOTSTRAP_LOG_CHANNEL is absent (tests/local), the forwarder falls back to ChannelAddr::any(ChannelTransport::Unix); in that mode no parent is connected and streaming is effectively disabled.
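The fallback rule in the last point can be sketched as a small pure function. This is only an illustration: `LogChannel`, `resolve_log_channel`, and `log_channel_from_env` are hypothetical names, the address is kept as an opaque string rather than a real `ChannelAddr`, and treating an empty value as absent is an assumption.

```rust
/// Hypothetical model of where the forwarder's channel address comes from.
#[derive(Debug, PartialEq)]
pub enum LogChannel {
    /// Address exported by the parent via BOOTSTRAP_LOG_CHANNEL.
    FromParent(String),
    /// No parent address: serve on any local Unix address (nothing dials it).
    AnyUnix,
}

pub const LOG_CHANNEL_ENV: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Resolve from an explicit value so the rule can be tested without
/// touching the process environment.
pub fn resolve_log_channel(value: Option<&str>) -> LogChannel {
    match value.map(str::trim) {
        // Assumption: an empty or whitespace-only value counts as absent.
        Some(addr) if !addr.is_empty() => LogChannel::FromParent(addr.to_string()),
        _ => LogChannel::AnyUnix,
    }
}

/// Convenience wrapper that reads the real environment variable.
pub fn log_channel_from_env() -> LogChannel {
    resolve_log_channel(std::env::var(LOG_CHANNEL_ENV).ok().as_deref())
}
```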
Ordering guarantees#
Order is preserved only within a single stream (stdout or stderr) of a single child.
There is no cross-stream ordering guarantee: stdout and stderr lines may interleave.
Responsibilities (StreamFwder, bootstrap side)#
Read bytes from each child’s stdout/stderr pipes reliably.
Split into lines (preserve non-UTF8 as raw bytes; normalize \r\n).
Tee:
  bootstrap’s own stdout/stderr (for local visibility),
  optional FileAppender (durable local files, rotation), and
  LocalLogSender → LogMessage::Log { … payload: Vec<Vec<u8>> }.
On EOF, call flush() (posts Flush(None)) once (periodic Flush(None) heartbeats are issued by the child LogForwardActor).
Backpressure: apply bounded batching; never unboundedly buffer.
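The "split into lines" responsibility can be illustrated with an incremental framer that keeps lines as raw bytes and normalizes `\r\n`. This is a sketch of the stated responsibility, not the code in `logging.rs`; `LineFramer`, `push`, and `finish` are hypothetical names.

```rust
/// Hypothetical incremental framer: feed arbitrary chunks, get complete lines back.
/// Lines stay as raw bytes, so non-UTF-8 input passes through unchanged.
#[derive(Default)]
pub struct LineFramer {
    partial: Vec<u8>,
}

impl LineFramer {
    /// Consume one chunk and return the lines it completed (terminators removed).
    pub fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        let mut lines = Vec::new();
        for &byte in chunk {
            if byte == b'\n' {
                // Normalize "\r\n" by dropping a trailing '\r'.
                if self.partial.last() == Some(&b'\r') {
                    self.partial.pop();
                }
                lines.push(std::mem::take(&mut self.partial));
            } else {
                self.partial.push(byte);
            }
        }
        lines
    }

    /// At EOF, return the unterminated remainder, if any.
    pub fn finish(&mut self) -> Option<Vec<u8>> {
        if self.partial.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.partial))
        }
    }
}
```

Because the partial line is carried across calls, a line split over two reads (including a `\r` and `\n` that land in different chunks) is still emitted once, whole.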
Read & batch algorithm (outline)#
tokio::io::BufReader around each pipe fd.
read_until(b'\n') (or manual chunking) → line frames (strip \n, normalize \r\n).
Batch up to MAX_LINES or MAX_BYTES or MAX_DEADLINE (whichever comes first).
Emit one LogMessage::Log with payload = Vec<Vec<u8>> (lines).
(Periodic Flush(None) heartbeats are issued by the child’s LogForwardActor, not here.)
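The batching step of the outline might look like the following sketch. The limit names mirror MAX_LINES, MAX_BYTES, and MAX_DEADLINE from the outline, but their real values are not given here; `BatchLimits`, `Batcher`, and the explicit `now` parameter are assumptions made for illustration and testability.

```rust
use std::time::{Duration, Instant};

/// Hypothetical limits mirroring MAX_LINES / MAX_BYTES / MAX_DEADLINE.
pub struct BatchLimits {
    pub max_lines: usize,
    pub max_bytes: usize,
    pub max_delay: Duration,
}

/// Bounded accumulator: a batch is emitted as soon as any limit is hit.
pub struct Batcher {
    limits: BatchLimits,
    lines: Vec<Vec<u8>>,
    bytes: usize,
    started: Option<Instant>, // arrival time of the first pending line
}

impl Batcher {
    pub fn new(limits: BatchLimits) -> Self {
        Batcher { limits, lines: Vec::new(), bytes: 0, started: None }
    }

    /// Add a line; returns a batch when the line or byte limit is reached.
    pub fn push(&mut self, line: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        self.started.get_or_insert(now);
        self.bytes += line.len();
        self.lines.push(line);
        if self.lines.len() >= self.limits.max_lines || self.bytes >= self.limits.max_bytes {
            return self.take();
        }
        None
    }

    /// Called on a timer tick: emits the pending batch once the deadline has passed.
    pub fn poll_deadline(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.started {
            Some(t0) if now.saturating_duration_since(t0) >= self.limits.max_delay => self.take(),
            _ => None,
        }
    }

    /// Drain whatever is pending (for example at EOF).
    pub fn take(&mut self) -> Option<Vec<Vec<u8>>> {
        self.started = None;
        self.bytes = 0;
        if self.lines.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.lines))
        }
    }
}
```

Each returned batch would correspond to one `LogMessage::Log` payload; because every limit forces a drain, pending memory stays bounded by roughly `max_bytes` plus one line.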
Where LocalLogSender comes from (bootstrap side)#
BootstrapProcManager::spawn chooses a per-child Unix channel address and exports it to the child via BOOTSTRAP_LOG_CHANNEL. The child’s LogForwardActor serves that address (see its new()), and the bootstrap dials it locally.
When wiring a child’s streams, the bootstrap builds a LocalLogSender and passes it into the spawned tee task from StreamFwder::start_with_writer. The sender and all I/O live inside the spawned task; the StreamFwder controller only keeps control/diagnostic handles (teer, recent_lines_buf, stop).
// logging.rs :: StreamFwder::start_with_writer(..) (abridged)
let log_sender = match LocalLogSender::new(log_channel, pid) {
    Ok(s) => Some(Box::new(s) as Box<dyn LogSender + Send>),
    Err(e) => {
        tracing::error!("failed to create log sender: {}", e);
        None
    }
};
let teer_stop = stop.clone();
let recent_line_buf_clone = recent_lines_buf.clone();
let teer = tokio::spawn(async move {
    tee(
        reader,            // child's pipe (stdout / stderr)
        std_writer,        // bootstrap’s own stdout/stderr writer
        log_sender,        // in-band sender to child LogForwardActor
        file_monitor_addr, // optional FileAppender’s channel
        target,            // OutputTarget::{Stdout, Stderr}
        prefix,            // optional rank prefix
        teer_stop,         // shutdown signal
        recent_line_buf_clone,
    ).await
});
StreamFwder { teer, recent_lines_buf, stop }
What LocalLogSender actually does#
LocalLogSender::new(log_channel, pid) dials the child’s served channel (channel::dial::<LogMessage>) and holds a ChannelTx<LogMessage> and a Receiver<TxStatus> for liveness.
send(..) checks TxStatus::Active and uses tx.post(..) (non-blocking) to emit:
    LogMessage::Log { hostname, pid, output_target, payload: wirevalue::Any::serialize(&Vec<Vec<u8>>) }
flush() also checks status and uses tx.post(..) to emit:
    LogMessage::Flush { sync_version: None }
(non-blocking “liveness” ping; no await).
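The "check status, then post without awaiting" pattern can be approximated with a standard-library channel. This sketch does not use the hyperactor API: `Msg`, `SketchSender`, and `Inactive` are hypothetical, the message fields are simplified, the `u64` type of `sync_version` is an assumption, and an unbounded `std::sync::mpsc` channel stands in for `ChannelTx<LogMessage>`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified, hypothetical message type (the real LogMessage carries more fields).
#[derive(Debug, PartialEq)]
pub enum Msg {
    Log { pid: u32, is_stderr: bool, payload: Vec<Vec<u8>> },
    Flush { sync_version: Option<u64> }, // the u64 type is an assumption
}

/// Returned when the peer is gone and nothing can be posted.
#[derive(Debug, PartialEq)]
pub struct Inactive;

pub struct SketchSender {
    tx: Sender<Msg>,
    pid: u32,
    active: bool, // stands in for watching TxStatus::Active
}

impl SketchSender {
    pub fn new(pid: u32) -> (Self, Receiver<Msg>) {
        let (tx, rx) = channel();
        (SketchSender { tx, pid, active: true }, rx)
    }

    /// Check liveness first, then enqueue without blocking.
    fn post(&mut self, msg: Msg) -> Result<(), Inactive> {
        if !self.active {
            return Err(Inactive);
        }
        // An unbounded mpsc send never blocks; it fails only if the receiver is gone.
        if self.tx.send(msg).is_err() {
            self.active = false;
            return Err(Inactive);
        }
        Ok(())
    }

    pub fn send(&mut self, is_stderr: bool, payload: Vec<Vec<u8>>) -> Result<(), Inactive> {
        let pid = self.pid;
        self.post(Msg::Log { pid, is_stderr, payload })
    }

    pub fn flush(&mut self) -> Result<(), Inactive> {
        self.post(Msg::Flush { sync_version: None })
    }
}
```

Because data and flush markers share one queue, the receiver observes a flush only after every log message posted before it.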
What the tee task forwards (tee(..))#
Reads bytes from the child pipe, writes through to the bootstrap’s std writer, frames on \n, applies MAX_LINE_SIZE truncation and optional rank prefix, then:
sends batched lines to LocalLogSender::send(target, Vec<Vec<u8>>) (if present),
posts the same lines to FileAppender via its channel (if present),
maintains a rotating in-memory window via recent_lines_buf.try_add_data(..).
On EOF, it flushes the std writer and calls log_sender.flush() once to push a Flush(None).
Note on heartbeats & ordering: periodic Flush(None) heartbeats are driven by LogForwardActor (see its init() and the rescheduling in forward()), not by the stream forwarder. The stream forwarder only issues a flush() at EOF. In-band ordering is preserved because both Log and Flush travel on the same channel that the child’s LogForwardActor serves.
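The truncation and prefix step described above can be isolated into a small helper. This is a hedged sketch rather than the function used in `logging.rs`: `format_line` is a hypothetical name and the cap is passed in as a parameter because the value of MAX_LINE_SIZE is not shown here. It backs up to a character boundary before truncating, since `String::truncate` panics when the cut falls inside a multi-byte character.

```rust
/// Hypothetical helper: lossy-decode a raw line, cap its length, add an optional prefix.
pub fn format_line(raw: &[u8], prefix: Option<&str>, max_len: usize) -> String {
    // Invalid UTF-8 sequences become U+FFFD replacement characters.
    let mut line = String::from_utf8_lossy(raw).into_owned();
    if line.len() > max_len {
        // Move the cut left until it sits on a char boundary (index 0 always is one).
        let mut cut = max_len;
        while !line.is_char_boundary(cut) {
            cut -= 1;
        }
        line.truncate(cut);
        line.push_str("... [TRUNCATED]");
    }
    match prefix {
        Some(p) => format!("[{p}] {line}"),
        None => line,
    }
}
```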
Source-level view: tee()#
// logging.rs (abridged, annotated)
async fn tee(
    mut reader: impl AsyncRead + Unpin + Send + 'static,    // child's pipe (stdout/stderr)
    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>, // bootstrap's own stdout/stderr
    mut log_sender: Option<Box<dyn LogSender + Send>>,      // in-band sender to child forwarder
    file_monitor_addr: Option<ChannelAddr>,                 // optional FileAppender channel
    target: OutputTarget,                                   // Stdout | Stderr
    prefix: Option<String>,                                 // optional rank prefix
    stop: Arc<Notify>,                                      // shutdown signal
    recent_lines_buf: RotatingLineBuffer,                   // rolling diagnostics buffer
) -> Result<(), io::Error> {
    let mut buf = [0u8; 8192];
    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);
    // If a file-aggregator is active, dial its channel once.
    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
        file_monitor_addr.and_then(|addr| channel::dial(addr).ok());
    loop {
        tokio::select! {
            // --- main IO path: read child pipe in chunks ---
            read_result = reader.read(&mut buf) => {
                let n = match read_result {
                    Ok(n) => n,
                    Err(e) => { tracing::debug!("read error in tee: {e}"); return Err(e); }
                };
                if n == 0 {
                    tracing::debug!("EOF reached in tee");
                    break; // -> flush and drain partials below
                }
                // Always tee raw bytes to the bootstrap's local stdout/stderr for immediacy.
                if let Err(e) = std_writer.write_all(&buf[..n]).await {
                    tracing::warn!("error writing to std: {e}");
                }
                // Frame by '\n' -> completed UTF-8 lines (lossy), truncate, prefix if configured.
                let mut completed_lines = Vec::new();
                for &byte in &buf[..n] {
                    if byte == b'\n' {
                        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
                        if line.len() > MAX_LINE_SIZE {
                            line.truncate(MAX_LINE_SIZE);
                            line.push_str("... [TRUNCATED]");
                        }
                        let final_line = match &prefix { Some(p) => format!("[{p}] {line}"), None => line };
                        completed_lines.push(final_line);
                        line_buffer.clear();
                    } else {
                        line_buffer.push(byte);
                    }
                }
                // Forward completed lines to the in-band log channel and file aggregator (if any).
                if !completed_lines.is_empty() {
                    if let Some(sender) = log_sender.as_mut() {
                        let bytes: Vec<Vec<u8>> = completed_lines.iter()
                            .map(|s| s.as_bytes().to_vec())
                            .collect();
                        if let Err(e) = sender.send(target, bytes) {
                            tracing::warn!("error sending to log_sender: {e}");
                        }
                    }
                    if let Some(tx) = file_monitor_tx.as_mut() {
                        tx.post(FileMonitorMessage { lines: completed_lines });
                    }
                }
                // Keep a small rolling window for debugging/peek().
                recent_lines_buf.try_add_data(&buf, n);
            }
            // --- cooperative shutdown ---
            _ = stop.notified() => {
                tracing::debug!("stop signal received in tee");
                break;
            }
        }
    }
    // Ensure local std writer is flushed.
    std_writer.flush().await?;
    // If there's a final partial line, forward it through both paths.
    if !line_buffer.is_empty() {
        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
        if line.len() > MAX_LINE_SIZE {
            line.truncate(MAX_LINE_SIZE);
            line.push_str("... [TRUNCATED]");
        }
        let final_line = match &prefix { Some(p) => format!("[{p}] {line}"), None => line };
        if let Some(sender) = log_sender.as_mut() {
            let _ = sender.send(target, vec![final_line.as_bytes().to_vec()]);
        }
        if let Some(tx) = file_monitor_tx.as_mut() {
            tx.post(FileMonitorMessage { lines: vec![final_line] });
        }
    }
    // Nudge the in-band channel once on exit to avoid downstream starvation.
    if let Some(sender) = log_sender.as_mut() {
        let _ = sender.flush(); // posts LogMessage::Flush { sync_version: None }
    }
    Ok(())
}
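For experimenting with the loop's shape outside an async runtime, a synchronous analogue over `std::io` can be handy. The sketch below is not part of `logging.rs`: `tee_sync` is a hypothetical name, it omits the sender, file appender, prefix, truncation, and stop signal, and unlike the listing above it propagates write errors instead of logging them.

```rust
use std::io::{self, Read, Write};

/// Hypothetical synchronous analogue of the tee loop: copy bytes through to `out`
/// and return the framed lines, including a final unterminated one.
pub fn tee_sync<R: Read, W: Write>(mut reader: R, mut out: W) -> io::Result<Vec<String>> {
    let mut buf = [0u8; 8192];
    let mut pending: Vec<u8> = Vec::new();
    let mut lines = Vec::new();
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // EOF
        }
        // Write-through first, so local output is not delayed by framing.
        out.write_all(&buf[..n])?;
        for &byte in &buf[..n] {
            if byte == b'\n' {
                lines.push(String::from_utf8_lossy(&pending).into_owned());
                pending.clear();
            } else {
                pending.push(byte);
            }
        }
    }
    out.flush()?;
    // Forward the final partial line, mirroring the post-loop drain above.
    if !pending.is_empty() {
        lines.push(String::from_utf8_lossy(&pending).into_owned());
    }
    Ok(lines)
}
```

Feeding it an in-memory `std::io::Cursor` and a `Vec<u8>` sink makes it easy to check that the write-through bytes are identical to the input while the framed lines exclude the newline terminators.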