1use std::fmt;
12use std::marker::PhantomData;
13use std::str::FromStr;
14
15use serde::Deserialize;
16use serde::Serialize;
17use typeuri::Named;
18
19use crate::actor::Referable;
20use crate::id::ActorId;
21use crate::id::Label;
22use crate::mailbox::RemoteMessage;
23use crate::ref_::ActorRef;
24use crate::ref_::Location;
25use crate::ref_::PortRef;
26use crate::ref_::RefParseError;
27
28pub trait Accepts<M: RemoteMessage>: Named {}
33
34impl<M: RemoteMessage> Accepts<M> for M {}
35
36#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
38pub(crate) enum RemoteRef {
39 Actor(ActorRef),
40 Port(PortRef),
41}
42
43impl RemoteRef {
44 pub(crate) fn actor_id(&self) -> &ActorId {
46 match self {
47 RemoteRef::Actor(r) => r.id(),
48 RemoteRef::Port(r) => r.actor_id(),
49 }
50 }
51
52 pub(crate) fn location(&self) -> &Location {
54 match self {
55 RemoteRef::Actor(r) => r.location(),
56 RemoteRef::Port(r) => r.location(),
57 }
58 }
59}
60
61impl fmt::Display for RemoteRef {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 RemoteRef::Actor(r) => fmt::Display::fmt(r, f),
65 RemoteRef::Port(r) => fmt::Display::fmt(r, f),
66 }
67 }
68}
69
70#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
73pub struct Remote<A: Named> {
74 inner: RemoteRef,
75 #[serde(skip)]
76 _phantom: PhantomData<fn() -> A>,
77}
78
79impl<A: Named> Remote<A> {
80 pub fn new(actor_ref: ActorRef) -> Self {
82 Self {
83 inner: RemoteRef::Actor(actor_ref),
84 _phantom: PhantomData,
85 }
86 }
87
88 pub fn from_port(port_ref: PortRef) -> Self {
90 Self {
91 inner: RemoteRef::Port(port_ref),
92 _phantom: PhantomData,
93 }
94 }
95
96 pub fn actor_id(&self) -> &ActorId {
98 self.inner.actor_id()
99 }
100
101 pub fn location(&self) -> &Location {
103 self.inner.location()
104 }
105
106 pub fn send<M: RemoteMessage>(&self, _message: M) -> Result<(), anyhow::Error>
113 where
114 A: Accepts<M>,
115 {
116 Ok(())
118 }
119}
120
121impl<A: Referable> Remote<A> {
122 pub fn narrow<M: RemoteMessage>(self) -> Remote<M>
124 where
125 A: Accepts<M>,
126 {
127 Remote {
128 inner: self.inner,
129 _phantom: PhantomData,
130 }
131 }
132}
133
134impl<A: Named> PartialEq for Remote<A> {
135 fn eq(&self, other: &Self) -> bool {
136 self.inner == other.inner
137 }
138}
139
140impl<A: Named> Eq for Remote<A> {}
141
142impl<A: Named> std::hash::Hash for Remote<A> {
143 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
144 self.inner.hash(state);
145 }
146}
147
148impl<A: Named> PartialOrd for Remote<A> {
149 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150 Some(self.cmp(other))
151 }
152}
153
154impl<A: Named> Ord for Remote<A> {
155 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
156 self.inner.cmp(&other.inner)
157 }
158}
159
160impl<A: Named> fmt::Display for Remote<A> {
161 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162 fmt::Display::fmt(&self.inner, f)
163 }
164}
165
166fn fmt_remote_debug(
167 f: &mut fmt::Formatter<'_>,
168 short_type: &str,
169 actor_label: Option<&Label>,
170 proc_label: Option<&Label>,
171 id_display: &dyn fmt::Display,
172 location: &Location,
173) -> fmt::Result {
174 match (actor_label, proc_label) {
175 (Some(actor_label), Some(proc_label)) => {
176 write!(
177 f,
178 "<'{}<{}>.{}' {}@{}>",
179 actor_label, short_type, proc_label, id_display, location
180 )
181 }
182 (Some(actor_label), None) => {
183 write!(
184 f,
185 "<'{}<{}>' {}@{}>",
186 actor_label, short_type, id_display, location
187 )
188 }
189 (None, Some(proc_label)) => {
190 write!(
191 f,
192 "<'<{}>.{}' {}@{}>",
193 short_type, proc_label, id_display, location
194 )
195 }
196 (None, None) => {
197 write!(f, "<'<{}>' {}@{}>", short_type, id_display, location)
198 }
199 }
200}
201
202impl<A: Named> fmt::Debug for Remote<A> {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 let typename = A::typename();
205 let short = typename.rsplit("::").next().unwrap_or(typename);
206 let actor_id = self.inner.actor_id();
207 match &self.inner {
208 RemoteRef::Actor(r) => fmt_remote_debug(
209 f,
210 short,
211 actor_id.label(),
212 actor_id.proc_id().label(),
213 r.id(),
214 r.location(),
215 ),
216 RemoteRef::Port(r) => fmt_remote_debug(
217 f,
218 short,
219 actor_id.label(),
220 actor_id.proc_id().label(),
221 r.id(),
222 r.location(),
223 ),
224 }
225 }
226}
227
228impl<A: Named> FromStr for Remote<A> {
229 type Err = RefParseError;
230
231 fn from_str(s: &str) -> Result<Self, Self::Err> {
232 let at = s.find('@').ok_or(RefParseError::MissingSeparator)?;
233 let id_part = &s[..at];
234 if id_part.contains(':') {
235 let port_ref: PortRef = s.parse()?;
236 Ok(Self::from_port(port_ref))
237 } else {
238 let actor_ref: ActorRef = s.parse()?;
239 Ok(Self::new(actor_ref))
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hash;
    use std::hash::Hasher;

    use serde::Deserialize;
    use serde::Serialize;

    use super::*;
    use crate::actor::Referable;
    use crate::channel::ChannelAddr;
    use crate::id::Label;
    use crate::id::PortId;
    use crate::id::ProcId;
    use crate::id::Uid;
    use crate::port::Port;

    /// Marker type used to parameterize handles in most tests.
    #[derive(Clone, Serialize, Deserialize, typeuri::Named)]
    struct MyWorker;

    /// First test message type.
    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct Ping;

    /// Second test message type.
    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct Pong;

    /// Test actor marker that accepts both `Ping` and `Pong`.
    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct TestActor;
    impl Referable for TestActor {}
    impl Accepts<Ping> for TestActor {}
    impl Accepts<Pong> for TestActor {}

    /// Builds the fixed actor id shared by all fixtures, with optional labels.
    fn make_actor_id(actor_label: Option<&str>, proc_label: Option<&str>) -> ActorId {
        let to_label = |text: &str| Label::new(text).unwrap();
        let proc_id = ProcId::new(Uid::Instance(0xdef456), proc_label.map(to_label));
        ActorId::new(Uid::Instance(0xabc123), proc_id, actor_label.map(to_label))
    }

    /// Actor reference fixture located at local channel 42.
    fn make_actor_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> ActorRef {
        let location: Location = ChannelAddr::Local(42).into();
        ActorRef::new(make_actor_id(actor_label, proc_label), location)
    }

    /// Port reference fixture: port 42 of the fixture actor, at local channel 7.
    fn make_port_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> PortRef {
        let port_id = PortId::new(make_actor_id(actor_label, proc_label), Port::from(42));
        let location: Location = ChannelAddr::Local(7).into();
        PortRef::new(port_id, location)
    }

    /// Fully-labelled actor handle for an arbitrary marker type.
    fn make_remote<A: Named>() -> Remote<A> {
        let actor_ref = make_actor_ref(Some("my-actor"), Some("my-proc"));
        Remote::new(actor_ref)
    }

    /// Fully-labelled port handle for an arbitrary marker type.
    fn make_port_remote<A: Named>() -> Remote<A> {
        let port_ref = make_port_ref(Some("my-actor"), Some("my-proc"));
        Remote::from_port(port_ref)
    }

    /// Hashes a value with the standard library's default hasher.
    fn hash_of<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    // ---- actor-backed handles ----

    #[test]
    fn test_remote_construction_and_accessors() {
        let actor_ref = make_actor_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::new(actor_ref.clone());
        assert_eq!(remote.actor_id(), actor_ref.id());
        assert_eq!(remote.location(), actor_ref.location());
    }

    #[test]
    fn test_remote_display_matches_actor_ref() {
        let actor_ref = make_actor_ref(Some("my-actor"), Some("my-proc"));
        let expected = actor_ref.to_string();
        let remote: Remote<MyWorker> = Remote::new(actor_ref);
        assert_eq!(remote.to_string(), expected);
    }

    #[test]
    fn test_remote_debug_all_labels() {
        let remote: Remote<MyWorker> =
            Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'my-actor<MyWorker>.my-proc' 0000000000abc123.0000000000def456@inproc://42>"
        );
    }

    #[test]
    fn test_remote_debug_actor_label_only() {
        let remote: Remote<MyWorker> = Remote::new(make_actor_ref(Some("my-actor"), None));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'my-actor<MyWorker>' 0000000000abc123.0000000000def456@inproc://42>"
        );
    }

    #[test]
    fn test_remote_debug_no_labels() {
        let remote: Remote<MyWorker> = Remote::new(make_actor_ref(None, None));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'<MyWorker>' 0000000000abc123.0000000000def456@inproc://42>"
        );
    }

    #[test]
    fn test_remote_debug_proc_label_only() {
        let remote: Remote<MyWorker> = Remote::new(make_actor_ref(None, Some("my-proc")));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'<MyWorker>.my-proc' 0000000000abc123.0000000000def456@inproc://42>"
        );
    }

    #[test]
    fn test_remote_fromstr_roundtrip() {
        let remote: Remote<MyWorker> =
            Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
        let text = remote.to_string();
        let parsed = text.parse::<Remote<MyWorker>>().unwrap();
        assert_eq!(remote, parsed);
    }

    #[test]
    fn test_remote_serde_roundtrip() {
        let remote: Remote<MyWorker> =
            Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
        let json = serde_json::to_string(&remote).unwrap();
        let parsed = serde_json::from_str::<Remote<MyWorker>>(&json).unwrap();
        assert_eq!(remote, parsed);
    }

    #[test]
    fn test_remote_eq_and_hash() {
        let actor_ref = make_actor_ref(Some("actor"), Some("proc"));
        let a: Remote<MyWorker> = Remote::new(actor_ref.clone());
        let b: Remote<MyWorker> = Remote::new(actor_ref);
        assert_eq!(a, b);
        assert_eq!(hash_of(&a), hash_of(&b));
    }

    #[test]
    fn test_remote_ord() {
        // The two handles differ only in their actor label; this test pins
        // that such handles compare as equal.
        let a: Remote<MyWorker> = Remote::new(make_actor_ref(Some("a"), None));
        let b: Remote<MyWorker> = Remote::new(make_actor_ref(Some("b"), None));
        assert_eq!(a.cmp(&b), std::cmp::Ordering::Equal);
    }

    #[test]
    fn test_send_actor_typed_remote() {
        let remote = make_remote::<TestActor>();
        remote.send(Ping).unwrap();
        remote.send(Pong).unwrap();
    }

    #[test]
    fn test_send_message_typed_remote() {
        // Relies on the blanket impl: a message type accepts itself.
        let remote = make_remote::<Ping>();
        remote.send(Ping).unwrap();
    }

    #[test]
    fn test_narrow() {
        let actor_remote = make_remote::<TestActor>();
        let narrowed = actor_remote.narrow::<Ping>();
        narrowed.send(Ping).unwrap();
    }

    // ---- port-backed handles ----

    #[test]
    fn test_port_remote_construction_and_accessors() {
        let port_ref = make_port_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::from_port(port_ref.clone());
        assert_eq!(remote.actor_id(), port_ref.actor_id());
        assert_eq!(remote.location(), port_ref.location());
    }

    #[test]
    fn test_port_remote_display_matches_port_ref() {
        let port_ref = make_port_ref(Some("my-actor"), Some("my-proc"));
        let expected = port_ref.to_string();
        let remote: Remote<MyWorker> = Remote::from_port(port_ref);
        assert_eq!(remote.to_string(), expected);
    }

    #[test]
    fn test_port_remote_debug_all_labels() {
        let remote: Remote<MyWorker> =
            Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'my-actor<MyWorker>.my-proc' 0000000000abc123.0000000000def456:42@inproc://7>"
        );
    }

    #[test]
    fn test_port_remote_debug_no_labels() {
        let remote: Remote<MyWorker> = Remote::from_port(make_port_ref(None, None));
        let rendered = format!("{:?}", remote);
        assert_eq!(
            rendered,
            "<'<MyWorker>' 0000000000abc123.0000000000def456:42@inproc://7>"
        );
    }

    #[test]
    fn test_port_remote_fromstr_roundtrip() {
        let remote: Remote<MyWorker> =
            Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
        let text = remote.to_string();
        let parsed = text.parse::<Remote<MyWorker>>().unwrap();
        assert_eq!(remote, parsed);
    }

    #[test]
    fn test_port_remote_serde_roundtrip() {
        let remote: Remote<MyWorker> =
            Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
        let json = serde_json::to_string(&remote).unwrap();
        let parsed = serde_json::from_str::<Remote<MyWorker>>(&json).unwrap();
        assert_eq!(remote, parsed);
    }

    #[test]
    fn test_port_remote_eq_and_hash() {
        let port_ref = make_port_ref(Some("actor"), Some("proc"));
        let a: Remote<MyWorker> = Remote::from_port(port_ref.clone());
        let b: Remote<MyWorker> = Remote::from_port(port_ref);
        assert_eq!(a, b);
        assert_eq!(hash_of(&a), hash_of(&b));
    }

    #[test]
    fn test_actor_and_port_remote_not_equal() {
        let actor_remote = make_remote::<MyWorker>();
        let port_remote = make_port_remote::<MyWorker>();
        assert_ne!(actor_remote, port_remote);
    }

    #[test]
    fn test_narrow_port_variant() {
        let actor_remote = make_port_remote::<TestActor>();
        let narrowed = actor_remote.narrow::<Ping>();
        narrowed.send(Ping).unwrap();
    }

    #[test]
    fn test_send_port_typed_remote() {
        let remote = make_port_remote::<TestActor>();
        remote.send(Ping).unwrap();
        remote.send(Pong).unwrap();
    }
}