// monarch_hyperactor/logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ops::Deref;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14use std::sync::atomic::Ordering;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use hyperactor::Actor;
19use hyperactor::ActorHandle;
20use hyperactor::Bind;
21use hyperactor::Context;
22use hyperactor::HandleClient;
23use hyperactor::Handler;
24use hyperactor::Instance;
25use hyperactor::RefClient;
26use hyperactor::RemoteSpawn;
27use hyperactor::Unbind;
28use hyperactor::context;
29use hyperactor_config::Flattrs;
30use hyperactor_mesh::ActorMesh;
31use hyperactor_mesh::actor_mesh::ActorMeshRef;
32use hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING;
33use hyperactor_mesh::logging::LogClientActor;
34use hyperactor_mesh::logging::LogClientMessage;
35use hyperactor_mesh::logging::LogForwardActor;
36use hyperactor_mesh::logging::LogForwardMessage;
37use monarch_types::SerializablePyErr;
38use ndslice::View;
39use pyo3::Bound;
40use pyo3::prelude::*;
41use pyo3::types::PyModule;
42use pyo3::types::PyString;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::context::PyInstance;
48use crate::proc::PyActorId;
49use crate::proc_mesh::PyProcMesh;
50use crate::pytokio::PyPythonTask;
51use crate::runtime::monarch_with_gil;
52
/// Messages accepted by `LoggerRuntimeActor`.
///
/// Castable across an actor mesh (`Bind`/`Unbind`), and serializable so
/// it can be delivered to remote procs.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Named,
    Handler,
    HandleClient,
    RefClient,
    Bind,
    Unbind
)]
pub enum LoggerRuntimeMessage {
    /// Apply `level` to the proc-local Python logger; the value is
    /// passed straight to Python's `Logger.setLevel`.
    SetLogging { level: u8 },
}
68
/// Simple Rust actor that invokes python logger APIs. It needs a python runtime.
#[derive(Debug)]
#[hyperactor::export(spawn = true, handlers = [LoggerRuntimeMessage {cast = true}])]
pub struct LoggerRuntimeActor {
    // Handle to the Python logger returned by `logging.getLogger()`
    // (the root logger), fetched once under the GIL at spawn time
    // (see `RemoteSpawn::new`) and shared across handler invocations.
    logger: Arc<Py<PyAny>>,
}
75
76impl LoggerRuntimeActor {
77    fn get_logger(py: Python) -> PyResult<Py<PyAny>> {
78        // Import the Python AutoReloader class
79        let logging_module = py.import("logging")?;
80        let logger = logging_module.call_method0("getLogger")?;
81
82        Ok(logger.into())
83    }
84
85    fn set_logger_level(py: Python, logger: &Py<PyAny>, level: u8) -> PyResult<()> {
86        let logger = logger.bind(py);
87        logger.call_method1("setLevel", (level,))?;
88        Ok(())
89    }
90}
#[async_trait]
impl Actor for LoggerRuntimeActor {
    /// Post-spawn initialization hook.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // NOTE(review): `set_system()` comes from hyperactor; presumably it
        // flags this actor as infrastructure rather than user workload —
        // confirm against `Instance::set_system` documentation.
        this.set_system();
        Ok(())
    }
}
98
99#[async_trait]
100impl RemoteSpawn for LoggerRuntimeActor {
101    type Params = ();
102
103    async fn new(_: (), _environment: Flattrs) -> Result<Self, anyhow::Error> {
104        let logger =
105            monarch_with_gil(|py| Self::get_logger(py).map_err(SerializablePyErr::from_fn(py)))
106                .await?;
107        Ok(Self {
108            logger: Arc::new(logger),
109        })
110    }
111}
112
113#[async_trait]
114#[hyperactor::handle(LoggerRuntimeMessage)]
115impl LoggerRuntimeMessageHandler for LoggerRuntimeActor {
116    async fn set_logging(&mut self, _cx: &Context<Self>, level: u8) -> Result<(), anyhow::Error> {
117        let logger: Arc<_> = self.logger.clone();
118        monarch_with_gil(|py| {
119            Self::set_logger_level(py, logger.as_ref(), level)
120                .map_err(SerializablePyErr::from_fn(py))
121        })
122        .await?;
123        Ok(())
124    }
125}
126
127/// `LoggingMeshClient` is the Python-facing handle for distributed
128/// logging over a `ProcMesh`.
129///
130/// Calling `spawn(...)` builds three pieces of logging infra:
131///
132///   - `client_actor`: a single `LogClientActor` running in the
133///     *local* process. It aggregates forwarded stdout/stderr,
134///     batches it, and coordinates sync flush barriers.
135///
136///   - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
137///     with one actor per remote proc. Each `LogForwardActor` sits in
138///     that proc and forwards its stdout/stderr back to the client.
139///     This mesh only exists if `MESH_ENABLE_LOG_FORWARDING` was `true`
140///     at startup; otherwise it's `None` and we never spawn any
141///     forwarders.
142///
143///   - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
144///     actor per remote proc. Each `LoggerRuntimeActor` controls that
145///     proc's Python logging runtime (log level, handlers, etc.).
146///     This mesh is always created, even if forwarding is disabled.
147///
148/// The Python object you get back holds references to all of this so
149/// that you can:
150///   - toggle streaming vs "stay quiet" (`set_mode(...)`),
151///   - adjust the per-proc Python log level (`set_mode(...)`),
152///   - force a sync flush of forwarded output and wait for completion
153///     (`flush(...)`).
154///
155/// Drop semantics:
156///   Dropping the Python handle runs `Drop` on this Rust struct,
157///   which drains/stops the local `LogClientActor` but does *not*
158///   synchronously tear down the per-proc meshes. The remote
159///   `LogForwardActor` / `LoggerRuntimeActor` instances keep running
160///   until the remote procs themselves are shut down (e.g. via
161///   `host_mesh.shutdown(...)` in tests).
#[pyclass(
    frozen,
    name = "LoggingMeshClient",
    module = "monarch._rust_bindings.monarch_hyperactor.logging"
)]
pub struct LoggingMeshClient {
    // Per-proc LogForwardActor mesh (optional). When enabled, each
    // remote proc forwards its stdout/stderr back to the client. This
    // actor does not interact with the embedded Python runtime.
    //
    // `None` iff `MESH_ENABLE_LOG_FORWARDING` was false when `spawn`
    // ran; forwarders are never created later.
    forwarder_mesh: Option<ActorMesh<LogForwardActor>>,

    // Per-proc LoggerRuntimeActor mesh. One LoggerRuntimeActor runs
    // on every proc in the mesh and is responsible for driving that
    // proc's Python logging configuration (log level, handlers,
    // etc.).
    //
    // `set_mode(..)` always broadcasts the requested log level to
    // this mesh, regardless of whether stdout/stderr forwarding is
    // enabled.
    //
    // Even on a proc that isn't meaningfully running Python code, we
    // still spawn LoggerRuntimeActor and it will still apply the new
    // level to that proc's Python logger. In that case, updating the
    // level may have no visible effect simply because nothing on that
    // proc ever emits logs through Python's `logging` module.
    logger_mesh: ActorMesh<LoggerRuntimeActor>,

    // Client-side LogClientActor. Lives in the client process;
    // receives forwarded output, aggregates and buffers it, and
    // coordinates sync flush barriers.
    //
    // `Drop` for this struct drains/stops this local actor only; the
    // two remote meshes above are merely released, not torn down.
    client_actor: ActorHandle<LogClientActor>,
}
194
195impl LoggingMeshClient {
196    /// Drive a synchronous "drain all logs now" barrier across the
197    /// mesh.
198    ///
199    /// Protocol:
200    ///   1. Tell the local `LogClientActor` we're starting a sync
201    ///      flush. We give it:
202    ///      - how many procs we expect to hear from
203    ///        (`expected_procs`),
204    ///      - a `reply` port it will use to signal completion,
205    ///      - a `version` port it will use to hand us a flush version
206    ///        token. After this send, the client_actor is now in "sync
207    ///        flush vN" mode.
208    ///
209    ///   2. Wait for that version token from the client. This tells
210    ///      us which flush epoch we're coordinating
211    ///      (`version_rx.recv()`).
212    ///
213    ///   3. Broadcast `ForceSyncFlush { version }` to every
214    ///      `LogForwardActor` in the `forwarder_mesh`. Each forwarder
215    ///      tells its proc-local logger/forwarding loop: "flush
216    ///      everything you have for this version now, then report
217    ///      back."
218    ///
219    ///   4. Wait on `reply_rx`. The `LogClientActor` only replies
220    ///      once it has:
221    ///      - received the per-proc sync points for this version from
222    ///        all forwarders,
223    ///      - emitted/forwarded their buffered output,
224    ///      - and finished flushing its own buffers.
225    ///
226    /// When this returns `Ok(())`, all stdout/stderr that existed at
227    /// the moment we kicked off the flush has been forwarded to the
228    /// client and drained. This is used by
229    /// `LoggingMeshClient.flush()`.
230    async fn flush_internal(
231        cx: &impl context::Actor,
232        client_actor: ActorHandle<LogClientActor>,
233        forwarder_mesh: ActorMeshRef<LogForwardActor>,
234    ) -> Result<(), anyhow::Error> {
235        let (reply_tx, reply_rx) = cx.instance().open_once_port::<()>();
236        let (version_tx, version_rx) = cx.instance().open_once_port::<u64>();
237
238        // First initialize a sync flush.
239        client_actor.send(
240            cx,
241            LogClientMessage::StartSyncFlush {
242                expected_procs: forwarder_mesh.region().num_ranks(),
243                reply: reply_tx.bind(),
244                version: version_tx.bind(),
245            },
246        )?;
247
248        let version = version_rx.recv().await?;
249
250        // Then ask all the flushers to ask the log forwarders to sync
251        // flush
252        forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;
253
254        // Finally the forwarder will send sync point back to the
255        // client, flush, and return.
256        reply_rx.recv().await?;
257
258        Ok(())
259    }
260}
261
262#[pymethods]
263impl LoggingMeshClient {
264    /// Initialize logging for a `ProcMesh` and return a
265    /// `LoggingMeshClient`.
266    ///
267    /// This wires up three pieces of logging infrastructure:
268    ///
269    /// 1. A single `LogClientActor` in the *client* process. This
270    ///    actor receives forwarded stdout/stderr, buffers and
271    ///    aggregates it, and coordinates sync flush barriers.
272    ///
273    /// 2. (Optional) A `LogForwardActor` on every remote proc in the
274    ///    mesh. These forwarders read that proc's stdout/stderr and
275    ///    stream it back to the client. We only spawn this mesh if
276    ///    `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
277    ///    forwarding is disabled at startup, we do not spawn these
278    ///    actors and `forwarder_mesh` will be `None`.
279    ///
280    /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
281    ///    This actor controls the Python logging runtime (log level,
282    ///    handlers, etc.) in that process. This is always spawned,
283    ///    even if log forwarding is disabled.
284    ///
285    /// The returned `LoggingMeshClient` holds handles to those
286    /// actors. Later, `set_mode(...)` can adjust per-proc log level
287    /// and (if forwarding was enabled) toggle whether remote output
288    /// is actually streamed back to the client. If forwarding was
289    /// disabled by config, requests to enable streaming will fail.
290    #[staticmethod]
291    fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult<PyPythonTask> {
292        let proc_mesh = proc_mesh.mesh_ref()?;
293        let instance = instance.clone();
294
295        PyPythonTask::new(async move {
296            // 1. Spawn the client-side coordinator actor (lives in
297            // the caller's process).
298            static LOG_CLIENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
299            let id = LOG_CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
300            let name = if id == 0 {
301                "log_client".to_string()
302            } else {
303                format!("log_client_{}", id)
304            };
305            let client_actor: ActorHandle<LogClientActor> =
306                instance.proc().spawn(&name, LogClientActor::default())?;
307            let client_actor_ref = client_actor.bind();
308
309            // Read config to decide if we stand up per-proc
310            // stdout/stderr forwarding.
311            let forwarding_enabled = hyperactor_config::global::get(MESH_ENABLE_LOG_FORWARDING);
312
313            // 2. Optionally spawn per-proc `LogForwardActor` mesh
314            // (stdout/stderr forwarders).
315            let forwarder_mesh = if forwarding_enabled {
316                // Spawn a `LogFwdActor` on every proc.
317                let mesh = proc_mesh
318                    .spawn(instance.deref(), "log_forwarder", &client_actor_ref)
319                    .await
320                    .map_err(anyhow::Error::from)?;
321
322                Some(mesh)
323            } else {
324                None
325            };
326
327            // 3. Always spawn a `LoggerRuntimeActor` on every proc.
328            let logger_mesh = proc_mesh
329                .spawn(instance.deref(), "logger", &())
330                .await
331                .map_err(anyhow::Error::from)?;
332
333            Ok(Self {
334                forwarder_mesh,
335                logger_mesh,
336                client_actor,
337            })
338        })
339    }
340
341    /// Update logging behavior for this mesh.
342    ///
343    /// `stream_to_client` controls whether remote procs actively
344    /// stream their stdout/stderr back to the client process.
345    ///
346    /// - If log forwarding was enabled at startup, `forwarder_mesh`
347    ///   is `Some` and we propagate this flag to every per-proc
348    ///   `LogForwardActor`.
349    /// - If log forwarding was disabled at startup, `forwarder_mesh`
350    ///   is `None`.
351    ///   In that case:
352    ///     * requesting `stream_to_client = false` is a no-op
353    ///       (accepted),
354    ///     * requesting `stream_to_client = true` is rejected,
355    ///       because we did not spawn forwarders and we don't
356    ///       dynamically create them later.
357    ///
358    /// `aggregate_window_sec` configures how the client-side
359    /// `LogClientActor` batches forwarded output. It is only
360    /// meaningful when streaming is enabled. Calling this with
361    /// `Some(..)` while `stream_to_client == false` is invalid and
362    /// returns an error.
363    ///
364    /// `level` is the desired Python logging level. We always
365    /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
366    /// each remote process can update its own Python logger
367    /// configuration, regardless of whether stdout/stderr forwarding
368    /// is active.
369    fn set_mode(
370        &self,
371        instance: &PyInstance,
372        stream_to_client: bool,
373        aggregate_window_sec: Option<u64>,
374        level: u8,
375    ) -> PyResult<()> {
376        // We can't ask for an aggregation window if we're not
377        // streaming.
378        if aggregate_window_sec.is_some() && !stream_to_client {
379            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
380                "cannot set aggregate window without streaming to client".to_string(),
381            ));
382        }
383
384        // Handle the forwarder side (stdout/stderr streaming back to
385        // client).
386        match (&self.forwarder_mesh, stream_to_client) {
387            // Forwarders exist (config enabled at startup). We can
388            // toggle live.
389            (Some(fwd_mesh), _) => {
390                fwd_mesh
391                    .cast(
392                        instance.deref(),
393                        LogForwardMessage::SetMode { stream_to_client },
394                    )
395                    .map_err(|e| {
396                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
397                    })?;
398            }
399
400            // Forwarders were never spawned (global forwarding
401            // disabled) and the caller is asking NOT to stream.
402            // That's effectively a no-op so we silently accept.
403            (None, false) => {
404                // Nothing to do.
405            }
406
407            // Forwarders were never spawned, but caller is asking to
408            // stream. We can't satisfy this request without
409            // re-spawning infra, which we deliberately don't do at
410            // runtime.
411            (None, true) => {
412                // return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
413                //     "log forwarding disabled by config at startup; cannot enable streaming_to_client",
414                // ));
415            }
416        }
417
418        // Always update the per-proc Python logging level.
419        self.logger_mesh
420            .cast(instance.deref(), LoggerRuntimeMessage::SetLogging { level })
421            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
422
423        // Always update the client actor's aggregation window.
424        self.client_actor
425            .send(
426                instance.deref(),
427                LogClientMessage::SetAggregate {
428                    aggregate_window_sec,
429                },
430            )
431            .map_err(anyhow::Error::msg)?;
432
433        Ok(())
434    }
435
436    /// Force a sync flush of remote stdout/stderr back to the client,
437    /// and wait for completion.
438    ///
439    /// If log forwarding was disabled at startup (so we never spawned
440    /// any `LogForwardActor`s), this becomes a no-op success: there's
441    /// nothing to flush from remote procs in that mode, and we don't
442    /// try to manufacture it dynamically.
443    fn flush(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
444        let forwarder_mesh_opt = self
445            .forwarder_mesh
446            .as_ref()
447            .map(|mesh| mesh.deref().clone());
448        let client_actor = self.client_actor.clone();
449        let instance = instance.clone();
450
451        PyPythonTask::new(async move {
452            // If there's no forwarer mesh (forwarding disabled by
453            // config), we just succeed immediately.
454            let Some(forwarder_mesh) = forwarder_mesh_opt else {
455                return Ok(());
456            };
457
458            Self::flush_internal(instance.deref(), client_actor, forwarder_mesh)
459                .await
460                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
461        })
462    }
463}
464
465// NOTE ON LIFECYCLE / CLEANUP
466//
467// `LoggingMeshClient` is a thin owner for three pieces of logging
468// infra:
469//
470//   - `client_actor`: a single `LogClientActor` in the *local*
471//     process.
472//   - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
473//     with one actor per remote proc in the `ProcMesh`, responsible for
474//     forwarding that proc's stdout/stderr back to the client.
475//   - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
476//     actor per remote proc, responsible for driving that proc's Python
477//     logging configuration.
478//
479// The Python-facing handle we hand back to callers is a
480// `Py<LoggingMeshClient>`. When that handle is dropped (or goes out
481// of scope in a test), PyO3 will run `Drop` for `LoggingMeshClient`.
482//
483// Important:
484//
485// - In `Drop` we *only* call `drain_and_stop()` on the local
486//   `LogClientActor`. This asks the client-side aggregator to
487//   flush/stop so we don't leave a local task running.
488// - We do NOT synchronously tear down the per-proc meshes here.
489//   Dropping `forwarder_mesh` / `logger_mesh` just releases our
490//   handles; the actual `LogForwardActor` / `LoggerRuntimeActor`
491//   instances keep running on the remote procs until those procs are
492//   shut down.
493//
494// This is fine in tests because we always shut the world down
495// afterward via `host_mesh.shutdown(&instance)`, which tears down the
496// spawned procs and all actors running in them. In other words:
497//
498//   drop(Py<LoggingMeshClient>)
499//     → stops the local `LogClientActor`, drops mesh handles
500//   host_mesh.shutdown(...)
501//     → kills the remote procs, which takes out the per-proc actors
502//
503// If you reuse this type outside tests, keep in mind that simply
504// dropping `LoggingMeshClient` does *not* on its own tear down the
505// remote logging actors; it only stops the local client actor.
506impl Drop for LoggingMeshClient {
507    fn drop(&mut self) {
508        // Use catch_unwind to guard against panics during interpreter shutdown.
509        // During Python teardown, the tokio runtime or channels may already be
510        // deallocated, and attempting to drain could cause a segfault.
511        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
512            match self.client_actor.drain_and_stop("logging client shutdown") {
513                Ok(_) => {}
514                Err(e) => {
515                    // it is ok as during shutdown, the channel might already be closed
516                    tracing::debug!("error draining logging client actor during shutdown: {}", e);
517                }
518            }
519        }));
520    }
521}
522
523/// Turns a python exception into a string with a traceback. If the traceback doesn't
524/// exist or can't be formatted, returns just the exception message.
525pub(crate) fn format_traceback<'py>(py: Python<'py>, err: &PyErr) -> String {
526    let traceback = err.traceback(py);
527    if traceback.is_some() {
528        let inner = || -> PyResult<String> {
529            let formatted = py
530                .import("traceback")?
531                .call_method1("format_exception", (err.clone_ref(py),))?;
532            Ok(PyString::new(py, "")
533                .call_method1("join", (formatted,))?
534                .to_string())
535        };
536        match inner() {
537            Ok(s) => s,
538            Err(e) => format!("{}: no traceback {}", err, e),
539        }
540    } else {
541        err.to_string()
542    }
543}
544
545#[pyfunction]
546fn log_endpoint_exception<'py>(
547    py: Python<'py>,
548    e: Py<PyAny>,
549    endpoint: Py<PyAny>,
550    actor_id: PyActorId,
551) {
552    let pyerr = PyErr::from_value(e.into_bound(py));
553    let exception_str = format_traceback(py, &pyerr);
554    let endpoint = endpoint.into_bound(py).to_string();
555    tracing::info!(
556        actor_id = actor_id.inner.to_string(),
557        %endpoint,
558        "exception occurred in endpoint: {}",
559        exception_str,
560    );
561}
562
563/// Register the Python-facing types for this module.
564///
565/// `pyo3` calls this when building `monarch._rust_bindings...`. We
566/// expose `LoggingMeshClient` so that Python can construct it and
567/// call its methods (`spawn`, `set_mode`, `flush`, ...).
568pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
569    module.add_class::<LoggingMeshClient>()?;
570    let log_endpoint_exception = wrap_pyfunction!(log_endpoint_exception, module.py())?;
571    log_endpoint_exception.setattr(
572        "__module__",
573        "monarch._rust_bindings.monarch_hyperactor.logging",
574    )?;
575    module.add_function(log_endpoint_exception)?;
576    Ok(())
577}
578
579#[cfg(test)]
580mod tests {
581    use anyhow::Result;
582    use hyperactor::Instance;
583    use hyperactor::channel::ChannelTransport;
584    use hyperactor::proc::Proc;
585    use hyperactor_mesh::ProcMesh;
586    use hyperactor_mesh::host_mesh::HostMesh;
587    use ndslice::Extent;
588    use ndslice::View; // .region(), .num_ranks() etc.
589
590    use super::*;
591    use crate::actor::PythonActor;
592    use crate::pytokio::AwaitPyExt;
593    use crate::pytokio::ensure_python;
594
    /// Bring up a minimal "world" suitable for integration-style
    /// tests.
    ///
    /// Returns:
    ///   - a root `Proc` running in this process,
    ///   - a client `Instance<PythonActor>` bound to that proc,
    ///   - a locally bootstrapped `HostMesh`,
    ///   - a single-proc `ProcMesh` spawned on that host mesh.
    pub async fn test_world() -> Result<(Proc, Instance<PythonActor>, HostMesh, ProcMesh)> {
        // Initialize the embedded Python interpreter before any pyo3 use.
        ensure_python();

        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
            .expect("failed to start root Proc");

        let ai = proc
            .actor_instance("client")
            .expect("failed to create proc Instance");
        let instance = ai.instance;

        // Bootstrap a local host mesh from the test-resource binary.
        let host_mesh = HostMesh::local_with_bootstrap(
            crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(),
        )
        .await
        .expect("failed to bootstrap HostMesh");

        // `Extent::unity()`: a single proc is enough for these tests.
        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity())
            .await
            .expect("failed to spawn ProcMesh");

        Ok((proc, instance, host_mesh, proc_mesh))
    }
621
    /// Smoke test: `test_world` should produce exactly one host, one
    /// proc, and a client instance bound to the root proc.
    #[tokio::test]
    async fn test_world_smoke() {
        let (proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        assert_eq!(
            host_mesh.region().num_ranks(),
            1,
            "should allocate exactly one host"
        );
        assert_eq!(
            proc_mesh.region().num_ranks(),
            1,
            "should spawn exactly one proc"
        );
        assert_eq!(
            instance.self_id().proc_id(),
            proc.proc_id(),
            "returned Instance<()> should be bound to the root Proc"
        );

        // Tear down remote procs; see "NOTE ON LIFECYCLE / CLEANUP".
        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
644
    /// `spawn` must honor `MESH_ENABLE_LOG_FORWARDING`:
    /// `forwarder_mesh` is `None` when forwarding is disabled and
    /// `Some(..)` when enabled.
    #[tokio::test]
    async fn spawn_respects_forwarding_flag() {
        let (_, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        // Take the global config lock so the overrides below don't race
        // other tests mutating the same keys.
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh` should be `None`.
        {
            // `_guard` scopes the override to this block.
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_none(),
                    "forwarder_mesh should be None when forwarding disabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_some(),
                    "forwarder_mesh should be Some(..) when forwarding is enabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
703
    /// Exercise `set_mode` in both forwarding configurations:
    /// - the "aggregate window without streaming" rule always errors,
    /// - other combinations succeed whether or not forwarders exist.
    #[tokio::test]
    async fn set_mode_behaviors() {
        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        // Take the global config lock so the overrides below don't race
        // other tests mutating the same keys.
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (a) stream_to_client = false, no aggregate window
                // -> OK
                let res = client_ref.set_mode(&py_instance, false, None, 10);
                assert!(res.is_ok(), "expected Ok(..), got {res:?}");

                // (b) stream_to_client = false,
                // aggregate_window_sec.is_some() -> Err = Some(..) ->
                // Err
                let res = client_ref.set_mode(&py_instance, false, Some(1), 10);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err for aggregate_window without streaming: {msg}"
                    );
                }

                /*
                // Update (SF: 2025, 11, 13): We now ignore stream to client requests if
                // log forwarding is enabled.
                // (c) stream_to_client = true when forwarding was
                //     never spawned -> Err
                let res = client_ref.set_mode(&py_instance, true, None, 10);
                assert!(
                    res.is_err(),
                    "expected Err(..) when enabling streaming but no forwarders"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("log forwarding disabled by config at startup"),
                        "unexpected err when enabling streaming with no forwarders: {msg}"
                    );
                }
                */
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (d) stream_to_client = true, aggregate_window_sec =
                //     Some(..) -> OK now that we *do* have forwarders,
                //     enabling streaming should succeed.
                let res = client_ref.set_mode(&py_instance, true, Some(2), 20);
                assert!(
                    res.is_ok(),
                    "expected Ok(..) enabling streaming w/ window: {res:?}"
                );

                // (e) aggregate_window_sec = Some(..) but
                //     stream_to_client = false -> still Err (this
                //     rule doesn't care about forwarding being
                //     enabled or not).
                let res = client_ref.set_mode(&py_instance, false, Some(2), 20);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming even w/ forwarders"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err when setting window but disabling streaming: {msg}"
                    );
                }
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
820
821    #[tokio::test]
822    async fn flush_behaviors() {
823        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
824
825        let py_instance = PyInstance::from(&instance);
826        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
827        let lock = hyperactor_config::global::lock();
828
829        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
830        {
831            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
832
833            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
834                .expect("spawn PyPythonTask (forwarding disabled)");
835
836            let client_py: Py<LoggingMeshClient> = client_task
837                .await_py()
838                .await
839                .expect("spawn failed (forwarding disabled)");
840
841            // Call flush() and bring the PyPythonTask back out.
842            let flush_task = monarch_with_gil(|py| {
843                let client_ref = client_py.borrow(py);
844                client_ref
845                    .flush(&py_instance)
846                    .expect("flush() PyPythonTask (forwarding disabled)")
847            })
848            .await;
849
850            // Await the returned PyPythonTask's future outside the
851            // GIL.
852            let flush_result = flush_task
853                .await_unit()
854                .await
855                .expect("flush failed (forwarding disabled)");
856
857            let _ = flush_result;
858            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
859        }
860
861        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
862        {
863            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
864
865            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
866                .expect("spawn PyPythonTask (forwarding enabled)");
867
868            let client_py: Py<LoggingMeshClient> = client_task
869                .await_py()
870                .await
871                .expect("spawn failed (forwarding enabled)");
872
873            // Call flush() to exercise the barrier path, and pull the
874            // PyPythonTask out.
875            let flush_task = monarch_with_gil(|py| {
876                client_py
877                    .borrow(py)
878                    .flush(&py_instance)
879                    .expect("flush() PyPythonTask (forwarding enabled)")
880            })
881            .await;
882
883            // Await the returned PyPythonTask's future outside the
884            // GIL.
885            let flush_result = flush_task
886                .await_unit()
887                .await
888                .expect("flush failed (forwarding enabled)");
889
890            let _ = flush_result;
891            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
892        }
893
894        host_mesh.shutdown(&instance).await.expect("host shutdown");
895    }
896}