hyperactor/
remote.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Remote references: typed wrappers around [`ActorRef`] or [`PortRef`] for type-safe message sending.
10
11use std::fmt;
12use std::marker::PhantomData;
13use std::str::FromStr;
14
15use serde::Deserialize;
16use serde::Serialize;
17use typeuri::Named;
18
19use crate::actor::Referable;
20use crate::id::ActorId;
21use crate::id::Label;
22use crate::mailbox::RemoteMessage;
23use crate::ref_::ActorRef;
24use crate::ref_::Location;
25use crate::ref_::PortRef;
26use crate::ref_::RefParseError;
27
28/// Marker trait: `Remote<T>` can send message `M` when `T: Accepts<M>`.
29///
30/// - For actor/behavior types: generated by `#[export]`/`behavior!` (one impl per handled message).
31/// - For message types: blanket `impl<M: RemoteMessage> Accepts<M> for M {}` covers identity.
32pub trait Accepts<M: RemoteMessage>: Named {}
33
34impl<M: RemoteMessage> Accepts<M> for M {}
35
36/// The inner reference: either an actor or a port.
37#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
38pub(crate) enum RemoteRef {
39    Actor(ActorRef),
40    Port(PortRef),
41}
42
43impl RemoteRef {
44    /// Returns the actor id.
45    pub(crate) fn actor_id(&self) -> &ActorId {
46        match self {
47            RemoteRef::Actor(r) => r.id(),
48            RemoteRef::Port(r) => r.actor_id(),
49        }
50    }
51
52    /// Returns the location.
53    pub(crate) fn location(&self) -> &Location {
54        match self {
55            RemoteRef::Actor(r) => r.location(),
56            RemoteRef::Port(r) => r.location(),
57        }
58    }
59}
60
61impl fmt::Display for RemoteRef {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            RemoteRef::Actor(r) => fmt::Display::fmt(r, f),
65            RemoteRef::Port(r) => fmt::Display::fmt(r, f),
66        }
67    }
68}
69
70/// A typed remote actor reference, wrapping an [`ActorRef`] or [`PortRef`]
71/// with a type parameter for type-safe message sending.
72#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
73pub struct Remote<A: Named> {
74    inner: RemoteRef,
75    #[serde(skip)]
76    _phantom: PhantomData<fn() -> A>,
77}
78
79impl<A: Named> Remote<A> {
80    /// Create a new [`Remote`] reference from an [`ActorRef`].
81    pub fn new(actor_ref: ActorRef) -> Self {
82        Self {
83            inner: RemoteRef::Actor(actor_ref),
84            _phantom: PhantomData,
85        }
86    }
87
88    /// Create a new [`Remote`] reference from a [`PortRef`].
89    pub fn from_port(port_ref: PortRef) -> Self {
90        Self {
91            inner: RemoteRef::Port(port_ref),
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Returns the actor id.
97    pub fn actor_id(&self) -> &ActorId {
98        self.inner.actor_id()
99    }
100
101    /// Returns the location.
102    pub fn location(&self) -> &Location {
103        self.inner.location()
104    }
105
106    /// Send a message to this remote.
107    ///
108    /// For actor-typed remotes (`Remote<MyActor>`), `A: Accepts<M>` is
109    /// generated by `#[export]` / `behavior!`.
110    /// For message-typed remotes (`Remote<Ping>`), the blanket
111    /// `impl<M: RemoteMessage> Accepts<M> for M` covers identity.
112    pub fn send<M: RemoteMessage>(&self, _message: M) -> Result<(), anyhow::Error>
113    where
114        A: Accepts<M>,
115    {
116        // No-op: actual send implementation deferred to channel integration.
117        Ok(())
118    }
119}
120
121impl<A: Referable> Remote<A> {
122    /// Narrow to a single-message remote for message type `M`.
123    pub fn narrow<M: RemoteMessage>(self) -> Remote<M>
124    where
125        A: Accepts<M>,
126    {
127        Remote {
128            inner: self.inner,
129            _phantom: PhantomData,
130        }
131    }
132}
133
134impl<A: Named> PartialEq for Remote<A> {
135    fn eq(&self, other: &Self) -> bool {
136        self.inner == other.inner
137    }
138}
139
140impl<A: Named> Eq for Remote<A> {}
141
142impl<A: Named> std::hash::Hash for Remote<A> {
143    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
144        self.inner.hash(state);
145    }
146}
147
148impl<A: Named> PartialOrd for Remote<A> {
149    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150        Some(self.cmp(other))
151    }
152}
153
154impl<A: Named> Ord for Remote<A> {
155    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
156        self.inner.cmp(&other.inner)
157    }
158}
159
160impl<A: Named> fmt::Display for Remote<A> {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        fmt::Display::fmt(&self.inner, f)
163    }
164}
165
166fn fmt_remote_debug(
167    f: &mut fmt::Formatter<'_>,
168    short_type: &str,
169    actor_label: Option<&Label>,
170    proc_label: Option<&Label>,
171    id_display: &dyn fmt::Display,
172    location: &Location,
173) -> fmt::Result {
174    match (actor_label, proc_label) {
175        (Some(actor_label), Some(proc_label)) => {
176            write!(
177                f,
178                "<'{}<{}>.{}' {}@{}>",
179                actor_label, short_type, proc_label, id_display, location
180            )
181        }
182        (Some(actor_label), None) => {
183            write!(
184                f,
185                "<'{}<{}>' {}@{}>",
186                actor_label, short_type, id_display, location
187            )
188        }
189        (None, Some(proc_label)) => {
190            write!(
191                f,
192                "<'<{}>.{}' {}@{}>",
193                short_type, proc_label, id_display, location
194            )
195        }
196        (None, None) => {
197            write!(f, "<'<{}>' {}@{}>", short_type, id_display, location)
198        }
199    }
200}
201
202impl<A: Named> fmt::Debug for Remote<A> {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        let typename = A::typename();
205        let short = typename.rsplit("::").next().unwrap_or(typename);
206        let actor_id = self.inner.actor_id();
207        match &self.inner {
208            RemoteRef::Actor(r) => fmt_remote_debug(
209                f,
210                short,
211                actor_id.label(),
212                actor_id.proc_id().label(),
213                r.id(),
214                r.location(),
215            ),
216            RemoteRef::Port(r) => fmt_remote_debug(
217                f,
218                short,
219                actor_id.label(),
220                actor_id.proc_id().label(),
221                r.id(),
222                r.location(),
223            ),
224        }
225    }
226}
227
228impl<A: Named> FromStr for Remote<A> {
229    type Err = RefParseError;
230
231    fn from_str(s: &str) -> Result<Self, Self::Err> {
232        let at = s.find('@').ok_or(RefParseError::MissingSeparator)?;
233        let id_part = &s[..at];
234        if id_part.contains(':') {
235            let port_ref: PortRef = s.parse()?;
236            Ok(Self::from_port(port_ref))
237        } else {
238            let actor_ref: ActorRef = s.parse()?;
239            Ok(Self::new(actor_ref))
240        }
241    }
242}
243
244#[cfg(test)]
245mod tests {
246    use std::hash::Hash;
247
248    use serde::Deserialize;
249    use serde::Serialize;
250
251    use super::*;
252    use crate::actor::Referable;
253    use crate::channel::ChannelAddr;
254    use crate::id::Label;
255    use crate::id::PortId;
256    use crate::id::ProcId;
257    use crate::id::Uid;
258    use crate::port::Port;
259
260    #[derive(Clone, Serialize, Deserialize, typeuri::Named)]
261    struct MyWorker;
262
263    // A test message type.
264    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
265    struct Ping;
266
267    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
268    struct Pong;
269
270    // A test actor type (manually impl the traits that #[export] would generate).
271    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
272    struct TestActor;
273    impl Referable for TestActor {}
274    impl Accepts<Ping> for TestActor {}
275    impl Accepts<Pong> for TestActor {}
276
277    fn make_actor_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> ActorRef {
278        let aid = ActorId::new(
279            Uid::Instance(0xabc123),
280            ProcId::new(
281                Uid::Instance(0xdef456),
282                proc_label.map(|l| Label::new(l).unwrap()),
283            ),
284            actor_label.map(|l| Label::new(l).unwrap()),
285        );
286        let loc: Location = ChannelAddr::Local(42).into();
287        ActorRef::new(aid, loc)
288    }
289
290    fn make_port_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> PortRef {
291        let aid = ActorId::new(
292            Uid::Instance(0xabc123),
293            ProcId::new(
294                Uid::Instance(0xdef456),
295                proc_label.map(|l| Label::new(l).unwrap()),
296            ),
297            actor_label.map(|l| Label::new(l).unwrap()),
298        );
299        let port_id = PortId::new(aid, Port::from(42));
300        let loc: Location = ChannelAddr::Local(7).into();
301        PortRef::new(port_id, loc)
302    }
303
304    fn make_remote<A: Named>() -> Remote<A> {
305        Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")))
306    }
307
308    fn make_port_remote<A: Named>() -> Remote<A> {
309        Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")))
310    }
311
312    // --- Actor variant tests ---
313
314    #[test]
315    fn test_remote_construction_and_accessors() {
316        let aref = make_actor_ref(Some("my-actor"), Some("my-proc"));
317        let remote = Remote::<MyWorker>::new(aref.clone());
318        assert_eq!(remote.actor_id(), aref.id());
319        assert_eq!(remote.location(), aref.location());
320    }
321
322    #[test]
323    fn test_remote_display_matches_actor_ref() {
324        let aref = make_actor_ref(Some("my-actor"), Some("my-proc"));
325        let remote = Remote::<MyWorker>::new(aref.clone());
326        assert_eq!(remote.to_string(), aref.to_string());
327    }
328
329    #[test]
330    fn test_remote_debug_all_labels() {
331        let remote = Remote::<MyWorker>::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
332        assert_eq!(
333            format!("{:?}", remote),
334            "<'my-actor<MyWorker>.my-proc' 0000000000abc123.0000000000def456@inproc://42>"
335        );
336    }
337
338    #[test]
339    fn test_remote_debug_actor_label_only() {
340        let remote = Remote::<MyWorker>::new(make_actor_ref(Some("my-actor"), None));
341        assert_eq!(
342            format!("{:?}", remote),
343            "<'my-actor<MyWorker>' 0000000000abc123.0000000000def456@inproc://42>"
344        );
345    }
346
347    #[test]
348    fn test_remote_debug_no_labels() {
349        let remote = Remote::<MyWorker>::new(make_actor_ref(None, None));
350        assert_eq!(
351            format!("{:?}", remote),
352            "<'<MyWorker>' 0000000000abc123.0000000000def456@inproc://42>"
353        );
354    }
355
356    #[test]
357    fn test_remote_debug_proc_label_only() {
358        let remote = Remote::<MyWorker>::new(make_actor_ref(None, Some("my-proc")));
359        assert_eq!(
360            format!("{:?}", remote),
361            "<'<MyWorker>.my-proc' 0000000000abc123.0000000000def456@inproc://42>"
362        );
363    }
364
365    #[test]
366    fn test_remote_fromstr_roundtrip() {
367        let aref = make_actor_ref(Some("my-actor"), Some("my-proc"));
368        let remote = Remote::<MyWorker>::new(aref);
369        let s = remote.to_string();
370        let parsed: Remote<MyWorker> = s.parse().unwrap();
371        assert_eq!(remote, parsed);
372    }
373
374    #[test]
375    fn test_remote_serde_roundtrip() {
376        let remote = Remote::<MyWorker>::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
377        let json = serde_json::to_string(&remote).unwrap();
378        let parsed: Remote<MyWorker> = serde_json::from_str(&json).unwrap();
379        assert_eq!(remote, parsed);
380    }
381
382    #[test]
383    fn test_remote_eq_and_hash() {
384        use std::collections::hash_map::DefaultHasher;
385        use std::hash::Hasher;
386
387        let aref = make_actor_ref(Some("actor"), Some("proc"));
388        let a = Remote::<MyWorker>::new(aref.clone());
389        let b = Remote::<MyWorker>::new(aref);
390        assert_eq!(a, b);
391
392        let hash = |r: &Remote<MyWorker>| {
393            let mut h = DefaultHasher::new();
394            r.hash(&mut h);
395            h.finish()
396        };
397        assert_eq!(hash(&a), hash(&b));
398    }
399
400    #[test]
401    fn test_remote_ord() {
402        let a = Remote::<MyWorker>::new(make_actor_ref(Some("a"), None));
403        let b = Remote::<MyWorker>::new(make_actor_ref(Some("b"), None));
404        // Same ids, so ordering should be equal.
405        assert_eq!(a.cmp(&b), std::cmp::Ordering::Equal);
406    }
407
408    #[test]
409    fn test_send_actor_typed_remote() {
410        let remote: Remote<TestActor> = make_remote();
411        remote.send(Ping).unwrap();
412        remote.send(Pong).unwrap();
413    }
414
415    #[test]
416    fn test_send_message_typed_remote() {
417        let remote: Remote<Ping> = make_remote();
418        remote.send(Ping).unwrap();
419    }
420
421    #[test]
422    fn test_narrow() {
423        let actor_remote: Remote<TestActor> = make_remote();
424        let narrowed: Remote<Ping> = actor_remote.narrow();
425        narrowed.send(Ping).unwrap();
426    }
427
428    // --- Port variant tests ---
429
430    #[test]
431    fn test_port_remote_construction_and_accessors() {
432        let pref = make_port_ref(Some("my-actor"), Some("my-proc"));
433        let remote = Remote::<MyWorker>::from_port(pref.clone());
434        assert_eq!(remote.actor_id(), pref.actor_id());
435        assert_eq!(remote.location(), pref.location());
436    }
437
438    #[test]
439    fn test_port_remote_display_matches_port_ref() {
440        let pref = make_port_ref(Some("my-actor"), Some("my-proc"));
441        let remote = Remote::<MyWorker>::from_port(pref.clone());
442        assert_eq!(remote.to_string(), pref.to_string());
443    }
444
445    #[test]
446    fn test_port_remote_debug_all_labels() {
447        let remote =
448            Remote::<MyWorker>::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
449        assert_eq!(
450            format!("{:?}", remote),
451            "<'my-actor<MyWorker>.my-proc' 0000000000abc123.0000000000def456:42@inproc://7>"
452        );
453    }
454
455    #[test]
456    fn test_port_remote_debug_no_labels() {
457        let remote = Remote::<MyWorker>::from_port(make_port_ref(None, None));
458        assert_eq!(
459            format!("{:?}", remote),
460            "<'<MyWorker>' 0000000000abc123.0000000000def456:42@inproc://7>"
461        );
462    }
463
464    #[test]
465    fn test_port_remote_fromstr_roundtrip() {
466        let pref = make_port_ref(Some("my-actor"), Some("my-proc"));
467        let remote = Remote::<MyWorker>::from_port(pref);
468        let s = remote.to_string();
469        let parsed: Remote<MyWorker> = s.parse().unwrap();
470        assert_eq!(remote, parsed);
471    }
472
473    #[test]
474    fn test_port_remote_serde_roundtrip() {
475        let remote =
476            Remote::<MyWorker>::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
477        let json = serde_json::to_string(&remote).unwrap();
478        let parsed: Remote<MyWorker> = serde_json::from_str(&json).unwrap();
479        assert_eq!(remote, parsed);
480    }
481
482    #[test]
483    fn test_port_remote_eq_and_hash() {
484        use std::collections::hash_map::DefaultHasher;
485        use std::hash::Hasher;
486
487        let pref = make_port_ref(Some("actor"), Some("proc"));
488        let a = Remote::<MyWorker>::from_port(pref.clone());
489        let b = Remote::<MyWorker>::from_port(pref);
490        assert_eq!(a, b);
491
492        let hash = |r: &Remote<MyWorker>| {
493            let mut h = DefaultHasher::new();
494            r.hash(&mut h);
495            h.finish()
496        };
497        assert_eq!(hash(&a), hash(&b));
498    }
499
500    #[test]
501    fn test_actor_and_port_remote_not_equal() {
502        let actor_remote: Remote<MyWorker> = make_remote();
503        let port_remote: Remote<MyWorker> = make_port_remote();
504        assert_ne!(actor_remote, port_remote);
505    }
506
507    #[test]
508    fn test_narrow_port_variant() {
509        let actor_remote: Remote<TestActor> = make_port_remote();
510        let narrowed: Remote<Ping> = actor_remote.narrow();
511        narrowed.send(Ping).unwrap();
512    }
513
514    #[test]
515    fn test_send_port_typed_remote() {
516        let remote: Remote<TestActor> = make_port_remote();
517        remote.send(Ping).unwrap();
518        remote.send(Pong).unwrap();
519    }
520}