hyperactor_mesh/global_context.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Process-global context, root client actor, and supervision bridge.
10//!
11//! This module provides the Rust equivalent of Python's `context()`,
12//! `this_host()`, and `this_proc()`. A singleton [`Host`] is lazily
13//! created with the [`GlobalClientActor`] on its `local_proc`:
14//!
15//! ```rust,ignore
16//! let cx = context().await;
17//! cx.actor_instance // c.f. Python: context().actor_instance
18//! this_host().await // c.f. Python: this_host()
//! this_proc().await // c.f. Python: this_proc()
20//! ```
21//!
22//! ## Undeliverables → supervision
23//!
24//! When the runtime detects that a message cannot be delivered, it
25//! produces an [`Undeliverable<MessageEnvelope>`]. The global root
26//! client observes these failures, converts them into
27//! [`ActorSupervisionEvent`]s, and forwards them to the currently
28//! active mesh supervision sink.
29//!
30//! **GC-1 (undeliverable routing):** Any
31//! `Undeliverable<MessageEnvelope>` observed by the global root
32//! client must be reported as an [`ActorSupervisionEvent`] to the
33//! active `ProcMesh`, and handling that failure must never crash the
34//! global client. The root client acts as a monitor, not a
35//! participant: routing failures are treated as signals to be
36//! reported, not fatal errors.
37//!
38//! ## Multiple ProcMeshes
39//!
40//! A process may allocate more than one `ProcMesh` (e.g.
41//! internal/controller meshes plus an application mesh). The root
42//! client is a process-wide singleton, so its supervision sink is
43//! also process-global.
44//!
45//! The active mesh is defined using **last-sink-wins** semantics:
46//! each newly allocated `ProcMesh` installs its sink, overriding the
47//! previous one.
48//!
49//! If no sink has been installed yet (early/late binding),
50//! undeliverables are logged and dropped, preserving forward progress
51//! until a mesh becomes available.
52
53use std::sync::OnceLock;
54use std::sync::RwLock;
55
56use async_trait::async_trait;
57use hyperactor::Actor;
58use hyperactor::ActorHandle;
59use hyperactor::Context;
60use hyperactor::Endpoint as _;
61use hyperactor::Handler;
62use hyperactor::Instance;
63use hyperactor::PortRef;
64use hyperactor::actor::ActorError;
65use hyperactor::actor::ActorErrorKind;
66use hyperactor::actor::ActorStatus;
67use hyperactor::actor::Signal;
68use hyperactor::id::Label;
69use hyperactor::mailbox::DeliveryError;
70use hyperactor::mailbox::MessageEnvelope;
71use hyperactor::mailbox::PortReceiver;
72use hyperactor::mailbox::Undeliverable;
73use hyperactor::proc::Proc;
74use hyperactor::proc::WorkCell;
75use hyperactor::supervision::ActorSupervisionEvent;
76use tokio::sync::mpsc;
77use tokio::task::JoinHandle;
78
79use crate::HostMeshRef;
80use crate::host::Host;
81use crate::host::LocalProcManager;
82use crate::host_mesh::host_agent::GetLocalProcClient;
83use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
84use crate::host_mesh::host_agent::HostAgent;
85use crate::host_mesh::host_agent::HostAgentMode;
86use crate::host_mesh::host_agent::ProcManagerSpawnFn;
87use crate::mesh_id::HostMeshId;
88use crate::mesh_id::ProcMeshId;
89use crate::proc_agent::GetProcClient;
90use crate::proc_agent::ProcAgent;
91use crate::proc_mesh::ProcMeshRef;
92use crate::proc_mesh::ProcRef;
93use crate::supervision::MeshFailure;
94use crate::transport::default_bind_spec;
95
96/// Single, process-wide supervision sink storage.
97///
98/// Routes undeliverables observed by the process-global root client
99/// (c.f. [`context()`]) to the *currently active* `ProcMesh`'s
100/// agent. Newer meshes override older ones ("last sink wins").
101///
102/// Uses `PortRef` (not `PortHandle`) because the sink target
103/// (`ProcAgent`) runs in a remote worker process.
104static GLOBAL_SUPERVISION_SINK: OnceLock<RwLock<Option<PortRef<ActorSupervisionEvent>>>> =
105 OnceLock::new();
106
107/// Returns the lazily-initialized container that holds the current
108/// process-global supervision sink.
109fn sink_cell() -> &'static RwLock<Option<PortRef<ActorSupervisionEvent>>> {
110 GLOBAL_SUPERVISION_SINK.get_or_init(|| RwLock::new(None))
111}
112
113/// Install (or replace) the process-global supervision sink used by
114/// the [`context()`] undeliverable → supervision bridge.
115///
116/// This uses **last-sink-wins** semantics: if multiple `ProcMesh`
117/// instances are created in the same process (e.g. controller meshes
118/// plus an application mesh), the most recently installed sink
119/// becomes the active destination for forwarded
120/// [`ActorSupervisionEvent`]s.
121///
122/// Returns the previously installed sink, if any, to allow callers to
123/// log/inspect overrides.
124///
125/// Note: the sink is a [`PortRef`] (not a `PortHandle`) because the
126/// destination [`ProcAgent`] may live in a different
127/// process/rank.
128pub(crate) fn set_global_supervision_sink(
129 sink: PortRef<ActorSupervisionEvent>,
130) -> Option<PortRef<ActorSupervisionEvent>> {
131 let cell = sink_cell();
132 let mut guard = cell.write().unwrap();
133 let prev = guard.take();
134 *guard = Some(sink);
135 prev
136}
137
138/// Get the current process-global supervision sink used by the
139/// [`context()`] undeliverable → supervision bridge.
140///
141/// Returns `None` until some mesh creation path installs a sink
142/// (early/late binding). Callers should treat this as "no active mesh
143/// yet": log and drop undeliverables rather than crashing the global
144/// root client.
145///
146/// Cloning a [`PortRef`] is cheap.
147///
148/// Used only by the process-global root client.
149fn get_global_supervision_sink() -> Option<PortRef<ActorSupervisionEvent>> {
150 sink_cell().read().unwrap().clone()
151}
152
/// Process-global "root client" actor.
///
/// This actor lives on the `local_proc` of the singleton [`Host`]
/// created by [`context()`], symmetric with Python's
/// `RootClientActor` on `bootstrap_host()`'s local proc.
///
/// It acts as a *monitor* for routing failures observed at the
/// process boundary: undeliverable messages are treated as signals to
/// be reported via mesh supervision (when a sink is installed), not
/// as fatal errors.
///
/// The actor is driven by `run()`, which `select!`s over:
/// - `work_rx`: the primary dispatch queue for bound handler work
///   items (including `Undeliverable<MessageEnvelope>` and
///   `MeshFailure`),
/// - `supervision_rx`: supervision events delivered to this actor,
///   and
/// - `signal_rx`: control signals (currently received and ignored).
///
/// It is not started through the usual actor spawn path:
/// `bootstrap_host` builds it directly from the receivers returned by
/// `Proc::actor_instance` and starts `run()` itself.
#[derive(Debug)]
#[hyperactor::export(handlers = [MeshFailure])]
pub struct GlobalClientActor {
    /// Control signals for the actor's proc (shutdown, etc.).
    signal_rx: PortReceiver<Signal>,
    /// Supervision events delivered to this actor instance.
    ///
    /// The root client is a monitor, so it should process these
    /// events without crashing on routine routing/delivery failures
    /// it observes.
    supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    /// Primary work queue for handler dispatch.
    ///
    /// Any bound handler message (e.g. `MeshFailure`,
    /// `Undeliverable<MessageEnvelope>`, introspection, etc.) is
    /// received here and executed via `WorkCell::handle`.
    work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
}
189
190impl GlobalClientActor {
191 fn run(mut self, instance: &'static Instance<Self>) -> JoinHandle<()> {
192 tokio::spawn(async move {
193 #[allow(unused_labels)]
194 let err = 'messages: loop {
195 tokio::select! {
196 work = self.work_rx.recv() => {
197 let work = work.expect("inconsistent work queue state");
198 if let Err(err) = work.handle(&mut self, instance).await {
199 while let Ok(supervision_event) = self.supervision_rx.try_recv() {
200 instance.handle_supervision_event(&mut self, supervision_event).await
201 .expect("GlobalClientActor::handle_supervision_event is infallible");
202 }
203 let kind = ActorErrorKind::processing(err);
204 break ActorError {
205 actor_id: Box::new(instance.self_addr().clone()),
206 kind: Box::new(kind),
207 };
208 }
209 }
210 _ = self.signal_rx.recv() => {
211 // TODO: do we need any signal handling for the root client?
212 }
213 Some(supervision_event) = self.supervision_rx.recv() => {
214 instance.handle_supervision_event(&mut self, supervision_event).await
215 .expect("GlobalClientActor::handle_supervision_event is infallible");
216 }
217 };
218 };
219 let event = match *err.kind {
220 ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
221 _ => {
222 let status = ActorStatus::generic_failure(err.kind.to_string());
223 ActorSupervisionEvent::new(
224 instance.self_addr().clone(),
225 Some("testclient".into()),
226 status,
227 None,
228 )
229 }
230 };
231 instance
232 .proc()
233 .handle_unhandled_supervision_event(instance, event);
234 })
235 }
236}
237
238/// Handle a returned (undeliverable) message observed by the
239/// process-global root client.
240///
241/// The global root client is a **monitor**, not a participant: it
242/// must not crash or propagate failures just because a routed message
243/// could not be delivered.
244///
245/// Instead, we translate the undeliverable into an
246/// `ActorSupervisionEvent` and forward it to the **active**
247/// `ProcMesh` via the process-global supervision sink ("last sink
248/// wins"). If no sink has been installed yet (e.g., before the first
249/// `ProcMesh` allocation completes), we log and drop the event.
250#[async_trait]
251impl Actor for GlobalClientActor {
252 /// The global root client is the root of the supervision tree:
253 /// there is no parent to escalate to. Child-actor failures (e.g.
254 /// ActorMeshControllers detecting dead procs after mesh teardown)
255 /// are expected and must not crash the process.
256 async fn handle_supervision_event(
257 &mut self,
258 _this: &Instance<Self>,
259 event: &ActorSupervisionEvent,
260 ) -> Result<bool, anyhow::Error> {
261 tracing::warn!(
262 %event,
263 "global root client absorbed child supervision event",
264 );
265 Ok(true)
266 }
267
268 async fn handle_undeliverable_message(
269 &mut self,
270 cx: &Instance<Self>,
271 undeliverable: Undeliverable<MessageEnvelope>,
272 ) -> Result<(), anyhow::Error> {
273 let mut env = match undeliverable {
274 Undeliverable::Message(env) => env,
275 Undeliverable::Lost(lost) => {
276 let actor_ref = lost.dest.actor_addr();
277 let event = ActorSupervisionEvent::new(
278 actor_ref.clone(),
279 None,
280 ActorStatus::generic_failure(format!(
281 "message not delivered to {}: {}",
282 lost.dest, lost.error
283 )),
284 None,
285 );
286 match get_global_supervision_sink() {
287 Some(sink) => {
288 sink.post(cx, event);
289 }
290 None => {
291 tracing::warn!(
292 actor=%actor_ref,
293 error=%lost.error,
294 "no supervision sink; lost message logged but not forwarded"
295 );
296 }
297 }
298 return Ok(());
299 }
300 };
301 env.set_error(DeliveryError::BrokenLink(
302 "message returned to global root client".to_string(),
303 ));
304 let actor_ref = env.dest().actor_addr();
305 let headers = env.headers().clone();
306 let event = ActorSupervisionEvent::new(
307 actor_ref.clone(),
308 None,
309 ActorStatus::generic_failure(format!("message not delivered: {}", env)),
310 Some(headers),
311 );
312
313 match get_global_supervision_sink() {
314 Some(sink) => {
315 sink.post(cx, event);
316 }
317 None => {
318 tracing::warn!(
319 actor=%actor_ref,
320 error=?env.errors(),
321 "no supervision sink; undeliverable message logged but not forwarded"
322 );
323 }
324 }
325 Ok(())
326 }
327}
328
329/// `MeshFailure` is a terminal supervision signal for an `ActorMesh`.
330///
331/// The process-global root client should never be a consumer of
332/// mesh-level supervision failures during normal operation: those
333/// events are expected to be observed and handled by the owning
334/// mesh/controller, not by the global client.
335///
336/// In processes that create and destroy multiple meshes (e.g.,
337/// benchmarks), `MeshFailure` events may arrive here during or after
338/// mesh teardown. Log loudly but do not crash — the global client is
339/// a monitor and must preserve forward progress.
340#[async_trait]
341impl Handler<MeshFailure> for GlobalClientActor {
342 async fn handle(&mut self, _cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
343 tracing::error!("supervision failure reached global client: {}", message);
344 Ok(())
345 }
346}
347
/// Everything produced by `bootstrap_host`, stored once in
/// `GLOBAL_CONTEXT` and reused by every accessor.
struct GlobalState {
    /// Root client actor instance. It is stored in a process-lifetime
    /// `static`, which is why it can be handed out as `&'static`.
    actor_instance: &'static Instance<GlobalClientActor>,
    /// Singleton host mesh built from the local `HostAgent`.
    host_mesh: HostMeshRef,
    /// Singleton proc mesh for the host's local proc.
    proc_mesh: ProcMeshRef,
}
353
/// Process-global, lazily-initialized Monarch context.
///
/// Backed by a `tokio::sync::OnceCell` so initialization is async.
/// The first caller runs `bootstrap_host` to create the singleton host
/// and root client actor (mirroring Python's `bootstrap_host()` /
/// `context()`). Concurrent callers wait for that attempt, and all
/// later callers reuse the same `GlobalState`. Per tokio's `OnceCell`
/// semantics, if an initialization attempt is cancelled or panics, a
/// waiting or later caller starts a new attempt.
///
/// This provides a stable root `actor_instance` plus `this_host()` /
/// `this_proc()` accessors.
static GLOBAL_CONTEXT: tokio::sync::OnceCell<GlobalState> = tokio::sync::OnceCell::const_new();
365
366/// Bootstrap the singleton Host and GlobalClientActor. Mirrors
367/// Python's `bootstrap_host()` (monarch_hyperactor/src/host_mesh.rs).
368async fn bootstrap_host() -> GlobalState {
369 // 1. Create Host with LocalProcManager. The spawn closure is the
370 // ProcAgent boot function, called by HostAgent on GetLocalProc.
371 let spawn: ProcManagerSpawnFn =
372 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
373 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
374 let host = Host::new(manager, default_bind_spec().binding_addr())
375 .await
376 .expect("failed to create global host");
377
378 // 2. Extract system_proc before moving Host into HostAgent.
379 let system_proc = host.system_proc().clone();
380
381 // 3. Spawn HostAgent on system_proc (takes ownership of Host).
382 let host_agent = system_proc
383 .spawn(
384 HOST_MESH_AGENT_ACTOR_NAME,
385 HostAgent::new(HostAgentMode::Local(host)),
386 )
387 .expect("failed to spawn host agent");
388
389 // 4. Build HostMeshRef.
390 let host_mesh = HostMeshRef::from_host_agent(
391 HostMeshId::singleton(Label::new("local").unwrap()),
392 host_agent.bind(),
393 )
394 .expect("failed to create host mesh ref");
395
396 // 5. Get local_proc via HostAgent (lazily boots ProcAgent).
397 //
398 // We use a throwaway Proc::isolated() for the bootstrap request-reply
399 // calls, matching Python's bootstrap_host() (host_mesh.rs:330-333).
400 // This creates a temporary in-process-only proc context during init
401 // — intentionally acceptable for cross-language symmetry and easier
402 // reasoning about the bootstrap sequence.
403 let temp_proc = Proc::isolated();
404 let (bootstrap_cx, _guard) = temp_proc
405 .client("bootstrap")
406 .expect("failed to create bootstrap instance");
407 let local_proc_agent: ActorHandle<ProcAgent> = host_agent
408 .get_local_proc(&bootstrap_cx)
409 .await
410 .expect("failed to get local proc agent");
411
412 // 6. Get the actual Proc object.
413 let local_proc = local_proc_agent
414 .get_proc(&bootstrap_cx)
415 .await
416 .expect("failed to get local proc");
417
418 // 7. Build ProcMeshRef.
419 let proc_mesh = ProcMeshRef::new_singleton(
420 ProcMeshId::singleton(Label::new("local").unwrap()),
421 ProcRef::new(
422 local_proc_agent.actor_addr().proc_addr(),
423 0,
424 local_proc_agent.bind(),
425 ),
426 );
427 let actor_instance = local_proc
428 .actor_instance::<GlobalClientActor>("client")
429 .expect("failed to create root client instance");
430
431 let hyperactor::proc::ActorInstance {
432 instance: client_instance,
433 handle,
434 supervision,
435 signal,
436 work,
437 } = actor_instance;
438
439 // GlobalClientActor uses a custom run loop that bypasses
440 // Actor::init, so set_system() must be called explicitly.
441 client_instance.set_system();
442 handle.bind::<GlobalClientActor>();
443
444 // Use a static OnceLock to get 'static lifetime for the instance.
445 static INSTANCE: OnceLock<(Instance<GlobalClientActor>, ActorHandle<GlobalClientActor>)> =
446 OnceLock::new();
447 INSTANCE
448 .set((client_instance, handle))
449 .map_err(|_| "already initialized root client instance")
450 .unwrap();
451 let (instance, _handle) = INSTANCE.get().unwrap();
452
453 let client = GlobalClientActor {
454 signal_rx: signal,
455 supervision_rx: supervision,
456 work_rx: work,
457 };
458 client.run(instance);
459
460 GlobalState {
461 actor_instance: instance,
462 host_mesh,
463 proc_mesh,
464 }
465}
466
/// Process-global Monarch context for Rust programs. Symmetric with
/// Python's `context()`.
///
/// Returned by [`context()`]. Every field borrows from the
/// process-global state, hence the `'static` lifetimes.
pub struct GlobalContext {
    /// Consistent with Python's `context().actor_instance`
    pub actor_instance: &'static Instance<GlobalClientActor>,
    /// The singleton HostMesh. See also [`this_host()`].
    pub host_mesh: &'static HostMeshRef,
    /// The local ProcMesh. See also [`this_proc()`].
    pub proc_mesh: &'static ProcMeshRef,
}
477
478/// Returns the process-global Monarch context, lazily initialized.
479///
480/// On first call, creates a singleton [`Host`] and bootstraps
481/// [`GlobalClientActor`] on its `local_proc` — symmetric with
482/// Python's `bootstrap_host()`. Subsequent calls return immediately.
483///
484/// ```rust,ignore
485/// let cx = context().await;
486/// cx.actor_instance // c.f. Python: context().actor_instance
487/// ```
488///
489/// **Python programs do not use this.** Python has its own root
490/// client actor bootstrapped separately.
491pub async fn context() -> GlobalContext {
492 let state = GLOBAL_CONTEXT.get_or_init(bootstrap_host).await;
493 GlobalContext {
494 actor_instance: state.actor_instance,
495 host_mesh: &state.host_mesh,
496 proc_mesh: &state.proc_mesh,
497 }
498}
499
500/// Returns the singleton HostMesh c.f. Python's `this_host()`.
501pub async fn this_host() -> &'static HostMeshRef {
502 &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.host_mesh
503}
504
505/// Returns the local ProcMesh c.f Python's `this_proc()`.
506pub async fn this_proc() -> &'static ProcMeshRef {
507 &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.proc_mesh
508}
509
510/// Separate storage for client host registered by non-Rust runtimes
511/// (e.g. Python's `bootstrap_host()`). Checked by `try_this_host()`
512/// alongside `GLOBAL_CONTEXT`.
513static REGISTERED_CLIENT_HOST: std::sync::OnceLock<HostMeshRef> = std::sync::OnceLock::new();
514
515/// Register the client host mesh from an external runtime (Python).
516/// Called by Python's `bootstrap_host()` so that `try_this_host()`
517/// can discover C for the A/C invariant.
518pub fn register_client_host(host_mesh: HostMeshRef) {
519 let _ = REGISTERED_CLIENT_HOST.set(host_mesh);
520}
521
522/// Returns the client host mesh if available, without triggering
523/// lazy bootstrap. Checks both the Rust global context and the
524/// external registration (Python). Used by `MeshAdminAgent` to
525/// discover C at query time (A/C invariant).
526pub fn try_this_host() -> Option<&'static HostMeshRef> {
527 GLOBAL_CONTEXT
528 .get()
529 .map(|state| &state.host_mesh)
530 .or_else(|| REGISTERED_CLIENT_HOST.get())
531}
532
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hyperactor::testing::ids::test_actor_id;
    use hyperactor_config::Flattrs;
    #[cfg(fbcode_build)]
    use ndslice::view::Extent;
    #[cfg(fbcode_build)]
    use timed_test::async_timed_test;

    use super::*;
    #[cfg(fbcode_build)]
    use crate::testing;

    /// Helper: send an `Undeliverable<MessageEnvelope>` to the global
    /// root client's well-known undeliverable port via the runtime's
    /// routing/dispatch path.
    ///
    /// This exercises the full integration boundary: serialisation →
    /// routing → work_rx dispatch → `handle_undeliverable_message`.
    ///
    /// Uses the provided `dest_actor` so callers can distinguish
    /// events from different injections (important because the global
    /// sink is shared across tests running in the same process).
    fn inject_undeliverable(
        client: &'static Instance<GlobalClientActor>,
        dest_actor: hyperactor::ActorAddr,
    ) {
        let env = MessageEnvelope::new(
            client.self_addr().clone(),
            dest_actor.port_addr(0.into()),
            wirevalue::Any::serialize(&0u64).unwrap(),
            Flattrs::new(),
        );
        // Target the global root client's well-known Undeliverable port.
        let client_actor_id: hyperactor::ActorAddr = client.self_addr().clone();
        let undeliverable_port =
            PortRef::<Undeliverable<MessageEnvelope>>::attest_handler_port(&client_actor_id);
        undeliverable_port.post(client, Undeliverable::Message(env));
    }

    /// Helper: receive from `rx` until an event whose `actor_id` equals
    /// `marker` arrives, discarding every other event.
    ///
    /// The global sink is shared by all tests in the process, so a
    /// test's sink can also observe events injected by other tests.
    /// Filtering on a per-test marker keeps assertions independent.
    ///
    /// Panics if the port's channel closes. Callers bound the wait with
    /// `tokio::time::timeout`.
    async fn recv_marked_event(
        rx: &mut PortReceiver<ActorSupervisionEvent>,
        marker: &hyperactor::ActorAddr,
    ) -> ActorSupervisionEvent {
        loop {
            let ev = rx.recv().await.expect("sink channel closed");
            if ev.actor_id == *marker {
                return ev;
            }
        }
    }

    /// Verifies that creating a `ProcMesh` installs the
    /// process-global supervision sink used by the global root
    /// client.
    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_sink_installed_after_mesh_creation() {
        let instance = testing::instance();
        let mut hm = testing::host_mesh(2).await;
        let _mesh = hm
            .spawn(instance, "test", Extent::unity(), None, None)
            .await
            .unwrap();
        assert!(
            get_global_supervision_sink().is_some(),
            "supervision sink must be set after ProcMesh creation"
        );
        let _ = hm.shutdown(instance).await;
    }

    /// Proves the full forwarding pipeline:
    ///
    /// Undeliverable<MessageEnvelope>
    ///   → GlobalClientActor::handle_undeliverable_message
    ///   → GLOBAL_SUPERVISION_SINK (PortRef)
    ///   → ActorSupervisionEvent delivered
    ///
    /// Installs a local port as the sink and verifies that the
    /// `ActorSupervisionEvent` arrives there.
    #[tokio::test]
    async fn test_undeliverable_forwarded_to_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install a test sink we control.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("fwd_test", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // The handler runs asynchronously via work_rx; wait for the
        // forwarded event with our marker.
        let event = tokio::time::timeout(
            Duration::from_secs(5),
            recv_marked_event(&mut sink_rx, &marker),
        )
        .await
        .expect("timed out waiting for supervision event");

        assert_eq!(
            event.actor_id, marker,
            "forwarded event must reference the undeliverable's destination actor"
        );
    }

    /// Proves last-sink-wins: when two sinks are installed in
    /// sequence, only the second receives the forwarded event.
    #[tokio::test]
    async fn test_last_sink_wins() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install sink A.
        let (sink_a_handle, mut sink_a_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_a_handle.bind());

        // Install sink B (overrides A).
        let (sink_b_handle, mut sink_b_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_b_handle.bind());

        let marker = test_actor_id("last_wins", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // B should receive our marked event.
        let event = tokio::time::timeout(
            Duration::from_secs(5),
            recv_marked_event(&mut sink_b_rx, &marker),
        )
        .await
        .expect("timed out waiting for supervision event on sink B");
        assert_eq!(event.actor_id, marker);

        // A was superseded before the injection, so it must never see the
        // marked event. The handler has already forwarded to B, so a
        // short grace period is enough to catch a stray delivery.
        let leaked_to_a = tokio::time::timeout(
            Duration::from_millis(100),
            recv_marked_event(&mut sink_a_rx, &marker),
        )
        .await;
        assert!(
            leaked_to_a.is_err(),
            "superseded sink A must not receive the forwarded event"
        );
    }

    /// Proves the global client does not crash when no sink is
    /// installed (early/late binding). The handler must log and
    /// drop gracefully, and the client must remain usable
    /// afterward.
    #[tokio::test]
    async fn test_no_crash_without_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Clear any previously installed sink.
        *sink_cell().write().unwrap() = None;

        // Inject an undeliverable — should not panic.
        inject_undeliverable(client, test_actor_id("no_sink", "marker_actor"));

        // Give the async handler time to run.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The global client must still be alive and usable.
        // Verify by installing a new sink and sending another
        // undeliverable that arrives correctly.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("no_sink_recovery", "marker_actor");
        inject_undeliverable(client, marker.clone());

        let event = tokio::time::timeout(
            Duration::from_secs(5),
            recv_marked_event(&mut sink_rx, &marker),
        )
        .await
        .expect("timed out: global client crashed or stopped processing");
        assert_eq!(event.actor_id, marker);
    }
}