hyperactor/
reference.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! References for different resources in Hyperactor.
10//!
11//! The "Id" variants are transparent and typeless, whereas the
//! corresponding "Ref" variants are opaque and typed. The latter are intended
13//! to be exposed in user-facing APIs. We provide [`std::convert::From`]
14//! implementations between Id and Refs where this makes sense.
15//!
16//! All system implementations use the same concrete reference
//! representations, as their specific layout (e.g., actor index, rank,
//! etc.) is used by the core communications algorithms throughout.
19//!
20//! References and ids are [`crate::Message`]s to facilitate passing
21//! them between actors.
22
23#![allow(dead_code)] // Allow until this is used outside of tests.
24
25use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::Flattrs;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43use typeuri::ACTOR_PORT_BIT;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerMode;
52use crate::accum::ReducerSpec;
53use crate::accum::StreamingReducerOpts;
54use crate::actor::Referable;
55use crate::channel::ChannelAddr;
56use crate::context;
57use crate::context::MailboxExt;
58use crate::mailbox::MailboxSenderError;
59use crate::mailbox::MailboxSenderErrorKind;
60use crate::mailbox::PortSink;
61use crate::message::Bind;
62use crate::message::Bindings;
63use crate::message::Unbind;
64
65pub mod lex;
66pub mod name;
67mod parse;
68
69use parse::Lexer;
70use parse::ParseError;
71use parse::Token;
72pub use parse::is_valid_ident;
73use parse::parse;
74
/// The kinds of references.
///
/// A payload-free tag with one variant per variant of [`Reference`]; see
/// [`Reference::kind`]. A `Display` implementation is derived through
/// `strum::Display`.
#[derive(strum::Display)]
pub enum ReferenceKind {
    /// Proc references.
    Proc,
    /// Actor references.
    Actor,
    /// Port references.
    Port,
}
85
/// A universal reference to hierarchical identifiers in Hyperactor.
///
/// References implement a concrete syntax which can be parsed via
/// [`FromStr`]. They are of the form:
///
/// - `addr,proc_name`,
/// - `addr,proc_name,actor_name[pid]`,
/// - `addr,proc_name,actor_name[pid][port]`
///
/// The parser additionally accepts `addr,proc_name,actor_name` (the pid
/// defaults to `0`) and `addr,proc_name,actor_name[pid][port<type>]` (the
/// type annotation is parsed and discarded).
///
/// Reference also implements a total ordering, so that references are
/// ordered lexicographically with the hierarchy implied by proc,
/// actor. This allows reference ordering to be used to implement prefix
/// based routing.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    Hash,
    typeuri::Named,
    EnumAsInner
)]
pub enum Reference {
    /// A reference to a proc.
    Proc(ProcId),
    /// A reference to an actor.
    Actor(ActorId), // todo: should we only allow name references here?
    /// A reference to a port.
    Port(PortId),
}
118
119impl Reference {
120    /// Tells whether this reference is a prefix of the provided reference.
121    pub fn is_prefix_of(&self, other: &Reference) -> bool {
122        match self {
123            Self::Proc(_) => self.proc_id() == other.proc_id(),
124            Self::Actor(_) => self == other,
125            Self::Port(_) => self == other,
126        }
127    }
128
129    /// The proc id of the reference, if any.
130    pub fn proc_id(&self) -> Option<&ProcId> {
131        match self {
132            Self::Proc(proc_id) => Some(proc_id),
133            Self::Actor(actor_id) => Some(actor_id.proc_id()),
134            Self::Port(port_id) => Some(port_id.actor_id().proc_id()),
135        }
136    }
137
138    /// The actor id of the reference, if any.
139    pub fn actor_id(&self) -> Option<&ActorId> {
140        match self {
141            Self::Proc(_) => None,
142            Self::Actor(actor_id) => Some(actor_id),
143            Self::Port(port_id) => Some(port_id.actor_id()),
144        }
145    }
146
147    /// The actor name of the reference, if any.
148    fn actor_name(&self) -> Option<&str> {
149        match self {
150            Self::Proc(_) => None,
151            Self::Actor(actor_id) => Some(actor_id.name()),
152            Self::Port(port_id) => Some(port_id.actor_id().name()),
153        }
154    }
155
156    /// The pid of the reference, if any.
157    fn pid(&self) -> Option<Index> {
158        match self {
159            Self::Proc(_) => None,
160            Self::Actor(actor_id) => Some(actor_id.pid()),
161            Self::Port(port_id) => Some(port_id.actor_id().pid()),
162        }
163    }
164
165    /// The port of the reference, if any.
166    fn port(&self) -> Option<u64> {
167        match self {
168            Self::Proc(_) => None,
169            Self::Actor(_) => None,
170            Self::Port(port_id) => Some(port_id.index()),
171        }
172    }
173
174    /// Returns the kind of the reference.
175    pub fn kind(&self) -> ReferenceKind {
176        match self {
177            Self::Proc(_) => ReferenceKind::Proc,
178            Self::Actor(_) => ReferenceKind::Actor,
179            Self::Port(_) => ReferenceKind::Port,
180        }
181    }
182}
183
// The ordering is total, so `partial_cmp` simply defers to `Ord::cmp` and
// never returns `None`.
impl PartialOrd for Reference {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
189
190impl Ord for Reference {
191    fn cmp(&self, other: &Self) -> Ordering {
192        // Order by: proc address/name, then actor_name, then pid, then port
193        (self.proc_id(), self.actor_name(), self.pid(), self.port()).cmp(&(
194            other.proc_id(),
195            other.actor_name(),
196            other.pid(),
197            other.port(),
198        ))
199    }
200}
201
202impl fmt::Display for Reference {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        match self {
205            Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
206            Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
207            Self::Port(port_id) => fmt::Display::fmt(port_id, f),
208        }
209    }
210}
211
/// The type of error encountered while parsing references.
///
/// Returned by the [`FromStr`] implementations of [`Reference`],
/// [`ProcId`], [`ActorId`] and [`PortId`].
#[derive(thiserror::Error, Debug)]
pub enum ReferenceParsingError {
    /// The parser expected a token, but it reached the end of the token stream.
    #[error("expected token")]
    Empty,

    /// The parser encountered an unexpected token. The payload is a
    /// human-readable description of what was found.
    #[error("unexpected token: {0}")]
    Unexpected(String),

    /// The parser encountered an error parsing an integer.
    #[error(transparent)]
    ParseInt(#[from] ParseIntError),

    /// A parse error.
    #[error("parse: {0}")]
    Parse(#[from] ParseError),

    /// The parser encountered the wrong reference type. The payload names
    /// the kind that was expected (e.g. `"proc"`).
    #[error("wrong reference type: expected {0}")]
    WrongType(String),

    /// An invalid channel address was encountered while parsing the reference.
    /// Carries the offending address text and the underlying parse failure.
    #[error("invalid channel address {0}: {1}")]
    InvalidChannelAddress(String, anyhow::Error),
}
239
impl FromStr for Reference {
    type Err = ReferenceParsingError;

    /// Parses a reference from its textual form.
    ///
    /// The input is split at the first comma: the text before it is parsed
    /// as the channel address, and the remainder is tokenized and matched
    /// against the grammar below to decide between a proc, actor or port
    /// reference.
    ///
    /// # Errors
    ///
    /// - [`ReferenceParsingError::Unexpected`] when the input contains no comma.
    /// - [`ReferenceParsingError::InvalidChannelAddress`] when the part
    ///   before the first comma is not a valid channel address.
    /// - Whatever error the `parse!` grammar produces when the remainder
    ///   matches none of the listed forms.
    fn from_str(addr: &str) -> Result<Self, Self::Err> {
        // References are parsed in the "direct" format:
        // The reference must contain a comma (anywhere), indicating a proc/actor/port reference.

        match addr.split_once(",") {
            Some((channel_addr, rest)) => {
                // Shadow the raw text with the parsed address; the closure
                // still sees the original `&str` for the error message.
                let channel_addr = channel_addr.parse().map_err(|err| {
                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
                })?;

                Ok(parse! {
                    Lexer::new(rest);

                    // channeladdr,proc_name
                    Token::Elem(proc_name) =>
                    Self::Proc(ProcId::with_name(channel_addr, proc_name)),

                    // channeladdr,proc_name,actor_name
                    // (no explicit pid: defaults to 0)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
                    Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, 0)),

                    // channeladdr,proc_name,actor_name[pid]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
                        Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid)),

                    // channeladdr,proc_name,actor_name[pid][port]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket  =>
                        Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),

                    // channeladdr,proc_name,actor_name[pid][port<type>]
                    // (the type annotation is accepted but not retained)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
                }?)
            }

            None => Err(ReferenceParsingError::Unexpected(format!(
                "expected a comma-separated reference, got: {}",
                addr
            ))),
        }
    }
}
292
293impl From<ProcId> for Reference {
294    fn from(proc_id: ProcId) -> Self {
295        Self::Proc(proc_id)
296    }
297}
298
299impl From<ActorId> for Reference {
300    fn from(actor_id: ActorId) -> Self {
301        Self::Actor(actor_id)
302    }
303}
304
305impl From<PortId> for Reference {
306    fn from(port_id: PortId) -> Self {
307        Self::Port(port_id)
308    }
309}
310
/// Index is a type alias representing a value that can be used as an index
/// into a sequence.
///
/// In this module it is the type of an actor's pid (see [`ActorId::pid`]).
pub type Index = usize;
314
/// Procs are identified by a direct channel address and local name.
/// Each proc represents an actor runtime that can locally route to all of its
/// constituent actors.
///
/// Field `0` is the channel address and field `1` is the proc name. The
/// derived ordering therefore compares the address first, then the name.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ProcId(ChannelAddr, String);
331
332/// Compute an 8-char hex hash suffix from a [`ChannelAddr`].
333fn addr_hash_suffix(addr: &ChannelAddr) -> String {
334    use std::collections::hash_map::DefaultHasher;
335    let mut hasher = DefaultHasher::new();
336    addr.hash(&mut hasher);
337    format!("{:08x}", hasher.finish() as u32)
338}
339
340impl ProcId {
341    /// Create a ProcId with a globally-unique name: `"{base_name}-{hash_of_addr}"`.
342    ///
343    /// Use this for well-known / fixed proc names (e.g. `"service"`, `"local"`)
344    /// that are not already unique across hosts.
345    pub fn unique(addr: ChannelAddr, base_name: impl Into<String>) -> Self {
346        let base = base_name.into();
347        let suffix = addr_hash_suffix(&addr);
348        Self(addr, format!("{}-{}", base, suffix))
349    }
350
351    /// Create a ProcId with an already-unique name (no suffix added).
352    ///
353    /// Use this for deserialization, test helpers, and names that already
354    /// contain a UUID or other uniquifying component.
355    pub fn with_name(addr: ChannelAddr, name: impl Into<String>) -> Self {
356        Self(addr, name.into())
357    }
358
359    /// The base name before the `-{hash}` suffix, if present.
360    ///
361    /// If the name ends with `-XXXXXXXX` (8 hex chars), returns the part
362    /// before that suffix. Otherwise returns the full name.
363    pub fn base_name(&self) -> &str {
364        match self.1.rsplit_once('-') {
365            Some((base, suffix))
366                if suffix.len() == 8 && suffix.chars().all(|c| c.is_ascii_hexdigit()) =>
367            {
368                base
369            }
370            _ => &self.1,
371        }
372    }
373
374    /// Create an actor ID with the provided name, pid within this proc.
375    pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
376        ActorId(self.clone(), name.into(), pid)
377    }
378
379    /// The proc's channel address.
380    pub fn addr(&self) -> &ChannelAddr {
381        &self.0
382    }
383
384    /// The proc's name.
385    pub fn name(&self) -> &str {
386        &self.1
387    }
388}
389
390impl fmt::Display for ProcId {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        write!(f, "{},{}", self.0, self.1)
393    }
394}
395
396impl FromStr for ProcId {
397    type Err = ReferenceParsingError;
398
399    fn from_str(addr: &str) -> Result<Self, Self::Err> {
400        match addr.parse()? {
401            Reference::Proc(proc_id) => Ok(proc_id),
402            _ => Err(ReferenceParsingError::WrongType("proc".into())),
403        }
404    }
405}
406
/// Actors are identified by their proc, their name, and pid.
///
/// Field `0` is the owning proc, field `1` the actor name and field `2` the
/// pid. The derived ordering compares them in that order.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ActorId(ProcId, String, Index);

// NOTE(review): presumably lets `ActorId` be stored as a config attribute
// value — confirm against `hyperactor_config::impl_attrvalue!`.
hyperactor_config::impl_attrvalue!(ActorId);
423
424impl ActorId {
425    /// Create a new actor ID.
426    pub fn new(proc_id: ProcId, name: impl Into<String>, pid: Index) -> Self {
427        Self(proc_id, name.into(), pid)
428    }
429
430    /// Create a new port ID with the provided port for this actor.
431    pub fn port_id(&self, port: u64) -> PortId {
432        PortId(self.clone(), port)
433    }
434
435    /// Create a child actor ID with the provided PID.
436    pub fn child_id(&self, pid: Index) -> Self {
437        Self(self.0.clone(), self.1.clone(), pid)
438    }
439
440    /// Return the root actor ID for the provided proc and name.
441    pub fn root(proc_id: ProcId, name: String) -> Self {
442        Self(proc_id, name, 0)
443    }
444
445    /// The proc ID of this actor ID.
446    pub fn proc_id(&self) -> &ProcId {
447        &self.0
448    }
449
450    /// The actor's name.
451    pub fn name(&self) -> &str {
452        &self.1
453    }
454
455    /// The actor's pid.
456    pub fn pid(&self) -> Index {
457        self.2
458    }
459}
460
461impl fmt::Display for ActorId {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        write!(f, "{},{}[{}]", self.0, self.1, self.2)
464    }
465}
466impl<A: Referable> From<ActorRef<A>> for ActorId {
467    fn from(actor_ref: ActorRef<A>) -> Self {
468        actor_ref.actor_id.clone()
469    }
470}
471
472impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
473    fn from(actor_ref: &'a ActorRef<A>) -> Self {
474        &actor_ref.actor_id
475    }
476}
477
478impl FromStr for ActorId {
479    type Err = ReferenceParsingError;
480
481    fn from_str(addr: &str) -> Result<Self, Self::Err> {
482        match addr.parse()? {
483            Reference::Actor(actor_id) => Ok(actor_id),
484            _ => Err(ReferenceParsingError::WrongType("actor".into())),
485        }
486    }
487}
488
/// ActorRefs are typed references to actors.
///
/// The type parameter `A` only tags the reference with the actor type; the
/// sole stored data is the untyped [`ActorId`].
#[derive(typeuri::Named)]
pub struct ActorRef<A: Referable> {
    /// The id of the referenced actor.
    pub(crate) actor_id: ActorId,
    // fn() -> A so that the struct remains Send
    phantom: PhantomData<fn() -> A>,
}
496
497impl<A: Referable> ActorRef<A> {
498    /// Get the remote port for message type [`M`] for the referenced actor.
499    pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
500    where
501        A: RemoteHandles<M>,
502    {
503        PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
504    }
505
506    /// Send an [`M`]-typed message to the referenced actor.
507    pub fn send<M: RemoteMessage>(
508        &self,
509        cx: &impl context::Actor,
510        message: M,
511    ) -> Result<(), MailboxSenderError>
512    where
513        A: RemoteHandles<M>,
514    {
515        self.port().send(cx, message)
516    }
517
518    /// Send an [`M`]-typed message to the referenced actor, with additional context provided by
519    /// headers.
520    pub fn send_with_headers<M: RemoteMessage>(
521        &self,
522        cx: &impl context::Actor,
523        headers: Flattrs,
524        message: M,
525    ) -> Result<(), MailboxSenderError>
526    where
527        A: RemoteHandles<M>,
528    {
529        self.port().send_with_headers(cx, headers, message)
530    }
531
532    /// The caller guarantees that the provided actor ID is also a valid,
533    /// typed reference.  This is usually invoked to provide a guarantee
534    /// that an externally-provided actor ID (e.g., through a command
535    /// line argument) is a valid reference.
536    pub fn attest(actor_id: ActorId) -> Self {
537        Self {
538            actor_id,
539            phantom: PhantomData,
540        }
541    }
542
543    /// The actor ID corresponding with this reference.
544    pub fn actor_id(&self) -> &ActorId {
545        &self.actor_id
546    }
547
548    /// Convert this actor reference into its corresponding actor ID.
549    pub fn into_actor_id(self) -> ActorId {
550        self.actor_id
551    }
552
553    /// Attempt to downcast this reference into a (local) actor handle.
554    /// This will only succeed when the referenced actor is in the same
555    /// proc as the caller.
556    pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
557    where
558        A: Actor,
559    {
560        cx.instance().proc().resolve_actor_ref(self)
561    }
562}
563
564// Implement Serialize manually, without requiring A: Serialize
565impl<A: Referable> Serialize for ActorRef<A> {
566    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
567    where
568        S: Serializer,
569    {
570        // Serialize only the fields that don't depend on A
571        self.actor_id().serialize(serializer)
572    }
573}
574
575// Implement Deserialize manually, without requiring A: Deserialize
576impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
577    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
578    where
579        D: Deserializer<'de>,
580    {
581        let actor_id = <ActorId>::deserialize(deserializer)?;
582        Ok(ActorRef {
583            actor_id,
584            phantom: PhantomData,
585        })
586    }
587}
588
589// Implement Debug manually, without requiring A: Debug
590impl<A: Referable> fmt::Debug for ActorRef<A> {
591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592        f.debug_struct("ActorRef")
593            .field("actor_id", &self.actor_id)
594            .field("type", &std::any::type_name::<A>())
595            .finish()
596    }
597}
598
599impl<A: Referable> fmt::Display for ActorRef<A> {
600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601        fmt::Display::fmt(&self.actor_id, f)?;
602        write!(f, "<{}>", std::any::type_name::<A>())
603    }
604}
605
606// We implement Clone manually to avoid imposing A: Clone.
607impl<A: Referable> Clone for ActorRef<A> {
608    fn clone(&self) -> Self {
609        Self {
610            actor_id: self.actor_id.clone(),
611            phantom: PhantomData,
612        }
613    }
614}
615
616impl<A: Referable> PartialEq for ActorRef<A> {
617    fn eq(&self, other: &Self) -> bool {
618        self.actor_id == other.actor_id
619    }
620}
621
// Equality is delegated to `ActorId`, which is `Eq`, so it is a full
// equivalence relation here as well.
impl<A: Referable> Eq for ActorRef<A> {}
623
// The ordering is total, so `partial_cmp` simply defers to `Ord::cmp` and
// never returns `None`.
impl<A: Referable> PartialOrd for ActorRef<A> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
629
630impl<A: Referable> Ord for ActorRef<A> {
631    fn cmp(&self, other: &Self) -> Ordering {
632        self.actor_id.cmp(&other.actor_id)
633    }
634}
635
636impl<A: Referable> Hash for ActorRef<A> {
637    fn hash<H: Hasher>(&self, state: &mut H) {
638        self.actor_id.hash(state);
639    }
640}
641
/// Port ids identify [`crate::mailbox::Port`]s of an actor.
///
/// Field `0` is the owning actor's id and field `1` is the port index. The
/// derived ordering compares them in that order.
///
/// TODO: consider moving [`crate::mailbox::Port`] to `PortRef` in this
/// module for consistency with actors.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct PortId(ActorId, u64);
659
660impl PortId {
661    /// Create a new port ID.
662    pub fn new(actor_id: ActorId, port: u64) -> Self {
663        Self(actor_id, port)
664    }
665
666    /// The ID of the port's owning actor.
667    pub fn actor_id(&self) -> &ActorId {
668        &self.0
669    }
670
671    /// Convert this port ID into an actor ID.
672    pub fn into_actor_id(self) -> ActorId {
673        self.0
674    }
675
676    /// This port's index.
677    pub fn index(&self) -> u64 {
678        self.1
679    }
680
681    pub(crate) fn is_actor_port(&self) -> bool {
682        self.1 & ACTOR_PORT_BIT != 0
683    }
684
685    /// Send a serialized message to this port, provided a sending capability,
686    /// such as [`crate::actor::Instance`]. It is the sender's responsibility
687    /// to ensure that the provided message is well-typed.
688    pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
689        let mut headers = Flattrs::new();
690        crate::mailbox::headers::set_send_timestamp(&mut headers);
691        cx.post(
692            self.clone(),
693            headers,
694            serialized,
695            true,
696            context::SeqInfoPolicy::AssignNew,
697        );
698    }
699
700    /// Send a serialized message to this port, provided a sending capability,
701    /// such as [`crate::actor::Instance`], with additional context provided by headers.
702    /// It is the sender's responsibility to ensure that the provided message is well-typed.
703    pub fn send_with_headers(
704        &self,
705        cx: &impl context::Actor,
706        serialized: wirevalue::Any,
707        mut headers: Flattrs,
708    ) {
709        crate::mailbox::headers::set_send_timestamp(&mut headers);
710        cx.post(
711            self.clone(),
712            headers,
713            serialized,
714            true,
715            context::SeqInfoPolicy::AssignNew,
716        );
717    }
718
719    /// Split this port, returning a new port that relays messages to the port
720    /// through a local proxy, which may coalesce messages.
721    pub fn split(
722        &self,
723        cx: &impl context::Actor,
724        reducer_spec: Option<ReducerSpec>,
725        reducer_mode: ReducerMode,
726        return_undeliverable: bool,
727    ) -> anyhow::Result<PortId> {
728        cx.split(
729            self.clone(),
730            reducer_spec,
731            reducer_mode,
732            return_undeliverable,
733        )
734    }
735}
736
737impl FromStr for PortId {
738    type Err = ReferenceParsingError;
739
740    fn from_str(addr: &str) -> Result<Self, Self::Err> {
741        match addr.parse()? {
742            Reference::Port(port_id) => Ok(port_id),
743            _ => Err(ReferenceParsingError::WrongType("port".into())),
744        }
745    }
746}
747
748impl fmt::Display for PortId {
749    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750        let PortId(actor_id, port) = self;
751        if self.is_actor_port() {
752            let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !ACTOR_PORT_BIT));
753            let typename = type_info.map_or("unknown", TypeInfo::typename);
754            write!(f, "{}[{}<{}>]", actor_id, port, typename)
755        } else {
756            write!(f, "{}[{}]", actor_id, port)
757        }
758    }
759}
760
/// A reference to a remote port. All messages passed through
/// PortRefs will be serialized. PortRefs are always streaming.
///
/// Equality, ordering and hashing are derived through `derivative`. The
/// fields marked `ignore` below (`reducer_spec`, `streaming_opts`,
/// `unsplit`) do not take part in those comparisons.
#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct PortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer used to coalesce messages sent to this port.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_spec: Option<ReducerSpec>,
    /// Options for streaming reduction.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    streaming_opts: StreamingReducerOpts,
    /// Tags the reference with the message type `M`; carries no data.
    phantom: PhantomData<M>,
    /// Whether undeliverable messages should be returned to the sender.
    return_undeliverable: bool,
    /// When set, the port must not be split (see [`PortRef::unsplit`]).
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    unsplit: bool,
}
791
792impl<M: RemoteMessage> PortRef<M> {
793    /// The caller attests that the provided PortId can be
794    /// converted to a reachable, typed port reference.
795    pub fn attest(port_id: PortId) -> Self {
796        Self {
797            port_id,
798            reducer_spec: None,
799            streaming_opts: StreamingReducerOpts::default(),
800            phantom: PhantomData,
801            return_undeliverable: true,
802            unsplit: false,
803        }
804    }
805
806    /// The caller attests that the provided PortId can be
807    /// converted to a reachable, typed port reference.
808    pub fn attest_reducible(
809        port_id: PortId,
810        reducer_spec: Option<ReducerSpec>,
811        streaming_opts: StreamingReducerOpts,
812    ) -> Self {
813        Self {
814            port_id,
815            reducer_spec,
816            streaming_opts,
817            phantom: PhantomData,
818            return_undeliverable: true,
819            unsplit: false,
820        }
821    }
822
823    /// Prevents the port from being split.
824    pub fn unsplit(mut self) -> Self {
825        self.unsplit = true;
826        self
827    }
828
829    /// The caller attests that the provided PortId can be
830    /// converted to a reachable, typed port reference.
831    pub fn attest_message_port(actor: &ActorId) -> Self {
832        PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
833    }
834
835    /// The typehash of this port's reducer, if any. Reducers
836    /// may be used to coalesce messages sent to a port.
837    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
838        &self.reducer_spec
839    }
840
841    /// This port's ID.
842    pub fn port_id(&self) -> &PortId {
843        &self.port_id
844    }
845
846    /// Convert this PortRef into its corresponding port id.
847    pub fn into_port_id(self) -> PortId {
848        self.port_id
849    }
850
851    /// coerce it into OncePortRef so we can send messages to this port from
852    /// APIs requires OncePortRef.
853    pub fn into_once(self) -> OncePortRef<M> {
854        let return_undeliverable = self.return_undeliverable;
855        let unsplit = self.unsplit;
856        let mut once = OncePortRef::attest(self.into_port_id());
857        once.return_undeliverable = return_undeliverable;
858        once.unsplit = unsplit;
859        once
860    }
861
862    /// Send a message to this port, provided a sending capability, such as
863    /// [`crate::actor::Instance`].
864    pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
865        self.send_with_headers(cx, Flattrs::new(), message)
866    }
867
868    /// Send a message to this port, provided a sending capability, such as
869    /// [`crate::actor::Instance`]. Additional context can be provided in the form of
870    /// headers.
871    pub fn send_with_headers(
872        &self,
873        cx: &impl context::Actor,
874        headers: Flattrs,
875        message: M,
876    ) -> Result<(), MailboxSenderError> {
877        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
878            MailboxSenderError::new_bound(
879                self.port_id.clone(),
880                MailboxSenderErrorKind::Serialize(err.into()),
881            )
882        })?;
883        self.send_serialized(cx, headers, serialized);
884        Ok(())
885    }
886
887    /// Send a serialized message to this port, provided a sending capability, such as
888    /// [`crate::actor::Instance`].
889    pub fn send_serialized(
890        &self,
891        cx: &impl context::Actor,
892        mut headers: Flattrs,
893        message: wirevalue::Any,
894    ) {
895        crate::mailbox::headers::set_send_timestamp(&mut headers);
896        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
897        cx.post(
898            self.port_id.clone(),
899            headers,
900            message,
901            self.return_undeliverable,
902            context::SeqInfoPolicy::AssignNew,
903        );
904    }
905
906    /// Convert this port into a sink that can be used to send messages using the given capability.
907    pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
908        PortSink::new(cx, self)
909    }
910
911    /// Get whether or not messages sent to this port that are undeliverable should
912    /// be returned to the sender.
913    pub fn get_return_undeliverable(&self) -> bool {
914        self.return_undeliverable
915    }
916
917    /// Set whether or not messages sent to this port that are undeliverable
918    /// should be returned to the sender.
919    pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
920        self.return_undeliverable = return_undeliverable;
921    }
922}
923
924impl<M: RemoteMessage> Clone for PortRef<M> {
925    fn clone(&self) -> Self {
926        Self {
927            port_id: self.port_id.clone(),
928            reducer_spec: self.reducer_spec.clone(),
929            streaming_opts: self.streaming_opts.clone(),
930            phantom: PhantomData,
931            return_undeliverable: self.return_undeliverable,
932            unsplit: self.unsplit,
933        }
934    }
935}
936
937impl<M: RemoteMessage> fmt::Display for PortRef<M> {
938    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939        fmt::Display::fmt(&self.port_id, f)
940    }
941}
942
/// The kind of unbound port.
///
/// Carried inside [`UnboundPort`] to record which kind of port reference
/// the binding was extracted from.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
pub enum UnboundPortKind {
    /// A streaming port, which should be reduced with the provided options.
    /// When the options are `None`, binding to a [`PortRef`] falls back to
    /// the default streaming options.
    Streaming(Option<StreamingReducerOpts>),
    /// A OncePort, which must be one-shot aggregated.
    Once,
}
951
/// The parameters extracted from [`PortRef`] to [`Bindings`].
///
/// Fields, in order: the port id, the optional reducer spec, the
/// `return_undeliverable` flag, the port kind, and the `unsplit` flag.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
pub struct UnboundPort(
    pub PortId,
    pub Option<ReducerSpec>,
    pub bool, // return_undeliverable
    pub UnboundPortKind,
    pub bool, // unsplit
);
// NOTE(review): presumably registers the type with wirevalue's type registry
// — confirm against `wirevalue::register_type!`.
wirevalue::register_type!(UnboundPort);
962
963impl UnboundPort {
964    /// Update the port id of this binding.
965    pub fn update(&mut self, port_id: PortId) {
966        self.0 = port_id;
967    }
968}
969
970impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
971    fn from(port_ref: &PortRef<M>) -> Self {
972        UnboundPort(
973            port_ref.port_id.clone(),
974            port_ref.reducer_spec.clone(),
975            port_ref.return_undeliverable,
976            UnboundPortKind::Streaming(Some(port_ref.streaming_opts.clone())),
977            port_ref.unsplit,
978        )
979    }
980}
981
982impl<M: RemoteMessage> Unbind for PortRef<M> {
983    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
984        bindings.push_back(&UnboundPort::from(self))
985    }
986}
987
988impl<M: RemoteMessage> Bind for PortRef<M> {
989    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
990        let UnboundPort(port_id, reducer_spec, return_undeliverable, port_kind, unsplit) =
991            bindings.try_pop_front::<UnboundPort>()?;
992        self.port_id = port_id;
993        self.reducer_spec = reducer_spec;
994        self.return_undeliverable = return_undeliverable;
995        self.unsplit = unsplit;
996        self.streaming_opts = match port_kind {
997            UnboundPortKind::Streaming(opts) => opts.unwrap_or_default(),
998            UnboundPortKind::Once => {
999                anyhow::bail!("OncePortRef cannot be bound to PortRef")
1000            }
1001        };
1002        Ok(())
1003    }
1004}
1005
/// A remote reference to a [`OncePort`]. References are serializable
/// and may be passed to remote actors, which can then use it to send
/// a message to this port. Sending consumes the reference.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// The reducer spec attached to this port, if any.
    reducer_spec: Option<ReducerSpec>,
    /// Whether messages sent through this reference that are undeliverable
    /// should be returned to the sender.
    return_undeliverable: bool,
    /// When true, the port is prevented from being split.
    unsplit: bool,
    /// Ties the reference to its message type without storing a value of it.
    phantom: PhantomData<M>,
}
1017
1018impl<M: RemoteMessage> OncePortRef<M> {
1019    pub(crate) fn attest(port_id: PortId) -> Self {
1020        Self {
1021            port_id,
1022            reducer_spec: None,
1023            return_undeliverable: true,
1024            unsplit: false,
1025            phantom: PhantomData,
1026        }
1027    }
1028
1029    /// The caller attests that the provided PortId can be
1030    /// converted to a reachable, typed once port reference.
1031    pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
1032        Self {
1033            port_id,
1034            reducer_spec,
1035            return_undeliverable: true,
1036            unsplit: false,
1037            phantom: PhantomData,
1038        }
1039    }
1040
1041    /// Prevents the port from being split.
1042    pub fn unsplit(mut self) -> Self {
1043        self.unsplit = true;
1044        self
1045    }
1046
1047    /// The typehash of this port's reducer, if any.
1048    pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1049        &self.reducer_spec
1050    }
1051
1052    /// This port's ID.
1053    pub fn port_id(&self) -> &PortId {
1054        &self.port_id
1055    }
1056
1057    /// Convert this PortRef into its corresponding port id.
1058    pub fn into_port_id(self) -> PortId {
1059        self.port_id
1060    }
1061
1062    /// Send a message to this port, provided a sending capability, such as
1063    /// [`crate::actor::Instance`].
1064    pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1065        self.send_with_headers(cx, Flattrs::new(), message)
1066    }
1067
1068    /// Send a message to this port, provided a sending capability, such as
1069    /// [`crate::actor::Instance`]. Additional context can be provided in the form of headers.
1070    pub fn send_with_headers(
1071        self,
1072        cx: &impl context::Actor,
1073        mut headers: Flattrs,
1074        message: M,
1075    ) -> Result<(), MailboxSenderError> {
1076        crate::mailbox::headers::set_send_timestamp(&mut headers);
1077        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1078            MailboxSenderError::new_bound(
1079                self.port_id.clone(),
1080                MailboxSenderErrorKind::Serialize(err.into()),
1081            )
1082        })?;
1083        cx.post(
1084            self.port_id.clone(),
1085            headers,
1086            serialized,
1087            self.return_undeliverable,
1088            context::SeqInfoPolicy::AssignNew,
1089        );
1090        Ok(())
1091    }
1092
1093    /// Get whether or not messages sent to this port that are undeliverable should
1094    /// be returned to the sender.
1095    pub fn get_return_undeliverable(&self) -> bool {
1096        self.return_undeliverable
1097    }
1098
1099    /// Set whether or not messages sent to this port that are undeliverable
1100    /// should be returned to the sender.
1101    pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1102        self.return_undeliverable = return_undeliverable;
1103    }
1104}
1105
1106impl<M: RemoteMessage> Clone for OncePortRef<M> {
1107    fn clone(&self) -> Self {
1108        Self {
1109            port_id: self.port_id.clone(),
1110            reducer_spec: self.reducer_spec.clone(),
1111            return_undeliverable: self.return_undeliverable,
1112            unsplit: self.unsplit,
1113            phantom: PhantomData,
1114        }
1115    }
1116}
1117
1118impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1120        fmt::Display::fmt(&self.port_id, f)
1121    }
1122}
1123
impl<M: RemoteMessage> Named for OncePortRef<M> {
    /// The type name of this reference, parameterized by the message type
    /// `M`.
    // NOTE(review): the name is rooted at `hyperactor::mailbox` although the
    // type is defined in this (reference) module — presumably kept for
    // compatibility with names already in use; confirm before changing it.
    fn typename() -> &'static str {
        wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1129
1130impl<M: RemoteMessage> From<&OncePortRef<M>> for UnboundPort {
1131    fn from(port_ref: &OncePortRef<M>) -> Self {
1132        UnboundPort(
1133            port_ref.port_id.clone(),
1134            port_ref.reducer_spec.clone(),
1135            true, // return_undeliverable
1136            UnboundPortKind::Once,
1137            port_ref.unsplit,
1138        )
1139    }
1140}
1141
1142impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1143    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1144        bindings.push_back(&UnboundPort::from(self))
1145    }
1146}
1147
1148impl<M: RemoteMessage> Bind for OncePortRef<M> {
1149    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1150        let UnboundPort(port_id, reducer_spec, _return_undeliverable, port_kind, unsplit) =
1151            bindings.try_pop_front::<UnboundPort>()?;
1152        match port_kind {
1153            UnboundPortKind::Once => {
1154                self.port_id = port_id;
1155                self.reducer_spec = reducer_spec;
1156                self.unsplit = unsplit;
1157                Ok(())
1158            }
1159            UnboundPortKind::Streaming(_) => {
1160                anyhow::bail!("PortRef cannot be bound to OncePortRef")
1161            }
1162        }
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use rand::seq::SliceRandom;
1169    use tokio::sync::mpsc;
1170    use uuid::Uuid;
1171
1172    use super::*;
1173    // for macros
1174    use crate::Proc;
1175    use crate::context::Mailbox as _;
1176    use crate::mailbox::PortLocation;
1177    use crate::ordering::SEQ_INFO;
1178    use crate::ordering::SeqInfo;
1179
1180    #[test]
1181    fn test_reference_parse() {
1182        let cases: Vec<(&str, Reference)> = vec![
1183            (
1184                "tcp:[::1]:1234,test",
1185                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test").into(),
1186            ),
1187            (
1188                "tcp:[::1]:1234,test,testactor[123]",
1189                ActorId::new(
1190                    ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1191                    "testactor",
1192                    123,
1193                )
1194                .into(),
1195            ),
1196            (
1197                // type annotations are ignored
1198                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
1199                PortId::new(
1200                    ActorId::new(
1201                        ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1202                        "testactor",
1203                        0,
1204                    ),
1205                    123,
1206                )
1207                .into(),
1208            ),
1209        ];
1210
1211        for (s, expected) in cases {
1212            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
1213        }
1214    }
1215
1216    #[test]
1217    fn test_reference_parse_error() {
1218        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)", "test"];
1219
1220        for s in cases {
1221            let result: Result<Reference, ReferenceParsingError> = s.parse();
1222            assert!(result.is_err(), "expected error for: {}", s);
1223        }
1224    }
1225
1226    #[test]
1227    fn test_reference_ord() {
1228        let expected: Vec<Reference> = [
1229            "tcp:[::1]:1234,first",
1230            "tcp:[::1]:1234,second",
1231            "tcp:[::1]:1234,third",
1232        ]
1233        .into_iter()
1234        .map(|s| s.parse().unwrap())
1235        .collect();
1236
1237        let mut sorted = expected.to_vec();
1238        sorted.shuffle(&mut rand::rng());
1239        sorted.sort();
1240
1241        assert_eq!(sorted, expected);
1242    }
1243
1244    #[test]
1245    fn test_port_type_annotation() {
1246        #[derive(typeuri::Named, Serialize, Deserialize)]
1247        struct MyType;
1248        wirevalue::register_type!(MyType);
1249        let port_id = PortId::new(
1250            ActorId::new(
1251                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1252                "testactor",
1253                1,
1254            ),
1255            MyType::port(),
1256        );
1257        assert_eq!(
1258            port_id.to_string(),
1259            "tcp:[::1]:1234,test,testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
1260        );
1261    }
1262
    /// Once a port handle is bound as an actor port, sends through the
    /// handle, a `PortRef` attested from its id, and the raw `PortId` must
    /// all draw from one session sequence, in send order.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_ref_and_id() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        // The handler forwards the SEQ_INFO header of every delivered
        // message so the test can inspect it.
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        port_handle.send(&client, ()).unwrap();
        // While the port handle is still unbound, SEQ_INFO is set to
        // `SeqInfo::Direct` (presumably because the handler's ordered channel
        // expects the header to always be present — confirm).
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        port_handle.bind_actor_port();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(port_id.is_actor_port());
        let port_ref = PortRef::attest(port_id.clone());

        // The first send after binding starts the session sequence at 1.
        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            mut seq,
        } = rx.try_recv().unwrap().unwrap()
        else {
            panic!("expected session info");
        };
        assert_eq!(session_id, client.sequencer().session_id());
        assert_eq!(seq, 1);

        // Advances the expected sequence number by one and checks that the
        // next received message carries it, under the same session id.
        fn assert_seq_info(
            rx: &mut mpsc::UnboundedReceiver<Option<SeqInfo>>,
            session_id: Uuid,
            seq: &mut u64,
        ) {
            *seq += 1;
            let SeqInfo::Session {
                session_id: rcved_session_id,
                seq: rcved_seq,
            } = rx.try_recv().unwrap().unwrap()
            else {
                panic!("expected session info");
            };
            assert_eq!(rcved_session_id, session_id);
            assert_eq!(rcved_seq, *seq);
        }

        // Interleave sends from port_handle, port_ref, and port_id
        for _ in 0..10 {
            // From port_handle
            port_handle.send(&client, ()).unwrap();
            assert_seq_info(&mut rx, session_id, &mut seq);

            // From port_ref
            for _ in 0..2 {
                port_ref.send(&client, ()).unwrap();
                assert_seq_info(&mut rx, session_id, &mut seq);
            }

            // From port_id
            for _ in 0..3 {
                port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
                assert_seq_info(&mut rx, session_id, &mut seq);
            }
        }

        // Every message has been accounted for.
        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
    }
1335
    /// A port handle bound to an allocated (non-actor) port must still get
    /// session sequence numbers, shared by sends through the handle, a
    /// `PortRef` attested from its id, and the raw `PortId`.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_bound_to_allocated_port() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        // The handler forwards the SEQ_INFO header of every delivered
        // message so the test can inspect it.
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        port_handle.send(&client, ()).unwrap();
        // While the port handle is still unbound, SEQ_INFO is set to
        // `SeqInfo::Direct` (presumably because the handler's ordered channel
        // expects the header to always be present — confirm).
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        // Bind to the allocated port.
        port_handle.bind();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        // This is a non-actor port, but it still gets seq info (per-port sequence).
        assert!(!port_id.is_actor_port());

        // After binding, non-actor ports get their own sequence.
        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            seq: seq1,
        } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq1, 1);
        assert_eq!(session_id, client.sequencer().session_id());

        // A send through a PortRef for the same port continues the sequence.
        let port_ref = PortRef::attest(port_id.clone());
        port_ref.send(&client, ()).unwrap();
        let SeqInfo::Session { seq: seq2, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq2, 2);

        // So does a send of an already-serialized message through the PortId.
        port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
        let SeqInfo::Session { seq: seq3, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq3, 3);
    }
1396}