hyperactor_mesh/
namespace.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Namespace-based mesh registry for discovering and connecting to remote meshes.
10//!
//! This module provides a namespace abstraction for registering and looking up
//! meshes (host, proc, and actor) by name. Names are unique per mesh kind within
//! a namespace: the full key is composed of (namespace, kind, name).
14//!
15//! # Example
16//!
17//! ```ignore
18//! let ns = InMemoryNamespace::new("my.prefix.tier");
19//!
20//! // Register meshes
21//! ns.register("workers", &actor_mesh_ref).await?;
22//! ns.register("procs", &proc_mesh_ref).await?;
23//!
24//! // Lookup meshes
25//! let actors: ActorMeshRef<MyActor> = ns.get("workers").await?;
26//! let procs: ProcMeshRef = ns.get("procs").await?;
27//! ```
28
29use std::collections::HashMap;
30use std::sync::RwLock;
31
32use async_trait::async_trait;
33use hyperactor::actor::Referable;
34use serde::Serialize;
35use serde::de::DeserializeOwned;
36
37use crate::ActorMeshRef;
38use crate::HostMeshRef;
39use crate::ProcMeshRef;
40
/// The kind of mesh being registered.
///
/// The kind forms the middle component of a registry key
/// (`{namespace}.{kind}.{name}`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MeshKind {
    /// A host mesh; rendered as `"host"`.
    Host,
    /// A proc mesh; rendered as `"proc"`.
    Proc,
    /// An actor mesh; rendered as `"actor"`.
    Actor,
}

impl MeshKind {
    /// Returns the lowercase string representation of this kind:
    /// `"host"`, `"proc"`, or `"actor"`.
    pub fn as_str(&self) -> &'static str {
        match self {
            MeshKind::Host => "host",
            MeshKind::Proc => "proc",
            MeshKind::Actor => "actor",
        }
    }
}

impl std::fmt::Display for MeshKind {
    /// Writes the same text as [`MeshKind::as_str`].
    ///
    /// `Formatter::pad` is used rather than `write!` so that the caller's
    /// width, fill, alignment, and precision flags are applied (for example
    /// `format!("{:>6}", kind)`); with no flags the output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
65
66/// Errors that can occur during namespace operations.
67#[derive(Debug, thiserror::Error)]
68pub enum NamespaceError {
69    #[error("serialization failed: {0}")]
70    SerializationError(String),
71    #[error("deserialization failed: {0}")]
72    DeserializationError(String),
73    #[error("operation failed: {0}")]
74    OperationError(String),
75    #[error("not found: {0}")]
76    NotFound(String),
77}
78
79/// Trait for mesh types that can be registered in a namespace.
80///
81/// This trait is implemented for `HostMeshRef`, `ProcMeshRef`, and `ActorMeshRef<A>`,
82/// allowing them to be registered and looked up using the generic `register` and `get`
83/// methods on `Namespace`.
84pub trait Registrable: Serialize + DeserializeOwned + Send + Sync {
85    /// The kind of mesh this type represents.
86    fn kind() -> MeshKind;
87}
88
89impl Registrable for HostMeshRef {
90    fn kind() -> MeshKind {
91        MeshKind::Host
92    }
93}
94
95impl Registrable for ProcMeshRef {
96    fn kind() -> MeshKind {
97        MeshKind::Proc
98    }
99}
100
101impl<A: Referable> Registrable for ActorMeshRef<A> {
102    fn kind() -> MeshKind {
103        MeshKind::Actor
104    }
105}
106
107/// A namespace for registering and looking up meshes.
108///
109/// Namespaces provide isolation for mesh names. A mesh registered as "foo" in
110/// namespace "a.b.c" does not conflict with "foo" in namespace "x.y.z".
111///
112/// The full key for a registered mesh is `{namespace}.{kind}.{name}`, e.g.,
113/// `my.namespace.actor.workers`.
114#[async_trait]
115pub trait Namespace {
116    /// The namespace name (e.g., "my.namespace").
117    fn name(&self) -> &str;
118
119    /// Register a mesh under the given name.
120    ///
121    /// The mesh type determines the kind (host, proc, or actor) automatically
122    /// via the `Registrable` trait.
123    async fn register<T: Registrable>(&self, name: &str, mesh: &T) -> Result<(), NamespaceError>;
124
125    /// Lookup a mesh by name.
126    ///
127    /// The mesh type must be specified (e.g., `ns.get::<ProcMeshRef>("name")`).
128    async fn get<T: Registrable>(&self, name: &str) -> Result<T, NamespaceError>;
129
130    /// Unregister a mesh by name.
131    async fn unregister<T: Registrable>(&self, name: &str) -> Result<(), NamespaceError>;
132
133    /// Check if a mesh exists in this namespace.
134    async fn contains<T: Registrable>(&self, name: &str) -> Result<bool, NamespaceError>;
135}
136
137/// An in-memory namespace implementation for testing.
138#[derive(Debug)]
139pub struct InMemoryNamespace {
140    namespace_name: String,
141    data: RwLock<HashMap<String, Vec<u8>>>,
142}
143
144impl InMemoryNamespace {
145    /// Create a new in-memory namespace with the given name.
146    pub fn new(name: impl Into<String>) -> Self {
147        Self {
148            namespace_name: name.into(),
149            data: RwLock::new(HashMap::new()),
150        }
151    }
152
153    /// Build the full key string: `{namespace}.{kind}.{name}`.
154    fn full_key(&self, kind: MeshKind, name: &str) -> String {
155        format!("{}.{}.{}", self.namespace_name, kind.as_str(), name)
156    }
157}
158
159#[async_trait]
160impl Namespace for InMemoryNamespace {
161    fn name(&self) -> &str {
162        &self.namespace_name
163    }
164
165    async fn register<T: Registrable>(&self, name: &str, mesh: &T) -> Result<(), NamespaceError> {
166        let data = serde_json::to_vec(mesh)
167            .map_err(|e| NamespaceError::SerializationError(e.to_string()))?;
168        let key = self.full_key(T::kind(), name);
169        self.data
170            .write()
171            .map_err(|e| NamespaceError::OperationError(e.to_string()))?
172            .insert(key.clone(), data);
173        tracing::debug!(
174            key = %key,
175            "registered mesh to in-memory namespace"
176        );
177        Ok(())
178    }
179
180    async fn get<T: Registrable>(&self, name: &str) -> Result<T, NamespaceError> {
181        let key = self.full_key(T::kind(), name);
182        let data = self
183            .data
184            .read()
185            .map_err(|e| NamespaceError::OperationError(e.to_string()))?
186            .get(&key)
187            .cloned()
188            .ok_or(NamespaceError::NotFound(key))?;
189        serde_json::from_slice(&data)
190            .map_err(|e| NamespaceError::DeserializationError(e.to_string()))
191    }
192
193    async fn unregister<T: Registrable>(&self, name: &str) -> Result<(), NamespaceError> {
194        let key = self.full_key(T::kind(), name);
195        self.data
196            .write()
197            .map_err(|e| NamespaceError::OperationError(e.to_string()))?
198            .remove(&key);
199        tracing::debug!(
200            key = %key,
201            "unregistered mesh from in-memory namespace"
202        );
203        Ok(())
204    }
205
206    async fn contains<T: Registrable>(&self, name: &str) -> Result<bool, NamespaceError> {
207        let key = self.full_key(T::kind(), name);
208        Ok(self
209            .data
210            .read()
211            .map_err(|e| NamespaceError::OperationError(e.to_string()))?
212            .contains_key(&key))
213    }
214}
215
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Builds a `HostMeshRef` for tests by parsing its string form.
    ///
    /// The string is `name`, a colon, two comma-separated TCP addresses, and
    /// an `@replica=2/1` suffix (presumably the mesh's region/extent).
    fn make_host_mesh_ref(name: &str) -> HostMeshRef {
        let s = format!("{}:tcp:127.0.0.1:1234,tcp:127.0.0.1:1235@replica=2/1", name);
        HostMeshRef::from_str(&s).unwrap()
    }

    #[tokio::test]
    async fn test_register_and_get() {
        let ns = InMemoryNamespace::new("test.namespace");

        let mesh = make_host_mesh_ref("test_mesh");

        // Register
        ns.register("my_hosts", &mesh).await.unwrap();

        // Get
        let retrieved: HostMeshRef = ns.get("my_hosts").await.unwrap();
        assert_eq!(retrieved, mesh);
    }

    #[tokio::test]
    async fn test_contains() {
        let ns = InMemoryNamespace::new("test.namespace");

        let mesh = make_host_mesh_ref("workers");

        // Not registered yet
        assert!(!ns.contains::<HostMeshRef>("my_hosts").await.unwrap());

        // Register
        ns.register("my_hosts", &mesh).await.unwrap();

        // Now exists
        assert!(ns.contains::<HostMeshRef>("my_hosts").await.unwrap());

        // Different name doesn't exist
        assert!(!ns.contains::<HostMeshRef>("other").await.unwrap());
    }

    #[tokio::test]
    async fn test_unregister() {
        let ns = InMemoryNamespace::new("test.namespace");

        let mesh = make_host_mesh_ref("workers");

        ns.register("my_hosts", &mesh).await.unwrap();
        assert!(ns.contains::<HostMeshRef>("my_hosts").await.unwrap());

        // Unregister
        ns.unregister::<HostMeshRef>("my_hosts").await.unwrap();
        assert!(!ns.contains::<HostMeshRef>("my_hosts").await.unwrap());

        // Get after unregister must report NotFound specifically, not just any error.
        let result: Result<HostMeshRef, _> = ns.get("my_hosts").await;
        assert!(matches!(result, Err(NamespaceError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_unregister_missing_is_ok() {
        let ns = InMemoryNamespace::new("test.namespace");

        // Removing a name that was never registered succeeds and leaves it absent.
        ns.unregister::<HostMeshRef>("never_registered")
            .await
            .unwrap();
        assert!(
            !ns.contains::<HostMeshRef>("never_registered")
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_get_not_found() {
        let ns = InMemoryNamespace::new("test.namespace");

        let result: Result<HostMeshRef, _> = ns.get("nonexistent").await;
        match result {
            // The error carries the full `{namespace}.{kind}.{name}` key.
            Err(NamespaceError::NotFound(key)) => {
                assert_eq!(key, "test.namespace.host.nonexistent");
            }
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_kinds_do_not_collide() {
        let ns = InMemoryNamespace::new("test");

        let mesh = make_host_mesh_ref("mesh");
        ns.register("shared", &mesh).await.unwrap();

        // The same name under a different kind maps to a different key.
        assert!(ns.contains::<HostMeshRef>("shared").await.unwrap());
        assert!(!ns.contains::<ProcMeshRef>("shared").await.unwrap());

        let result: Result<ProcMeshRef, _> = ns.get("shared").await;
        assert!(matches!(result, Err(NamespaceError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_multiple_meshes() {
        let ns = InMemoryNamespace::new("test");

        let mesh1 = make_host_mesh_ref("mesh1");
        let mesh2 = make_host_mesh_ref("mesh2");

        // Register both under different names
        ns.register("hosts_a", &mesh1).await.unwrap();
        ns.register("hosts_b", &mesh2).await.unwrap();

        // Retrieve each correctly
        let retrieved1: HostMeshRef = ns.get("hosts_a").await.unwrap();
        let retrieved2: HostMeshRef = ns.get("hosts_b").await.unwrap();

        assert_eq!(retrieved1, mesh1);
        assert_eq!(retrieved2, mesh2);
    }

    #[tokio::test]
    async fn test_overwrite_registration() {
        let ns = InMemoryNamespace::new("test");

        let mesh1 = make_host_mesh_ref("mesh1");
        let mesh2 = make_host_mesh_ref("mesh2");

        // Register first mesh
        ns.register("hosts", &mesh1).await.unwrap();
        let retrieved: HostMeshRef = ns.get("hosts").await.unwrap();
        assert_eq!(retrieved, mesh1);

        // Overwrite with second mesh
        ns.register("hosts", &mesh2).await.unwrap();
        let retrieved: HostMeshRef = ns.get("hosts").await.unwrap();
        assert_eq!(retrieved, mesh2);
    }

    #[test]
    fn test_mesh_kind_as_str() {
        assert_eq!(MeshKind::Host.as_str(), "host");
        assert_eq!(MeshKind::Proc.as_str(), "proc");
        assert_eq!(MeshKind::Actor.as_str(), "actor");
    }

    #[test]
    fn test_mesh_kind_display() {
        assert_eq!(MeshKind::Host.to_string(), "host");
        assert_eq!(MeshKind::Proc.to_string(), "proc");
        assert_eq!(MeshKind::Actor.to_string(), "actor");
    }

    #[test]
    fn test_error_display() {
        assert_eq!(
            NamespaceError::NotFound("a.host.b".to_string()).to_string(),
            "not found: a.host.b"
        );
        assert_eq!(
            NamespaceError::OperationError("boom".to_string()).to_string(),
            "operation failed: boom"
        );
    }

    #[test]
    fn test_name() {
        let ns = InMemoryNamespace::new("my.namespace");
        assert_eq!(ns.name(), "my.namespace");
    }

    #[test]
    fn test_registrable_impl_for_host_mesh_ref() {
        assert_eq!(HostMeshRef::kind(), MeshKind::Host);
    }

    #[test]
    fn test_registrable_impl_for_proc_mesh_ref() {
        assert_eq!(ProcMeshRef::kind(), MeshKind::Proc);
    }
}
346}