1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use rand::Rng;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43
44use crate as hyperactor;
45use crate::Actor;
46use crate::ActorHandle;
47use crate::Named;
48use crate::RemoteHandles;
49use crate::RemoteMessage;
50use crate::accum::ReducerOpts;
51use crate::accum::ReducerSpec;
52use crate::actor::Referable;
53use crate::attrs::Attrs;
54use crate::channel::ChannelAddr;
55use crate::context;
56use crate::context::MailboxExt;
57use crate::data::Serialized;
58use crate::data::TypeInfo;
59use crate::mailbox::MailboxSenderError;
60use crate::mailbox::MailboxSenderErrorKind;
61use crate::mailbox::PortSink;
62use crate::message::Bind;
63use crate::message::Bindings;
64use crate::message::Unbind;
65
66pub mod lex;
67pub mod name;
68mod parse;
69
70use parse::Lexer;
71use parse::ParseError;
72use parse::Token;
73use parse::parse;
74
75#[derive(
91 Debug,
92 Serialize,
93 Deserialize,
94 Clone,
95 PartialEq,
96 Eq,
97 Hash,
98 Named,
99 EnumAsInner
100)]
101pub enum Reference {
102 World(WorldId),
104 Proc(ProcId),
106 Actor(ActorId), Port(PortId),
110 Gang(GangId),
112}
113
114impl Reference {
115 pub fn is_prefix_of(&self, other: &Reference) -> bool {
117 match self {
118 Self::World(_) => self.world_id() == other.world_id(),
119 Self::Proc(_) => self.proc_id() == other.proc_id(),
120 Self::Actor(_) => self == other,
121 Self::Port(_) => self == other,
122 Self::Gang(_) => self == other,
123 }
124 }
125
126 pub fn world_id(&self) -> Option<&WorldId> {
128 match self {
129 Self::World(world_id) => Some(world_id),
130 Self::Proc(proc_id) => proc_id.world_id(),
131 Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
132 Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
133 Self::Gang(GangId(world_id, _)) => Some(world_id),
134 }
135 }
136
137 pub fn proc_id(&self) -> Option<&ProcId> {
139 match self {
140 Self::World(_) => None,
141 Self::Proc(proc_id) => Some(proc_id),
142 Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
143 Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
144 Self::Gang(_) => None,
145 }
146 }
147
148 fn rank(&self) -> Option<Index> {
150 self.proc_id().and_then(|proc_id| proc_id.rank())
151 }
152
153 pub fn actor_id(&self) -> Option<&ActorId> {
155 match self {
156 Self::World(_) => None,
157 Self::Proc(_) => None,
158 Self::Actor(actor_id) => Some(actor_id),
159 Self::Port(PortId(actor_id, _)) => Some(actor_id),
160 Self::Gang(_) => None,
161 }
162 }
163
164 fn actor_name(&self) -> Option<&str> {
166 match self {
167 Self::World(_) => None,
168 Self::Proc(_) => None,
169 Self::Actor(actor_id) => Some(actor_id.name()),
170 Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
171 Self::Gang(gang_id) => Some(&gang_id.1),
172 }
173 }
174
175 fn pid(&self) -> Option<Index> {
177 match self {
178 Self::World(_) => None,
179 Self::Proc(_) => None,
180 Self::Actor(actor_id) => Some(actor_id.pid()),
181 Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
182 Self::Gang(_) => None,
183 }
184 }
185
186 fn port(&self) -> Option<u64> {
188 match self {
189 Self::World(_) => None,
190 Self::Proc(_) => None,
191 Self::Actor(_) => None,
192 Self::Port(port_id) => Some(port_id.index()),
193 Self::Gang(_) => None,
194 }
195 }
196}
197
198impl PartialOrd for Reference {
199 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
200 Some(self.cmp(other))
201 }
202}
203
204impl Ord for Reference {
205 fn cmp(&self, other: &Self) -> Ordering {
206 (
207 self.world_id(),
209 self.rank(),
210 self.proc_id().and_then(ProcId::as_direct),
211 self.actor_name(),
212 self.pid(),
213 self.port(),
214 )
215 .cmp(&(
216 other.world_id(),
217 other.rank(),
218 other.proc_id().and_then(ProcId::as_direct),
219 other.actor_name(),
220 other.pid(),
221 other.port(),
222 ))
223 }
224}
225
226impl fmt::Display for Reference {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 match self {
229 Self::World(world_id) => fmt::Display::fmt(world_id, f),
230 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
231 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
232 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
233 Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
234 }
235 }
236}
237
238#[macro_export]
275macro_rules! id {
276 ($world:ident) => {
277 $crate::reference::WorldId(stringify!($world).to_string())
278 };
279 ($world:ident [$rank:expr]) => {
280 $crate::reference::ProcId::Ranked(
281 $crate::reference::WorldId(stringify!($world).to_string()),
282 $rank,
283 )
284 };
285 ($world:ident [$rank:expr] . $actor:ident) => {
286 $crate::reference::ActorId(
287 $crate::reference::ProcId::Ranked(
288 $crate::reference::WorldId(stringify!($world).to_string()),
289 $rank,
290 ),
291 stringify!($actor).to_string(),
292 0,
293 )
294 };
295 ($world:ident [$rank:expr] . $actor:ident [$pid:expr]) => {
296 $crate::reference::ActorId(
297 $crate::reference::ProcId::Ranked(
298 $crate::reference::WorldId(stringify!($world).to_string()),
299 $rank,
300 ),
301 stringify!($actor).to_string(),
302 $pid,
303 )
304 };
305 ($world:ident . $actor:ident) => {
306 $crate::reference::GangId(
307 $crate::reference::WorldId(stringify!($world).to_string()),
308 stringify!($actor).to_string(),
309 )
310 };
311 ($world:ident [$rank:expr] . $actor:ident [$pid:expr] [$port:expr]) => {
312 $crate::reference::PortId(
313 $crate::reference::ActorId(
314 $crate::reference::ProcId::Ranked(
315 $crate::reference::WorldId(stringify!($world).to_string()),
316 $rank,
317 ),
318 stringify!($actor).to_string(),
319 $pid,
320 ),
321 $port,
322 )
323 };
324}
325pub use id;
326
327#[derive(thiserror::Error, Debug)]
329pub enum ReferenceParsingError {
330 #[error("expected token")]
332 Empty,
333
334 #[error("unexpected token: {0}")]
336 Unexpected(String),
337
338 #[error(transparent)]
340 ParseInt(#[from] ParseIntError),
341
342 #[error("parse: {0}")]
344 Parse(#[from] ParseError),
345
346 #[error("wrong reference type: expected {0}")]
348 WrongType(String),
349
350 #[error("invalid channel address {0}: {1}")]
352 InvalidChannelAddress(String, anyhow::Error),
353}
354
355impl FromStr for Reference {
356 type Err = ReferenceParsingError;
357
358 fn from_str(addr: &str) -> Result<Self, Self::Err> {
359 match addr.split_once(",") {
366 Some((channel_addr, rest)) => {
367 let channel_addr = channel_addr.parse().map_err(|err| {
368 ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
369 })?;
370
371 Ok(parse! {
372 Lexer::new(rest);
373
374 Token::Elem(proc_name) =>
376 Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),
377
378 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
380 Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),
381
382 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
384 Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
385 Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),
386
387 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
389 Token::LeftBracket Token::Uint(rank) Token::RightBracket
390 Token::LeftBracket Token::Uint(index) Token::RightBracket =>
391 Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),
392
393 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
395 Token::LeftBracket Token::Uint(rank) Token::RightBracket
396 Token::LeftBracket Token::Uint(index)
397 Token::LessThan Token::Elem(_type) Token::GreaterThan
398 Token::RightBracket =>
399 Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),
400 }?)
401 }
402
403 None => {
405 Ok(parse! {
406 Lexer::new(addr);
407
408 Token::Elem(world) => Self::World(WorldId(world.into())),
410
411 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
413 Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),
414
415 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
417 Token::Dot Token::Elem(actor) =>
418 Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),
419
420 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
422 Token::Dot Token::Elem(actor)
423 Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
424 Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),
425
426 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
428 Token::Dot Token::Elem(actor)
429 Token::LeftBracket Token::Uint(pid) Token::RightBracket
430 Token::LeftBracket Token::Uint(index) Token::RightBracket =>
431 Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),
432
433 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
435 Token::Dot Token::Elem(actor)
436 Token::LeftBracket Token::Uint(pid) Token::RightBracket
437 Token::LeftBracket Token::Uint(index)
438 Token::LessThan Token::Elem(_type) Token::GreaterThan
439 Token::RightBracket =>
440 Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),
441
442 Token::Elem(world) Token::Dot Token::Elem(actor) =>
444 Self::Gang(GangId(WorldId(world.into()), actor.into())),
445 }?)
446 }
447 }
448 }
449}
450
451impl From<WorldId> for Reference {
452 fn from(world_id: WorldId) -> Self {
453 Self::World(world_id)
454 }
455}
456
457impl From<ProcId> for Reference {
458 fn from(proc_id: ProcId) -> Self {
459 Self::Proc(proc_id)
460 }
461}
462
463impl From<ActorId> for Reference {
464 fn from(actor_id: ActorId) -> Self {
465 Self::Actor(actor_id)
466 }
467}
468
469impl From<PortId> for Reference {
470 fn from(port_id: PortId) -> Self {
471 Self::Port(port_id)
472 }
473}
474
475impl From<GangId> for Reference {
476 fn from(gang_id: GangId) -> Self {
477 Self::Gang(gang_id)
478 }
479}
480
481pub type Index = usize;
484
485#[derive(
488 Debug,
489 Serialize,
490 Deserialize,
491 Clone,
492 PartialEq,
493 Eq,
494 PartialOrd,
495 Hash,
496 Ord,
497 Named
498)]
499pub struct WorldId(pub String);
500
501impl WorldId {
502 pub fn proc_id(&self, index: Index) -> ProcId {
504 ProcId::Ranked(self.clone(), index)
505 }
506
507 pub fn name(&self) -> &str {
509 &self.0
510 }
511
512 pub fn random_user_proc(&self) -> ProcId {
514 let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
515 ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
516 }
517}
518
519impl fmt::Display for WorldId {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 let WorldId(name) = self;
522 write!(f, "{}", name)
523 }
524}
525
526impl FromStr for WorldId {
527 type Err = ReferenceParsingError;
528
529 fn from_str(addr: &str) -> Result<Self, Self::Err> {
530 match addr.parse()? {
531 Reference::World(world_id) => Ok(world_id),
532 _ => Err(ReferenceParsingError::WrongType("world".into())),
533 }
534 }
535}
536
537#[derive(
545 Debug,
546 Serialize,
547 Deserialize,
548 Clone,
549 PartialEq,
550 Eq,
551 PartialOrd,
552 Hash,
553 Ord,
554 Named,
555 EnumAsInner
556)]
557pub enum ProcId {
558 Ranked(WorldId, Index),
560 Direct(ChannelAddr, String),
562}
563
564impl ProcId {
565 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
567 ActorId(self.clone(), name.into(), pid)
568 }
569
570 pub fn world_id(&self) -> Option<&WorldId> {
572 match self {
573 ProcId::Ranked(world_id, _) => Some(world_id),
574 ProcId::Direct(_, _) => None,
575 }
576 }
577
578 pub fn world_name(&self) -> Option<&str> {
580 self.world_id().map(|world_id| world_id.name())
581 }
582
583 pub fn rank(&self) -> Option<Index> {
585 match self {
586 ProcId::Ranked(_, rank) => Some(*rank),
587 ProcId::Direct(_, _) => None,
588 }
589 }
590}
591
592impl fmt::Display for ProcId {
593 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594 match self {
595 ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
596 ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
597 }
598 }
599}
600
601impl FromStr for ProcId {
602 type Err = ReferenceParsingError;
603
604 fn from_str(addr: &str) -> Result<Self, Self::Err> {
605 match addr.parse()? {
606 Reference::Proc(proc_id) => Ok(proc_id),
607 _ => Err(ReferenceParsingError::WrongType("proc".into())),
608 }
609 }
610}
611
612#[derive(
614 Debug,
615 Serialize,
616 Deserialize,
617 Clone,
618 PartialEq,
619 Eq,
620 PartialOrd,
621 Hash,
622 Ord,
623 Named
624)]
625pub struct ActorId(pub ProcId, pub String, pub Index);
626
627impl ActorId {
628 pub fn port_id(&self, port: u64) -> PortId {
630 PortId(self.clone(), port)
631 }
632
633 pub fn child_id(&self, pid: Index) -> Self {
635 Self(self.0.clone(), self.1.clone(), pid)
636 }
637
638 pub fn root(proc_id: ProcId, name: String) -> Self {
640 Self(proc_id, name, 0)
641 }
642
643 pub fn proc_id(&self) -> &ProcId {
645 &self.0
646 }
647
648 pub fn world_name(&self) -> &str {
650 self.0
651 .world_name()
652 .expect("world_name() called on direct proc")
653 }
654
655 pub fn rank(&self) -> Index {
657 self.0.rank().expect("rank() called on direct proc")
658 }
659
660 pub fn name(&self) -> &str {
662 &self.1
663 }
664
665 pub fn pid(&self) -> Index {
667 self.2
668 }
669}
670
671impl fmt::Display for ActorId {
672 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
673 let ActorId(proc_id, name, pid) = self;
674 match proc_id {
675 ProcId::Ranked(..) => write!(f, "{}.{}[{}]", proc_id, name, pid),
676 ProcId::Direct(..) => write!(f, "{},{}[{}]", proc_id, name, pid),
677 }
678 }
679}
680impl<A: Referable> From<ActorRef<A>> for ActorId {
681 fn from(actor_ref: ActorRef<A>) -> Self {
682 actor_ref.actor_id.clone()
683 }
684}
685
686impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
687 fn from(actor_ref: &'a ActorRef<A>) -> Self {
688 &actor_ref.actor_id
689 }
690}
691
692impl FromStr for ActorId {
693 type Err = ReferenceParsingError;
694
695 fn from_str(addr: &str) -> Result<Self, Self::Err> {
696 match addr.parse()? {
697 Reference::Actor(actor_id) => Ok(actor_id),
698 _ => Err(ReferenceParsingError::WrongType("actor".into())),
699 }
700 }
701}
702
703#[derive(Debug, Named)]
705pub struct ActorRef<A: Referable> {
706 pub(crate) actor_id: ActorId,
707 phantom: PhantomData<fn() -> A>,
709}
710
711impl<A: Referable> ActorRef<A> {
712 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
714 where
715 A: RemoteHandles<M>,
716 {
717 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
718 }
719
720 pub fn send<M: RemoteMessage>(
722 &self,
723 cx: &impl context::Actor,
724 message: M,
725 ) -> Result<(), MailboxSenderError>
726 where
727 A: RemoteHandles<M>,
728 {
729 self.port().send(cx, message)
730 }
731
732 pub fn send_with_headers<M: RemoteMessage>(
735 &self,
736 cx: &impl context::Actor,
737 headers: Attrs,
738 message: M,
739 ) -> Result<(), MailboxSenderError>
740 where
741 A: RemoteHandles<M>,
742 {
743 self.port().send_with_headers(cx, headers, message)
744 }
745
746 pub fn attest(actor_id: ActorId) -> Self {
751 Self {
752 actor_id,
753 phantom: PhantomData,
754 }
755 }
756
757 pub fn actor_id(&self) -> &ActorId {
759 &self.actor_id
760 }
761
762 pub fn into_actor_id(self) -> ActorId {
764 self.actor_id
765 }
766
767 pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
771 where
772 A: Actor,
773 {
774 cx.instance().proc().resolve_actor_ref(self)
775 }
776}
777
778impl<A: Referable> Serialize for ActorRef<A> {
780 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
781 where
782 S: Serializer,
783 {
784 self.actor_id().serialize(serializer)
786 }
787}
788
789impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
791 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
792 where
793 D: Deserializer<'de>,
794 {
795 let actor_id = <ActorId>::deserialize(deserializer)?;
796 Ok(ActorRef {
797 actor_id,
798 phantom: PhantomData,
799 })
800 }
801}
802
803impl<A: Referable> fmt::Display for ActorRef<A> {
804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805 fmt::Display::fmt(&self.actor_id, f)?;
806 write!(f, "<{}>", std::any::type_name::<A>())
807 }
808}
809
810impl<A: Referable> Clone for ActorRef<A> {
812 fn clone(&self) -> Self {
813 Self {
814 actor_id: self.actor_id.clone(),
815 phantom: PhantomData,
816 }
817 }
818}
819
820impl<A: Referable> PartialEq for ActorRef<A> {
821 fn eq(&self, other: &Self) -> bool {
822 self.actor_id == other.actor_id
823 }
824}
825
826impl<A: Referable> Eq for ActorRef<A> {}
827
828impl<A: Referable> PartialOrd for ActorRef<A> {
829 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
830 Some(self.cmp(other))
831 }
832}
833
834impl<A: Referable> Ord for ActorRef<A> {
835 fn cmp(&self, other: &Self) -> Ordering {
836 self.actor_id.cmp(&other.actor_id)
837 }
838}
839
840impl<A: Referable> Hash for ActorRef<A> {
841 fn hash<H: Hasher>(&self, state: &mut H) {
842 self.actor_id.hash(state);
843 }
844}
845
846#[derive(
851 Debug,
852 Serialize,
853 Deserialize,
854 Clone,
855 PartialEq,
856 Eq,
857 PartialOrd,
858 Hash,
859 Ord,
860 Named
861)]
862pub struct PortId(pub ActorId, pub u64);
863
864impl PortId {
865 pub fn actor_id(&self) -> &ActorId {
867 &self.0
868 }
869
870 pub fn into_actor_id(self) -> ActorId {
872 self.0
873 }
874
875 pub fn index(&self) -> u64 {
877 self.1
878 }
879
880 pub fn send(&self, cx: &impl context::Actor, serialized: Serialized) {
884 let mut headers = Attrs::new();
885 crate::mailbox::headers::set_send_timestamp(&mut headers);
886 cx.post(self.clone(), headers, serialized);
887 }
888
889 pub fn send_with_headers(
893 &self,
894 cx: &impl context::Actor,
895 serialized: Serialized,
896 mut headers: Attrs,
897 ) {
898 crate::mailbox::headers::set_send_timestamp(&mut headers);
899 cx.post(self.clone(), headers, serialized);
900 }
901
902 pub fn split(
905 &self,
906 cx: &impl context::Mailbox,
907 reducer_spec: Option<ReducerSpec>,
908 reducer_opts: Option<ReducerOpts>,
909 ) -> anyhow::Result<PortId> {
910 cx.split(self.clone(), reducer_spec, reducer_opts)
911 }
912}
913
914impl FromStr for PortId {
915 type Err = ReferenceParsingError;
916
917 fn from_str(addr: &str) -> Result<Self, Self::Err> {
918 match addr.parse()? {
919 Reference::Port(port_id) => Ok(port_id),
920 _ => Err(ReferenceParsingError::WrongType("port".into())),
921 }
922 }
923}
924
925impl fmt::Display for PortId {
926 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
927 let PortId(actor_id, port) = self;
928 if port & (1 << 63) != 0 {
929 let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !(1 << 63)));
930 let typename = type_info.map_or("unknown", TypeInfo::typename);
931 write!(f, "{}[{}<{}>]", actor_id, port, typename)
932 } else {
933 write!(f, "{}[{}]", actor_id, port)
934 }
935 }
936}
937
938#[derive(Debug, Serialize, Deserialize, Derivative, Named)]
941#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
942pub struct PortRef<M> {
943 port_id: PortId,
944 #[derivative(
945 PartialEq = "ignore",
946 PartialOrd = "ignore",
947 Ord = "ignore",
948 Hash = "ignore"
949 )]
950 reducer_spec: Option<ReducerSpec>,
951 #[derivative(
952 PartialEq = "ignore",
953 PartialOrd = "ignore",
954 Ord = "ignore",
955 Hash = "ignore"
956 )]
957 reducer_opts: Option<ReducerOpts>,
958 phantom: PhantomData<M>,
959}
960
961impl<M: RemoteMessage> PortRef<M> {
962 pub fn attest(port_id: PortId) -> Self {
965 Self {
966 port_id,
967 reducer_spec: None,
968 reducer_opts: None,
969 phantom: PhantomData,
970 }
971 }
972
973 pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
976 Self {
977 port_id,
978 reducer_spec,
979 reducer_opts: None, phantom: PhantomData,
981 }
982 }
983
984 pub fn attest_message_port(actor: &ActorId) -> Self {
987 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
988 }
989
990 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
993 &self.reducer_spec
994 }
995
996 pub fn port_id(&self) -> &PortId {
998 &self.port_id
999 }
1000
1001 pub fn into_port_id(self) -> PortId {
1003 self.port_id
1004 }
1005
1006 pub fn into_once(self) -> OncePortRef<M> {
1009 OncePortRef::attest(self.into_port_id())
1010 }
1011
1012 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1015 self.send_with_headers(cx, Attrs::new(), message)
1016 }
1017
1018 pub fn send_with_headers(
1022 &self,
1023 cx: &impl context::Actor,
1024 headers: Attrs,
1025 message: M,
1026 ) -> Result<(), MailboxSenderError> {
1027 let serialized = Serialized::serialize(&message).map_err(|err| {
1028 MailboxSenderError::new_bound(
1029 self.port_id.clone(),
1030 MailboxSenderErrorKind::Serialize(err.into()),
1031 )
1032 })?;
1033 self.send_serialized(cx, headers, serialized);
1034 Ok(())
1035 }
1036
1037 pub fn send_serialized(
1040 &self,
1041 cx: &impl context::Actor,
1042 mut headers: Attrs,
1043 message: Serialized,
1044 ) {
1045 crate::mailbox::headers::set_send_timestamp(&mut headers);
1046 cx.post(self.port_id.clone(), headers, message);
1047 }
1048
1049 pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
1051 PortSink::new(cx, self)
1052 }
1053}
1054
1055impl<M: RemoteMessage> Clone for PortRef<M> {
1056 fn clone(&self) -> Self {
1057 Self {
1058 port_id: self.port_id.clone(),
1059 reducer_spec: self.reducer_spec.clone(),
1060 reducer_opts: self.reducer_opts.clone(),
1061 phantom: PhantomData,
1062 }
1063 }
1064}
1065
1066impl<M: RemoteMessage> fmt::Display for PortRef<M> {
1067 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1068 fmt::Display::fmt(&self.port_id, f)
1069 }
1070}
1071
1072#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
1074pub struct UnboundPort(pub PortId, pub Option<ReducerSpec>, pub Option<ReducerOpts>);
1075
1076impl UnboundPort {
1077 pub fn update(&mut self, port_id: PortId) {
1079 self.0 = port_id;
1080 }
1081}
1082
1083impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
1084 fn from(port_ref: &PortRef<M>) -> Self {
1085 UnboundPort(
1086 port_ref.port_id.clone(),
1087 port_ref.reducer_spec.clone(),
1088 port_ref.reducer_opts.clone(),
1089 )
1090 }
1091}
1092
1093impl<M: RemoteMessage> Unbind for PortRef<M> {
1094 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1095 bindings.push_back(&UnboundPort::from(self))
1096 }
1097}
1098
1099impl<M: RemoteMessage> Bind for PortRef<M> {
1100 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1101 let bound = bindings.try_pop_front::<UnboundPort>()?;
1102 self.port_id = bound.0;
1103 self.reducer_spec = bound.1;
1104 self.reducer_opts = bound.2;
1105 Ok(())
1106 }
1107}
1108
1109#[derive(Debug, Serialize, Deserialize, PartialEq)]
1113pub struct OncePortRef<M> {
1114 port_id: PortId,
1115 phantom: PhantomData<M>,
1116}
1117
1118impl<M: RemoteMessage> OncePortRef<M> {
1119 pub(crate) fn attest(port_id: PortId) -> Self {
1120 Self {
1121 port_id,
1122 phantom: PhantomData,
1123 }
1124 }
1125
1126 pub fn port_id(&self) -> &PortId {
1128 &self.port_id
1129 }
1130
1131 pub fn into_port_id(self) -> PortId {
1133 self.port_id
1134 }
1135
1136 pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1139 self.send_with_headers(cx, Attrs::new(), message)
1140 }
1141
1142 pub fn send_with_headers(
1145 self,
1146 cx: &impl context::Actor,
1147 mut headers: Attrs,
1148 message: M,
1149 ) -> Result<(), MailboxSenderError> {
1150 crate::mailbox::headers::set_send_timestamp(&mut headers);
1151 let serialized = Serialized::serialize(&message).map_err(|err| {
1152 MailboxSenderError::new_bound(
1153 self.port_id.clone(),
1154 MailboxSenderErrorKind::Serialize(err.into()),
1155 )
1156 })?;
1157 cx.post(self.port_id.clone(), headers, serialized);
1158 Ok(())
1159 }
1160}
1161
1162impl<M: RemoteMessage> Clone for OncePortRef<M> {
1163 fn clone(&self) -> Self {
1164 Self {
1165 port_id: self.port_id.clone(),
1166 phantom: PhantomData,
1167 }
1168 }
1169}
1170
1171impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1173 fmt::Display::fmt(&self.port_id, f)
1174 }
1175}
1176
1177impl<M: RemoteMessage> Named for OncePortRef<M> {
1178 fn typename() -> &'static str {
1179 crate::data::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
1180 }
1181}
1182
1183impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1187 fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1188 Ok(())
1189 }
1190}
1191
1192impl<M: RemoteMessage> Bind for OncePortRef<M> {
1193 fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1194 Ok(())
1195 }
1196}
1197
1198#[derive(
1200 Debug,
1201 Serialize,
1202 Deserialize,
1203 Clone,
1204 PartialEq,
1205 Eq,
1206 PartialOrd,
1207 Hash,
1208 Ord,
1209 Named
1210)]
1211pub struct GangId(pub WorldId, pub String);
1212
1213impl GangId {
1214 pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1215 (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1216 }
1217
1218 pub fn world_id(&self) -> &WorldId {
1220 &self.0
1221 }
1222
1223 pub fn name(&self) -> &str {
1225 &self.1
1226 }
1227
1228 pub fn actor_id(&self, rank: Index) -> ActorId {
1231 ActorId(
1232 ProcId::Ranked(self.world_id().clone(), rank),
1233 self.name().to_string(),
1234 0,
1235 )
1236 }
1237}
1238
1239impl fmt::Display for GangId {
1240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1241 let GangId(world_id, name) = self;
1242 write!(f, "{}.{}", world_id, name)
1243 }
1244}
1245
1246impl FromStr for GangId {
1247 type Err = ReferenceParsingError;
1248
1249 fn from_str(addr: &str) -> Result<Self, Self::Err> {
1250 match addr.parse()? {
1251 Reference::Gang(gang_id) => Ok(gang_id),
1252 _ => Err(ReferenceParsingError::WrongType("gang".into())),
1253 }
1254 }
1255}
1256
1257fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
1259 std::iter::from_fn(move || {
1260 if s.is_empty() {
1261 return None;
1262 }
1263
1264 match delims
1265 .iter()
1266 .enumerate()
1267 .flat_map(|(index, d)| s.find(d).map(|pos| (index, pos)))
1268 .min_by_key(|&(_, v)| v)
1269 {
1270 Some((index, 0)) => {
1271 let delim = delims[index];
1272 s = &s[delim.len()..];
1273 Some(delim)
1274 }
1275 Some((_, pos)) => {
1276 let token = &s[..pos];
1277 s = &s[pos..];
1278 Some(token.trim())
1279 }
1280 None => {
1281 let token = s;
1282 s = "";
1283 Some(token.trim())
1284 }
1285 }
1286 })
1287}
1288
1289#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
1291pub struct GangRef<A: Referable> {
1292 gang_id: GangId,
1293 phantom: PhantomData<A>,
1294}
1295
1296impl<A: Referable> GangRef<A> {
1297 pub fn rank(&self, rank: Index) -> ActorRef<A> {
1301 let GangRef {
1302 gang_id: GangId(world_id, name),
1303 ..
1304 } = self;
1305 ActorRef::attest(ActorId(
1306 ProcId::Ranked(world_id.clone(), rank),
1307 name.clone(),
1308 0,
1309 ))
1310 }
1311
1312 pub fn gang_id(&self) -> &GangId {
1314 &self.gang_id
1315 }
1316}
1317
1318impl<A: Referable> Clone for GangRef<A> {
1319 fn clone(&self) -> Self {
1320 Self {
1321 gang_id: self.gang_id.clone(),
1322 phantom: PhantomData,
1323 }
1324 }
1325}
1326
1327impl<A: Referable> From<GangId> for GangRef<A> {
1329 fn from(gang_id: GangId) -> Self {
1330 Self {
1331 gang_id,
1332 phantom: PhantomData,
1333 }
1334 }
1335}
1336
1337impl<A: Referable> From<GangRef<A>> for GangId {
1338 fn from(gang_ref: GangRef<A>) -> Self {
1339 gang_ref.gang_id
1340 }
1341}
1342
1343impl<'a, A: Referable> From<&'a GangRef<A>> for &'a GangId {
1344 fn from(gang_ref: &'a GangRef<A>) -> Self {
1345 &gang_ref.gang_id
1346 }
1347}
1348
1349#[cfg(test)]
1350mod tests {
1351 use rand::seq::SliceRandom;
1352 use rand::thread_rng;
1353
1354 use super::*;
1355
1356 #[test]
1357 fn test_reference_parse() {
1358 let cases: Vec<(&str, Reference)> = vec![
1359 ("test", WorldId("test".into()).into()),
1360 (
1361 "test[234]",
1362 ProcId::Ranked(WorldId("test".into()), 234).into(),
1363 ),
1364 (
1365 "test[234].testactor[6]",
1366 ActorId(
1367 ProcId::Ranked(WorldId("test".into()), 234),
1368 "testactor".into(),
1369 6,
1370 )
1371 .into(),
1372 ),
1373 (
1374 "test[234].testactor[6][1]",
1375 PortId(
1376 ActorId(
1377 ProcId::Ranked(WorldId("test".into()), 234),
1378 "testactor".into(),
1379 6,
1380 ),
1381 1,
1382 )
1383 .into(),
1384 ),
1385 (
1386 "test.testactor",
1387 GangId(WorldId("test".into()), "testactor".into()).into(),
1388 ),
1389 (
1390 "tcp:[::1]:1234,test,testactor[123]",
1391 ActorId(
1392 ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
1393 "testactor".to_string(),
1394 123,
1395 )
1396 .into(),
1397 ),
1398 (
1399 "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
1401 PortId(
1402 ActorId(
1403 ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
1404 "testactor".to_string(),
1405 0,
1406 ),
1407 123,
1408 )
1409 .into(),
1410 ),
1411 (
1412 "test[234].testactor_12345[6]",
1414 ActorId(
1415 ProcId::Ranked(WorldId("test".into()), 234),
1416 "testactor_12345".into(),
1417 6,
1418 )
1419 .into(),
1420 ),
1421 ];
1422
1423 for (s, expected) in cases {
1424 assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
1425 }
1426 }
1427
1428 #[test]
1429 fn test_reference_parse_error() {
1430 let cases: Vec<&str> = vec![
1431 "(blah)",
1432 "world(1, 2, 3)",
1433 "test[234].testactor-12345[6]",
1435 ];
1436
1437 for s in cases {
1438 let result: Result<Reference, ReferenceParsingError> = s.parse();
1439 assert!(result.is_err());
1440 }
1441 }
1442
1443 #[test]
1444 fn test_id_macro() {
1445 assert_eq!(id!(hello), WorldId("hello".into()));
1446 assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
1447 assert_eq!(
1448 id!(hello[0].actor),
1449 ActorId(
1450 ProcId::Ranked(WorldId("hello".into()), 0),
1451 "actor".into(),
1452 0
1453 )
1454 );
1455 assert_eq!(
1456 id!(hello[0].actor[1]),
1457 ActorId(
1458 ProcId::Ranked(WorldId("hello".into()), 0),
1459 "actor".into(),
1460 1
1461 )
1462 );
1463 assert_eq!(
1464 id!(hello.actor),
1465 GangId(WorldId("hello".into()), "actor".into())
1466 );
1467 }
1468
1469 #[test]
1470 fn test_reference_ord() {
1471 let expected: Vec<Reference> = [
1472 "first",
1473 "second",
1474 "second.actor1",
1475 "second.actor2",
1476 "second[1]",
1477 "second[1].actor1",
1478 "second[1].actor2",
1479 "second[2]",
1480 "second[2].actor100",
1481 "third",
1482 "third.actor",
1483 "third[2]",
1484 "third[2].actor",
1485 "third[2].actor[1]",
1486 ]
1487 .into_iter()
1488 .map(|s| s.parse().unwrap())
1489 .collect();
1490
1491 let mut sorted = expected.to_vec();
1492 sorted.shuffle(&mut thread_rng());
1493 sorted.sort();
1494
1495 assert_eq!(sorted, expected);
1496 }
1497
1498 #[test]
1499 fn test_port_type_annotation() {
1500 #[derive(Named, Serialize, Deserialize)]
1501 struct MyType;
1502 let port_id = PortId(
1503 ActorId(
1504 ProcId::Ranked(WorldId("test".into()), 234),
1505 "testactor".into(),
1506 1,
1507 ),
1508 MyType::port(),
1509 );
1510 assert_eq!(
1511 port_id.to_string(),
1512 "test[234].testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
1513 );
1514 }
1515}