Skip to main content

hyperactor/actor/
remote.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Management of actor registration for remote spawning.
10
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::LazyLock;
16
17use hyperactor_config::Flattrs;
18
19use crate::Actor;
20use crate::AnyActorHandle;
21use crate::Data;
22use crate::id::Uid;
23use crate::proc::InstanceCell;
24use crate::proc::Proc;
25
26/// The offset of user-defined ports (i.e., arbitrarily bound).
27pub const USER_PORT_OFFSET: u64 = 1024;
28
29/// Register an actor type so that it can be spawned remotely. The actor
30/// type must implement [`typeuri::Named`], which will be used to identify
31/// the actor globally.
32///
33/// Example:
34///
35/// ```ignore
36/// struct MyActor { ... }
37///
38/// register_spawnable!(MyActor);
39/// ```
40#[macro_export]
41macro_rules! register_spawnable {
42    ($actor:ty) => {
43        const _: () = {
44            static NAME: std::sync::LazyLock<&'static str> = std::sync::LazyLock::new(|| {
45                <$actor as $crate::internal_macro_support::typeuri::Named>::typename()
46            });
47
48            $crate::internal_macro_support::inventory::submit! {
49                $crate::actor::remote::SpawnableActor {
50                    name: &NAME,
51                    gspawn_root_bind: <$actor as $crate::actor::RemoteSpawn>::gspawn_root_bind,
52                    gspawn_child: <$actor as $crate::actor::RemoteSpawn>::gspawn_child,
53                    get_type_id: <$actor as $crate::actor::RemoteSpawn>::get_type_id,
54                }
55            }
56        };
57    };
58}
59
60/// A type-erased actor registration entry. These are constructed via
61/// [`crate::register_spawnable`].
62#[derive(Debug)]
63pub struct SpawnableActor {
64    /// A URI that globally identifies an actor. It is an error to register
65    /// multiple actors with the same name.
66    ///
67    /// This is a LazyLock because the names are provided through a trait
68    /// implementation, which can not yet be `const`.
69    pub name: &'static LazyLock<&'static str>,
70
71    /// Type-erased root spawn function. This is the type's
72    /// [`RemoteSpawn::gspawn_root_bind`].
73    pub gspawn_root_bind: fn(
74        &Proc,
75        Uid,
76        Data,
77        Flattrs,
78    ) -> Pin<
79        Box<dyn Future<Output = Result<crate::ActorAddr, anyhow::Error>> + Send>,
80    >,
81
82    /// Type-erased child spawn function. This is the type's
83    /// [`RemoteSpawn::gspawn_child`].
84    pub gspawn_child:
85        fn(
86            &Proc,
87            InstanceCell,
88            Uid,
89            Data,
90            Flattrs,
91        ) -> Pin<Box<dyn Future<Output = Result<AnyActorHandle, anyhow::Error>> + Send>>,
92
93    /// A function to retrieve the type id of the actor itself. This is
94    /// used to translate a concrete type to a global name.
95    pub get_type_id: fn() -> TypeId,
96}
97
98inventory::collect!(SpawnableActor);
99
100/// Registry of actors linked into this image and registered by way of
101/// [`crate::register_spawnable`].
102#[derive(Debug)]
103pub struct Remote {
104    by_name: HashMap<&'static str, &'static SpawnableActor>,
105    by_type_id: HashMap<TypeId, &'static SpawnableActor>,
106}
107
108impl Remote {
109    /// Construct a registry. Panics if there are conflicting registrations.
110    pub fn collect() -> Self {
111        let mut by_name = HashMap::new();
112        let mut by_type_id = HashMap::new();
113        for entry in inventory::iter::<SpawnableActor> {
114            if by_name.insert(**entry.name, entry).is_some() {
115                panic!("actor name {} registered multiple times", **entry.name);
116            }
117            let type_id = (entry.get_type_id)();
118            if by_type_id.insert(type_id, entry).is_some() {
119                panic!(
120                    "type id {:?} ({}) registered multiple times",
121                    type_id, **entry.name
122                );
123            }
124        }
125        Self {
126            by_name,
127            by_type_id,
128        }
129    }
130
131    /// Return the process-wide remote spawn registry.
132    pub fn global() -> &'static Self {
133        static REMOTE: LazyLock<Remote> = LazyLock::new(Remote::collect);
134        &REMOTE
135    }
136
137    /// Returns the name of the provided actor, if registered.
138    pub fn name_of<A: Actor>(&self) -> Option<&'static str> {
139        self.by_type_id
140            .get(&TypeId::of::<A>())
141            .map(|entry| **entry.name)
142    }
143
144    /// Spawns the actor with the provided sender, actor uid,
145    /// and serialized parameters. Returns an error if the actor is not
146    /// registered, or if the actor's spawn fails.
147    pub async fn gspawn(
148        &self,
149        proc: &Proc,
150        actor_type: &str,
151        actor_uid: Uid,
152        params: Data,
153        environment: Flattrs,
154    ) -> Result<crate::ActorAddr, anyhow::Error> {
155        let entry = self
156            .by_name
157            .get(actor_type)
158            .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
159        (entry.gspawn_root_bind)(proc, actor_uid, params, environment).await
160    }
161
162    /// Spawns the actor as a child of the provided parent. Returns an
163    /// erased lifecycle handle.
164    pub async fn gspawn_child(
165        &self,
166        proc: &Proc,
167        parent: InstanceCell,
168        actor_type: &str,
169        actor_uid: Uid,
170        params: Data,
171        environment: Flattrs,
172    ) -> Result<AnyActorHandle, anyhow::Error> {
173        let entry = self
174            .by_name
175            .get(actor_type)
176            .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
177        (entry.gspawn_child)(proc, parent, actor_uid, params, environment).await
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use std::assert_matches;
184
185    use async_trait::async_trait;
186    use hyperactor_config::Flattrs;
187
188    use super::*;
189    use crate as hyperactor; // for macros
190    use crate::Context;
191    use crate::Handler;
192    use crate::RemoteSpawn;
193    use crate::id::Label;
194
195    #[derive(Debug)]
196    #[hyperactor::export(())]
197    struct MyActor;
198
199    #[async_trait]
200    impl Actor for MyActor {}
201
202    #[async_trait]
203    impl RemoteSpawn for MyActor {
204        type Params = bool;
205
206        async fn new(params: bool, _environment: Flattrs) -> Result<Self, anyhow::Error> {
207            if params {
208                Ok(MyActor)
209            } else {
210                Err(anyhow::anyhow!("some failure"))
211            }
212        }
213    }
214
215    #[async_trait]
216    impl Handler<()> for MyActor {
217        async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
218            unimplemented!()
219        }
220    }
221
222    register_spawnable!(MyActor);
223
224    #[derive(Debug, Default)]
225    #[hyperactor::export(())]
226    struct GenericActor<T>(std::marker::PhantomData<T>);
227
228    #[async_trait]
229    impl<T: Send + 'static> Actor for GenericActor<T> {}
230
231    #[async_trait]
232    impl<T: Send + Sync + 'static> Handler<()> for GenericActor<T> {
233        async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
234            unimplemented!()
235        }
236    }
237
238    register_spawnable!(GenericActor<u64>);
239    register_spawnable!(GenericActor<bool>);
240
241    #[tokio::test]
242    async fn test_registry() {
243        let remote = Remote::collect();
244        assert_matches!(
245            remote.name_of::<MyActor>(),
246            Some("hyperactor::actor::remote::tests::MyActor")
247        );
248        assert_matches!(
249            remote.name_of::<GenericActor<u64>>(),
250            Some("hyperactor::actor::remote::tests::GenericActor<u64>")
251        );
252        assert_matches!(
253            remote.name_of::<GenericActor<bool>>(),
254            Some("hyperactor::actor::remote::tests::GenericActor<bool>")
255        );
256        assert_ne!(
257            <GenericActor<u64> as typeuri::Named>::typename(),
258            <GenericActor<bool> as typeuri::Named>::typename()
259        );
260
261        let _ = remote
262            .gspawn(
263                &Proc::isolated(),
264                "hyperactor::actor::remote::tests::MyActor",
265                Uid::instance(Label::new("actor").unwrap()),
266                bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
267                Flattrs::default(),
268            )
269            .await
270            .unwrap();
271
272        let err = remote
273            .gspawn(
274                &Proc::isolated(),
275                "hyperactor::actor::remote::tests::MyActor",
276                Uid::instance(Label::new("actor").unwrap()),
277                bincode::serde::encode_to_vec(false, bincode::config::legacy()).unwrap(),
278                Flattrs::default(),
279            )
280            .await
281            .unwrap_err();
282
283        assert_eq!(err.to_string().as_str(), "some failure");
284    }
285
286    #[tokio::test]
287    async fn test_instance_gspawn_child_returns_erased_handle() {
288        let proc = Proc::isolated();
289        let (parent, _parent_handle) = proc.client("parent").unwrap();
290
291        let child = parent
292            .gspawn(
293                "hyperactor::actor::remote::tests::MyActor",
294                bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
295            )
296            .await
297            .unwrap();
298
299        assert!(!child.actor_id().is_root());
300        assert!(child.downcast::<MyActor>().is_some());
301        assert!(child.downcast::<GenericActor<u64>>().is_none());
302
303        child.stop("test").unwrap();
304        child.await;
305    }
306
307    #[tokio::test]
308    async fn test_instance_gspawn_uid_uses_explicit_uid() {
309        let proc = Proc::isolated();
310        let (parent, _parent_handle) = proc.client("parent").unwrap();
311        let uid = Uid::instance(Label::new("child").unwrap());
312
313        let child = parent
314            .gspawn_uid(
315                "hyperactor::actor::remote::tests::MyActor",
316                uid.clone(),
317                bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
318            )
319            .await
320            .unwrap();
321
322        assert_eq!(child.actor_id().uid(), &uid);
323
324        child.stop("test").unwrap();
325        child.await;
326    }
327
328    #[tokio::test]
329    async fn test_instance_gspawn_uid_rejects_duplicate_uid() {
330        let proc = Proc::isolated();
331        let (parent, _parent_handle) = proc.client("parent").unwrap();
332        let uid = Uid::instance(Label::new("child").unwrap());
333
334        let child = parent
335            .gspawn_uid(
336                "hyperactor::actor::remote::tests::MyActor",
337                uid.clone(),
338                bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
339            )
340            .await
341            .unwrap();
342
343        let err = parent
344            .gspawn_uid(
345                "hyperactor::actor::remote::tests::MyActor",
346                uid,
347                bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
348            )
349            .await
350            .unwrap_err();
351
352        assert!(err.to_string().contains("has already been spawned"));
353
354        child.stop("test").unwrap();
355        child.await;
356    }
357}