// hyperactor_mesh/global_context.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Process-global context, root client actor, and supervision bridge.
10//!
11//! This module provides the Rust equivalent of Python's `context()`,
12//! `this_host()`, and `this_proc()`. A singleton [`Host`] is lazily
13//! created with the [`GlobalClientActor`] on its `local_proc`:
14//!
15//! ```rust,ignore
16//! let cx = context().await;
17//! cx.actor_instance    // c.f. Python: context().actor_instance
18//! this_host().await    // c.f. Python: this_host()
//! this_proc().await    // c.f. Python: this_proc()
20//! ```
21//!
22//! ## Undeliverables → supervision
23//!
24//! When the runtime detects that a message cannot be delivered, it
25//! produces an [`Undeliverable<MessageEnvelope>`]. The global root
26//! client observes these failures, converts them into
27//! [`ActorSupervisionEvent`]s, and forwards them to the currently
28//! active mesh supervision sink.
29//!
30//! **GC-1 (undeliverable routing):** Any
31//! `Undeliverable<MessageEnvelope>` observed by the global root
32//! client must be reported as an [`ActorSupervisionEvent`] to the
33//! active `ProcMesh`, and handling that failure must never crash the
34//! global client. The root client acts as a monitor, not a
35//! participant: routing failures are treated as signals to be
36//! reported, not fatal errors.
37//!
38//! ## Multiple ProcMeshes
39//!
40//! A process may allocate more than one `ProcMesh` (e.g.
41//! internal/controller meshes plus an application mesh). The root
42//! client is a process-wide singleton, so its supervision sink is
43//! also process-global.
44//!
45//! The active mesh is defined using **last-sink-wins** semantics:
46//! each newly allocated `ProcMesh` installs its sink, overriding the
47//! previous one.
48//!
49//! If no sink has been installed yet (early/late binding),
50//! undeliverables are logged and dropped, preserving forward progress
51//! until a mesh becomes available.
52
53use std::sync::OnceLock;
54use std::sync::RwLock;
55
56use async_trait::async_trait;
57use hyperactor::Actor;
58use hyperactor::ActorHandle;
59use hyperactor::Context;
60use hyperactor::Handler;
61use hyperactor::Instance;
62use hyperactor::actor::ActorError;
63use hyperactor::actor::ActorErrorKind;
64use hyperactor::actor::ActorStatus;
65use hyperactor::actor::Signal;
66use hyperactor::host::Host;
67use hyperactor::host::LocalProcManager;
68use hyperactor::mailbox::DeliveryError;
69use hyperactor::mailbox::MessageEnvelope;
70use hyperactor::mailbox::PortReceiver;
71use hyperactor::mailbox::Undeliverable;
72use hyperactor::proc::Proc;
73use hyperactor::proc::WorkCell;
74use hyperactor::reference as hyperactor_reference;
75use hyperactor::supervision::ActorSupervisionEvent;
76use tokio::sync::mpsc;
77use tokio::task::JoinHandle;
78
79use crate::HostMeshRef;
80use crate::Name;
81use crate::host_mesh::host_agent::GetLocalProcClient;
82use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
83use crate::host_mesh::host_agent::HostAgent;
84use crate::host_mesh::host_agent::HostAgentMode;
85use crate::host_mesh::host_agent::ProcManagerSpawnFn;
86use crate::proc_agent::GetProcClient;
87use crate::proc_agent::ProcAgent;
88use crate::proc_mesh::ProcMeshRef;
89use crate::proc_mesh::ProcRef;
90use crate::supervision::MeshFailure;
91use crate::transport::default_bind_spec;
92
/// Single, process-wide supervision sink storage.
///
/// Routes undeliverables observed by the process-global root client
/// (c.f. [`context()`]) to the *currently active* `ProcMesh`'s
/// agent. Newer meshes override older ones ("last sink wins").
///
/// Uses `PortRef` (not `PortHandle`) because the sink target
/// (`ProcAgent`) runs in a remote worker process.
///
/// Access goes through [`sink_cell()`]; `None` means no mesh has
/// installed a sink yet (early/late binding).
static GLOBAL_SUPERVISION_SINK: OnceLock<
    RwLock<Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>>,
> = OnceLock::new();
104
105/// Returns the lazily-initialized container that holds the current
106/// process-global supervision sink.
107fn sink_cell() -> &'static RwLock<Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>> {
108    GLOBAL_SUPERVISION_SINK.get_or_init(|| RwLock::new(None))
109}
110
111/// Install (or replace) the process-global supervision sink used by
112/// the [`context()`] undeliverable → supervision bridge.
113///
114/// This uses **last-sink-wins** semantics: if multiple `ProcMesh`
115/// instances are created in the same process (e.g. controller meshes
116/// plus an application mesh), the most recently installed sink
117/// becomes the active destination for forwarded
118/// [`ActorSupervisionEvent`]s.
119///
120/// Returns the previously installed sink, if any, to allow callers to
121/// log/inspect overrides.
122///
123/// Note: the sink is a [`PortRef`] (not a `PortHandle`) because the
124/// destination [`ProcAgent`] may live in a different
125/// process/rank.
126pub(crate) fn set_global_supervision_sink(
127    sink: hyperactor_reference::PortRef<ActorSupervisionEvent>,
128) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
129    let cell = sink_cell();
130    let mut guard = cell.write().unwrap();
131    let prev = guard.take();
132    *guard = Some(sink);
133    prev
134}
135
136/// Get the current process-global supervision sink used by the
137/// [`context()`] undeliverable → supervision bridge.
138///
139/// Returns `None` until some mesh creation path installs a sink
140/// (early/late binding). Callers should treat this as "no active mesh
141/// yet": log and drop undeliverables rather than crashing the global
142/// root client.
143///
144/// Cloning a [`PortRef`] is cheap.
145///
146/// Used only by the process-global root client.
147fn get_global_supervision_sink() -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
148    sink_cell().read().unwrap().clone()
149}
150
/// Process-global "root client" actor.
///
/// This actor lives on the `local_proc` of the singleton [`Host`]
/// created by [`context()`], symmetric with Python's
/// `RootClientActor` on `bootstrap_host()`'s local proc.
///
/// It acts as a *monitor* for routing failures observed at the
/// process boundary: undeliverable messages are treated as signals to
/// be reported via mesh supervision (when a sink is installed), not
/// as fatal errors.
///
/// The actor is driven by `run()`, which `select!`s over:
/// - `work_rx`: the primary dispatch queue for bound handler work
///   items (including `Undeliverable<MessageEnvelope>` and
///   `MeshFailure>`),
/// - `supervision_rx`: supervision events delivered to this actor,
///   and
/// - `signal_rx`: control signals (currently minimal handling).
///
/// Constructed directly (field-by-field) in `bootstrap_host()` from
/// the channels handed out by `Proc::actor_instance`, bypassing the
/// usual `Actor::init` spawn path.
#[derive(Debug)]
#[hyperactor::export(handlers = [MeshFailure])]
pub struct GlobalClientActor {
    /// Control signals for the actor's proc (shutdown, etc.).
    signal_rx: PortReceiver<Signal>,
    /// Supervision events delivered to this actor instance.
    ///
    /// The root client is a monitor, so it should process these
    /// events without crashing on routine routing/delivery failures
    /// it observes.
    supervision_rx: PortReceiver<ActorSupervisionEvent>,
    /// Primary work queue for handler dispatch.
    ///
    /// Any bound handler message (e.g. `MeshFailure`,
    /// `Undeliverable<MessageEnvelope>`, introspection, etc.) is
    /// received here and executed via `WorkCell::handle`.
    work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
}
187
impl GlobalClientActor {
    /// Drive the root client's message loop on a spawned task.
    ///
    /// Consumes `self` and returns the task's `JoinHandle`. The loop
    /// `select!`s over the three receivers until a handler returns an
    /// error; that error is then converted into an
    /// [`ActorSupervisionEvent`] and reported to the proc as an
    /// unhandled supervision event.
    ///
    /// `instance` must be `'static` because the spawned task holds it
    /// for the life of the process (see the `INSTANCE` `OnceLock` in
    /// `bootstrap_host`).
    fn run(mut self, instance: &'static Instance<Self>) -> JoinHandle<()> {
        tokio::spawn(async move {
            // The label is kept for readability; `break` below is
            // unlabeled, hence the allow.
            #[allow(unused_labels)]
            let err = 'messages: loop {
                tokio::select! {
                    work = self.work_rx.recv() => {
                        // `None` would mean the work channel closed while the
                        // actor is still running — an invariant violation.
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut self, instance).await {
                            // Flush any queued supervision events before
                            // tearing down, so they are not silently lost.
                            for supervision_event in self.supervision_rx.drain() {
                                instance.handle_supervision_event(&mut self, supervision_event).await
                                    .expect("GlobalClientActor::handle_supervision_event is infallible");
                            }
                            let kind = ActorErrorKind::processing(err);
                            break ActorError {
                                actor_id: Box::new(instance.self_id().clone()),
                                kind: Box::new(kind),
                            };
                        }
                    }
                    _ = self.signal_rx.recv() => {
                        // TODO: do we need any signal handling for the root client?
                    }
                    Ok(supervision_event) = self.supervision_rx.recv() => {
                        instance.handle_supervision_event(&mut self, supervision_event).await
                            .expect("GlobalClientActor::handle_supervision_event is infallible");
                    }
                };
            };
            // Convert the loop's terminal error into a supervision event:
            // an unhandled supervision error is forwarded as-is; anything
            // else becomes a generic failure attributed to this actor.
            let event = match *err.kind {
                ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
                _ => {
                    let status = ActorStatus::generic_failure(err.kind.to_string());
                    ActorSupervisionEvent::new(
                        instance.self_id().clone(),
                        // NOTE(review): "testclient" looks like a leftover
                        // test label for the actor-name field — confirm the
                        // intended production value.
                        Some("testclient".into()),
                        status,
                        None,
                    )
                }
            };
            instance
                .proc()
                .handle_unhandled_supervision_event(instance, event);
        })
    }
}
235
236/// Handle a returned (undeliverable) message observed by the
237/// process-global root client.
238///
239/// The global root client is a **monitor**, not a participant: it
240/// must not crash or propagate failures just because a routed message
241/// could not be delivered.
242///
243/// Instead, we translate the undeliverable into an
244/// `ActorSupervisionEvent` and forward it to the **active**
245/// `ProcMesh` via the process-global supervision sink ("last sink
246/// wins"). If no sink has been installed yet (e.g., before the first
247/// `ProcMesh` allocation completes), we log and drop the event.
248#[async_trait]
249impl Actor for GlobalClientActor {
250    /// The global root client is the root of the supervision tree:
251    /// there is no parent to escalate to. Child-actor failures (e.g.
252    /// ActorMeshControllers detecting dead procs after mesh teardown)
253    /// are expected and must not crash the process.
254    async fn handle_supervision_event(
255        &mut self,
256        _this: &Instance<Self>,
257        event: &ActorSupervisionEvent,
258    ) -> Result<bool, anyhow::Error> {
259        tracing::warn!(
260            %event,
261            "global root client absorbed child supervision event",
262        );
263        Ok(true)
264    }
265
266    async fn handle_undeliverable_message(
267        &mut self,
268        cx: &Instance<Self>,
269        Undeliverable(mut env): Undeliverable<MessageEnvelope>,
270    ) -> Result<(), anyhow::Error> {
271        env.set_error(DeliveryError::BrokenLink(
272            "message returned to global root client".to_string(),
273        ));
274        let actor_id = env.dest().actor_id().clone();
275        let headers = env.headers().clone();
276        let event = ActorSupervisionEvent::new(
277            actor_id.clone(),
278            None,
279            ActorStatus::generic_failure(format!("message not delivered: {}", env)),
280            Some(headers),
281        );
282
283        match get_global_supervision_sink() {
284            Some(sink) => {
285                if let Err(e) = sink.send(cx, event) {
286                    tracing::warn!(
287                        %e,
288                        actor=%actor_id,
289                        "failed to forward supervision event from undeliverable"
290                    );
291                }
292            }
293            None => {
294                tracing::warn!(
295                    actor=%actor_id,
296                    error=?env.errors(),
297                    "no supervision sink; undeliverable message logged but not forwarded"
298                );
299            }
300        }
301        Ok(())
302    }
303}
304
305/// `MeshFailure` is a terminal supervision signal for an `ActorMesh`.
306///
307/// The process-global root client should never be a consumer of
308/// mesh-level supervision failures during normal operation: those
309/// events are expected to be observed and handled by the owning
310/// mesh/controller, not by the global client.
311///
312/// In processes that create and destroy multiple meshes (e.g.,
313/// benchmarks), `MeshFailure` events may arrive here during or after
314/// mesh teardown. Log loudly but do not crash — the global client is
315/// a monitor and must preserve forward progress.
316#[async_trait]
317impl Handler<MeshFailure> for GlobalClientActor {
318    async fn handle(&mut self, _cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
319        tracing::error!("supervision failure reached global client: {}", message);
320        Ok(())
321    }
322}
323
/// Singleton state produced once by `bootstrap_host()` and stored in
/// `GLOBAL_CONTEXT`; the accessor functions below hand out borrows of
/// these fields.
struct GlobalState {
    // Root client instance; 'static via the `INSTANCE` OnceLock
    // inside `bootstrap_host()`.
    actor_instance: &'static Instance<GlobalClientActor>,
    // Singleton host mesh, exposed by `this_host()`.
    host_mesh: HostMeshRef,
    // Local singleton proc mesh, exposed by `this_proc()`.
    proc_mesh: ProcMeshRef,
}
329
/// Process-global, lazily-initialized Monarch context.
///
/// Backed by a `tokio::sync::OnceCell` so initialization is async and
/// runs at most once per process. The first caller bootstraps the
/// singleton host and root client actor (mirroring Python's
/// `bootstrap_host()` / `context()`), and subsequent callers reuse
/// the same `GlobalState`.
///
/// This provides a stable root `actor_instance` plus `this_host()` /
/// `this_proc()` accessors.
static GLOBAL_CONTEXT: tokio::sync::OnceCell<GlobalState> = tokio::sync::OnceCell::const_new();
341
/// Bootstrap the singleton Host and GlobalClientActor. Mirrors
/// Python's `bootstrap_host()` (monarch_hyperactor/src/host_mesh.rs).
///
/// Runs exactly once per process, guarded by `GLOBAL_CONTEXT`'s
/// `OnceCell` (the internal `INSTANCE.set(..).unwrap()` would panic
/// on a second invocation). Panics (`expect`) on any bootstrap
/// failure: there is no caller to recover from a broken global
/// context.
async fn bootstrap_host() -> GlobalState {
    // 1. Create Host with LocalProcManager. The spawn closure is the
    // ProcAgent boot function, called by HostAgent on GetLocalProc.
    let spawn: ProcManagerSpawnFn =
        Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
    let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
    let host = Host::new(manager, default_bind_spec().binding_addr())
        .await
        .expect("failed to create global host");

    // 2. Extract system_proc before moving Host into HostAgent.
    let system_proc = host.system_proc().clone();

    // 3. Spawn HostAgent on system_proc (takes ownership of Host).
    let host_agent = system_proc
        .spawn(
            HOST_MESH_AGENT_ACTOR_NAME,
            HostAgent::new(HostAgentMode::Local(host)),
        )
        .expect("failed to spawn host agent");

    // 4. Build HostMeshRef.
    let host_mesh =
        HostMeshRef::from_host_agent(Name::new_reserved("local").unwrap(), host_agent.bind())
            .expect("failed to create host mesh ref");

    // 5. Get local_proc via HostAgent (lazily boots ProcAgent).
    //
    // We use a throwaway Proc::local() for the bootstrap request-reply
    // calls, matching Python's bootstrap_host() (host_mesh.rs:330-333).
    // This creates a temporary in-process-only proc context during init
    // — intentionally acceptable for cross-language symmetry and easier
    // reasoning about the bootstrap sequence.
    let temp_proc = Proc::local();
    let (bootstrap_cx, _guard) = temp_proc
        .instance("bootstrap")
        .expect("failed to create bootstrap instance");
    let local_proc_agent: ActorHandle<ProcAgent> = host_agent
        .get_local_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc agent");

    // 6. Get the actual Proc object.
    let local_proc = local_proc_agent
        .get_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc");

    // 7. Build ProcMeshRef. Rank 0: this is a singleton mesh.
    let proc_mesh = ProcMeshRef::new_singleton(
        Name::new_reserved("local").unwrap(),
        ProcRef::new(
            local_proc_agent.actor_id().proc_id().clone(),
            0,
            local_proc_agent.bind(),
        ),
    );
    // 8. Create the root client's instance/channels on local_proc.
    let actor_instance = local_proc
        .actor_instance::<GlobalClientActor>("client")
        .expect("failed to create root client instance");

    let hyperactor::proc::ActorInstance {
        instance: client_instance,
        handle,
        supervision,
        signal,
        work,
    } = actor_instance;

    // GlobalClientActor uses a custom run loop that bypasses
    // Actor::init, so set_system() must be called explicitly.
    client_instance.set_system();
    handle.bind::<GlobalClientActor>();

    // Use a static OnceLock to get 'static lifetime for the instance.
    // A second bootstrap would hit the unwrap below; GLOBAL_CONTEXT's
    // OnceCell guarantees that cannot happen.
    static INSTANCE: OnceLock<(Instance<GlobalClientActor>, ActorHandle<GlobalClientActor>)> =
        OnceLock::new();
    INSTANCE
        .set((client_instance, handle))
        .map_err(|_| "already initialized root client instance")
        .unwrap();
    let (instance, _handle) = INSTANCE.get().unwrap();

    // 9. Assemble the actor from its receivers and start its message
    // loop (detached task; see GlobalClientActor::run).
    let client = GlobalClientActor {
        signal_rx: signal,
        supervision_rx: supervision,
        work_rx: work,
    };
    client.run(instance);

    GlobalState {
        actor_instance: instance,
        host_mesh,
        proc_mesh,
    }
}
440
/// Process-global Monarch context for Rust programs. Symmetric with
/// Python's `context()`.
///
/// A lightweight view of borrows into the process singleton; cheap to
/// construct on every `context()` call.
pub struct GlobalContext {
    /// Consistent with Python's `context().actor_instance`
    pub actor_instance: &'static Instance<GlobalClientActor>,
    /// The singleton HostMesh. See also [`this_host()`].
    pub host_mesh: &'static HostMeshRef,
    /// The local ProcMesh. See also [`this_proc()`].
    pub proc_mesh: &'static ProcMeshRef,
}
451
452/// Returns the process-global Monarch context, lazily initialized.
453///
454/// On first call, creates a singleton [`Host`] and bootstraps
455/// [`GlobalClientActor`] on its `local_proc` — symmetric with
456/// Python's `bootstrap_host()`. Subsequent calls return immediately.
457///
458/// ```rust,ignore
459/// let cx = context().await;
460/// cx.actor_instance    // c.f. Python: context().actor_instance
461/// ```
462///
463/// **Python programs do not use this.** Python has its own root
464/// client actor bootstrapped separately.
465pub async fn context() -> GlobalContext {
466    let state = GLOBAL_CONTEXT.get_or_init(bootstrap_host).await;
467    GlobalContext {
468        actor_instance: state.actor_instance,
469        host_mesh: &state.host_mesh,
470        proc_mesh: &state.proc_mesh,
471    }
472}
473
474/// Returns the singleton HostMesh c.f. Python's `this_host()`.
475pub async fn this_host() -> &'static HostMeshRef {
476    &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.host_mesh
477}
478
479/// Returns the local ProcMesh c.f Python's `this_proc()`.
480pub async fn this_proc() -> &'static ProcMeshRef {
481    &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.proc_mesh
482}
483
484/// Separate storage for client host registered by non-Rust runtimes
485/// (e.g. Python's `bootstrap_host()`). Checked by `try_this_host()`
486/// alongside `GLOBAL_CONTEXT`.
487static REGISTERED_CLIENT_HOST: std::sync::OnceLock<HostMeshRef> = std::sync::OnceLock::new();
488
489/// Register the client host mesh from an external runtime (Python).
490/// Called by Python's `bootstrap_host()` so that `try_this_host()`
491/// can discover C for the A/C invariant.
492pub fn register_client_host(host_mesh: HostMeshRef) {
493    let _ = REGISTERED_CLIENT_HOST.set(host_mesh);
494}
495
496/// Returns the client host mesh if available, without triggering
497/// lazy bootstrap. Checks both the Rust global context and the
498/// external registration (Python). Used by `MeshAdminAgent` to
499/// discover C at query time (A/C invariant).
500pub fn try_this_host() -> Option<&'static HostMeshRef> {
501    GLOBAL_CONTEXT
502        .get()
503        .map(|state| &state.host_mesh)
504        .or_else(|| REGISTERED_CLIENT_HOST.get())
505}
506
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hyperactor::reference as hyperactor_reference;
    use hyperactor::testing::ids::test_actor_id;
    use hyperactor_config::Flattrs;
    use ndslice::extent;

    use super::*;
    use crate::testing;

    /// Helper: send an `Undeliverable<MessageEnvelope>` to the global
    /// root client's well-known undeliverable port via the runtime's
    /// routing/dispatch path.
    ///
    /// This exercises the full integration boundary: serialisation →
    /// routing → work_rx dispatch → `handle_undeliverable_message`.
    ///
    /// Uses the provided `dest_actor` so callers can distinguish
    /// events from different injections (important because the global
    /// sink is shared across tests running in the same process).
    fn inject_undeliverable(
        client: &'static Instance<GlobalClientActor>,
        dest_actor: hyperactor::reference::ActorId,
    ) {
        // Payload content is irrelevant; only the destination matters.
        let env = MessageEnvelope::new(
            client.self_id().clone(),
            hyperactor_reference::PortId::new(dest_actor, 0),
            wirevalue::Any::serialize(&0u64).unwrap(),
            Flattrs::new(),
        );
        // Target the global root client's well-known Undeliverable port.
        let undeliverable_port =
            hyperactor_reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
                client.self_id(),
            );
        undeliverable_port
            .send(client, Undeliverable(env))
            .expect("inject_undeliverable: send failed");
    }

    /// Verifies that creating a `ProcMesh` installs the
    /// process-global supervision sink used by the global root
    /// client.
    #[tokio::test]
    async fn test_sink_installed_after_mesh_creation() {
        let (_mesh, _actor, _router) = testing::local_proc_mesh(extent!(replica = 2)).await;
        assert!(
            get_global_supervision_sink().is_some(),
            "supervision sink must be set after ProcMesh creation"
        );
    }

    /// Proves the full forwarding pipeline:
    ///
    ///   Undeliverable<MessageEnvelope>
    ///       → GlobalClientActor::handle_undeliverable_message
    ///       → GLOBAL_SUPERVISION_SINK (PortRef)
    ///       → ActorSupervisionEvent delivered
    ///
    /// Installs a local port as the sink and verifies that the
    /// `ActorSupervisionEvent` arrives there.
    #[tokio::test]
    async fn test_undeliverable_forwarded_to_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install a test sink we control.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("fwd_test", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // The handler runs asynchronously via work_rx; wait for the
        // forwarded event with our marker.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
                // Discard stale events from other tests sharing the
                // global sink.
            }
        })
        .await
        .expect("timed out waiting for supervision event");

        // Redundant with the loop's filter, kept as an explicit
        // statement of the property under test.
        assert_eq!(
            event.actor_id, marker,
            "forwarded event must reference the undeliverable's destination actor"
        );
    }

    /// Proves last-sink-wins: when two sinks are installed in
    /// sequence, only the second receives the forwarded event.
    #[tokio::test]
    async fn test_last_sink_wins() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install sink A.
        let (sink_a_handle, _sink_a_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_a_handle.bind());

        // Install sink B (overrides A).
        let (sink_b_handle, mut sink_b_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_b_handle.bind());

        let marker = test_actor_id("last_wins", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // B should receive our marked event.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_b_rx.recv().await.expect("sink B channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
            }
        })
        .await
        .expect("timed out waiting for supervision event on sink B");
        assert_eq!(event.actor_id, marker);
    }

    /// Proves the global client does not crash when no sink is
    /// installed (early/late binding). The handler must log and
    /// drop gracefully, and the client must remain usable
    /// afterward.
    #[tokio::test]
    async fn test_no_crash_without_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Clear any previously installed sink.
        *sink_cell().write().unwrap() = None;

        // Inject an undeliverable — should not panic.
        inject_undeliverable(client, test_actor_id("no_sink", "marker_actor"));

        // Give the async handler time to run.
        // NOTE(review): fixed 100ms sleep is a timing heuristic; the
        // recovery check below is the real proof of liveness.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The global client must still be alive and usable.
        // Verify by installing a new sink and sending another
        // undeliverable that arrives correctly.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("no_sink_recovery", "marker_actor");
        inject_undeliverable(client, marker.clone());

        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
            }
        })
        .await
        .expect("timed out: global client crashed or stopped processing");
        assert_eq!(event.actor_id, marker);
    }
}