hyperactor/
reference.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! References for different resources in Hyperactor.
10//!
//! The "Id" variants are transparent and typeless, whereas the
//! corresponding "Ref" variants are opaque and typed. The latter are
//! intended to be exposed in user-facing APIs. We provide
//! [`std::convert::From`] implementations between Ids and Refs where this
//! makes sense.
15//!
//! All system implementations use the same concrete reference
//! representations, as their specific layout (e.g., actor index, rank,
//! etc.) is used by the core communications algorithms throughout.
19//!
20//! References and ids are [`crate::Message`]s to facilitate passing
21//! them between actors.
22
23#![allow(dead_code)] // Allow until this is used outside of tests.
24
25use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::attrs::Attrs;
39use rand::Rng;
40use serde::Deserialize;
41use serde::Deserializer;
42use serde::Serialize;
43use serde::Serializer;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerOpts;
52use crate::accum::ReducerSpec;
53use crate::actor::Referable;
54use crate::channel::ChannelAddr;
55use crate::context;
56use crate::context::MailboxExt;
57use crate::mailbox::MailboxSenderError;
58use crate::mailbox::MailboxSenderErrorKind;
59use crate::mailbox::PortSink;
60use crate::message::Bind;
61use crate::message::Bindings;
62use crate::message::Unbind;
63
64pub mod lex;
65pub mod name;
66mod parse;
67
68use parse::Lexer;
69use parse::ParseError;
70use parse::Token;
71pub use parse::is_valid_ident;
72use parse::parse;
73
74/// The kinds of references.
75#[derive(strum::Display)]
76pub enum ReferenceKind {
77    /// World references.
78    World,
79    /// Proc references.
80    Proc,
81    /// Actor references.
82    Actor,
83    /// Port references.
84    Port,
85    /// Gang references.
86    Gang,
87}
88
/// A universal reference to hierarchical identifiers in Hyperactor.
///
/// References implement a concrete syntax which can be parsed via
/// [`FromStr`]. Ranked references (anchored in a world) are of the form:
///
/// - `world`,
/// - `world[rank]`,
/// - `world[rank].actor` (pid 0 is implied),
/// - `world[rank].actor[pid]`,
/// - `world[rank].actor[pid][port]`, or
/// - `world.actor` (a gang)
///
/// Direct references (anchored at a channel address rather than a world)
/// use commas as separators and are of the form:
///
/// - `channeladdr,proc_name`,
/// - `channeladdr,proc_name,actor` (pid 0 is implied),
/// - `channeladdr,proc_name,actor[pid]`, or
/// - `channeladdr,proc_name,actor[pid][port]`
///
/// In both families the port may also be written `port<type>`; the type
/// annotation is accepted by the parser and ignored.
///
/// Reference also implements a total ordering, so that references are
/// ordered lexicographically with the hierarchy implied by world, proc,
/// actor. This allows reference ordering to be used to implement prefix
/// based routing.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    Hash,
    typeuri::Named,
    EnumAsInner
)]
pub enum Reference {
    /// A reference to a world.
    World(WorldId),
    /// A reference to a proc.
    Proc(ProcId),
    /// A reference to an actor.
    Actor(ActorId), // todo: should we only allow name references here?
    /// A reference to a port.
    Port(PortId),
    /// A reference to a gang.
    Gang(GangId),
}
127
128impl Reference {
129    /// Tells whether this reference is a prefix of the provided reference.
130    pub fn is_prefix_of(&self, other: &Reference) -> bool {
131        match self {
132            Self::World(_) => self.world_id() == other.world_id(),
133            Self::Proc(_) => self.proc_id() == other.proc_id(),
134            Self::Actor(_) => self == other,
135            Self::Port(_) => self == other,
136            Self::Gang(_) => self == other,
137        }
138    }
139
140    /// The world id of the reference.
141    pub fn world_id(&self) -> Option<&WorldId> {
142        match self {
143            Self::World(world_id) => Some(world_id),
144            Self::Proc(proc_id) => proc_id.world_id(),
145            Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
146            Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
147            Self::Gang(GangId(world_id, _)) => Some(world_id),
148        }
149    }
150
151    /// The proc id of the reference, if any.
152    pub fn proc_id(&self) -> Option<&ProcId> {
153        match self {
154            Self::World(_) => None,
155            Self::Proc(proc_id) => Some(proc_id),
156            Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
157            Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
158            Self::Gang(_) => None,
159        }
160    }
161
162    /// The rank of the reference, if any.
163    fn rank(&self) -> Option<Index> {
164        self.proc_id().and_then(|proc_id| proc_id.rank())
165    }
166
167    /// The actor id of the reference, if any.
168    pub fn actor_id(&self) -> Option<&ActorId> {
169        match self {
170            Self::World(_) => None,
171            Self::Proc(_) => None,
172            Self::Actor(actor_id) => Some(actor_id),
173            Self::Port(PortId(actor_id, _)) => Some(actor_id),
174            Self::Gang(_) => None,
175        }
176    }
177
178    /// The actor name of the reference, if any.
179    fn actor_name(&self) -> Option<&str> {
180        match self {
181            Self::World(_) => None,
182            Self::Proc(_) => None,
183            Self::Actor(actor_id) => Some(actor_id.name()),
184            Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
185            Self::Gang(gang_id) => Some(&gang_id.1),
186        }
187    }
188
189    /// The pid of the reference, if any.
190    fn pid(&self) -> Option<Index> {
191        match self {
192            Self::World(_) => None,
193            Self::Proc(_) => None,
194            Self::Actor(actor_id) => Some(actor_id.pid()),
195            Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
196            Self::Gang(_) => None,
197        }
198    }
199
200    /// The port of the reference, if any.
201    fn port(&self) -> Option<u64> {
202        match self {
203            Self::World(_) => None,
204            Self::Proc(_) => None,
205            Self::Actor(_) => None,
206            Self::Port(port_id) => Some(port_id.index()),
207            Self::Gang(_) => None,
208        }
209    }
210
211    /// Returns the kind of the reference.
212    pub fn kind(&self) -> ReferenceKind {
213        match self {
214            Self::World(_) => ReferenceKind::World,
215            Self::Proc(_) => ReferenceKind::Proc,
216            Self::Actor(_) => ReferenceKind::Actor,
217            Self::Port(_) => ReferenceKind::Port,
218            Self::Gang(_) => ReferenceKind::Gang,
219        }
220    }
221}
222
223impl PartialOrd for Reference {
224    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
225        Some(self.cmp(other))
226    }
227}
228
229impl Ord for Reference {
230    fn cmp(&self, other: &Self) -> Ordering {
231        (
232            // Ranked procs precede direct procs:
233            self.world_id(),
234            self.rank(),
235            self.proc_id().and_then(ProcId::as_direct),
236            self.actor_name(),
237            self.pid(),
238            self.port(),
239        )
240            .cmp(&(
241                other.world_id(),
242                other.rank(),
243                other.proc_id().and_then(ProcId::as_direct),
244                other.actor_name(),
245                other.pid(),
246                other.port(),
247            ))
248    }
249}
250
251impl fmt::Display for Reference {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        match self {
254            Self::World(world_id) => fmt::Display::fmt(world_id, f),
255            Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
256            Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
257            Self::Port(port_id) => fmt::Display::fmt(port_id, f),
258            Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
259        }
260    }
261}
262
/// Statically create a [`WorldId`], [`ProcId`], [`ActorId`], [`PortId`] or
/// [`GangId`], given the (ranked) concrete syntax documented in
/// [`Reference`]:
///
/// ```
/// # use hyperactor::id;
/// # use hyperactor::reference::WorldId;
/// # use hyperactor::reference::ProcId;
/// # use hyperactor::reference::ActorId;
/// # use hyperactor::reference::GangId;
/// assert_eq!(id!(hello), WorldId("hello".into()));
/// assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
/// assert_eq!(
///     id!(hello[0].actor),
///     ActorId(
///         ProcId::Ranked(WorldId("hello".into()), 0),
///         "actor".into(),
///         0
///     )
/// );
/// assert_eq!(
///     id!(hello[0].actor[1]),
///     ActorId(
///         ProcId::Ranked(WorldId("hello".into()), 0),
///         "actor".into(),
///         1
///     )
/// );
/// assert_eq!(
///     id!(hello.actor),
///     GangId(WorldId("hello".into()), "actor".into())
/// );
/// ```
///
/// The form `world[rank].actor[pid][port]` is also accepted and yields a
/// [`PortId`].
///
/// World and actor names must be Rust identifiers (they are captured as
/// `ident` and stringified), while rank, pid and port may be arbitrary
/// expressions. Every arm builds a [`ProcId::Ranked`]; ids rooted at a
/// direct proc cannot be written with this macro.
///
/// Prefer to use the id macro to construct identifiers in code, as it
/// guarantees static validity, and preserves and reinforces the uniform
/// concrete syntax of identifiers throughout.
#[macro_export]
macro_rules! id {
    // world
    ($world:ident) => {
        $crate::reference::WorldId(stringify!($world).to_string())
    };
    // world[rank]
    ($world:ident [$rank:expr]) => {
        $crate::reference::ProcId::Ranked(
            $crate::reference::WorldId(stringify!($world).to_string()),
            $rank,
        )
    };
    // world[rank].actor (pid 0 implied)
    ($world:ident [$rank:expr] . $actor:ident) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            0,
        )
    };
    // world[rank].actor[pid]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr]) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            $pid,
        )
    };
    // world.actor (gang)
    ($world:ident . $actor:ident) => {
        $crate::reference::GangId(
            $crate::reference::WorldId(stringify!($world).to_string()),
            stringify!($actor).to_string(),
        )
    };
    // world[rank].actor[pid][port]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr] [$port:expr]) => {
        $crate::reference::PortId(
            $crate::reference::ActorId(
                $crate::reference::ProcId::Ranked(
                    $crate::reference::WorldId(stringify!($world).to_string()),
                    $rank,
                ),
                stringify!($actor).to_string(),
                $pid,
            ),
            $port,
        )
    };
}
350pub use id;
351
/// The type of error encountered while parsing references.
///
/// This is the error type of the [`FromStr`] implementations for
/// [`Reference`] and for the individual id types in this module.
#[derive(thiserror::Error, Debug)]
pub enum ReferenceParsingError {
    /// The parser expected a token, but it reached the end of the token stream.
    #[error("expected token")]
    Empty,

    /// The parser encountered an unexpected token.
    #[error("unexpected token: {0}")]
    Unexpected(String),

    /// The parser encountered an error parsing an integer.
    #[error(transparent)]
    ParseInt(#[from] ParseIntError),

    /// A parse error.
    #[error("parse: {0}")]
    Parse(#[from] ParseError),

    /// The parser encountered the wrong reference type. The payload names
    /// the kind of reference that was expected (e.g. `"world"`).
    #[error("wrong reference type: expected {0}")]
    WrongType(String),

    /// An invalid channel address was encountered while parsing the reference.
    /// Carries the offending address text and the underlying parse error.
    #[error("invalid channel address {0}: {1}")]
    InvalidChannelAddress(String, anyhow::Error),
}
379
/// Parses the concrete reference syntax into a [`Reference`].
///
/// Two families are accepted: "direct" references, which contain a comma
/// and start with a channel address, and "ranked" references, which start
/// with a world name.
impl FromStr for Reference {
    type Err = ReferenceParsingError;

    /// Parse `addr` as a reference.
    ///
    /// Returns [`ReferenceParsingError::InvalidChannelAddress`] when the
    /// text before the first comma is not a valid channel address;
    /// failures of the token-level parse are converted into
    /// [`ReferenceParsingError`] by `?`.
    fn from_str(addr: &str) -> Result<Self, Self::Err> {
        // First, try to parse a "new style" reference:
        // 1) If the reference contains a comma (anywhere), it is a new style reference;
        //    commas were not a valid lexeme in the previous reference format.
        // 2) This is a bit ugly, but we bypass the tokenizer prior to this comma,
        //    try to parse a channel address, and then parse the remainder.

        // Only the first comma separates the channel address; later commas
        // are left in `rest` for the lexer.
        match addr.split_once(",") {
            Some((channel_addr, rest)) => {
                let channel_addr = channel_addr.parse().map_err(|err| {
                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
                })?;

                // NOTE: in the patterns below the binding named `rank` is
                // the actor's pid (third field of `ActorId`); direct procs
                // have no rank.
                Ok(parse! {
                    Lexer::new(rest);

                    // channeladdr,proc_name
                    Token::Elem(proc_name) =>
                    Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),

                    // channeladdr,proc_name,actor_name  (implied pid=0)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
                    Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),

                    // channeladdr,proc_name,actor_name[pid]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                        Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),

                    // channeladdr,proc_name,actor_name[pid][port]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket  =>
                        Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),

                    // channeladdr,proc_name,actor_name[pid][port<type>]
                    // The type annotation is accepted but ignored.
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),
                }?)
            }

            // "old style" / "ranked" reference
            None => {
                Ok(parse! {
                    Lexer::new(addr);

                    // world
                    Token::Elem(world) => Self::World(WorldId(world.into())),

                    // world[rank]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                        Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),

                    // world[rank].actor  (implied pid=0)
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor) =>
                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),

                    // world[rank].actor[pid]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),

                    // world[rank].actor[pid][port]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world[rank].actor[pid][port<type>]
                    // The type annotation is accepted but ignored.
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world.actor  (gang)
                    Token::Elem(world) Token::Dot Token::Elem(actor) =>
                        Self::Gang(GangId(WorldId(world.into()), actor.into())),
                }?)
            }
        }
    }
}
475
476impl From<WorldId> for Reference {
477    fn from(world_id: WorldId) -> Self {
478        Self::World(world_id)
479    }
480}
481
482impl From<ProcId> for Reference {
483    fn from(proc_id: ProcId) -> Self {
484        Self::Proc(proc_id)
485    }
486}
487
488impl From<ActorId> for Reference {
489    fn from(actor_id: ActorId) -> Self {
490        Self::Actor(actor_id)
491    }
492}
493
494impl From<PortId> for Reference {
495    fn from(port_id: PortId) -> Self {
496        Self::Port(port_id)
497    }
498}
499
500impl From<GangId> for Reference {
501    fn from(gang_id: GangId) -> Self {
502        Self::Gang(gang_id)
503    }
504}
505
/// Index is a type alias representing a value that can be used as an index
/// into a sequence. In this module it is the type of proc ranks
/// ([`ProcId::Ranked`]) and of actor pids ([`ActorId`]).
pub type Index = usize;
509
/// WorldId identifies a world within a [`crate::system::System`].
/// A world is identified by its name, which is the wrapped string.
// NOTE(review): the previous wording spoke of the world's "index"; the
// payload is a name string. Uniqueness within a system instance is not
// enforced here — presumably guaranteed by the system; confirm.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct WorldId(pub String);
525
526impl WorldId {
527    /// Create a proc ID with the provided index in this world.
528    pub fn proc_id(&self, index: Index) -> ProcId {
529        ProcId::Ranked(self.clone(), index)
530    }
531
532    /// The world index.
533    pub fn name(&self) -> &str {
534        &self.0
535    }
536
537    /// Return a randomly selected user proc in this world.
538    pub fn random_user_proc(&self) -> ProcId {
539        let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
540        ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
541    }
542}
543
544impl fmt::Display for WorldId {
545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546        let WorldId(name) = self;
547        write!(f, "{}", name)
548    }
549}
550
551impl FromStr for WorldId {
552    type Err = ReferenceParsingError;
553
554    fn from_str(addr: &str) -> Result<Self, Self::Err> {
555        match addr.parse()? {
556            Reference::World(world_id) => Ok(world_id),
557            _ => Err(ReferenceParsingError::WrongType("world".into())),
558        }
559    }
560}
561
/// Procs are identified by their _rank_ within a world or by a direct channel address.
/// Each proc represents an actor runtime that can locally route to all of its
/// constituent actors.
///
/// Ranks >= 1usize << (no. bits in usize - 1) (i.e., with the high bit set) are "user"
/// ranks. These are reserved for randomly generated identifiers not
/// assigned by the system.
///
/// Displayed as `world[rank]` for ranked procs and `addr,name` for direct
/// procs, matching the syntax accepted by [`FromStr`].
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named,
    EnumAsInner
)]
pub enum ProcId {
    /// A ranked proc within a world
    Ranked(WorldId, Index),
    /// A proc reachable via a direct channel address, and local name.
    Direct(ChannelAddr, String),
}
588
589impl ProcId {
590    /// Create an actor ID with the provided name, pid within this proc.
591    pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
592        ActorId(self.clone(), name.into(), pid)
593    }
594
595    /// The proc's world id, if this is a ranked proc.
596    pub fn world_id(&self) -> Option<&WorldId> {
597        match self {
598            ProcId::Ranked(world_id, _) => Some(world_id),
599            ProcId::Direct(_, _) => None,
600        }
601    }
602
603    /// The world name, if this is a ranked proc.
604    pub fn world_name(&self) -> Option<&str> {
605        self.world_id().map(|world_id| world_id.name())
606    }
607
608    /// The proc's rank, if this is a ranked proc.
609    pub fn rank(&self) -> Option<Index> {
610        match self {
611            ProcId::Ranked(_, rank) => Some(*rank),
612            ProcId::Direct(_, _) => None,
613        }
614    }
615
616    /// The proc's name, if this is a direct proc.
617    pub fn name(&self) -> Option<&String> {
618        match self {
619            ProcId::Ranked(_, _) => None,
620            ProcId::Direct(_, name) => Some(name),
621        }
622    }
623}
624
625impl fmt::Display for ProcId {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        match self {
628            ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
629            ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
630        }
631    }
632}
633
634impl FromStr for ProcId {
635    type Err = ReferenceParsingError;
636
637    fn from_str(addr: &str) -> Result<Self, Self::Err> {
638        match addr.parse()? {
639            Reference::Proc(proc_id) => Ok(proc_id),
640            _ => Err(ReferenceParsingError::WrongType("proc".into())),
641        }
642    }
643}
644
/// Actors are identified by their proc, their name, and pid.
///
/// The tuple fields are, in order: the [`ProcId`] of the proc hosting the
/// actor, the actor's name, and its pid. [`ActorId::root`] produces ids
/// with pid 0 and [`ActorId::child_id`] produces ids that share proc and
/// name but carry a different pid.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ActorId(pub ProcId, pub String, pub Index);

// NOTE(review): presumably makes `ActorId` usable as an attribute value in
// `hyperactor_config` — confirm against that crate's macro documentation.
hyperactor_config::impl_attrvalue!(ActorId);
661
662impl ActorId {
663    /// Create a new port ID with the provided port for this actor.
664    pub fn port_id(&self, port: u64) -> PortId {
665        PortId(self.clone(), port)
666    }
667
668    /// Create a child actor ID with the provided PID.
669    pub fn child_id(&self, pid: Index) -> Self {
670        Self(self.0.clone(), self.1.clone(), pid)
671    }
672
673    /// Return the root actor ID for the provided proc and name.
674    pub fn root(proc_id: ProcId, name: String) -> Self {
675        Self(proc_id, name, 0)
676    }
677
678    /// The proc ID of this actor ID.
679    pub fn proc_id(&self) -> &ProcId {
680        &self.0
681    }
682
683    /// The world name. Panics if this is a direct proc.
684    pub fn world_name(&self) -> &str {
685        self.0
686            .world_name()
687            .expect("world_name() called on direct proc")
688    }
689
690    /// The actor's proc's rank. Panics if this is a direct proc.
691    pub fn rank(&self) -> Index {
692        self.0.rank().expect("rank() called on direct proc")
693    }
694
695    /// The actor's name.
696    pub fn name(&self) -> &str {
697        &self.1
698    }
699
700    /// The actor's pid.
701    pub fn pid(&self) -> Index {
702        self.2
703    }
704}
705
706impl fmt::Display for ActorId {
707    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708        let ActorId(proc_id, name, pid) = self;
709        match proc_id {
710            ProcId::Ranked(..) => write!(f, "{}.{}[{}]", proc_id, name, pid),
711            ProcId::Direct(..) => write!(f, "{},{}[{}]", proc_id, name, pid),
712        }
713    }
714}
715impl<A: Referable> From<ActorRef<A>> for ActorId {
716    fn from(actor_ref: ActorRef<A>) -> Self {
717        actor_ref.actor_id.clone()
718    }
719}
720
721impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
722    fn from(actor_ref: &'a ActorRef<A>) -> Self {
723        &actor_ref.actor_id
724    }
725}
726
727impl FromStr for ActorId {
728    type Err = ReferenceParsingError;
729
730    fn from_str(addr: &str) -> Result<Self, Self::Err> {
731        match addr.parse()? {
732            Reference::Actor(actor_id) => Ok(actor_id),
733            _ => Err(ReferenceParsingError::WrongType("actor".into())),
734        }
735    }
736}
737
/// ActorRefs are typed references to actors.
///
/// An `ActorRef<A>` wraps an [`ActorId`] and tags it with the actor type
/// `A` at compile time only; no value of type `A` is stored. The trait
/// implementations in this module (serialization, equality, ordering,
/// hashing) all operate on the wrapped id alone.
#[derive(typeuri::Named)]
pub struct ActorRef<A: Referable> {
    /// The untyped id of the referenced actor.
    pub(crate) actor_id: ActorId,
    // fn() -> A so that the struct remains Send
    phantom: PhantomData<fn() -> A>,
}
745
746impl<A: Referable> ActorRef<A> {
747    /// Get the remote port for message type [`M`] for the referenced actor.
748    pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
749    where
750        A: RemoteHandles<M>,
751    {
752        PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
753    }
754
755    /// Send an [`M`]-typed message to the referenced actor.
756    pub fn send<M: RemoteMessage>(
757        &self,
758        cx: &impl context::Actor,
759        message: M,
760    ) -> Result<(), MailboxSenderError>
761    where
762        A: RemoteHandles<M>,
763    {
764        self.port().send(cx, message)
765    }
766
767    /// Send an [`M`]-typed message to the referenced actor, with additional context provided by
768    /// headers.
769    pub fn send_with_headers<M: RemoteMessage>(
770        &self,
771        cx: &impl context::Actor,
772        headers: Attrs,
773        message: M,
774    ) -> Result<(), MailboxSenderError>
775    where
776        A: RemoteHandles<M>,
777    {
778        self.port().send_with_headers(cx, headers, message)
779    }
780
781    /// The caller guarantees that the provided actor ID is also a valid,
782    /// typed reference.  This is usually invoked to provide a guarantee
783    /// that an externally-provided actor ID (e.g., through a command
784    /// line argument) is a valid reference.
785    pub fn attest(actor_id: ActorId) -> Self {
786        Self {
787            actor_id,
788            phantom: PhantomData,
789        }
790    }
791
792    /// The actor ID corresponding with this reference.
793    pub fn actor_id(&self) -> &ActorId {
794        &self.actor_id
795    }
796
797    /// Convert this actor reference into its corresponding actor ID.
798    pub fn into_actor_id(self) -> ActorId {
799        self.actor_id
800    }
801
802    /// Attempt to downcast this reference into a (local) actor handle.
803    /// This will only succeed when the referenced actor is in the same
804    /// proc as the caller.
805    pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
806    where
807        A: Actor,
808    {
809        cx.instance().proc().resolve_actor_ref(self)
810    }
811}
812
813// Implement Serialize manually, without requiring A: Serialize
814impl<A: Referable> Serialize for ActorRef<A> {
815    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
816    where
817        S: Serializer,
818    {
819        // Serialize only the fields that don't depend on A
820        self.actor_id().serialize(serializer)
821    }
822}
823
824// Implement Deserialize manually, without requiring A: Deserialize
825impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
826    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
827    where
828        D: Deserializer<'de>,
829    {
830        let actor_id = <ActorId>::deserialize(deserializer)?;
831        Ok(ActorRef {
832            actor_id,
833            phantom: PhantomData,
834        })
835    }
836}
837
838// Implement Debug manually, without requiring A: Debug
839impl<A: Referable> fmt::Debug for ActorRef<A> {
840    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841        f.debug_struct("ActorRef")
842            .field("actor_id", &self.actor_id)
843            .field("type", &std::any::type_name::<A>())
844            .finish()
845    }
846}
847
848impl<A: Referable> fmt::Display for ActorRef<A> {
849    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850        fmt::Display::fmt(&self.actor_id, f)?;
851        write!(f, "<{}>", std::any::type_name::<A>())
852    }
853}
854
855// We implement Clone manually to avoid imposing A: Clone.
856impl<A: Referable> Clone for ActorRef<A> {
857    fn clone(&self) -> Self {
858        Self {
859            actor_id: self.actor_id.clone(),
860            phantom: PhantomData,
861        }
862    }
863}
864
865impl<A: Referable> PartialEq for ActorRef<A> {
866    fn eq(&self, other: &Self) -> bool {
867        self.actor_id == other.actor_id
868    }
869}
870
871impl<A: Referable> Eq for ActorRef<A> {}
872
873impl<A: Referable> PartialOrd for ActorRef<A> {
874    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
875        Some(self.cmp(other))
876    }
877}
878
879impl<A: Referable> Ord for ActorRef<A> {
880    fn cmp(&self, other: &Self) -> Ordering {
881        self.actor_id.cmp(&other.actor_id)
882    }
883}
884
885impl<A: Referable> Hash for ActorRef<A> {
886    fn hash<H: Hasher>(&self, state: &mut H) {
887        self.actor_id.hash(state);
888    }
889}
890
/// Port ids identify [`crate::mailbox::Port`]s of an actor.
///
/// The tuple fields are, in order: the [`ActorId`] of the owning actor and
/// the port's index on that actor.
///
/// TODO: consider moving [`crate::mailbox::Port`] to `PortRef` in this
/// module for consistency with actors.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct PortId(pub ActorId, pub u64);
908
909impl PortId {
910    /// The ID of the port's owning actor.
911    pub fn actor_id(&self) -> &ActorId {
912        &self.0
913    }
914
915    /// Convert this port ID into an actor ID.
916    pub fn into_actor_id(self) -> ActorId {
917        self.0
918    }
919
920    /// This port's index.
921    pub fn index(&self) -> u64 {
922        self.1
923    }
924
925    /// Send a serialized message to this port, provided a sending capability,
926    /// such as [`crate::actor::Instance`]. It is the sender's responsibility
927    /// to ensure that the provided message is well-typed.
928    pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
929        let mut headers = Attrs::new();
930        crate::mailbox::headers::set_send_timestamp(&mut headers);
931        cx.post(self.clone(), headers, serialized, true);
932    }
933
934    /// Send a serialized message to this port, provided a sending capability,
935    /// such as [`crate::actor::Instance`], with additional context provided by headers.
936    /// It is the sender's responsibility to ensure that the provided message is well-typed.
937    pub fn send_with_headers(
938        &self,
939        cx: &impl context::Actor,
940        serialized: wirevalue::Any,
941        mut headers: Attrs,
942    ) {
943        crate::mailbox::headers::set_send_timestamp(&mut headers);
944        cx.post(self.clone(), headers, serialized, true);
945    }
946
947    /// Split this port, returning a new port that relays messages to the port
948    /// through a local proxy, which may coalesce messages.
949    pub fn split(
950        &self,
951        cx: &impl context::Actor,
952        reducer_spec: Option<ReducerSpec>,
953        reducer_opts: Option<ReducerOpts>,
954        return_undeliverable: bool,
955    ) -> anyhow::Result<PortId> {
956        cx.split(
957            self.clone(),
958            reducer_spec,
959            reducer_opts,
960            return_undeliverable,
961        )
962    }
963}
964
965impl FromStr for PortId {
966    type Err = ReferenceParsingError;
967
968    fn from_str(addr: &str) -> Result<Self, Self::Err> {
969        match addr.parse()? {
970            Reference::Port(port_id) => Ok(port_id),
971            _ => Err(ReferenceParsingError::WrongType("port".into())),
972        }
973    }
974}
975
976impl fmt::Display for PortId {
977    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
978        let PortId(actor_id, port) = self;
979        if port & (1 << 63) != 0 {
980            let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !(1 << 63)));
981            let typename = type_info.map_or("unknown", TypeInfo::typename);
982            write!(f, "{}[{}<{}>]", actor_id, port, typename)
983        } else {
984            write!(f, "{}[{}]", actor_id, port)
985        }
986    }
987}
988
/// A reference to a remote port. All messages passed through
/// PortRefs will be serialized.
///
/// Equality, ordering and hashing are derived via `derivative`; the
/// reducer fields are explicitly excluded from all of them.
#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct PortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer spec; ignored by comparisons and hashing.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_spec: Option<ReducerSpec>,
    /// Optional reducer options; ignored by comparisons and hashing.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_opts: Option<ReducerOpts>,
    /// Ties the reference to the message type `M` without storing a value.
    phantom: PhantomData<M>,
    /// Whether undeliverable messages sent through this reference should be
    /// returned to the sender.
    return_undeliverable: bool,
}
1012
1013impl<M: RemoteMessage> PortRef<M> {
1014    /// The caller attests that the provided PortId can be
1015    /// converted to a reachable, typed port reference.
1016    pub fn attest(port_id: PortId) -> Self {
1017        Self {
1018            port_id,
1019            reducer_spec: None,
1020            reducer_opts: None,
1021            phantom: PhantomData,
1022            return_undeliverable: true,
1023        }
1024    }
1025
1026    /// The caller attests that the provided PortId can be
1027    /// converted to a reachable, typed port reference.
1028    pub fn attest_reducible(
1029        port_id: PortId,
1030        reducer_spec: Option<ReducerSpec>,
1031        reducer_opts: Option<ReducerOpts>,
1032    ) -> Self {
1033        Self {
1034            port_id,
1035            reducer_spec,
1036            reducer_opts,
1037            phantom: PhantomData,
1038            return_undeliverable: true,
1039        }
1040    }
1041
1042    /// The caller attests that the provided PortId can be
1043    /// converted to a reachable, typed port reference.
1044    pub fn attest_message_port(actor: &ActorId) -> Self {
1045        PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
1046    }
1047
1048    /// The typehash of this port's reducer, if any. Reducers
1049    /// may be used to coalesce messages sent to a port.
1050    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1051        &self.reducer_spec
1052    }
1053
1054    /// This port's ID.
1055    pub fn port_id(&self) -> &PortId {
1056        &self.port_id
1057    }
1058
1059    /// Convert this PortRef into its corresponding port id.
1060    pub fn into_port_id(self) -> PortId {
1061        self.port_id
1062    }
1063
1064    /// coerce it into OncePortRef so we can send messages to this port from
1065    /// APIs requires OncePortRef.
1066    pub fn into_once(self) -> OncePortRef<M> {
1067        OncePortRef::attest(self.into_port_id())
1068    }
1069
1070    /// Send a message to this port, provided a sending capability, such as
1071    /// [`crate::actor::Instance`].
1072    pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1073        self.send_with_headers(cx, Attrs::new(), message)
1074    }
1075
1076    /// Send a message to this port, provided a sending capability, such as
1077    /// [`crate::actor::Instance`]. Additional context can be provided in the form of
1078    /// headers.
1079    pub fn send_with_headers(
1080        &self,
1081        cx: &impl context::Actor,
1082        headers: Attrs,
1083        message: M,
1084    ) -> Result<(), MailboxSenderError> {
1085        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1086            MailboxSenderError::new_bound(
1087                self.port_id.clone(),
1088                MailboxSenderErrorKind::Serialize(err.into()),
1089            )
1090        })?;
1091        self.send_serialized(cx, headers, serialized);
1092        Ok(())
1093    }
1094
1095    /// Send a serialized message to this port, provided a sending capability, such as
1096    /// [`crate::actor::Instance`].
1097    pub fn send_serialized(
1098        &self,
1099        cx: &impl context::Actor,
1100        mut headers: Attrs,
1101        message: wirevalue::Any,
1102    ) {
1103        crate::mailbox::headers::set_send_timestamp(&mut headers);
1104        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1105        cx.post(
1106            self.port_id.clone(),
1107            headers,
1108            message,
1109            self.return_undeliverable,
1110        );
1111    }
1112
1113    /// Convert this port into a sink that can be used to send messages using the given capability.
1114    pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
1115        PortSink::new(cx, self)
1116    }
1117
1118    /// Set whether or not messages sent to this port that are undeliverable
1119    /// should be returned to the sender.
1120    pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1121        self.return_undeliverable = return_undeliverable;
1122    }
1123}
1124
1125impl<M: RemoteMessage> Clone for PortRef<M> {
1126    fn clone(&self) -> Self {
1127        Self {
1128            port_id: self.port_id.clone(),
1129            reducer_spec: self.reducer_spec.clone(),
1130            reducer_opts: self.reducer_opts.clone(),
1131            phantom: PhantomData,
1132            return_undeliverable: self.return_undeliverable,
1133        }
1134    }
1135}
1136
1137impl<M: RemoteMessage> fmt::Display for PortRef<M> {
1138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1139        fmt::Display::fmt(&self.port_id, f)
1140    }
1141}
1142
/// The parameters extracted from [`PortRef`] to [`Bindings`].
///
/// The fields are, in order: the port id, the optional reducer spec, the
/// optional reducer options, and the `return_undeliverable` flag.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
pub struct UnboundPort(
    pub PortId,
    pub Option<ReducerSpec>,
    pub Option<ReducerOpts>,
    pub bool, // return_undeliverable
);
wirevalue::register_type!(UnboundPort);
1152
1153impl UnboundPort {
1154    /// Update the port id of this binding.
1155    pub fn update(&mut self, port_id: PortId) {
1156        self.0 = port_id;
1157    }
1158}
1159
1160impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
1161    fn from(port_ref: &PortRef<M>) -> Self {
1162        UnboundPort(
1163            port_ref.port_id.clone(),
1164            port_ref.reducer_spec.clone(),
1165            port_ref.reducer_opts.clone(),
1166            port_ref.return_undeliverable,
1167        )
1168    }
1169}
1170
1171impl<M: RemoteMessage> Unbind for PortRef<M> {
1172    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1173        bindings.push_back(&UnboundPort::from(self))
1174    }
1175}
1176
1177impl<M: RemoteMessage> Bind for PortRef<M> {
1178    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1179        let bound = bindings.try_pop_front::<UnboundPort>()?;
1180        self.port_id = bound.0;
1181        self.reducer_spec = bound.1;
1182        self.reducer_opts = bound.2;
1183        self.return_undeliverable = bound.3;
1184        Ok(())
1185    }
1186}
1187
/// A remote reference to a [`OncePort`]. References are serializable
/// and may be passed to remote actors, which can then use it to send
/// a message to this port.
///
/// Sending consumes the reference (see `send` and `send_with_headers`).
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Ties the reference to the message type `M` without storing a value.
    phantom: PhantomData<M>,
}
1196
1197impl<M: RemoteMessage> OncePortRef<M> {
1198    pub(crate) fn attest(port_id: PortId) -> Self {
1199        Self {
1200            port_id,
1201            phantom: PhantomData,
1202        }
1203    }
1204
1205    /// This port's ID.
1206    pub fn port_id(&self) -> &PortId {
1207        &self.port_id
1208    }
1209
1210    /// Convert this PortRef into its corresponding port id.
1211    pub fn into_port_id(self) -> PortId {
1212        self.port_id
1213    }
1214
1215    /// Send a message to this port, provided a sending capability, such as
1216    /// [`crate::actor::Instance`].
1217    pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1218        self.send_with_headers(cx, Attrs::new(), message)
1219    }
1220
1221    /// Send a message to this port, provided a sending capability, such as
1222    /// [`crate::actor::Instance`]. Additional context can be provided in the form of headers.
1223    pub fn send_with_headers(
1224        self,
1225        cx: &impl context::Actor,
1226        mut headers: Attrs,
1227        message: M,
1228    ) -> Result<(), MailboxSenderError> {
1229        crate::mailbox::headers::set_send_timestamp(&mut headers);
1230        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1231            MailboxSenderError::new_bound(
1232                self.port_id.clone(),
1233                MailboxSenderErrorKind::Serialize(err.into()),
1234            )
1235        })?;
1236        cx.post(self.port_id.clone(), headers, serialized, true);
1237        Ok(())
1238    }
1239}
1240
1241impl<M: RemoteMessage> Clone for OncePortRef<M> {
1242    fn clone(&self) -> Self {
1243        Self {
1244            port_id: self.port_id.clone(),
1245            phantom: PhantomData,
1246        }
1247    }
1248}
1249
1250impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1252        fmt::Display::fmt(&self.port_id, f)
1253    }
1254}
1255
impl<M: RemoteMessage> Named for OncePortRef<M> {
    /// The type name of this reference, parameterized by the message type `M`.
    // NOTE(review): the name uses a `hyperactor::mailbox::` prefix rather than
    // this module's path; presumably kept for compatibility. Confirm before
    // changing.
    fn typename() -> &'static str {
        wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1261
1262// We do not split PortRef, because it can only receive a single response, and
1263// there is no meaningful performance gain to make that response going through
1264// comm actors.
1265impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1266    fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1267        Ok(())
1268    }
1269}
1270
1271impl<M: RemoteMessage> Bind for OncePortRef<M> {
1272    fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1273        Ok(())
1274    }
1275}
1276
/// Gangs identify a gang of actors across the world.
///
/// A gang id is a pair: the id of the world (field `0`) and the name of
/// the gang (field `1`). It is displayed as `world.name`.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct GangId(pub WorldId, pub String);
1291
1292impl GangId {
1293    pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1294        (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1295    }
1296
1297    /// The world id of the gang.
1298    pub fn world_id(&self) -> &WorldId {
1299        &self.0
1300    }
1301
1302    /// The name of the gang.
1303    pub fn name(&self) -> &str {
1304        &self.1
1305    }
1306
1307    /// The gang's actor ID for the provided rank. It always returns the root
1308    /// actor because the root actor is the public interface of a gang.
1309    pub fn actor_id(&self, rank: Index) -> ActorId {
1310        ActorId(
1311            ProcId::Ranked(self.world_id().clone(), rank),
1312            self.name().to_string(),
1313            0,
1314        )
1315    }
1316}
1317
1318impl fmt::Display for GangId {
1319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1320        let GangId(world_id, name) = self;
1321        write!(f, "{}.{}", world_id, name)
1322    }
1323}
1324
1325impl FromStr for GangId {
1326    type Err = ReferenceParsingError;
1327
1328    fn from_str(addr: &str) -> Result<Self, Self::Err> {
1329        match addr.parse()? {
1330            Reference::Gang(gang_id) => Ok(gang_id),
1331            _ => Err(ReferenceParsingError::WrongType("gang".into())),
1332        }
1333    }
1334}
1335
/// Chop implements a simple lexer on a fixed set of delimiters.
///
/// The returned iterator yields, in input order, each delimiter as it
/// appears in `delims` and each run of text between delimiters with
/// surrounding whitespace trimmed. When several delimiters match at the same
/// position, the one listed first in `delims` wins. An empty input yields
/// nothing.
fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
    std::iter::from_fn(move || {
        if s.is_empty() {
            return None;
        }

        // Earliest delimiter occurrence in what is left of the input.
        let nearest = delims
            .iter()
            .filter_map(|&delim| s.find(delim).map(|at| (at, delim)))
            .min_by_key(|&(at, _)| at);

        let (token, rest) = match nearest {
            // The input starts with a delimiter: emit the delimiter itself.
            Some((0, delim)) => (delim, &s[delim.len()..]),
            // Text precedes the delimiter: emit it and stop just before the
            // delimiter so the next call emits the delimiter.
            Some((at, _)) => {
                let (head, tail) = s.split_at(at);
                (head.trim(), tail)
            }
            // No delimiter left: the remainder is the final token.
            None => (s.trim(), ""),
        };
        s = rest;
        Some(token)
    })
}
1367
/// GangRefs are typed references to gangs.
///
/// `A` is the actor type of the gang's members: `rank` returns an
/// `ActorRef<A>`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct GangRef<A: Referable> {
    /// The id of the referenced gang.
    gang_id: GangId,
    /// Ties the reference to the actor type `A` without storing a value.
    phantom: PhantomData<A>,
}
1374
1375impl<A: Referable> GangRef<A> {
1376    /// Return an ActorRef corresponding with the provided rank in
1377    /// this gang.  Does not check the validity of the rank, so the
1378    /// returned identifier is not guaranteed to refer to a valid rank.
1379    pub fn rank(&self, rank: Index) -> ActorRef<A> {
1380        let GangRef {
1381            gang_id: GangId(world_id, name),
1382            ..
1383        } = self;
1384        ActorRef::attest(ActorId(
1385            ProcId::Ranked(world_id.clone(), rank),
1386            name.clone(),
1387            0,
1388        ))
1389    }
1390
1391    /// Return the gang ID.
1392    pub fn gang_id(&self) -> &GangId {
1393        &self.gang_id
1394    }
1395
1396    /// The caller attests that the provided GandId can be
1397    /// converted to a reachable, typed gang reference.
1398    pub fn attest(gang_id: GangId) -> Self {
1399        Self {
1400            gang_id,
1401            phantom: PhantomData,
1402        }
1403    }
1404}
1405
1406impl<A: Referable> Clone for GangRef<A> {
1407    fn clone(&self) -> Self {
1408        Self {
1409            gang_id: self.gang_id.clone(),
1410            phantom: PhantomData,
1411        }
1412    }
1413}
1414
1415impl<A: Referable> From<GangRef<A>> for GangId {
1416    fn from(gang_ref: GangRef<A>) -> Self {
1417        gang_ref.gang_id
1418    }
1419}
1420
1421impl<'a, A: Referable> From<&'a GangRef<A>> for &'a GangId {
1422    fn from(gang_ref: &'a GangRef<A>) -> Self {
1423        &gang_ref.gang_id
1424    }
1425}
1426
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;

    use super::*;
    // for macros

    /// Every supported textual form parses to the expected [`Reference`]:
    /// worlds, ranked procs, actors, ports, gangs, direct-addressed procs,
    /// ports with a type annotation, and actor names carrying a suffix.
    #[test]
    fn test_reference_parse() {
        let cases: Vec<(&str, Reference)> = vec![
            ("test", WorldId("test".into()).into()),
            (
                "test[234]",
                ProcId::Ranked(WorldId("test".into()), 234).into(),
            ),
            (
                "test[234].testactor[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor".into(),
                    6,
                )
                .into(),
            ),
            (
                "test[234].testactor[6][1]",
                PortId(
                    ActorId(
                        ProcId::Ranked(WorldId("test".into()), 234),
                        "testactor".into(),
                        6,
                    ),
                    1,
                )
                .into(),
            ),
            (
                "test.testactor",
                GangId(WorldId("test".into()), "testactor".into()).into(),
            ),
            (
                "tcp:[::1]:1234,test,testactor[123]",
                ActorId(
                    ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                    "testactor".to_string(),
                    123,
                )
                .into(),
            ),
            (
                // type annotations are ignored
                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
                PortId(
                    ActorId(
                        ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                        "testactor".to_string(),
                        0,
                    ),
                    123,
                )
                .into(),
            ),
            (
                // References with v1::Name::Suffixed for actor names are parseable
                "test[234].testactor_12345[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor_12345".into(),
                    6,
                )
                .into(),
            ),
        ];

        for (s, expected) in cases {
            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
        }
    }

    /// Malformed inputs are rejected with a parse error.
    #[test]
    fn test_reference_parse_error() {
        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)"];

        for s in cases {
            let result: Result<Reference, ReferenceParsingError> = s.parse();
            assert!(result.is_err());
        }
    }

    /// The `id!` macro builds the same ids as the explicit constructors; an
    /// actor written without an index gets index `0`.
    #[test]
    fn test_id_macro() {
        assert_eq!(id!(hello), WorldId("hello".into()));
        assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
        assert_eq!(
            id!(hello[0].actor),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                0
            )
        );
        assert_eq!(
            id!(hello[0].actor[1]),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                1
            )
        );
        assert_eq!(
            id!(hello.actor),
            GangId(WorldId("hello".into()), "actor".into())
        );
    }

    /// Sorting a shuffled copy of the references restores the expected
    /// order, pinning the `Ord` implementation of [`Reference`].
    #[test]
    fn test_reference_ord() {
        // Listed in the order that sorting is expected to produce.
        let expected: Vec<Reference> = [
            "first",
            "second",
            "second.actor1",
            "second.actor2",
            "second[1]",
            "second[1].actor1",
            "second[1].actor2",
            "second[2]",
            "second[2].actor100",
            "third",
            "third.actor",
            "third[2]",
            "third[2].actor",
            "third[2].actor[1]",
        ]
        .into_iter()
        .map(|s| s.parse().unwrap())
        .collect();

        let mut sorted = expected.to_vec();
        sorted.shuffle(&mut thread_rng());
        sorted.sort();

        assert_eq!(sorted, expected);
    }

    /// A port whose index comes from a registered type's `port()` is
    /// displayed with a `<typename>` annotation after the index.
    #[test]
    fn test_port_type_annotation() {
        #[derive(typeuri::Named, Serialize, Deserialize)]
        struct MyType;
        wirevalue::register_type!(MyType);
        let port_id = PortId(
            ActorId(
                ProcId::Ranked(WorldId("test".into()), 234),
                "testactor".into(),
                1,
            ),
            MyType::port(),
        );
        assert_eq!(
            port_id.to_string(),
            "test[234].testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
        );
    }
}