Skip to main content

hyperactor/
remote.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Remote references: typed wrappers around [`ActorAddr`] or [`PortAddr`] for type-safe message sending.
10
11use std::fmt;
12use std::marker::PhantomData;
13use std::str::FromStr;
14
15use serde::Deserialize;
16use serde::Serialize;
17use typeuri::Named;
18
19use crate::ActorAddr;
20use crate::AddrParseError;
21use crate::Location;
22use crate::PortAddr;
23use crate::actor::Referable;
24use crate::id::ActorId;
25use crate::id::Label;
26use crate::mailbox::RemoteMessage;
27
28/// Marker trait: `Remote<T>` can send message `M` when `T: Accepts<M>`.
29///
30/// - For actor/behavior types: generated by `#[export]`/`behavior!` (one impl per handled message).
31/// - For message types: blanket `impl<M: RemoteMessage> Accepts<M> for M {}` covers identity.
32pub trait Accepts<M: RemoteMessage>: Named {}
33
34impl<M: RemoteMessage> Accepts<M> for M {}
35
36/// The inner reference: either an actor or a port.
37#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
38pub(crate) enum RemoteRef {
39    Actor(ActorAddr),
40    Port(PortAddr),
41}
42
43impl RemoteRef {
44    /// Returns the actor id.
45    pub(crate) fn actor_id(&self) -> &ActorId {
46        match self {
47            RemoteRef::Actor(r) => r.id(),
48            RemoteRef::Port(r) => r.actor_id(),
49        }
50    }
51
52    /// Returns the location.
53    pub(crate) fn location(&self) -> &Location {
54        match self {
55            RemoteRef::Actor(r) => r.location(),
56            RemoteRef::Port(r) => r.location(),
57        }
58    }
59}
60
61impl fmt::Display for RemoteRef {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            RemoteRef::Actor(r) => fmt::Display::fmt(r, f),
65            RemoteRef::Port(r) => fmt::Display::fmt(r, f),
66        }
67    }
68}
69
70/// A typed remote actor reference, wrapping an [`ActorAddr`] or [`PortAddr`]
71/// with a type parameter for type-safe message sending.
72#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
73pub struct Remote<A: Named> {
74    inner: RemoteRef,
75    #[serde(skip)]
76    _phantom: PhantomData<fn() -> A>,
77}
78
79impl<A: Named> Remote<A> {
80    /// Create a new [`Remote`] reference from an [`ActorAddr`].
81    pub fn new(actor_ref: ActorAddr) -> Self {
82        Self {
83            inner: RemoteRef::Actor(actor_ref),
84            _phantom: PhantomData,
85        }
86    }
87
88    /// Create a new [`Remote`] reference from a [`PortAddr`].
89    pub fn from_port(port_ref: PortAddr) -> Self {
90        Self {
91            inner: RemoteRef::Port(port_ref),
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Returns the actor id.
97    pub fn actor_id(&self) -> &ActorId {
98        self.inner.actor_id()
99    }
100
101    /// Returns the location.
102    pub fn location(&self) -> &Location {
103        self.inner.location()
104    }
105
106    /// Send a message to this remote.
107    ///
108    /// For actor-typed remotes (`Remote<MyActor>`), `A: Accepts<M>` is
109    /// generated by `#[export]` / `behavior!`.
110    /// For message-typed remotes (`Remote<Ping>`), the blanket
111    /// `impl<M: RemoteMessage> Accepts<M> for M` covers identity.
112    pub fn send<M: RemoteMessage>(&self, _message: M) -> Result<(), anyhow::Error>
113    where
114        A: Accepts<M>,
115    {
116        // No-op: actual send implementation deferred to channel integration.
117        Ok(())
118    }
119}
120
121impl<A: Referable> Remote<A> {
122    /// Narrow to a single-message remote for message type `M`.
123    pub fn narrow<M: RemoteMessage>(self) -> Remote<M>
124    where
125        A: Accepts<M>,
126    {
127        Remote {
128            inner: self.inner,
129            _phantom: PhantomData,
130        }
131    }
132}
133
134impl<A: Named> PartialEq for Remote<A> {
135    fn eq(&self, other: &Self) -> bool {
136        self.inner == other.inner
137    }
138}
139
140impl<A: Named> Eq for Remote<A> {}
141
142impl<A: Named> std::hash::Hash for Remote<A> {
143    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
144        self.inner.hash(state);
145    }
146}
147
148impl<A: Named> PartialOrd for Remote<A> {
149    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150        Some(self.cmp(other))
151    }
152}
153
154impl<A: Named> Ord for Remote<A> {
155    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
156        self.inner.cmp(&other.inner)
157    }
158}
159
160impl<A: Named> fmt::Display for Remote<A> {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        fmt::Display::fmt(&self.inner, f)
163    }
164}
165
166fn fmt_remote_debug(
167    f: &mut fmt::Formatter<'_>,
168    short_type: &str,
169    actor_label: Option<&Label>,
170    proc_label: Option<&Label>,
171    id_display: &dyn fmt::Display,
172    location: &Location,
173) -> fmt::Result {
174    match (actor_label, proc_label) {
175        (Some(actor_label), Some(proc_label)) => {
176            write!(
177                f,
178                "<'{}<{}>.{}' {}@{}>",
179                actor_label, short_type, proc_label, id_display, location
180            )
181        }
182        (Some(actor_label), None) => {
183            write!(
184                f,
185                "<'{}<{}>' {}@{}>",
186                actor_label, short_type, id_display, location
187            )
188        }
189        (None, Some(proc_label)) => {
190            write!(
191                f,
192                "<'<{}>.{}' {}@{}>",
193                short_type, proc_label, id_display, location
194            )
195        }
196        (None, None) => {
197            write!(f, "<'<{}>' {}@{}>", short_type, id_display, location)
198        }
199    }
200}
201
202impl<A: Named> fmt::Debug for Remote<A> {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        let typename = A::typename();
205        let short = typename.rsplit("::").next().unwrap_or(typename);
206        let actor_id = self.inner.actor_id();
207        match &self.inner {
208            RemoteRef::Actor(r) => fmt_remote_debug(
209                f,
210                short,
211                actor_id.label(),
212                actor_id.proc_id().label(),
213                r.id(),
214                r.location(),
215            ),
216            RemoteRef::Port(r) => fmt_remote_debug(
217                f,
218                short,
219                actor_id.label(),
220                actor_id.proc_id().label(),
221                r.id(),
222                r.location(),
223            ),
224        }
225    }
226}
227
228impl<A: Named> FromStr for Remote<A> {
229    type Err = AddrParseError;
230
231    fn from_str(s: &str) -> Result<Self, Self::Err> {
232        let at = s.find('@').ok_or(AddrParseError::MissingSeparator)?;
233        let id_part = &s[..at];
234        if id_part.contains(':') {
235            let port_ref: PortAddr = s.parse()?;
236            Ok(Self::from_port(port_ref))
237        } else {
238            let actor_ref: ActorAddr = s.parse()?;
239            Ok(Self::new(actor_ref))
240        }
241    }
242}
243
#[cfg(test)]
mod tests {
    use std::hash::Hash;

    use serde::Deserialize;
    use serde::Serialize;

    use super::*;
    use crate::actor::Referable;
    use crate::channel::ChannelAddr;
    use crate::id::Label;
    use crate::id::PortId;
    use crate::id::ProcId;
    use crate::id::Uid;
    use crate::port::Port;

    // Type tag for tests that never send a message.
    #[derive(Clone, Serialize, Deserialize, typeuri::Named)]
    struct MyWorker;

    // Test message types.
    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct Ping;

    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct Pong;

    // Test actor type; the impls below stand in for what #[export] would generate.
    #[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
    struct TestActor;
    impl Referable for TestActor {}
    impl Accepts<Ping> for TestActor {}
    impl Accepts<Pong> for TestActor {}

    /// Actor id shared by all fixtures: fixed uids, optional labels.
    fn make_actor_id(actor_label: Option<&str>, proc_label: Option<&str>) -> ActorId {
        let proc_id = ProcId::new(
            Uid::Instance(0xdef456, None),
            proc_label.map(|text| Label::new(text).unwrap()),
        );
        ActorId::new(
            Uid::Instance(0xabc123, None),
            proc_id,
            actor_label.map(|text| Label::new(text).unwrap()),
        )
    }

    /// Actor address for the fixture actor at local channel 42.
    fn make_actor_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> ActorAddr {
        let loc: Location = ChannelAddr::Local(42).into();
        ActorAddr::new(make_actor_id(actor_label, proc_label), loc)
    }

    /// Port address for port 42 of the fixture actor at local channel 7.
    fn make_port_ref(actor_label: Option<&str>, proc_label: Option<&str>) -> PortAddr {
        let port_id = PortId::new(make_actor_id(actor_label, proc_label), Port::from(42));
        let loc: Location = ChannelAddr::Local(7).into();
        PortAddr::new(port_id, loc)
    }

    /// Fully labelled actor-backed remote.
    fn make_remote<A: Named>() -> Remote<A> {
        Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")))
    }

    /// Fully labelled port-backed remote.
    fn make_port_remote<A: Named>() -> Remote<A> {
        Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")))
    }

    /// Hash of `value` under a freshly created `DefaultHasher`.
    fn hash_of<T: Hash>(value: &T) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::Hasher;

        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    // --- Actor variant tests ---

    #[test]
    fn test_remote_construction_and_accessors() {
        let addr = make_actor_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        assert_eq!(remote.actor_id(), addr.id());
        assert_eq!(remote.location(), addr.location());
    }

    #[test]
    fn test_remote_display_matches_actor_ref() {
        let addr = make_actor_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        assert_eq!(remote.to_string(), addr.to_string());
    }

    #[test]
    fn test_remote_debug_all_labels() {
        let addr = make_actor_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        let expected = format!("<'my-actor<MyWorker>.my-proc' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_remote_debug_actor_label_only() {
        let addr = make_actor_ref(Some("my-actor"), None);
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        let expected = format!("<'my-actor<MyWorker>' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_remote_debug_no_labels() {
        let addr = make_actor_ref(None, None);
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        let expected = format!("<'<MyWorker>' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_remote_debug_proc_label_only() {
        let addr = make_actor_ref(None, Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::new(addr.clone());
        let expected = format!("<'<MyWorker>.my-proc' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_remote_fromstr_roundtrip() {
        let original: Remote<MyWorker> =
            Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
        let text = original.to_string();
        let reparsed: Remote<MyWorker> = text.parse().unwrap();
        assert_eq!(original, reparsed);
    }

    #[test]
    fn test_remote_serde_roundtrip() {
        let original: Remote<MyWorker> =
            Remote::new(make_actor_ref(Some("my-actor"), Some("my-proc")));
        let json = serde_json::to_string(&original).unwrap();
        let decoded: Remote<MyWorker> = serde_json::from_str(&json).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_remote_eq_and_hash() {
        let addr = make_actor_ref(Some("actor"), Some("proc"));
        let first: Remote<MyWorker> = Remote::new(addr.clone());
        let second: Remote<MyWorker> = Remote::new(addr);
        assert_eq!(first, second);
        assert_eq!(hash_of(&first), hash_of(&second));
    }

    #[test]
    fn test_remote_ord() {
        let first: Remote<MyWorker> = Remote::new(make_actor_ref(Some("a"), None));
        let second: Remote<MyWorker> = Remote::new(make_actor_ref(Some("b"), None));
        // Same ids, so ordering should be equal.
        assert_eq!(first.cmp(&second), std::cmp::Ordering::Equal);
    }

    #[test]
    fn test_send_actor_typed_remote() {
        let remote = make_remote::<TestActor>();
        remote.send(Ping).unwrap();
        remote.send(Pong).unwrap();
    }

    #[test]
    fn test_send_message_typed_remote() {
        let remote = make_remote::<Ping>();
        remote.send(Ping).unwrap();
    }

    #[test]
    fn test_narrow() {
        let wide = make_remote::<TestActor>();
        let narrowed: Remote<Ping> = wide.narrow();
        narrowed.send(Ping).unwrap();
    }

    // --- Port variant tests ---

    #[test]
    fn test_port_remote_construction_and_accessors() {
        let addr = make_port_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::from_port(addr.clone());
        assert_eq!(remote.actor_id(), addr.actor_id());
        assert_eq!(remote.location(), addr.location());
    }

    #[test]
    fn test_port_remote_display_matches_port_ref() {
        let addr = make_port_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::from_port(addr.clone());
        assert_eq!(remote.to_string(), addr.to_string());
    }

    #[test]
    fn test_port_remote_debug_all_labels() {
        let addr = make_port_ref(Some("my-actor"), Some("my-proc"));
        let remote: Remote<MyWorker> = Remote::from_port(addr.clone());
        let expected = format!("<'my-actor<MyWorker>.my-proc' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_port_remote_debug_no_labels() {
        let addr = make_port_ref(None, None);
        let remote: Remote<MyWorker> = Remote::from_port(addr.clone());
        let expected = format!("<'<MyWorker>' {}>", addr);
        assert_eq!(format!("{:?}", remote), expected);
    }

    #[test]
    fn test_port_remote_fromstr_roundtrip() {
        let original: Remote<MyWorker> =
            Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
        let text = original.to_string();
        let reparsed: Remote<MyWorker> = text.parse().unwrap();
        assert_eq!(original, reparsed);
    }

    #[test]
    fn test_port_remote_serde_roundtrip() {
        let original: Remote<MyWorker> =
            Remote::from_port(make_port_ref(Some("my-actor"), Some("my-proc")));
        let json = serde_json::to_string(&original).unwrap();
        let decoded: Remote<MyWorker> = serde_json::from_str(&json).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_port_remote_eq_and_hash() {
        let addr = make_port_ref(Some("actor"), Some("proc"));
        let first: Remote<MyWorker> = Remote::from_port(addr.clone());
        let second: Remote<MyWorker> = Remote::from_port(addr);
        assert_eq!(first, second);
        assert_eq!(hash_of(&first), hash_of(&second));
    }

    #[test]
    fn test_actor_and_port_remote_not_equal() {
        let actor_backed = make_remote::<MyWorker>();
        let port_backed = make_port_remote::<MyWorker>();
        assert_ne!(actor_backed, port_backed);
    }

    #[test]
    fn test_narrow_port_variant() {
        let wide = make_port_remote::<TestActor>();
        let narrowed: Remote<Ping> = wide.narrow();
        narrowed.send(Ping).unwrap();
    }

    #[test]
    fn test_send_port_typed_remote() {
        let remote = make_port_remote::<TestActor>();
        remote.send(Ping).unwrap();
        remote.send(Pong).unwrap();
    }
}