1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::attrs::Attrs;
39use rand::Rng;
40use serde::Deserialize;
41use serde::Deserializer;
42use serde::Serialize;
43use serde::Serializer;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerOpts;
52use crate::accum::ReducerSpec;
53use crate::actor::Referable;
54use crate::channel::ChannelAddr;
55use crate::context;
56use crate::context::MailboxExt;
57use crate::mailbox::MailboxSenderError;
58use crate::mailbox::MailboxSenderErrorKind;
59use crate::mailbox::PortSink;
60use crate::message::Bind;
61use crate::message::Bindings;
62use crate::message::Unbind;
63
64pub mod lex;
65pub mod name;
66mod parse;
67
68use parse::Lexer;
69use parse::ParseError;
70use parse::Token;
71pub use parse::is_valid_ident;
72use parse::parse;
73
/// The kinds of entity a [`Reference`] can point to, one per `Reference`
/// variant. See [`Reference::kind`].
#[derive(strum::Display)]
pub enum ReferenceKind {
    /// The reference is a [`Reference::World`].
    World,
    /// The reference is a [`Reference::Proc`].
    Proc,
    /// The reference is a [`Reference::Actor`].
    Actor,
    /// The reference is a [`Reference::Port`].
    Port,
    /// The reference is a [`Reference::Gang`].
    Gang,
}
88
/// A reference to a world, proc, actor, port, or gang.
///
/// Each variant wraps the corresponding id type. References can be parsed
/// from strings (see the `FromStr` implementation) and are rendered back
/// through `Display`.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    Hash,
    typeuri::Named,
    EnumAsInner
)]
pub enum Reference {
    /// A reference to a world.
    World(WorldId),
    /// A reference to a proc.
    Proc(ProcId),
    /// A reference to an actor.
    Actor(ActorId),
    /// A reference to a port of an actor.
    Port(PortId),
    /// A reference to a gang.
    Gang(GangId),
}
127
128impl Reference {
129 pub fn is_prefix_of(&self, other: &Reference) -> bool {
131 match self {
132 Self::World(_) => self.world_id() == other.world_id(),
133 Self::Proc(_) => self.proc_id() == other.proc_id(),
134 Self::Actor(_) => self == other,
135 Self::Port(_) => self == other,
136 Self::Gang(_) => self == other,
137 }
138 }
139
140 pub fn world_id(&self) -> Option<&WorldId> {
142 match self {
143 Self::World(world_id) => Some(world_id),
144 Self::Proc(proc_id) => proc_id.world_id(),
145 Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
146 Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
147 Self::Gang(GangId(world_id, _)) => Some(world_id),
148 }
149 }
150
151 pub fn proc_id(&self) -> Option<&ProcId> {
153 match self {
154 Self::World(_) => None,
155 Self::Proc(proc_id) => Some(proc_id),
156 Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
157 Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
158 Self::Gang(_) => None,
159 }
160 }
161
162 fn rank(&self) -> Option<Index> {
164 self.proc_id().and_then(|proc_id| proc_id.rank())
165 }
166
167 pub fn actor_id(&self) -> Option<&ActorId> {
169 match self {
170 Self::World(_) => None,
171 Self::Proc(_) => None,
172 Self::Actor(actor_id) => Some(actor_id),
173 Self::Port(PortId(actor_id, _)) => Some(actor_id),
174 Self::Gang(_) => None,
175 }
176 }
177
178 fn actor_name(&self) -> Option<&str> {
180 match self {
181 Self::World(_) => None,
182 Self::Proc(_) => None,
183 Self::Actor(actor_id) => Some(actor_id.name()),
184 Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
185 Self::Gang(gang_id) => Some(&gang_id.1),
186 }
187 }
188
189 fn pid(&self) -> Option<Index> {
191 match self {
192 Self::World(_) => None,
193 Self::Proc(_) => None,
194 Self::Actor(actor_id) => Some(actor_id.pid()),
195 Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
196 Self::Gang(_) => None,
197 }
198 }
199
200 fn port(&self) -> Option<u64> {
202 match self {
203 Self::World(_) => None,
204 Self::Proc(_) => None,
205 Self::Actor(_) => None,
206 Self::Port(port_id) => Some(port_id.index()),
207 Self::Gang(_) => None,
208 }
209 }
210
211 pub fn kind(&self) -> ReferenceKind {
213 match self {
214 Self::World(_) => ReferenceKind::World,
215 Self::Proc(_) => ReferenceKind::Proc,
216 Self::Actor(_) => ReferenceKind::Actor,
217 Self::Port(_) => ReferenceKind::Port,
218 Self::Gang(_) => ReferenceKind::Gang,
219 }
220 }
221}
222
223impl PartialOrd for Reference {
224 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
225 Some(self.cmp(other))
226 }
227}
228
229impl Ord for Reference {
230 fn cmp(&self, other: &Self) -> Ordering {
231 (
232 self.world_id(),
234 self.rank(),
235 self.proc_id().and_then(ProcId::as_direct),
236 self.actor_name(),
237 self.pid(),
238 self.port(),
239 )
240 .cmp(&(
241 other.world_id(),
242 other.rank(),
243 other.proc_id().and_then(ProcId::as_direct),
244 other.actor_name(),
245 other.pid(),
246 other.port(),
247 ))
248 }
249}
250
251impl fmt::Display for Reference {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 match self {
254 Self::World(world_id) => fmt::Display::fmt(world_id, f),
255 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
256 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
257 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
258 Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
259 }
260 }
261}
262
/// Constructs id values from a compact literal syntax. Identifiers are
/// turned into strings with `stringify!`; ranks, pids, and ports are
/// arbitrary expressions.
///
/// Supported forms and what they expand to:
/// - `id!(world)`: a `WorldId`
/// - `id!(world[rank])`: a `ProcId::Ranked`
/// - `id!(world[rank].actor)`: an `ActorId` with pid `0`
/// - `id!(world[rank].actor[pid])`: an `ActorId`
/// - `id!(world.actor)`: a `GangId`
/// - `id!(world[rank].actor[pid][port])`: a `PortId`
#[macro_export]
macro_rules! id {
    // world
    ($world:ident) => {
        $crate::reference::WorldId(stringify!($world).to_string())
    };
    // world[rank]
    ($world:ident [$rank:expr]) => {
        $crate::reference::ProcId::Ranked(
            $crate::reference::WorldId(stringify!($world).to_string()),
            $rank,
        )
    };
    // world[rank].actor (pid defaults to 0)
    ($world:ident [$rank:expr] . $actor:ident) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            0,
        )
    };
    // world[rank].actor[pid]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr]) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            $pid,
        )
    };
    // world.actor (a gang)
    ($world:ident . $actor:ident) => {
        $crate::reference::GangId(
            $crate::reference::WorldId(stringify!($world).to_string()),
            stringify!($actor).to_string(),
        )
    };
    // world[rank].actor[pid][port]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr] [$port:expr]) => {
        $crate::reference::PortId(
            $crate::reference::ActorId(
                $crate::reference::ProcId::Ranked(
                    $crate::reference::WorldId(stringify!($world).to_string()),
                    $rank,
                ),
                stringify!($actor).to_string(),
                $pid,
            ),
            $port,
        )
    };
}
// Re-export the macro under this module's path.
pub use id;
351
/// Errors produced when parsing references and ids from strings.
#[derive(thiserror::Error, Debug)]
pub enum ReferenceParsingError {
    /// A token was expected but none was available.
    #[error("expected token")]
    Empty,

    /// A token was encountered that is not valid at its position.
    #[error("unexpected token: {0}")]
    Unexpected(String),

    /// An integer component failed to parse.
    #[error(transparent)]
    ParseInt(#[from] ParseIntError),

    /// An error reported by the `parse` submodule.
    #[error("parse: {0}")]
    Parse(#[from] ParseError),

    /// The string parsed to a reference of a different kind than requested;
    /// the payload names the expected kind.
    #[error("wrong reference type: expected {0}")]
    WrongType(String),

    /// The channel-address prefix of a direct reference failed to parse.
    /// Carries the offending text and the underlying error.
    #[error("invalid channel address {0}: {1}")]
    InvalidChannelAddress(String, anyhow::Error),
}
379
/// Parses a [`Reference`] from its textual form.
///
/// Two families of syntax are accepted:
/// - Direct: `<channel addr>,<proc name>[,<actor name>[pid][port]]`,
///   selected whenever the input contains a comma. Everything before the
///   first comma is parsed as the channel address.
/// - Ranked: `world`, `world[rank]`, `world[rank].actor`,
///   `world[rank].actor[pid]`, `world[rank].actor[pid][port]`, and the gang
///   form `world.actor`.
///
/// In both families a port may carry a `<type>` annotation after the port
/// number (e.g. `[123<my::type>]`); the annotation is matched but ignored.
impl FromStr for Reference {
    type Err = ReferenceParsingError;

    fn from_str(addr: &str) -> Result<Self, Self::Err> {
        match addr.split_once(",") {
            // Direct form: "<channel addr>,<rest>".
            Some((channel_addr, rest)) => {
                let channel_addr = channel_addr.parse().map_err(|err| {
                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
                })?;

                Ok(parse! {
                    Lexer::new(rest);

                    // channeladdr,proc_name
                    Token::Elem(proc_name) =>
                    Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),

                    // channeladdr,proc_name,actor_name (pid defaults to 0)
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
                    Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),

                    // channeladdr,proc_name,actor_name[pid]
                    // NOTE: the binding is named `rank` but fills the pid slot of `ActorId`.
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                        Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),

                    // channeladdr,proc_name,actor_name[pid][port]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket  =>
                        Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),

                    // channeladdr,proc_name,actor_name[pid][port<type>]; the type is ignored
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                        Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),
                }?)
            }

            // Ranked form: no comma anywhere in the input.
            None => {
                Ok(parse! {
                    Lexer::new(addr);

                    // world
                    Token::Elem(world) => Self::World(WorldId(world.into())),

                    // world[rank]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                        Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),

                    // world[rank].actor (pid defaults to 0)
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor) =>
                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),

                    // world[rank].actor[pid]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
                        Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),

                    // world[rank].actor[pid][port]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index) Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world[rank].actor[pid][port<type>]; the type is ignored
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                        Token::Dot Token::Elem(actor)
                        Token::LeftBracket Token::Uint(pid) Token::RightBracket
                        Token::LeftBracket Token::Uint(index)
                            Token::LessThan Token::Elem(_type) Token::GreaterThan
                        Token::RightBracket =>
                        Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world.actor (a gang)
                    Token::Elem(world) Token::Dot Token::Elem(actor) =>
                        Self::Gang(GangId(WorldId(world.into()), actor.into())),
                }?)
            }
        }
    }
}
475
476impl From<WorldId> for Reference {
477 fn from(world_id: WorldId) -> Self {
478 Self::World(world_id)
479 }
480}
481
482impl From<ProcId> for Reference {
483 fn from(proc_id: ProcId) -> Self {
484 Self::Proc(proc_id)
485 }
486}
487
488impl From<ActorId> for Reference {
489 fn from(actor_id: ActorId) -> Self {
490 Self::Actor(actor_id)
491 }
492}
493
494impl From<PortId> for Reference {
495 fn from(port_id: PortId) -> Self {
496 Self::Port(port_id)
497 }
498}
499
500impl From<GangId> for Reference {
501 fn from(gang_id: GangId) -> Self {
502 Self::Gang(gang_id)
503 }
504}
505
/// Alias for the integer type used for proc ranks (`ProcId::Ranked`) and
/// actor pids (`ActorId`).
pub type Index = usize;
509
/// The id of a world: a newtype over the world's name.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct WorldId(pub String);
525
526impl WorldId {
527 pub fn proc_id(&self, index: Index) -> ProcId {
529 ProcId::Ranked(self.clone(), index)
530 }
531
532 pub fn name(&self) -> &str {
534 &self.0
535 }
536
537 pub fn random_user_proc(&self) -> ProcId {
539 let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
540 ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
541 }
542}
543
544impl fmt::Display for WorldId {
545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546 let WorldId(name) = self;
547 write!(f, "{}", name)
548 }
549}
550
551impl FromStr for WorldId {
552 type Err = ReferenceParsingError;
553
554 fn from_str(addr: &str) -> Result<Self, Self::Err> {
555 match addr.parse()? {
556 Reference::World(world_id) => Ok(world_id),
557 _ => Err(ReferenceParsingError::WrongType("world".into())),
558 }
559 }
560}
561
/// The id of a proc. A proc is identified either by its rank within a world
/// or directly by a channel address and a name.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named,
    EnumAsInner
)]
pub enum ProcId {
    /// A proc identified by a world and a rank; displayed as `world[rank]`.
    Ranked(WorldId, Index),
    /// A proc identified by a channel address and a name; displayed as `addr,name`.
    Direct(ChannelAddr, String),
}
588
589impl ProcId {
590 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
592 ActorId(self.clone(), name.into(), pid)
593 }
594
595 pub fn world_id(&self) -> Option<&WorldId> {
597 match self {
598 ProcId::Ranked(world_id, _) => Some(world_id),
599 ProcId::Direct(_, _) => None,
600 }
601 }
602
603 pub fn world_name(&self) -> Option<&str> {
605 self.world_id().map(|world_id| world_id.name())
606 }
607
608 pub fn rank(&self) -> Option<Index> {
610 match self {
611 ProcId::Ranked(_, rank) => Some(*rank),
612 ProcId::Direct(_, _) => None,
613 }
614 }
615
616 pub fn name(&self) -> Option<&String> {
618 match self {
619 ProcId::Ranked(_, _) => None,
620 ProcId::Direct(_, name) => Some(name),
621 }
622 }
623}
624
625impl fmt::Display for ProcId {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 match self {
628 ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
629 ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
630 }
631 }
632}
633
634impl FromStr for ProcId {
635 type Err = ReferenceParsingError;
636
637 fn from_str(addr: &str) -> Result<Self, Self::Err> {
638 match addr.parse()? {
639 Reference::Proc(proc_id) => Ok(proc_id),
640 _ => Err(ReferenceParsingError::WrongType("proc".into())),
641 }
642 }
643}
644
/// The id of an actor: the proc it lives on, its name, and its pid.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ActorId(pub ProcId, pub String, pub Index);

// Presumably lets `ActorId` be used as an attribute value in
// `hyperactor_config` attrs; confirm against that macro's docs.
hyperactor_config::impl_attrvalue!(ActorId);
661
662impl ActorId {
663 pub fn port_id(&self, port: u64) -> PortId {
665 PortId(self.clone(), port)
666 }
667
668 pub fn child_id(&self, pid: Index) -> Self {
670 Self(self.0.clone(), self.1.clone(), pid)
671 }
672
673 pub fn root(proc_id: ProcId, name: String) -> Self {
675 Self(proc_id, name, 0)
676 }
677
678 pub fn proc_id(&self) -> &ProcId {
680 &self.0
681 }
682
683 pub fn world_name(&self) -> &str {
685 self.0
686 .world_name()
687 .expect("world_name() called on direct proc")
688 }
689
690 pub fn rank(&self) -> Index {
692 self.0.rank().expect("rank() called on direct proc")
693 }
694
695 pub fn name(&self) -> &str {
697 &self.1
698 }
699
700 pub fn pid(&self) -> Index {
702 self.2
703 }
704}
705
706impl fmt::Display for ActorId {
707 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708 let ActorId(proc_id, name, pid) = self;
709 match proc_id {
710 ProcId::Ranked(..) => write!(f, "{}.{}[{}]", proc_id, name, pid),
711 ProcId::Direct(..) => write!(f, "{},{}[{}]", proc_id, name, pid),
712 }
713 }
714}
715impl<A: Referable> From<ActorRef<A>> for ActorId {
716 fn from(actor_ref: ActorRef<A>) -> Self {
717 actor_ref.actor_id.clone()
718 }
719}
720
721impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
722 fn from(actor_ref: &'a ActorRef<A>) -> Self {
723 &actor_ref.actor_id
724 }
725}
726
727impl FromStr for ActorId {
728 type Err = ReferenceParsingError;
729
730 fn from_str(addr: &str) -> Result<Self, Self::Err> {
731 match addr.parse()? {
732 Reference::Actor(actor_id) => Ok(actor_id),
733 _ => Err(ReferenceParsingError::WrongType("actor".into())),
734 }
735 }
736}
737
/// A typed reference to an actor of type `A`. It wraps an [`ActorId`]; the
/// type parameter exists only at the type level.
#[derive(typeuri::Named)]
pub struct ActorRef<A: Referable> {
    /// The id of the referenced actor.
    pub(crate) actor_id: ActorId,
    /// Carries the actor type without storing a value of it.
    phantom: PhantomData<fn() -> A>,
}
745
746impl<A: Referable> ActorRef<A> {
747 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
749 where
750 A: RemoteHandles<M>,
751 {
752 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
753 }
754
755 pub fn send<M: RemoteMessage>(
757 &self,
758 cx: &impl context::Actor,
759 message: M,
760 ) -> Result<(), MailboxSenderError>
761 where
762 A: RemoteHandles<M>,
763 {
764 self.port().send(cx, message)
765 }
766
767 pub fn send_with_headers<M: RemoteMessage>(
770 &self,
771 cx: &impl context::Actor,
772 headers: Attrs,
773 message: M,
774 ) -> Result<(), MailboxSenderError>
775 where
776 A: RemoteHandles<M>,
777 {
778 self.port().send_with_headers(cx, headers, message)
779 }
780
781 pub fn attest(actor_id: ActorId) -> Self {
786 Self {
787 actor_id,
788 phantom: PhantomData,
789 }
790 }
791
792 pub fn actor_id(&self) -> &ActorId {
794 &self.actor_id
795 }
796
797 pub fn into_actor_id(self) -> ActorId {
799 self.actor_id
800 }
801
802 pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
806 where
807 A: Actor,
808 {
809 cx.instance().proc().resolve_actor_ref(self)
810 }
811}
812
813impl<A: Referable> Serialize for ActorRef<A> {
815 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
816 where
817 S: Serializer,
818 {
819 self.actor_id().serialize(serializer)
821 }
822}
823
824impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
826 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
827 where
828 D: Deserializer<'de>,
829 {
830 let actor_id = <ActorId>::deserialize(deserializer)?;
831 Ok(ActorRef {
832 actor_id,
833 phantom: PhantomData,
834 })
835 }
836}
837
838impl<A: Referable> fmt::Debug for ActorRef<A> {
840 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841 f.debug_struct("ActorRef")
842 .field("actor_id", &self.actor_id)
843 .field("type", &std::any::type_name::<A>())
844 .finish()
845 }
846}
847
848impl<A: Referable> fmt::Display for ActorRef<A> {
849 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850 fmt::Display::fmt(&self.actor_id, f)?;
851 write!(f, "<{}>", std::any::type_name::<A>())
852 }
853}
854
855impl<A: Referable> Clone for ActorRef<A> {
857 fn clone(&self) -> Self {
858 Self {
859 actor_id: self.actor_id.clone(),
860 phantom: PhantomData,
861 }
862 }
863}
864
865impl<A: Referable> PartialEq for ActorRef<A> {
866 fn eq(&self, other: &Self) -> bool {
867 self.actor_id == other.actor_id
868 }
869}
870
871impl<A: Referable> Eq for ActorRef<A> {}
872
873impl<A: Referable> PartialOrd for ActorRef<A> {
874 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
875 Some(self.cmp(other))
876 }
877}
878
879impl<A: Referable> Ord for ActorRef<A> {
880 fn cmp(&self, other: &Self) -> Ordering {
881 self.actor_id.cmp(&other.actor_id)
882 }
883}
884
885impl<A: Referable> Hash for ActorRef<A> {
886 fn hash<H: Hasher>(&self, state: &mut H) {
887 self.actor_id.hash(state);
888 }
889}
890
/// The id of a port: the owning actor's id and the port number.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct PortId(pub ActorId, pub u64);
908
909impl PortId {
910 pub fn actor_id(&self) -> &ActorId {
912 &self.0
913 }
914
915 pub fn into_actor_id(self) -> ActorId {
917 self.0
918 }
919
920 pub fn index(&self) -> u64 {
922 self.1
923 }
924
925 pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
929 let mut headers = Attrs::new();
930 crate::mailbox::headers::set_send_timestamp(&mut headers);
931 cx.post(self.clone(), headers, serialized, true);
932 }
933
934 pub fn send_with_headers(
938 &self,
939 cx: &impl context::Actor,
940 serialized: wirevalue::Any,
941 mut headers: Attrs,
942 ) {
943 crate::mailbox::headers::set_send_timestamp(&mut headers);
944 cx.post(self.clone(), headers, serialized, true);
945 }
946
947 pub fn split(
950 &self,
951 cx: &impl context::Actor,
952 reducer_spec: Option<ReducerSpec>,
953 reducer_opts: Option<ReducerOpts>,
954 return_undeliverable: bool,
955 ) -> anyhow::Result<PortId> {
956 cx.split(
957 self.clone(),
958 reducer_spec,
959 reducer_opts,
960 return_undeliverable,
961 )
962 }
963}
964
965impl FromStr for PortId {
966 type Err = ReferenceParsingError;
967
968 fn from_str(addr: &str) -> Result<Self, Self::Err> {
969 match addr.parse()? {
970 Reference::Port(port_id) => Ok(port_id),
971 _ => Err(ReferenceParsingError::WrongType("port".into())),
972 }
973 }
974}
975
976impl fmt::Display for PortId {
977 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
978 let PortId(actor_id, port) = self;
979 if port & (1 << 63) != 0 {
980 let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !(1 << 63)));
981 let typename = type_info.map_or("unknown", TypeInfo::typename);
982 write!(f, "{}[{}<{}>]", actor_id, port, typename)
983 } else {
984 write!(f, "{}[{}]", actor_id, port)
985 }
986 }
987}
988
/// A typed, reusable reference to a port that accepts messages of type `M`.
///
/// The derived comparisons and hash ignore `reducer_spec` and
/// `reducer_opts`; the remaining fields take part in them.
#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct PortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer specification carried with the reference.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_spec: Option<ReducerSpec>,
    /// Optional reducer options carried with the reference.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_opts: Option<ReducerOpts>,
    /// Carries the message type without storing a value of it.
    phantom: PhantomData<M>,
    /// Passed to `post` when sending; `attest` and `attest_reducible` set it
    /// to `true`.
    return_undeliverable: bool,
}
1012
1013impl<M: RemoteMessage> PortRef<M> {
1014 pub fn attest(port_id: PortId) -> Self {
1017 Self {
1018 port_id,
1019 reducer_spec: None,
1020 reducer_opts: None,
1021 phantom: PhantomData,
1022 return_undeliverable: true,
1023 }
1024 }
1025
1026 pub fn attest_reducible(
1029 port_id: PortId,
1030 reducer_spec: Option<ReducerSpec>,
1031 reducer_opts: Option<ReducerOpts>,
1032 ) -> Self {
1033 Self {
1034 port_id,
1035 reducer_spec,
1036 reducer_opts,
1037 phantom: PhantomData,
1038 return_undeliverable: true,
1039 }
1040 }
1041
1042 pub fn attest_message_port(actor: &ActorId) -> Self {
1045 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
1046 }
1047
1048 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1051 &self.reducer_spec
1052 }
1053
1054 pub fn port_id(&self) -> &PortId {
1056 &self.port_id
1057 }
1058
1059 pub fn into_port_id(self) -> PortId {
1061 self.port_id
1062 }
1063
1064 pub fn into_once(self) -> OncePortRef<M> {
1067 OncePortRef::attest(self.into_port_id())
1068 }
1069
1070 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1073 self.send_with_headers(cx, Attrs::new(), message)
1074 }
1075
1076 pub fn send_with_headers(
1080 &self,
1081 cx: &impl context::Actor,
1082 headers: Attrs,
1083 message: M,
1084 ) -> Result<(), MailboxSenderError> {
1085 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1086 MailboxSenderError::new_bound(
1087 self.port_id.clone(),
1088 MailboxSenderErrorKind::Serialize(err.into()),
1089 )
1090 })?;
1091 self.send_serialized(cx, headers, serialized);
1092 Ok(())
1093 }
1094
1095 pub fn send_serialized(
1098 &self,
1099 cx: &impl context::Actor,
1100 mut headers: Attrs,
1101 message: wirevalue::Any,
1102 ) {
1103 crate::mailbox::headers::set_send_timestamp(&mut headers);
1104 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1105 cx.post(
1106 self.port_id.clone(),
1107 headers,
1108 message,
1109 self.return_undeliverable,
1110 );
1111 }
1112
1113 pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
1115 PortSink::new(cx, self)
1116 }
1117
1118 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1121 self.return_undeliverable = return_undeliverable;
1122 }
1123}
1124
1125impl<M: RemoteMessage> Clone for PortRef<M> {
1126 fn clone(&self) -> Self {
1127 Self {
1128 port_id: self.port_id.clone(),
1129 reducer_spec: self.reducer_spec.clone(),
1130 reducer_opts: self.reducer_opts.clone(),
1131 phantom: PhantomData,
1132 return_undeliverable: self.return_undeliverable,
1133 }
1134 }
1135}
1136
1137impl<M: RemoteMessage> fmt::Display for PortRef<M> {
1138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1139 fmt::Display::fmt(&self.port_id, f)
1140 }
1141}
1142
/// The untyped contents of a [`PortRef`] as exchanged through [`Bindings`].
/// The fields are, in order: the port id, the optional reducer spec, the
/// optional reducer options, and the `return_undeliverable` flag.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
pub struct UnboundPort(
    pub PortId,
    pub Option<ReducerSpec>,
    pub Option<ReducerOpts>,
    pub bool,
);
wirevalue::register_type!(UnboundPort);
1152
1153impl UnboundPort {
1154 pub fn update(&mut self, port_id: PortId) {
1156 self.0 = port_id;
1157 }
1158}
1159
1160impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
1161 fn from(port_ref: &PortRef<M>) -> Self {
1162 UnboundPort(
1163 port_ref.port_id.clone(),
1164 port_ref.reducer_spec.clone(),
1165 port_ref.reducer_opts.clone(),
1166 port_ref.return_undeliverable,
1167 )
1168 }
1169}
1170
1171impl<M: RemoteMessage> Unbind for PortRef<M> {
1172 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1173 bindings.push_back(&UnboundPort::from(self))
1174 }
1175}
1176
1177impl<M: RemoteMessage> Bind for PortRef<M> {
1178 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1179 let bound = bindings.try_pop_front::<UnboundPort>()?;
1180 self.port_id = bound.0;
1181 self.reducer_spec = bound.1;
1182 self.reducer_opts = bound.2;
1183 self.return_undeliverable = bound.3;
1184 Ok(())
1185 }
1186}
1187
/// A typed reference to a port that accepts messages of type `M`, whose
/// send methods consume the reference.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Carries the message type without storing a value of it.
    phantom: PhantomData<M>,
}
1196
1197impl<M: RemoteMessage> OncePortRef<M> {
1198 pub(crate) fn attest(port_id: PortId) -> Self {
1199 Self {
1200 port_id,
1201 phantom: PhantomData,
1202 }
1203 }
1204
1205 pub fn port_id(&self) -> &PortId {
1207 &self.port_id
1208 }
1209
1210 pub fn into_port_id(self) -> PortId {
1212 self.port_id
1213 }
1214
1215 pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1218 self.send_with_headers(cx, Attrs::new(), message)
1219 }
1220
1221 pub fn send_with_headers(
1224 self,
1225 cx: &impl context::Actor,
1226 mut headers: Attrs,
1227 message: M,
1228 ) -> Result<(), MailboxSenderError> {
1229 crate::mailbox::headers::set_send_timestamp(&mut headers);
1230 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1231 MailboxSenderError::new_bound(
1232 self.port_id.clone(),
1233 MailboxSenderErrorKind::Serialize(err.into()),
1234 )
1235 })?;
1236 cx.post(self.port_id.clone(), headers, serialized, true);
1237 Ok(())
1238 }
1239}
1240
1241impl<M: RemoteMessage> Clone for OncePortRef<M> {
1242 fn clone(&self) -> Self {
1243 Self {
1244 port_id: self.port_id.clone(),
1245 phantom: PhantomData,
1246 }
1247 }
1248}
1249
1250impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1252 fmt::Display::fmt(&self.port_id, f)
1253 }
1254}
1255
impl<M: RemoteMessage> Named for OncePortRef<M> {
    /// The type name of `OncePortRef<M>`, produced by
    /// `wirevalue::intern_typename!` from the pattern below and `M`.
    ///
    /// NOTE(review): the pattern spells the path as `hyperactor::mailbox`
    /// although the type is defined in this module. Presumably kept for
    /// compatibility; confirm before changing.
    fn typename() -> &'static str {
        wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1261
impl<M: RemoteMessage> Unbind for OncePortRef<M> {
    /// No-op: nothing is pushed into the bindings for a `OncePortRef`.
    fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
        Ok(())
    }
}

impl<M: RemoteMessage> Bind for OncePortRef<M> {
    /// No-op: nothing is popped from the bindings and `self` is unchanged.
    fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
        Ok(())
    }
}
1276
/// The id of a gang: a world and a name. The name doubles as the actor name
/// of the gang's members (see `GangId::actor_id`).
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct GangId(pub WorldId, pub String);
1291
1292impl GangId {
1293 pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1294 (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1295 }
1296
1297 pub fn world_id(&self) -> &WorldId {
1299 &self.0
1300 }
1301
1302 pub fn name(&self) -> &str {
1304 &self.1
1305 }
1306
1307 pub fn actor_id(&self, rank: Index) -> ActorId {
1310 ActorId(
1311 ProcId::Ranked(self.world_id().clone(), rank),
1312 self.name().to_string(),
1313 0,
1314 )
1315 }
1316}
1317
1318impl fmt::Display for GangId {
1319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1320 let GangId(world_id, name) = self;
1321 write!(f, "{}.{}", world_id, name)
1322 }
1323}
1324
1325impl FromStr for GangId {
1326 type Err = ReferenceParsingError;
1327
1328 fn from_str(addr: &str) -> Result<Self, Self::Err> {
1329 match addr.parse()? {
1330 Reference::Gang(gang_id) => Ok(gang_id),
1331 _ => Err(ReferenceParsingError::WrongType("gang".into())),
1332 }
1333 }
1334}
1335
/// Splits `s` into tokens, yielding each delimiter in `delims` as its own
/// token and the text between delimiters with surrounding whitespace trimmed.
///
/// At every step the delimiter occurring earliest in the remaining input
/// wins; on a tie the one listed first in `delims` is used. Empty
/// delimiters are ignored: an empty string matches at position 0 without
/// consuming any input, which would otherwise make the iterator yield `""`
/// forever.
fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
    std::iter::from_fn(move || {
        if s.is_empty() {
            return None;
        }

        // Earliest occurrence of any non-empty delimiter in what is left.
        let next_delim = delims
            .iter()
            .filter(|delim| !delim.is_empty())
            .filter_map(|delim| s.find(*delim).map(|pos| (*delim, pos)))
            .min_by_key(|&(_, pos)| pos);

        match next_delim {
            // The input starts with a delimiter: emit it and step past it.
            Some((delim, 0)) => {
                s = &s[delim.len()..];
                Some(delim)
            }
            // Text precedes the delimiter: emit the text, keep the delimiter
            // for the next call.
            Some((_, pos)) => {
                let token = &s[..pos];
                s = &s[pos..];
                Some(token.trim())
            }
            // No delimiter left: the remainder is the final token.
            None => {
                let token = s;
                s = "";
                Some(token.trim())
            }
        }
    })
}
1367
/// A typed reference to a gang whose members are actors of type `A`. It
/// wraps a [`GangId`]; the type parameter exists only at the type level.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct GangRef<A: Referable> {
    /// The id of the referenced gang.
    gang_id: GangId,
    /// Carries the actor type without storing a value of it.
    phantom: PhantomData<A>,
}
1374
1375impl<A: Referable> GangRef<A> {
1376 pub fn rank(&self, rank: Index) -> ActorRef<A> {
1380 let GangRef {
1381 gang_id: GangId(world_id, name),
1382 ..
1383 } = self;
1384 ActorRef::attest(ActorId(
1385 ProcId::Ranked(world_id.clone(), rank),
1386 name.clone(),
1387 0,
1388 ))
1389 }
1390
1391 pub fn gang_id(&self) -> &GangId {
1393 &self.gang_id
1394 }
1395
1396 pub fn attest(gang_id: GangId) -> Self {
1399 Self {
1400 gang_id,
1401 phantom: PhantomData,
1402 }
1403 }
1404}
1405
1406impl<A: Referable> Clone for GangRef<A> {
1407 fn clone(&self) -> Self {
1408 Self {
1409 gang_id: self.gang_id.clone(),
1410 phantom: PhantomData,
1411 }
1412 }
1413}
1414
1415impl<A: Referable> From<GangRef<A>> for GangId {
1416 fn from(gang_ref: GangRef<A>) -> Self {
1417 gang_ref.gang_id
1418 }
1419}
1420
1421impl<'a, A: Referable> From<&'a GangRef<A>> for &'a GangId {
1422 fn from(gang_ref: &'a GangRef<A>) -> Self {
1423 &gang_ref.gang_id
1424 }
1425}
1426
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;

    use super::*;

    /// Every supported textual form parses to the expected `Reference`.
    #[test]
    fn test_reference_parse() {
        let cases: Vec<(&str, Reference)> = vec![
            // Ranked forms.
            ("test", WorldId("test".into()).into()),
            (
                "test[234]",
                ProcId::Ranked(WorldId("test".into()), 234).into(),
            ),
            (
                "test[234].testactor[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor".into(),
                    6,
                )
                .into(),
            ),
            (
                "test[234].testactor[6][1]",
                PortId(
                    ActorId(
                        ProcId::Ranked(WorldId("test".into()), 234),
                        "testactor".into(),
                        6,
                    ),
                    1,
                )
                .into(),
            ),
            // Gang form.
            (
                "test.testactor",
                GangId(WorldId("test".into()), "testactor".into()).into(),
            ),
            // Direct forms: channel address, proc name, actor.
            (
                "tcp:[::1]:1234,test,testactor[123]",
                ActorId(
                    ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                    "testactor".to_string(),
                    123,
                )
                .into(),
            ),
            (
                // The `<my::type>` annotation on the port is accepted and ignored.
                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
                PortId(
                    ActorId(
                        ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                        "testactor".to_string(),
                        0,
                    ),
                    123,
                )
                .into(),
            ),
            (
                // Actor names may contain underscores and digits.
                "test[234].testactor_12345[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor_12345".into(),
                    6,
                )
                .into(),
            ),
        ];

        for (s, expected) in cases {
            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
        }
    }

    /// Malformed inputs are rejected with an error.
    #[test]
    fn test_reference_parse_error() {
        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)"];

        for s in cases {
            let result: Result<Reference, ReferenceParsingError> = s.parse();
            assert!(result.is_err());
        }
    }

    /// The `id!` macro builds the same values as the explicit constructors.
    #[test]
    fn test_id_macro() {
        assert_eq!(id!(hello), WorldId("hello".into()));
        assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
        assert_eq!(
            id!(hello[0].actor),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                0
            )
        );
        assert_eq!(
            id!(hello[0].actor[1]),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                1
            )
        );
        assert_eq!(
            id!(hello.actor),
            GangId(WorldId("hello".into()), "actor".into())
        );
    }

    /// Sorting a shuffled list of references restores the expected order.
    #[test]
    fn test_reference_ord() {
        let expected: Vec<Reference> = [
            "first",
            "second",
            "second.actor1",
            "second.actor2",
            "second[1]",
            "second[1].actor1",
            "second[1].actor2",
            "second[2]",
            "second[2].actor100",
            "third",
            "third.actor",
            "third[2]",
            "third[2].actor",
            "third[2].actor[1]",
        ]
        .into_iter()
        .map(|s| s.parse().unwrap())
        .collect();

        let mut sorted = expected.to_vec();
        sorted.shuffle(&mut thread_rng());
        sorted.sort();

        assert_eq!(sorted, expected);
    }

    /// A port whose number comes from a registered type's `port()` is
    /// displayed with that type's name as an annotation.
    #[test]
    fn test_port_type_annotation() {
        #[derive(typeuri::Named, Serialize, Deserialize)]
        struct MyType;
        wirevalue::register_type!(MyType);
        let port_id = PortId(
            ActorId(
                ProcId::Ranked(WorldId("test".into()), 234),
                "testactor".into(),
                1,
            ),
            MyType::port(),
        );
        assert_eq!(
            port_id.to_string(),
            "test[234].testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
        );
    }
}