1use std::any::TypeId;
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::LazyLock;
16
17use hyperactor_config::Flattrs;
18
19use crate::Actor;
20use crate::AnyActorHandle;
21use crate::Data;
22use crate::id::Uid;
23use crate::proc::InstanceCell;
24use crate::proc::Proc;
25
26pub const USER_PORT_OFFSET: u64 = 1024;
28
29#[macro_export]
41macro_rules! register_spawnable {
42 ($actor:ty) => {
43 const _: () = {
44 static NAME: std::sync::LazyLock<&'static str> = std::sync::LazyLock::new(|| {
45 <$actor as $crate::internal_macro_support::typeuri::Named>::typename()
46 });
47
48 $crate::internal_macro_support::inventory::submit! {
49 $crate::actor::remote::SpawnableActor {
50 name: &NAME,
51 gspawn_root_bind: <$actor as $crate::actor::RemoteSpawn>::gspawn_root_bind,
52 gspawn_child: <$actor as $crate::actor::RemoteSpawn>::gspawn_child,
53 get_type_id: <$actor as $crate::actor::RemoteSpawn>::get_type_id,
54 }
55 }
56 };
57 };
58}
59
60#[derive(Debug)]
63pub struct SpawnableActor {
64 pub name: &'static LazyLock<&'static str>,
70
71 pub gspawn_root_bind: fn(
74 &Proc,
75 Uid,
76 Data,
77 Flattrs,
78 ) -> Pin<
79 Box<dyn Future<Output = Result<crate::ActorAddr, anyhow::Error>> + Send>,
80 >,
81
82 pub gspawn_child:
85 fn(
86 &Proc,
87 InstanceCell,
88 Uid,
89 Data,
90 Flattrs,
91 ) -> Pin<Box<dyn Future<Output = Result<AnyActorHandle, anyhow::Error>> + Send>>,
92
93 pub get_type_id: fn() -> TypeId,
96}
97
98inventory::collect!(SpawnableActor);
99
100#[derive(Debug)]
103pub struct Remote {
104 by_name: HashMap<&'static str, &'static SpawnableActor>,
105 by_type_id: HashMap<TypeId, &'static SpawnableActor>,
106}
107
108impl Remote {
109 pub fn collect() -> Self {
111 let mut by_name = HashMap::new();
112 let mut by_type_id = HashMap::new();
113 for entry in inventory::iter::<SpawnableActor> {
114 if by_name.insert(**entry.name, entry).is_some() {
115 panic!("actor name {} registered multiple times", **entry.name);
116 }
117 let type_id = (entry.get_type_id)();
118 if by_type_id.insert(type_id, entry).is_some() {
119 panic!(
120 "type id {:?} ({}) registered multiple times",
121 type_id, **entry.name
122 );
123 }
124 }
125 Self {
126 by_name,
127 by_type_id,
128 }
129 }
130
131 pub fn global() -> &'static Self {
133 static REMOTE: LazyLock<Remote> = LazyLock::new(Remote::collect);
134 &REMOTE
135 }
136
137 pub fn name_of<A: Actor>(&self) -> Option<&'static str> {
139 self.by_type_id
140 .get(&TypeId::of::<A>())
141 .map(|entry| **entry.name)
142 }
143
144 pub async fn gspawn(
148 &self,
149 proc: &Proc,
150 actor_type: &str,
151 actor_uid: Uid,
152 params: Data,
153 environment: Flattrs,
154 ) -> Result<crate::ActorAddr, anyhow::Error> {
155 let entry = self
156 .by_name
157 .get(actor_type)
158 .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
159 (entry.gspawn_root_bind)(proc, actor_uid, params, environment).await
160 }
161
162 pub async fn gspawn_child(
165 &self,
166 proc: &Proc,
167 parent: InstanceCell,
168 actor_type: &str,
169 actor_uid: Uid,
170 params: Data,
171 environment: Flattrs,
172 ) -> Result<AnyActorHandle, anyhow::Error> {
173 let entry = self
174 .by_name
175 .get(actor_type)
176 .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
177 (entry.gspawn_child)(proc, parent, actor_uid, params, environment).await
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use std::assert_matches;
184
185 use async_trait::async_trait;
186 use hyperactor_config::Flattrs;
187
188 use super::*;
189 use crate as hyperactor; use crate::Context;
191 use crate::Handler;
192 use crate::RemoteSpawn;
193 use crate::id::Label;
194
195 #[derive(Debug)]
196 #[hyperactor::export(())]
197 struct MyActor;
198
199 #[async_trait]
200 impl Actor for MyActor {}
201
202 #[async_trait]
203 impl RemoteSpawn for MyActor {
204 type Params = bool;
205
206 async fn new(params: bool, _environment: Flattrs) -> Result<Self, anyhow::Error> {
207 if params {
208 Ok(MyActor)
209 } else {
210 Err(anyhow::anyhow!("some failure"))
211 }
212 }
213 }
214
215 #[async_trait]
216 impl Handler<()> for MyActor {
217 async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
218 unimplemented!()
219 }
220 }
221
222 register_spawnable!(MyActor);
223
224 #[derive(Debug, Default)]
225 #[hyperactor::export(())]
226 struct GenericActor<T>(std::marker::PhantomData<T>);
227
228 #[async_trait]
229 impl<T: Send + 'static> Actor for GenericActor<T> {}
230
231 #[async_trait]
232 impl<T: Send + Sync + 'static> Handler<()> for GenericActor<T> {
233 async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
234 unimplemented!()
235 }
236 }
237
238 register_spawnable!(GenericActor<u64>);
239 register_spawnable!(GenericActor<bool>);
240
241 #[tokio::test]
242 async fn test_registry() {
243 let remote = Remote::collect();
244 assert_matches!(
245 remote.name_of::<MyActor>(),
246 Some("hyperactor::actor::remote::tests::MyActor")
247 );
248 assert_matches!(
249 remote.name_of::<GenericActor<u64>>(),
250 Some("hyperactor::actor::remote::tests::GenericActor<u64>")
251 );
252 assert_matches!(
253 remote.name_of::<GenericActor<bool>>(),
254 Some("hyperactor::actor::remote::tests::GenericActor<bool>")
255 );
256 assert_ne!(
257 <GenericActor<u64> as typeuri::Named>::typename(),
258 <GenericActor<bool> as typeuri::Named>::typename()
259 );
260
261 let _ = remote
262 .gspawn(
263 &Proc::isolated(),
264 "hyperactor::actor::remote::tests::MyActor",
265 Uid::instance(Label::new("actor").unwrap()),
266 bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
267 Flattrs::default(),
268 )
269 .await
270 .unwrap();
271
272 let err = remote
273 .gspawn(
274 &Proc::isolated(),
275 "hyperactor::actor::remote::tests::MyActor",
276 Uid::instance(Label::new("actor").unwrap()),
277 bincode::serde::encode_to_vec(false, bincode::config::legacy()).unwrap(),
278 Flattrs::default(),
279 )
280 .await
281 .unwrap_err();
282
283 assert_eq!(err.to_string().as_str(), "some failure");
284 }
285
286 #[tokio::test]
287 async fn test_instance_gspawn_child_returns_erased_handle() {
288 let proc = Proc::isolated();
289 let (parent, _parent_handle) = proc.client("parent").unwrap();
290
291 let child = parent
292 .gspawn(
293 "hyperactor::actor::remote::tests::MyActor",
294 bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
295 )
296 .await
297 .unwrap();
298
299 assert!(!child.actor_id().is_root());
300 assert!(child.downcast::<MyActor>().is_some());
301 assert!(child.downcast::<GenericActor<u64>>().is_none());
302
303 child.stop("test").unwrap();
304 child.await;
305 }
306
307 #[tokio::test]
308 async fn test_instance_gspawn_uid_uses_explicit_uid() {
309 let proc = Proc::isolated();
310 let (parent, _parent_handle) = proc.client("parent").unwrap();
311 let uid = Uid::instance(Label::new("child").unwrap());
312
313 let child = parent
314 .gspawn_uid(
315 "hyperactor::actor::remote::tests::MyActor",
316 uid.clone(),
317 bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
318 )
319 .await
320 .unwrap();
321
322 assert_eq!(child.actor_id().uid(), &uid);
323
324 child.stop("test").unwrap();
325 child.await;
326 }
327
328 #[tokio::test]
329 async fn test_instance_gspawn_uid_rejects_duplicate_uid() {
330 let proc = Proc::isolated();
331 let (parent, _parent_handle) = proc.client("parent").unwrap();
332 let uid = Uid::instance(Label::new("child").unwrap());
333
334 let child = parent
335 .gspawn_uid(
336 "hyperactor::actor::remote::tests::MyActor",
337 uid.clone(),
338 bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
339 )
340 .await
341 .unwrap();
342
343 let err = parent
344 .gspawn_uid(
345 "hyperactor::actor::remote::tests::MyActor",
346 uid,
347 bincode::serde::encode_to_vec(true, bincode::config::legacy()).unwrap(),
348 )
349 .await
350 .unwrap_err();
351
352 assert!(err.to_string().contains("has already been spawned"));
353
354 child.stop("test").unwrap();
355 child.await;
356 }
357}