Client Actor#
What it does#
Aggregates or prints log lines from all remote procs (stdout/stderr), optionally buckets similar lines within a time window, and participates in a versioned sync-flush barrier so the caller can deterministically wait until “all logs up to here” are delivered.
Placement: there is exactly one `LogClientActor`, and it runs inside the Python/driver process (e.g., your notebook/REPL). All per-proc `LogForwardActor`s send to this single client.
Message types#
Data (from forwarders)
- `LogMessage::Log { hostname, pid, output_target, payload }` — payload is `wirevalue::Any` of `Vec<Vec<u8>>` lines (or `String` fallback).
- `LogMessage::Flush { sync_version: Option<u64> }` — `None` = heartbeat; `Some(v)` = barrier marker.
Control (to client actor)
- `LogClientMessage::SetAggregate { aggregate_window_sec: Option<u64> }`
- `LogClientMessage::StartSyncFlush { expected_procs, reply: OncePortRef<()>, version: OncePortRef<u64> }`
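The message shapes above can be sketched as plain Rust enums (simplified: `wirevalue::Any` and `OncePortRef` are replaced with plain stand-in types so the sketch compiles standalone; only the heartbeat/barrier distinction is exercised):

```rust
// Stand-in for wirevalue::Any (assumption: already-decoded line bytes).
type Payload = Vec<Vec<u8>>;

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum OutputTarget {
    Stdout,
    Stderr,
}

// Data messages sent by per-proc forwarders.
#[allow(dead_code)]
#[derive(Debug)]
enum LogMessage {
    Log {
        hostname: String,
        pid: u32,
        output_target: OutputTarget,
        payload: Payload,
    },
    // None = heartbeat; Some(v) = barrier marker for sync-flush version v.
    Flush { sync_version: Option<u64> },
}

// Classify a flush message the way the client actor does.
fn describe_flush(sync_version: Option<u64>) -> String {
    match sync_version {
        None => "heartbeat".to_string(),
        Some(v) => format!("barrier marker v{}", v),
    }
}
```
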
Lifecycle & fields#
Fields
- `aggregate_window_sec: Option<u64>` (default `Some(5)`)
- `aggregators: HashMap<OutputTarget, Aggregator>`
- `last_flush_time: SystemTime`
- `next_flush_deadline: Option<SystemTime>`
- `current_flush_version: u64`
- `current_flush_port: Option<OncePortRef<()>>`
- `current_unflushed_procs: usize`
Lifecycle
- `new()` initializes aggregators and timers.
- `drop` prints any buffered aggregates (final flush).
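The drop-time final flush can be modeled with a toy stand-in (hypothetical names; buffered lines go to a shared sink instead of stdout so the flush-on-drop behavior is observable):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Toy stand-in for the client's buffered aggregates.
struct BufferedLogs {
    buffered: Vec<String>,
    sink: Rc<RefCell<Vec<String>>>, // stands in for stdout/stderr
}

impl BufferedLogs {
    fn add_line(&mut self, line: &str) {
        self.buffered.push(line.to_string());
    }
}

impl Drop for BufferedLogs {
    // Final flush: anything still buffered is emitted when the value
    // is dropped, so no lines are silently lost at teardown.
    fn drop(&mut self) {
        self.sink.borrow_mut().append(&mut self.buffered);
    }
}
```
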
Data path: LogMessage::Log#
- Deserialize `payload` to `Vec<Vec<String>>` (lines).
- If `aggregate_window_sec == None`: print each line immediately with a `[hostname pid]` prefix.
- Else: `Aggregator::add_line` per line; set/adjust `next_flush_deadline`; on deadline, flush aggregates to stdout/stderr.
Aggregation: Aggregator + LogLine#
- `Aggregator` holds a `Vec<LogLine>` and a `start_time`.
- Similarity via normalized Levenshtein; default threshold ≈ 0.15.
- `add_line` merges into the closest group under the threshold; otherwise it starts a new group.
- `Display` prints a time-window header and grouped counts: `"[N similar log lines] …"`.
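The bucketing rule can be illustrated with a self-contained sketch (my own Levenshtein implementation and a one-pass grouping keyed by each group's first line; the real `Aggregator` keeps more state per group):

```rust
// Classic dynamic-programming Levenshtein edit distance.
fn levenshtein(a: &str, b: &str) -> usize {
    let a: Vec<char> = a.chars().collect();
    let b: Vec<char> = b.chars().collect();
    let mut prev: Vec<usize> = (0..=b.len()).collect();
    for (i, ca) in a.iter().enumerate() {
        let mut cur = vec![i + 1];
        for (j, cb) in b.iter().enumerate() {
            let cost = if ca == cb { 0 } else { 1 };
            cur.push((prev[j] + cost).min(prev[j + 1] + 1).min(cur[j] + 1));
        }
        prev = cur;
    }
    prev[b.len()]
}

// Normalized distance in [0, 1]: edit distance over the longer length.
fn normalized_distance(a: &str, b: &str) -> f64 {
    let max_len = a.chars().count().max(b.chars().count());
    if max_len == 0 {
        return 0.0;
    }
    levenshtein(a, b) as f64 / max_len as f64
}

// Merge a line into the closest existing (representative, count) group
// if its normalized distance is under the threshold; otherwise start a
// new group. Returns the index of the group used.
fn add_line(groups: &mut Vec<(String, usize)>, line: &str, threshold: f64) -> usize {
    let best = groups
        .iter()
        .enumerate()
        .map(|(i, (rep, _))| (i, normalized_distance(rep, line)))
        .min_by(|x, y| x.1.partial_cmp(&y.1).unwrap());
    match best {
        Some((i, d)) if d < threshold => {
            groups[i].1 += 1;
            i
        }
        _ => {
            groups.push((line.to_string(), 1));
            groups.len() - 1
        }
    }
}
```

With the 0.15 default, lines differing only in a rank or counter merge into one group, while unrelated lines start new groups.
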
Barrier protocol: sync flush#
- Caller requests flush → `StartSyncFlush` allocates `version = ++current_flush_version`, records `expected_procs`, and returns `version`.
- Python tells each forwarder `ForceSyncFlush { version }` (they inject `Flush(Some(version))` in-band).
- The client actor receives `Flush(Some(v))` from each forwarder and decrements `current_unflushed_procs`.
- When it reaches zero: print pending aggregates, send `reply`.
Teardown & failures#
- `drop`: prints remaining aggregates.
- Mismatched versions are logged and ignored (stale `Flush(Some(v))`).
- If a sync `Flush(Some(v))` arrives with no active barrier, an error is raised.
- Heartbeats (`Flush(None)`) are benign; they just help liveness.
Quick reference (defaults & knobs)#
- Placement: single `LogClientActor` in the Python/driver process; one `LogForwardActor` per remote proc.
- Aggregation window: 5 s by default (`DEFAULT_AGGREGATE_WINDOW_SEC`).
- Similarity threshold: 0.15 (merge if normalized edit distance < 0.15).
- Line truncation: 4 KiB per line (`MAX_LINE_SIZE`) with `"… [TRUNCATED]"` suffix.
- Printing prefix: `"[{hostname} {pid}] "` before each line (non-aggregated path).
- Flush barrier: versioned; completes when all forwarders ack `Flush(Some(v))`.
Data path: LogMessage::Log (source excerpt)#
Where it runs: a single `LogClientActor` in the Python/driver process receives log data from all remote procs. Upstream: a per-proc `LogForwardActor` (in each remote proc) forwards `LogMessage::Log` if streaming is enabled.
Type legend:
- `LogClientActor` — client-side coordinator (single instance)
- `wirevalue::Any` — opaque bytes; expected to be `Vec<Vec<u8>>` (lines) or `String`
- `OutputTarget` — `Stdout | Stderr`
1) Deserializing payload into lines#
File: logging.rs
Item: free function deserialize_message_lines(...)
fn deserialize_message_lines(
serialized_message: &wirevalue::Any,
) -> anyhow::Result<Vec<Vec<String>>> {
// Prefer Vec<Vec<u8>> → Vec<Vec<String>>
if let Ok(message_bytes) = serialized_message.deserialized::<Vec<Vec<u8>>>() {
let mut result = Vec::new();
for bytes in message_bytes {
let message_str = String::from_utf8(bytes)?;
let lines: Vec<String> = message_str.lines().map(|s| s.to_string()).collect();
result.push(lines);
}
return Ok(result);
}
// Fallback: plain String → wrap
if let Ok(message) = serialized_message.deserialized::<String>() {
let lines: Vec<String> = message.lines().map(|s| s.to_string()).collect();
return Ok(vec![lines]);
}
anyhow::bail!("failed to deserialize message as either Vec<Vec<u8>> or String")
}
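The per-entry behavior of the `Vec<Vec<u8>>` branch (UTF-8 decode, then split on newlines) can be exercised without the `wirevalue` wrapper by extracting the inner loop into a plain function (a sketch of the same logic):

```rust
// The decode-and-split step of deserialize_message_lines, extracted so
// it can run standalone. Invalid UTF-8 is rejected, matching the `?`
// on String::from_utf8 in the original.
fn bytes_to_lines(
    message_bytes: Vec<Vec<u8>>,
) -> Result<Vec<Vec<String>>, std::string::FromUtf8Error> {
    let mut result = Vec::new();
    for bytes in message_bytes {
        let message_str = String::from_utf8(bytes)?;
        // str::lines splits on '\n' and strips a trailing '\r' (CRLF-safe);
        // a trailing newline does not produce an empty final line.
        result.push(message_str.lines().map(|s| s.to_string()).collect());
    }
    Ok(result)
}
```
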
2) Immediate print helper (non-aggregated path)#
File: logging.rs
Type/impl: impl LogClientActor
Item: print_log_line(...)
fn print_log_line(hostname: &str, pid: u32, output_target: OutputTarget, line: String) {
let message = format!("[{} {}] {}", hostname, pid, line);
#[cfg(test)] crate::logging::test_tap::push(&message);
match output_target {
OutputTarget::Stdout => println!("{}", message),
OutputTarget::Stderr => eprintln!("{}", message),
}
}
3) The handler: aggregate or print, and schedule flush if needed#
File: logging.rs
Type/impl: impl LogMessageHandler for LogClientActor
Handler: log(...) (abridged)
async fn log(
&mut self,
cx: &Context<Self>,
hostname: String,
pid: u32,
output_target: OutputTarget,
payload: wirevalue::Any,
) -> Result<(), anyhow::Error> {
let message_line_groups = deserialize_message_lines(&payload)?; // Vec<Vec<String>>
let hostname = hostname.as_str();
let message_lines: Vec<String> = message_line_groups.into_iter().flatten().collect();
match self.aggregate_window_sec {
// --- no aggregation: print immediately ---
None => {
for line in message_lines {
Self::print_log_line(hostname, pid, output_target, line);
}
self.last_flush_time = RealClock.system_time_now();
}
// --- aggregate within a time window, then flush ---
Some(window) => {
for line in message_lines {
if let Some(agg) = self.aggregators.get_mut(&output_target) {
// levenshtein-based bucketing, may start a new group
if let Err(e) = agg.add_line(&line) {
tracing::error!("error adding log line: {}", e);
// fail-open: print line immediately
Self::print_log_line(hostname, pid, output_target, line);
}
} else {
tracing::error!("unknown output target: {:?}", output_target);
Self::print_log_line(hostname, pid, output_target, line);
}
}
// compute (or tighten) the next flush deadline and self-schedule
let new_deadline = self.last_flush_time + Duration::from_secs(window);
let now = RealClock.system_time_now();
if new_deadline <= now {
self.flush_internal(); // prints and resets aggregators
} else {
let delay = new_deadline.duration_since(now)?;
match self.next_flush_deadline {
None => {
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush { sync_version: None }, delay)?;
}
Some(deadline) if new_deadline < deadline => {
self.next_flush_deadline = Some(new_deadline);
cx.self_message_with_delay(LogMessage::Flush { sync_version: None }, delay)?;
}
_ => {}
}
}
}
}
Ok(())
}
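The deadline bookkeeping in the handler (take the newly computed deadline only if it fires sooner than the one already scheduled) can be isolated into a small pure function. This is a hypothetical helper, not in the source, but it mirrors the `match self.next_flush_deadline` arms above:

```rust
use std::time::{Duration, SystemTime};

// Decide the next flush deadline: keep whichever of the existing and
// the newly computed deadline fires first. Returns (deadline, changed);
// `changed` means a new delayed Flush(None) must be self-scheduled.
fn tighten_deadline(
    current: Option<SystemTime>,
    last_flush: SystemTime,
    window: Duration,
) -> (SystemTime, bool) {
    let candidate = last_flush + window;
    match current {
        None => (candidate, true),
        Some(existing) if candidate < existing => (candidate, true),
        Some(existing) => (existing, false),
    }
}
```
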
Notes#
- Line truncation (4 KiB + `"… [TRUNCATED]"`) happens earlier in the bootstrap's `tee(...)`; the client assumes safe UTF-8 after `deserialize_message_lines`.
- Aggregation runs per target (`Stdout` vs `Stderr`) with the default 5 s window and ~0.15 similarity threshold.
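A sketch of that truncation step (assumed shape; the real `tee(...)` lives in the bootstrap and may differ). Cutting raw bytes mid-code-point would break the UTF-8 guarantee mentioned above, so the sketch backs off to a char boundary before appending the suffix:

```rust
const MAX_LINE_SIZE: usize = 4 * 1024; // 4 KiB, per the defaults above
const TRUNCATION_SUFFIX: &str = "… [TRUNCATED]";

// Truncate a line to at most max_len bytes of content, backing off to
// a UTF-8 char boundary so slicing never panics, and mark it truncated.
fn truncate_line(line: &str, max_len: usize) -> String {
    if line.len() <= max_len {
        return line.to_string();
    }
    let mut cut = max_len;
    while !line.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}{}", &line[..cut], TRUNCATION_SUFFIX)
}
```
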
Barrier protocol: sync flush (source excerpts)#
Where it runs: the kickoff happens in Python via `LoggingMeshClient`, which triggers a barrier across all per-proc `LogForwardActor`s. The single `LogClientActor` in the Python process counts arrivals of `Flush(Some(v))` and unblocks when all procs have acknowledged.
Type legend:
- `ActorHandle<LogClientActor>` — handle for the client actor (runs in the Python/driver proc)
- `ActorMeshRef<LogForwardActor>` — reference to per-proc forwarders (run in remote procs)
- `OncePortRef<T>` — one-shot reply port for sync responses
1) Kickoff from Python: LoggingMeshClient::flush_internal#
File: logging.rs
Type/impl: impl LoggingMeshClient
Method: async fn flush_internal(...)
async fn flush_internal(
cx: &impl context::Actor,
client_actor: ActorHandle<LogClientActor>,
forwarder_mesh: ActorMeshRef<LogForwardActor>,
) -> Result<(), anyhow::Error> {
let (reply_tx, reply_rx) = cx.instance().open_once_port::<()>();
let (version_tx, version_rx) = cx.instance().open_once_port::<u64>();
// 1) Ask the client actor to start a barrier and return a version.
client_actor.send(LogClientMessage::StartSyncFlush {
expected_procs: forwarder_mesh.region().num_ranks(),
reply: reply_tx.bind(), // OncePortRef<()>
version: version_tx.bind(), // OncePortRef<u64>
})?;
let version = version_rx.recv().await?;
// 2) Tell every forwarder to inject the barrier marker in-band.
forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;
// 3) Block until the client actor acks that all procs reported.
reply_rx.recv().await?;
Ok(())
}
Why in-band? Each LogForwardActor posts Flush(Some(version)) on the same ordered channel as its Log data, so the barrier marker is guaranteed to come after all prior lines.
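That ordering guarantee can be demonstrated with a plain FIFO channel (std `mpsc` here as a stand-in for the actor mailbox): everything sent before the marker is necessarily received before it.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Msg {
    Log(String),
    Flush { sync_version: Option<u64> },
}

// Drain the channel until the barrier marker for `version` appears,
// returning every log line observed before it. Because the channel is
// FIFO, all data posted before the marker is seen first.
fn drain_until_barrier(rx: &mpsc::Receiver<Msg>, version: u64) -> Vec<String> {
    let mut seen = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Log(line) => seen.push(line),
            Msg::Flush { sync_version: Some(v) } if v == version => break,
            Msg::Flush { .. } => {} // heartbeat or stale marker: ignore
        }
    }
    seen
}
```
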
2) Forwarder injects the barrier marker (in-band)#
File: logging.rs
Type/impl: impl LogForwardMessageHandler for LogForwardActor
Handler: force_sync_flush(...)
async fn force_sync_flush(
&mut self,
_cx: &Context<Self>,
version: u64,
) -> Result<(), anyhow::Error> {
// Post the barrier marker into the same ordered stream as data.
self.flush_tx
.lock()
.await
.send(LogMessage::Flush { sync_version: Some(version) })
.await
.map_err(anyhow::Error::from)
}
… and when the forwarder later reads that marker from its rx, it acks the client:
Type/impl: impl LogForwardMessageHandler for LogForwardActor
Handler: forward(...) (excerpt)
match self.rx.recv().await {
Ok(LogMessage::Flush { sync_version: Some(version) }) => {
// All prior logs on this channel are now observed; ack the client.
self.logging_client_ref.flush(ctx, version).await?;
}
// ...
_ => { /* other cases */ }
}
3) Client actor tracks & releases the barrier#
File: logging.rs
Type/impl: impl LogClientMessageHandler for LogClientActor
Handler: start_sync_flush(...)
// logging.rs :: LogClientMessageHandler for LogClientActor — StartSyncFlush
async fn start_sync_flush(
&mut self,
cx: &Context<Self>,
expected_procs_flushed: usize,
reply: OncePortRef<()>,
version: OncePortRef<u64>,
) -> Result<(), anyhow::Error> {
if self.current_unflushed_procs > 0 || self.current_flush_port.is_some() {
tracing::warn!(
"found unfinished ongoing flush: version {}; {} unflushed procs",
self.current_flush_version,
self.current_unflushed_procs,
);
}
self.current_flush_version += 1;
tracing::debug!("start sync flush with version {}", self.current_flush_version);
self.current_flush_port = Some(reply.clone());
self.current_unflushed_procs = expected_procs_flushed;
version
.send(cx, self.current_flush_version)
.map_err(anyhow::Error::from)?;
    Ok(())
}
Type/impl: impl LogMessageHandler for LogClientActor
Handler: flush(...) (counts acks, then releases)
// logging.rs :: LogMessageHandler for LogClientActor — Flush(Some(v)) branch
async fn flush(
&mut self,
cx: &Context<Self>,
sync_version: Option<u64>,
) -> Result<(), anyhow::Error> {
match sync_version {
None => {
self.flush_internal();
}
Some(version) => {
if version != self.current_flush_version {
tracing::error!(
"found mismatched flush versions: got {}, expect {}; this can happen if some previous flush didn't finish fully",
version,
self.current_flush_version
);
return Ok(());
}
if self.current_unflushed_procs == 0 || self.current_flush_port.is_none() {
anyhow::bail!("found no ongoing flush request");
}
self.current_unflushed_procs -= 1;
tracing::debug!(
"ack sync flush: version {}; remaining procs: {}",
self.current_flush_version,
self.current_unflushed_procs
);
if self.current_unflushed_procs == 0 {
self.flush_internal();
                let reply = self.current_flush_port.take().unwrap();
reply.send(cx, ()).map_err(anyhow::Error::from)?;
}
}
}
Ok(())
}
Summary:#
- Python asks the client actor to start a barrier → gets version `v`.
- Python tells every forwarder to post `Flush(Some(v))` in-band.
- The client actor decrements `current_unflushed_procs` as each arrives; when it hits zero, it prints aggregates and replies to Python.
- Heartbeats (`Flush(None)`) are unrelated to the barrier; they just maintain liveness.
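Pulling the three handlers together, the client side of the barrier reduces to a small state machine (a simplified sketch with hypothetical names; the real actor also flushes aggregates and replies on a `OncePortRef`):

```rust
// Minimal model of the client actor's barrier accounting.
#[derive(Default)]
struct Barrier {
    version: u64,
    remaining: usize,
    active: bool,
}

#[derive(Debug, PartialEq)]
enum Ack {
    Counted,  // matched the current version; still waiting on others
    Released, // last proc arrived: flush aggregates + reply to caller
    Stale,    // version mismatch or no active barrier: ignored
}

impl Barrier {
    // StartSyncFlush: bump the version and arm the countdown.
    fn start(&mut self, expected_procs: usize) -> u64 {
        self.version += 1;
        self.remaining = expected_procs;
        self.active = true;
        self.version
    }

    // Flush(Some(v)): count one ack; release when all procs reported.
    fn ack(&mut self, v: u64) -> Ack {
        if v != self.version || !self.active {
            return Ack::Stale;
        }
        self.remaining -= 1;
        if self.remaining == 0 {
            self.active = false;
            Ack::Released
        } else {
            Ack::Counted
        }
    }
}
```
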