Skip to main content

hyperactor_mesh/
global_context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Process-global context, root client actor, and supervision bridge.
10//!
11//! This module provides the Rust equivalent of Python's `context()`,
12//! `this_host()`, and `this_proc()`. A singleton [`Host`] is lazily
13//! created with the [`GlobalClientActor`] on its `local_proc`:
14//!
15//! ```rust,ignore
16//! let cx = context().await;
17//! cx.actor_instance    // c.f. Python: context().actor_instance
18//! this_host().await    // c.f. Python: this_host()
//! this_proc().await    // c.f. Python: this_proc()
20//! ```
21//!
22//! ## Undeliverables → supervision
23//!
24//! When the runtime detects that a message cannot be delivered, it
25//! produces an [`Undeliverable<MessageEnvelope>`]. The global root
26//! client observes these failures, converts them into
27//! [`ActorSupervisionEvent`]s, and forwards them to the currently
28//! active mesh supervision sink.
29//!
30//! **GC-1 (undeliverable routing):** Any
31//! `Undeliverable<MessageEnvelope>` observed by the global root
32//! client must be reported as an [`ActorSupervisionEvent`] to the
33//! active `ProcMesh`, and handling that failure must never crash the
34//! global client. The root client acts as a monitor, not a
35//! participant: routing failures are treated as signals to be
36//! reported, not fatal errors.
37//!
38//! ## Multiple ProcMeshes
39//!
40//! A process may allocate more than one `ProcMesh` (e.g.
41//! internal/controller meshes plus an application mesh). The root
42//! client is a process-wide singleton, so its supervision sink is
43//! also process-global.
44//!
45//! The active mesh is defined using **last-sink-wins** semantics:
46//! each newly allocated `ProcMesh` installs its sink, overriding the
47//! previous one.
48//!
49//! If no sink has been installed yet (early/late binding),
50//! undeliverables are logged and dropped, preserving forward progress
51//! until a mesh becomes available.
52
53use std::sync::OnceLock;
54use std::sync::RwLock;
55
56use async_trait::async_trait;
57use hyperactor::Actor;
58use hyperactor::ActorHandle;
59use hyperactor::Context;
60use hyperactor::Endpoint as _;
61use hyperactor::Handler;
62use hyperactor::Instance;
63use hyperactor::PortRef;
64use hyperactor::actor::ActorError;
65use hyperactor::actor::ActorErrorKind;
66use hyperactor::actor::ActorStatus;
67use hyperactor::actor::Signal;
68use hyperactor::id::Label;
69use hyperactor::mailbox::DeliveryError;
70use hyperactor::mailbox::MessageEnvelope;
71use hyperactor::mailbox::PortReceiver;
72use hyperactor::mailbox::Undeliverable;
73use hyperactor::proc::Proc;
74use hyperactor::proc::WorkCell;
75use hyperactor::supervision::ActorSupervisionEvent;
76use tokio::sync::mpsc;
77use tokio::task::JoinHandle;
78
79use crate::HostMeshRef;
80use crate::host::Host;
81use crate::host::LocalProcManager;
82use crate::host_mesh::host_agent::GetLocalProcClient;
83use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
84use crate::host_mesh::host_agent::HostAgent;
85use crate::host_mesh::host_agent::HostAgentMode;
86use crate::host_mesh::host_agent::ProcManagerSpawnFn;
87use crate::mesh_id::HostMeshId;
88use crate::mesh_id::ProcMeshId;
89use crate::proc_agent::GetProcClient;
90use crate::proc_agent::ProcAgent;
91use crate::proc_mesh::ProcMeshRef;
92use crate::proc_mesh::ProcRef;
93use crate::supervision::MeshFailure;
94use crate::transport::default_bind_spec;
95
/// Single, process-wide supervision sink storage.
///
/// Routes undeliverables observed by the process-global root client
/// (c.f. [`context()`]) to the *currently active* `ProcMesh`'s
/// agent. Newer meshes override older ones ("last sink wins").
///
/// The outer `OnceLock` defers creation of the lock until first use
/// (see `sink_cell()`); the inner `Option` stays `None` until a sink
/// is installed through `set_global_supervision_sink()`.
///
/// Uses `PortRef` (not `PortHandle`) because the sink target
/// (`ProcAgent`) may run in a different process than the root client.
static GLOBAL_SUPERVISION_SINK: OnceLock<RwLock<Option<PortRef<ActorSupervisionEvent>>>> =
    OnceLock::new();
106
107/// Returns the lazily-initialized container that holds the current
108/// process-global supervision sink.
109fn sink_cell() -> &'static RwLock<Option<PortRef<ActorSupervisionEvent>>> {
110    GLOBAL_SUPERVISION_SINK.get_or_init(|| RwLock::new(None))
111}
112
113/// Install (or replace) the process-global supervision sink used by
114/// the [`context()`] undeliverable → supervision bridge.
115///
116/// This uses **last-sink-wins** semantics: if multiple `ProcMesh`
117/// instances are created in the same process (e.g. controller meshes
118/// plus an application mesh), the most recently installed sink
119/// becomes the active destination for forwarded
120/// [`ActorSupervisionEvent`]s.
121///
122/// Returns the previously installed sink, if any, to allow callers to
123/// log/inspect overrides.
124///
125/// Note: the sink is a [`PortRef`] (not a `PortHandle`) because the
126/// destination [`ProcAgent`] may live in a different
127/// process/rank.
128pub(crate) fn set_global_supervision_sink(
129    sink: PortRef<ActorSupervisionEvent>,
130) -> Option<PortRef<ActorSupervisionEvent>> {
131    let cell = sink_cell();
132    let mut guard = cell.write().unwrap();
133    let prev = guard.take();
134    *guard = Some(sink);
135    prev
136}
137
138/// Get the current process-global supervision sink used by the
139/// [`context()`] undeliverable → supervision bridge.
140///
141/// Returns `None` until some mesh creation path installs a sink
142/// (early/late binding). Callers should treat this as "no active mesh
143/// yet": log and drop undeliverables rather than crashing the global
144/// root client.
145///
146/// Cloning a [`PortRef`] is cheap.
147///
148/// Used only by the process-global root client.
149fn get_global_supervision_sink() -> Option<PortRef<ActorSupervisionEvent>> {
150    sink_cell().read().unwrap().clone()
151}
152
/// Process-global "root client" actor.
///
/// This actor lives on the `local_proc` of the singleton [`Host`]
/// created by [`context()`], symmetric with Python's
/// `RootClientActor` on `bootstrap_host()`'s local proc.
///
/// It acts as a *monitor* for routing failures observed at the
/// process boundary: undeliverable messages are treated as signals to
/// be reported via mesh supervision (when a sink is installed), not
/// as fatal errors.
///
/// The actor is driven by `run()`, which `select!`s over:
/// - `work_rx`: the primary dispatch queue for bound handler work
///   items (including `Undeliverable<MessageEnvelope>` and
///   `MeshFailure`),
/// - `supervision_rx`: supervision events delivered to this actor,
///   and
/// - `signal_rx`: control signals (currently received and ignored).
#[derive(Debug)]
#[hyperactor::export(handlers = [MeshFailure])]
pub struct GlobalClientActor {
    /// Control signals for the actor's proc (shutdown, etc.).
    signal_rx: PortReceiver<Signal>,
    /// Supervision events delivered to this actor instance.
    ///
    /// The root client is a monitor, so it should process these
    /// events without crashing on routine routing/delivery failures
    /// it observes.
    supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    /// Primary work queue for handler dispatch.
    ///
    /// Any bound handler message (e.g. `MeshFailure`,
    /// `Undeliverable<MessageEnvelope>`, introspection, etc.) is
    /// received here and executed via `WorkCell::handle`.
    work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
}
189
190impl GlobalClientActor {
191    fn run(mut self, instance: &'static Instance<Self>) -> JoinHandle<()> {
192        tokio::spawn(async move {
193            #[allow(unused_labels)]
194            let err = 'messages: loop {
195                tokio::select! {
196                    work = self.work_rx.recv() => {
197                        let work = work.expect("inconsistent work queue state");
198                        if let Err(err) = work.handle(&mut self, instance).await {
199                            while let Ok(supervision_event) = self.supervision_rx.try_recv() {
200                                instance.handle_supervision_event(&mut self, supervision_event).await
201                                    .expect("GlobalClientActor::handle_supervision_event is infallible");
202                            }
203                            let kind = ActorErrorKind::processing(err);
204                            break ActorError {
205                                actor_id: Box::new(instance.self_addr().clone()),
206                                kind: Box::new(kind),
207                            };
208                        }
209                    }
210                    _ = self.signal_rx.recv() => {
211                        // TODO: do we need any signal handling for the root client?
212                    }
213                    Some(supervision_event) = self.supervision_rx.recv() => {
214                        instance.handle_supervision_event(&mut self, supervision_event).await
215                            .expect("GlobalClientActor::handle_supervision_event is infallible");
216                    }
217                };
218            };
219            let event = match *err.kind {
220                ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
221                _ => {
222                    let status = ActorStatus::generic_failure(err.kind.to_string());
223                    ActorSupervisionEvent::new(
224                        instance.self_addr().clone(),
225                        Some("testclient".into()),
226                        status,
227                        None,
228                    )
229                }
230            };
231            instance
232                .proc()
233                .handle_unhandled_supervision_event(instance, event);
234        })
235    }
236}
237
238/// Handle a returned (undeliverable) message observed by the
239/// process-global root client.
240///
241/// The global root client is a **monitor**, not a participant: it
242/// must not crash or propagate failures just because a routed message
243/// could not be delivered.
244///
245/// Instead, we translate the undeliverable into an
246/// `ActorSupervisionEvent` and forward it to the **active**
247/// `ProcMesh` via the process-global supervision sink ("last sink
248/// wins"). If no sink has been installed yet (e.g., before the first
249/// `ProcMesh` allocation completes), we log and drop the event.
250#[async_trait]
251impl Actor for GlobalClientActor {
252    /// The global root client is the root of the supervision tree:
253    /// there is no parent to escalate to. Child-actor failures (e.g.
254    /// ActorMeshControllers detecting dead procs after mesh teardown)
255    /// are expected and must not crash the process.
256    async fn handle_supervision_event(
257        &mut self,
258        _this: &Instance<Self>,
259        event: &ActorSupervisionEvent,
260    ) -> Result<bool, anyhow::Error> {
261        tracing::warn!(
262            %event,
263            "global root client absorbed child supervision event",
264        );
265        Ok(true)
266    }
267
268    async fn handle_undeliverable_message(
269        &mut self,
270        cx: &Instance<Self>,
271        undeliverable: Undeliverable<MessageEnvelope>,
272    ) -> Result<(), anyhow::Error> {
273        let mut env = match undeliverable {
274            Undeliverable::Message(env) => env,
275            Undeliverable::Lost(lost) => {
276                let actor_ref = lost.dest.actor_addr();
277                let event = ActorSupervisionEvent::new(
278                    actor_ref.clone(),
279                    None,
280                    ActorStatus::generic_failure(format!(
281                        "message not delivered to {}: {}",
282                        lost.dest, lost.error
283                    )),
284                    None,
285                );
286                match get_global_supervision_sink() {
287                    Some(sink) => {
288                        sink.post(cx, event);
289                    }
290                    None => {
291                        tracing::warn!(
292                            actor=%actor_ref,
293                            error=%lost.error,
294                            "no supervision sink; lost message logged but not forwarded"
295                        );
296                    }
297                }
298                return Ok(());
299            }
300        };
301        env.set_error(DeliveryError::BrokenLink(
302            "message returned to global root client".to_string(),
303        ));
304        let actor_ref = env.dest().actor_addr();
305        let headers = env.headers().clone();
306        let event = ActorSupervisionEvent::new(
307            actor_ref.clone(),
308            None,
309            ActorStatus::generic_failure(format!("message not delivered: {}", env)),
310            Some(headers),
311        );
312
313        match get_global_supervision_sink() {
314            Some(sink) => {
315                sink.post(cx, event);
316            }
317            None => {
318                tracing::warn!(
319                    actor=%actor_ref,
320                    error=?env.errors(),
321                    "no supervision sink; undeliverable message logged but not forwarded"
322                );
323            }
324        }
325        Ok(())
326    }
327}
328
329/// `MeshFailure` is a terminal supervision signal for an `ActorMesh`.
330///
331/// The process-global root client should never be a consumer of
332/// mesh-level supervision failures during normal operation: those
333/// events are expected to be observed and handled by the owning
334/// mesh/controller, not by the global client.
335///
336/// In processes that create and destroy multiple meshes (e.g.,
337/// benchmarks), `MeshFailure` events may arrive here during or after
338/// mesh teardown. Log loudly but do not crash — the global client is
339/// a monitor and must preserve forward progress.
340#[async_trait]
341impl Handler<MeshFailure> for GlobalClientActor {
342    async fn handle(&mut self, _cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
343        tracing::error!("supervision failure reached global client: {}", message);
344        Ok(())
345    }
346}
347
/// Everything `bootstrap_host()` produces, stored once in
/// `GLOBAL_CONTEXT` and shared by `context()`, `this_host()` and
/// `this_proc()`.
struct GlobalState {
    /// The root client's actor instance. It is `'static` because
    /// `bootstrap_host()` parks it in a function-local `OnceLock`.
    actor_instance: &'static Instance<GlobalClientActor>,
    /// Reference to the singleton host mesh built from the local
    /// `HostAgent`.
    host_mesh: HostMeshRef,
    /// Reference to the singleton proc mesh built from the host's
    /// local `ProcAgent`.
    proc_mesh: ProcMeshRef,
}
353
/// Process-global, lazily-initialized Monarch context.
///
/// Backed by a `tokio::sync::OnceCell` so initialization is async and
/// runs at most once per process. The first caller bootstraps the
/// singleton host and root client actor (mirroring Python's
/// `bootstrap_host()` / `context()`), and subsequent callers reuse
/// the same `GlobalState`.
///
/// This provides a stable root `actor_instance` plus `this_host()` /
/// `this_proc()` accessors. `context()`, `this_host()` and
/// `this_proc()` all initialize the cell through `bootstrap_host()`;
/// `try_this_host()` only reads it and never triggers initialization.
static GLOBAL_CONTEXT: tokio::sync::OnceCell<GlobalState> = tokio::sync::OnceCell::const_new();
365
/// Bootstrap the singleton Host and GlobalClientActor. Mirrors
/// Python's `bootstrap_host()` (monarch_hyperactor/src/host_mesh.rs).
///
/// Returns the [`GlobalState`] holding the root client instance plus
/// references to the singleton host mesh and local proc mesh. As a
/// side effect the root client's run loop is spawned onto the
/// current Tokio runtime.
///
/// # Panics
///
/// Every step is treated as infallible: failure to create the host,
/// spawn the host agent, build the mesh refs, obtain the local proc,
/// or create the client instance panics via `expect`/`unwrap`.
/// Running the body a second time in the same process also panics
/// ("already initialized root client instance"); callers go through
/// `GLOBAL_CONTEXT.get_or_init`.
async fn bootstrap_host() -> GlobalState {
    // 1. Create Host with LocalProcManager. The spawn closure is the
    // ProcAgent boot function, called by HostAgent on GetLocalProc.
    let spawn: ProcManagerSpawnFn =
        Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
    let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
    let host = Host::new(manager, default_bind_spec().binding_addr())
        .await
        .expect("failed to create global host");

    // 2. Extract system_proc before moving Host into HostAgent.
    let system_proc = host.system_proc().clone();

    // 3. Spawn HostAgent on system_proc (takes ownership of Host).
    let host_agent = system_proc
        .spawn(
            HOST_MESH_AGENT_ACTOR_NAME,
            HostAgent::new(HostAgentMode::Local(host)),
        )
        .expect("failed to spawn host agent");

    // 4. Build HostMeshRef.
    let host_mesh = HostMeshRef::from_host_agent(
        HostMeshId::singleton(Label::new("local").unwrap()),
        host_agent.bind(),
    )
    .expect("failed to create host mesh ref");

    // 5. Get local_proc via HostAgent (lazily boots ProcAgent).
    //
    // We use a throwaway Proc::isolated() for the bootstrap request-reply
    // calls, matching Python's bootstrap_host() (host_mesh.rs:330-333).
    // This creates a temporary in-process-only proc context during init
    // — intentionally acceptable for cross-language symmetry and easier
    // reasoning about the bootstrap sequence.
    let temp_proc = Proc::isolated();
    // `_guard` is kept alive until the end of this function so the
    // bootstrap client stays usable for the calls below.
    let (bootstrap_cx, _guard) = temp_proc
        .client("bootstrap")
        .expect("failed to create bootstrap instance");
    let local_proc_agent: ActorHandle<ProcAgent> = host_agent
        .get_local_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc agent");

    // 6. Get the actual Proc object.
    let local_proc = local_proc_agent
        .get_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc");

    // 7. Build ProcMeshRef.
    let proc_mesh = ProcMeshRef::new_singleton(
        ProcMeshId::singleton(Label::new("local").unwrap()),
        ProcRef::new(
            local_proc_agent.actor_addr().proc_addr(),
            0,
            local_proc_agent.bind(),
        ),
    );

    // 8. Create the root client actor instance, named "client", on
    // the local proc and take it apart into its channels.
    let actor_instance = local_proc
        .actor_instance::<GlobalClientActor>("client")
        .expect("failed to create root client instance");

    let hyperactor::proc::ActorInstance {
        instance: client_instance,
        handle,
        supervision,
        signal,
        work,
    } = actor_instance;

    // GlobalClientActor uses a custom run loop that bypasses
    // Actor::init, so set_system() must be called explicitly.
    client_instance.set_system();
    handle.bind::<GlobalClientActor>();

    // 9. Park the instance (and its handle) in a static so the run
    // loop and `GlobalState` can hold `&'static` references to it.
    //
    // Use a static OnceLock to get 'static lifetime for the instance.
    static INSTANCE: OnceLock<(Instance<GlobalClientActor>, ActorHandle<GlobalClientActor>)> =
        OnceLock::new();
    // `map_err` swaps the rejected tuple for a message before
    // `unwrap`, so the panic on a second initialization is readable.
    INSTANCE
        .set((client_instance, handle))
        .map_err(|_| "already initialized root client instance")
        .unwrap();
    let (instance, _handle) = INSTANCE.get().unwrap();

    // 10. Start the root client's run loop. The returned JoinHandle
    // is dropped, so the task runs detached.
    let client = GlobalClientActor {
        signal_rx: signal,
        supervision_rx: supervision,
        work_rx: work,
    };
    client.run(instance);

    GlobalState {
        actor_instance: instance,
        host_mesh,
        proc_mesh,
    }
}
466
/// Process-global Monarch context for Rust programs. Symmetric with
/// Python's `context()`.
///
/// A lightweight view made of `'static` references into the
/// process-global state; obtain one with [`context()`].
pub struct GlobalContext {
    /// Consistent with Python's `context().actor_instance`
    pub actor_instance: &'static Instance<GlobalClientActor>,
    /// The singleton HostMesh. See also [`this_host()`].
    pub host_mesh: &'static HostMeshRef,
    /// The local ProcMesh. See also [`this_proc()`].
    pub proc_mesh: &'static ProcMeshRef,
}
477
478/// Returns the process-global Monarch context, lazily initialized.
479///
480/// On first call, creates a singleton [`Host`] and bootstraps
481/// [`GlobalClientActor`] on its `local_proc` — symmetric with
482/// Python's `bootstrap_host()`. Subsequent calls return immediately.
483///
484/// ```rust,ignore
485/// let cx = context().await;
486/// cx.actor_instance    // c.f. Python: context().actor_instance
487/// ```
488///
489/// **Python programs do not use this.** Python has its own root
490/// client actor bootstrapped separately.
491pub async fn context() -> GlobalContext {
492    let state = GLOBAL_CONTEXT.get_or_init(bootstrap_host).await;
493    GlobalContext {
494        actor_instance: state.actor_instance,
495        host_mesh: &state.host_mesh,
496        proc_mesh: &state.proc_mesh,
497    }
498}
499
500/// Returns the singleton HostMesh c.f. Python's `this_host()`.
501pub async fn this_host() -> &'static HostMeshRef {
502    &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.host_mesh
503}
504
505/// Returns the local ProcMesh c.f Python's `this_proc()`.
506pub async fn this_proc() -> &'static ProcMeshRef {
507    &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.proc_mesh
508}
509
510/// Separate storage for client host registered by non-Rust runtimes
511/// (e.g. Python's `bootstrap_host()`). Checked by `try_this_host()`
512/// alongside `GLOBAL_CONTEXT`.
513static REGISTERED_CLIENT_HOST: std::sync::OnceLock<HostMeshRef> = std::sync::OnceLock::new();
514
515/// Register the client host mesh from an external runtime (Python).
516/// Called by Python's `bootstrap_host()` so that `try_this_host()`
517/// can discover C for the A/C invariant.
518pub fn register_client_host(host_mesh: HostMeshRef) {
519    let _ = REGISTERED_CLIENT_HOST.set(host_mesh);
520}
521
522/// Returns the client host mesh if available, without triggering
523/// lazy bootstrap. Checks both the Rust global context and the
524/// external registration (Python). Used by `MeshAdminAgent` to
525/// discover C at query time (A/C invariant).
526pub fn try_this_host() -> Option<&'static HostMeshRef> {
527    GLOBAL_CONTEXT
528        .get()
529        .map(|state| &state.host_mesh)
530        .or_else(|| REGISTERED_CLIENT_HOST.get())
531}
532
// Tests for the undeliverable → supervision bridge.
//
// NOTE(review): the sink tests below all mutate the same
// process-global sink, and the root client's run loop is spawned on
// whichever runtime first calls `context()`. If several of these
// tests share one process, a sink installed by one test can be
// replaced or cleared by another before its event is forwarded —
// confirm the test runner isolates each test in its own process.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hyperactor::testing::ids::test_actor_id;
    use hyperactor_config::Flattrs;
    #[cfg(fbcode_build)]
    use ndslice::view::Extent;
    #[cfg(fbcode_build)]
    use timed_test::async_timed_test;

    use super::*;
    #[cfg(fbcode_build)]
    use crate::testing;

    /// Helper: send an `Undeliverable<MessageEnvelope>` to the global
    /// root client's well-known undeliverable port via the runtime's
    /// routing/dispatch path.
    ///
    /// This exercises the full integration boundary: serialisation →
    /// routing → work_rx dispatch → `handle_undeliverable_message`.
    ///
    /// Uses the provided `dest_actor` so callers can distinguish
    /// events from different injections (important because the global
    /// sink is shared across tests running in the same process).
    fn inject_undeliverable(
        client: &'static Instance<GlobalClientActor>,
        dest_actor: hyperactor::ActorAddr,
    ) {
        // Envelope from the client to port 0 of `dest_actor`; the
        // payload is an arbitrary serialized `0u64`.
        let env = MessageEnvelope::new(
            client.self_addr().clone(),
            dest_actor.port_addr(0.into()),
            wirevalue::Any::serialize(&0u64).unwrap(),
            Flattrs::new(),
        );
        // Target the global root client's well-known Undeliverable port.
        let client_actor_id: hyperactor::ActorAddr = client.self_addr().clone();
        let undeliverable_port =
            PortRef::<Undeliverable<MessageEnvelope>>::attest_handler_port(&client_actor_id);
        undeliverable_port.post(client, Undeliverable::Message(env));
    }

    /// Verifies that creating a `ProcMesh` installs the
    /// process-global supervision sink used by the global root
    /// client.
    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_sink_installed_after_mesh_creation() {
        let instance = testing::instance();
        let mut hm = testing::host_mesh(2).await;
        let _mesh = hm
            .spawn(instance, "test", Extent::unity(), None, None)
            .await
            .unwrap();
        assert!(
            get_global_supervision_sink().is_some(),
            "supervision sink must be set after ProcMesh creation"
        );
        // Best-effort cleanup; a shutdown error does not fail the test.
        let _ = hm.shutdown(instance).await;
    }

    /// Proves the full forwarding pipeline:
    ///
    ///   Undeliverable<MessageEnvelope>
    ///       → GlobalClientActor::handle_undeliverable_message
    ///       → GLOBAL_SUPERVISION_SINK (PortRef)
    ///       → ActorSupervisionEvent delivered
    ///
    /// Installs a local port as the sink and verifies that the
    /// `ActorSupervisionEvent` arrives there.
    #[tokio::test]
    async fn test_undeliverable_forwarded_to_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install a test sink we control.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("fwd_test", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // The handler runs asynchronously via work_rx; wait for the
        // forwarded event with our marker.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
                // Discard stale events from other tests sharing the
                // global sink.
            }
        })
        .await
        .expect("timed out waiting for supervision event");

        assert_eq!(
            event.actor_id, marker,
            "forwarded event must reference the undeliverable's destination actor"
        );
    }

    /// Proves last-sink-wins: when two sinks are installed in
    /// sequence, only the second receives the forwarded event.
    #[tokio::test]
    async fn test_last_sink_wins() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install sink A. Its receiver is kept alive but never read:
        // the test only asserts that B receives the event.
        let (sink_a_handle, _sink_a_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_a_handle.bind());

        // Install sink B (overrides A).
        let (sink_b_handle, mut sink_b_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_b_handle.bind());

        let marker = test_actor_id("last_wins", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // B should receive our marked event.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_b_rx.recv().await.expect("sink B channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
            }
        })
        .await
        .expect("timed out waiting for supervision event on sink B");
        assert_eq!(event.actor_id, marker);
    }

    /// Proves the global client does not crash when no sink is
    /// installed (early/late binding). The handler must log and
    /// drop gracefully, and the client must remain usable
    /// afterward.
    #[tokio::test]
    async fn test_no_crash_without_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Clear any previously installed sink.
        *sink_cell().write().unwrap() = None;

        // Inject an undeliverable — should not panic.
        inject_undeliverable(client, test_actor_id("no_sink", "marker_actor"));

        // Give the async handler time to run.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The global client must still be alive and usable.
        // Verify by installing a new sink and sending another
        // undeliverable that arrives correctly.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("no_sink_recovery", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // Skip anything that is not our marker (e.g. events caused
        // by other injections) until the timeout expires.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
            }
        })
        .await
        .expect("timed out: global client crashed or stopped processing");
        assert_eq!(event.actor_id, marker);
    }
}