Stream Forwarders (bootstrap/host side)

The bootstrap side is responsible for capturing each child’s stdout/stderr, teeing them locally, and shipping structured log lines to the child’s in-band LogForwardActor over a hyperactor channel.

What they are

StreamFwder is a lightweight controller owned (through each child's BootstrapProcHandle) by the Host's BootstrapProcManager. Each controller spawns one background tee task that does the actual I/O; the struct itself holds only control/diagnostic handles.

pub struct StreamFwder {
    /// Join handle for the spawned tee task (child stdout/stderr reader).
    teer: JoinHandle<Result<(), io::Error>>,

    /// Circular buffer for recent lines (peek/diagnostics).
    recent_lines_buf: RotatingLineBuffer,

    /// Shutdown signal observed by the tee task.
    stop: Arc<Notify>,
}

The background task owns the OS pipe reader and does the heavy lifting:

  • reads child stdout/stderr,

  • tees to bootstrap’s own stdout/stderr,

  • optionally appends to per-child files via FileAppender,

  • batches completed lines and uses LocalLogSender (wrapping a ChannelTx<LogMessage>) to send LogMessage::Log,

  • on EOF (or a stop signal) calls flush() once, posting Flush(None) as a final nudge; periodic heartbeats are generated by the child's LogForwardActor, not here.
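The loop in the bullets above can be sketched synchronously with only the standard library. This is an illustration under assumptions, not the real task: std::sync::mpsc stands in for the hyperactor channel, the hypothetical LogMsg enum stands in for LogMessage, tee_sketch is an illustrative name, and the FileAppender path is omitted. Like the real tee(), it sends the lines completed by each read as one batch.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for LogMessage: a batch of lines, or a flush marker.
#[derive(Debug, PartialEq)]
pub enum LogMsg {
    Log(Vec<Vec<u8>>),
    Flush,
}

/// Read `reader` to EOF, tee raw bytes to `local`, and send each read's
/// completed lines as one batch; at EOF send any partial line, then one Flush.
pub fn tee_sketch<R: Read, W: Write>(
    mut reader: R,
    mut local: W,
    tx: &Sender<LogMsg>,
) -> io::Result<()> {
    let mut buf = [0u8; 8192];
    let mut partial: Vec<u8> = Vec::new();
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // EOF
        }
        // Local visibility first: write the raw bytes through unchanged.
        local.write_all(&buf[..n])?;

        // Frame on '\n'; bytes after the last newline stay in `partial`.
        let mut batch = Vec::new();
        for &b in &buf[..n] {
            if b == b'\n' {
                batch.push(std::mem::take(&mut partial));
            } else {
                partial.push(b);
            }
        }
        if !batch.is_empty() {
            // A disconnected receiver is ignored; the local tee keeps working.
            let _ = tx.send(LogMsg::Log(batch));
        }
    }
    local.flush()?;
    if !partial.is_empty() {
        let _ = tx.send(LogMsg::Log(vec![partial]));
    }
    // One final nudge; periodic heartbeats come from the receiving side.
    let _ = tx.send(LogMsg::Flush);
    Ok(())
}
```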

Components & wiring (at a glance)

Host<M=BootstrapProcManager>
  └─ BootstrapProcManager
     └─ BootstrapProcHandle { proc_id, pipes, ... }
        ├─ StreamFwder(stdout)  ──(spawned task → tee, FileAppender?, LocalLogSender)
        └─ StreamFwder(stderr)  ──(spawned task → tee, FileAppender?, LocalLogSender)

BootstrapProcHandle is the per-child record that owns the two StreamFwder controllers. Each StreamFwder spawns exactly one background “tee” task and keeps its own control/diagnostic handles (e.g., recent_lines_buf, stop).
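The controller/task split can be illustrated with std threads. In this sketch, which assumes rather than mirrors the real types, a thread stands in for the tokio task, an AtomicBool for the Notify stop signal, and a Mutex<VecDeque<String>> for RotatingLineBuffer; FwderSketch and its methods are hypothetical names.

```rust
use std::collections::VecDeque;
use std::io::{self, BufRead, BufReader, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical controller: it holds only handles; the spawned thread does the I/O.
pub struct FwderSketch {
    teer: JoinHandle<io::Result<()>>,
    recent: Arc<Mutex<VecDeque<String>>>,
    stop: Arc<AtomicBool>,
}

impl FwderSketch {
    /// Spawn exactly one background reader and return the controller.
    pub fn start<R: Read + Send + 'static>(reader: R, keep: usize) -> Self {
        let recent = Arc::new(Mutex::new(VecDeque::with_capacity(keep)));
        let stop = Arc::new(AtomicBool::new(false));
        let (task_recent, task_stop) = (recent.clone(), stop.clone());
        let teer = thread::spawn(move || -> io::Result<()> {
            for line in BufReader::new(reader).lines() {
                // Checked between lines only; a blocking read cannot be interrupted.
                if task_stop.load(Ordering::Relaxed) {
                    break;
                }
                let line = line?;
                let mut window = task_recent.lock().unwrap();
                window.push_back(line);
                while window.len() > keep {
                    window.pop_front(); // keep only the newest `keep` lines
                }
            }
            Ok(())
        });
        FwderSketch { teer, recent, stop }
    }

    /// Ask the task to stop at its next line boundary.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Relaxed);
    }

    /// Wait for the task to finish; return its result and the final window.
    pub fn join(self) -> (io::Result<()>, Vec<String>) {
        let result = self.teer.join().expect("tee thread panicked");
        let window: Vec<String> = self.recent.lock().unwrap().iter().cloned().collect();
        (result, window)
    }
}
```

Because a blocking read cannot be interrupted, the stop flag here is only checked between lines; the async tee() avoids this by racing the read against stop.notified() in tokio::select!.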

Wiring & message flow

[host / bootstrap proc]                      [child / remote proc]
StreamFwder ── LocalLogSender ──►  (unix)  ──► LogForwardActor (serve)
(stdout / stderr)                               ^     ^
  |                                             |     └─ dials same channel for in-band Flush
  └─ FileAppender (optional)                    |
                                        BOOTSTRAP_LOG_CHANNEL (env)

  • BootstrapProcManager::spawn chooses a per-child Unix channel address and exports it to the child via BOOTSTRAP_LOG_CHANNEL.

  • The child’s LogForwardActor serves that address; the parent’s LocalLogSender dials it.

  • The child's LogForwardActor also dials its own served address to post Flush control markers in-band, so the markers are ordered with respect to the log data on that channel.

  • If BOOTSTRAP_LOG_CHANNEL is absent (tests, local runs), the LogForwardActor falls back to ChannelAddr::any(ChannelTransport::Unix); no parent is connected to that address, so streaming to the bootstrap is effectively disabled.
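The address-selection rule above can be sketched as a pure function, so it is testable without touching the process environment. Only the variable name BOOTSTRAP_LOG_CHANNEL comes from the text; the LogChannel enum is a hypothetical stand-in for ChannelAddr, with AnyUnix modelling the ChannelAddr::any(ChannelTransport::Unix) fallback.

```rust
use std::env;

/// Hypothetical stand-in for the address the child's LogForwardActor serves.
#[derive(Debug, PartialEq)]
pub enum LogChannel {
    /// Address chosen by the bootstrap and exported to the child.
    FromBootstrap(String),
    /// Fallback: a fresh local Unix address that no parent has dialed.
    AnyUnix,
}

impl LogChannel {
    /// Only a bootstrap-provided address has a parent sender on the other end.
    pub fn streams_to_parent(&self) -> bool {
        matches!(self, LogChannel::FromBootstrap(_))
    }
}

/// Pure resolution rule, separated from the environment for testability.
pub fn resolve_log_channel(var: Option<String>) -> LogChannel {
    match var {
        Some(addr) => LogChannel::FromBootstrap(addr),
        None => LogChannel::AnyUnix,
    }
}

/// Resolve from the process environment.
pub fn log_channel_from_env() -> LogChannel {
    resolve_log_channel(env::var("BOOTSTRAP_LOG_CHANNEL").ok())
}
```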

Ordering guarantees

  • Per stream only: lines from one stream (stdout or stderr) of one child arrive in the order they were read.

  • No cross-stream or cross-child order guarantee: stdout and stderr lines may interleave.
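A small std-only sketch of what per-stream ordering means. Two threads play the stdout and stderr forwarders, each holding its own clone of one mpsc::Sender (as each StreamFwder holds its own LocalLogSender); Target is a hypothetical stand-in for OutputTarget. Each stream's messages arrive in send order, while the interleaving between the two streams depends on scheduling.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for OutputTarget.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Target {
    Stdout,
    Stderr,
}

/// Send `count` numbered lines per stream from two threads into one channel
/// and return everything in arrival order.
pub fn interleaved(count: usize) -> Vec<(Target, usize)> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for target in [Target::Stdout, Target::Stderr] {
        let tx = tx.clone(); // one sender per stream
        handles.push(thread::spawn(move || {
            for i in 0..count {
                tx.send((target, i)).unwrap();
            }
        }));
    }
    drop(tx); // only the per-stream clones remain
    for h in handles {
        h.join().unwrap();
    }
    rx.iter().collect()
}

/// Extract one stream's sequence numbers, in arrival order.
pub fn stream(msgs: &[(Target, usize)], target: Target) -> Vec<usize> {
    msgs.iter().filter(|(t, _)| *t == target).map(|&(_, i)| i).collect()
}
```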

Responsibilities (StreamFwder, bootstrap side)

  • Read bytes from each child’s stdout/stderr pipes reliably.

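  • Split into lines (preserve non-UTF8 as raw bytes; normalize \r\n). Note that the tee() excerpt below decodes each line with String::from_utf8_lossy, which replaces invalid UTF-8 with U+FFFD, and splits on \n only, so a trailing \r is kept.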

  • Tee:

    • bootstrap’s own stdout/stderr (for local visibility),

    • optional FileAppender (durable local files, rotation),

    • and LocalLogSender → LogMessage::Log, whose payload is the serialized Vec<Vec<u8>> of lines.

  • On EOF, call flush() once (posts Flush(None)); periodic Flush(None) heartbeats are issued by the child's LogForwardActor.

  • Backpressure: keep batches bounded and never buffer without limit.
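The framing rule in the list above (keep raw bytes, strip \n, normalize \r\n, bound memory) can be sketched as a small stateful splitter. LineSplitter and max_line are hypothetical names, and dropping bytes past max_line is one assumed way to honor the backpressure bullet; the tee() excerpt later on this page instead decodes lines lossily and truncates at MAX_LINE_SIZE.

```rust
/// Hypothetical splitter: turns arbitrary read chunks into complete lines,
/// keeping bytes as-is (no UTF-8 decoding) and treating "\r\n" like "\n".
pub struct LineSplitter {
    partial: Vec<u8>,
    max_line: usize,
}

impl LineSplitter {
    pub fn new(max_line: usize) -> Self {
        LineSplitter { partial: Vec::new(), max_line }
    }

    /// Feed one chunk; return the lines it completed (terminators stripped).
    pub fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        let mut lines = Vec::new();
        for &b in chunk {
            if b == b'\n' {
                let mut line = std::mem::take(&mut self.partial);
                if line.last() == Some(&b'\r') {
                    line.pop(); // normalize "\r\n" to a plain line end
                }
                lines.push(line);
            } else if self.partial.len() < self.max_line {
                self.partial.push(b);
            } // else: drop the byte so a newline-free stream cannot grow memory
        }
        lines
    }

    /// At EOF, return the trailing unterminated line, if any.
    pub fn finish(self) -> Option<Vec<u8>> {
        if self.partial.is_empty() {
            None
        } else {
            Some(self.partial)
        }
    }
}
```

Because the partial line is carried across push() calls, a \r\n pair split across two reads is still normalized.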

Read & batch algorithm (outline)

  • tokio::io::BufReader around each pipe fd.

  • read_until(b'\n') (or manual chunking) → line frames (strip \n, normalize \r\n).

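  • Batch up to MAX_LINES, MAX_BYTES, or MAX_DEADLINE, whichever is reached first.

    • Emit one LogMessage::Log per batch, with payload = Vec<Vec<u8>> (the batch's lines).

  • Periodic Flush(None) heartbeats are issued by the child's LogForwardActor, not here.

  • The tee() excerpt below is simpler than this outline: it reads fixed 8 KiB chunks and sends the lines completed by each read as one batch.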
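The whichever-first rule from the outline can be sketched as follows. The three limits mirror the outline's MAX_LINES, MAX_BYTES and MAX_DEADLINE; the Batcher type, its methods, and passing the current Instant in explicitly (so the rule can be tested without sleeping) are assumptions of this sketch.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batcher: closes a batch at max_lines, max_bytes, or when the
/// oldest pending line has waited max_delay, whichever comes first.
pub struct Batcher {
    max_lines: usize,
    max_bytes: usize,
    max_delay: Duration,
    lines: Vec<Vec<u8>>,
    bytes: usize,
    opened_at: Option<Instant>,
}

impl Batcher {
    pub fn new(max_lines: usize, max_bytes: usize, max_delay: Duration) -> Self {
        Batcher { max_lines, max_bytes, max_delay, lines: Vec::new(), bytes: 0, opened_at: None }
    }

    /// Add a line; return the batch if a size limit was reached.
    pub fn push(&mut self, line: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.opened_at.is_none() {
            self.opened_at = Some(now); // the deadline runs from the first pending line
        }
        self.bytes += line.len();
        self.lines.push(line);
        if self.lines.len() >= self.max_lines || self.bytes >= self.max_bytes {
            self.take()
        } else {
            None
        }
    }

    /// Call from a timer; return the pending batch if its deadline has passed.
    pub fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.opened_at {
            Some(t) if now.duration_since(t) >= self.max_delay => self.take(),
            _ => None,
        }
    }

    /// Emit whatever is pending (e.g. at EOF) and reset.
    pub fn take(&mut self) -> Option<Vec<Vec<u8>>> {
        self.opened_at = None;
        self.bytes = 0;
        if self.lines.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.lines))
        }
    }
}
```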

Where LocalLogSender comes from (bootstrap side)

  • BootstrapProcManager::spawn chooses a per-child Unix channel address and exports it to the child via BOOTSTRAP_LOG_CHANNEL. The child’s LogForwardActor serves that address (see its new()), and the bootstrap dials it locally.

  • When wiring a child’s streams, the bootstrap builds a LocalLogSender and passes it into the spawned tee task from StreamFwder::start_with_writer. The sender and all I/O live inside the spawned task; the StreamFwder controller only keeps control/diagnostic handles (teer, recent_lines_buf, stop).

// logging.rs :: StreamFwder::start_with_writer(..) — abridged

let log_sender = match LocalLogSender::new(log_channel, pid) {
    Ok(s)  => Some(Box::new(s) as Box<dyn LogSender + Send>),
    Err(e) => {
        tracing::error!("failed to create log sender: {}", e);
        None
    }
};

let teer_stop = stop.clone();
let recent_line_buf_clone = recent_lines_buf.clone();

let teer = tokio::spawn(async move {
    tee(
        reader,              // child's pipe (stdout / stderr)
        std_writer,          // bootstrap’s own stdout/stderr writer
        log_sender,          // in-band sender to child LogForwardActor
        file_monitor_addr,   // optional FileAppender’s channel
        target,              // OutputTarget::{Stdout, Stderr}
        prefix,              // optional rank prefix
        teer_stop,           // shutdown signal
        recent_line_buf_clone,
    ).await
});

StreamFwder { teer, recent_lines_buf, stop }

What LocalLogSender actually does

  • LocalLogSender::new(log_channel, pid) dials the child’s served channel (channel::dial::<LogMessage>), holds a ChannelTx<LogMessage> and a Receiver<TxStatus> for liveness.

  • send(..) checks TxStatus::Active and uses tx.post(..) (non-blocking) to emit:

LogMessage::Log { hostname, pid, output_target, payload: wirevalue::Any::serialize(&Vec<Vec<u8>>) }

  • flush() also checks the status and uses tx.post(..) to emit a non-blocking "liveness" ping (no await):

LogMessage::Flush { sync_version: None }
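The status-gated, non-blocking behavior described above can be modelled with std types. This is a hypothetical sketch, not LocalLogSender itself: Status mirrors TxStatus::Active versus a closed channel, an Arc<Mutex<Status>> stands in for the Receiver<TxStatus>, and an unbounded std mpsc::Sender stands in for ChannelTx<LogMessage>, so send() never waits on the receiver, much like post(). SenderSketch and Msg are illustrative names.

```rust
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};

/// Hypothetical mirror of the channel's liveness state.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Active,
    Closed,
}

/// Hypothetical stand-in for LogMessage.
#[derive(Debug, PartialEq)]
pub enum Msg {
    Log { pid: u32, payload: Vec<Vec<u8>> },
    Flush { sync_version: Option<u64> },
}

pub struct SenderSketch {
    pid: u32,
    tx: Sender<Msg>,
    status: Arc<Mutex<Status>>,
}

impl SenderSketch {
    pub fn new(pid: u32, tx: Sender<Msg>, status: Arc<Mutex<Status>>) -> Self {
        SenderSketch { pid, tx, status }
    }

    /// Refuse to post unless the channel is currently Active.
    fn check_active(&self) -> Result<(), String> {
        match *self.status.lock().unwrap() {
            Status::Active => Ok(()),
            s => Err(format!("log channel not active: {s:?}")),
        }
    }

    /// Post one batch of lines without waiting for the receiver.
    pub fn send(&self, payload: Vec<Vec<u8>>) -> Result<(), String> {
        self.check_active()?;
        self.tx
            .send(Msg::Log { pid: self.pid, payload })
            .map_err(|e| e.to_string())
    }

    /// Post a Flush { sync_version: None } liveness ping.
    pub fn flush(&self) -> Result<(), String> {
        self.check_active()?;
        self.tx
            .send(Msg::Flush { sync_version: None })
            .map_err(|e| e.to_string())
    }
}
```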

What the tee task forwards (tee(..))

  • Reads bytes from the child pipe, writes through to the bootstrap’s std writer, frames on \n, applies MAX_LINE_SIZE truncation and optional rank prefix, then:

    • sends batched lines to LocalLogSender::send(target, Vec<Vec<u8>>) (if present),

    • posts the same lines to FileAppender via its channel (if present),

    • maintains a rotating in-memory window via recent_lines_buf.try_add_data(..).

  • On EOF or a stop signal, it flushes the std writer, forwards any trailing partial line through both paths, and calls log_sender.flush() once to post a Flush(None).
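A minimal sketch of a rotating recent-lines window in the spirit of recent_lines_buf. It assumes, without confirmation from the source, that the "try" in try_add_data means a non-blocking lock attempt; here the hypothetical RecentLines::try_add uses Mutex::try_lock and skips the update when a reader holds the lock, so diagnostics never stall the I/O path.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical bounded window of recent lines, shared between the I/O task
/// (writer) and a diagnostics caller (reader).
#[derive(Clone)]
pub struct RecentLines {
    inner: Arc<Mutex<VecDeque<String>>>,
    capacity: usize,
}

impl RecentLines {
    pub fn new(capacity: usize) -> Self {
        RecentLines { inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))), capacity }
    }

    /// Record the lines in a raw chunk without ever blocking. A line that
    /// straddles two chunks is recorded as two fragments (a simplification).
    /// Returns false if the lock was busy and the update was skipped.
    pub fn try_add(&self, chunk: &[u8]) -> bool {
        let mut window = match self.inner.try_lock() {
            Ok(w) => w,
            Err(_) => return false,
        };
        for line in String::from_utf8_lossy(chunk).lines() {
            window.push_back(line.to_string());
            while window.len() > self.capacity {
                window.pop_front(); // evict the oldest line
            }
        }
        true
    }

    /// Snapshot for peek()-style diagnostics.
    pub fn peek(&self) -> Vec<String> {
        self.inner.lock().unwrap().iter().cloned().collect()
    }
}
```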

Note on heartbeats & ordering: periodic Flush(None) heartbeats are driven by LogForwardActor (see its init() and the rescheduling in forward()), not by the stream forwarder. The stream forwarder only issues a flush() at EOF. In-band ordering is preserved because both Log and Flush travel on the same channel that the child’s LogForwardActor serves.
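The in-band ordering argument can be illustrated with a single queue: when data and Flush markers share one channel, a consumer that treats Flush as a barrier has already received every line posted before it. This std-only model is hypothetical; Event and drain_until_flush are illustrative names, and a cloned Sender plays the role of the actor's self-dialed connection.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for the Log / Flush messages on the served channel.
#[derive(Debug, PartialEq)]
pub enum Event {
    Log(Vec<u8>),
    Flush,
}

/// Collect Log lines up to the next Flush marker.
/// Returns None if every sender disconnected before a Flush arrived.
pub fn drain_until_flush(rx: &Receiver<Event>) -> Option<Vec<Vec<u8>>> {
    let mut lines = Vec::new();
    for event in rx.iter() {
        match event {
            Event::Log(line) => lines.push(line),
            Event::Flush => return Some(lines),
        }
    }
    None
}
```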

Source-level view: tee()

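// logging.rs, abridged and annotated
use std::sync::Arc;
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::Notify;
// Assumed in scope (not shown): channel::dial, ChannelAddr, ChannelTx and its post(),
// plus the crate-local LogSender, OutputTarget, FileMonitorMessage,
// RotatingLineBuffer and MAX_LINE_SIZE.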
async fn tee(
    mut reader: impl AsyncRead + Unpin + Send + 'static,        // child's pipe (stdout/stderr)
    mut std_writer: Box<dyn io::AsyncWrite + Send + Unpin>,     // bootstrap's own stdout/stderr
    mut log_sender: Option<Box<dyn LogSender + Send>>,          // in-band sender to child forwarder
    file_monitor_addr: Option<ChannelAddr>,                     // optional FileAppender channel
    target: OutputTarget,                                       // Stdout | Stderr
    prefix: Option<String>,                                     // optional rank prefix
    stop: Arc<Notify>,                                          // shutdown signal
    recent_lines_buf: RotatingLineBuffer,                       // rolling diagnostics buffer
) -> Result<(), io::Error> {
    let mut buf = [0u8; 8192];
    let mut line_buffer = Vec::with_capacity(MAX_LINE_SIZE);

    // If a file-aggregator is active, dial its channel once.
    let mut file_monitor_tx: Option<ChannelTx<FileMonitorMessage>> =
        file_monitor_addr.and_then(|addr| channel::dial(addr).ok());

    loop {
        tokio::select! {
            // --- main IO path: read child pipe in chunks ---
            read_result = reader.read(&mut buf) => {
                let n = match read_result {
                    Ok(n) => n,
                    Err(e) => { tracing::debug!("read error in tee: {e}"); return Err(e); }
                };
                if n == 0 {
                    tracing::debug!("EOF reached in tee");
                    break; // -> flush and drain partials below
                }

                // Always tee raw bytes to the bootstrap's local stdout/stderr for immediacy.
                if let Err(e) = std_writer.write_all(&buf[..n]).await {
                    tracing::warn!("error writing to std: {e}");
                }

                // Frame by '\n' -> completed UTF-8 lines (lossy), truncate, prefix if configured.
                let mut completed_lines = Vec::new();
                for &byte in &buf[..n] {
                    if byte == b'\n' {
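                        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
                        if line.len() > MAX_LINE_SIZE {
                            // String::truncate panics off a char boundary, and lossy
                            // decoding can yield multi-byte chars, so back off first.
                            let mut cut = MAX_LINE_SIZE;
                            while !line.is_char_boundary(cut) { cut -= 1; }
                            line.truncate(cut);
                            line.push_str("... [TRUNCATED]");
                        }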
                        let final_line = match &prefix { Some(p) => format!("[{p}] {line}"), None => line };
                        completed_lines.push(final_line);
                        line_buffer.clear();
                    } else {
                        line_buffer.push(byte);
                    }
                }

                // Forward completed lines to the in-band log channel and file aggregator (if any).
                if !completed_lines.is_empty() {
                    if let Some(sender) = log_sender.as_mut() {
                        let bytes: Vec<Vec<u8>> = completed_lines.iter()
                            .map(|s| s.as_bytes().to_vec())
                            .collect();
                        if let Err(e) = sender.send(target, bytes) {
                            tracing::warn!("error sending to log_sender: {e}");
                        }
                    }
                    if let Some(tx) = file_monitor_tx.as_mut() {
                        tx.post(FileMonitorMessage { lines: completed_lines });
                    }
                }

                // Keep a small rolling window for debugging/peek().
                recent_lines_buf.try_add_data(&buf, n);
            }

            // --- cooperative shutdown ---
            _ = stop.notified() => {
                tracing::debug!("stop signal received in tee");
                break;
            }
        }
    }

    // Ensure local std writer is flushed.
    std_writer.flush().await?;

    // If there's a final partial line, forward it through both paths.
    if !line_buffer.is_empty() {
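        let mut line = String::from_utf8_lossy(&line_buffer).to_string();
        if line.len() > MAX_LINE_SIZE {
            // Same char-boundary guard as in the main loop.
            let mut cut = MAX_LINE_SIZE;
            while !line.is_char_boundary(cut) { cut -= 1; }
            line.truncate(cut);
            line.push_str("... [TRUNCATED]");
        }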
        let final_line = match &prefix { Some(p) => format!("[{p}] {line}"), None => line };
        if let Some(sender) = log_sender.as_mut() {
            let _ = sender.send(target, vec![final_line.as_bytes().to_vec()]);
        }
        if let Some(tx) = file_monitor_tx.as_mut() {
            tx.post(FileMonitorMessage { lines: vec![final_line] });
        }
    }

    // Nudge the in-band channel once on exit to avoid downstream starvation.
    if let Some(sender) = log_sender.as_mut() {
        let _ = sender.flush(); // posts LogMessage::Flush { sync_version: None }
    }

    Ok(())
}
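The truncation step is worth isolating, because String::truncate panics unless the cut falls on a char boundary, and String::from_utf8_lossy can produce multi-byte characters such as U+FFFD. The following is a standalone sketch of the rule (cap, "... [TRUNCATED]" marker, optional "[prefix] "); truncate_line is a hypothetical helper, and the limit is passed as max instead of reading MAX_LINE_SIZE.

```rust
/// Decode a raw line lossily, cap it at `max` bytes on a char boundary,
/// mark truncation, and apply an optional "[prefix] ".
pub fn truncate_line(raw: &[u8], max: usize, prefix: Option<&str>) -> String {
    let mut line = String::from_utf8_lossy(raw).into_owned();
    if line.len() > max {
        let mut cut = max;
        while !line.is_char_boundary(cut) {
            cut -= 1; // terminates: index 0 is always a char boundary
        }
        line.truncate(cut);
        line.push_str("... [TRUNCATED]");
    }
    match prefix {
        Some(p) => format!("[{p}] {line}"),
        None => line,
    }
}
```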