1use std::collections::HashMap;
31use std::fmt;
32use std::future::Future;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::sync::OnceLock;
36use std::sync::RwLock;
37use std::sync::Weak;
38use std::sync::atomic::AtomicBool;
39use std::sync::atomic::Ordering;
40use std::task::Context;
41use std::task::Poll;
42
43use async_trait::async_trait;
44
45use crate::Location;
46use crate::ProcAddr;
47use crate::ProcId;
48use crate::channel;
49use crate::channel::ChannelAddr;
50use crate::channel::ChannelError;
51use crate::channel::ChannelTransport;
52use crate::mailbox::BoxedMailboxSender;
53use crate::mailbox::DeliveryError;
54use crate::mailbox::DialMailboxRouter;
55use crate::mailbox::IntoBoxedMailboxSender as _;
56use crate::mailbox::MailboxSender as _;
57use crate::mailbox::MailboxServer as _;
58use crate::mailbox::MailboxServerHandle;
59use crate::mailbox::MessageEnvelope;
60use crate::mailbox::PortHandle;
61use crate::mailbox::Undeliverable;
62use crate::mailbox::UnroutableMailboxSender;
63use crate::proc::Proc;
64use crate::proc::WeakProc;
65
/// Cloneable handle to a gateway.
///
/// All clones share a single [`GatewayState`] through an [`Arc`], so a change
/// made through one handle (for example a new default location) is observed
/// by every other handle.
#[derive(Clone)]
pub struct Gateway {
    /// Shared state behind the handle.
    inner: Arc<GatewayState>,
}
71
/// State shared by every clone of a [`Gateway`].
struct GatewayState {
    /// Location the gateway was configured with. It is never reassigned; it
    /// becomes the default location again once no server is active, and its
    /// address is reused by the first "any local" serve request.
    fallback_location: Location,

    /// Location currently handed out by [`Gateway::default_location`].
    /// Overwritten by `set_default_location`, `add_server` and
    /// `remove_server`.
    default_location: RwLock<Location>,

    /// Sender that receives every envelope that is not delivered to an
    /// attached proc.
    forwarder: BoxedMailboxSender,

    /// Attached procs, keyed by proc id. Only weak references are stored, so
    /// an entry may refer to a proc that has already been dropped.
    procs: RwLock<HashMap<ProcId, WeakProc>>,

    /// Locations of the servers that are currently registered, in the order
    /// they were added. The last element is the default location.
    active_servers: RwLock<Vec<Location>>,
}
90
91impl Gateway {
92 pub fn new() -> Self {
94 Self::configured(
95 channel::reserve_local_addr().into(),
96 DialMailboxRouter::new().into_boxed(),
97 )
98 }
99
100 pub fn isolated() -> Self {
102 Self::configured(
103 channel::reserve_local_addr().into(),
104 BoxedMailboxSender::new(UnroutableMailboxSender),
105 )
106 }
107
108 pub fn global() -> &'static Self {
110 static GLOBAL_GATEWAY: OnceLock<Gateway> = OnceLock::new();
111 GLOBAL_GATEWAY.get_or_init(Self::new)
112 }
113
114 pub(crate) fn configured(default_location: Location, forwarder: BoxedMailboxSender) -> Self {
115 Self {
116 inner: Arc::new(GatewayState {
117 fallback_location: default_location.clone(),
118 default_location: RwLock::new(default_location),
119 forwarder,
120 procs: RwLock::new(HashMap::new()),
121 active_servers: RwLock::new(Vec::new()),
122 }),
123 }
124 }
125
126 pub fn default_location(&self) -> Location {
128 self.inner.default_location.read().unwrap().clone()
129 }
130
131 pub fn set_default_location(&self, location: Location) {
133 *self.inner.default_location.write().unwrap() = location;
134 }
135
136 pub fn proc_addr(&self, proc_id: &ProcId) -> ProcAddr {
138 ProcAddr::new(proc_id.clone(), self.default_location())
139 }
140
141 pub(crate) fn forwarder(&self) -> &BoxedMailboxSender {
142 &self.inner.forwarder
143 }
144
145 pub(crate) fn attach(&self, proc: &Proc) {
146 let proc_id = proc.proc_id().clone();
147 if let Some(existing) = self
148 .inner
149 .procs
150 .write()
151 .unwrap()
152 .insert(proc_id.clone(), proc.downgrade())
153 && existing.upgrade().is_some()
154 {
155 panic!("gateway already has a live proc with id {}", proc_id)
156 }
157 }
158
159 pub(crate) fn serve_rx(
160 &self,
161 rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
162 ) -> MailboxServerHandle {
163 WeakGateway::new(self).serve(rx)
164 }
165
166 pub fn serve(&self, addr: ChannelAddr) -> Result<GatewayServeHandle, ChannelError> {
180 let (location, handle) = self.serve_inner(addr)?;
181 Ok(GatewayServeHandle {
182 gateway: self.clone(),
183 location,
184 handle,
185 stopped: Arc::new(AtomicBool::new(false)),
186 })
187 }
188
189 fn serve_inner(
190 &self,
191 addr: ChannelAddr,
192 ) -> Result<(Location, MailboxServerHandle), ChannelError> {
193 let addr = self.resolve_serve_addr(addr);
194 let (addr, rx) = channel::serve(addr)?;
195 let location = Location::from(addr);
196 self.add_server(location.clone());
197 Ok((location, self.serve_rx(rx)))
198 }
199
200 fn resolve_serve_addr(&self, addr: ChannelAddr) -> ChannelAddr {
201 if addr == ChannelAddr::any(ChannelTransport::Local)
205 && self.inner.active_servers.read().unwrap().is_empty()
206 && matches!(self.inner.fallback_location.addr(), ChannelAddr::Local(_))
207 {
208 return self.inner.fallback_location.addr().clone();
209 }
210 addr
211 }
212
213 fn add_server(&self, location: Location) {
214 let mut active_servers = self.inner.active_servers.write().unwrap();
215 active_servers.push(location.clone());
216 *self.inner.default_location.write().unwrap() = location;
217 }
218
219 fn remove_server(&self, location: &Location) {
220 let mut active_servers = self.inner.active_servers.write().unwrap();
221 if let Some(index) = active_servers.iter().rposition(|active| active == location) {
222 active_servers.remove(index);
223 }
224 let default_location = active_servers
225 .last()
226 .cloned()
227 .unwrap_or_else(|| self.inner.fallback_location.clone());
228 *self.inner.default_location.write().unwrap() = default_location;
229 }
230
231 pub(crate) async fn flush(&self) -> Result<(), anyhow::Error> {
245 let procs = self
246 .inner
247 .procs
248 .read()
249 .unwrap()
250 .values()
251 .filter_map(WeakProc::upgrade)
252 .collect::<Vec<_>>();
253 for proc in procs {
254 proc.muxer().flush().await?;
255 }
256 self.inner.forwarder.flush().await
257 }
258}
259
260impl fmt::Debug for Gateway {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 f.debug_struct("Gateway")
263 .field("default_location", &self.default_location())
264 .finish()
265 }
266}
267
/// Handle to a server started with [`Gateway::serve`].
///
/// Stopping the handle, or polling it to completion, removes the served
/// location from the gateway's list of active servers.
#[derive(Debug)]
pub struct GatewayServeHandle {
    /// Gateway whose server list is updated when this server goes away.
    gateway: Gateway,
    /// Location this server was registered under.
    location: Location,
    /// Handle of the underlying mailbox server.
    handle: MailboxServerHandle,
    /// Set once the location has been removed, so removal happens only once.
    stopped: Arc<AtomicBool>,
}
276
277impl GatewayServeHandle {
278 pub fn stop(&self, reason: &str) {
280 if !self.stopped.swap(true, Ordering::AcqRel) {
281 self.handle.stop(reason);
282 self.gateway.remove_server(&self.location);
283 }
284 }
285}
286
287impl Future for GatewayServeHandle {
288 type Output = <MailboxServerHandle as Future>::Output;
289
290 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291 let handle = unsafe {
296 self.as_mut()
297 .map_unchecked_mut(|container| &mut container.handle)
298 };
299 let result = handle.poll(cx);
300 if result.is_ready() {
301 let this = unsafe { self.get_unchecked_mut() };
305 if !this.stopped.swap(true, Ordering::AcqRel) {
306 this.gateway.remove_server(&this.location);
307 }
308 }
309 result
310 }
311}
312
/// Non-owning reference to a gateway's shared state.
///
/// Used as the sender behind servers started by the gateway, so that a
/// running server does not keep the gateway state alive.
#[derive(Clone, Debug)]
struct WeakGateway(Weak<GatewayState>);
315
316impl WeakGateway {
317 fn new(gateway: &Gateway) -> Self {
318 Self(Arc::downgrade(&gateway.inner))
319 }
320
321 fn upgrade(&self) -> Option<Gateway> {
322 self.0.upgrade().map(|inner| Gateway { inner })
323 }
324}
325
326#[async_trait]
327impl crate::mailbox::MailboxSender for WeakGateway {
328 fn post_unchecked(
329 &self,
330 envelope: MessageEnvelope,
331 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
332 ) {
333 match self.upgrade() {
334 Some(gateway) => gateway.post(envelope, return_handle),
335 None => envelope.undeliverable(
336 DeliveryError::BrokenLink("failed to upgrade WeakGateway".to_string()),
337 return_handle,
338 ),
339 }
340 }
341
342 async fn flush(&self) -> Result<(), anyhow::Error> {
343 match self.upgrade() {
344 Some(gateway) => Gateway::flush(&gateway).await,
345 None => Ok(()),
346 }
347 }
348}
349
350#[async_trait]
351impl crate::mailbox::MailboxSender for Gateway {
352 fn post_unchecked(
353 &self,
354 envelope: MessageEnvelope,
355 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
356 ) {
357 let dest_proc = envelope.dest().actor_addr().proc_addr();
358 let weak_proc = self
359 .inner
360 .procs
361 .read()
362 .unwrap()
363 .get(dest_proc.id())
364 .cloned();
365 let proc = weak_proc.as_ref().and_then(WeakProc::upgrade);
366
367 if weak_proc.is_some() && proc.is_none() {
368 let mut procs = self.inner.procs.write().unwrap();
369 if procs
370 .get(dest_proc.id())
371 .and_then(WeakProc::upgrade)
372 .is_none()
373 {
374 procs.remove(dest_proc.id());
375 }
376 }
377
378 if let Some(proc) = proc
379 && proc.is_local_delivery_target(&dest_proc)
380 {
381 proc.muxer().post(envelope, return_handle);
382 return;
383 }
384
385 self.inner.forwarder.post(envelope, return_handle)
386 }
387
388 async fn flush(&self) -> Result<(), anyhow::Error> {
389 Gateway::flush(self).await
390 }
391}
392
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use async_trait::async_trait;
    use hyperactor_config::Flattrs;
    use tokio::time;

    use super::*;
    use crate::Endpoint as _;
    use crate::Label;
    use crate::mailbox::MailboxSender;
    use crate::mailbox::PortLocation;
    use crate::mailbox::monitored_return_handle;
    use crate::port::Port;
    use crate::proc::Proc;
    use crate::testing::ids::test_actor_id;
    use crate::testing::pingpong::PingPongActor;
    use crate::testing::pingpong::PingPongMessage;

    /// Forwarder stand-in that counts the envelopes posted to it and then
    /// discards them.
    #[derive(Clone)]
    struct CountingSender(Arc<AtomicUsize>);

    #[async_trait]
    impl MailboxSender for CountingSender {
        fn post_unchecked(
            &self,
            _envelope: MessageEnvelope,
            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
        ) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a gateway that forwards to a [`CountingSender`] and returns it
    /// together with the shared forward counter.
    fn counting_gateway() -> (Gateway, Arc<AtomicUsize>) {
        let forwarded = Arc::new(AtomicUsize::new(0));
        let gateway = Gateway::configured(
            channel::reserve_local_addr().into(),
            BoxedMailboxSender::new(CountingSender(forwarded.clone())),
        );
        (gateway, forwarded)
    }

    /// Builds a proc with id `proc_id` that shares `gateway`.
    fn proc_on(gateway: &Gateway, proc_id: ProcId) -> Proc {
        Proc::builder()
            .proc_id(proc_id)
            .shared_gateway(gateway.clone())
            .build()
            .unwrap()
    }

    /// Envelopes addressed to an attached proc reach that proc only; an
    /// envelope for an unknown proc is handed to the forwarder.
    #[tokio::test]
    async fn test_gateway_post_demuxes_by_proc_id() {
        let (gateway, forwarded) = counting_gateway();

        let alpha = proc_on(&gateway, ProcId::instance(Label::strip("alpha")));
        let beta = proc_on(&gateway, ProcId::instance(Label::strip("beta")));

        let (alpha_client, _) = alpha.client("client").unwrap();
        let (alpha_port, mut alpha_rx) = alpha_client.bind_handler_port::<u64>();
        let PortLocation::Bound(alpha_dest) = alpha_port.location() else {
            panic!("alpha handler port must be bound");
        };

        let (beta_client, _) = beta.client("client").unwrap();
        let (beta_port, mut beta_rx) = beta_client.bind_handler_port::<u64>();
        let PortLocation::Bound(beta_dest) = beta_port.location() else {
            panic!("beta handler port must be bound");
        };

        let sender = test_actor_id("test", "sender");

        // Addressed to alpha: delivered locally, nothing forwarded.
        let to_alpha =
            MessageEnvelope::serialize(sender.clone(), alpha_dest.clone(), &111u64, Flattrs::new())
                .unwrap();
        gateway.post(to_alpha, monitored_return_handle());
        let from_alpha = time::timeout(Duration::from_secs(5), alpha_rx.recv())
            .await
            .expect("alpha_rx timed out")
            .expect("alpha_rx closed");
        assert_eq!(from_alpha, 111);
        assert_eq!(forwarded.load(Ordering::SeqCst), 0);

        // Addressed to beta: delivered locally, nothing forwarded.
        let to_beta =
            MessageEnvelope::serialize(sender.clone(), beta_dest.clone(), &222u64, Flattrs::new())
                .unwrap();
        gateway.post(to_beta, monitored_return_handle());
        let from_beta = time::timeout(Duration::from_secs(5), beta_rx.recv())
            .await
            .expect("beta_rx timed out")
            .expect("beta_rx closed");
        assert_eq!(from_beta, 222);
        assert_eq!(forwarded.load(Ordering::SeqCst), 0);

        // Addressed to a proc the gateway does not know: forwarded once and
        // seen by neither local proc.
        let stranger_proc = ProcAddr::instance(ChannelAddr::Local(9999), "stranger");
        let stranger_dest = stranger_proc
            .actor_addr("ghost")
            .port_addr(Port::from(0u64));
        let to_stranger =
            MessageEnvelope::serialize(sender, stranger_dest, &333u64, Flattrs::new()).unwrap();
        gateway.post(to_stranger, monitored_return_handle());
        assert_eq!(forwarded.load(Ordering::SeqCst), 1);
        assert!(
            time::timeout(Duration::from_millis(50), alpha_rx.recv())
                .await
                .is_err(),
            "alpha_rx received a message after stranger post",
        );
        assert!(
            time::timeout(Duration::from_millis(50), beta_rx.recv())
                .await
                .is_err(),
            "beta_rx received a message after stranger post",
        );
    }

    /// Two procs sharing an isolated gateway can complete a ping-pong
    /// exchange without any message being reported undeliverable.
    #[tokio::test]
    async fn test_ping_pong_across_shared_gateway() {
        let gateway = Gateway::isolated();

        let alpha = proc_on(&gateway, ProcId::instance(Label::strip("alpha")));
        let beta = proc_on(&gateway, ProcId::instance(Label::strip("beta")));

        let (client, _) = alpha.client("client").unwrap();
        let (undeliverable_tx, mut undeliverable_rx) =
            client.open_port::<Undeliverable<MessageEnvelope>>();

        let ping = PingPongActor::new(Some(undeliverable_tx.bind()), None, None);
        let pong = PingPongActor::new(Some(undeliverable_tx.bind()), None, None);
        let ping_handle = alpha.spawn::<PingPongActor>("ping", ping).unwrap();
        let pong_handle = beta.spawn::<PingPongActor>("pong", pong).unwrap();

        let (done_port, done_rx) = client.open_once_port();

        ping_handle.post(
            &client,
            PingPongMessage(10, pong_handle.bind(), done_port.bind()),
        );

        let completed = time::timeout(Duration::from_secs(5), done_rx.recv())
            .await
            .expect("local_receiver timed out")
            .expect("ping pong did not complete");
        assert!(completed);

        assert!(
            time::timeout(Duration::from_millis(50), undeliverable_rx.recv())
                .await
                .is_err(),
            "unexpected undeliverable during cross-proc ping-pong",
        );
    }

    /// Posting to a proc that has been dropped forwards the envelope and
    /// removes the stale registration.
    #[tokio::test]
    async fn test_gateway_post_removes_stale_weak_procs() {
        let (gateway, forwarded) = counting_gateway();

        let proc = proc_on(&gateway, ProcId::instance(Label::strip("alpha")));
        let proc_id = proc.proc_id().clone();
        let dest = proc
            .proc_addr()
            .actor_addr("worker")
            .port_addr(Port::from(0u64));

        assert_eq!(gateway.inner.procs.read().unwrap().len(), 1);

        drop(proc);

        // Dropping the proc alone does not unregister it.
        {
            let registry = gateway.inner.procs.read().unwrap();
            assert_eq!(registry.len(), 1);
            assert!(
                registry.get(&proc_id).and_then(WeakProc::upgrade).is_none(),
                "expected dropped proc to remain only as a stale WeakProc entry",
            );
        }

        let envelope = MessageEnvelope::serialize(
            test_actor_id("test", "sender"),
            dest,
            &42u64,
            Flattrs::new(),
        )
        .unwrap();
        gateway.post(envelope, monitored_return_handle());

        assert_eq!(forwarded.load(Ordering::SeqCst), 1);
        {
            let registry = gateway.inner.procs.read().unwrap();
            assert_eq!(registry.len(), 0);
            assert!(registry.get(&proc_id).is_none());
        }
    }

    /// Attaching a second proc under the id of a proc that is still alive
    /// panics.
    #[test]
    #[should_panic(expected = "gateway already has a live proc")]
    fn test_gateway_attach_panics_on_duplicate_live_proc() {
        let gateway = Gateway::isolated();
        let proc_id = ProcId::instance(Label::strip("alpha"));

        // Kept alive in a named binding so the first registration is live
        // when the second proc is built.
        let _first = proc_on(&gateway, proc_id.clone());

        let _second = proc_on(&gateway, proc_id);
    }

    /// Flushing the gateway flushes every attached proc's muxer and the
    /// forwarder exactly once.
    #[tokio::test]
    async fn test_gateway_flush_propagates_to_attached_procs() {
        /// Sender that ignores posts and counts flushes.
        #[derive(Clone)]
        struct FlushCountingSender(Arc<AtomicUsize>);

        #[async_trait]
        impl MailboxSender for FlushCountingSender {
            fn post_unchecked(
                &self,
                _envelope: MessageEnvelope,
                _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
            ) {
            }

            async fn flush(&self) -> Result<(), anyhow::Error> {
                self.0.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }

        let alpha_flushes = Arc::new(AtomicUsize::new(0));
        let beta_flushes = Arc::new(AtomicUsize::new(0));
        let forwarder_flushes = Arc::new(AtomicUsize::new(0));

        let gateway = Gateway::configured(
            channel::reserve_local_addr().into(),
            BoxedMailboxSender::new(FlushCountingSender(forwarder_flushes.clone())),
        );

        let alpha = proc_on(&gateway, ProcId::instance(Label::strip("alpha")));
        let beta = proc_on(&gateway, ProcId::instance(Label::strip("beta")));

        // Bind one flush-counting probe into each proc's muxer.
        let alpha_probe = alpha.proc_addr().actor_addr("alpha_probe").id().clone();
        let beta_probe = beta.proc_addr().actor_addr("beta_probe").id().clone();
        assert!(
            alpha
                .muxer()
                .bind(alpha_probe, FlushCountingSender(alpha_flushes.clone()))
        );
        assert!(
            beta.muxer()
                .bind(beta_probe, FlushCountingSender(beta_flushes.clone()))
        );

        assert_eq!(gateway.inner.procs.read().unwrap().len(), 2);

        gateway.flush().await.unwrap();

        assert_eq!(alpha_flushes.load(Ordering::SeqCst), 1);
        assert_eq!(beta_flushes.load(Ordering::SeqCst), 1);
        assert_eq!(forwarder_flushes.load(Ordering::SeqCst), 1);
    }

    /// Posting through a `WeakGateway` whose gateway has been dropped returns
    /// the envelope with a `BrokenLink` error.
    #[tokio::test]
    async fn test_weak_gateway_bounces_broken_link_after_drop() {
        let gateway = Gateway::isolated();
        let weak = WeakGateway::new(&gateway);
        drop(gateway);

        // A separate proc provides the port that receives the bounce.
        let scratch = Proc::isolated();
        let (scratch_client, _) = scratch.client("return").unwrap();
        let (return_handle, mut return_rx) =
            scratch_client.open_port::<Undeliverable<MessageEnvelope>>();

        let dest_proc = ProcAddr::instance(ChannelAddr::Local(1234), "stranger");
        let dest = dest_proc.actor_addr("ghost").port_addr(Port::from(0u64));
        let outgoing = MessageEnvelope::serialize(
            test_actor_id("test", "sender"),
            dest.clone(),
            &42u64,
            Flattrs::new(),
        )
        .unwrap();

        weak.post(outgoing, return_handle);

        let Undeliverable::Message(bounced) =
            time::timeout(Duration::from_secs(5), return_rx.recv())
                .await
                .expect("return_rx timed out")
                .expect("return_rx closed")
        else {
            panic!("expected returned envelope");
        };
        assert_eq!(bounced.dest(), &dest);
        assert!(
            bounced
                .errors()
                .iter()
                .any(|e| matches!(e, DeliveryError::BrokenLink(_))),
            "expected BrokenLink bounce, got {:?}",
            bounced.errors(),
        );
    }

    /// A new proc may reuse the id of a dropped proc; the stale entry is
    /// replaced and delivery reaches the new proc.
    #[tokio::test]
    async fn test_gateway_attach_silently_replaces_dead_entry() {
        let gateway = Gateway::isolated();
        let proc_id = ProcId::instance(Label::strip("alpha"));

        let first = proc_on(&gateway, proc_id.clone());
        drop(first);

        {
            let registry = gateway.inner.procs.read().unwrap();
            assert_eq!(registry.len(), 1);
            assert!(
                registry.get(&proc_id).and_then(WeakProc::upgrade).is_none(),
                "expected first proc to remain only as a stale WeakProc entry",
            );
        }

        let second = proc_on(&gateway, proc_id.clone());
        assert_eq!(gateway.inner.procs.read().unwrap().len(), 1);

        let (client, _) = second.client("client").unwrap();
        let (port, mut rx) = client.bind_handler_port::<u64>();
        let dest = port.bind().port_addr().clone();

        let envelope = MessageEnvelope::serialize(
            test_actor_id("test", "sender"),
            dest,
            &42u64,
            Flattrs::new(),
        )
        .unwrap();
        gateway.post(envelope, monitored_return_handle());

        let delivered = time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .expect("rx timed out")
            .expect("rx closed");
        assert_eq!(delivered, 42);
    }

    /// Servers may be stopped in any order; after each stop the default
    /// location is the newest server still active, and finally the fallback.
    #[tokio::test]
    async fn test_gateway_serve_stop_unwinds_in_any_order() {
        let gateway = Gateway::isolated();
        let fallback = gateway.default_location();

        let any_local = || ChannelAddr::any(ChannelTransport::Local);

        let s1 = Gateway::serve(&gateway, any_local()).unwrap();
        let loc1 = gateway.default_location();
        let s2 = Gateway::serve(&gateway, any_local()).unwrap();
        let loc2 = gateway.default_location();
        let s3 = Gateway::serve(&gateway, any_local()).unwrap();
        let loc3 = gateway.default_location();

        // The first any-local server reuses the fallback address; the later
        // ones each get their own.
        assert_eq!(loc1, fallback);
        assert_ne!(loc1, loc2);
        assert_ne!(loc2, loc3);
        assert_ne!(loc1, loc3);

        // Stopping the middle server leaves the newest one as default.
        s2.stop("test");
        s2.await.unwrap().unwrap();
        assert_eq!(gateway.default_location(), loc3);

        // Stopping the newest server falls back to the oldest one.
        s3.stop("test");
        s3.await.unwrap().unwrap();
        assert_eq!(gateway.default_location(), loc1);

        // With no server left, the fallback location is the default again.
        s1.stop("test");
        s1.await.unwrap().unwrap();
        assert_eq!(gateway.default_location(), fallback);
    }
}