monarch_hyperactor/logging.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ops::Deref;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14use std::sync::atomic::Ordering;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use hyperactor::Actor;
19use hyperactor::ActorHandle;
20use hyperactor::Bind;
21use hyperactor::Context;
22use hyperactor::Endpoint as _;
23use hyperactor::HandleClient;
24use hyperactor::Handler;
25use hyperactor::Instance;
26use hyperactor::RefClient;
27use hyperactor::RemoteSpawn;
28use hyperactor::Unbind;
29use hyperactor::context;
30use hyperactor_config::Flattrs;
31use hyperactor_mesh::ActorMesh;
32use hyperactor_mesh::actor_mesh::ActorMeshRef;
33use hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING;
34use hyperactor_mesh::logging::LogClientActor;
35use hyperactor_mesh::logging::LogClientMessage;
36use hyperactor_mesh::logging::LogForwardActor;
37use hyperactor_mesh::logging::LogForwardMessage;
38use monarch_types::SerializablePyErr;
39use ndslice::View;
40use pyo3::Bound;
41use pyo3::prelude::*;
42use pyo3::types::PyModule;
43use pyo3::types::PyString;
44use serde::Deserialize;
45use serde::Serialize;
46use typeuri::Named;
47
48use crate::context::PyInstance;
49use crate::proc::PyActorAddr;
50use crate::proc_mesh::PyProcMesh;
51use crate::pytokio::PyPythonTask;
52use crate::runtime::monarch_with_gil;
53
54#[derive(
55 Debug,
56 Clone,
57 Serialize,
58 Deserialize,
59 Named,
60 Handler,
61 HandleClient,
62 RefClient,
63 Bind,
64 Unbind
65)]
66pub enum LoggerRuntimeMessage {
67 SetLogging { level: u8 },
68}
69
70/// Simple Rust actor that invokes python logger APIs. It needs a python runtime.
71#[derive(Debug)]
72#[hyperactor::export(handlers = [LoggerRuntimeMessage {cast = true}])]
73#[hyperactor::spawnable]
74pub struct LoggerRuntimeActor {
75 logger: Arc<Py<PyAny>>,
76}
77
78impl LoggerRuntimeActor {
79 fn get_logger(py: Python) -> PyResult<Py<PyAny>> {
80 // Import the Python AutoReloader class
81 let logging_module = py.import("logging")?;
82 let logger = logging_module.call_method0("getLogger")?;
83
84 Ok(logger.into())
85 }
86
87 fn set_logger_level(py: Python, logger: &Py<PyAny>, level: u8) -> PyResult<()> {
88 let logger = logger.bind(py);
89 logger.call_method1("setLevel", (level,))?;
90 Ok(())
91 }
92}
93#[async_trait]
94impl Actor for LoggerRuntimeActor {
95 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
96 this.set_system();
97 Ok(())
98 }
99}
100
101#[async_trait]
102impl RemoteSpawn for LoggerRuntimeActor {
103 type Params = ();
104
105 async fn new(_: (), _environment: Flattrs) -> Result<Self, anyhow::Error> {
106 let logger =
107 monarch_with_gil(|py| Self::get_logger(py).map_err(SerializablePyErr::from_fn(py)))
108 .await?;
109 Ok(Self {
110 logger: Arc::new(logger),
111 })
112 }
113}
114
115#[async_trait]
116#[hyperactor::handle(LoggerRuntimeMessage)]
117impl LoggerRuntimeMessageHandler for LoggerRuntimeActor {
118 async fn set_logging(&mut self, _cx: &Context<Self>, level: u8) -> Result<(), anyhow::Error> {
119 let logger: Arc<_> = self.logger.clone();
120 monarch_with_gil(|py| {
121 Self::set_logger_level(py, logger.as_ref(), level)
122 .map_err(SerializablePyErr::from_fn(py))
123 })
124 .await?;
125 Ok(())
126 }
127}
128
129/// `LoggingMeshClient` is the Python-facing handle for distributed
130/// logging over a `ProcMesh`.
131///
132/// Calling `spawn(...)` builds three pieces of logging infra:
133///
134/// - `client_actor`: a single `LogClientActor` running in the
135/// *local* process. It aggregates forwarded stdout/stderr,
136/// batches it, and coordinates sync flush barriers.
137///
138/// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
139/// with one actor per remote proc. Each `LogForwardActor` sits in
140/// that proc and forwards its stdout/stderr back to the client.
141/// This mesh only exists if `MESH_ENABLE_LOG_FORWARDING` was `true`
142/// at startup; otherwise it's `None` and we never spawn any
143/// forwarders.
144///
145/// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
146/// actor per remote proc. Each `LoggerRuntimeActor` controls that
147/// proc's Python logging runtime (log level, handlers, etc.).
148/// This mesh is always created, even if forwarding is disabled.
149///
150/// The Python object you get back holds references to all of this so
151/// that you can:
152/// - toggle streaming vs "stay quiet" (`set_mode(...)`),
153/// - adjust the per-proc Python log level (`set_mode(...)`),
154/// - force a sync flush of forwarded output and wait for completion
155/// (`flush(...)`).
156///
157/// Drop semantics:
158/// Dropping the Python handle runs `Drop` on this Rust struct,
159/// which drains/stops the local `LogClientActor` but does *not*
160/// synchronously tear down the per-proc meshes. The remote
161/// `LogForwardActor` / `LoggerRuntimeActor` instances keep running
162/// until the remote procs themselves are shut down (e.g. via
163/// `host_mesh.shutdown(...)` in tests).
164#[pyclass(
165 frozen,
166 name = "LoggingMeshClient",
167 module = "monarch._rust_bindings.monarch_hyperactor.logging"
168)]
169pub struct LoggingMeshClient {
170 // Per-proc LogForwardActor mesh (optional). When enabled, each
171 // remote proc forwards its stdout/stderr back to the client. This
172 // actor does not interact with the embedded Python runtime.
173 forwarder_mesh: Option<ActorMesh<LogForwardActor>>,
174
175 // Per-proc LoggerRuntimeActor mesh. One LoggerRuntimeActor runs
176 // on every proc in the mesh and is responsible for driving that
177 // proc's Python logging configuration (log level, handlers,
178 // etc.).
179 //
180 // `set_mode(..)` always broadcasts the requested log level to
181 // this mesh, regardless of whether stdout/stderr forwarding is
182 // enabled.
183 //
184 // Even on a proc that isn't meaningfully running Python code, we
185 // still spawn LoggerRuntimeActor and it will still apply the new
186 // level to that proc's Python logger. In that case, updating the
187 // level may have no visible effect simply because nothing on that
188 // proc ever emits logs through Python's `logging` module.
189 logger_mesh: ActorMesh<LoggerRuntimeActor>,
190
191 // Client-side LogClientActor. Lives in the client process;
192 // receives forwarded output, aggregates and buffers it, and
193 // coordinates sync flush barriers.
194 client_actor: ActorHandle<LogClientActor>,
195}
196
197impl LoggingMeshClient {
198 /// Drive a synchronous "drain all logs now" barrier across the
199 /// mesh.
200 ///
201 /// Protocol:
202 /// 1. Tell the local `LogClientActor` we're starting a sync
203 /// flush. We give it:
204 /// - how many procs we expect to hear from
205 /// (`expected_procs`),
206 /// - a `reply` port it will use to signal completion,
207 /// - a `version` port it will use to hand us a flush version
208 /// token. After this send, the client_actor is now in "sync
209 /// flush vN" mode.
210 ///
211 /// 2. Wait for that version token from the client. This tells
212 /// us which flush epoch we're coordinating
213 /// (`version_rx.recv()`).
214 ///
215 /// 3. Broadcast `ForceSyncFlush { version }` to every
216 /// `LogForwardActor` in the `forwarder_mesh`. Each forwarder
217 /// tells its proc-local logger/forwarding loop: "flush
218 /// everything you have for this version now, then report
219 /// back."
220 ///
221 /// 4. Wait on `reply_rx`. The `LogClientActor` only replies
222 /// once it has:
223 /// - received the per-proc sync points for this version from
224 /// all forwarders,
225 /// - emitted/forwarded their buffered output,
226 /// - and finished flushing its own buffers.
227 ///
228 /// When this returns `Ok(())`, all stdout/stderr that existed at
229 /// the moment we kicked off the flush has been forwarded to the
230 /// client and drained. This is used by
231 /// `LoggingMeshClient.flush()`.
232 async fn flush_internal(
233 cx: &impl context::Actor,
234 client_actor: ActorHandle<LogClientActor>,
235 forwarder_mesh: ActorMeshRef<LogForwardActor>,
236 ) -> Result<(), anyhow::Error> {
237 let (reply_tx, reply_rx) = cx.instance().open_once_port::<()>();
238 let (version_tx, version_rx) = cx.instance().open_once_port::<u64>();
239
240 // First initialize a sync flush.
241 client_actor.post(
242 cx,
243 LogClientMessage::StartSyncFlush {
244 expected_procs: forwarder_mesh.region().num_ranks(),
245 reply: reply_tx.bind(),
246 version: version_tx.bind(),
247 },
248 );
249
250 let version = version_rx.recv().await?;
251
252 // Then ask all the flushers to ask the log forwarders to sync
253 // flush
254 forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;
255
256 // Finally the forwarder will send sync point back to the
257 // client, flush, and return.
258 reply_rx.recv().await?;
259
260 Ok(())
261 }
262}
263
264#[pymethods]
265impl LoggingMeshClient {
266 /// Initialize logging for a `ProcMesh` and return a
267 /// `LoggingMeshClient`.
268 ///
269 /// This wires up three pieces of logging infrastructure:
270 ///
271 /// 1. A single `LogClientActor` in the *client* process. This
272 /// actor receives forwarded stdout/stderr, buffers and
273 /// aggregates it, and coordinates sync flush barriers.
274 ///
275 /// 2. (Optional) A `LogForwardActor` on every remote proc in the
276 /// mesh. These forwarders read that proc's stdout/stderr and
277 /// stream it back to the client. We only spawn this mesh if
278 /// `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
279 /// forwarding is disabled at startup, we do not spawn these
280 /// actors and `forwarder_mesh` will be `None`.
281 ///
282 /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
283 /// This actor controls the Python logging runtime (log level,
284 /// handlers, etc.) in that process. This is always spawned,
285 /// even if log forwarding is disabled.
286 ///
287 /// The returned `LoggingMeshClient` holds handles to those
288 /// actors. Later, `set_mode(...)` can adjust per-proc log level
289 /// and (if forwarding was enabled) toggle whether remote output
290 /// is actually streamed back to the client. If forwarding was
291 /// disabled by config, requests to enable streaming will fail.
292 #[staticmethod]
293 fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult<PyPythonTask> {
294 let proc_mesh = proc_mesh.mesh_ref()?;
295 let instance = instance.clone();
296
297 PyPythonTask::new(async move {
298 // 1. Spawn the client-side coordinator actor (lives in
299 // the caller's process).
300 static LOG_CLIENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
301 let id = LOG_CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
302 let name = if id == 0 {
303 "log_client".to_string()
304 } else {
305 format!("log_client_{}", id)
306 };
307 let client_actor: ActorHandle<LogClientActor> =
308 instance.proc().spawn(&name, LogClientActor::default())?;
309 let client_actor_ref = client_actor.bind();
310
311 // Read config to decide if we stand up per-proc
312 // stdout/stderr forwarding.
313 let forwarding_enabled = hyperactor_config::global::get(MESH_ENABLE_LOG_FORWARDING);
314
315 // 2. Optionally spawn per-proc `LogForwardActor` mesh
316 // (stdout/stderr forwarders).
317 let forwarder_mesh = if forwarding_enabled {
318 // Spawn a `LogFwdActor` on every proc.
319 let mesh = proc_mesh
320 .spawn(instance.deref(), "log_forwarder", &client_actor_ref)
321 .await
322 .map_err(anyhow::Error::from)?;
323
324 Some(mesh)
325 } else {
326 None
327 };
328
329 // 3. Always spawn a `LoggerRuntimeActor` on every proc.
330 let logger_mesh = proc_mesh
331 .spawn(instance.deref(), "logger", &())
332 .await
333 .map_err(anyhow::Error::from)?;
334
335 Ok(Self {
336 forwarder_mesh,
337 logger_mesh,
338 client_actor,
339 })
340 })
341 }
342
343 /// Update logging behavior for this mesh.
344 ///
345 /// `stream_to_client` controls whether remote procs actively
346 /// stream their stdout/stderr back to the client process.
347 ///
348 /// - If log forwarding was enabled at startup, `forwarder_mesh`
349 /// is `Some` and we propagate this flag to every per-proc
350 /// `LogForwardActor`.
351 /// - If log forwarding was disabled at startup, `forwarder_mesh`
352 /// is `None`.
353 /// In that case:
354 /// * requesting `stream_to_client = false` is a no-op
355 /// (accepted),
356 /// * requesting `stream_to_client = true` is rejected,
357 /// because we did not spawn forwarders and we don't
358 /// dynamically create them later.
359 ///
360 /// `aggregate_window_sec` configures how the client-side
361 /// `LogClientActor` batches forwarded output. It is only
362 /// meaningful when streaming is enabled. Calling this with
363 /// `Some(..)` while `stream_to_client == false` is invalid and
364 /// returns an error.
365 ///
366 /// `level` is the desired Python logging level. We always
367 /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
368 /// each remote process can update its own Python logger
369 /// configuration, regardless of whether stdout/stderr forwarding
370 /// is active.
371 fn set_mode(
372 &self,
373 instance: &PyInstance,
374 stream_to_client: bool,
375 aggregate_window_sec: Option<u64>,
376 level: u8,
377 ) -> PyResult<()> {
378 // We can't ask for an aggregation window if we're not
379 // streaming.
380 if aggregate_window_sec.is_some() && !stream_to_client {
381 return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
382 "cannot set aggregate window without streaming to client".to_string(),
383 ));
384 }
385
386 // Handle the forwarder side (stdout/stderr streaming back to
387 // client).
388 match (&self.forwarder_mesh, stream_to_client) {
389 // Forwarders exist (config enabled at startup). We can
390 // toggle live.
391 (Some(fwd_mesh), _) => {
392 fwd_mesh
393 .cast(
394 instance.deref(),
395 LogForwardMessage::SetMode { stream_to_client },
396 )
397 .map_err(|e| {
398 PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
399 })?;
400 }
401
402 // Forwarders were never spawned (global forwarding
403 // disabled) and the caller is asking NOT to stream.
404 // That's effectively a no-op so we silently accept.
405 (None, false) => {
406 // Nothing to do.
407 }
408
409 // Forwarders were never spawned, but caller is asking to
410 // stream. We can't satisfy this request without
411 // re-spawning infra, which we deliberately don't do at
412 // runtime.
413 (None, true) => {
414 // return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
415 // "log forwarding disabled by config at startup; cannot enable streaming_to_client",
416 // ));
417 }
418 }
419
420 // Always update the per-proc Python logging level.
421 self.logger_mesh
422 .cast(instance.deref(), LoggerRuntimeMessage::SetLogging { level })
423 .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
424
425 // Always update the client actor's aggregation window.
426 self.client_actor.post(
427 instance.deref(),
428 LogClientMessage::SetAggregate {
429 aggregate_window_sec,
430 },
431 );
432
433 Ok(())
434 }
435
436 /// Force a sync flush of remote stdout/stderr back to the client,
437 /// and wait for completion.
438 ///
439 /// If log forwarding was disabled at startup (so we never spawned
440 /// any `LogForwardActor`s), this becomes a no-op success: there's
441 /// nothing to flush from remote procs in that mode, and we don't
442 /// try to manufacture it dynamically.
443 fn flush(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
444 let forwarder_mesh_opt = self
445 .forwarder_mesh
446 .as_ref()
447 .map(|mesh| mesh.deref().clone());
448 let client_actor = self.client_actor.clone();
449 let instance = instance.clone();
450
451 PyPythonTask::new(async move {
452 // If there's no forwarer mesh (forwarding disabled by
453 // config), we just succeed immediately.
454 let Some(forwarder_mesh) = forwarder_mesh_opt else {
455 return Ok(());
456 };
457
458 Self::flush_internal(instance.deref(), client_actor, forwarder_mesh)
459 .await
460 .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
461 })
462 }
463}
464
465// NOTE ON LIFECYCLE / CLEANUP
466//
467// `LoggingMeshClient` is a thin owner for three pieces of logging
468// infra:
469//
470// - `client_actor`: a single `LogClientActor` in the *local*
471// process.
472// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
473// with one actor per remote proc in the `ProcMesh`, responsible for
474// forwarding that proc's stdout/stderr back to the client.
475// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
476// actor per remote proc, responsible for driving that proc's Python
477// logging configuration.
478//
479// The Python-facing handle we hand back to callers is a
480// `Py<LoggingMeshClient>`. When that handle is dropped (or goes out
481// of scope in a test), PyO3 will run `Drop` for `LoggingMeshClient`.
482//
483// Important:
484//
485// - In `Drop` we *only* call `drain_and_stop()` on the local
486// `LogClientActor`. This asks the client-side aggregator to
487// flush/stop so we don't leave a local task running.
488// - We do NOT synchronously tear down the per-proc meshes here.
489// Dropping `forwarder_mesh` / `logger_mesh` just releases our
490// handles; the actual `LogForwardActor` / `LoggerRuntimeActor`
491// instances keep running on the remote procs until those procs are
492// shut down.
493//
494// This is fine in tests because we always shut the world down
495// afterward via `host_mesh.shutdown(&instance)`, which tears down the
496// spawned procs and all actors running in them. In other words:
497//
498// drop(Py<LoggingMeshClient>)
499// → stops the local `LogClientActor`, drops mesh handles
500// host_mesh.shutdown(...)
501// → kills the remote procs, which takes out the per-proc actors
502//
503// If you reuse this type outside tests, keep in mind that simply
504// dropping `LoggingMeshClient` does *not* on its own tear down the
505// remote logging actors; it only stops the local client actor.
506impl Drop for LoggingMeshClient {
507 fn drop(&mut self) {
508 // Use catch_unwind to guard against panics during interpreter shutdown.
509 // During Python teardown, the tokio runtime or channels may already be
510 // deallocated, and attempting to drain could cause a segfault.
511 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
512 match self.client_actor.drain_and_stop("logging client shutdown") {
513 Ok(_) => {}
514 Err(e) => {
515 // it is ok as during shutdown, the channel might already be closed
516 tracing::debug!("error draining logging client actor during shutdown: {}", e);
517 }
518 }
519 }));
520 }
521}
522
523/// Turns a python exception into a string with a traceback. If the traceback doesn't
524/// exist or can't be formatted, returns just the exception message.
525pub(crate) fn format_traceback(py: Python<'_>, err: &PyErr) -> String {
526 let traceback = err.traceback(py);
527 if traceback.is_some() {
528 let inner = || -> PyResult<String> {
529 let formatted = py
530 .import("traceback")?
531 .call_method1("format_exception", (err.clone_ref(py),))?;
532 Ok(PyString::new(py, "")
533 .call_method1("join", (formatted,))?
534 .to_string())
535 };
536 match inner() {
537 Ok(s) => s,
538 Err(e) => format!("{}: no traceback {}", err, e),
539 }
540 } else {
541 err.to_string()
542 }
543}
544
545#[pyfunction]
546fn log_endpoint_exception(
547 py: Python<'_>,
548 e: Py<PyAny>,
549 endpoint: Py<PyAny>,
550 actor_id: PyActorAddr,
551) {
552 let pyerr = PyErr::from_value(e.into_bound(py));
553 let exception_str = format_traceback(py, &pyerr);
554 let endpoint = endpoint.into_bound(py).to_string();
555 tracing::info!(
556 actor_id = actor_id.inner.to_string(),
557 %endpoint,
558 "exception occurred in endpoint: {}",
559 exception_str,
560 );
561}
562
563/// Register the Python-facing types for this module.
564///
565/// `pyo3` calls this when building `monarch._rust_bindings...`. We
566/// expose `LoggingMeshClient` so that Python can construct it and
567/// call its methods (`spawn`, `set_mode`, `flush`, ...).
568pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
569 module.add_class::<LoggingMeshClient>()?;
570 let log_endpoint_exception = wrap_pyfunction!(log_endpoint_exception, module.py())?;
571 log_endpoint_exception.setattr(
572 "__module__",
573 "monarch._rust_bindings.monarch_hyperactor.logging",
574 )?;
575 module.add_function(log_endpoint_exception)?;
576 Ok(())
577}
578
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use hyperactor::Instance;
    use hyperactor::channel::ChannelTransport;
    use hyperactor::proc::Proc;
    use hyperactor_mesh::ProcMesh;
    use hyperactor_mesh::host_mesh::HostMesh;
    use ndslice::Extent;
    use ndslice::View; // .region(), .num_ranks() etc.

    use super::*;
    use crate::actor::PythonActor;
    use crate::pytokio::AwaitPyExt;
    use crate::pytokio::ensure_python;

    /// Bring up a minimal "world" suitable for integration-style
    /// tests: a root `Proc` with a client `Instance`, a locally
    /// bootstrapped `HostMesh`, and a single-proc `ProcMesh` on it.
    ///
    /// Callers are responsible for shutting the `HostMesh` down.
    pub async fn test_world() -> Result<(Proc, Instance<PythonActor>, HostMesh, ProcMesh)> {
        ensure_python();

        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
            .expect("failed to start root Proc");

        let ai = proc
            .actor_instance("client")
            .expect("failed to create proc Instance");
        let instance = ai.instance;

        let host_mesh = HostMesh::local_with_bootstrap(
            crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(),
        )
        .await
        .expect("failed to bootstrap HostMesh");

        let proc_mesh = host_mesh
            .spawn(&instance, "p0", Extent::unity(), None, None)
            .await
            .expect("failed to spawn ProcMesh");

        Ok((proc, instance, host_mesh, proc_mesh))
    }

    /// Sanity-check the fixture itself: one host, one proc, and an
    /// instance that lives on the root proc.
    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
    #[tokio::test]
    async fn test_world_smoke() {
        let (proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        assert_eq!(
            host_mesh.region().num_ranks(),
            1,
            "should allocate exactly one host"
        );
        assert_eq!(
            proc_mesh.region().num_ranks(),
            1,
            "should spawn exactly one proc"
        );
        assert_eq!(
            instance.self_addr().proc_addr(),
            proc.proc_addr().clone(),
            "returned Instance should be bound to the root Proc"
        );

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }

    /// `spawn` creates the forwarder mesh only when
    /// `MESH_ENABLE_LOG_FORWARDING` is set.
    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
    #[tokio::test]
    async fn spawn_respects_forwarding_flag() {
        let (_, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh` should be `None`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_none(),
                    "forwarder_mesh should be None when forwarding disabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                assert!(
                    client_ref.forwarder_mesh.is_some(),
                    "forwarder_mesh should be Some(..) when forwarding is enabled"
                );
            })
            .await;

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }

    /// Exercise the argument validation and forwarding-dependent
    /// branches of `set_mode`.
    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
    #[tokio::test]
    async fn set_mode_behaviors() {
        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (a) stream_to_client = false, no aggregate window
                //     -> OK
                let res = client_ref.set_mode(&py_instance, false, None, 10);
                assert!(res.is_ok(), "expected Ok(..), got {res:?}");

                // (b) stream_to_client = false,
                //     aggregate_window_sec = Some(..) -> Err
                let res = client_ref.set_mode(&py_instance, false, Some(1), 10);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err for aggregate_window without streaming: {msg}"
                    );
                }

                // (c) stream_to_client = true when forwarders were
                //     never spawned -> OK. Since the 2025-11-13
                //     update, a request to stream while log
                //     forwarding is disabled is ignored rather than
                //     rejected.
                let res = client_ref.set_mode(&py_instance, true, None, 10);
                assert!(
                    res.is_ok(),
                    "expected Ok(..) when enabling streaming with no forwarders, got {res:?}"
                );
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);

                // (d) stream_to_client = true, aggregate_window_sec =
                //     Some(..) -> OK. Now that we *do* have
                //     forwarders, enabling streaming should succeed.
                let res = client_ref.set_mode(&py_instance, true, Some(2), 20);
                assert!(
                    res.is_ok(),
                    "expected Ok(..) enabling streaming w/ window: {res:?}"
                );

                // (e) aggregate_window_sec = Some(..) but
                //     stream_to_client = false -> still Err (this
                //     rule doesn't care about forwarding being
                //     enabled or not).
                let res = client_ref.set_mode(&py_instance, false, Some(2), 20);
                assert!(
                    res.is_err(),
                    "expected Err(..) for window without streaming even w/ forwarders"
                );
                if let Err(e) = res {
                    let msg = e.to_string();
                    assert!(
                        msg.contains("cannot set aggregate window without streaming to client"),
                        "unexpected err when setting window but disabling streaming: {msg}"
                    );
                }
            })
            .await;

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }

    /// `flush` succeeds both as a no-op (forwarding disabled) and via
    /// the real barrier path (forwarding enabled).
    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
    #[tokio::test]
    async fn flush_behaviors() {
        let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");

        let py_instance = PyInstance::from(&instance);
        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
        let lock = hyperactor_config::global::lock();

        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding disabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding disabled)");

            // Call flush() and bring the PyPythonTask back out.
            let flush_task = monarch_with_gil(|py| {
                let client_ref = client_py.borrow(py);
                client_ref
                    .flush(&py_instance)
                    .expect("flush() PyPythonTask (forwarding disabled)")
            })
            .await;

            // Await the returned PyPythonTask's future outside the
            // GIL.
            flush_task
                .await_unit()
                .await
                .expect("flush failed (forwarding disabled)");

            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP"
        }

        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
        {
            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);

            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
                .expect("spawn PyPythonTask (forwarding enabled)");

            let client_py: Py<LoggingMeshClient> = client_task
                .await_py()
                .await
                .expect("spawn failed (forwarding enabled)");

            // Call flush() to exercise the barrier path, and pull the
            // PyPythonTask out.
            let flush_task = monarch_with_gil(|py| {
                client_py
                    .borrow(py)
                    .flush(&py_instance)
                    .expect("flush() PyPythonTask (forwarding enabled)")
            })
            .await;

            // Await the returned PyPythonTask's future outside the
            // GIL.
            flush_task
                .await_unit()
                .await
                .expect("flush failed (forwarding enabled)");

            drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
        }

        host_mesh.shutdown(&instance).await.expect("host shutdown");
    }
}
898}