hyperactor_mesh/global_context.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Process-global context, root client actor, and supervision bridge.
10//!
11//! This module provides the Rust equivalent of Python's `context()`,
12//! `this_host()`, and `this_proc()`. A singleton [`Host`] is lazily
13//! created with the [`GlobalClientActor`] on its `local_proc`:
14//!
15//! ```rust,ignore
16//! let cx = context().await;
17//! cx.actor_instance // c.f. Python: context().actor_instance
18//! this_host().await // c.f. Python: this_host()
//! this_proc().await // c.f. Python: this_proc()
20//! ```
21//!
22//! ## Undeliverables → supervision
23//!
24//! When the runtime detects that a message cannot be delivered, it
25//! produces an [`Undeliverable<MessageEnvelope>`]. The global root
26//! client observes these failures, converts them into
27//! [`ActorSupervisionEvent`]s, and forwards them to the currently
28//! active mesh supervision sink.
29//!
30//! **GC-1 (undeliverable routing):** Any
31//! `Undeliverable<MessageEnvelope>` observed by the global root
32//! client must be reported as an [`ActorSupervisionEvent`] to the
33//! active `ProcMesh`, and handling that failure must never crash the
34//! global client. The root client acts as a monitor, not a
35//! participant: routing failures are treated as signals to be
36//! reported, not fatal errors.
37//!
38//! ## Multiple ProcMeshes
39//!
40//! A process may allocate more than one `ProcMesh` (e.g.
41//! internal/controller meshes plus an application mesh). The root
42//! client is a process-wide singleton, so its supervision sink is
43//! also process-global.
44//!
45//! The active mesh is defined using **last-sink-wins** semantics:
46//! each newly allocated `ProcMesh` installs its sink, overriding the
47//! previous one.
48//!
49//! If no sink has been installed yet (early/late binding),
50//! undeliverables are logged and dropped, preserving forward progress
51//! until a mesh becomes available.
52
53use std::sync::OnceLock;
54use std::sync::RwLock;
55
56use async_trait::async_trait;
57use hyperactor::Actor;
58use hyperactor::ActorHandle;
59use hyperactor::Context;
60use hyperactor::Handler;
61use hyperactor::Instance;
62use hyperactor::actor::ActorError;
63use hyperactor::actor::ActorErrorKind;
64use hyperactor::actor::ActorStatus;
65use hyperactor::actor::Signal;
66use hyperactor::host::Host;
67use hyperactor::host::LocalProcManager;
68use hyperactor::mailbox::DeliveryError;
69use hyperactor::mailbox::MessageEnvelope;
70use hyperactor::mailbox::PortReceiver;
71use hyperactor::mailbox::Undeliverable;
72use hyperactor::proc::Proc;
73use hyperactor::proc::WorkCell;
74use hyperactor::reference as hyperactor_reference;
75use hyperactor::supervision::ActorSupervisionEvent;
76use tokio::sync::mpsc;
77use tokio::task::JoinHandle;
78
79use crate::HostMeshRef;
80use crate::Name;
81use crate::host_mesh::host_agent::GetLocalProcClient;
82use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
83use crate::host_mesh::host_agent::HostAgent;
84use crate::host_mesh::host_agent::HostAgentMode;
85use crate::host_mesh::host_agent::ProcManagerSpawnFn;
86use crate::proc_agent::GetProcClient;
87use crate::proc_agent::ProcAgent;
88use crate::proc_mesh::ProcMeshRef;
89use crate::proc_mesh::ProcRef;
90use crate::supervision::MeshFailure;
91use crate::transport::default_bind_spec;
92
/// Single, process-wide supervision sink storage.
///
/// Routes undeliverables observed by the process-global root client
/// (c.f. [`context()`]) to the *currently active* `ProcMesh`'s
/// agent. Newer meshes override older ones ("last sink wins").
///
/// Uses `PortRef` (not `PortHandle`) because the sink target
/// (`ProcAgent`) runs in a remote worker process.
///
/// Access goes through `sink_cell()`; the outer `OnceLock` makes the
/// `RwLock` constructible in a `static`, and the inner `Option` is
/// `None` until the first mesh installs a sink.
static GLOBAL_SUPERVISION_SINK: OnceLock<
    RwLock<Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>>,
> = OnceLock::new();
104
105/// Returns the lazily-initialized container that holds the current
106/// process-global supervision sink.
107fn sink_cell() -> &'static RwLock<Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>> {
108 GLOBAL_SUPERVISION_SINK.get_or_init(|| RwLock::new(None))
109}
110
111/// Install (or replace) the process-global supervision sink used by
112/// the [`context()`] undeliverable → supervision bridge.
113///
114/// This uses **last-sink-wins** semantics: if multiple `ProcMesh`
115/// instances are created in the same process (e.g. controller meshes
116/// plus an application mesh), the most recently installed sink
117/// becomes the active destination for forwarded
118/// [`ActorSupervisionEvent`]s.
119///
120/// Returns the previously installed sink, if any, to allow callers to
121/// log/inspect overrides.
122///
123/// Note: the sink is a [`PortRef`] (not a `PortHandle`) because the
124/// destination [`ProcAgent`] may live in a different
125/// process/rank.
126pub(crate) fn set_global_supervision_sink(
127 sink: hyperactor_reference::PortRef<ActorSupervisionEvent>,
128) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
129 let cell = sink_cell();
130 let mut guard = cell.write().unwrap();
131 let prev = guard.take();
132 *guard = Some(sink);
133 prev
134}
135
136/// Get the current process-global supervision sink used by the
137/// [`context()`] undeliverable → supervision bridge.
138///
139/// Returns `None` until some mesh creation path installs a sink
140/// (early/late binding). Callers should treat this as "no active mesh
141/// yet": log and drop undeliverables rather than crashing the global
142/// root client.
143///
144/// Cloning a [`PortRef`] is cheap.
145///
146/// Used only by the process-global root client.
147fn get_global_supervision_sink() -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
148 sink_cell().read().unwrap().clone()
149}
150
/// Process-global "root client" actor.
///
/// This actor lives on the `local_proc` of the singleton [`Host`]
/// created by [`context()`], symmetric with Python's
/// `RootClientActor` on `bootstrap_host()`'s local proc.
///
/// It acts as a *monitor* for routing failures observed at the
/// process boundary: undeliverable messages are treated as signals to
/// be reported via mesh supervision (when a sink is installed), not
/// as fatal errors.
///
/// The actor is driven by `run()`, which `select!`s over:
/// - `work_rx`: the primary dispatch queue for bound handler work
///   items (including `Undeliverable<MessageEnvelope>` and
///   `MeshFailure`),
/// - `supervision_rx`: supervision events delivered to this actor,
///   and
/// - `signal_rx`: control signals (currently minimal handling).
#[derive(Debug)]
#[hyperactor::export(handlers = [MeshFailure])]
pub struct GlobalClientActor {
    /// Control signals for the actor's proc (shutdown, etc.).
    signal_rx: PortReceiver<Signal>,
    /// Supervision events delivered to this actor instance.
    ///
    /// The root client is a monitor, so it should process these
    /// events without crashing on routine routing/delivery failures
    /// it observes.
    supervision_rx: PortReceiver<ActorSupervisionEvent>,
    /// Primary work queue for handler dispatch.
    ///
    /// Any bound handler message (e.g. `MeshFailure`,
    /// `Undeliverable<MessageEnvelope>`, introspection, etc.) is
    /// received here and executed via `WorkCell::handle`.
    work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
}
187
impl GlobalClientActor {
    /// Drive the root client's message loop on a detached tokio task.
    ///
    /// Consumes `self` (the receiver halves) and `select!`s over the
    /// three channels until a handler returns an error, at which
    /// point the loop breaks with an [`ActorError`]. That terminal
    /// error is converted into an [`ActorSupervisionEvent`] and
    /// reported to the proc via
    /// `handle_unhandled_supervision_event`, after which the task
    /// ends.
    ///
    /// The returned [`JoinHandle`] is dropped by the only caller
    /// (`bootstrap_host`), so the task runs detached for the life of
    /// the process.
    fn run(mut self, instance: &'static Instance<Self>) -> JoinHandle<()> {
        tokio::spawn(async move {
            #[allow(unused_labels)]
            let err = 'messages: loop {
                tokio::select! {
                    work = self.work_rx.recv() => {
                        // The work queue is only closed if the actor's
                        // ports are torn down out from under us — a bug.
                        let work = work.expect("inconsistent work queue state");
                        if let Err(err) = work.handle(&mut self, instance).await {
                            // Before escalating the handler failure,
                            // flush any supervision events already
                            // queued so they are not silently lost.
                            for supervision_event in self.supervision_rx.drain() {
                                instance.handle_supervision_event(&mut self, supervision_event).await
                                    .expect("GlobalClientActor::handle_supervision_event is infallible");
                            }
                            let kind = ActorErrorKind::processing(err);
                            break ActorError {
                                actor_id: Box::new(instance.self_id().clone()),
                                kind: Box::new(kind),
                            };
                        }
                    }
                    _ = self.signal_rx.recv() => {
                        // TODO: do we need any signal handling for the root client?
                    }
                    Ok(supervision_event) = self.supervision_rx.recv() => {
                        instance.handle_supervision_event(&mut self, supervision_event).await
                            .expect("GlobalClientActor::handle_supervision_event is infallible");
                    }
                };
            };
            // Translate the terminal error into a supervision event:
            // unwrap an embedded event if there is one, otherwise
            // synthesize a generic failure for this actor.
            let event = match *err.kind {
                ActorErrorKind::UnhandledSupervisionEvent(event) => *event,
                _ => {
                    let status = ActorStatus::generic_failure(err.kind.to_string());
                    ActorSupervisionEvent::new(
                        instance.self_id().clone(),
                        // NOTE(review): "testclient" looks like a
                        // leftover test name; confirm this is the
                        // intended label in production events.
                        Some("testclient".into()),
                        status,
                        None,
                    )
                }
            };
            instance
                .proc()
                .handle_unhandled_supervision_event(instance, event);
        })
    }
}
235
236/// Handle a returned (undeliverable) message observed by the
237/// process-global root client.
238///
239/// The global root client is a **monitor**, not a participant: it
240/// must not crash or propagate failures just because a routed message
241/// could not be delivered.
242///
243/// Instead, we translate the undeliverable into an
244/// `ActorSupervisionEvent` and forward it to the **active**
245/// `ProcMesh` via the process-global supervision sink ("last sink
246/// wins"). If no sink has been installed yet (e.g., before the first
247/// `ProcMesh` allocation completes), we log and drop the event.
248#[async_trait]
249impl Actor for GlobalClientActor {
250 /// The global root client is the root of the supervision tree:
251 /// there is no parent to escalate to. Child-actor failures (e.g.
252 /// ActorMeshControllers detecting dead procs after mesh teardown)
253 /// are expected and must not crash the process.
254 async fn handle_supervision_event(
255 &mut self,
256 _this: &Instance<Self>,
257 event: &ActorSupervisionEvent,
258 ) -> Result<bool, anyhow::Error> {
259 tracing::warn!(
260 %event,
261 "global root client absorbed child supervision event",
262 );
263 Ok(true)
264 }
265
266 async fn handle_undeliverable_message(
267 &mut self,
268 cx: &Instance<Self>,
269 Undeliverable(mut env): Undeliverable<MessageEnvelope>,
270 ) -> Result<(), anyhow::Error> {
271 env.set_error(DeliveryError::BrokenLink(
272 "message returned to global root client".to_string(),
273 ));
274 let actor_id = env.dest().actor_id().clone();
275 let headers = env.headers().clone();
276 let event = ActorSupervisionEvent::new(
277 actor_id.clone(),
278 None,
279 ActorStatus::generic_failure(format!("message not delivered: {}", env)),
280 Some(headers),
281 );
282
283 match get_global_supervision_sink() {
284 Some(sink) => {
285 if let Err(e) = sink.send(cx, event) {
286 tracing::warn!(
287 %e,
288 actor=%actor_id,
289 "failed to forward supervision event from undeliverable"
290 );
291 }
292 }
293 None => {
294 tracing::warn!(
295 actor=%actor_id,
296 error=?env.errors(),
297 "no supervision sink; undeliverable message logged but not forwarded"
298 );
299 }
300 }
301 Ok(())
302 }
303}
304
305/// `MeshFailure` is a terminal supervision signal for an `ActorMesh`.
306///
307/// The process-global root client should never be a consumer of
308/// mesh-level supervision failures during normal operation: those
309/// events are expected to be observed and handled by the owning
310/// mesh/controller, not by the global client.
311///
312/// In processes that create and destroy multiple meshes (e.g.,
313/// benchmarks), `MeshFailure` events may arrive here during or after
314/// mesh teardown. Log loudly but do not crash — the global client is
315/// a monitor and must preserve forward progress.
316#[async_trait]
317impl Handler<MeshFailure> for GlobalClientActor {
318 async fn handle(&mut self, _cx: &Context<Self>, message: MeshFailure) -> anyhow::Result<()> {
319 tracing::error!("supervision failure reached global client: {}", message);
320 Ok(())
321 }
322}
323
/// State produced once by `bootstrap_host()` and cached for the life
/// of the process in the global context singleton.
struct GlobalState {
    /// The `'static` root client instance ([`GlobalClientActor`]).
    actor_instance: &'static Instance<GlobalClientActor>,
    /// The singleton host mesh; exposed via [`this_host()`].
    host_mesh: HostMeshRef,
    /// The singleton local proc mesh; exposed via [`this_proc()`].
    proc_mesh: ProcMeshRef,
}
329
/// Process-global, lazily-initialized Monarch context.
///
/// Backed by a `tokio::sync::OnceCell` so initialization is async and
/// runs at most once per process. The first caller bootstraps the
/// singleton host and root client actor (mirroring Python's
/// `bootstrap_host()` / `context()`), and subsequent callers reuse
/// the same `GlobalState`.
///
/// This provides a stable root `actor_instance` plus `this_host()` /
/// `this_proc()` accessors. `try_this_host()` reads this cell via
/// non-initializing `get()`, so it never triggers the bootstrap.
static GLOBAL_CONTEXT: tokio::sync::OnceCell<GlobalState> = tokio::sync::OnceCell::const_new();
341
/// Bootstrap the singleton Host and GlobalClientActor. Mirrors
/// Python's `bootstrap_host()` (monarch_hyperactor/src/host_mesh.rs).
///
/// Runs at most once per process: the only caller is
/// `GLOBAL_CONTEXT.get_or_init(...)`, which serializes and caches the
/// result. Panics (via `expect`) on any bootstrap failure, since no
/// useful process can exist without the global context.
async fn bootstrap_host() -> GlobalState {
    // 1. Create Host with LocalProcManager. The spawn closure is the
    // ProcAgent boot function, called by HostAgent on GetLocalProc.
    let spawn: ProcManagerSpawnFn =
        Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
    let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
    let host = Host::new(manager, default_bind_spec().binding_addr())
        .await
        .expect("failed to create global host");

    // 2. Extract system_proc before moving Host into HostAgent.
    let system_proc = host.system_proc().clone();

    // 3. Spawn HostAgent on system_proc (takes ownership of Host).
    let host_agent = system_proc
        .spawn(
            HOST_MESH_AGENT_ACTOR_NAME,
            HostAgent::new(HostAgentMode::Local(host)),
        )
        .expect("failed to spawn host agent");

    // 4. Build HostMeshRef.
    let host_mesh =
        HostMeshRef::from_host_agent(Name::new_reserved("local").unwrap(), host_agent.bind())
            .expect("failed to create host mesh ref");

    // 5. Get local_proc via HostAgent (lazily boots ProcAgent).
    //
    // We use a throwaway Proc::local() for the bootstrap request-reply
    // calls, matching Python's bootstrap_host() (host_mesh.rs:330-333).
    // This creates a temporary in-process-only proc context during init
    // — intentionally acceptable for cross-language symmetry and easier
    // reasoning about the bootstrap sequence.
    let temp_proc = Proc::local();
    let (bootstrap_cx, _guard) = temp_proc
        .instance("bootstrap")
        .expect("failed to create bootstrap instance");
    let local_proc_agent: ActorHandle<ProcAgent> = host_agent
        .get_local_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc agent");

    // 6. Get the actual Proc object.
    let local_proc = local_proc_agent
        .get_proc(&bootstrap_cx)
        .await
        .expect("failed to get local proc");

    // 7. Build ProcMeshRef.
    let proc_mesh = ProcMeshRef::new_singleton(
        Name::new_reserved("local").unwrap(),
        ProcRef::new(
            local_proc_agent.actor_id().proc_id().clone(),
            0,
            local_proc_agent.bind(),
        ),
    );
    let actor_instance = local_proc
        .actor_instance::<GlobalClientActor>("client")
        .expect("failed to create root client instance");

    let hyperactor::proc::ActorInstance {
        instance: client_instance,
        handle,
        supervision,
        signal,
        work,
    } = actor_instance;

    // GlobalClientActor uses a custom run loop that bypasses
    // Actor::init, so set_system() must be called explicitly.
    client_instance.set_system();
    handle.bind::<GlobalClientActor>();

    // Use a static OnceLock to get 'static lifetime for the instance.
    // A second set() would panic below, but GLOBAL_CONTEXT's OnceCell
    // guarantees this function runs at most once per process.
    static INSTANCE: OnceLock<(Instance<GlobalClientActor>, ActorHandle<GlobalClientActor>)> =
        OnceLock::new();
    INSTANCE
        .set((client_instance, handle))
        .map_err(|_| "already initialized root client instance")
        .unwrap();
    let (instance, _handle) = INSTANCE.get().unwrap();

    let client = GlobalClientActor {
        signal_rx: signal,
        supervision_rx: supervision,
        work_rx: work,
    };
    // Detached: the JoinHandle is dropped; the run loop lives for the
    // remainder of the process.
    client.run(instance);

    GlobalState {
        actor_instance: instance,
        host_mesh,
        proc_mesh,
    }
}
440
/// Process-global Monarch context for Rust programs. Symmetric with
/// Python's `context()`.
///
/// Obtained via [`context()`]; all fields borrow from the
/// process-lifetime singleton, so the struct itself is cheap to copy
/// around.
pub struct GlobalContext {
    /// Consistent with Python's `context().actor_instance`
    pub actor_instance: &'static Instance<GlobalClientActor>,
    /// The singleton HostMesh. See also [`this_host()`].
    pub host_mesh: &'static HostMeshRef,
    /// The local ProcMesh. See also [`this_proc()`].
    pub proc_mesh: &'static ProcMeshRef,
}
451
452/// Returns the process-global Monarch context, lazily initialized.
453///
454/// On first call, creates a singleton [`Host`] and bootstraps
455/// [`GlobalClientActor`] on its `local_proc` — symmetric with
456/// Python's `bootstrap_host()`. Subsequent calls return immediately.
457///
458/// ```rust,ignore
459/// let cx = context().await;
460/// cx.actor_instance // c.f. Python: context().actor_instance
461/// ```
462///
463/// **Python programs do not use this.** Python has its own root
464/// client actor bootstrapped separately.
465pub async fn context() -> GlobalContext {
466 let state = GLOBAL_CONTEXT.get_or_init(bootstrap_host).await;
467 GlobalContext {
468 actor_instance: state.actor_instance,
469 host_mesh: &state.host_mesh,
470 proc_mesh: &state.proc_mesh,
471 }
472}
473
474/// Returns the singleton HostMesh c.f. Python's `this_host()`.
475pub async fn this_host() -> &'static HostMeshRef {
476 &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.host_mesh
477}
478
479/// Returns the local ProcMesh c.f Python's `this_proc()`.
480pub async fn this_proc() -> &'static ProcMeshRef {
481 &GLOBAL_CONTEXT.get_or_init(bootstrap_host).await.proc_mesh
482}
483
484/// Separate storage for client host registered by non-Rust runtimes
485/// (e.g. Python's `bootstrap_host()`). Checked by `try_this_host()`
486/// alongside `GLOBAL_CONTEXT`.
487static REGISTERED_CLIENT_HOST: std::sync::OnceLock<HostMeshRef> = std::sync::OnceLock::new();
488
489/// Register the client host mesh from an external runtime (Python).
490/// Called by Python's `bootstrap_host()` so that `try_this_host()`
491/// can discover C for the A/C invariant.
492pub fn register_client_host(host_mesh: HostMeshRef) {
493 let _ = REGISTERED_CLIENT_HOST.set(host_mesh);
494}
495
496/// Returns the client host mesh if available, without triggering
497/// lazy bootstrap. Checks both the Rust global context and the
498/// external registration (Python). Used by `MeshAdminAgent` to
499/// discover C at query time (A/C invariant).
500pub fn try_this_host() -> Option<&'static HostMeshRef> {
501 GLOBAL_CONTEXT
502 .get()
503 .map(|state| &state.host_mesh)
504 .or_else(|| REGISTERED_CLIENT_HOST.get())
505}
506
507#[cfg(test)]
508mod tests {
509 use std::time::Duration;
510
511 use hyperactor::reference as hyperactor_reference;
512 use hyperactor::testing::ids::test_actor_id;
513 use hyperactor_config::Flattrs;
514 use ndslice::extent;
515
516 use super::*;
517 use crate::testing;
518
    /// Helper: send an `Undeliverable<MessageEnvelope>` to the global
    /// root client's well-known undeliverable port via the runtime's
    /// routing/dispatch path.
    ///
    /// This exercises the full integration boundary: serialisation →
    /// routing → work_rx dispatch → `handle_undeliverable_message`.
    ///
    /// Uses the provided `dest_actor` so callers can distinguish
    /// events from different injections (important because the global
    /// sink is shared across tests running in the same process).
    fn inject_undeliverable(
        client: &'static Instance<GlobalClientActor>,
        dest_actor: hyperactor::reference::ActorId,
    ) {
        // Minimal envelope: the payload is irrelevant; only the
        // destination actor id matters for the forwarded event.
        let env = MessageEnvelope::new(
            client.self_id().clone(),
            hyperactor_reference::PortId::new(dest_actor, 0),
            wirevalue::Any::serialize(&0u64).unwrap(),
            Flattrs::new(),
        );
        // Target the global root client's well-known Undeliverable port.
        let undeliverable_port =
            hyperactor_reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
                client.self_id(),
            );
        undeliverable_port
            .send(client, Undeliverable(env))
            .expect("inject_undeliverable: send failed");
    }
548
549 /// Verifies that creating a `ProcMesh` installs the
550 /// process-global supervision sink used by the global root
551 /// client.
552 #[tokio::test]
553 async fn test_sink_installed_after_mesh_creation() {
554 let (_mesh, _actor, _router) = testing::local_proc_mesh(extent!(replica = 2)).await;
555 assert!(
556 get_global_supervision_sink().is_some(),
557 "supervision sink must be set after ProcMesh creation"
558 );
559 }
560
    /// Proves the full forwarding pipeline:
    ///
    /// Undeliverable<MessageEnvelope>
    ///   → GlobalClientActor::handle_undeliverable_message
    ///   → GLOBAL_SUPERVISION_SINK (PortRef)
    ///   → ActorSupervisionEvent delivered
    ///
    /// Installs a local port as the sink and verifies that the
    /// `ActorSupervisionEvent` arrives there.
    #[tokio::test]
    async fn test_undeliverable_forwarded_to_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install a test sink we control.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        // Unique destination id so we can recognize our own event.
        let marker = test_actor_id("fwd_test", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // The handler runs asynchronously via work_rx; wait for the
        // forwarded event with our marker.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
                // Discard stale events from other tests sharing the
                // global sink.
            }
        })
        .await
        .expect("timed out waiting for supervision event");

        assert_eq!(
            event.actor_id, marker,
            "forwarded event must reference the undeliverable's destination actor"
        );
    }
602
    /// Proves last-sink-wins: when two sinks are installed in
    /// sequence, only the second receives the forwarded event.
    ///
    /// Note: only delivery to sink B is asserted; sink A's receiver
    /// is never polled.
    #[tokio::test]
    async fn test_last_sink_wins() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Install sink A.
        let (sink_a_handle, _sink_a_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_a_handle.bind());

        // Install sink B (overrides A).
        let (sink_b_handle, mut sink_b_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_b_handle.bind());

        let marker = test_actor_id("last_wins", "marker_actor");
        inject_undeliverable(client, marker.clone());

        // B should receive our marked event.
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_b_rx.recv().await.expect("sink B channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
                // Skip stale events from other tests in this process.
            }
        })
        .await
        .expect("timed out waiting for supervision event on sink B");
        assert_eq!(event.actor_id, marker);
    }
634
    /// Proves the global client does not crash when no sink is
    /// installed (early/late binding). The handler must log and
    /// drop gracefully, and the client must remain usable
    /// afterward.
    #[tokio::test]
    async fn test_no_crash_without_sink() {
        let cx = context().await;
        let client = cx.actor_instance;

        // Clear any previously installed sink.
        *sink_cell().write().unwrap() = None;

        // Inject an undeliverable — should not panic.
        inject_undeliverable(client, test_actor_id("no_sink", "marker_actor"));

        // Give the async handler time to run.
        // NOTE(review): fixed sleep is a best-effort race window, not
        // a synchronization point — the recovery check below is the
        // real assertion.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The global client must still be alive and usable.
        // Verify by installing a new sink and sending another
        // undeliverable that arrives correctly.
        let (sink_handle, mut sink_rx) = client.open_port::<ActorSupervisionEvent>();
        set_global_supervision_sink(sink_handle.bind());

        let marker = test_actor_id("no_sink_recovery", "marker_actor");
        inject_undeliverable(client, marker.clone());

        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let ev = sink_rx.recv().await.expect("sink channel closed");
                if ev.actor_id == marker {
                    return ev;
                }
                // Skip stale events from other tests in this process.
            }
        })
        .await
        .expect("timed out: global client crashed or stopped processing");
        assert_eq!(event.actor_id, marker);
    }
674}