Skip to main content

hyperactor/
gateway.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Connectivity layer for Hyperactor procs.
10//!
11//! A proc by itself is an isolated actor runtime. It owns local actor
12//! lifecycle and mailboxes, but it communicates with other procs by
13//! attaching to a gateway. The gateway encapsulates the proc's connectivity
14//! layer: it gives attached procs an advertised location, accepts inbound
15//! traffic for that location, and forwards outbound traffic to destinations
16//! outside the proc.
17//!
18//! This separation lets us compose different topologies without changing
19//! proc identity. A host can attach all of its procs to one gateway, so the
20//! gateway multiplexes ingress to those procs and routes egress on their
21//! behalf. A proc from a foreign host can also attach through another host's
22//! gateway, inheriting that host's advertised location while still retaining
23//! its own proc id. Gateways can also act as pure proxies when they do not
24//! own any local procs.
25//!
26//! From the channel/connectivity perspective, each location has one gateway.
27//! Operationally, a gateway is both a proc multiplexer for ingress and a
28//! router for egress.
29
30use std::collections::HashMap;
31use std::fmt;
32use std::future::Future;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::sync::OnceLock;
36use std::sync::RwLock;
37use std::sync::Weak;
38use std::sync::atomic::AtomicBool;
39use std::sync::atomic::Ordering;
40use std::task::Context;
41use std::task::Poll;
42
43use async_trait::async_trait;
44
45use crate::Location;
46use crate::ProcAddr;
47use crate::ProcId;
48use crate::channel;
49use crate::channel::ChannelAddr;
50use crate::channel::ChannelError;
51use crate::channel::ChannelTransport;
52use crate::mailbox::BoxedMailboxSender;
53use crate::mailbox::DeliveryError;
54use crate::mailbox::DialMailboxRouter;
55use crate::mailbox::IntoBoxedMailboxSender as _;
56use crate::mailbox::MailboxSender as _;
57use crate::mailbox::MailboxServer as _;
58use crate::mailbox::MailboxServerHandle;
59use crate::mailbox::MessageEnvelope;
60use crate::mailbox::PortHandle;
61use crate::mailbox::Undeliverable;
62use crate::mailbox::UnroutableMailboxSender;
63use crate::proc::Proc;
64use crate::proc::WeakProc;
65
66/// Connectivity boundary for one or more procs.
67#[derive(Clone)]
68pub struct Gateway {
69    inner: Arc<GatewayState>,
70}
71
72struct GatewayState {
73    /// The location to use when no server is active.
74    fallback_location: Location,
75
76    /// The location used when constructing routeable addresses for
77    /// newly bound refs.
78    default_location: RwLock<Location>,
79
80    /// Sender used to forward messages outside of the proc.
81    forwarder: BoxedMailboxSender,
82
83    /// Procs attached to this gateway, keyed by runtime identity.
84    procs: RwLock<HashMap<ProcId, WeakProc>>,
85
86    /// Locations currently served by this gateway. The last location
87    /// is the default advertised location.
88    active_servers: RwLock<Vec<Location>>,
89}
90
91impl Gateway {
92    /// Create a fresh unserved gateway with dial-based forwarding.
93    pub fn new() -> Self {
94        Self::configured(
95            channel::reserve_local_addr().into(),
96            DialMailboxRouter::new().into_boxed(),
97        )
98    }
99
100    /// Create a fresh unserved local-only gateway.
101    pub fn isolated() -> Self {
102        Self::configured(
103            channel::reserve_local_addr().into(),
104            BoxedMailboxSender::new(UnroutableMailboxSender),
105        )
106    }
107
108    /// Return the process-wide global gateway.
109    pub fn global() -> &'static Self {
110        static GLOBAL_GATEWAY: OnceLock<Gateway> = OnceLock::new();
111        GLOBAL_GATEWAY.get_or_init(Self::new)
112    }
113
114    pub(crate) fn configured(default_location: Location, forwarder: BoxedMailboxSender) -> Self {
115        Self {
116            inner: Arc::new(GatewayState {
117                fallback_location: default_location.clone(),
118                default_location: RwLock::new(default_location),
119                forwarder,
120                procs: RwLock::new(HashMap::new()),
121                active_servers: RwLock::new(Vec::new()),
122            }),
123        }
124    }
125
126    /// The gateway's default advertised location.
127    pub fn default_location(&self) -> Location {
128        self.inner.default_location.read().unwrap().clone()
129    }
130
131    /// Set the gateway's default advertised location.
132    pub fn set_default_location(&self, location: Location) {
133        *self.inner.default_location.write().unwrap() = location;
134    }
135
136    /// Construct a routeable proc address using this gateway's default location.
137    pub fn proc_addr(&self, proc_id: &ProcId) -> ProcAddr {
138        ProcAddr::new(proc_id.clone(), self.default_location())
139    }
140
141    pub(crate) fn forwarder(&self) -> &BoxedMailboxSender {
142        &self.inner.forwarder
143    }
144
145    pub(crate) fn attach(&self, proc: &Proc) {
146        let proc_id = proc.proc_id().clone();
147        if let Some(existing) = self
148            .inner
149            .procs
150            .write()
151            .unwrap()
152            .insert(proc_id.clone(), proc.downgrade())
153            && existing.upgrade().is_some()
154        {
155            panic!("gateway already has a live proc with id {}", proc_id)
156        }
157    }
158
159    pub(crate) fn serve_rx(
160        &self,
161        rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
162    ) -> MailboxServerHandle {
163        WeakGateway::new(self).serve(rx)
164    }
165
166    /// Serve this gateway on the provided channel address.
167    ///
168    /// When serving the first local [`ChannelAddr::any`] address, the gateway
169    /// binds the local address that was reserved when the gateway was created.
170    /// Local reservation is separate from local binding so a gateway can have a
171    /// stable location before it has a runtime available to run a server.
172    /// Later local `any` serves allocate fresh local ports, so the gateway can
173    /// have multiple active local servers.
174    ///
175    /// Serving updates the gateway's default location to the newly served
176    /// address. When that server stops, the default location falls back to the
177    /// previous active server, or to the reserved fallback location when no
178    /// server remains.
179    pub fn serve(&self, addr: ChannelAddr) -> Result<GatewayServeHandle, ChannelError> {
180        let (location, handle) = self.serve_inner(addr)?;
181        Ok(GatewayServeHandle {
182            gateway: self.clone(),
183            location,
184            handle,
185            stopped: Arc::new(AtomicBool::new(false)),
186        })
187    }
188
189    fn serve_inner(
190        &self,
191        addr: ChannelAddr,
192    ) -> Result<(Location, MailboxServerHandle), ChannelError> {
193        let addr = self.resolve_serve_addr(addr);
194        let (addr, rx) = channel::serve(addr)?;
195        let location = Location::from(addr);
196        self.add_server(location.clone());
197        Ok((location, self.serve_rx(rx)))
198    }
199
200    fn resolve_serve_addr(&self, addr: ChannelAddr) -> ChannelAddr {
201        // The first local-any serve activates the address that was reserved at
202        // construction time. Subsequent local-any serves should allocate new
203        // ports, so multiple local servers can coexist for the same gateway.
204        if addr == ChannelAddr::any(ChannelTransport::Local)
205            && self.inner.active_servers.read().unwrap().is_empty()
206            && matches!(self.inner.fallback_location.addr(), ChannelAddr::Local(_))
207        {
208            return self.inner.fallback_location.addr().clone();
209        }
210        addr
211    }
212
213    fn add_server(&self, location: Location) {
214        let mut active_servers = self.inner.active_servers.write().unwrap();
215        active_servers.push(location.clone());
216        *self.inner.default_location.write().unwrap() = location;
217    }
218
219    fn remove_server(&self, location: &Location) {
220        let mut active_servers = self.inner.active_servers.write().unwrap();
221        if let Some(index) = active_servers.iter().rposition(|active| active == location) {
222            active_servers.remove(index);
223        }
224        let default_location = active_servers
225            .last()
226            .cloned()
227            .unwrap_or_else(|| self.inner.fallback_location.clone());
228        *self.inner.default_location.write().unwrap() = default_location;
229    }
230
231    /// Flush pending gateway traffic.
232    ///
233    /// This first flushes the muxers for all live procs attached to the
234    /// gateway, then flushes the gateway's forwarder. Flushing the proc muxers
235    /// drains local delivery and any return paths rooted in attached procs;
236    /// flushing the forwarder drains outbound traffic that the gateway has
237    /// routed away from those procs.
238    ///
239    /// The live proc set is snapshotted before awaiting, so we do not hold the
240    /// proc map while flushing. Procs that have already been dropped are
241    /// ignored. Concurrent posts may race with this operation; `flush` only
242    /// guarantees that each flushed sender observes its usual sender-level
243    /// flush semantics at the time it is flushed.
244    pub(crate) async fn flush(&self) -> Result<(), anyhow::Error> {
245        let procs = self
246            .inner
247            .procs
248            .read()
249            .unwrap()
250            .values()
251            .filter_map(WeakProc::upgrade)
252            .collect::<Vec<_>>();
253        for proc in procs {
254            proc.muxer().flush().await?;
255        }
256        self.inner.forwarder.flush().await
257    }
258}
259
260impl fmt::Debug for Gateway {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        f.debug_struct("Gateway")
263            .field("default_location", &self.default_location())
264            .finish()
265    }
266}
267
268/// A running gateway server.
269#[derive(Debug)]
270pub struct GatewayServeHandle {
271    gateway: Gateway,
272    location: Location,
273    handle: MailboxServerHandle,
274    stopped: Arc<AtomicBool>,
275}
276
277impl GatewayServeHandle {
278    /// Signal the gateway server to stop.
279    pub fn stop(&self, reason: &str) {
280        if !self.stopped.swap(true, Ordering::AcqRel) {
281            self.handle.stop(reason);
282            self.gateway.remove_server(&self.location);
283        }
284    }
285}
286
287impl Future for GatewayServeHandle {
288    type Output = <MailboxServerHandle as Future>::Output;
289
290    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291        // SAFETY: `handle` is structurally pinned with `GatewayServeHandle`:
292        // this type is `!Unpin` because `MailboxServerHandle` is `!Unpin`, it
293        // has no `Drop` impl that moves `handle`, and no method moves `handle`
294        // out of a pinned `GatewayServeHandle`.
295        let handle = unsafe {
296            self.as_mut()
297                .map_unchecked_mut(|container| &mut container.handle)
298        };
299        let result = handle.poll(cx);
300        if result.is_ready() {
301            // SAFETY: We only mutate unpinned bookkeeping fields after polling
302            // the pinned `handle`; this does not move `handle` or any other
303            // pinned field out of `self`.
304            let this = unsafe { self.get_unchecked_mut() };
305            if !this.stopped.swap(true, Ordering::AcqRel) {
306                this.gateway.remove_server(&this.location);
307            }
308        }
309        result
310    }
311}
312
313#[derive(Clone, Debug)]
314struct WeakGateway(Weak<GatewayState>);
315
316impl WeakGateway {
317    fn new(gateway: &Gateway) -> Self {
318        Self(Arc::downgrade(&gateway.inner))
319    }
320
321    fn upgrade(&self) -> Option<Gateway> {
322        self.0.upgrade().map(|inner| Gateway { inner })
323    }
324}
325
326#[async_trait]
327impl crate::mailbox::MailboxSender for WeakGateway {
328    fn post_unchecked(
329        &self,
330        envelope: MessageEnvelope,
331        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
332    ) {
333        match self.upgrade() {
334            Some(gateway) => gateway.post(envelope, return_handle),
335            None => envelope.undeliverable(
336                DeliveryError::BrokenLink("failed to upgrade WeakGateway".to_string()),
337                return_handle,
338            ),
339        }
340    }
341
342    async fn flush(&self) -> Result<(), anyhow::Error> {
343        match self.upgrade() {
344            Some(gateway) => Gateway::flush(&gateway).await,
345            None => Ok(()),
346        }
347    }
348}
349
350#[async_trait]
351impl crate::mailbox::MailboxSender for Gateway {
352    fn post_unchecked(
353        &self,
354        envelope: MessageEnvelope,
355        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
356    ) {
357        let dest_proc = envelope.dest().actor_addr().proc_addr();
358        let weak_proc = self
359            .inner
360            .procs
361            .read()
362            .unwrap()
363            .get(dest_proc.id())
364            .cloned();
365        let proc = weak_proc.as_ref().and_then(WeakProc::upgrade);
366
367        if weak_proc.is_some() && proc.is_none() {
368            let mut procs = self.inner.procs.write().unwrap();
369            if procs
370                .get(dest_proc.id())
371                .and_then(WeakProc::upgrade)
372                .is_none()
373            {
374                procs.remove(dest_proc.id());
375            }
376        }
377
378        if let Some(proc) = proc
379            && proc.is_local_delivery_target(&dest_proc)
380        {
381            proc.muxer().post(envelope, return_handle);
382            return;
383        }
384
385        self.inner.forwarder.post(envelope, return_handle)
386    }
387
388    async fn flush(&self) -> Result<(), anyhow::Error> {
389        Gateway::flush(self).await
390    }
391}
392
393#[cfg(test)]
394mod tests {
395    use std::sync::Arc;
396    use std::sync::atomic::AtomicUsize;
397    use std::sync::atomic::Ordering;
398    use std::time::Duration;
399
400    use async_trait::async_trait;
401    use hyperactor_config::Flattrs;
402    use tokio::time;
403
404    use super::*;
405    use crate::Endpoint as _;
406    use crate::Label;
407    use crate::mailbox::MailboxSender;
408    use crate::mailbox::PortLocation;
409    use crate::mailbox::monitored_return_handle;
410    use crate::port::Port;
411    use crate::proc::Proc;
412    use crate::testing::ids::test_actor_id;
413    use crate::testing::pingpong::PingPongActor;
414    use crate::testing::pingpong::PingPongMessage;
415
416    /// `Gateway::post_unchecked` demuxes inbound envelopes by
417    /// destination `ProcId` to the matching attached proc's muxer,
418    /// and falls through to the configured forwarder for unknown
419    /// destinations. Attached procs only receive envelopes addressed
420    /// to them — a stranger-addressed envelope does not leak to local
421    /// receivers.
422    #[tokio::test]
423    async fn test_gateway_post_demuxes_by_proc_id() {
424        #[derive(Clone)]
425        struct CountingSender(Arc<AtomicUsize>);
426
427        #[async_trait]
428        impl MailboxSender for CountingSender {
429            fn post_unchecked(
430                &self,
431                _envelope: MessageEnvelope,
432                _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
433            ) {
434                self.0.fetch_add(1, Ordering::SeqCst);
435            }
436        }
437
438        let forwarded = Arc::new(AtomicUsize::new(0));
439        let gateway = Gateway::configured(
440            channel::reserve_local_addr().into(),
441            BoxedMailboxSender::new(CountingSender(forwarded.clone())),
442        );
443
444        let alpha = Proc::builder()
445            .proc_id(ProcId::instance(Label::strip("alpha")))
446            .shared_gateway(gateway.clone())
447            .build()
448            .unwrap();
449        let beta = Proc::builder()
450            .proc_id(ProcId::instance(Label::strip("beta")))
451            .shared_gateway(gateway.clone())
452            .build()
453            .unwrap();
454
455        let (alpha_client, _) = alpha.client("client").unwrap();
456        let (alpha_port, mut alpha_rx) = alpha_client.bind_handler_port::<u64>();
457        let PortLocation::Bound(alpha_dest) = alpha_port.location() else {
458            panic!("alpha handler port must be bound");
459        };
460
461        let (beta_client, _) = beta.client("client").unwrap();
462        let (beta_port, mut beta_rx) = beta_client.bind_handler_port::<u64>();
463        let PortLocation::Bound(beta_dest) = beta_port.location() else {
464            panic!("beta handler port must be bound");
465        };
466
467        let sender = test_actor_id("test", "sender");
468
469        gateway.post(
470            MessageEnvelope::serialize(sender.clone(), alpha_dest.clone(), &111u64, Flattrs::new())
471                .unwrap(),
472            monitored_return_handle(),
473        );
474        let received = time::timeout(Duration::from_secs(5), alpha_rx.recv())
475            .await
476            .expect("alpha_rx timed out")
477            .expect("alpha_rx closed");
478        assert_eq!(received, 111);
479        assert_eq!(forwarded.load(Ordering::SeqCst), 0);
480
481        gateway.post(
482            MessageEnvelope::serialize(sender.clone(), beta_dest.clone(), &222u64, Flattrs::new())
483                .unwrap(),
484            monitored_return_handle(),
485        );
486        let received = time::timeout(Duration::from_secs(5), beta_rx.recv())
487            .await
488            .expect("beta_rx timed out")
489            .expect("beta_rx closed");
490        assert_eq!(received, 222);
491        assert_eq!(forwarded.load(Ordering::SeqCst), 0);
492
493        let stranger_proc = ProcAddr::instance(ChannelAddr::Local(9999), "stranger");
494        let stranger_dest = stranger_proc
495            .actor_addr("ghost")
496            .port_addr(Port::from(0u64));
497        gateway.post(
498            MessageEnvelope::serialize(sender, stranger_dest, &333u64, Flattrs::new()).unwrap(),
499            monitored_return_handle(),
500        );
501        assert_eq!(forwarded.load(Ordering::SeqCst), 1);
502        assert!(
503            time::timeout(Duration::from_millis(50), alpha_rx.recv())
504                .await
505                .is_err(),
506            "alpha_rx received a message after stranger post",
507        );
508        assert!(
509            time::timeout(Duration::from_millis(50), beta_rx.recv())
510                .await
511                .is_err(),
512            "beta_rx received a message after stranger post",
513        );
514    }
515
516    /// Ping-pong between two `PingPongActor`s on two procs that share
517    /// one gateway. Each cross-proc hop goes `Proc::post_unchecked` →
518    /// `Gateway::post_unchecked` demux → destination proc's muxer
519    /// directly, without touching the gateway's forwarder.
520    #[tokio::test]
521    async fn test_ping_pong_across_shared_gateway() {
522        let gateway = Gateway::isolated();
523
524        let alpha = Proc::builder()
525            .proc_id(ProcId::instance(Label::strip("alpha")))
526            .shared_gateway(gateway.clone())
527            .build()
528            .unwrap();
529        let beta = Proc::builder()
530            .proc_id(ProcId::instance(Label::strip("beta")))
531            .shared_gateway(gateway.clone())
532            .build()
533            .unwrap();
534
535        let (client, _) = alpha.client("client").unwrap();
536        let (undeliverable_msg_tx, mut undeliverable_rx) =
537            client.open_port::<Undeliverable<MessageEnvelope>>();
538
539        let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
540        let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
541        let ping_handle = alpha.spawn::<PingPongActor>("ping", ping_actor).unwrap();
542        let pong_handle = beta.spawn::<PingPongActor>("pong", pong_actor).unwrap();
543
544        let (local_port, local_receiver) = client.open_once_port();
545
546        ping_handle.post(
547            &client,
548            PingPongMessage(10, pong_handle.bind(), local_port.bind()),
549        );
550
551        let received = time::timeout(Duration::from_secs(5), local_receiver.recv())
552            .await
553            .expect("local_receiver timed out")
554            .expect("ping pong did not complete");
555        assert!(received);
556
557        assert!(
558            time::timeout(Duration::from_millis(50), undeliverable_rx.recv())
559                .await
560                .is_err(),
561            "unexpected undeliverable during cross-proc ping-pong",
562        );
563    }
564
565    /// `Gateway::post_unchecked` removes stale `WeakProc` entries
566    /// lazily on the post path. Dropping a proc leaves a dead
567    /// `WeakProc` in the gateway's `procs` map; the next post to that
568    /// proc's id falls through to the forwarder and removes the stale
569    /// entry.
570    #[tokio::test]
571    async fn test_gateway_post_removes_stale_weak_procs() {
572        #[derive(Clone)]
573        struct CountingSender(Arc<AtomicUsize>);
574
575        #[async_trait]
576        impl MailboxSender for CountingSender {
577            fn post_unchecked(
578                &self,
579                _envelope: MessageEnvelope,
580                _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
581            ) {
582                self.0.fetch_add(1, Ordering::SeqCst);
583            }
584        }
585
586        let forwarded = Arc::new(AtomicUsize::new(0));
587        let gateway = Gateway::configured(
588            channel::reserve_local_addr().into(),
589            BoxedMailboxSender::new(CountingSender(forwarded.clone())),
590        );
591
592        let proc = Proc::builder()
593            .proc_id(ProcId::instance(Label::strip("alpha")))
594            .shared_gateway(gateway.clone())
595            .build()
596            .unwrap();
597        let proc_id = proc.proc_id().clone();
598        let dest = proc
599            .proc_addr()
600            .actor_addr("worker")
601            .port_addr(Port::from(0u64));
602
603        // Sanity: proc is attached.
604        assert_eq!(gateway.inner.procs.read().unwrap().len(), 1);
605
606        drop(proc);
607
608        // The entry is still present, but it is now a *stale* WeakProc:
609        // upgrade must fail.
610        {
611            let procs = gateway.inner.procs.read().unwrap();
612            assert_eq!(procs.len(), 1);
613            assert!(
614                procs.get(&proc_id).and_then(WeakProc::upgrade).is_none(),
615                "expected dropped proc to remain only as a stale WeakProc entry",
616            );
617        }
618
619        gateway.post(
620            MessageEnvelope::serialize(
621                test_actor_id("test", "sender"),
622                dest,
623                &42u64,
624                Flattrs::new(),
625            )
626            .unwrap(),
627            monitored_return_handle(),
628        );
629
630        // Envelope fell through to the forwarder; stale entry
631        // removed.
632        assert_eq!(forwarded.load(Ordering::SeqCst), 1);
633        {
634            let procs = gateway.inner.procs.read().unwrap();
635            assert_eq!(procs.len(), 0);
636            assert!(procs.get(&proc_id).is_none());
637        }
638    }
639
640    /// `Gateway::attach` panics when a second proc with the same
641    /// `ProcId` is built against the same gateway while the first is
642    /// still alive. The check is in `Gateway::attach`, invoked from
643    /// `Proc::builder().build()` via `Proc::from_parts_unchecked`.
644    #[test]
645    #[should_panic(expected = "gateway already has a live proc")]
646    fn test_gateway_attach_panics_on_duplicate_live_proc() {
647        let gateway = Gateway::isolated();
648        let proc_id = ProcId::instance(Label::strip("alpha"));
649
650        // Hold the first proc in a binding so it stays alive across
651        // the second build; if the first were dropped, the gateway's
652        // stale-entry path would silently replace it instead of
653        // panicking.
654        let _first = Proc::builder()
655            .proc_id(proc_id.clone())
656            .shared_gateway(gateway.clone())
657            .build()
658            .unwrap();
659
660        let _second = Proc::builder()
661            .proc_id(proc_id)
662            .shared_gateway(gateway.clone())
663            .build()
664            .unwrap();
665    }
666
667    /// `Gateway::flush()` propagates the flush to each attached
668    /// proc's muxer (which in turn flushes its bound senders) and
669    /// then to the gateway's forwarder. Verified by binding a
670    /// `FlushCountingSender` into each proc's muxer and asserting all
671    /// three counters (alpha's, beta's, the forwarder's) increment
672    /// exactly once.
673    #[tokio::test]
674    async fn test_gateway_flush_propagates_to_attached_procs() {
675        #[derive(Clone)]
676        struct FlushCountingSender(Arc<AtomicUsize>);
677
678        #[async_trait]
679        impl MailboxSender for FlushCountingSender {
680            fn post_unchecked(
681                &self,
682                _envelope: MessageEnvelope,
683                _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
684            ) {
685                // Not exercised by this test.
686            }
687
688            async fn flush(&self) -> Result<(), anyhow::Error> {
689                self.0.fetch_add(1, Ordering::SeqCst);
690                Ok(())
691            }
692        }
693
694        let alpha_flushed = Arc::new(AtomicUsize::new(0));
695        let beta_flushed = Arc::new(AtomicUsize::new(0));
696        let forwarder_flushed = Arc::new(AtomicUsize::new(0));
697
698        let gateway = Gateway::configured(
699            channel::reserve_local_addr().into(),
700            BoxedMailboxSender::new(FlushCountingSender(forwarder_flushed.clone())),
701        );
702
703        let alpha = Proc::builder()
704            .proc_id(ProcId::instance(Label::strip("alpha")))
705            .shared_gateway(gateway.clone())
706            .build()
707            .unwrap();
708        let beta = Proc::builder()
709            .proc_id(ProcId::instance(Label::strip("beta")))
710            .shared_gateway(gateway.clone())
711            .build()
712            .unwrap();
713
714        // Bind a flush-counting probe into each proc's muxer. Use a
715        // fabricated actor id under the proc — no actor is spawned
716        // there; the muxer just routes flushes to whatever's bound.
717        let alpha_probe = alpha.proc_addr().actor_addr("alpha_probe").id().clone();
718        let beta_probe = beta.proc_addr().actor_addr("beta_probe").id().clone();
719        assert!(
720            alpha
721                .muxer()
722                .bind(alpha_probe, FlushCountingSender(alpha_flushed.clone()))
723        );
724        assert!(
725            beta.muxer()
726                .bind(beta_probe, FlushCountingSender(beta_flushed.clone()))
727        );
728
729        // Sanity: two procs attached, both live.
730        assert_eq!(gateway.inner.procs.read().unwrap().len(), 2);
731
732        gateway.flush().await.unwrap();
733
734        assert_eq!(alpha_flushed.load(Ordering::SeqCst), 1);
735        assert_eq!(beta_flushed.load(Ordering::SeqCst), 1);
736        assert_eq!(forwarder_flushed.load(Ordering::SeqCst), 1);
737    }
738
739    /// After the gateway is dropped, `WeakGateway::post_unchecked`
740    /// (the sender used by gateway-served mailbox tasks) bounces
741    /// envelopes as `BrokenLink` rather than panicking or hanging.
742    /// Tested directly against `WeakGateway` — no channel server, no
743    /// task lifecycle — because the bounce is in-process and
744    /// observable at the caller's return port without going through
745    /// any serialize/dispatch path.
746    #[tokio::test]
747    async fn test_weak_gateway_bounces_broken_link_after_drop() {
748        let gateway = Gateway::isolated();
749        let weak = WeakGateway::new(&gateway);
750        drop(gateway);
751
752        // Scratch proc just to host the return port.
753        let scratch = Proc::isolated();
754        let (scratch_client, _) = scratch.client("return").unwrap();
755        let (return_handle, mut return_rx) =
756            scratch_client.open_port::<Undeliverable<MessageEnvelope>>();
757
758        // Fabricate a destination — its contents don't matter; the
759        // bounce happens at WeakGateway::upgrade before any demux
760        // would run.
761        let dest_proc = ProcAddr::instance(ChannelAddr::Local(1234), "stranger");
762        let dest = dest_proc.actor_addr("ghost").port_addr(Port::from(0u64));
763        let envelope = MessageEnvelope::serialize(
764            test_actor_id("test", "sender"),
765            dest.clone(),
766            &42u64,
767            Flattrs::new(),
768        )
769        .unwrap();
770
771        // Post directly through the WeakGateway. Upgrade fails →
772        // envelope.undeliverable(BrokenLink, return_handle) sends
773        // synchronously to our return port.
774        weak.post(envelope, return_handle);
775
776        let Undeliverable::Message(envelope) =
777            time::timeout(Duration::from_secs(5), return_rx.recv())
778                .await
779                .expect("return_rx timed out")
780                .expect("return_rx closed")
781        else {
782            panic!("expected returned envelope");
783        };
784        assert_eq!(envelope.dest(), &dest);
785        assert!(
786            envelope
787                .errors()
788                .iter()
789                .any(|e| matches!(e, DeliveryError::BrokenLink(_))),
790            "expected BrokenLink bounce, got {:?}",
791            envelope.errors(),
792        );
793    }
794
795    /// `Gateway::attach` silently replaces a stale `WeakProc` entry
796    /// (one whose strong reference has dropped). No panic; the new
797    /// proc takes over routing for that `ProcId`.
798    #[tokio::test]
799    async fn test_gateway_attach_silently_replaces_dead_entry() {
800        let gateway = Gateway::isolated();
801        let proc_id = ProcId::instance(Label::strip("alpha"));
802
803        let first = Proc::builder()
804            .proc_id(proc_id.clone())
805            .shared_gateway(gateway.clone())
806            .build()
807            .unwrap();
808        drop(first);
809
810        // The entry is still present but stale (upgrade fails).
811        {
812            let procs = gateway.inner.procs.read().unwrap();
813            assert_eq!(procs.len(), 1);
814            assert!(
815                procs.get(&proc_id).and_then(WeakProc::upgrade).is_none(),
816                "expected first proc to remain only as a stale WeakProc entry",
817            );
818        }
819
820        // Attach a second proc with the same id. The dead WeakProc
821        // entry is silently replaced; no panic; map still has exactly
822        // one entry.
823        let second = Proc::builder()
824            .proc_id(proc_id.clone())
825            .shared_gateway(gateway.clone())
826            .build()
827            .unwrap();
828        assert_eq!(gateway.inner.procs.read().unwrap().len(), 1);
829
830        // Verify the new proc is reachable via the gateway.
831        let (client, _) = second.client("client").unwrap();
832        let (port, mut rx) = client.bind_handler_port::<u64>();
833        let dest = port.bind().port_addr().clone();
834
835        gateway.post(
836            MessageEnvelope::serialize(
837                test_actor_id("test", "sender"),
838                dest,
839                &42u64,
840                Flattrs::new(),
841            )
842            .unwrap(),
843            monitored_return_handle(),
844        );
845
846        let received = time::timeout(Duration::from_secs(5), rx.recv())
847            .await
848            .expect("rx timed out")
849            .expect("rx closed");
850        assert_eq!(received, 42);
851    }
852
853    /// `Gateway::remove_server` correctly unwinds `active_servers`
854    /// when handles stop out of order. Three concurrent servers; stop
855    /// the middle one, then the last, then the first, asserting the
856    /// gateway's `default_location` at each step. Final empty state
857    /// reverts to the construction-time fallback.
858    #[tokio::test]
859    async fn test_gateway_serve_stop_unwinds_in_any_order() {
860        let gateway = Gateway::isolated();
861        let fallback = gateway.default_location();
862
863        let s1 = Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
864        let loc1 = gateway.default_location();
865        let s2 = Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
866        let loc2 = gateway.default_location();
867        let s3 = Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
868        let loc3 = gateway.default_location();
869
870        // First serve(any) reuses the gateway's reserved fallback
871        // address (see resolve_serve_addr); subsequent serves
872        // allocate fresh ports.
873        assert_eq!(loc1, fallback);
874        assert_ne!(loc1, loc2);
875        assert_ne!(loc2, loc3);
876        assert_ne!(loc1, loc3);
877
878        // Middle handle stops first: default stays at loc3 (still the
879        // last entry in active_servers).
880        s2.stop("test");
881        s2.await.unwrap().unwrap();
882        assert_eq!(gateway.default_location(), loc3);
883
884        // Last handle stops: default falls back to loc1.
885        s3.stop("test");
886        s3.await.unwrap().unwrap();
887        assert_eq!(gateway.default_location(), loc1);
888
889        // Final handle stops: default reverts to the
890        // construction-time fallback.
891        s1.stop("test");
892        s1.await.unwrap().unwrap();
893        assert_eq!(gateway.default_location(), fallback);
894    }
895}