hyperactor_mesh/
router.rs1use std::collections::HashMap;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use hyperactor::channel;
16use hyperactor::channel::ChannelAddr;
17use hyperactor::channel::ChannelError;
18use hyperactor::channel::ChannelTransport;
19use hyperactor::mailbox::DialMailboxRouter;
20use hyperactor::mailbox::MailboxRouter;
21use hyperactor::mailbox::MailboxServer;
22use tokio::sync::Mutex;
23
24pub fn global() -> &'static Router {
26 static GLOBAL_ROUTER: OnceLock<Router> = OnceLock::new();
27 GLOBAL_ROUTER.get_or_init(Router::new)
28}
29
30pub struct Router {
33 router: MailboxRouter,
34 #[allow(dead_code)] servers: Mutex<HashMap<ChannelTransport, ChannelAddr>>,
36}
37
38impl Deref for Router {
40 type Target = MailboxRouter;
41
42 fn deref(&self) -> &Self::Target {
43 &self.router
44 }
45}
46
47impl Router {
48 fn new() -> Self {
50 Self {
51 router: MailboxRouter::new(),
52 servers: Mutex::new(HashMap::new()),
53 }
54 }
55
56 #[allow(dead_code)]
60 #[hyperactor::instrument]
61 pub async fn serve(&self, transport: &ChannelTransport) -> Result<ChannelAddr, ChannelError> {
62 let mut servers = self.servers.lock().await;
63 if let Some(addr) = servers.get(transport) {
64 return Ok(addr.clone());
65 }
66
67 let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()))?;
68 self.router.clone().serve(rx);
69 servers.insert(transport.clone(), addr.clone());
70 Ok(addr)
71 }
72
73 pub fn bind_dial_router(&self, router: &DialMailboxRouter) {
76 for prefix in router.prefixes() {
77 self.router.bind(prefix, router.clone());
78 }
79 }
80}