hyperactor/
reference.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! References for different resources in Hyperactor.
10//!
//! The "Id" variants are transparent and typeless, whereas the
//! corresponding "Ref" variants are opaque and typed. The latter are
//! intended to be exposed in user-facing APIs. We provide [`std::convert::From`]
//! implementations between Ids and Refs where this makes sense.
15//!
//! All system implementations use the same concrete reference
//! representations, as their specific layout (e.g., actor index, rank,
//! etc.) is used by the core communications algorithms throughout.
19//!
20//! References and ids are [`crate::Message`]s to facilitate passing
21//! them between actors.
22
23#![allow(dead_code)] // Allow until this is used outside of tests.
24
25use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::Flattrs;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43use typeuri::ACTOR_PORT_BIT;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerMode;
52use crate::accum::ReducerSpec;
53use crate::accum::StreamingReducerOpts;
54use crate::actor::Referable;
55use crate::channel::ChannelAddr;
56use crate::context;
57use crate::context::MailboxExt;
58use crate::mailbox::MailboxSenderError;
59use crate::mailbox::MailboxSenderErrorKind;
60use crate::mailbox::PortSink;
61use crate::message::Bind;
62use crate::message::Bindings;
63use crate::message::Unbind;
64
65pub mod lex;
66pub mod name;
67mod parse;
68
69use parse::Lexer;
70use parse::ParseError;
71use parse::Token;
72pub use parse::is_valid_ident;
73use parse::parse;
74
75/// The kinds of references.
76#[derive(strum::Display)]
77pub enum ReferenceKind {
78    /// Proc references.
79    Proc,
80    /// Actor references.
81    Actor,
82    /// Port references.
83    Port,
84}
85
86/// A universal reference to hierarchical identifiers in Hyperactor.
87///
88/// References implement a concrete syntax which can be parsed via
89/// [`FromStr`]. They are of the form:
90///
91/// - `addr,proc_name`,
92/// - `addr,proc_name,actor_name[pid]`,
93/// - `addr,proc_name,actor_name[pid][port]`
94///
95/// Reference also implements a total ordering, so that references are
96/// ordered lexicographically with the hierarchy implied by proc,
97/// actor. This allows reference ordering to be used to implement prefix
98/// based routing.
99#[derive(
100    Debug,
101    Serialize,
102    Deserialize,
103    Clone,
104    PartialEq,
105    Eq,
106    Hash,
107    typeuri::Named,
108    EnumAsInner
109)]
110pub enum Reference {
111    /// A reference to a proc.
112    Proc(ProcId),
113    /// A reference to an actor.
114    Actor(ActorId), // todo: should we only allow name references here?
115    /// A reference to a port.
116    Port(PortId),
117}
118
119impl Reference {
120    /// Tells whether this reference is a prefix of the provided reference.
121    pub fn is_prefix_of(&self, other: &Reference) -> bool {
122        match self {
123            Self::Proc(_) => self.proc_id() == other.proc_id(),
124            Self::Actor(_) => self == other,
125            Self::Port(_) => self == other,
126        }
127    }
128
129    /// The proc id of the reference, if any.
130    pub fn proc_id(&self) -> Option<&ProcId> {
131        match self {
132            Self::Proc(proc_id) => Some(proc_id),
133            Self::Actor(actor_id) => Some(actor_id.proc_id()),
134            Self::Port(port_id) => Some(port_id.actor_id().proc_id()),
135        }
136    }
137
138    /// The actor id of the reference, if any.
139    pub fn actor_id(&self) -> Option<&ActorId> {
140        match self {
141            Self::Proc(_) => None,
142            Self::Actor(actor_id) => Some(actor_id),
143            Self::Port(port_id) => Some(port_id.actor_id()),
144        }
145    }
146
147    /// The actor name of the reference, if any.
148    fn actor_name(&self) -> Option<&str> {
149        match self {
150            Self::Proc(_) => None,
151            Self::Actor(actor_id) => Some(actor_id.name()),
152            Self::Port(port_id) => Some(port_id.actor_id().name()),
153        }
154    }
155
156    /// The pid of the reference, if any.
157    fn pid(&self) -> Option<Index> {
158        match self {
159            Self::Proc(_) => None,
160            Self::Actor(actor_id) => Some(actor_id.pid()),
161            Self::Port(port_id) => Some(port_id.actor_id().pid()),
162        }
163    }
164
165    /// The port of the reference, if any.
166    fn port(&self) -> Option<u64> {
167        match self {
168            Self::Proc(_) => None,
169            Self::Actor(_) => None,
170            Self::Port(port_id) => Some(port_id.index()),
171        }
172    }
173
174    /// Returns the kind of the reference.
175    pub fn kind(&self) -> ReferenceKind {
176        match self {
177            Self::Proc(_) => ReferenceKind::Proc,
178            Self::Actor(_) => ReferenceKind::Actor,
179            Self::Port(_) => ReferenceKind::Port,
180        }
181    }
182}
183
184impl PartialOrd for Reference {
185    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
186        Some(self.cmp(other))
187    }
188}
189
190impl Ord for Reference {
191    fn cmp(&self, other: &Self) -> Ordering {
192        // Order by: proc address/name, then actor_name, then pid, then port
193        (self.proc_id(), self.actor_name(), self.pid(), self.port()).cmp(&(
194            other.proc_id(),
195            other.actor_name(),
196            other.pid(),
197            other.port(),
198        ))
199    }
200}
201
202impl fmt::Display for Reference {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        match self {
205            Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
206            Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
207            Self::Port(port_id) => fmt::Display::fmt(port_id, f),
208        }
209    }
210}
211
212/// The type of error encountered while parsing references.
213#[derive(thiserror::Error, Debug)]
214pub enum ReferenceParsingError {
215    /// The parser expected a token, but it reached the end of the token stream.
216    #[error("expected token")]
217    Empty,
218
219    /// The parser encountered an unexpected token.
220    #[error("unexpected token: {0}")]
221    Unexpected(String),
222
223    /// The parser encountered an error parsing an integer.
224    #[error(transparent)]
225    ParseInt(#[from] ParseIntError),
226
227    /// A parse error.
228    #[error("parse: {0}")]
229    Parse(#[from] ParseError),
230
231    /// The parser encountered the wrong reference type.
232    #[error("wrong reference type: expected {0}")]
233    WrongType(String),
234
235    /// An invalid channel address was encountered while parsing the reference.
236    #[error("invalid channel address {0}: {1}")]
237    InvalidChannelAddress(String, anyhow::Error),
238}
239
impl FromStr for Reference {
    type Err = ReferenceParsingError;

    /// Parses a reference in the "direct", comma-separated format.
    ///
    /// The text before the first comma is parsed as a [`ChannelAddr`]. The
    /// remainder is lexed and matched against these shapes:
    ///
    /// - `proc_name` yields [`Reference::Proc`];
    /// - `proc_name,actor_name` yields [`Reference::Actor`] with pid 0;
    /// - `proc_name,actor_name[pid]` yields [`Reference::Actor`];
    /// - `proc_name,actor_name[pid][port]` and
    ///   `proc_name,actor_name[pid][port<type>]` yield [`Reference::Port`].
    ///   The `<type>` annotation is accepted but discarded.
    ///
    /// # Errors
    ///
    /// - [`ReferenceParsingError::Unexpected`] if `addr` contains no comma.
    /// - [`ReferenceParsingError::InvalidChannelAddress`] if the address
    ///   prefix does not parse.
    /// - Otherwise, whatever error the `parse!` macro reports (converted
    ///   via `?`) for input matching none of the shapes above.
    fn from_str(addr: &str) -> Result<Self, Self::Err> {
        // References are parsed in the "direct" format:
        // The reference must contain a comma (anywhere), indicating a proc/actor/port reference.
        // The split happens at the *first* comma: the left side is the channel
        // address and the right side is the proc/actor/port path.

        match addr.split_once(",") {
            Some((channel_addr, rest)) => {
                let channel_addr = channel_addr.parse().map_err(|err| {
                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
                })?;

                Ok(parse! {
                    Lexer::new(rest);

                    // channeladdr,proc_name
                    Token::Elem(proc_name) =>
                    Self::Proc(ProcId::with_name(channel_addr, proc_name)),

                    // channeladdr,proc_name,actor_name
                    // (no pid given: the actor defaults to pid 0)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
                    Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, 0)),

                    // channeladdr,proc_name,actor_name[pid]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
                        Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid)),

                    // channeladdr,proc_name,actor_name[pid][port]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket  =>
                        Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),

                    // channeladdr,proc_name,actor_name[pid][port<type>]
                    // (the type name is matched but not used)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
                }?)
            }

            None => Err(ReferenceParsingError::Unexpected(format!(
                "expected a comma-separated reference, got: {}",
                addr
            ))),
        }
    }
}
292
293impl From<ProcId> for Reference {
294    fn from(proc_id: ProcId) -> Self {
295        Self::Proc(proc_id)
296    }
297}
298
299impl From<ActorId> for Reference {
300    fn from(actor_id: ActorId) -> Self {
301        Self::Actor(actor_id)
302    }
303}
304
305impl From<PortId> for Reference {
306    fn from(port_id: PortId) -> Self {
307        Self::Port(port_id)
308    }
309}
310
/// A position within a sequence. Actor pids, for example, are `Index`
/// values.
pub type Index = usize;
314
315/// Procs are identified by a direct channel address and local name.
316/// Each proc represents an actor runtime that can locally route to all of its
317/// constituent actors.
318#[derive(
319    Debug,
320    Serialize,
321    Deserialize,
322    Clone,
323    PartialEq,
324    Eq,
325    PartialOrd,
326    Hash,
327    Ord,
328    typeuri::Named
329)]
330pub struct ProcId(ChannelAddr, String);
331
332/// Compute an 8-char hex hash suffix from a [`ChannelAddr`].
333fn addr_hash_suffix(addr: &ChannelAddr) -> String {
334    use std::collections::hash_map::DefaultHasher;
335    let mut hasher = DefaultHasher::new();
336    addr.hash(&mut hasher);
337    format!("{:08x}", hasher.finish() as u32)
338}
339
340impl ProcId {
341    /// Create a ProcId with a globally-unique name: `"{base_name}-{hash_of_addr}"`.
342    ///
343    /// Use this for well-known / fixed proc names (e.g. `"service"`, `"local"`)
344    /// that are not already unique across hosts.
345    pub fn unique(addr: ChannelAddr, base_name: impl Into<String>) -> Self {
346        let base = base_name.into();
347        let suffix = addr_hash_suffix(&addr);
348        Self(addr, format!("{}-{}", base, suffix))
349    }
350
351    /// Create a ProcId with an already-unique name (no suffix added).
352    ///
353    /// Use this for deserialization, test helpers, and names that already
354    /// contain a UUID or other uniquifying component.
355    pub fn with_name(addr: ChannelAddr, name: impl Into<String>) -> Self {
356        Self(addr, name.into())
357    }
358
359    /// The base name before the `-{hash}` suffix, if present.
360    ///
361    /// If the name ends with `-XXXXXXXX` (8 hex chars), returns the part
362    /// before that suffix. Otherwise returns the full name.
363    pub fn base_name(&self) -> &str {
364        match self.1.rsplit_once('-') {
365            Some((base, suffix))
366                if suffix.len() == 8 && suffix.chars().all(|c| c.is_ascii_hexdigit()) =>
367            {
368                base
369            }
370            _ => &self.1,
371        }
372    }
373
374    /// Create an actor ID with the provided name, pid within this proc.
375    pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
376        ActorId(self.clone(), name.into(), pid)
377    }
378
379    /// The proc's channel address.
380    pub fn addr(&self) -> &ChannelAddr {
381        &self.0
382    }
383
384    /// The proc's name.
385    pub fn name(&self) -> &str {
386        &self.1
387    }
388}
389
390impl fmt::Display for ProcId {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        write!(f, "{},{}", self.0, self.1)
393    }
394}
395
396impl FromStr for ProcId {
397    type Err = ReferenceParsingError;
398
399    fn from_str(addr: &str) -> Result<Self, Self::Err> {
400        match addr.parse()? {
401            Reference::Proc(proc_id) => Ok(proc_id),
402            _ => Err(ReferenceParsingError::WrongType("proc".into())),
403        }
404    }
405}
406
407/// Actors are identified by their proc, their name, and pid.
408#[derive(
409    Debug,
410    Serialize,
411    Deserialize,
412    Clone,
413    PartialEq,
414    Eq,
415    PartialOrd,
416    Hash,
417    Ord,
418    typeuri::Named
419)]
420pub struct ActorId(ProcId, String, Index);
421
422hyperactor_config::impl_attrvalue!(ActorId);
423
424impl ActorId {
425    /// Create a new actor ID.
426    pub fn new(proc_id: ProcId, name: impl Into<String>, pid: Index) -> Self {
427        Self(proc_id, name.into(), pid)
428    }
429
430    /// Create a new port ID with the provided port for this actor.
431    pub fn port_id(&self, port: u64) -> PortId {
432        PortId(self.clone(), port)
433    }
434
435    /// Create a child actor ID with the provided PID.
436    pub fn child_id(&self, pid: Index) -> Self {
437        Self(self.0.clone(), self.1.clone(), pid)
438    }
439
440    /// Return the root actor ID for the provided proc and name.
441    pub fn root(proc_id: ProcId, name: String) -> Self {
442        Self(proc_id, name, 0)
443    }
444
445    /// The proc ID of this actor ID.
446    pub fn proc_id(&self) -> &ProcId {
447        &self.0
448    }
449
450    /// The actor's name.
451    pub fn name(&self) -> &str {
452        &self.1
453    }
454
455    /// The actor's pid.
456    pub fn pid(&self) -> Index {
457        self.2
458    }
459}
460
461impl fmt::Display for ActorId {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        write!(f, "{},{}[{}]", self.0, self.1, self.2)
464    }
465}
466impl<A: Referable> From<ActorRef<A>> for ActorId {
467    fn from(actor_ref: ActorRef<A>) -> Self {
468        actor_ref.actor_id.clone()
469    }
470}
471
472impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
473    fn from(actor_ref: &'a ActorRef<A>) -> Self {
474        &actor_ref.actor_id
475    }
476}
477
478impl FromStr for ActorId {
479    type Err = ReferenceParsingError;
480
481    fn from_str(addr: &str) -> Result<Self, Self::Err> {
482        match addr.parse()? {
483            Reference::Actor(actor_id) => Ok(actor_id),
484            _ => Err(ReferenceParsingError::WrongType("actor".into())),
485        }
486    }
487}
488
489/// ActorRefs are typed references to actors.
490#[derive(typeuri::Named)]
491pub struct ActorRef<A: Referable> {
492    pub(crate) actor_id: ActorId,
493    // fn() -> A so that the struct remains Send
494    phantom: PhantomData<fn() -> A>,
495}
496
497impl<A: Referable> ActorRef<A> {
498    /// Get the remote port for message type [`M`] for the referenced actor.
499    pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
500    where
501        A: RemoteHandles<M>,
502    {
503        PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
504    }
505
506    /// Send an [`M`]-typed message to the referenced actor.
507    pub fn send<M: RemoteMessage>(
508        &self,
509        cx: &impl context::Actor,
510        message: M,
511    ) -> Result<(), MailboxSenderError>
512    where
513        A: RemoteHandles<M>,
514    {
515        self.port().send(cx, message)
516    }
517
518    /// Send an [`M`]-typed message to the referenced actor, with additional context provided by
519    /// headers.
520    pub fn send_with_headers<M: RemoteMessage>(
521        &self,
522        cx: &impl context::Actor,
523        headers: Flattrs,
524        message: M,
525    ) -> Result<(), MailboxSenderError>
526    where
527        A: RemoteHandles<M>,
528    {
529        self.port().send_with_headers(cx, headers, message)
530    }
531
532    /// The caller guarantees that the provided actor ID is also a valid,
533    /// typed reference.  This is usually invoked to provide a guarantee
534    /// that an externally-provided actor ID (e.g., through a command
535    /// line argument) is a valid reference.
536    pub fn attest(actor_id: ActorId) -> Self {
537        Self {
538            actor_id,
539            phantom: PhantomData,
540        }
541    }
542
543    /// The actor ID corresponding with this reference.
544    pub fn actor_id(&self) -> &ActorId {
545        &self.actor_id
546    }
547
548    /// Convert this actor reference into its corresponding actor ID.
549    pub fn into_actor_id(self) -> ActorId {
550        self.actor_id
551    }
552
553    /// Attempt to downcast this reference into a (local) actor handle.
554    /// This will only succeed when the referenced actor is in the same
555    /// proc as the caller.
556    pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
557    where
558        A: Actor,
559    {
560        cx.instance().proc().resolve_actor_ref(self)
561    }
562}
563
564// Implement Serialize manually, without requiring A: Serialize
565impl<A: Referable> Serialize for ActorRef<A> {
566    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
567    where
568        S: Serializer,
569    {
570        // Serialize only the fields that don't depend on A
571        self.actor_id().serialize(serializer)
572    }
573}
574
575// Implement Deserialize manually, without requiring A: Deserialize
576impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
577    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
578    where
579        D: Deserializer<'de>,
580    {
581        let actor_id = <ActorId>::deserialize(deserializer)?;
582        Ok(ActorRef {
583            actor_id,
584            phantom: PhantomData,
585        })
586    }
587}
588
589// Implement Debug manually, without requiring A: Debug
590impl<A: Referable> fmt::Debug for ActorRef<A> {
591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592        f.debug_struct("ActorRef")
593            .field("actor_id", &self.actor_id)
594            .field("type", &std::any::type_name::<A>())
595            .finish()
596    }
597}
598
599impl<A: Referable> fmt::Display for ActorRef<A> {
600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601        fmt::Display::fmt(&self.actor_id, f)?;
602        write!(f, "<{}>", std::any::type_name::<A>())
603    }
604}
605
606// We implement Clone manually to avoid imposing A: Clone.
607impl<A: Referable> Clone for ActorRef<A> {
608    fn clone(&self) -> Self {
609        Self {
610            actor_id: self.actor_id.clone(),
611            phantom: PhantomData,
612        }
613    }
614}
615
616impl<A: Referable> PartialEq for ActorRef<A> {
617    fn eq(&self, other: &Self) -> bool {
618        self.actor_id == other.actor_id
619    }
620}
621
622impl<A: Referable> Eq for ActorRef<A> {}
623
624impl<A: Referable> PartialOrd for ActorRef<A> {
625    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
626        Some(self.cmp(other))
627    }
628}
629
630impl<A: Referable> Ord for ActorRef<A> {
631    fn cmp(&self, other: &Self) -> Ordering {
632        self.actor_id.cmp(&other.actor_id)
633    }
634}
635
636impl<A: Referable> Hash for ActorRef<A> {
637    fn hash<H: Hasher>(&self, state: &mut H) {
638        self.actor_id.hash(state);
639    }
640}
641
642/// Port ids identify [`crate::mailbox::Port`]s of an actor.
643///
644/// TODO: consider moving [`crate::mailbox::Port`] to `PortRef` in this
645/// module for consistency with actors,
646#[derive(
647    Debug,
648    Serialize,
649    Deserialize,
650    Clone,
651    PartialEq,
652    Eq,
653    PartialOrd,
654    Hash,
655    Ord,
656    typeuri::Named
657)]
658pub struct PortId(ActorId, u64);
659
660impl PortId {
661    /// Create a new port ID.
662    pub fn new(actor_id: ActorId, port: u64) -> Self {
663        Self(actor_id, port)
664    }
665
666    /// The ID of the port's owning actor.
667    pub fn actor_id(&self) -> &ActorId {
668        &self.0
669    }
670
671    /// Convert this port ID into an actor ID.
672    pub fn into_actor_id(self) -> ActorId {
673        self.0
674    }
675
676    /// This port's index.
677    pub fn index(&self) -> u64 {
678        self.1
679    }
680
681    pub(crate) fn is_actor_port(&self) -> bool {
682        self.1 & ACTOR_PORT_BIT != 0
683    }
684
685    /// Send a serialized message to this port, provided a sending capability,
686    /// such as [`crate::actor::Instance`]. It is the sender's responsibility
687    /// to ensure that the provided message is well-typed.
688    pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
689        let mut headers = Flattrs::new();
690        crate::mailbox::headers::set_send_timestamp(&mut headers);
691        cx.post(
692            self.clone(),
693            headers,
694            serialized,
695            true,
696            context::SeqInfoPolicy::AssignNew,
697        );
698    }
699
700    /// Send a serialized message to this port, provided a sending capability,
701    /// such as [`crate::actor::Instance`], with additional context provided by headers.
702    /// It is the sender's responsibility to ensure that the provided message is well-typed.
703    pub fn send_with_headers(
704        &self,
705        cx: &impl context::Actor,
706        serialized: wirevalue::Any,
707        mut headers: Flattrs,
708    ) {
709        crate::mailbox::headers::set_send_timestamp(&mut headers);
710        cx.post(
711            self.clone(),
712            headers,
713            serialized,
714            true,
715            context::SeqInfoPolicy::AssignNew,
716        );
717    }
718
719    /// Split this port, returning a new port that relays messages to the port
720    /// through a local proxy, which may coalesce messages.
721    pub fn split(
722        &self,
723        cx: &impl context::Actor,
724        reducer_spec: Option<ReducerSpec>,
725        reducer_mode: ReducerMode,
726        return_undeliverable: bool,
727    ) -> anyhow::Result<PortId> {
728        cx.split(
729            self.clone(),
730            reducer_spec,
731            reducer_mode,
732            return_undeliverable,
733        )
734    }
735}
736
737impl FromStr for PortId {
738    type Err = ReferenceParsingError;
739
740    fn from_str(addr: &str) -> Result<Self, Self::Err> {
741        match addr.parse()? {
742            Reference::Port(port_id) => Ok(port_id),
743            _ => Err(ReferenceParsingError::WrongType("port".into())),
744        }
745    }
746}
747
748impl fmt::Display for PortId {
749    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750        let PortId(actor_id, port) = self;
751        if self.is_actor_port() {
752            let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !ACTOR_PORT_BIT));
753            let typename = type_info.map_or("unknown", TypeInfo::typename);
754            write!(f, "{}[{}<{}>]", actor_id, port, typename)
755        } else {
756            write!(f, "{}[{}]", actor_id, port)
757        }
758    }
759}
760
761/// A reference to a remote port. All messages passed through
762/// PortRefs will be serialized. PortRefs are always streaming.
763#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
764#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
765pub struct PortRef<M> {
766    port_id: PortId,
767    #[derivative(
768        PartialEq = "ignore",
769        PartialOrd = "ignore",
770        Ord = "ignore",
771        Hash = "ignore"
772    )]
773    reducer_spec: Option<ReducerSpec>,
774    #[derivative(
775        PartialEq = "ignore",
776        PartialOrd = "ignore",
777        Ord = "ignore",
778        Hash = "ignore"
779    )]
780    streaming_opts: StreamingReducerOpts,
781    phantom: PhantomData<M>,
782    return_undeliverable: bool,
783}
784
785impl<M: RemoteMessage> PortRef<M> {
786    /// The caller attests that the provided PortId can be
787    /// converted to a reachable, typed port reference.
788    pub fn attest(port_id: PortId) -> Self {
789        Self {
790            port_id,
791            reducer_spec: None,
792            streaming_opts: StreamingReducerOpts::default(),
793            phantom: PhantomData,
794            return_undeliverable: true,
795        }
796    }
797
798    /// The caller attests that the provided PortId can be
799    /// converted to a reachable, typed port reference.
800    pub fn attest_reducible(
801        port_id: PortId,
802        reducer_spec: Option<ReducerSpec>,
803        streaming_opts: StreamingReducerOpts,
804    ) -> Self {
805        Self {
806            port_id,
807            reducer_spec,
808            streaming_opts,
809            phantom: PhantomData,
810            return_undeliverable: true,
811        }
812    }
813
814    /// The caller attests that the provided PortId can be
815    /// converted to a reachable, typed port reference.
816    pub fn attest_message_port(actor: &ActorId) -> Self {
817        PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
818    }
819
820    /// The typehash of this port's reducer, if any. Reducers
821    /// may be used to coalesce messages sent to a port.
822    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
823        &self.reducer_spec
824    }
825
826    /// This port's ID.
827    pub fn port_id(&self) -> &PortId {
828        &self.port_id
829    }
830
831    /// Convert this PortRef into its corresponding port id.
832    pub fn into_port_id(self) -> PortId {
833        self.port_id
834    }
835
836    /// coerce it into OncePortRef so we can send messages to this port from
837    /// APIs requires OncePortRef.
838    pub fn into_once(self) -> OncePortRef<M> {
839        let return_undeliverable = self.return_undeliverable;
840        let mut once = OncePortRef::attest(self.into_port_id());
841        once.return_undeliverable = return_undeliverable;
842        once
843    }
844
845    /// Send a message to this port, provided a sending capability, such as
846    /// [`crate::actor::Instance`].
847    pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
848        self.send_with_headers(cx, Flattrs::new(), message)
849    }
850
851    /// Send a message to this port, provided a sending capability, such as
852    /// [`crate::actor::Instance`]. Additional context can be provided in the form of
853    /// headers.
854    pub fn send_with_headers(
855        &self,
856        cx: &impl context::Actor,
857        headers: Flattrs,
858        message: M,
859    ) -> Result<(), MailboxSenderError> {
860        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
861            MailboxSenderError::new_bound(
862                self.port_id.clone(),
863                MailboxSenderErrorKind::Serialize(err.into()),
864            )
865        })?;
866        self.send_serialized(cx, headers, serialized);
867        Ok(())
868    }
869
870    /// Send a serialized message to this port, provided a sending capability, such as
871    /// [`crate::actor::Instance`].
872    pub fn send_serialized(
873        &self,
874        cx: &impl context::Actor,
875        mut headers: Flattrs,
876        message: wirevalue::Any,
877    ) {
878        crate::mailbox::headers::set_send_timestamp(&mut headers);
879        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
880        cx.post(
881            self.port_id.clone(),
882            headers,
883            message,
884            self.return_undeliverable,
885            context::SeqInfoPolicy::AssignNew,
886        );
887    }
888
889    /// Convert this port into a sink that can be used to send messages using the given capability.
890    pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
891        PortSink::new(cx, self)
892    }
893
894    /// Get whether or not messages sent to this port that are undeliverable should
895    /// be returned to the sender.
896    pub fn get_return_undeliverable(&self) -> bool {
897        self.return_undeliverable
898    }
899
900    /// Set whether or not messages sent to this port that are undeliverable
901    /// should be returned to the sender.
902    pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
903        self.return_undeliverable = return_undeliverable;
904    }
905}
906
907impl<M: RemoteMessage> Clone for PortRef<M> {
908    fn clone(&self) -> Self {
909        Self {
910            port_id: self.port_id.clone(),
911            reducer_spec: self.reducer_spec.clone(),
912            streaming_opts: self.streaming_opts.clone(),
913            phantom: PhantomData,
914            return_undeliverable: self.return_undeliverable,
915        }
916    }
917}
918
919impl<M: RemoteMessage> fmt::Display for PortRef<M> {
920    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921        fmt::Display::fmt(&self.port_id, f)
922    }
923}
924
925/// The kind of unbound port.
926#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
927pub enum UnboundPortKind {
928    /// A streaming port, which should be reduced with the provided options.
929    Streaming(Option<StreamingReducerOpts>),
930    /// A OncePort, which must be one-shot aggregated.
931    Once,
932}
933
934/// The parameters extracted from [`PortRef`] to [`Bindings`].
935#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
936pub struct UnboundPort(
937    pub PortId,
938    pub Option<ReducerSpec>,
939    pub bool, // return_undeliverable
940    pub UnboundPortKind,
941);
942wirevalue::register_type!(UnboundPort);
943
944impl UnboundPort {
945    /// Update the port id of this binding.
946    pub fn update(&mut self, port_id: PortId) {
947        self.0 = port_id;
948    }
949}
950
951impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
952    fn from(port_ref: &PortRef<M>) -> Self {
953        UnboundPort(
954            port_ref.port_id.clone(),
955            port_ref.reducer_spec.clone(),
956            port_ref.return_undeliverable,
957            UnboundPortKind::Streaming(Some(port_ref.streaming_opts.clone())),
958        )
959    }
960}
961
962impl<M: RemoteMessage> Unbind for PortRef<M> {
963    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
964        bindings.push_back(&UnboundPort::from(self))
965    }
966}
967
968impl<M: RemoteMessage> Bind for PortRef<M> {
969    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
970        let UnboundPort(port_id, reducer_spec, return_undeliverable, port_kind) =
971            bindings.try_pop_front::<UnboundPort>()?;
972        self.port_id = port_id;
973        self.reducer_spec = reducer_spec;
974        self.return_undeliverable = return_undeliverable;
975        self.streaming_opts = match port_kind {
976            UnboundPortKind::Streaming(opts) => opts.unwrap_or_default(),
977            UnboundPortKind::Once => {
978                anyhow::bail!("OncePortRef cannot be bound to PortRef")
979            }
980        };
981        Ok(())
982    }
983}
984
/// A remote reference to a [`OncePort`]. References are serializable
/// and may be passed to remote actors, which can then use them to send
/// a message to this port.
// NOTE(review): the derives add bounds on `M` (e.g. `M: PartialEq`, `M: Debug`)
// because of the `PhantomData<M>` field. Field order is presumably part of the
// serialized form; do not reorder.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The port that messages sent through this reference are posted to.
    port_id: PortId,
    /// Optional reducer spec associated with this port.
    reducer_spec: Option<ReducerSpec>,
    /// Whether undeliverable messages should be returned to the sender.
    return_undeliverable: bool,
    /// Ties the reference to its message type `M` without storing one.
    phantom: PhantomData<M>,
}
995
996impl<M: RemoteMessage> OncePortRef<M> {
997    pub(crate) fn attest(port_id: PortId) -> Self {
998        Self {
999            port_id,
1000            reducer_spec: None,
1001            return_undeliverable: true,
1002            phantom: PhantomData,
1003        }
1004    }
1005
1006    /// The caller attests that the provided PortId can be
1007    /// converted to a reachable, typed once port reference.
1008    pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
1009        Self {
1010            port_id,
1011            reducer_spec,
1012            return_undeliverable: true,
1013            phantom: PhantomData,
1014        }
1015    }
1016
1017    /// The typehash of this port's reducer, if any.
1018    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1019        &self.reducer_spec
1020    }
1021
1022    /// This port's ID.
1023    pub fn port_id(&self) -> &PortId {
1024        &self.port_id
1025    }
1026
1027    /// Convert this PortRef into its corresponding port id.
1028    pub fn into_port_id(self) -> PortId {
1029        self.port_id
1030    }
1031
1032    /// Send a message to this port, provided a sending capability, such as
1033    /// [`crate::actor::Instance`].
1034    pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1035        self.send_with_headers(cx, Flattrs::new(), message)
1036    }
1037
1038    /// Send a message to this port, provided a sending capability, such as
1039    /// [`crate::actor::Instance`]. Additional context can be provided in the form of headers.
1040    pub fn send_with_headers(
1041        self,
1042        cx: &impl context::Actor,
1043        mut headers: Flattrs,
1044        message: M,
1045    ) -> Result<(), MailboxSenderError> {
1046        crate::mailbox::headers::set_send_timestamp(&mut headers);
1047        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1048            MailboxSenderError::new_bound(
1049                self.port_id.clone(),
1050                MailboxSenderErrorKind::Serialize(err.into()),
1051            )
1052        })?;
1053        cx.post(
1054            self.port_id.clone(),
1055            headers,
1056            serialized,
1057            self.return_undeliverable,
1058            context::SeqInfoPolicy::AssignNew,
1059        );
1060        Ok(())
1061    }
1062
1063    /// Get whether or not messages sent to this port that are undeliverable should
1064    /// be returned to the sender.
1065    pub fn get_return_undeliverable(&self) -> bool {
1066        self.return_undeliverable
1067    }
1068
1069    /// Set whether or not messages sent to this port that are undeliverable
1070    /// should be returned to the sender.
1071    pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1072        self.return_undeliverable = return_undeliverable;
1073    }
1074}
1075
1076impl<M: RemoteMessage> Clone for OncePortRef<M> {
1077    fn clone(&self) -> Self {
1078        Self {
1079            port_id: self.port_id.clone(),
1080            reducer_spec: self.reducer_spec.clone(),
1081            return_undeliverable: self.return_undeliverable,
1082            phantom: PhantomData,
1083        }
1084    }
1085}
1086
1087impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1088    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089        fmt::Display::fmt(&self.port_id, f)
1090    }
1091}
1092
/// Gives each `OncePortRef<M>` a type name parameterized by `M`'s type name.
impl<M: RemoteMessage> Named for OncePortRef<M> {
    fn typename() -> &'static str {
        // NOTE(review): the name says `hyperactor::mailbox` although this type
        // is defined in `reference`; presumably kept stable for type-name /
        // wire compatibility — confirm before changing.
        wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1098
1099impl<M: RemoteMessage> From<&OncePortRef<M>> for UnboundPort {
1100    fn from(port_ref: &OncePortRef<M>) -> Self {
1101        UnboundPort(
1102            port_ref.port_id.clone(),
1103            port_ref.reducer_spec.clone(),
1104            true, // return_undeliverable
1105            UnboundPortKind::Once,
1106        )
1107    }
1108}
1109
1110impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1111    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1112        bindings.push_back(&UnboundPort::from(self))
1113    }
1114}
1115
1116impl<M: RemoteMessage> Bind for OncePortRef<M> {
1117    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1118        let UnboundPort(port_id, reducer_spec, _return_undeliverable, port_kind) =
1119            bindings.try_pop_front::<UnboundPort>()?;
1120        match port_kind {
1121            UnboundPortKind::Once => {
1122                self.port_id = port_id;
1123                self.reducer_spec = reducer_spec;
1124                Ok(())
1125            }
1126            UnboundPortKind::Streaming(_) => {
1127                anyhow::bail!("PortRef cannot be bound to OncePortRef")
1128            }
1129        }
1130    }
1131}
1132
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use tokio::sync::mpsc;
    use uuid::Uuid;

    use super::*;
    // for macros
    use crate::Proc;
    use crate::context::Mailbox as _;
    use crate::mailbox::PortLocation;
    use crate::ordering::SEQ_INFO;
    use crate::ordering::SeqInfo;

    /// Proc, actor, and port strings parse into the expected `Reference`
    /// values; a port's `<type>` annotation does not affect the result.
    #[test]
    fn test_reference_parse() {
        let cases: Vec<(&str, Reference)> = vec![
            (
                "tcp:[::1]:1234,test",
                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test").into(),
            ),
            (
                "tcp:[::1]:1234,test,testactor[123]",
                ActorId::new(
                    ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                    "testactor",
                    123,
                )
                .into(),
            ),
            (
                // type annotations are ignored
                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
                PortId::new(
                    ActorId::new(
                        ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                        "testactor",
                        0,
                    ),
                    123,
                )
                .into(),
            ),
        ];

        for (s, expected) in cases {
            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
        }
    }

    /// Malformed strings are rejected rather than parsed as a `Reference`.
    #[test]
    fn test_reference_parse_error() {
        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)", "test"];

        for s in cases {
            let result: Result<Reference, ReferenceParsingError> = s.parse();
            assert!(result.is_err(), "expected error for: {}", s);
        }
    }

    /// Sorting a shuffled list of proc references restores the order
    /// `first`, `second`, `third`.
    #[test]
    fn test_reference_ord() {
        let expected: Vec<Reference> = [
            "tcp:[::1]:1234,first",
            "tcp:[::1]:1234,second",
            "tcp:[::1]:1234,third",
        ]
        .into_iter()
        .map(|s| s.parse().unwrap())
        .collect();

        let mut sorted = expected.to_vec();
        sorted.shuffle(&mut thread_rng());
        sorted.sort();

        assert_eq!(sorted, expected);
    }

    /// A port id built from a `Named` type's `port()` renders with the numeric
    /// port followed by the type's name as an annotation.
    #[test]
    fn test_port_type_annotation() {
        #[derive(typeuri::Named, Serialize, Deserialize)]
        struct MyType;
        wirevalue::register_type!(MyType);
        let port_id = PortId::new(
            ActorId::new(
                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                "testactor",
                1,
            ),
            MyType::port(),
        );
        assert_eq!(
            port_id.to_string(),
            "tcp:[::1]:1234,test,testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
        );
    }

    /// Once a handle is bound to an actor port, sends through the handle, a
    /// `PortRef` to it, and its raw `PortId` all draw from one session sequence.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_ref_and_id() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        port_handle.send(&client, ()).unwrap();
        // An unbound port handle still sets SEQ_INFO (to `SeqInfo::Direct`),
        // since the handler's ordered channel expects the header to be present.
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        port_handle.bind_actor_port();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(port_id.is_actor_port());
        let port_ref = PortRef::attest(port_id.clone());

        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            mut seq,
        } = rx.try_recv().unwrap().unwrap()
        else {
            panic!("expected session info");
        };
        assert_eq!(session_id, client.sequencer().session_id());
        assert_eq!(seq, 1);

        // Expects the next received header to be the following sequence
        // number in `session_id`, advancing `seq` accordingly.
        fn assert_seq_info(
            rx: &mut mpsc::UnboundedReceiver<Option<SeqInfo>>,
            session_id: Uuid,
            seq: &mut u64,
        ) {
            *seq += 1;
            let SeqInfo::Session {
                session_id: rcved_session_id,
                seq: rcved_seq,
            } = rx.try_recv().unwrap().unwrap()
            else {
                panic!("expected session info");
            };
            assert_eq!(rcved_session_id, session_id);
            assert_eq!(rcved_seq, *seq);
        }

        // Interleave sends from port_handle, port_ref, and port_id
        for _ in 0..10 {
            // From port_handle
            port_handle.send(&client, ()).unwrap();
            assert_seq_info(&mut rx, session_id, &mut seq);

            // From port_ref
            for _ in 0..2 {
                port_ref.send(&client, ()).unwrap();
                assert_seq_info(&mut rx, session_id, &mut seq);
            }

            // From port_id
            for _ in 0..3 {
                port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
                assert_seq_info(&mut rx, session_id, &mut seq);
            }
        }

        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
    }

    /// A handle bound to an allocated (non-actor) port still receives session
    /// sequence numbers, shared across the handle, a `PortRef`, and the `PortId`.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_bound_to_allocated_port() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        port_handle.send(&client, ()).unwrap();
        // An unbound port handle still sets SEQ_INFO (to `SeqInfo::Direct`),
        // since the handler's ordered channel expects the header to be present.
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        // Bind to the allocated port.
        port_handle.bind();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        // This is a non-actor port, but it still gets seq info (per-port sequence).
        assert!(!port_id.is_actor_port());

        // After binding, non-actor ports get their own sequence.
        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            seq: seq1,
        } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq1, 1);
        assert_eq!(session_id, client.sequencer().session_id());

        let port_ref = PortRef::attest(port_id.clone());
        port_ref.send(&client, ()).unwrap();
        let SeqInfo::Session { seq: seq2, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq2, 2);

        port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
        let SeqInfo::Session { seq: seq3, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq3, 3);
    }
}