hyperactor/actor/
remote.rs1use std::any::TypeId;
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::LazyLock;
16
17use crate::Actor;
18use crate::Data;
19use crate::proc::Proc;
20use crate::reference::ActorId;
21
22pub const USER_PORT_OFFSET: u64 = 1024;
24
25#[macro_export]
37macro_rules! remote {
38 ($actor:ty) => {
39 $crate::paste! {
40 static [<$actor:snake:upper _NAME>]: std::sync::LazyLock<&'static str> =
41 std::sync::LazyLock::new(|| <$actor as $crate::data::Named>::typename());
42 $crate::submit! {
43 $crate::actor::remote::SpawnableActor {
44 name: &[<$actor:snake:upper _NAME>],
45 gspawn: <$actor as $crate::actor::RemotableActor>::gspawn,
46 get_type_id: <$actor as $crate::actor::RemotableActor>::get_type_id,
47 }
48 }
49 }
50 };
51}
52
53#[derive(Debug)]
56pub struct SpawnableActor {
57 pub name: &'static LazyLock<&'static str>,
63
64 pub gspawn: fn(
66 &Proc,
67 &str,
68 Data,
69 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>>,
70
71 pub get_type_id: fn() -> TypeId,
74}
75
76inventory::collect!(SpawnableActor);
77
78#[derive(Debug)]
81pub struct Remote {
82 by_name: HashMap<&'static str, &'static SpawnableActor>,
83 by_type_id: HashMap<TypeId, &'static SpawnableActor>,
84}
85
86impl Remote {
87 pub fn collect() -> Self {
89 let mut by_name = HashMap::new();
90 let mut by_type_id = HashMap::new();
91 for entry in inventory::iter::<SpawnableActor> {
92 if by_name.insert(**entry.name, entry).is_some() {
93 panic!("actor name {} registered multiple times", **entry.name);
94 }
95 let type_id = (entry.get_type_id)();
96 if by_type_id.insert(type_id, entry).is_some() {
97 panic!(
98 "type id {:?} ({}) registered multiple times",
99 type_id, **entry.name
100 );
101 }
102 }
103 Self {
104 by_name,
105 by_type_id,
106 }
107 }
108
109 pub fn name_of<A: Actor>(&self) -> Option<&'static str> {
111 self.by_type_id
112 .get(&TypeId::of::<A>())
113 .map(|entry| **entry.name)
114 }
115
116 pub async fn gspawn(
120 &self,
121 proc: &Proc,
122 actor_type: &str,
123 actor_name: &str,
124 params: Data,
125 ) -> Result<ActorId, anyhow::Error> {
126 let entry = self
127 .by_name
128 .get(actor_type)
129 .ok_or_else(|| anyhow::anyhow!("actor type {} not registered", actor_type))?;
130 (entry.gspawn)(proc, actor_name, params).await
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use std::assert_matches::assert_matches;
137
138 use async_trait::async_trait;
139
140 use super::*;
141 use crate as hyperactor; use crate::Context;
143 use crate::Handler;
144
145 #[derive(Debug)]
146 #[hyperactor::export(handlers = [()])]
147 struct MyActor;
148
149 #[async_trait]
150 impl Actor for MyActor {
151 type Params = bool;
152
153 async fn new(params: bool) -> Result<Self, anyhow::Error> {
154 if params {
155 Ok(MyActor)
156 } else {
157 Err(anyhow::anyhow!("some failure"))
158 }
159 }
160 }
161
162 #[async_trait]
163 impl Handler<()> for MyActor {
164 async fn handle(&mut self, _cx: &Context<Self>, _message: ()) -> anyhow::Result<()> {
165 unimplemented!()
166 }
167 }
168
169 remote!(MyActor);
170
171 #[tokio::test]
172 async fn test_registry() {
173 let remote = Remote::collect();
174 assert_matches!(
175 remote.name_of::<MyActor>(),
176 Some("hyperactor::actor::remote::tests::MyActor")
177 );
178
179 let _ = remote
180 .gspawn(
181 &Proc::local(),
182 "hyperactor::actor::remote::tests::MyActor",
183 "actor",
184 bincode::serialize(&true).unwrap(),
185 )
186 .await
187 .unwrap();
188
189 let err = remote
190 .gspawn(
191 &Proc::local(),
192 "hyperactor::actor::remote::tests::MyActor",
193 "actor",
194 bincode::serialize(&false).unwrap(),
195 )
196 .await
197 .unwrap_err();
198
199 assert_eq!(err.to_string().as_str(), "some failure");
200 }
201}