1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use rand::Rng;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43
44use crate as hyperactor;
45use crate::Actor;
46use crate::ActorHandle;
47use crate::Named;
48use crate::RemoteHandles;
49use crate::RemoteMessage;
50use crate::accum::ReducerOpts;
51use crate::accum::ReducerSpec;
52use crate::actor::Referable;
53use crate::attrs::Attrs;
54use crate::channel::ChannelAddr;
55use crate::context;
56use crate::context::MailboxExt;
57use crate::data::Serialized;
58use crate::data::TypeInfo;
59use crate::mailbox::MailboxSenderError;
60use crate::mailbox::MailboxSenderErrorKind;
61use crate::mailbox::PortSink;
62use crate::message::Bind;
63use crate::message::Bindings;
64use crate::message::Unbind;
65
66pub mod lex;
67pub mod name;
68mod parse;
69
70use parse::Lexer;
71use parse::ParseError;
72use parse::Token;
73use parse::parse;
74
/// A reference to one of the addressable entities in this module: a
/// world, a proc, an actor, a port, or a gang.
///
/// `EnumAsInner` is derived, which provides per-variant accessors
/// (for example `as_direct` is used on [`ProcId`] elsewhere in this file).
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    Hash,
    Named,
    EnumAsInner
)]
pub enum Reference {
    /// A reference to a world.
    World(WorldId),
    /// A reference to a proc.
    Proc(ProcId),
    /// A reference to an actor.
    Actor(ActorId),
    /// A reference to a port.
    Port(PortId),
    /// A reference to a gang.
    Gang(GangId),
}
113
114impl Reference {
115 pub fn is_prefix_of(&self, other: &Reference) -> bool {
117 match self {
118 Self::World(_) => self.world_id() == other.world_id(),
119 Self::Proc(_) => self.proc_id() == other.proc_id(),
120 Self::Actor(_) => self == other,
121 Self::Port(_) => self == other,
122 Self::Gang(_) => self == other,
123 }
124 }
125
126 pub fn world_id(&self) -> Option<&WorldId> {
128 match self {
129 Self::World(world_id) => Some(world_id),
130 Self::Proc(proc_id) => proc_id.world_id(),
131 Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
132 Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
133 Self::Gang(GangId(world_id, _)) => Some(world_id),
134 }
135 }
136
137 pub fn proc_id(&self) -> Option<&ProcId> {
139 match self {
140 Self::World(_) => None,
141 Self::Proc(proc_id) => Some(proc_id),
142 Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
143 Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
144 Self::Gang(_) => None,
145 }
146 }
147
148 fn rank(&self) -> Option<Index> {
150 self.proc_id().and_then(|proc_id| proc_id.rank())
151 }
152
153 pub fn actor_id(&self) -> Option<&ActorId> {
155 match self {
156 Self::World(_) => None,
157 Self::Proc(_) => None,
158 Self::Actor(actor_id) => Some(actor_id),
159 Self::Port(PortId(actor_id, _)) => Some(actor_id),
160 Self::Gang(_) => None,
161 }
162 }
163
164 fn actor_name(&self) -> Option<&str> {
166 match self {
167 Self::World(_) => None,
168 Self::Proc(_) => None,
169 Self::Actor(actor_id) => Some(actor_id.name()),
170 Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
171 Self::Gang(gang_id) => Some(&gang_id.1),
172 }
173 }
174
175 fn pid(&self) -> Option<Index> {
177 match self {
178 Self::World(_) => None,
179 Self::Proc(_) => None,
180 Self::Actor(actor_id) => Some(actor_id.pid()),
181 Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
182 Self::Gang(_) => None,
183 }
184 }
185
186 fn port(&self) -> Option<u64> {
188 match self {
189 Self::World(_) => None,
190 Self::Proc(_) => None,
191 Self::Actor(_) => None,
192 Self::Port(port_id) => Some(port_id.index()),
193 Self::Gang(_) => None,
194 }
195 }
196}
197
198impl PartialOrd for Reference {
199 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
200 Some(self.cmp(other))
201 }
202}
203
204impl Ord for Reference {
205 fn cmp(&self, other: &Self) -> Ordering {
206 (
207 self.world_id(),
209 self.rank(),
210 self.proc_id().and_then(ProcId::as_direct),
211 self.actor_name(),
212 self.pid(),
213 self.port(),
214 )
215 .cmp(&(
216 other.world_id(),
217 other.rank(),
218 other.proc_id().and_then(ProcId::as_direct),
219 other.actor_name(),
220 other.pid(),
221 other.port(),
222 ))
223 }
224}
225
226impl fmt::Display for Reference {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 match self {
229 Self::World(world_id) => fmt::Display::fmt(world_id, f),
230 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
231 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
232 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
233 Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
234 }
235 }
236}
237
/// Constructs ids from a compact literal syntax. World and actor names
/// are given as bare identifiers and are stringified; ranks, pids and
/// ports are arbitrary expressions.
///
/// Supported forms and the type each produces:
///
/// - `id!(world)` — `WorldId`
/// - `id!(world[rank])` — ranked `ProcId`
/// - `id!(world[rank].actor)` — `ActorId` with pid 0
/// - `id!(world[rank].actor[pid])` — `ActorId`
/// - `id!(world.actor)` — `GangId`
/// - `id!(world[rank].actor[pid][port])` — `PortId`
///
/// Only ranked procs can be expressed; there is no arm for direct procs.
#[macro_export]
macro_rules! id {
    // world
    ($world:ident) => {
        $crate::reference::WorldId(stringify!($world).to_string())
    };
    // world[rank]
    ($world:ident [$rank:expr]) => {
        $crate::reference::ProcId::Ranked(
            $crate::reference::WorldId(stringify!($world).to_string()),
            $rank,
        )
    };
    // world[rank].actor -- the pid defaults to 0
    ($world:ident [$rank:expr] . $actor:ident) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            0,
        )
    };
    // world[rank].actor[pid]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr]) => {
        $crate::reference::ActorId(
            $crate::reference::ProcId::Ranked(
                $crate::reference::WorldId(stringify!($world).to_string()),
                $rank,
            ),
            stringify!($actor).to_string(),
            $pid,
        )
    };
    // world.actor -- a gang
    ($world:ident . $actor:ident) => {
        $crate::reference::GangId(
            $crate::reference::WorldId(stringify!($world).to_string()),
            stringify!($actor).to_string(),
        )
    };
    // world[rank].actor[pid][port]
    ($world:ident [$rank:expr] . $actor:ident [$pid:expr] [$port:expr]) => {
        $crate::reference::PortId(
            $crate::reference::ActorId(
                $crate::reference::ProcId::Ranked(
                    $crate::reference::WorldId(stringify!($world).to_string()),
                    $rank,
                ),
                stringify!($actor).to_string(),
                $pid,
            ),
            $port,
        )
    };
}
325pub use id;
326
/// The errors that can occur while parsing a reference from a string.
#[derive(thiserror::Error, Debug)]
pub enum ReferenceParsingError {
    /// A token was expected but none was found.
    #[error("expected token")]
    Empty,

    /// A token was found that was not expected; carries its text.
    #[error("unexpected token: {0}")]
    Unexpected(String),

    /// An integer component could not be parsed.
    #[error(transparent)]
    ParseInt(#[from] ParseIntError),

    /// The underlying parser rejected the input.
    #[error("parse: {0}")]
    Parse(#[from] ParseError),

    /// The input parsed as a reference, but not of the requested kind;
    /// carries the name of the expected kind (e.g. "world").
    #[error("wrong reference type: expected {0}")]
    WrongType(String),

    /// The channel-address prefix of a direct reference was invalid;
    /// carries the offending text and the underlying error.
    #[error("invalid channel address {0}: {1}")]
    InvalidChannelAddress(String, anyhow::Error),
}
354
impl FromStr for Reference {
    type Err = ReferenceParsingError;

    /// Parses a reference from its string form.
    ///
    /// If `addr` contains a comma, everything before the first comma is
    /// parsed as a channel address and the remainder as a direct
    /// reference (`proc`, `proc,actor`, `proc,actor[pid]`,
    /// `proc,actor[pid][port]`, `proc,actor[pid][port<type>]`).
    /// Otherwise it is parsed as a ranked reference (`world`,
    /// `world[rank]`, `world[rank].actor`, `world[rank].actor[pid]`,
    /// `world[rank].actor[pid][port]`, `world[rank].actor[pid][port<type>]`)
    /// or a gang (`world.actor`).
    ///
    /// # Errors
    ///
    /// Returns `InvalidChannelAddress` when the channel-address prefix
    /// does not parse, and propagates the error of the `parse!`
    /// expression when no rule matches.
    fn from_str(addr: &str) -> Result<Self, Self::Err> {
        match addr.split_once(",") {
            // Direct form: "<channel address>,<rest>".
            Some((channel_addr, rest)) => {
                let channel_addr = channel_addr.parse().map_err(|err| {
                    ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
                })?;

                Ok(parse! {
                    Lexer::new(rest);

                    // proc
                    Token::Elem(proc_name) =>
                    Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),

                    // proc,actor -- the pid defaults to 0
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
                    Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),

                    // proc,actor[pid] -- note: the binding named `rank` is
                    // used as the actor's pid (third `ActorId` field).
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                    Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                    Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),

                    // proc,actor[pid][port]
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                    Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::LeftBracket Token::Uint(index) Token::RightBracket =>
                    Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),

                    // proc,actor[pid][port<type>] -- the type annotation is
                    // accepted and discarded.
                    Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
                    Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::LeftBracket Token::Uint(index)
                    Token::LessThan Token::Elem(_type) Token::GreaterThan
                    Token::RightBracket =>
                    Self::Port(PortId(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank), index as u64)),
                }?)
            }

            // Ranked (or gang) form: no comma anywhere in the input.
            None => {
                Ok(parse! {
                    Lexer::new(addr);

                    // world
                    Token::Elem(world) => Self::World(WorldId(world.into())),

                    // world[rank]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
                    Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),

                    // world[rank].actor -- the pid defaults to 0
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::Dot Token::Elem(actor) =>
                    Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),

                    // world[rank].actor[pid]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::Dot Token::Elem(actor)
                    Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
                    Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),

                    // world[rank].actor[pid][port]
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::Dot Token::Elem(actor)
                    Token::LeftBracket Token::Uint(pid) Token::RightBracket
                    Token::LeftBracket Token::Uint(index) Token::RightBracket =>
                    Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world[rank].actor[pid][port<type>] -- the type
                    // annotation is accepted and discarded.
                    Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
                    Token::Dot Token::Elem(actor)
                    Token::LeftBracket Token::Uint(pid) Token::RightBracket
                    Token::LeftBracket Token::Uint(index)
                    Token::LessThan Token::Elem(_type) Token::GreaterThan
                    Token::RightBracket =>
                    Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),

                    // world.actor -- a gang
                    Token::Elem(world) Token::Dot Token::Elem(actor) =>
                    Self::Gang(GangId(WorldId(world.into()), actor.into())),
                }?)
            }
        }
    }
}
450
451impl From<WorldId> for Reference {
452 fn from(world_id: WorldId) -> Self {
453 Self::World(world_id)
454 }
455}
456
457impl From<ProcId> for Reference {
458 fn from(proc_id: ProcId) -> Self {
459 Self::Proc(proc_id)
460 }
461}
462
463impl From<ActorId> for Reference {
464 fn from(actor_id: ActorId) -> Self {
465 Self::Actor(actor_id)
466 }
467}
468
469impl From<PortId> for Reference {
470 fn from(port_id: PortId) -> Self {
471 Self::Port(port_id)
472 }
473}
474
475impl From<GangId> for Reference {
476 fn from(gang_id: GangId) -> Self {
477 Self::Gang(gang_id)
478 }
479}
480
/// The integer type used for indices within ids in this module: proc
/// ranks and actor pids.
pub type Index = usize;
484
/// Identifies a world by name. The wrapped string is the world's name.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    Named
)]
pub struct WorldId(pub String);
500
501impl WorldId {
502 pub fn proc_id(&self, index: Index) -> ProcId {
504 ProcId::Ranked(self.clone(), index)
505 }
506
507 pub fn name(&self) -> &str {
509 &self.0
510 }
511
512 pub fn random_user_proc(&self) -> ProcId {
514 let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
515 ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
516 }
517}
518
519impl fmt::Display for WorldId {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 let WorldId(name) = self;
522 write!(f, "{}", name)
523 }
524}
525
526impl FromStr for WorldId {
527 type Err = ReferenceParsingError;
528
529 fn from_str(addr: &str) -> Result<Self, Self::Err> {
530 match addr.parse()? {
531 Reference::World(world_id) => Ok(world_id),
532 _ => Err(ReferenceParsingError::WrongType("world".into())),
533 }
534 }
535}
536
/// Identifies a proc, in one of two ways: by rank within a world, or
/// directly by a channel address plus a name.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    Named,
    EnumAsInner
)]
pub enum ProcId {
    /// A proc identified by its world and its rank within that world.
    Ranked(WorldId, Index),
    /// A proc identified by a channel address and a name.
    Direct(ChannelAddr, String),
}
563
564impl ProcId {
565 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
567 ActorId(self.clone(), name.into(), pid)
568 }
569
570 pub fn world_id(&self) -> Option<&WorldId> {
572 match self {
573 ProcId::Ranked(world_id, _) => Some(world_id),
574 ProcId::Direct(_, _) => None,
575 }
576 }
577
578 pub fn world_name(&self) -> Option<&str> {
580 self.world_id().map(|world_id| world_id.name())
581 }
582
583 pub fn rank(&self) -> Option<Index> {
585 match self {
586 ProcId::Ranked(_, rank) => Some(*rank),
587 ProcId::Direct(_, _) => None,
588 }
589 }
590
591 pub fn name(&self) -> Option<&String> {
593 match self {
594 ProcId::Ranked(_, _) => None,
595 ProcId::Direct(_, name) => Some(name),
596 }
597 }
598}
599
600impl fmt::Display for ProcId {
601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 match self {
603 ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
604 ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
605 }
606 }
607}
608
609impl FromStr for ProcId {
610 type Err = ReferenceParsingError;
611
612 fn from_str(addr: &str) -> Result<Self, Self::Err> {
613 match addr.parse()? {
614 Reference::Proc(proc_id) => Ok(proc_id),
615 _ => Err(ReferenceParsingError::WrongType("proc".into())),
616 }
617 }
618}
619
/// Identifies an actor by the proc it lives on, its name, and its pid
/// (in that field order).
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    Named
)]
pub struct ActorId(pub ProcId, pub String, pub Index);
634
635impl ActorId {
636 pub fn port_id(&self, port: u64) -> PortId {
638 PortId(self.clone(), port)
639 }
640
641 pub fn child_id(&self, pid: Index) -> Self {
643 Self(self.0.clone(), self.1.clone(), pid)
644 }
645
646 pub fn root(proc_id: ProcId, name: String) -> Self {
648 Self(proc_id, name, 0)
649 }
650
651 pub fn proc_id(&self) -> &ProcId {
653 &self.0
654 }
655
656 pub fn world_name(&self) -> &str {
658 self.0
659 .world_name()
660 .expect("world_name() called on direct proc")
661 }
662
663 pub fn rank(&self) -> Index {
665 self.0.rank().expect("rank() called on direct proc")
666 }
667
668 pub fn name(&self) -> &str {
670 &self.1
671 }
672
673 pub fn pid(&self) -> Index {
675 self.2
676 }
677}
678
679impl fmt::Display for ActorId {
680 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681 let ActorId(proc_id, name, pid) = self;
682 match proc_id {
683 ProcId::Ranked(..) => write!(f, "{}.{}[{}]", proc_id, name, pid),
684 ProcId::Direct(..) => write!(f, "{},{}[{}]", proc_id, name, pid),
685 }
686 }
687}
688impl<A: Referable> From<ActorRef<A>> for ActorId {
689 fn from(actor_ref: ActorRef<A>) -> Self {
690 actor_ref.actor_id.clone()
691 }
692}
693
694impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
695 fn from(actor_ref: &'a ActorRef<A>) -> Self {
696 &actor_ref.actor_id
697 }
698}
699
700impl FromStr for ActorId {
701 type Err = ReferenceParsingError;
702
703 fn from_str(addr: &str) -> Result<Self, Self::Err> {
704 match addr.parse()? {
705 Reference::Actor(actor_id) => Ok(actor_id),
706 _ => Err(ReferenceParsingError::WrongType("actor".into())),
707 }
708 }
709}
710
/// A typed reference to an actor of type `A`: an [`ActorId`] tagged
/// with the actor type at compile time.
#[derive(Debug, Named)]
pub struct ActorRef<A: Referable> {
    /// The id of the referenced actor.
    pub(crate) actor_id: ActorId,
    // Carries the type parameter only; `fn() -> A` means no `A` value
    // is stored or owned by the reference.
    phantom: PhantomData<fn() -> A>,
}
718
719impl<A: Referable> ActorRef<A> {
720 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
722 where
723 A: RemoteHandles<M>,
724 {
725 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
726 }
727
728 pub fn send<M: RemoteMessage>(
730 &self,
731 cx: &impl context::Actor,
732 message: M,
733 ) -> Result<(), MailboxSenderError>
734 where
735 A: RemoteHandles<M>,
736 {
737 self.port().send(cx, message)
738 }
739
740 pub fn send_with_headers<M: RemoteMessage>(
743 &self,
744 cx: &impl context::Actor,
745 headers: Attrs,
746 message: M,
747 ) -> Result<(), MailboxSenderError>
748 where
749 A: RemoteHandles<M>,
750 {
751 self.port().send_with_headers(cx, headers, message)
752 }
753
754 pub fn attest(actor_id: ActorId) -> Self {
759 Self {
760 actor_id,
761 phantom: PhantomData,
762 }
763 }
764
765 pub fn actor_id(&self) -> &ActorId {
767 &self.actor_id
768 }
769
770 pub fn into_actor_id(self) -> ActorId {
772 self.actor_id
773 }
774
775 pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
779 where
780 A: Actor,
781 {
782 cx.instance().proc().resolve_actor_ref(self)
783 }
784}
785
786impl<A: Referable> Serialize for ActorRef<A> {
788 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
789 where
790 S: Serializer,
791 {
792 self.actor_id().serialize(serializer)
794 }
795}
796
797impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
799 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
800 where
801 D: Deserializer<'de>,
802 {
803 let actor_id = <ActorId>::deserialize(deserializer)?;
804 Ok(ActorRef {
805 actor_id,
806 phantom: PhantomData,
807 })
808 }
809}
810
811impl<A: Referable> fmt::Display for ActorRef<A> {
812 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
813 fmt::Display::fmt(&self.actor_id, f)?;
814 write!(f, "<{}>", std::any::type_name::<A>())
815 }
816}
817
818impl<A: Referable> Clone for ActorRef<A> {
820 fn clone(&self) -> Self {
821 Self {
822 actor_id: self.actor_id.clone(),
823 phantom: PhantomData,
824 }
825 }
826}
827
828impl<A: Referable> PartialEq for ActorRef<A> {
829 fn eq(&self, other: &Self) -> bool {
830 self.actor_id == other.actor_id
831 }
832}
833
// Equality of actor references is equality of actor ids, which is total.
impl<A: Referable> Eq for ActorRef<A> {}
835
836impl<A: Referable> PartialOrd for ActorRef<A> {
837 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
838 Some(self.cmp(other))
839 }
840}
841
842impl<A: Referable> Ord for ActorRef<A> {
843 fn cmp(&self, other: &Self) -> Ordering {
844 self.actor_id.cmp(&other.actor_id)
845 }
846}
847
848impl<A: Referable> Hash for ActorRef<A> {
849 fn hash<H: Hasher>(&self, state: &mut H) {
850 self.actor_id.hash(state);
851 }
852}
853
/// Identifies a port by the actor it belongs to and a `u64` port index
/// (in that field order).
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    Named
)]
pub struct PortId(pub ActorId, pub u64);
871
872impl PortId {
873 pub fn actor_id(&self) -> &ActorId {
875 &self.0
876 }
877
878 pub fn into_actor_id(self) -> ActorId {
880 self.0
881 }
882
883 pub fn index(&self) -> u64 {
885 self.1
886 }
887
888 pub fn send(&self, cx: &impl context::Actor, serialized: Serialized) {
892 let mut headers = Attrs::new();
893 crate::mailbox::headers::set_send_timestamp(&mut headers);
894 cx.post(self.clone(), headers, serialized, true);
895 }
896
897 pub fn send_with_headers(
901 &self,
902 cx: &impl context::Actor,
903 serialized: Serialized,
904 mut headers: Attrs,
905 ) {
906 crate::mailbox::headers::set_send_timestamp(&mut headers);
907 cx.post(self.clone(), headers, serialized, true);
908 }
909
910 pub fn split(
913 &self,
914 cx: &impl context::Actor,
915 reducer_spec: Option<ReducerSpec>,
916 reducer_opts: Option<ReducerOpts>,
917 return_undeliverable: bool,
918 ) -> anyhow::Result<PortId> {
919 cx.split(
920 self.clone(),
921 reducer_spec,
922 reducer_opts,
923 return_undeliverable,
924 )
925 }
926}
927
928impl FromStr for PortId {
929 type Err = ReferenceParsingError;
930
931 fn from_str(addr: &str) -> Result<Self, Self::Err> {
932 match addr.parse()? {
933 Reference::Port(port_id) => Ok(port_id),
934 _ => Err(ReferenceParsingError::WrongType("port".into())),
935 }
936 }
937}
938
939impl fmt::Display for PortId {
940 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
941 let PortId(actor_id, port) = self;
942 if port & (1 << 63) != 0 {
943 let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !(1 << 63)));
944 let typename = type_info.map_or("unknown", TypeInfo::typename);
945 write!(f, "{}[{}<{}>]", actor_id, port, typename)
946 } else {
947 write!(f, "{}[{}]", actor_id, port)
948 }
949 }
950}
951
/// A typed reference to a port that accepts messages of type `M`.
///
/// Comparison, ordering and hashing are derived via `Derivative` and
/// skip the reducer fields (marked "ignore" below).
#[derive(Debug, Serialize, Deserialize, Derivative, Named)]
#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct PortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer specification attached to the port.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_spec: Option<ReducerSpec>,
    /// Optional reducer options attached to the port.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_opts: Option<ReducerOpts>,
    // Carries the message type only.
    phantom: PhantomData<M>,
    /// Flag passed to `post` when sending; presumably controls whether
    /// undeliverable messages are returned to the sender -- confirm
    /// against the mailbox implementation.
    return_undeliverable: bool,
}
975
976impl<M: RemoteMessage> PortRef<M> {
977 pub fn attest(port_id: PortId) -> Self {
980 Self {
981 port_id,
982 reducer_spec: None,
983 reducer_opts: None,
984 phantom: PhantomData,
985 return_undeliverable: true,
986 }
987 }
988
989 pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
992 Self {
993 port_id,
994 reducer_spec,
995 reducer_opts: None, phantom: PhantomData,
997 return_undeliverable: true,
998 }
999 }
1000
1001 pub fn attest_message_port(actor: &ActorId) -> Self {
1004 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
1005 }
1006
1007 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1010 &self.reducer_spec
1011 }
1012
1013 pub fn port_id(&self) -> &PortId {
1015 &self.port_id
1016 }
1017
1018 pub fn into_port_id(self) -> PortId {
1020 self.port_id
1021 }
1022
1023 pub fn into_once(self) -> OncePortRef<M> {
1026 OncePortRef::attest(self.into_port_id())
1027 }
1028
1029 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1032 self.send_with_headers(cx, Attrs::new(), message)
1033 }
1034
1035 pub fn send_with_headers(
1039 &self,
1040 cx: &impl context::Actor,
1041 headers: Attrs,
1042 message: M,
1043 ) -> Result<(), MailboxSenderError> {
1044 let serialized = Serialized::serialize(&message).map_err(|err| {
1045 MailboxSenderError::new_bound(
1046 self.port_id.clone(),
1047 MailboxSenderErrorKind::Serialize(err.into()),
1048 )
1049 })?;
1050 self.send_serialized(cx, headers, serialized);
1051 Ok(())
1052 }
1053
1054 pub fn send_serialized(
1057 &self,
1058 cx: &impl context::Actor,
1059 mut headers: Attrs,
1060 message: Serialized,
1061 ) {
1062 crate::mailbox::headers::set_send_timestamp(&mut headers);
1063 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1064 cx.post(
1065 self.port_id.clone(),
1066 headers,
1067 message,
1068 self.return_undeliverable,
1069 );
1070 }
1071
1072 pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
1074 PortSink::new(cx, self)
1075 }
1076
1077 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1080 self.return_undeliverable = return_undeliverable;
1081 }
1082}
1083
1084impl<M: RemoteMessage> Clone for PortRef<M> {
1085 fn clone(&self) -> Self {
1086 Self {
1087 port_id: self.port_id.clone(),
1088 reducer_spec: self.reducer_spec.clone(),
1089 reducer_opts: self.reducer_opts.clone(),
1090 phantom: PhantomData,
1091 return_undeliverable: self.return_undeliverable,
1092 }
1093 }
1094}
1095
1096impl<M: RemoteMessage> fmt::Display for PortRef<M> {
1097 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1098 fmt::Display::fmt(&self.port_id, f)
1099 }
1100}
1101
/// The untyped contents of a [`PortRef`], used when binding and
/// unbinding port references. Fields, in order: the port id, the
/// reducer spec, the reducer options, and the `return_undeliverable` flag.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
pub struct UnboundPort(
    pub PortId,
    pub Option<ReducerSpec>,
    pub Option<ReducerOpts>,
    pub bool, // return_undeliverable
);
1110
1111impl UnboundPort {
1112 pub fn update(&mut self, port_id: PortId) {
1114 self.0 = port_id;
1115 }
1116}
1117
1118impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
1119 fn from(port_ref: &PortRef<M>) -> Self {
1120 UnboundPort(
1121 port_ref.port_id.clone(),
1122 port_ref.reducer_spec.clone(),
1123 port_ref.reducer_opts.clone(),
1124 port_ref.return_undeliverable,
1125 )
1126 }
1127}
1128
1129impl<M: RemoteMessage> Unbind for PortRef<M> {
1130 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1131 bindings.push_back(&UnboundPort::from(self))
1132 }
1133}
1134
1135impl<M: RemoteMessage> Bind for PortRef<M> {
1136 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1137 let bound = bindings.try_pop_front::<UnboundPort>()?;
1138 self.port_id = bound.0;
1139 self.reducer_spec = bound.1;
1140 self.reducer_opts = bound.2;
1141 self.return_undeliverable = bound.3;
1142 Ok(())
1143 }
1144}
1145
/// A typed reference to a port that accepts a message of type `M`,
/// whose send methods consume the reference.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    // Carries the message type only.
    phantom: PhantomData<M>,
}
1154
1155impl<M: RemoteMessage> OncePortRef<M> {
1156 pub(crate) fn attest(port_id: PortId) -> Self {
1157 Self {
1158 port_id,
1159 phantom: PhantomData,
1160 }
1161 }
1162
1163 pub fn port_id(&self) -> &PortId {
1165 &self.port_id
1166 }
1167
1168 pub fn into_port_id(self) -> PortId {
1170 self.port_id
1171 }
1172
1173 pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1176 self.send_with_headers(cx, Attrs::new(), message)
1177 }
1178
1179 pub fn send_with_headers(
1182 self,
1183 cx: &impl context::Actor,
1184 mut headers: Attrs,
1185 message: M,
1186 ) -> Result<(), MailboxSenderError> {
1187 crate::mailbox::headers::set_send_timestamp(&mut headers);
1188 let serialized = Serialized::serialize(&message).map_err(|err| {
1189 MailboxSenderError::new_bound(
1190 self.port_id.clone(),
1191 MailboxSenderErrorKind::Serialize(err.into()),
1192 )
1193 })?;
1194 cx.post(self.port_id.clone(), headers, serialized, true);
1195 Ok(())
1196 }
1197}
1198
1199impl<M: RemoteMessage> Clone for OncePortRef<M> {
1200 fn clone(&self) -> Self {
1201 Self {
1202 port_id: self.port_id.clone(),
1203 phantom: PhantomData,
1204 }
1205 }
1206}
1207
1208impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1210 fmt::Display::fmt(&self.port_id, f)
1211 }
1212}
1213
impl<M: RemoteMessage> Named for OncePortRef<M> {
    /// The type name, built from the template
    /// `hyperactor::mailbox::OncePortRef<{}>` and the message type `M`
    /// via `intern_typename!`.
    fn typename() -> &'static str {
        crate::data::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1219
impl<M: RemoteMessage> Unbind for OncePortRef<M> {
    /// No-op: a once-port contributes nothing to `bindings` and always
    /// succeeds.
    fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
        Ok(())
    }
}
1228
impl<M: RemoteMessage> Bind for OncePortRef<M> {
    /// No-op: a once-port consumes nothing from `bindings`, is left
    /// unchanged, and always succeeds.
    fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
        Ok(())
    }
}
1234
/// Identifies a gang by its world and a name (in that field order).
/// The name doubles as the name of the gang's per-rank actors, as
/// built by the `GangId` methods in this file.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    Named
)]
pub struct GangId(pub WorldId, pub String);
1249
1250impl GangId {
1251 pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1252 (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1253 }
1254
1255 pub fn world_id(&self) -> &WorldId {
1257 &self.0
1258 }
1259
1260 pub fn name(&self) -> &str {
1262 &self.1
1263 }
1264
1265 pub fn actor_id(&self, rank: Index) -> ActorId {
1268 ActorId(
1269 ProcId::Ranked(self.world_id().clone(), rank),
1270 self.name().to_string(),
1271 0,
1272 )
1273 }
1274}
1275
1276impl fmt::Display for GangId {
1277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1278 let GangId(world_id, name) = self;
1279 write!(f, "{}.{}", world_id, name)
1280 }
1281}
1282
1283impl FromStr for GangId {
1284 type Err = ReferenceParsingError;
1285
1286 fn from_str(addr: &str) -> Result<Self, Self::Err> {
1287 match addr.parse()? {
1288 Reference::Gang(gang_id) => Ok(gang_id),
1289 _ => Err(ReferenceParsingError::WrongType("gang".into())),
1290 }
1291 }
1292}
1293
/// Splits `s` into alternating tokens and delimiters.
///
/// At each step the delimiter in `delims` that occurs earliest in the
/// remaining input is located (ties go to the delimiter listed first).
/// If it sits at the very start it is yielded as-is; otherwise the
/// text before it is yielded, trimmed of surrounding whitespace. Once
/// no delimiter remains, the rest of the input is yielded (trimmed)
/// and iteration ends.
///
/// Empty delimiters are ignored: an empty string matches at offset 0
/// without consuming anything, which would otherwise make the iterator
/// yield `""` forever.
fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
    std::iter::from_fn(move || {
        if s.is_empty() {
            return None;
        }

        // Earliest occurrence of any non-empty delimiter in the remaining input.
        let next_delim = delims
            .iter()
            .filter(|delim| !delim.is_empty())
            .filter_map(|&delim| s.find(delim).map(|pos| (pos, delim)))
            .min_by_key(|&(pos, _)| pos);

        let token = match next_delim {
            // The input starts with a delimiter: emit it and step past it.
            Some((0, delim)) => {
                s = &s[delim.len()..];
                delim
            }
            // Text precedes the delimiter: emit the text, keep the
            // delimiter for the next call.
            Some((pos, _)) => {
                let (head, tail) = s.split_at(pos);
                s = tail;
                head.trim()
            }
            // No delimiter left: emit whatever remains and finish.
            None => std::mem::take(&mut s).trim(),
        };
        Some(token)
    })
}
1325
/// A typed reference to a gang whose actors are of type `A`: a
/// [`GangId`] tagged with the actor type at compile time.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct GangRef<A: Referable> {
    /// The id of the referenced gang.
    gang_id: GangId,
    // Carries the actor type only.
    phantom: PhantomData<A>,
}
1332
1333impl<A: Referable> GangRef<A> {
1334 pub fn rank(&self, rank: Index) -> ActorRef<A> {
1338 let GangRef {
1339 gang_id: GangId(world_id, name),
1340 ..
1341 } = self;
1342 ActorRef::attest(ActorId(
1343 ProcId::Ranked(world_id.clone(), rank),
1344 name.clone(),
1345 0,
1346 ))
1347 }
1348
1349 pub fn gang_id(&self) -> &GangId {
1351 &self.gang_id
1352 }
1353
1354 pub fn attest(gang_id: GangId) -> Self {
1357 Self {
1358 gang_id,
1359 phantom: PhantomData,
1360 }
1361 }
1362}
1363
1364impl<A: Referable> Clone for GangRef<A> {
1365 fn clone(&self) -> Self {
1366 Self {
1367 gang_id: self.gang_id.clone(),
1368 phantom: PhantomData,
1369 }
1370 }
1371}
1372
1373impl<A: Referable> From<GangRef<A>> for GangId {
1374 fn from(gang_ref: GangRef<A>) -> Self {
1375 gang_ref.gang_id
1376 }
1377}
1378
1379impl<'a, A: Referable> From<&'a GangRef<A>> for &'a GangId {
1380 fn from(gang_ref: &'a GangRef<A>) -> Self {
1381 &gang_ref.gang_id
1382 }
1383}
1384
// Unit tests for reference parsing, the `id!` macro, reference
// ordering, and port display.
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;

    use super::*;

    // Each input string must parse to exactly the paired `Reference`.
    #[test]
    fn test_reference_parse() {
        let cases: Vec<(&str, Reference)> = vec![
            ("test", WorldId("test".into()).into()),
            (
                "test[234]",
                ProcId::Ranked(WorldId("test".into()), 234).into(),
            ),
            (
                "test[234].testactor[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor".into(),
                    6,
                )
                .into(),
            ),
            (
                "test[234].testactor[6][1]",
                PortId(
                    ActorId(
                        ProcId::Ranked(WorldId("test".into()), 234),
                        "testactor".into(),
                        6,
                    ),
                    1,
                )
                .into(),
            ),
            (
                "test.testactor",
                GangId(WorldId("test".into()), "testactor".into()).into(),
            ),
            // Direct form: the channel address precedes the first comma.
            (
                "tcp:[::1]:1234,test,testactor[123]",
                ActorId(
                    ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                    "testactor".to_string(),
                    123,
                )
                .into(),
            ),
            (
                // The `<my::type>` annotation is accepted and ignored.
                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
                PortId(
                    ActorId(
                        ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
                        "testactor".to_string(),
                        0,
                    ),
                    123,
                )
                .into(),
            ),
            (
                // Underscores and digits are allowed in actor names.
                "test[234].testactor_12345[6]",
                ActorId(
                    ProcId::Ranked(WorldId("test".into()), 234),
                    "testactor_12345".into(),
                    6,
                )
                .into(),
            ),
        ];

        for (s, expected) in cases {
            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
        }
    }

    // Each input string must be rejected by the parser.
    #[test]
    fn test_reference_parse_error() {
        let cases: Vec<&str> = vec![
            "(blah)",
            "world(1, 2, 3)",
            // A hyphen is not allowed in an actor name.
            "test[234].testactor-12345[6]",
        ];

        for s in cases {
            let result: Result<Reference, ReferenceParsingError> = s.parse();
            assert!(result.is_err());
        }
    }

    // Exercises the world, proc, actor (with and without pid) and gang
    // arms of `id!`.
    #[test]
    fn test_id_macro() {
        assert_eq!(id!(hello), WorldId("hello".into()));
        assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
        assert_eq!(
            id!(hello[0].actor),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                0
            )
        );
        assert_eq!(
            id!(hello[0].actor[1]),
            ActorId(
                ProcId::Ranked(WorldId("hello".into()), 0),
                "actor".into(),
                1
            )
        );
        assert_eq!(
            id!(hello.actor),
            GangId(WorldId("hello".into()), "actor".into())
        );
    }

    // Sorting a shuffled copy must reproduce the listed order: within a
    // world, the world itself sorts first, then its gangs, then its
    // procs by rank, each proc followed by its actors.
    #[test]
    fn test_reference_ord() {
        let expected: Vec<Reference> = [
            "first",
            "second",
            "second.actor1",
            "second.actor2",
            "second[1]",
            "second[1].actor1",
            "second[1].actor2",
            "second[2]",
            "second[2].actor100",
            "third",
            "third.actor",
            "third[2]",
            "third[2].actor",
            "third[2].actor[1]",
        ]
        .into_iter()
        .map(|s| s.parse().unwrap())
        .collect();

        let mut sorted = expected.to_vec();
        sorted.shuffle(&mut thread_rng());
        sorted.sort();

        assert_eq!(sorted, expected);
    }

    // A port whose index is a type's `Named::port()` value displays
    // with that type's name as an annotation.
    #[test]
    fn test_port_type_annotation() {
        #[derive(Named, Serialize, Deserialize)]
        struct MyType;
        let port_id = PortId(
            ActorId(
                ProcId::Ranked(WorldId("test".into()), 234),
                "testactor".into(),
                1,
            ),
            MyType::port(),
        );
        assert_eq!(
            port_id.to_string(),
            "test[234].testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
        );
    }
}