hyperactor/actor/
remote.rs1use std::any::TypeId;
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::LazyLock;
16
17use hyperactor_config::Flattrs;
18
19use crate::Actor;
20use crate::Data;
21use crate::proc::Proc;
22use crate::reference;
23
24pub const USER_PORT_OFFSET: u64 = 1024;
26
27#[macro_export]
39macro_rules! remote {
40 ($actor:ty) => {
41 $crate::internal_macro_support::paste! {
42 static [<$actor:snake:upper _NAME>]: std::sync::LazyLock<&'static str> =
43 std::sync::LazyLock::new(|| <$actor as $crate::internal_macro_support::typeuri::Named>::typename());
44 $crate::internal_macro_support::inventory::submit! {
45 $crate::actor::remote::SpawnableActor {
46 name: &[<$actor:snake:upper _NAME>],
47 gspawn: <$actor as $crate::actor::RemoteSpawn>::gspawn,
48 get_type_id: <$actor as $crate::actor::RemoteSpawn>::get_type_id,
49 }
50 }
51 }
52 };
53}
54
55#[derive(Debug)]
58pub struct SpawnableActor {
59 pub name: &'static LazyLock<&'static str>,
65
66 pub gspawn: fn(
68 &Proc,
69 &str,
70 Data,
71 Flattrs,
72 ) -> Pin<
73 Box<dyn Future<Output = Result<reference::ActorId, anyhow::Error>> + Send>,
74 >,
75
76 pub get_type_id: fn() -> TypeId,
79}
80
81inventory::collect!(SpawnableActor);
82
83#[derive(Debug)]
86pub struct Remote {
87 by_name: HashMap<&'static str, &'static SpawnableActor>,
88 by_type_id: HashMap<TypeId, &'static SpawnableActor>,
89}
90
91impl Remote {
92 pub fn collect() -> Self {
94 let mut by_name = HashMap::new();
95 let mut by_type_id = HashMap::new();
96 for entry in inventory::iter::<SpawnableActor> {
97 if by_name.insert(**entry.name, entry).is_some() {
98 panic!("actor name {} registered multiple times", **entry.name);
99 }
100 let type_id = (entry.get_type_id)();
101 if by_type_id.insert(type_id, entry).is_some() {
102 panic!(
103 "type id {:?} ({}) registered multiple times",
104 type_id, **entry.name
105 );
106 }
107 }
108 Self {
109 by_name,
110 by_type_id,
111 }
112 }
113
114 pub fn name_of<A: Actor>(&self) -> Option<&'static str> {
116 self.by_type_id
117 .get(&TypeId::of::<A>())
118 .map(|entry| **entry.name)
119 }
120
121 pub async fn gspawn(
125 &self,
126 proc: &Proc,
127 actor_type: &str,
128 actor_name: &str,
129 params: Data,
130 environment: Flattrs,
131 ) -> Result<reference::ActorId, anyhow::Error> {
132 let entry = self
133 .by_name
134 .get(actor_type)
135 .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
136 (entry.gspawn)(proc, actor_name, params, environment).await
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use std::assert_matches::assert_matches;
143
144 use async_trait::async_trait;
145 use hyperactor_config::Flattrs;
146
147 use super::*;
148 use crate as hyperactor; use crate::Context;
150 use crate::Handler;
151 use crate::RemoteSpawn;
152
153 #[derive(Debug)]
154 #[hyperactor::export(handlers = [()])]
155 struct MyActor;
156
157 #[async_trait]
158 impl Actor for MyActor {}
159
160 #[async_trait]
161 impl RemoteSpawn for MyActor {
162 type Params = bool;
163
164 async fn new(params: bool, _environment: Flattrs) -> Result<Self, anyhow::Error> {
165 if params {
166 Ok(MyActor)
167 } else {
168 Err(anyhow::anyhow!("some failure"))
169 }
170 }
171 }
172
173 #[async_trait]
174 impl Handler<()> for MyActor {
175 async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
176 unimplemented!()
177 }
178 }
179
180 remote!(MyActor);
181
182 #[tokio::test]
183 async fn test_registry() {
184 let remote = Remote::collect();
185 assert_matches!(
186 remote.name_of::<MyActor>(),
187 Some("hyperactor::actor::remote::tests::MyActor")
188 );
189
190 let _ = remote
191 .gspawn(
192 &Proc::local(),
193 "hyperactor::actor::remote::tests::MyActor",
194 "actor",
195 bincode::serialize(&true).unwrap(),
196 Flattrs::default(),
197 )
198 .await
199 .unwrap();
200
201 let err = remote
202 .gspawn(
203 &Proc::local(),
204 "hyperactor::actor::remote::tests::MyActor",
205 "actor",
206 bincode::serialize(&false).unwrap(),
207 Flattrs::default(),
208 )
209 .await
210 .unwrap_err();
211
212 assert_eq!(err.to_string().as_str(), "some failure");
213 }
214}