monarch_hyperactor/logging.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ops::Deref;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14use std::sync::atomic::Ordering;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use hyperactor::Actor;
19use hyperactor::ActorHandle;
20use hyperactor::Bind;
21use hyperactor::Context;
22use hyperactor::HandleClient;
23use hyperactor::Handler;
24use hyperactor::Instance;
25use hyperactor::RefClient;
26use hyperactor::RemoteSpawn;
27use hyperactor::Unbind;
28use hyperactor::context;
29use hyperactor_config::Flattrs;
30use hyperactor_mesh::ActorMesh;
31use hyperactor_mesh::actor_mesh::ActorMeshRef;
32use hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING;
33use hyperactor_mesh::logging::LogClientActor;
34use hyperactor_mesh::logging::LogClientMessage;
35use hyperactor_mesh::logging::LogForwardActor;
36use hyperactor_mesh::logging::LogForwardMessage;
37use monarch_types::SerializablePyErr;
38use ndslice::View;
39use pyo3::Bound;
40use pyo3::prelude::*;
41use pyo3::types::PyModule;
42use pyo3::types::PyString;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::context::PyInstance;
48use crate::proc::PyActorId;
49use crate::proc_mesh::PyProcMesh;
50use crate::pytokio::PyPythonTask;
51use crate::runtime::monarch_with_gil;
52
/// Messages accepted by `LoggerRuntimeActor` (castable over an actor mesh).
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LoggerRuntimeMessage {
    /// Apply `level` to the receiving proc's Python root logger
    /// (forwarded to `logging.Logger.setLevel`).
    SetLogging { level: u8 },
}
68
/// Simple Rust actor that invokes python logger APIs. It needs a python runtime.
#[derive(Debug)]
#[hyperactor::export(spawn = true, handlers = [LoggerRuntimeMessage {cast = true}])]
pub struct LoggerRuntimeActor {
    // Handle to the Python root logger, captured once at spawn time
    // (see `RemoteSpawn::new`) and shared with GIL closures via `Arc`.
    logger: Arc<Py<PyAny>>,
}
75
76impl LoggerRuntimeActor {
77 fn get_logger(py: Python) -> PyResult<Py<PyAny>> {
78 // Import the Python AutoReloader class
79 let logging_module = py.import("logging")?;
80 let logger = logging_module.call_method0("getLogger")?;
81
82 Ok(logger.into())
83 }
84
85 fn set_logger_level(py: Python, logger: &Py<PyAny>, level: u8) -> PyResult<()> {
86 let logger = logger.bind(py);
87 logger.call_method1("setLevel", (level,))?;
88 Ok(())
89 }
90}
#[async_trait]
impl Actor for LoggerRuntimeActor {
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // NOTE(review): `set_system()` presumably marks this as a
        // system-level actor on its instance — confirm semantics in
        // hyperactor. No other initialization is performed here.
        this.set_system();
        Ok(())
    }
}
98
#[async_trait]
impl RemoteSpawn for LoggerRuntimeActor {
    // No construction parameters: the actor only needs the embedded
    // Python runtime, which it reaches via the GIL.
    type Params = ();

    async fn new(_: (), _environment: Flattrs) -> Result<Self, anyhow::Error> {
        // Capture the Python root logger under the GIL once, up
        // front; any Python error is converted into a serializable
        // error so it can cross the actor boundary.
        let logger =
            monarch_with_gil(|py| Self::get_logger(py).map_err(SerializablePyErr::from_fn(py)))
                .await?;
        Ok(Self {
            logger: Arc::new(logger),
        })
    }
}
112
#[async_trait]
#[hyperactor::handle(LoggerRuntimeMessage)]
impl LoggerRuntimeMessageHandler for LoggerRuntimeActor {
    async fn set_logging(&mut self, _cx: &Context<Self>, level: u8) -> Result<(), anyhow::Error> {
        // Clone the Arc so an owned handle moves into the GIL
        // closure, which may not run on this task's stack frame.
        let logger: Arc<_> = self.logger.clone();
        monarch_with_gil(|py| {
            Self::set_logger_level(py, logger.as_ref(), level)
                .map_err(SerializablePyErr::from_fn(py))
        })
        .await?;
        Ok(())
    }
}
126
127/// `LoggingMeshClient` is the Python-facing handle for distributed
128/// logging over a `ProcMesh`.
129///
130/// Calling `spawn(...)` builds three pieces of logging infra:
131///
132/// - `client_actor`: a single `LogClientActor` running in the
133/// *local* process. It aggregates forwarded stdout/stderr,
134/// batches it, and coordinates sync flush barriers.
135///
136/// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
137/// with one actor per remote proc. Each `LogForwardActor` sits in
138/// that proc and forwards its stdout/stderr back to the client.
139/// This mesh only exists if `MESH_ENABLE_LOG_FORWARDING` was `true`
140/// at startup; otherwise it's `None` and we never spawn any
141/// forwarders.
142///
143/// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
144/// actor per remote proc. Each `LoggerRuntimeActor` controls that
145/// proc's Python logging runtime (log level, handlers, etc.).
146/// This mesh is always created, even if forwarding is disabled.
147///
148/// The Python object you get back holds references to all of this so
149/// that you can:
150/// - toggle streaming vs "stay quiet" (`set_mode(...)`),
151/// - adjust the per-proc Python log level (`set_mode(...)`),
152/// - force a sync flush of forwarded output and wait for completion
153/// (`flush(...)`).
154///
155/// Drop semantics:
156/// Dropping the Python handle runs `Drop` on this Rust struct,
157/// which drains/stops the local `LogClientActor` but does *not*
158/// synchronously tear down the per-proc meshes. The remote
159/// `LogForwardActor` / `LoggerRuntimeActor` instances keep running
160/// until the remote procs themselves are shut down (e.g. via
161/// `host_mesh.shutdown(...)` in tests).
#[pyclass(
    frozen,
    name = "LoggingMeshClient",
    module = "monarch._rust_bindings.monarch_hyperactor.logging"
)]
pub struct LoggingMeshClient {
    // Per-proc LogForwardActor mesh (optional). When enabled, each
    // remote proc forwards its stdout/stderr back to the client. This
    // actor does not interact with the embedded Python runtime.
    // `None` whenever `MESH_ENABLE_LOG_FORWARDING` was false at
    // `spawn(..)` time.
    forwarder_mesh: Option<ActorMesh<LogForwardActor>>,

    // Per-proc LoggerRuntimeActor mesh. One LoggerRuntimeActor runs
    // on every proc in the mesh and is responsible for driving that
    // proc's Python logging configuration (log level, handlers,
    // etc.).
    //
    // `set_mode(..)` always broadcasts the requested log level to
    // this mesh, regardless of whether stdout/stderr forwarding is
    // enabled.
    //
    // Even on a proc that isn't meaningfully running Python code, we
    // still spawn LoggerRuntimeActor and it will still apply the new
    // level to that proc's Python logger. In that case, updating the
    // level may have no visible effect simply because nothing on that
    // proc ever emits logs through Python's `logging` module.
    logger_mesh: ActorMesh<LoggerRuntimeActor>,

    // Client-side LogClientActor. Lives in the client process;
    // receives forwarded output, aggregates and buffers it, and
    // coordinates sync flush barriers.
    client_actor: ActorHandle<LogClientActor>,
}
194
195impl LoggingMeshClient {
196 /// Drive a synchronous "drain all logs now" barrier across the
197 /// mesh.
198 ///
199 /// Protocol:
200 /// 1. Tell the local `LogClientActor` we're starting a sync
201 /// flush. We give it:
202 /// - how many procs we expect to hear from
203 /// (`expected_procs`),
204 /// - a `reply` port it will use to signal completion,
205 /// - a `version` port it will use to hand us a flush version
206 /// token. After this send, the client_actor is now in "sync
207 /// flush vN" mode.
208 ///
209 /// 2. Wait for that version token from the client. This tells
210 /// us which flush epoch we're coordinating
211 /// (`version_rx.recv()`).
212 ///
213 /// 3. Broadcast `ForceSyncFlush { version }` to every
214 /// `LogForwardActor` in the `forwarder_mesh`. Each forwarder
215 /// tells its proc-local logger/forwarding loop: "flush
216 /// everything you have for this version now, then report
217 /// back."
218 ///
219 /// 4. Wait on `reply_rx`. The `LogClientActor` only replies
220 /// once it has:
221 /// - received the per-proc sync points for this version from
222 /// all forwarders,
223 /// - emitted/forwarded their buffered output,
224 /// - and finished flushing its own buffers.
225 ///
226 /// When this returns `Ok(())`, all stdout/stderr that existed at
227 /// the moment we kicked off the flush has been forwarded to the
228 /// client and drained. This is used by
229 /// `LoggingMeshClient.flush()`.
    async fn flush_internal(
        cx: &impl context::Actor,
        client_actor: ActorHandle<LogClientActor>,
        forwarder_mesh: ActorMeshRef<LogForwardActor>,
    ) -> Result<(), anyhow::Error> {
        // One-shot ports: `reply` signals barrier completion,
        // `version` delivers the flush-epoch token for this barrier.
        let (reply_tx, reply_rx) = cx.instance().open_once_port::<()>();
        let (version_tx, version_rx) = cx.instance().open_once_port::<u64>();

        // First initialize a sync flush.
        client_actor.send(
            cx,
            LogClientMessage::StartSyncFlush {
                expected_procs: forwarder_mesh.region().num_ranks(),
                reply: reply_tx.bind(),
                version: version_tx.bind(),
            },
        )?;

        // Wait for the client to hand back the epoch before
        // broadcasting, so every forwarder flushes against the same
        // version.
        let version = version_rx.recv().await?;

        // Then ask all the flushers to ask the log forwarders to sync
        // flush
        forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;

        // Finally the forwarder will send sync point back to the
        // client, flush, and return.
        reply_rx.recv().await?;

        Ok(())
    }
260}
261
262#[pymethods]
263impl LoggingMeshClient {
264 /// Initialize logging for a `ProcMesh` and return a
265 /// `LoggingMeshClient`.
266 ///
267 /// This wires up three pieces of logging infrastructure:
268 ///
269 /// 1. A single `LogClientActor` in the *client* process. This
270 /// actor receives forwarded stdout/stderr, buffers and
271 /// aggregates it, and coordinates sync flush barriers.
272 ///
273 /// 2. (Optional) A `LogForwardActor` on every remote proc in the
274 /// mesh. These forwarders read that proc's stdout/stderr and
275 /// stream it back to the client. We only spawn this mesh if
276 /// `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
277 /// forwarding is disabled at startup, we do not spawn these
278 /// actors and `forwarder_mesh` will be `None`.
279 ///
280 /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
281 /// This actor controls the Python logging runtime (log level,
282 /// handlers, etc.) in that process. This is always spawned,
283 /// even if log forwarding is disabled.
284 ///
285 /// The returned `LoggingMeshClient` holds handles to those
286 /// actors. Later, `set_mode(...)` can adjust per-proc log level
287 /// and (if forwarding was enabled) toggle whether remote output
288 /// is actually streamed back to the client. If forwarding was
289 /// disabled by config, requests to enable streaming will fail.
    #[staticmethod]
    fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult<PyPythonTask> {
        let proc_mesh = proc_mesh.mesh_ref()?;
        let instance = instance.clone();

        PyPythonTask::new(async move {
            // 1. Spawn the client-side coordinator actor (lives in
            // the caller's process). The counter keeps actor names
            // unique when multiple clients are spawned in the same
            // process.
            static LOG_CLIENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
            let id = LOG_CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
            let name = if id == 0 {
                "log_client".to_string()
            } else {
                format!("log_client_{}", id)
            };
            let client_actor: ActorHandle<LogClientActor> =
                instance.proc().spawn(&name, LogClientActor::default())?;
            let client_actor_ref = client_actor.bind();

            // Read config to decide if we stand up per-proc
            // stdout/stderr forwarding.
            let forwarding_enabled = hyperactor_config::global::get(MESH_ENABLE_LOG_FORWARDING);

            // 2. Optionally spawn per-proc `LogForwardActor` mesh
            // (stdout/stderr forwarders).
            let forwarder_mesh = if forwarding_enabled {
                // Spawn a `LogForwardActor` on every proc.
                let mesh = proc_mesh
                    .spawn(instance.deref(), "log_forwarder", &client_actor_ref)
                    .await
                    .map_err(anyhow::Error::from)?;

                Some(mesh)
            } else {
                None
            };

            // 3. Always spawn a `LoggerRuntimeActor` on every proc.
            let logger_mesh = proc_mesh
                .spawn(instance.deref(), "logger", &())
                .await
                .map_err(anyhow::Error::from)?;

            Ok(Self {
                forwarder_mesh,
                logger_mesh,
                client_actor,
            })
        })
    }
340
341 /// Update logging behavior for this mesh.
342 ///
343 /// `stream_to_client` controls whether remote procs actively
344 /// stream their stdout/stderr back to the client process.
345 ///
346 /// - If log forwarding was enabled at startup, `forwarder_mesh`
347 /// is `Some` and we propagate this flag to every per-proc
348 /// `LogForwardActor`.
349 /// - If log forwarding was disabled at startup, `forwarder_mesh`
350 /// is `None`.
351 /// In that case:
352 /// * requesting `stream_to_client = false` is a no-op
353 /// (accepted),
354 /// * requesting `stream_to_client = true` is rejected,
355 /// because we did not spawn forwarders and we don't
356 /// dynamically create them later.
357 ///
358 /// `aggregate_window_sec` configures how the client-side
359 /// `LogClientActor` batches forwarded output. It is only
360 /// meaningful when streaming is enabled. Calling this with
361 /// `Some(..)` while `stream_to_client == false` is invalid and
362 /// returns an error.
363 ///
364 /// `level` is the desired Python logging level. We always
365 /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
366 /// each remote process can update its own Python logger
367 /// configuration, regardless of whether stdout/stderr forwarding
368 /// is active.
369 fn set_mode(
370 &self,
371 instance: &PyInstance,
372 stream_to_client: bool,
373 aggregate_window_sec: Option<u64>,
374 level: u8,
375 ) -> PyResult<()> {
376 // We can't ask for an aggregation window if we're not
377 // streaming.
378 if aggregate_window_sec.is_some() && !stream_to_client {
379 return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
380 "cannot set aggregate window without streaming to client".to_string(),
381 ));
382 }
383
384 // Handle the forwarder side (stdout/stderr streaming back to
385 // client).
386 match (&self.forwarder_mesh, stream_to_client) {
387 // Forwarders exist (config enabled at startup). We can
388 // toggle live.
389 (Some(fwd_mesh), _) => {
390 fwd_mesh
391 .cast(
392 instance.deref(),
393 LogForwardMessage::SetMode { stream_to_client },
394 )
395 .map_err(|e| {
396 PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
397 })?;
398 }
399
400 // Forwarders were never spawned (global forwarding
401 // disabled) and the caller is asking NOT to stream.
402 // That's effectively a no-op so we silently accept.
403 (None, false) => {
404 // Nothing to do.
405 }
406
407 // Forwarders were never spawned, but caller is asking to
408 // stream. We can't satisfy this request without
409 // re-spawning infra, which we deliberately don't do at
410 // runtime.
411 (None, true) => {
412 // return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
413 // "log forwarding disabled by config at startup; cannot enable streaming_to_client",
414 // ));
415 }
416 }
417
418 // Always update the per-proc Python logging level.
419 self.logger_mesh
420 .cast(instance.deref(), LoggerRuntimeMessage::SetLogging { level })
421 .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
422
423 // Always update the client actor's aggregation window.
424 self.client_actor
425 .send(
426 instance.deref(),
427 LogClientMessage::SetAggregate {
428 aggregate_window_sec,
429 },
430 )
431 .map_err(anyhow::Error::msg)?;
432
433 Ok(())
434 }
435
436 /// Force a sync flush of remote stdout/stderr back to the client,
437 /// and wait for completion.
438 ///
439 /// If log forwarding was disabled at startup (so we never spawned
440 /// any `LogForwardActor`s), this becomes a no-op success: there's
441 /// nothing to flush from remote procs in that mode, and we don't
442 /// try to manufacture it dynamically.
    fn flush(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
        // Clone everything the async task needs so it does not borrow
        // from `self`.
        let forwarder_mesh_opt = self
            .forwarder_mesh
            .as_ref()
            .map(|mesh| mesh.deref().clone());
        let client_actor = self.client_actor.clone();
        let instance = instance.clone();

        PyPythonTask::new(async move {
            // If there's no forwarder mesh (forwarding disabled by
            // config), we just succeed immediately.
            let Some(forwarder_mesh) = forwarder_mesh_opt else {
                return Ok(());
            };

            Self::flush_internal(instance.deref(), client_actor, forwarder_mesh)
                .await
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
        })
    }
463}
464
465// NOTE ON LIFECYCLE / CLEANUP
466//
467// `LoggingMeshClient` is a thin owner for three pieces of logging
468// infra:
469//
470// - `client_actor`: a single `LogClientActor` in the *local*
471// process.
472// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
473// with one actor per remote proc in the `ProcMesh`, responsible for
474// forwarding that proc's stdout/stderr back to the client.
475// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
476// actor per remote proc, responsible for driving that proc's Python
477// logging configuration.
478//
479// The Python-facing handle we hand back to callers is a
480// `Py<LoggingMeshClient>`. When that handle is dropped (or goes out
481// of scope in a test), PyO3 will run `Drop` for `LoggingMeshClient`.
482//
483// Important:
484//
485// - In `Drop` we *only* call `drain_and_stop()` on the local
486// `LogClientActor`. This asks the client-side aggregator to
487// flush/stop so we don't leave a local task running.
488// - We do NOT synchronously tear down the per-proc meshes here.
489// Dropping `forwarder_mesh` / `logger_mesh` just releases our
490// handles; the actual `LogForwardActor` / `LoggerRuntimeActor`
491// instances keep running on the remote procs until those procs are
492// shut down.
493//
494// This is fine in tests because we always shut the world down
495// afterward via `host_mesh.shutdown(&instance)`, which tears down the
496// spawned procs and all actors running in them. In other words:
497//
498// drop(Py<LoggingMeshClient>)
499// → stops the local `LogClientActor`, drops mesh handles
500// host_mesh.shutdown(...)
501// → kills the remote procs, which takes out the per-proc actors
502//
503// If you reuse this type outside tests, keep in mind that simply
504// dropping `LoggingMeshClient` does *not* on its own tear down the
505// remote logging actors; it only stops the local client actor.
impl Drop for LoggingMeshClient {
    fn drop(&mut self) {
        // Use catch_unwind to guard against panics during interpreter shutdown.
        // During Python teardown, the tokio runtime or channels may already be
        // deallocated, and attempting to drain could cause a segfault.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            match self.client_actor.drain_and_stop("logging client shutdown") {
                Ok(_) => {}
                Err(e) => {
                    // A failure here is acceptable: during shutdown the
                    // channel might already be closed.
                    tracing::debug!("error draining logging client actor during shutdown: {}", e);
                }
            }
        }));
    }
}
522
523/// Turns a python exception into a string with a traceback. If the traceback doesn't
524/// exist or can't be formatted, returns just the exception message.
525pub(crate) fn format_traceback<'py>(py: Python<'py>, err: &PyErr) -> String {
526 let traceback = err.traceback(py);
527 if traceback.is_some() {
528 let inner = || -> PyResult<String> {
529 let formatted = py
530 .import("traceback")?
531 .call_method1("format_exception", (err.clone_ref(py),))?;
532 Ok(PyString::new(py, "")
533 .call_method1("join", (formatted,))?
534 .to_string())
535 };
536 match inner() {
537 Ok(s) => s,
538 Err(e) => format!("{}: no traceback {}", err, e),
539 }
540 } else {
541 err.to_string()
542 }
543}
544
545#[pyfunction]
546fn log_endpoint_exception<'py>(
547 py: Python<'py>,
548 e: Py<PyAny>,
549 endpoint: Py<PyAny>,
550 actor_id: PyActorId,
551) {
552 let pyerr = PyErr::from_value(e.into_bound(py));
553 let exception_str = format_traceback(py, &pyerr);
554 let endpoint = endpoint.into_bound(py).to_string();
555 tracing::info!(
556 actor_id = actor_id.inner.to_string(),
557 %endpoint,
558 "exception occurred in endpoint: {}",
559 exception_str,
560 );
561}
562
563/// Register the Python-facing types for this module.
564///
565/// `pyo3` calls this when building `monarch._rust_bindings...`. We
566/// expose `LoggingMeshClient` so that Python can construct it and
567/// call its methods (`spawn`, `set_mode`, `flush`, ...).
568pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
569 module.add_class::<LoggingMeshClient>()?;
570 let log_endpoint_exception = wrap_pyfunction!(log_endpoint_exception, module.py())?;
571 log_endpoint_exception.setattr(
572 "__module__",
573 "monarch._rust_bindings.monarch_hyperactor.logging",
574 )?;
575 module.add_function(log_endpoint_exception)?;
576 Ok(())
577}
578
579#[cfg(test)]
580mod tests {
581 use anyhow::Result;
582 use hyperactor::Instance;
583 use hyperactor::channel::ChannelTransport;
584 use hyperactor::proc::Proc;
585 use hyperactor_mesh::ProcMesh;
586 use hyperactor_mesh::host_mesh::HostMesh;
587 use ndslice::Extent;
588 use ndslice::View; // .region(), .num_ranks() etc.
589
590 use super::*;
591 use crate::actor::PythonActor;
592 use crate::pytokio::AwaitPyExt;
593 use crate::pytokio::ensure_python;
594
595 /// Bring up a minimal "world" suitable for integration-style
596 /// tests.
    pub async fn test_world() -> Result<(Proc, Instance<PythonActor>, HostMesh, ProcMesh)> {
        ensure_python();

        // Root proc and client instance live in this test process.
        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
            .expect("failed to start root Proc");

        let ai = proc
            .actor_instance("client")
            .expect("failed to create proc Instance");
        let instance = ai.instance;

        // Host/proc meshes are bootstrapped children; the bootstrap
        // binary is resolved through the project's test-resource
        // mechanism.
        let host_mesh = HostMesh::local_with_bootstrap(
            crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(),
        )
        .await
        .expect("failed to bootstrap HostMesh");

        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .expect("failed to spawn ProcMesh");

        Ok((proc, instance, host_mesh, proc_mesh))
    }
621
622 #[tokio::test]
623 async fn test_world_smoke() {
624 let (proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
625
626 assert_eq!(
627 host_mesh.region().num_ranks(),
628 1,
629 "should allocate exactly one host"
630 );
631 assert_eq!(
632 proc_mesh.region().num_ranks(),
633 1,
634 "should spawn exactly one proc"
635 );
636 assert_eq!(
637 instance.self_id().proc_id(),
638 proc.proc_id(),
639 "returned Instance<()> should be bound to the root Proc"
640 );
641
642 host_mesh.shutdown(&instance).await.expect("host shutdown");
643 }
644
    #[tokio::test]
    async fn spawn_respects_forwarding_flag() {
        let (_, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        // Serialize config overrides so the two cases below can't race
        // with other tests mutating global config.
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh` should be `None`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_none(),
                    "forwarder_mesh should be None when forwarding disabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_some(),
                    "forwarder_mesh should be Some(..) when forwarding is enabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
703
    #[tokio::test]
    async fn set_mode_behaviors() {
        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (a) stream_to_client = false, no aggregate window
                // -> OK
                let res = client_ref.set_mode(&py_instance, false, None, 10);
                assert!(res.is_ok(), "expected Ok(..), got {res:?}");

                // (b) stream_to_client = false,
                // aggregate_window_sec.is_some() -> Err = Some(..) ->
                // Err
                let res = client_ref.set_mode(&py_instance, false, Some(1), 10);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err for aggregate_window without streaming: {msg}"
                    );
                }

                /*
                // Update (SF: 2025, 11, 13): We now ignore stream-to-client requests if
                // log forwarding is disabled.
                // (c) stream_to_client = true when forwarding was
                // never spawned -> Err
                let res = client_ref.set_mode(&py_instance, true, None, 10);
                assert!(
                    res.is_err(),
                    "expected Err(..) when enabling streaming but no forwarders"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("log forwarding disabled by config at startup"),
                        "unexpected err when enabling streaming with no forwarders: {msg}"
                    );
                }
                */
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (d) stream_to_client = true, aggregate_window_sec =
                // Some(..) -> OK now that we *do* have forwarders,
                // enabling streaming should succeed.
                let res = client_ref.set_mode(&py_instance, true, Some(2), 20);
                assert!(
                    res.is_ok(),
                    "expected Ok(..) enabling streaming w/ window: {res:?}"
                );

                // (e) aggregate_window_sec = Some(..) but
                // stream_to_client = false -> still Err (this
                // rule doesn't care about forwarding being
                // enabled or not).
                let res = client_ref.set_mode(&py_instance, false, Some(2), 20);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming even w/ forwarders"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err when setting window but disabling streaming: {msg}"
                    );
                }
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
820
    #[tokio::test]
    async fn flush_behaviors() {
        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
        // flush() must still resolve — this exercises the no-op fast
        // path in `LoggingMeshClient::flush`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            // Call flush() and bring the PyPythonTask back out.
            let flush_task = monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                client_ref
                    .flush(&py_instance)
                    .expect("flush() PyPythonTask (forwarding disabled)")
            })
            .await;

            // Await the returned PyPythonTask's future outside the
            // GIL.
            let flush_result = flush_task
                .await_unit()
                .await
                .expect("flush failed (forwarding disabled)");

            let _ = flush_result;
            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            // Call flush() to exercise the barrier path, and pull the
            // PyPythonTask out.
            let flush_task = monarch_with_gil(|py| {
                client_py
                    .borrow(py)
                    .flush(&py_instance)
                    .expect("flush() PyPythonTask (forwarding enabled)")
            })
            .await;

            // Await the returned PyPythonTask's future outside the
            // GIL.
            let flush_result = flush_task
                .await_unit()
                .await
                .expect("flush failed (forwarding enabled)");

            let _ = flush_result;
            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
895 }
896}