Skip to main content

monarch_hyperactor/
logging.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ops::Deref;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14use std::sync::atomic::Ordering;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use hyperactor::Actor;
19use hyperactor::ActorHandle;
20use hyperactor::Bind;
21use hyperactor::Context;
22use hyperactor::Endpoint as _;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::RefClient;
27use hyperactor::RemoteSpawn;
28use hyperactor::Unbind;
29use hyperactor::context;
30use hyperactor_config::Flattrs;
31use hyperactor_mesh::ActorMesh;
32use hyperactor_mesh::actor_mesh::ActorMeshRef;
33use hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING;
34use hyperactor_mesh::logging::LogClientActor;
35use hyperactor_mesh::logging::LogClientMessage;
36use hyperactor_mesh::logging::LogForwardActor;
37use hyperactor_mesh::logging::LogForwardMessage;
38use monarch_types::SerializablePyErr;
39use ndslice::View;
40use pyo3::Bound;
41use pyo3::prelude::*;
42use pyo3::types::PyModule;
43use pyo3::types::PyString;
44use serde::Deserialize;
45use serde::Serialize;
46use typeuri::Named;
47
48use crate::context::PyInstance;
49use crate::proc::PyActorAddr;
50use crate::proc_mesh::PyProcMesh;
51use crate::pytokio::PyPythonTask;
52use crate::runtime::monarch_with_gil;
53
54#[derive(
55    Debug,
56    Clone,
57    Serialize,
58    Deserialize,
59    Named,
60    Handler,
61    HandleClient,
62    RefClient,
63    Bind,
64    Unbind
65)]
66pub enum LoggerRuntimeMessage {
67    SetLogging { level: u8 },
68}
69
70/// Simple Rust actor that invokes python logger APIs. It needs a python runtime.
71#[derive(Debug)]
72#[hyperactor::export(handlers = [LoggerRuntimeMessage {cast = true}])]
73#[hyperactor::spawnable]
74pub struct LoggerRuntimeActor {
75    logger: Arc<Py<PyAny>>,
76}
77
78impl LoggerRuntimeActor {
79    fn get_logger(py: Python) -> PyResult<Py<PyAny>> {
80        // Import the Python AutoReloader class
81        let logging_module = py.import("logging")?;
82        let logger = logging_module.call_method0("getLogger")?;
83
84        Ok(logger.into())
85    }
86
87    fn set_logger_level(py: Python, logger: &Py<PyAny>, level: u8) -> PyResult<()> {
88        let logger = logger.bind(py);
89        logger.call_method1("setLevel", (level,))?;
90        Ok(())
91    }
92}
93#[async_trait]
94impl Actor for LoggerRuntimeActor {
95    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
96        this.set_system();
97        Ok(())
98    }
99}
100
101#[async_trait]
102impl RemoteSpawn for LoggerRuntimeActor {
103    type Params = ();
104
105    async fn new(_: (), _environment: Flattrs) -> Result<Self, anyhow::Error> {
106        let logger =
107            monarch_with_gil(|py| Self::get_logger(py).map_err(SerializablePyErr::from_fn(py)))
108                .await?;
109        Ok(Self {
110            logger: Arc::new(logger),
111        })
112    }
113}
114
115#[async_trait]
116#[hyperactor::handle(LoggerRuntimeMessage)]
117impl LoggerRuntimeMessageHandler for LoggerRuntimeActor {
118    async fn set_logging(&mut self, _cx: &Context<Self>, level: u8) -> Result<(), anyhow::Error> {
119        let logger: Arc<_> = self.logger.clone();
120        monarch_with_gil(|py| {
121            Self::set_logger_level(py, logger.as_ref(), level)
122                .map_err(SerializablePyErr::from_fn(py))
123        })
124        .await?;
125        Ok(())
126    }
127}
128
129/// `LoggingMeshClient` is the Python-facing handle for distributed
130/// logging over a `ProcMesh`.
131///
132/// Calling `spawn(...)` builds three pieces of logging infra:
133///
134///   - `client_actor`: a single `LogClientActor` running in the
135///     *local* process. It aggregates forwarded stdout/stderr,
136///     batches it, and coordinates sync flush barriers.
137///
138///   - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
139///     with one actor per remote proc. Each `LogForwardActor` sits in
140///     that proc and forwards its stdout/stderr back to the client.
141///     This mesh only exists if `MESH_ENABLE_LOG_FORWARDING` was `true`
142///     at startup; otherwise it's `None` and we never spawn any
143///     forwarders.
144///
145///   - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
146///     actor per remote proc. Each `LoggerRuntimeActor` controls that
147///     proc's Python logging runtime (log level, handlers, etc.).
148///     This mesh is always created, even if forwarding is disabled.
149///
150/// The Python object you get back holds references to all of this so
151/// that you can:
152///   - toggle streaming vs "stay quiet" (`set_mode(...)`),
153///   - adjust the per-proc Python log level (`set_mode(...)`),
154///   - force a sync flush of forwarded output and wait for completion
155///     (`flush(...)`).
156///
157/// Drop semantics:
158///   Dropping the Python handle runs `Drop` on this Rust struct,
159///   which drains/stops the local `LogClientActor` but does *not*
160///   synchronously tear down the per-proc meshes. The remote
161///   `LogForwardActor` / `LoggerRuntimeActor` instances keep running
162///   until the remote procs themselves are shut down (e.g. via
163///   `host_mesh.shutdown(...)` in tests).
164#[pyclass(
165    frozen,
166    name = "LoggingMeshClient",
167    module = "monarch._rust_bindings.monarch_hyperactor.logging"
168)]
169pub struct LoggingMeshClient {
170    // Per-proc LogForwardActor mesh (optional). When enabled, each
171    // remote proc forwards its stdout/stderr back to the client. This
172    // actor does not interact with the embedded Python runtime.
173    forwarder_mesh: Option<ActorMesh<LogForwardActor>>,
174
175    // Per-proc LoggerRuntimeActor mesh. One LoggerRuntimeActor runs
176    // on every proc in the mesh and is responsible for driving that
177    // proc's Python logging configuration (log level, handlers,
178    // etc.).
179    //
180    // `set_mode(..)` always broadcasts the requested log level to
181    // this mesh, regardless of whether stdout/stderr forwarding is
182    // enabled.
183    //
184    // Even on a proc that isn't meaningfully running Python code, we
185    // still spawn LoggerRuntimeActor and it will still apply the new
186    // level to that proc's Python logger. In that case, updating the
187    // level may have no visible effect simply because nothing on that
188    // proc ever emits logs through Python's `logging` module.
189    logger_mesh: ActorMesh<LoggerRuntimeActor>,
190
191    // Client-side LogClientActor. Lives in the client process;
192    // receives forwarded output, aggregates and buffers it, and
193    // coordinates sync flush barriers.
194    client_actor: ActorHandle<LogClientActor>,
195}
196
197impl LoggingMeshClient {
198    /// Drive a synchronous "drain all logs now" barrier across the
199    /// mesh.
200    ///
201    /// Protocol:
202    ///   1. Tell the local `LogClientActor` we're starting a sync
203    ///      flush. We give it:
204    ///      - how many procs we expect to hear from
205    ///        (`expected_procs`),
206    ///      - a `reply` port it will use to signal completion,
207    ///      - a `version` port it will use to hand us a flush version
208    ///        token. After this send, the client_actor is now in "sync
209    ///        flush vN" mode.
210    ///
211    ///   2. Wait for that version token from the client. This tells
212    ///      us which flush epoch we're coordinating
213    ///      (`version_rx.recv()`).
214    ///
215    ///   3. Broadcast `ForceSyncFlush { version }` to every
216    ///      `LogForwardActor` in the `forwarder_mesh`. Each forwarder
217    ///      tells its proc-local logger/forwarding loop: "flush
218    ///      everything you have for this version now, then report
219    ///      back."
220    ///
221    ///   4. Wait on `reply_rx`. The `LogClientActor` only replies
222    ///      once it has:
223    ///      - received the per-proc sync points for this version from
224    ///        all forwarders,
225    ///      - emitted/forwarded their buffered output,
226    ///      - and finished flushing its own buffers.
227    ///
228    /// When this returns `Ok(())`, all stdout/stderr that existed at
229    /// the moment we kicked off the flush has been forwarded to the
230    /// client and drained. This is used by
231    /// `LoggingMeshClient.flush()`.
232    async fn flush_internal(
233        cx: &impl context::Actor,
234        client_actor: ActorHandle<LogClientActor>,
235        forwarder_mesh: ActorMeshRef<LogForwardActor>,
236    ) -> Result<(), anyhow::Error> {
237        let (reply_tx, reply_rx) = cx.instance().open_once_port::<()>();
238        let (version_tx, version_rx) = cx.instance().open_once_port::<u64>();
239
240        // First initialize a sync flush.
241        client_actor.post(
242            cx,
243            LogClientMessage::StartSyncFlush {
244                expected_procs: forwarder_mesh.region().num_ranks(),
245                reply: reply_tx.bind(),
246                version: version_tx.bind(),
247            },
248        );
249
250        let version = version_rx.recv().await?;
251
252        // Then ask all the flushers to ask the log forwarders to sync
253        // flush
254        forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;
255
256        // Finally the forwarder will send sync point back to the
257        // client, flush, and return.
258        reply_rx.recv().await?;
259
260        Ok(())
261    }
262}
263
264#[pymethods]
265impl LoggingMeshClient {
266    /// Initialize logging for a `ProcMesh` and return a
267    /// `LoggingMeshClient`.
268    ///
269    /// This wires up three pieces of logging infrastructure:
270    ///
271    /// 1. A single `LogClientActor` in the *client* process. This
272    ///    actor receives forwarded stdout/stderr, buffers and
273    ///    aggregates it, and coordinates sync flush barriers.
274    ///
275    /// 2. (Optional) A `LogForwardActor` on every remote proc in the
276    ///    mesh. These forwarders read that proc's stdout/stderr and
277    ///    stream it back to the client. We only spawn this mesh if
278    ///    `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
279    ///    forwarding is disabled at startup, we do not spawn these
280    ///    actors and `forwarder_mesh` will be `None`.
281    ///
282    /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
283    ///    This actor controls the Python logging runtime (log level,
284    ///    handlers, etc.) in that process. This is always spawned,
285    ///    even if log forwarding is disabled.
286    ///
287    /// The returned `LoggingMeshClient` holds handles to those
288    /// actors. Later, `set_mode(...)` can adjust per-proc log level
289    /// and (if forwarding was enabled) toggle whether remote output
290    /// is actually streamed back to the client. If forwarding was
291    /// disabled by config, requests to enable streaming will fail.
292    #[staticmethod]
293    fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult<PyPythonTask> {
294        let proc_mesh = proc_mesh.mesh_ref()?;
295        let instance = instance.clone();
296
297        PyPythonTask::new(async move {
298            // 1. Spawn the client-side coordinator actor (lives in
299            // the caller's process).
300            static LOG_CLIENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
301            let id = LOG_CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
302            let name = if id == 0 {
303                "log_client".to_string()
304            } else {
305                format!("log_client_{}", id)
306            };
307            let client_actor: ActorHandle<LogClientActor> =
308                instance.proc().spawn(&name, LogClientActor::default())?;
309            let client_actor_ref = client_actor.bind();
310
311            // Read config to decide if we stand up per-proc
312            // stdout/stderr forwarding.
313            let forwarding_enabled = hyperactor_config::global::get(MESH_ENABLE_LOG_FORWARDING);
314
315            // 2. Optionally spawn per-proc `LogForwardActor` mesh
316            // (stdout/stderr forwarders).
317            let forwarder_mesh = if forwarding_enabled {
318                // Spawn a `LogFwdActor` on every proc.
319                let mesh = proc_mesh
320                    .spawn(instance.deref(), "log_forwarder", &client_actor_ref)
321                    .await
322                    .map_err(anyhow::Error::from)?;
323
324                Some(mesh)
325            } else {
326                None
327            };
328
329            // 3. Always spawn a `LoggerRuntimeActor` on every proc.
330            let logger_mesh = proc_mesh
331                .spawn(instance.deref(), "logger", &())
332                .await
333                .map_err(anyhow::Error::from)?;
334
335            Ok(Self {
336                forwarder_mesh,
337                logger_mesh,
338                client_actor,
339            })
340        })
341    }
342
343    /// Update logging behavior for this mesh.
344    ///
345    /// `stream_to_client` controls whether remote procs actively
346    /// stream their stdout/stderr back to the client process.
347    ///
348    /// - If log forwarding was enabled at startup, `forwarder_mesh`
349    ///   is `Some` and we propagate this flag to every per-proc
350    ///   `LogForwardActor`.
351    /// - If log forwarding was disabled at startup, `forwarder_mesh`
352    ///   is `None`.
353    ///   In that case:
354    ///     * requesting `stream_to_client = false` is a no-op
355    ///       (accepted),
356    ///     * requesting `stream_to_client = true` is rejected,
357    ///       because we did not spawn forwarders and we don't
358    ///       dynamically create them later.
359    ///
360    /// `aggregate_window_sec` configures how the client-side
361    /// `LogClientActor` batches forwarded output. It is only
362    /// meaningful when streaming is enabled. Calling this with
363    /// `Some(..)` while `stream_to_client == false` is invalid and
364    /// returns an error.
365    ///
366    /// `level` is the desired Python logging level. We always
367    /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
368    /// each remote process can update its own Python logger
369    /// configuration, regardless of whether stdout/stderr forwarding
370    /// is active.
371    fn set_mode(
372        &self,
373        instance: &PyInstance,
374        stream_to_client: bool,
375        aggregate_window_sec: Option<u64>,
376        level: u8,
377    ) -> PyResult<()> {
378        // We can't ask for an aggregation window if we're not
379        // streaming.
380        if aggregate_window_sec.is_some() && !stream_to_client {
381            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
382                "cannot set aggregate window without streaming to client".to_string(),
383            ));
384        }
385
386        // Handle the forwarder side (stdout/stderr streaming back to
387        // client).
388        match (&self.forwarder_mesh, stream_to_client) {
389            // Forwarders exist (config enabled at startup). We can
390            // toggle live.
391            (Some(fwd_mesh), _) => {
392                fwd_mesh
393                    .cast(
394                        instance.deref(),
395                        LogForwardMessage::SetMode { stream_to_client },
396                    )
397                    .map_err(|e| {
398                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
399                    })?;
400            }
401
402            // Forwarders were never spawned (global forwarding
403            // disabled) and the caller is asking NOT to stream.
404            // That's effectively a no-op so we silently accept.
405            (None, false) => {
406                // Nothing to do.
407            }
408
409            // Forwarders were never spawned, but caller is asking to
410            // stream. We can't satisfy this request without
411            // re-spawning infra, which we deliberately don't do at
412            // runtime.
413            (None, true) => {
414                // return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
415                //     "log forwarding disabled by config at startup; cannot enable streaming_to_client",
416                // ));
417            }
418        }
419
420        // Always update the per-proc Python logging level.
421        self.logger_mesh
422            .cast(instance.deref(), LoggerRuntimeMessage::SetLogging { level })
423            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
424
425        // Always update the client actor's aggregation window.
426        self.client_actor.post(
427            instance.deref(),
428            LogClientMessage::SetAggregate {
429                aggregate_window_sec,
430            },
431        );
432
433        Ok(())
434    }
435
436    /// Force a sync flush of remote stdout/stderr back to the client,
437    /// and wait for completion.
438    ///
439    /// If log forwarding was disabled at startup (so we never spawned
440    /// any `LogForwardActor`s), this becomes a no-op success: there's
441    /// nothing to flush from remote procs in that mode, and we don't
442    /// try to manufacture it dynamically.
443    fn flush(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
444        let forwarder_mesh_opt = self
445            .forwarder_mesh
446            .as_ref()
447            .map(|mesh| mesh.deref().clone());
448        let client_actor = self.client_actor.clone();
449        let instance = instance.clone();
450
451        PyPythonTask::new(async move {
452            // If there's no forwarer mesh (forwarding disabled by
453            // config), we just succeed immediately.
454            let Some(forwarder_mesh) = forwarder_mesh_opt else {
455                return Ok(());
456            };
457
458            Self::flush_internal(instance.deref(), client_actor, forwarder_mesh)
459                .await
460                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
461        })
462    }
463}
464
465// NOTE ON LIFECYCLE / CLEANUP
466//
467// `LoggingMeshClient` is a thin owner for three pieces of logging
468// infra:
469//
470//   - `client_actor`: a single `LogClientActor` in the *local*
471//     process.
472//   - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
473//     with one actor per remote proc in the `ProcMesh`, responsible for
474//     forwarding that proc's stdout/stderr back to the client.
475//   - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
476//     actor per remote proc, responsible for driving that proc's Python
477//     logging configuration.
478//
479// The Python-facing handle we hand back to callers is a
480// `Py<LoggingMeshClient>`. When that handle is dropped (or goes out
481// of scope in a test), PyO3 will run `Drop` for `LoggingMeshClient`.
482//
483// Important:
484//
485// - In `Drop` we *only* call `drain_and_stop()` on the local
486//   `LogClientActor`. This asks the client-side aggregator to
487//   flush/stop so we don't leave a local task running.
488// - We do NOT synchronously tear down the per-proc meshes here.
489//   Dropping `forwarder_mesh` / `logger_mesh` just releases our
490//   handles; the actual `LogForwardActor` / `LoggerRuntimeActor`
491//   instances keep running on the remote procs until those procs are
492//   shut down.
493//
494// This is fine in tests because we always shut the world down
495// afterward via `host_mesh.shutdown(&instance)`, which tears down the
496// spawned procs and all actors running in them. In other words:
497//
498//   drop(Py<LoggingMeshClient>)
499//     → stops the local `LogClientActor`, drops mesh handles
500//   host_mesh.shutdown(...)
501//     → kills the remote procs, which takes out the per-proc actors
502//
503// If you reuse this type outside tests, keep in mind that simply
504// dropping `LoggingMeshClient` does *not* on its own tear down the
505// remote logging actors; it only stops the local client actor.
506impl Drop for LoggingMeshClient {
507    fn drop(&mut self) {
508        // Use catch_unwind to guard against panics during interpreter shutdown.
509        // During Python teardown, the tokio runtime or channels may already be
510        // deallocated, and attempting to drain could cause a segfault.
511        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
512            match self.client_actor.drain_and_stop("logging client shutdown") {
513                Ok(_) => {}
514                Err(e) => {
515                    // it is ok as during shutdown, the channel might already be closed
516                    tracing::debug!("error draining logging client actor during shutdown: {}", e);
517                }
518            }
519        }));
520    }
521}
522
523/// Turns a python exception into a string with a traceback. If the traceback doesn't
524/// exist or can't be formatted, returns just the exception message.
525pub(crate) fn format_traceback(py: Python<'_>, err: &PyErr) -> String {
526    let traceback = err.traceback(py);
527    if traceback.is_some() {
528        let inner = || -> PyResult<String> {
529            let formatted = py
530                .import("traceback")?
531                .call_method1("format_exception", (err.clone_ref(py),))?;
532            Ok(PyString::new(py, "")
533                .call_method1("join", (formatted,))?
534                .to_string())
535        };
536        match inner() {
537            Ok(s) => s,
538            Err(e) => format!("{}: no traceback {}", err, e),
539        }
540    } else {
541        err.to_string()
542    }
543}
544
545#[pyfunction]
546fn log_endpoint_exception(
547    py: Python<'_>,
548    e: Py<PyAny>,
549    endpoint: Py<PyAny>,
550    actor_id: PyActorAddr,
551) {
552    let pyerr = PyErr::from_value(e.into_bound(py));
553    let exception_str = format_traceback(py, &pyerr);
554    let endpoint = endpoint.into_bound(py).to_string();
555    tracing::info!(
556        actor_id = actor_id.inner.to_string(),
557        %endpoint,
558        "exception occurred in endpoint: {}",
559        exception_str,
560    );
561}
562
563/// Register the Python-facing types for this module.
564///
565/// `pyo3` calls this when building `monarch._rust_bindings...`. We
566/// expose `LoggingMeshClient` so that Python can construct it and
567/// call its methods (`spawn`, `set_mode`, `flush`, ...).
568pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
569    module.add_class::<LoggingMeshClient>()?;
570    let log_endpoint_exception = wrap_pyfunction!(log_endpoint_exception, module.py())?;
571    log_endpoint_exception.setattr(
572        "__module__",
573        "monarch._rust_bindings.monarch_hyperactor.logging",
574    )?;
575    module.add_function(log_endpoint_exception)?;
576    Ok(())
577}
578
579#[cfg(test)]
580mod tests {
581    use anyhow::Result;
582    use hyperactor::Instance;
583    use hyperactor::channel::ChannelTransport;
584    use hyperactor::proc::Proc;
585    use hyperactor_mesh::ProcMesh;
586    use hyperactor_mesh::host_mesh::HostMesh;
587    use ndslice::Extent;
588    use ndslice::View; // .region(), .num_ranks() etc.
589
590    use super::*;
591    use crate::actor::PythonActor;
592    use crate::pytokio::AwaitPyExt;
593    use crate::pytokio::ensure_python;
594
595    /// Bring up a minimal "world" suitable for integration-style
596    /// tests.
597    pub async fn test_world() -> Result<(Proc, Instance<PythonActor>, HostMesh, ProcMesh)> {
598        ensure_python();
599
600        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
601            .expect("failed to start root Proc");
602
603        let ai = proc
604            .actor_instance("client")
605            .expect("failed to create proc Instance");
606        let instance = ai.instance;
607
608        let host_mesh = HostMesh::local_with_bootstrap(
609            crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(),
610        )
611        .await
612        .expect("failed to bootstrap HostMesh");
613
614        let proc_mesh = host_mesh
615            .spawn(&instance, "p0", Extent::unity(), None, None)
616            .await
617            .expect("failed to spawn ProcMesh");
618
619        Ok((proc, instance, host_mesh, proc_mesh))
620    }
621
622    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
623    #[tokio::test]
624    async fn test_world_smoke() {
625        let (proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
626
627        assert_eq!(
628            host_mesh.region().num_ranks(),
629            1,
630            "should allocate exactly one host"
631        );
632        assert_eq!(
633            proc_mesh.region().num_ranks(),
634            1,
635            "should spawn exactly one proc"
636        );
637        assert_eq!(
638            instance.self_addr().proc_addr(),
639            proc.proc_addr().clone(),
640            "returned Instance<()> should be bound to the root Proc"
641        );
642
643        host_mesh.shutdown(&instance).await.expect("host shutdown");
644    }
645
646    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
647    #[tokio::test]
648    async fn spawn_respects_forwarding_flag() {
649        let (_, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
650
651        let py_instance = PyInstance::from(&instance);
652        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
653        let lock = hyperactor_config::global::lock();
654
655        // Case 1: forwarding disabled => `forwarder_mesh` should be `None`.
656        {
657            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
658
659            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
660                .expect("spawn PyPythonTask (forwarding disabled)");
661
662            let client_py: Py<LoggingMeshClient> = client_task
663                .await_py()
664                .await
665                .expect("spawn failed (forwarding disabled)");
666
667            monarch_with_gil(|py| {
668                let client_ref = client_py.borrow(py);
669                assert!(
670                    client_ref.forwarder_mesh.is_none(),
671                    "forwarder_mesh should be None when forwarding disabled"
672                );
673            })
674            .await;
675
676            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
677        }
678
679        // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`.
680        {
681            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
682
683            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
684                .expect("spawn PyPythonTask (forwarding enabled)");
685
686            let client_py: Py<LoggingMeshClient> = client_task
687                .await_py()
688                .await
689                .expect("spawn failed (forwarding enabled)");
690
691            monarch_with_gil(|py| {
692                let client_ref = client_py.borrow(py);
693                assert!(
694                    client_ref.forwarder_mesh.is_some(),
695                    "forwarder_mesh should be Some(..) when forwarding is enabled"
696                );
697            })
698            .await;
699
700            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
701        }
702
703        host_mesh.shutdown(&instance).await.expect("host shutdown");
704    }
705
706    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
707    #[tokio::test]
708    async fn set_mode_behaviors() {
709        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
710
711        let py_instance = PyInstance::from(&instance);
712        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
713        let lock = hyperactor_config::global::lock();
714
715        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
716        {
717            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
718
719            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
720                .expect("spawn PyPythonTask (forwarding disabled)");
721
722            let client_py: Py<LoggingMeshClient> = client_task
723                .await_py()
724                .await
725                .expect("spawn failed (forwarding disabled)");
726
727            monarch_with_gil(|py| {
728                let client_ref = client_py.borrow(py);
729
730                // (a) stream_to_client = false, no aggregate window
731                // -> OK
732                let res = client_ref.set_mode(&py_instance, false, None, 10);
733                assert!(res.is_ok(), "expected Ok(..), got {res:?}");
734
735                // (b) stream_to_client = false,
736                // aggregate_window_sec.is_some() -> Err = Some(..) ->
737                // Err
738                let res = client_ref.set_mode(&py_instance, false, Some(1), 10);
739                assert!(
740                    res.is_err(),
741                    "expected Err(..) for window without streaming"
742                );
743                if let Err(e) = res {
744                    let msg = e.to_string();
745                    assert!(
746                        msg.contains("cannot set aggregate window without streaming to client"),
747                        "unexpected err for aggregate_window without streaming: {msg}"
748                    );
749                }
750
751                /*
752                // Update (SF: 2025, 11, 13): We now ignore stream to client requests if
753                // log forwarding is enabled.
754                // (c) stream_to_client = true when forwarding was
755                //     never spawned -> Err
756                let res = client_ref.set_mode(&py_instance, true, None, 10);
757                assert!(
758                    res.is_err(),
759                    "expected Err(..) when enabling streaming but no forwarders"
760                );
761                if let Err(e) = res {
762                    let msg = e.to_string();
763                    assert!(
764                        msg.contains("log forwarding disabled by config at startup"),
765                        "unexpected err when enabling streaming with no forwarders: {msg}"
766                    );
767                }
768                */
769            })
770            .await;
771
772            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
773        }
774
775        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
776        {
777            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
778
779            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
780                .expect("spawn PyPythonTask (forwarding enabled)");
781
782            let client_py: Py<LoggingMeshClient> = client_task
783                .await_py()
784                .await
785                .expect("spawn failed (forwarding enabled)");
786
787            monarch_with_gil(|py| {
788                let client_ref = client_py.borrow(py);
789
790                // (d) stream_to_client = true, aggregate_window_sec =
791                //     Some(..) -> OK now that we *do* have forwarders,
792                //     enabling streaming should succeed.
793                let res = client_ref.set_mode(&py_instance, true, Some(2), 20);
794                assert!(
795                    res.is_ok(),
796                    "expected Ok(..) enabling streaming w/ window: {res:?}"
797                );
798
799                // (e) aggregate_window_sec = Some(..) but
800                //     stream_to_client = false -> still Err (this
801                //     rule doesn't care about forwarding being
802                //     enabled or not).
803                let res = client_ref.set_mode(&py_instance, false, Some(2), 20);
804                assert!(
805                    res.is_err(),
806                    "expected Err(..) for window without streaming even w/ forwarders"
807                );
808                if let Err(e) = res {
809                    let msg = e.to_string();
810                    assert!(
811                        msg.contains("cannot set aggregate window without streaming to client"),
812                        "unexpected err when setting window but disabling streaming: {msg}"
813                    );
814                }
815            })
816            .await;
817
818            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
819        }
820
821        host_mesh.shutdown(&instance).await.expect("host shutdown");
822    }
823
824    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
825    #[tokio::test]
826    async fn flush_behaviors() {
827        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
828
829        let py_instance = PyInstance::from(&instance);
830        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
831        let lock = hyperactor_config::global::lock();
832
833        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
834        {
835            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
836
837            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
838                .expect("spawn PyPythonTask (forwarding disabled)");
839
840            let client_py: Py<LoggingMeshClient> = client_task
841                .await_py()
842                .await
843                .expect("spawn failed (forwarding disabled)");
844
845            // Call flush() and bring the PyPythonTask back out.
846            let flush_task = monarch_with_gil(|py| {
847                let client_ref = client_py.borrow(py);
848                client_ref
849                    .flush(&py_instance)
850                    .expect("flush() PyPythonTask (forwarding disabled)")
851            })
852            .await;
853
854            // Await the returned PyPythonTask's future outside the
855            // GIL.
856            flush_task
857                .await_unit()
858                .await
859                .expect("flush failed (forwarding disabled)");
860
861            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
862        }
863
864        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
865        {
866            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
867
868            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
869                .expect("spawn PyPythonTask (forwarding enabled)");
870
871            let client_py: Py<LoggingMeshClient> = client_task
872                .await_py()
873                .await
874                .expect("spawn failed (forwarding enabled)");
875
876            // Call flush() to exercise the barrier path, and pull the
877            // PyPythonTask out.
878            let flush_task = monarch_with_gil(|py| {
879                client_py
880                    .borrow(py)
881                    .flush(&py_instance)
882                    .expect("flush() PyPythonTask (forwarding enabled)")
883            })
884            .await;
885
886            // Await the returned PyPythonTask's future outside the
887            // GIL.
888            flush_task
889                .await_unit()
890                .await
891                .expect("flush failed (forwarding enabled)");
892
893            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
894        }
895
896        host_mesh.shutdown(&instance).await.expect("host shutdown");
897    }
898}