hyperactor/
reference.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! References for different resources in Hyperactor.
10//!
11//! The "Id" variants are transparent and typeless, whereas the
12//! corresponding "Ref" variants are opaque and typed. The latter intended
13//! to be exposed in user-facing APIs. We provide [`std::convert::From`]
14//! implementations between Id and Refs where this makes sense.
15//!
16//! All system implementations use the same concrete reference
17//! representations, as their specific layout (e.g., actor index, rank,
18//! etc.) are used by the core communications algorithms throughout.
19//!
20//! References and ids are [`crate::Message`]s to facilitate passing
21//! them between actors.
22
23#![allow(dead_code)] // Allow until this is used outside of tests.
24
25use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use rand::Rng;
39use serde::Deserialize;
40use serde::Serialize;
41
42use crate as hyperactor;
43use crate::Actor;
44use crate::ActorHandle;
45use crate::Named;
46use crate::RemoteHandles;
47use crate::RemoteMessage;
48use crate::accum::ReducerSpec;
49use crate::actor::RemoteActor;
50use crate::attrs::Attrs;
51use crate::cap;
52use crate::channel::ChannelAddr;
53use crate::data::Serialized;
54use crate::mailbox::MailboxSenderError;
55use crate::mailbox::MailboxSenderErrorKind;
56use crate::mailbox::PortSink;
57use crate::message::Bind;
58use crate::message::Bindings;
59use crate::message::Unbind;
60use crate::parse::Lexer;
61use crate::parse::ParseError;
62use crate::parse::Token;
63use crate::parse::parse;
64
65/// A universal reference to hierarchical identifiers in Hyperactor.
66///
67/// References implement a concrete syntax which can be parsed via
68/// [`FromStr`]. They are of the form:
69///
70/// - `world`,
71/// - `world[rank]`,
72/// - `world[rank].actor[pid]`,
73/// - `world[rank].port[pid][port]`, or
74/// - `world.actor`
75///
76/// Reference also implements a total ordering, so that references are
77/// ordered lexicographically with the hierarchy implied by world, proc,
78/// actor. This allows reference ordering to be used to implement prefix
79/// based routing.
80#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash, Named)]
81pub enum Reference {
82    /// A reference to a world.
83    World(WorldId),
84    /// A reference to a proc.
85    Proc(ProcId),
86    /// A reference to an actor.
87    Actor(ActorId), // todo: should we only allow name references here?
88    /// A reference to a port.
89    Port(PortId),
90    /// A reference to a gang.
91    Gang(GangId),
92}
93
94impl Reference {
95    /// Tells whether this reference is a prefix of the provided reference.
96    pub fn is_prefix_of(&self, other: &Reference) -> bool {
97        match self {
98            Self::World(_) => self.world_id() == other.world_id(),
99            Self::Proc(_) => self.proc_id() == other.proc_id(),
100            Self::Actor(_) => self == other,
101            Self::Port(_) => self == other,
102            Self::Gang(_) => self == other,
103        }
104    }
105
106    /// The world id of the reference.
107    pub fn world_id(&self) -> Option<&WorldId> {
108        match self {
109            Self::World(world_id) => Some(world_id),
110            Self::Proc(proc_id) => proc_id.world_id(),
111            Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
112            Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
113            Self::Gang(GangId(world_id, _)) => Some(world_id),
114        }
115    }
116
117    /// The proc id of the reference, if any.
118    pub fn proc_id(&self) -> Option<&ProcId> {
119        match self {
120            Self::World(_) => None,
121            Self::Proc(proc_id) => Some(proc_id),
122            Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
123            Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
124            Self::Gang(_) => None,
125        }
126    }
127
128    /// The rank of the reference, if any.
129    fn rank(&self) -> Option<Index> {
130        self.proc_id().and_then(|proc_id| proc_id.rank())
131    }
132
133    /// The actor id of the reference, if any.
134    pub fn actor_id(&self) -> Option<&ActorId> {
135        match self {
136            Self::World(_) => None,
137            Self::Proc(_) => None,
138            Self::Actor(actor_id) => Some(actor_id),
139            Self::Port(PortId(actor_id, _)) => Some(actor_id),
140            Self::Gang(_) => None,
141        }
142    }
143
144    /// The actor name of the reference, if any.
145    fn actor_name(&self) -> Option<&str> {
146        match self {
147            Self::World(_) => None,
148            Self::Proc(_) => None,
149            Self::Actor(actor_id) => Some(actor_id.name()),
150            Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
151            Self::Gang(gang_id) => Some(&gang_id.1),
152        }
153    }
154
155    /// The pid of the reference, if any.
156    fn pid(&self) -> Option<Index> {
157        match self {
158            Self::World(_) => None,
159            Self::Proc(_) => None,
160            Self::Actor(actor_id) => Some(actor_id.pid()),
161            Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
162            Self::Gang(_) => None,
163        }
164    }
165
166    /// The port of the reference, if any.
167    fn port(&self) -> Option<u64> {
168        match self {
169            Self::World(_) => None,
170            Self::Proc(_) => None,
171            Self::Actor(_) => None,
172            Self::Port(port_id) => Some(port_id.index()),
173            Self::Gang(_) => None,
174        }
175    }
176}
177
178impl PartialOrd for Reference {
179    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
180        Some(self.cmp(other))
181    }
182}
183
184impl Ord for Reference {
185    fn cmp(&self, other: &Self) -> Ordering {
186        (
187            self.world_id(),
188            self.rank(),
189            self.actor_name(),
190            self.pid(),
191            self.port(),
192        )
193            .cmp(&(
194                other.world_id(),
195                other.rank(),
196                other.actor_name(),
197                other.pid(),
198                other.port(),
199            ))
200    }
201}
202
203impl fmt::Display for Reference {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        match self {
206            Self::World(world_id) => fmt::Display::fmt(world_id, f),
207            Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
208            Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
209            Self::Port(port_id) => fmt::Display::fmt(port_id, f),
210            Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
211        }
212    }
213}
214
215/// Statically create a [`WorldId`], [`ProcId`], [`ActorId`] or [`GangId`],
216/// given the concrete syntax documented in [`Reference`]:
217///
218/// ```
219/// # use hyperactor::id;
220/// # use hyperactor::reference::WorldId;
221/// # use hyperactor::reference::ProcId;
222/// # use hyperactor::reference::ActorId;
223/// # use hyperactor::reference::GangId;
224/// assert_eq!(id!(hello), WorldId("hello".into()));
225/// assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
226/// assert_eq!(
227///     id!(hello[0].actor),
228///     ActorId(
229///         ProcId::Ranked(WorldId("hello".into()), 0),
230///         "actor".into(),
231///         0
232///     )
233/// );
234/// assert_eq!(
235///     id!(hello[0].actor[1]),
236///     ActorId(
237///         ProcId::Ranked(WorldId("hello".into()), 0),
238///         "actor".into(),
239///         1
240///     )
241/// );
242/// assert_eq!(
243///     id!(hello.actor),
244///     GangId(WorldId("hello".into()), "actor".into())
245/// );
246/// ```
247///
248/// Prefer to use the id macro to construct identifiers in code, as it
249/// guarantees static validity, and preserves and reinforces the uniform
250/// concrete syntax of identifiers throughout.
251#[macro_export]
252macro_rules! id {
253    ($world:ident) => {
254        $crate::reference::WorldId(stringify!($world).to_string())
255    };
256    ($world:ident [$rank:expr_2021]) => {
257        $crate::reference::ProcId::Ranked(
258            $crate::reference::WorldId(stringify!($world).to_string()),
259            $rank,
260        )
261    };
262    ($world:ident [$rank:expr_2021] . $actor:ident) => {
263        $crate::reference::ActorId(
264            $crate::reference::ProcId::Ranked(
265                $crate::reference::WorldId(stringify!($world).to_string()),
266                $rank,
267            ),
268            stringify!($actor).to_string(),
269            0,
270        )
271    };
272    ($world:ident [$rank:expr_2021] . $actor:ident [$pid:expr_2021]) => {
273        $crate::reference::ActorId(
274            $crate::reference::ProcId::Ranked(
275                $crate::reference::WorldId(stringify!($world).to_string()),
276                $rank,
277            ),
278            stringify!($actor).to_string(),
279            $pid,
280        )
281    };
282    ($world:ident . $actor:ident) => {
283        $crate::reference::GangId(
284            $crate::reference::WorldId(stringify!($world).to_string()),
285            stringify!($actor).to_string(),
286        )
287    };
288    ($world:ident [$rank:expr_2021] . $actor:ident [$pid:expr_2021] [$port:expr_2021]) => {
289        $crate::reference::PortId(
290            $crate::reference::ActorId(
291                $crate::reference::ProcId::Ranked(
292                    $crate::reference::WorldId(stringify!($world).to_string()),
293                    $rank,
294                ),
295                stringify!($actor).to_string(),
296                $pid,
297            ),
298            $port,
299        )
300    };
301}
302pub use id;
303
304/// The type of error encountered while parsing references.
305#[derive(thiserror::Error, Debug)]
306pub enum ReferenceParsingError {
307    /// The parser expected a token, but it reached the end of the token stream.
308    #[error("expected token")]
309    Empty,
310
311    /// The parser encountered an unexpected token.
312    #[error("unexpected token: {0}")]
313    Unexpected(String),
314
315    /// The parser encountered an error parsing an integer.
316    #[error(transparent)]
317    ParseInt(#[from] ParseIntError),
318
319    /// A parse error.
320    #[error("parse: {0}")]
321    Parse(#[from] ParseError),
322
323    /// The parser encountered the wrong reference type.
324    #[error("wrong reference type: expected {0}")]
325    WrongType(String),
326
327    /// An invalid channel address was encountered while parsing the reference.
328    #[error("invalid channel address {0}: {1}")]
329    InvalidChannelAddress(String, anyhow::Error),
330}
331
332impl FromStr for Reference {
333    type Err = ReferenceParsingError;
334
335    fn from_str(addr: &str) -> Result<Self, Self::Err> {
336        // First, try to parse a "new style" reference:
337        // 1) If the reference contains a comma (anywhere), it is a new style reference;
338        //    commas were not a valid lexeme in the previous reference format.
339        // 2) This is a bit ugly, but we bypass the tokenizer prior to this comma,
340        //    try to parse a channel address, and then parse the remainder.
341
342        match addr.split_once(",") {
343            Some((channel_addr, rest)) => {
344                let channel_addr = channel_addr.parse().map_err(|err| {
345                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
346                })?;
347
348                Ok(parse! {
349                    Lexer::new(rest);
350
351                    // channeladdr,proc_name
352                    Token::Elem(proc_name) =>
353                    Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),
354
355                    // channeladdr,proc_name,actor_name
356                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
357                    Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),
358
359                    // channeladdr,proc_name,actor_name[rank]
360                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
361                        Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
362                        Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),
363                }?)
364            }
365
366            // "old style" / "ranked" reference
367            None => {
368                Ok(parse! {
369                    Lexer::new(addr);
370
371                    // world
372                    Token::Elem(world) => Self::World(WorldId(world.into())),
373
374                    // world[rank]
375                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
376                        Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),
377
378                    // world[rank].actor  (implied pid=0)
379                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
380                        Token::Dot Token::Elem(actor) =>
381                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),
382
383                    // world[rank].actor[pid]
384                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
385                        Token::Dot Token::Elem(actor)
386                        Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
387                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),
388
389                    // world[rank].actor[pid][port]
390                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
391                        Token::Dot Token::Elem(actor)
392                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
393                        Token::LeftBracket Token::Uint(index) Token::RightBracket =>
394                        Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),
395
396                    // world.actor
397                    Token::Elem(world) Token::Dot Token::Elem(actor) =>
398                        Self::Gang(GangId(WorldId(world.into()), actor.into())),
399                }?)
400            }
401        }
402    }
403}
404
405impl From<WorldId> for Reference {
406    fn from(world_id: WorldId) -> Self {
407        Self::World(world_id)
408    }
409}
410
411impl From<ProcId> for Reference {
412    fn from(proc_id: ProcId) -> Self {
413        Self::Proc(proc_id)
414    }
415}
416
417impl From<ActorId> for Reference {
418    fn from(actor_id: ActorId) -> Self {
419        Self::Actor(actor_id)
420    }
421}
422
423impl From<PortId> for Reference {
424    fn from(port_id: PortId) -> Self {
425        Self::Port(port_id)
426    }
427}
428
429impl From<GangId> for Reference {
430    fn from(gang_id: GangId) -> Self {
431        Self::Gang(gang_id)
432    }
433}
434
435/// Index is a type alias representing a value that can be used as an index
436/// into a sequence.
437pub type Index = usize;
438
439/// WorldId identifies a world within a [`crate::system::System`].
440/// The index of a World uniquely identifies it within a system instance.
441#[derive(
442    Debug,
443    Serialize,
444    Deserialize,
445    Clone,
446    PartialEq,
447    Eq,
448    PartialOrd,
449    Hash,
450    Ord,
451    Named
452)]
453pub struct WorldId(pub String);
454
455impl WorldId {
456    /// Create a proc ID with the provided index in this world.
457    pub fn proc_id(&self, index: Index) -> ProcId {
458        ProcId::Ranked(self.clone(), index)
459    }
460
461    /// The world index.
462    pub fn name(&self) -> &str {
463        &self.0
464    }
465
466    /// Return a randomly selected user proc in this world.
467    pub fn random_user_proc(&self) -> ProcId {
468        let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
469        ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
470    }
471}
472
473impl fmt::Display for WorldId {
474    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475        let WorldId(name) = self;
476        write!(f, "{}", name)
477    }
478}
479
480impl FromStr for WorldId {
481    type Err = ReferenceParsingError;
482
483    fn from_str(addr: &str) -> Result<Self, Self::Err> {
484        match addr.parse()? {
485            Reference::World(world_id) => Ok(world_id),
486            _ => Err(ReferenceParsingError::WrongType("world".into())),
487        }
488    }
489}
490
491/// Procs are identified by their _rank_ within a world or by a direct channel address.
492/// Each proc represents an actor runtime that can locally route to all of its
493/// constituent actors.
494///
495/// Ranks >= 1usize << (no. bits in usize - 1) (i.e., with the high bit set) are "user"
496/// ranks. These are reserved for randomly generated identifiers not
497/// assigned by the system.
498#[derive(
499    Debug,
500    Serialize,
501    Deserialize,
502    Clone,
503    PartialEq,
504    Eq,
505    PartialOrd,
506    Hash,
507    Ord,
508    Named,
509    EnumAsInner
510)]
511pub enum ProcId {
512    /// A ranked proc within a world
513    Ranked(WorldId, Index),
514    /// A proc reachable via a direct channel address, and local name.
515    Direct(ChannelAddr, String),
516}
517
518impl ProcId {
519    /// Create an actor ID with the provided name, pid within this proc.
520    pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
521        ActorId(self.clone(), name.into(), pid)
522    }
523
524    /// The proc's world id, if this is a ranked proc.
525    pub fn world_id(&self) -> Option<&WorldId> {
526        match self {
527            ProcId::Ranked(world_id, _) => Some(world_id),
528            ProcId::Direct(_, _) => None,
529        }
530    }
531
532    /// The world name, if this is a ranked proc.
533    pub fn world_name(&self) -> Option<&str> {
534        self.world_id().map(|world_id| world_id.name())
535    }
536
537    /// The proc's rank, if this is a ranked proc.
538    pub fn rank(&self) -> Option<Index> {
539        match self {
540            ProcId::Ranked(_, rank) => Some(*rank),
541            ProcId::Direct(_, _) => None,
542        }
543    }
544}
545
546impl fmt::Display for ProcId {
547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548        match self {
549            ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
550            ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
551        }
552    }
553}
554
555impl FromStr for ProcId {
556    type Err = ReferenceParsingError;
557
558    fn from_str(addr: &str) -> Result<Self, Self::Err> {
559        match addr.parse()? {
560            Reference::Proc(proc_id) => Ok(proc_id),
561            _ => Err(ReferenceParsingError::WrongType("proc".into())),
562        }
563    }
564}
565
566/// Actors are identified by their proc, their name, and pid.
567#[derive(
568    Debug,
569    Serialize,
570    Deserialize,
571    Clone,
572    PartialEq,
573    Eq,
574    PartialOrd,
575    Hash,
576    Ord,
577    Named
578)]
579pub struct ActorId(pub ProcId, pub String, pub Index);
580
581impl ActorId {
582    /// Create a new port ID with the provided port for this actor.
583    pub fn port_id(&self, port: u64) -> PortId {
584        PortId(self.clone(), port)
585    }
586
587    /// Create a child actor ID with the provided PID.
588    pub fn child_id(&self, pid: Index) -> Self {
589        Self(self.0.clone(), self.1.clone(), pid)
590    }
591
592    /// Return the root actor ID for the provided proc and name.
593    pub fn root(proc_id: ProcId, name: String) -> Self {
594        Self(proc_id, name, 0)
595    }
596
597    /// The proc ID of this actor ID.
598    pub fn proc_id(&self) -> &ProcId {
599        &self.0
600    }
601
602    /// The world name. Panics if this is a direct proc.
603    pub fn world_name(&self) -> &str {
604        self.0
605            .world_name()
606            .expect("world_name() called on direct proc")
607    }
608
609    /// The actor's proc's rank. Panics if this is a direct proc.
610    pub fn rank(&self) -> Index {
611        self.0.rank().expect("rank() called on direct proc")
612    }
613
614    /// The actor's name.
615    pub fn name(&self) -> &str {
616        &self.1
617    }
618
619    /// The actor's pid.
620    pub fn pid(&self) -> Index {
621        self.2
622    }
623}
624
625impl fmt::Display for ActorId {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        let ActorId(proc_id, name, pid) = self;
628        write!(f, "{}.{}[{}]", proc_id, name, pid)
629    }
630}
631impl<A: RemoteActor> From<ActorRef<A>> for ActorId {
632    fn from(actor_ref: ActorRef<A>) -> Self {
633        actor_ref.actor_id.clone()
634    }
635}
636
637impl<'a, A: RemoteActor> From<&'a ActorRef<A>> for &'a ActorId {
638    fn from(actor_ref: &'a ActorRef<A>) -> Self {
639        &actor_ref.actor_id
640    }
641}
642
643impl FromStr for ActorId {
644    type Err = ReferenceParsingError;
645
646    fn from_str(addr: &str) -> Result<Self, Self::Err> {
647        match addr.parse()? {
648            Reference::Actor(actor_id) => Ok(actor_id),
649            _ => Err(ReferenceParsingError::WrongType("actor".into())),
650        }
651    }
652}
653
654/// ActorRefs are typed references to actors.
655#[derive(Debug, Serialize, Deserialize, Named)]
656pub struct ActorRef<A: RemoteActor> {
657    pub(crate) actor_id: ActorId,
658    phantom: PhantomData<A>,
659}
660
661impl<A: RemoteActor> ActorRef<A> {
662    /// Get the remote port for message type [`M`] for the referenced actor.
663    pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
664    where
665        A: RemoteHandles<M>,
666    {
667        PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
668    }
669
670    /// Send an [`M`]-typed message to the referenced actor.
671    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
672    pub fn send<M: RemoteMessage>(
673        &self,
674        cap: &impl cap::CanSend,
675        message: M,
676    ) -> Result<(), MailboxSenderError>
677    where
678        A: RemoteHandles<M>,
679    {
680        self.port().send(cap, message)
681    }
682
683    /// Send an [`M`]-typed message to the referenced actor, with additional context provided by
684    /// headers.
685    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
686    pub fn send_with_headers<M: RemoteMessage>(
687        &self,
688        cap: &impl cap::CanSend,
689        headers: Attrs,
690        message: M,
691    ) -> Result<(), MailboxSenderError>
692    where
693        A: RemoteHandles<M>,
694    {
695        self.port().send_with_headers(cap, headers, message)
696    }
697
698    /// The caller guarantees that the provided actor ID is also a valid,
699    /// typed reference.  This is usually invoked to provide a guarantee
700    /// that an externally-provided actor ID (e.g., through a command
701    /// line argument) is a valid reference.
702    pub fn attest(actor_id: ActorId) -> Self {
703        Self {
704            actor_id,
705            phantom: PhantomData,
706        }
707    }
708
709    /// The actor ID corresponding with this reference.
710    pub fn actor_id(&self) -> &ActorId {
711        &self.actor_id
712    }
713
714    /// Convert this actor reference into its corresponding actor ID.
715    pub fn into_actor_id(self) -> ActorId {
716        self.actor_id
717    }
718
719    /// Attempt to downcast this reference into a (local) actor handle.
720    /// This will only succeed when the referenced actor is in the same
721    /// proc as the caller.
722    pub fn downcast_handle(&self, cap: &impl cap::CanResolveActorRef) -> Option<ActorHandle<A>>
723    where
724        A: Actor,
725    {
726        cap.resolve_actor_ref(self)
727    }
728}
729
730impl<A: RemoteActor> fmt::Display for ActorRef<A> {
731    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732        fmt::Display::fmt(&self.actor_id, f)?;
733        write!(f, "<{}>", std::any::type_name::<A>())
734    }
735}
736
737// We implement Clone manually to avoid imposing A: Clone.
738impl<A: RemoteActor> Clone for ActorRef<A> {
739    fn clone(&self) -> Self {
740        Self {
741            actor_id: self.actor_id.clone(),
742            phantom: PhantomData,
743        }
744    }
745}
746
747impl<A: RemoteActor> PartialEq for ActorRef<A> {
748    fn eq(&self, other: &Self) -> bool {
749        self.actor_id == other.actor_id
750    }
751}
752
753impl<A: RemoteActor> Eq for ActorRef<A> {}
754
755impl<A: RemoteActor> PartialOrd for ActorRef<A> {
756    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
757        Some(self.cmp(other))
758    }
759}
760
761impl<A: RemoteActor> Ord for ActorRef<A> {
762    fn cmp(&self, other: &Self) -> Ordering {
763        self.actor_id.cmp(&other.actor_id)
764    }
765}
766
767impl<A: RemoteActor> Hash for ActorRef<A> {
768    fn hash<H: Hasher>(&self, state: &mut H) {
769        self.actor_id.hash(state);
770    }
771}
772
773/// Port ids identify [`crate::mailbox::Port`]s of an actor.
774///
775/// TODO: consider moving [`crate::mailbox::Port`] to `PortRef` in this
776/// module for consistency with actors,
777#[derive(
778    Debug,
779    Serialize,
780    Deserialize,
781    Clone,
782    PartialEq,
783    Eq,
784    PartialOrd,
785    Hash,
786    Ord,
787    Named
788)]
789pub struct PortId(pub ActorId, pub u64);
790
791impl PortId {
792    /// The ID of the port's owning actor.
793    pub fn actor_id(&self) -> &ActorId {
794        &self.0
795    }
796
797    /// Convert this port ID into an actor ID.
798    pub fn into_actor_id(self) -> ActorId {
799        self.0
800    }
801
802    /// This port's index.
803    pub fn index(&self) -> u64 {
804        self.1
805    }
806
807    /// Send a serialized message to this port, provided a sending capability,
808    /// such as [`crate::actor::Instance`]. It is the sender's responsibility
809    /// to ensure that the provided message is well-typed.
810    pub fn send(&self, caps: &impl cap::CanSend, serialized: &Serialized) {
811        caps.post(self.clone(), Attrs::new(), serialized.clone());
812    }
813
814    /// Send a serialized message to this port, provided a sending capability,
815    /// such as [`crate::actor::Instance`], with additional context provided by headers.
816    /// It is the sender's responsibility to ensure that the provided message is well-typed.
817    pub fn send_with_headers(
818        &self,
819        caps: &impl cap::CanSend,
820        serialized: &Serialized,
821        headers: Attrs,
822    ) {
823        caps.post(self.clone(), headers, serialized.clone());
824    }
825
826    /// Split this port, returning a new port that relays messages to the port
827    /// through a local proxy, which may coalesce messages.
828    pub fn split(
829        &self,
830        caps: &impl cap::CanSplitPort,
831        reducer_spec: Option<ReducerSpec>,
832    ) -> anyhow::Result<PortId> {
833        caps.split(self.clone(), reducer_spec)
834    }
835}
836
837impl FromStr for PortId {
838    type Err = ReferenceParsingError;
839
840    fn from_str(addr: &str) -> Result<Self, Self::Err> {
841        match addr.parse()? {
842            Reference::Port(port_id) => Ok(port_id),
843            _ => Err(ReferenceParsingError::WrongType("port".into())),
844        }
845    }
846}
847
848impl fmt::Display for PortId {
849    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850        let PortId(actor_id, port) = self;
851        write!(f, "{}[{}]", actor_id, port)
852    }
853}
854
855/// A reference to a remote port. All messages passed through
856/// PortRefs will be serialized.
857#[derive(Debug, Serialize, Deserialize, Derivative, Named)]
858#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
859pub struct PortRef<M: RemoteMessage> {
860    port_id: PortId,
861    #[derivative(
862        PartialEq = "ignore",
863        PartialOrd = "ignore",
864        Ord = "ignore",
865        Hash = "ignore"
866    )]
867    reducer_spec: Option<ReducerSpec>,
868    phantom: PhantomData<M>,
869}
870
871impl<M: RemoteMessage> PortRef<M> {
872    /// The caller attests that the provided PortId can be
873    /// converted to a reachable, typed port reference.
874    pub fn attest(port_id: PortId) -> Self {
875        Self {
876            port_id,
877            reducer_spec: None,
878            phantom: PhantomData,
879        }
880    }
881
882    /// The caller attests that the provided PortId can be
883    /// converted to a reachable, typed port reference.
884    pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
885        Self {
886            port_id,
887            reducer_spec,
888            phantom: PhantomData,
889        }
890    }
891
892    /// The caller attests that the provided PortId can be
893    /// converted to a reachable, typed port reference.
894    pub fn attest_message_port(actor: &ActorId) -> Self {
895        PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
896    }
897
898    /// The typehash of this port's reducer, if any. Reducers
899    /// may be used to coalesce messages sent to a port.
900    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
901        &self.reducer_spec
902    }
903
904    /// This port's ID.
905    pub fn port_id(&self) -> &PortId {
906        &self.port_id
907    }
908
909    /// Convert this PortRef into its corresponding port id.
910    pub fn into_port_id(self) -> PortId {
911        self.port_id
912    }
913
914    /// coerce it into OncePortRef so we can send messages to this port from
915    /// APIs requires OncePortRef.
916    pub fn into_once(self) -> OncePortRef<M> {
917        OncePortRef::attest(self.into_port_id())
918    }
919
920    /// Send a message to this port, provided a sending capability, such as
921    /// [`crate::actor::Instance`].
922    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
923    pub fn send(&self, caps: &impl cap::CanSend, message: M) -> Result<(), MailboxSenderError> {
924        self.send_with_headers(caps, Attrs::new(), message)
925    }
926
927    /// Send a message to this port, provided a sending capability, such as
928    /// [`crate::actor::Instance`]. Additional context can be provided in the form of
929    /// headers.
930    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
931    pub fn send_with_headers(
932        &self,
933        caps: &impl cap::CanSend,
934        headers: Attrs,
935        message: M,
936    ) -> Result<(), MailboxSenderError> {
937        let serialized = Serialized::serialize(&message).map_err(|err| {
938            MailboxSenderError::new_bound(
939                self.port_id.clone(),
940                MailboxSenderErrorKind::Serialize(err.into()),
941            )
942        })?;
943        self.send_serialized(caps, serialized, headers);
944        Ok(())
945    }
946
947    /// Send a serialized message to this port, provided a sending capability, such as
948    /// [`crate::actor::Instance`].
949    pub fn send_serialized(&self, caps: &impl cap::CanSend, message: Serialized, headers: Attrs) {
950        caps.post(self.port_id.clone(), headers, message);
951    }
952
953    /// Convert this port into a sink that can be used to send messages using the given capability.
954    pub fn into_sink<C: cap::CanSend>(self, caps: C) -> PortSink<C, M> {
955        PortSink::new(caps, self)
956    }
957}
958
959impl<M: RemoteMessage> Clone for PortRef<M> {
960    fn clone(&self) -> Self {
961        Self {
962            port_id: self.port_id.clone(),
963            reducer_spec: self.reducer_spec.clone(),
964            phantom: PhantomData,
965        }
966    }
967}
968
969impl<M: RemoteMessage> fmt::Display for PortRef<M> {
970    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
971        fmt::Display::fmt(&self.port_id, f)
972    }
973}
974
975/// The parameters extracted from [`PortRef`] to [`Bindings`].
976#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
977pub struct UnboundPort(pub PortId, pub Option<ReducerSpec>);
978
979impl UnboundPort {
980    /// Update the port id of this binding.
981    pub fn update(&mut self, port_id: PortId) {
982        self.0 = port_id;
983    }
984}
985
986impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
987    fn from(port_ref: &PortRef<M>) -> Self {
988        UnboundPort(port_ref.port_id.clone(), port_ref.reducer_spec.clone())
989    }
990}
991
992impl<M: RemoteMessage> Unbind for PortRef<M> {
993    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
994        bindings.push_back(&UnboundPort::from(self))
995    }
996}
997
998impl<M: RemoteMessage> Bind for PortRef<M> {
999    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1000        let bound = bindings.try_pop_front::<UnboundPort>()?;
1001        self.port_id = bound.0;
1002        self.reducer_spec = bound.1;
1003        Ok(())
1004    }
1005}
1006
1007/// A remote reference to a [`OncePort`]. References are serializable
1008/// and may be passed to remote actors, which can then use it to send
1009/// a message to this port.
1010#[derive(Debug, Serialize, Deserialize, PartialEq)]
1011pub struct OncePortRef<M: RemoteMessage> {
1012    port_id: PortId,
1013    phantom: PhantomData<M>,
1014}
1015
1016impl<M: RemoteMessage> OncePortRef<M> {
1017    pub(crate) fn attest(port_id: PortId) -> Self {
1018        Self {
1019            port_id,
1020            phantom: PhantomData,
1021        }
1022    }
1023
1024    /// This port's ID.
1025    pub fn port_id(&self) -> &PortId {
1026        &self.port_id
1027    }
1028
1029    /// Convert this PortRef into its corresponding port id.
1030    pub fn into_port_id(self) -> PortId {
1031        self.port_id
1032    }
1033
1034    /// Send a message to this port, provided a sending capability, such as
1035    /// [`crate::actor::Instance`].
1036    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
1037    pub fn send(self, caps: &impl cap::CanSend, message: M) -> Result<(), MailboxSenderError> {
1038        self.send_with_headers(caps, Attrs::new(), message)
1039    }
1040
1041    /// Send a message to this port, provided a sending capability, such as
1042    /// [`crate::actor::Instance`]. Additional context can be provided in the form of headers.
1043    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
1044    pub fn send_with_headers(
1045        self,
1046        caps: &impl cap::CanSend,
1047        headers: Attrs,
1048        message: M,
1049    ) -> Result<(), MailboxSenderError> {
1050        let serialized = Serialized::serialize(&message).map_err(|err| {
1051            MailboxSenderError::new_bound(
1052                self.port_id.clone(),
1053                MailboxSenderErrorKind::Serialize(err.into()),
1054            )
1055        })?;
1056        caps.post(self.port_id.clone(), headers, serialized);
1057        Ok(())
1058    }
1059}
1060
1061impl<M: RemoteMessage> Clone for OncePortRef<M> {
1062    fn clone(&self) -> Self {
1063        Self {
1064            port_id: self.port_id.clone(),
1065            phantom: PhantomData,
1066        }
1067    }
1068}
1069
1070impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1071    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1072        fmt::Display::fmt(&self.port_id, f)
1073    }
1074}
1075
1076impl<M: RemoteMessage> Named for OncePortRef<M> {
1077    fn typename() -> &'static str {
1078        crate::data::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
1079    }
1080}
1081
1082// We do not split PortRef, because it can only receive a single response, and
1083// there is no meaningful performance gain to make that response going through
1084// comm actors.
1085impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1086    fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1087        Ok(())
1088    }
1089}
1090
1091impl<M: RemoteMessage> Bind for OncePortRef<M> {
1092    fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1093        Ok(())
1094    }
1095}
1096
1097/// Gangs identify a gang of actors across the world.
1098#[derive(
1099    Debug,
1100    Serialize,
1101    Deserialize,
1102    Clone,
1103    PartialEq,
1104    Eq,
1105    PartialOrd,
1106    Hash,
1107    Ord,
1108    Named
1109)]
1110pub struct GangId(pub WorldId, pub String);
1111
1112impl GangId {
1113    pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1114        (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1115    }
1116
1117    /// The world id of the gang.
1118    pub fn world_id(&self) -> &WorldId {
1119        &self.0
1120    }
1121
1122    /// The name of the gang.
1123    pub fn name(&self) -> &str {
1124        &self.1
1125    }
1126
1127    /// The gang's actor ID for the provided rank. It always returns the root
1128    /// actor because the root actor is the public interface of a gang.
1129    pub fn actor_id(&self, rank: Index) -> ActorId {
1130        ActorId(
1131            ProcId::Ranked(self.world_id().clone(), rank),
1132            self.name().to_string(),
1133            0,
1134        )
1135    }
1136}
1137
1138impl fmt::Display for GangId {
1139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1140        let GangId(world_id, name) = self;
1141        write!(f, "{}.{}", world_id, name)
1142    }
1143}
1144
1145impl FromStr for GangId {
1146    type Err = ReferenceParsingError;
1147
1148    fn from_str(addr: &str) -> Result<Self, Self::Err> {
1149        match addr.parse()? {
1150            Reference::Gang(gang_id) => Ok(gang_id),
1151            _ => Err(ReferenceParsingError::WrongType("gang".into())),
1152        }
1153    }
1154}
1155
1156/// Chop implements a simple lexer on a fixed set of delimiters.
1157fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
1158    std::iter::from_fn(move || {
1159        if s.is_empty() {
1160            return None;
1161        }
1162
1163        match delims
1164            .iter()
1165            .enumerate()
1166            .flat_map(|(index, d)| s.find(d).map(|pos| (index, pos)))
1167            .min_by_key(|&(_, v)| v)
1168        {
1169            Some((index, 0)) => {
1170                let delim = delims[index];
1171                s = &s[delim.len()..];
1172                Some(delim)
1173            }
1174            Some((_, pos)) => {
1175                let token = &s[..pos];
1176                s = &s[pos..];
1177                Some(token.trim())
1178            }
1179            None => {
1180                let token = s;
1181                s = "";
1182                Some(token.trim())
1183            }
1184        }
1185    })
1186}
1187
1188/// GangRefs are typed references to gangs.
1189#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
1190pub struct GangRef<A: RemoteActor> {
1191    gang_id: GangId,
1192    phantom: PhantomData<A>,
1193}
1194
1195impl<A: RemoteActor> GangRef<A> {
1196    /// Return an ActorRef corresponding with the provided rank in
1197    /// this gang.  Does not check the validity of the rank, so the
1198    /// returned identifier is not guaranteed to refer to a valid rank.
1199    pub fn rank(&self, rank: Index) -> ActorRef<A> {
1200        let GangRef {
1201            gang_id: GangId(world_id, name),
1202            ..
1203        } = self;
1204        ActorRef::attest(ActorId(
1205            ProcId::Ranked(world_id.clone(), rank),
1206            name.clone(),
1207            0,
1208        ))
1209    }
1210
1211    /// Return the gang ID.
1212    pub fn gang_id(&self) -> &GangId {
1213        &self.gang_id
1214    }
1215}
1216
1217impl<A: RemoteActor> Clone for GangRef<A> {
1218    fn clone(&self) -> Self {
1219        Self {
1220            gang_id: self.gang_id.clone(),
1221            phantom: PhantomData,
1222        }
1223    }
1224}
1225
1226// TODO: remove, replace with attest
1227impl<A: RemoteActor> From<GangId> for GangRef<A> {
1228    fn from(gang_id: GangId) -> Self {
1229        Self {
1230            gang_id,
1231            phantom: PhantomData,
1232        }
1233    }
1234}
1235
1236impl<A: RemoteActor> From<GangRef<A>> for GangId {
1237    fn from(gang_ref: GangRef<A>) -> Self {
1238        gang_ref.gang_id
1239    }
1240}
1241
1242impl<'a, A: RemoteActor> From<&'a GangRef<A>> for &'a GangId {
1243    fn from(gang_ref: &'a GangRef<A>) -> Self {
1244        &gang_ref.gang_id
1245    }
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250    use rand::seq::SliceRandom;
1251    use rand::thread_rng;
1252
1253    use super::*;
1254
1255    #[test]
1256    fn test_reference_parse() {
1257        let cases: Vec<(&str, Reference)> = vec![
1258            ("test", WorldId("test".into()).into()),
1259            (
1260                "test[234]",
1261                ProcId::Ranked(WorldId("test".into()), 234).into(),
1262            ),
1263            (
1264                "test[234].testactor[6]",
1265                ActorId(
1266                    ProcId::Ranked(WorldId("test".into()), 234),
1267                    "testactor".into(),
1268                    6,
1269                )
1270                .into(),
1271            ),
1272            (
1273                "test[234].testactor[6][1]",
1274                PortId(
1275                    ActorId(
1276                        ProcId::Ranked(WorldId("test".into()), 234),
1277                        "testactor".into(),
1278                        6,
1279                    ),
1280                    1,
1281                )
1282                .into(),
1283            ),
1284            (
1285                "test.testactor",
1286                GangId(WorldId("test".into()), "testactor".into()).into(),
1287            ),
1288            (
1289                "tcp:[::1]:1234,test,testactor[123]",
1290                ActorId(
1291                    ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
1292                    "testactor".to_string(),
1293                    123,
1294                )
1295                .into(),
1296            ),
1297        ];
1298
1299        for (s, expected) in cases {
1300            let got: Reference = s.parse().unwrap();
1301            assert_eq!(got, expected);
1302        }
1303    }
1304
1305    #[test]
1306    fn test_reference_parse_error() {
1307        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)"];
1308
1309        for s in cases {
1310            let result: Result<Reference, ReferenceParsingError> = s.parse();
1311            assert!(result.is_err());
1312        }
1313    }
1314
1315    #[test]
1316    fn test_id_macro() {
1317        assert_eq!(id!(hello), WorldId("hello".into()));
1318        assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
1319        assert_eq!(
1320            id!(hello[0].actor),
1321            ActorId(
1322                ProcId::Ranked(WorldId("hello".into()), 0),
1323                "actor".into(),
1324                0
1325            )
1326        );
1327        assert_eq!(
1328            id!(hello[0].actor[1]),
1329            ActorId(
1330                ProcId::Ranked(WorldId("hello".into()), 0),
1331                "actor".into(),
1332                1
1333            )
1334        );
1335        assert_eq!(
1336            id!(hello.actor),
1337            GangId(WorldId("hello".into()), "actor".into())
1338        );
1339    }
1340
1341    #[test]
1342    fn test_reference_ord() {
1343        let expected: Vec<Reference> = [
1344            "first",
1345            "second",
1346            "second.actor1",
1347            "second.actor2",
1348            "second[1]",
1349            "second[1].actor1",
1350            "second[1].actor2",
1351            "second[2]",
1352            "second[2].actor100",
1353            "third",
1354            "third.actor",
1355            "third[2]",
1356            "third[2].actor",
1357            "third[2].actor[1]",
1358        ]
1359        .into_iter()
1360        .map(|s| s.parse().unwrap())
1361        .collect();
1362
1363        let mut sorted = expected.to_vec();
1364        sorted.shuffle(&mut thread_rng());
1365        sorted.sort();
1366
1367        assert_eq!(sorted, expected);
1368    }
1369}