1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use rand::Rng;
39use serde::Deserialize;
40use serde::Serialize;
41
42use crate as hyperactor;
43use crate::Actor;
44use crate::ActorHandle;
45use crate::Named;
46use crate::RemoteHandles;
47use crate::RemoteMessage;
48use crate::accum::ReducerSpec;
49use crate::actor::RemoteActor;
50use crate::attrs::Attrs;
51use crate::cap;
52use crate::channel::ChannelAddr;
53use crate::data::Serialized;
54use crate::mailbox::MailboxSenderError;
55use crate::mailbox::MailboxSenderErrorKind;
56use crate::mailbox::PortSink;
57use crate::message::Bind;
58use crate::message::Bindings;
59use crate::message::Unbind;
60use crate::parse::Lexer;
61use crate::parse::ParseError;
62use crate::parse::Token;
63use crate::parse::parse;
64
65#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash, Named)]
81pub enum Reference {
82 World(WorldId),
84 Proc(ProcId),
86 Actor(ActorId), Port(PortId),
90 Gang(GangId),
92}
93
94impl Reference {
95 pub fn is_prefix_of(&self, other: &Reference) -> bool {
97 match self {
98 Self::World(_) => self.world_id() == other.world_id(),
99 Self::Proc(_) => self.proc_id() == other.proc_id(),
100 Self::Actor(_) => self == other,
101 Self::Port(_) => self == other,
102 Self::Gang(_) => self == other,
103 }
104 }
105
106 pub fn world_id(&self) -> Option<&WorldId> {
108 match self {
109 Self::World(world_id) => Some(world_id),
110 Self::Proc(proc_id) => proc_id.world_id(),
111 Self::Actor(ActorId(proc_id, _, _)) => proc_id.world_id(),
112 Self::Port(PortId(ActorId(proc_id, _, _), _)) => proc_id.world_id(),
113 Self::Gang(GangId(world_id, _)) => Some(world_id),
114 }
115 }
116
117 pub fn proc_id(&self) -> Option<&ProcId> {
119 match self {
120 Self::World(_) => None,
121 Self::Proc(proc_id) => Some(proc_id),
122 Self::Actor(ActorId(proc_id, _, _)) => Some(proc_id),
123 Self::Port(PortId(ActorId(proc_id, _, _), _)) => Some(proc_id),
124 Self::Gang(_) => None,
125 }
126 }
127
128 fn rank(&self) -> Option<Index> {
130 self.proc_id().and_then(|proc_id| proc_id.rank())
131 }
132
133 pub fn actor_id(&self) -> Option<&ActorId> {
135 match self {
136 Self::World(_) => None,
137 Self::Proc(_) => None,
138 Self::Actor(actor_id) => Some(actor_id),
139 Self::Port(PortId(actor_id, _)) => Some(actor_id),
140 Self::Gang(_) => None,
141 }
142 }
143
144 fn actor_name(&self) -> Option<&str> {
146 match self {
147 Self::World(_) => None,
148 Self::Proc(_) => None,
149 Self::Actor(actor_id) => Some(actor_id.name()),
150 Self::Port(PortId(actor_id, _)) => Some(actor_id.name()),
151 Self::Gang(gang_id) => Some(&gang_id.1),
152 }
153 }
154
155 fn pid(&self) -> Option<Index> {
157 match self {
158 Self::World(_) => None,
159 Self::Proc(_) => None,
160 Self::Actor(actor_id) => Some(actor_id.pid()),
161 Self::Port(PortId(actor_id, _)) => Some(actor_id.pid()),
162 Self::Gang(_) => None,
163 }
164 }
165
166 fn port(&self) -> Option<u64> {
168 match self {
169 Self::World(_) => None,
170 Self::Proc(_) => None,
171 Self::Actor(_) => None,
172 Self::Port(port_id) => Some(port_id.index()),
173 Self::Gang(_) => None,
174 }
175 }
176}
177
178impl PartialOrd for Reference {
179 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
180 Some(self.cmp(other))
181 }
182}
183
184impl Ord for Reference {
185 fn cmp(&self, other: &Self) -> Ordering {
186 (
187 self.world_id(),
188 self.rank(),
189 self.actor_name(),
190 self.pid(),
191 self.port(),
192 )
193 .cmp(&(
194 other.world_id(),
195 other.rank(),
196 other.actor_name(),
197 other.pid(),
198 other.port(),
199 ))
200 }
201}
202
203impl fmt::Display for Reference {
204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205 match self {
206 Self::World(world_id) => fmt::Display::fmt(world_id, f),
207 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
208 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
209 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
210 Self::Gang(gang_id) => fmt::Display::fmt(gang_id, f),
211 }
212 }
213}
214
215#[macro_export]
252macro_rules! id {
253 ($world:ident) => {
254 $crate::reference::WorldId(stringify!($world).to_string())
255 };
256 ($world:ident [$rank:expr_2021]) => {
257 $crate::reference::ProcId::Ranked(
258 $crate::reference::WorldId(stringify!($world).to_string()),
259 $rank,
260 )
261 };
262 ($world:ident [$rank:expr_2021] . $actor:ident) => {
263 $crate::reference::ActorId(
264 $crate::reference::ProcId::Ranked(
265 $crate::reference::WorldId(stringify!($world).to_string()),
266 $rank,
267 ),
268 stringify!($actor).to_string(),
269 0,
270 )
271 };
272 ($world:ident [$rank:expr_2021] . $actor:ident [$pid:expr_2021]) => {
273 $crate::reference::ActorId(
274 $crate::reference::ProcId::Ranked(
275 $crate::reference::WorldId(stringify!($world).to_string()),
276 $rank,
277 ),
278 stringify!($actor).to_string(),
279 $pid,
280 )
281 };
282 ($world:ident . $actor:ident) => {
283 $crate::reference::GangId(
284 $crate::reference::WorldId(stringify!($world).to_string()),
285 stringify!($actor).to_string(),
286 )
287 };
288 ($world:ident [$rank:expr_2021] . $actor:ident [$pid:expr_2021] [$port:expr_2021]) => {
289 $crate::reference::PortId(
290 $crate::reference::ActorId(
291 $crate::reference::ProcId::Ranked(
292 $crate::reference::WorldId(stringify!($world).to_string()),
293 $rank,
294 ),
295 stringify!($actor).to_string(),
296 $pid,
297 ),
298 $port,
299 )
300 };
301}
302pub use id;
303
304#[derive(thiserror::Error, Debug)]
306pub enum ReferenceParsingError {
307 #[error("expected token")]
309 Empty,
310
311 #[error("unexpected token: {0}")]
313 Unexpected(String),
314
315 #[error(transparent)]
317 ParseInt(#[from] ParseIntError),
318
319 #[error("parse: {0}")]
321 Parse(#[from] ParseError),
322
323 #[error("wrong reference type: expected {0}")]
325 WrongType(String),
326
327 #[error("invalid channel address {0}: {1}")]
329 InvalidChannelAddress(String, anyhow::Error),
330}
331
332impl FromStr for Reference {
333 type Err = ReferenceParsingError;
334
335 fn from_str(addr: &str) -> Result<Self, Self::Err> {
336 match addr.split_once(",") {
343 Some((channel_addr, rest)) => {
344 let channel_addr = channel_addr.parse().map_err(|err| {
345 ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
346 })?;
347
348 Ok(parse! {
349 Lexer::new(rest);
350
351 Token::Elem(proc_name) =>
353 Self::Proc(ProcId::Direct(channel_addr, proc_name.to_string())),
354
355 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
357 Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), 0)),
358
359 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
361 Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
362 Self::Actor(ActorId(ProcId::Direct(channel_addr, proc_name.to_string()), actor_name.to_string(), rank)),
363 }?)
364 }
365
366 None => {
368 Ok(parse! {
369 Lexer::new(addr);
370
371 Token::Elem(world) => Self::World(WorldId(world.into())),
373
374 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket =>
376 Self::Proc(ProcId::Ranked(WorldId(world.into()), rank)),
377
378 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
380 Token::Dot Token::Elem(actor) =>
381 Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), 0)),
382
383 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
385 Token::Dot Token::Elem(actor)
386 Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
387 Self::Actor(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid)),
388
389 Token::Elem(world) Token::LeftBracket Token::Uint(rank) Token::RightBracket
391 Token::Dot Token::Elem(actor)
392 Token::LeftBracket Token::Uint(pid) Token::RightBracket
393 Token::LeftBracket Token::Uint(index) Token::RightBracket =>
394 Self::Port(PortId(ActorId(ProcId::Ranked(WorldId(world.into()), rank), actor.into(), pid), index as u64)),
395
396 Token::Elem(world) Token::Dot Token::Elem(actor) =>
398 Self::Gang(GangId(WorldId(world.into()), actor.into())),
399 }?)
400 }
401 }
402 }
403}
404
405impl From<WorldId> for Reference {
406 fn from(world_id: WorldId) -> Self {
407 Self::World(world_id)
408 }
409}
410
411impl From<ProcId> for Reference {
412 fn from(proc_id: ProcId) -> Self {
413 Self::Proc(proc_id)
414 }
415}
416
417impl From<ActorId> for Reference {
418 fn from(actor_id: ActorId) -> Self {
419 Self::Actor(actor_id)
420 }
421}
422
423impl From<PortId> for Reference {
424 fn from(port_id: PortId) -> Self {
425 Self::Port(port_id)
426 }
427}
428
429impl From<GangId> for Reference {
430 fn from(gang_id: GangId) -> Self {
431 Self::Gang(gang_id)
432 }
433}
434
435pub type Index = usize;
438
439#[derive(
442 Debug,
443 Serialize,
444 Deserialize,
445 Clone,
446 PartialEq,
447 Eq,
448 PartialOrd,
449 Hash,
450 Ord,
451 Named
452)]
453pub struct WorldId(pub String);
454
455impl WorldId {
456 pub fn proc_id(&self, index: Index) -> ProcId {
458 ProcId::Ranked(self.clone(), index)
459 }
460
461 pub fn name(&self) -> &str {
463 &self.0
464 }
465
466 pub fn random_user_proc(&self) -> ProcId {
468 let mask = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
469 ProcId::Ranked(self.clone(), rand::thread_rng().r#gen::<usize>() | mask)
470 }
471}
472
473impl fmt::Display for WorldId {
474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 let WorldId(name) = self;
476 write!(f, "{}", name)
477 }
478}
479
480impl FromStr for WorldId {
481 type Err = ReferenceParsingError;
482
483 fn from_str(addr: &str) -> Result<Self, Self::Err> {
484 match addr.parse()? {
485 Reference::World(world_id) => Ok(world_id),
486 _ => Err(ReferenceParsingError::WrongType("world".into())),
487 }
488 }
489}
490
491#[derive(
499 Debug,
500 Serialize,
501 Deserialize,
502 Clone,
503 PartialEq,
504 Eq,
505 PartialOrd,
506 Hash,
507 Ord,
508 Named,
509 EnumAsInner
510)]
511pub enum ProcId {
512 Ranked(WorldId, Index),
514 Direct(ChannelAddr, String),
516}
517
518impl ProcId {
519 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
521 ActorId(self.clone(), name.into(), pid)
522 }
523
524 pub fn world_id(&self) -> Option<&WorldId> {
526 match self {
527 ProcId::Ranked(world_id, _) => Some(world_id),
528 ProcId::Direct(_, _) => None,
529 }
530 }
531
532 pub fn world_name(&self) -> Option<&str> {
534 self.world_id().map(|world_id| world_id.name())
535 }
536
537 pub fn rank(&self) -> Option<Index> {
539 match self {
540 ProcId::Ranked(_, rank) => Some(*rank),
541 ProcId::Direct(_, _) => None,
542 }
543 }
544}
545
546impl fmt::Display for ProcId {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 match self {
549 ProcId::Ranked(world_id, rank) => write!(f, "{}[{}]", world_id, rank),
550 ProcId::Direct(addr, name) => write!(f, "{},{}", addr, name),
551 }
552 }
553}
554
555impl FromStr for ProcId {
556 type Err = ReferenceParsingError;
557
558 fn from_str(addr: &str) -> Result<Self, Self::Err> {
559 match addr.parse()? {
560 Reference::Proc(proc_id) => Ok(proc_id),
561 _ => Err(ReferenceParsingError::WrongType("proc".into())),
562 }
563 }
564}
565
566#[derive(
568 Debug,
569 Serialize,
570 Deserialize,
571 Clone,
572 PartialEq,
573 Eq,
574 PartialOrd,
575 Hash,
576 Ord,
577 Named
578)]
579pub struct ActorId(pub ProcId, pub String, pub Index);
580
581impl ActorId {
582 pub fn port_id(&self, port: u64) -> PortId {
584 PortId(self.clone(), port)
585 }
586
587 pub fn child_id(&self, pid: Index) -> Self {
589 Self(self.0.clone(), self.1.clone(), pid)
590 }
591
592 pub fn root(proc_id: ProcId, name: String) -> Self {
594 Self(proc_id, name, 0)
595 }
596
597 pub fn proc_id(&self) -> &ProcId {
599 &self.0
600 }
601
602 pub fn world_name(&self) -> &str {
604 self.0
605 .world_name()
606 .expect("world_name() called on direct proc")
607 }
608
609 pub fn rank(&self) -> Index {
611 self.0.rank().expect("rank() called on direct proc")
612 }
613
614 pub fn name(&self) -> &str {
616 &self.1
617 }
618
619 pub fn pid(&self) -> Index {
621 self.2
622 }
623}
624
625impl fmt::Display for ActorId {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 let ActorId(proc_id, name, pid) = self;
628 write!(f, "{}.{}[{}]", proc_id, name, pid)
629 }
630}
631impl<A: RemoteActor> From<ActorRef<A>> for ActorId {
632 fn from(actor_ref: ActorRef<A>) -> Self {
633 actor_ref.actor_id.clone()
634 }
635}
636
637impl<'a, A: RemoteActor> From<&'a ActorRef<A>> for &'a ActorId {
638 fn from(actor_ref: &'a ActorRef<A>) -> Self {
639 &actor_ref.actor_id
640 }
641}
642
643impl FromStr for ActorId {
644 type Err = ReferenceParsingError;
645
646 fn from_str(addr: &str) -> Result<Self, Self::Err> {
647 match addr.parse()? {
648 Reference::Actor(actor_id) => Ok(actor_id),
649 _ => Err(ReferenceParsingError::WrongType("actor".into())),
650 }
651 }
652}
653
654#[derive(Debug, Serialize, Deserialize, Named)]
656pub struct ActorRef<A: RemoteActor> {
657 pub(crate) actor_id: ActorId,
658 phantom: PhantomData<A>,
659}
660
661impl<A: RemoteActor> ActorRef<A> {
662 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
664 where
665 A: RemoteHandles<M>,
666 {
667 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
668 }
669
670 #[allow(clippy::result_large_err)] pub fn send<M: RemoteMessage>(
673 &self,
674 cap: &impl cap::CanSend,
675 message: M,
676 ) -> Result<(), MailboxSenderError>
677 where
678 A: RemoteHandles<M>,
679 {
680 self.port().send(cap, message)
681 }
682
683 #[allow(clippy::result_large_err)] pub fn send_with_headers<M: RemoteMessage>(
687 &self,
688 cap: &impl cap::CanSend,
689 headers: Attrs,
690 message: M,
691 ) -> Result<(), MailboxSenderError>
692 where
693 A: RemoteHandles<M>,
694 {
695 self.port().send_with_headers(cap, headers, message)
696 }
697
698 pub fn attest(actor_id: ActorId) -> Self {
703 Self {
704 actor_id,
705 phantom: PhantomData,
706 }
707 }
708
709 pub fn actor_id(&self) -> &ActorId {
711 &self.actor_id
712 }
713
714 pub fn into_actor_id(self) -> ActorId {
716 self.actor_id
717 }
718
719 pub fn downcast_handle(&self, cap: &impl cap::CanResolveActorRef) -> Option<ActorHandle<A>>
723 where
724 A: Actor,
725 {
726 cap.resolve_actor_ref(self)
727 }
728}
729
730impl<A: RemoteActor> fmt::Display for ActorRef<A> {
731 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732 fmt::Display::fmt(&self.actor_id, f)?;
733 write!(f, "<{}>", std::any::type_name::<A>())
734 }
735}
736
737impl<A: RemoteActor> Clone for ActorRef<A> {
739 fn clone(&self) -> Self {
740 Self {
741 actor_id: self.actor_id.clone(),
742 phantom: PhantomData,
743 }
744 }
745}
746
747impl<A: RemoteActor> PartialEq for ActorRef<A> {
748 fn eq(&self, other: &Self) -> bool {
749 self.actor_id == other.actor_id
750 }
751}
752
753impl<A: RemoteActor> Eq for ActorRef<A> {}
754
755impl<A: RemoteActor> PartialOrd for ActorRef<A> {
756 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
757 Some(self.cmp(other))
758 }
759}
760
761impl<A: RemoteActor> Ord for ActorRef<A> {
762 fn cmp(&self, other: &Self) -> Ordering {
763 self.actor_id.cmp(&other.actor_id)
764 }
765}
766
767impl<A: RemoteActor> Hash for ActorRef<A> {
768 fn hash<H: Hasher>(&self, state: &mut H) {
769 self.actor_id.hash(state);
770 }
771}
772
773#[derive(
778 Debug,
779 Serialize,
780 Deserialize,
781 Clone,
782 PartialEq,
783 Eq,
784 PartialOrd,
785 Hash,
786 Ord,
787 Named
788)]
789pub struct PortId(pub ActorId, pub u64);
790
791impl PortId {
792 pub fn actor_id(&self) -> &ActorId {
794 &self.0
795 }
796
797 pub fn into_actor_id(self) -> ActorId {
799 self.0
800 }
801
802 pub fn index(&self) -> u64 {
804 self.1
805 }
806
807 pub fn send(&self, caps: &impl cap::CanSend, serialized: &Serialized) {
811 caps.post(self.clone(), Attrs::new(), serialized.clone());
812 }
813
814 pub fn send_with_headers(
818 &self,
819 caps: &impl cap::CanSend,
820 serialized: &Serialized,
821 headers: Attrs,
822 ) {
823 caps.post(self.clone(), headers, serialized.clone());
824 }
825
826 pub fn split(
829 &self,
830 caps: &impl cap::CanSplitPort,
831 reducer_spec: Option<ReducerSpec>,
832 ) -> anyhow::Result<PortId> {
833 caps.split(self.clone(), reducer_spec)
834 }
835}
836
837impl FromStr for PortId {
838 type Err = ReferenceParsingError;
839
840 fn from_str(addr: &str) -> Result<Self, Self::Err> {
841 match addr.parse()? {
842 Reference::Port(port_id) => Ok(port_id),
843 _ => Err(ReferenceParsingError::WrongType("port".into())),
844 }
845 }
846}
847
848impl fmt::Display for PortId {
849 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850 let PortId(actor_id, port) = self;
851 write!(f, "{}[{}]", actor_id, port)
852 }
853}
854
855#[derive(Debug, Serialize, Deserialize, Derivative, Named)]
858#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
859pub struct PortRef<M: RemoteMessage> {
860 port_id: PortId,
861 #[derivative(
862 PartialEq = "ignore",
863 PartialOrd = "ignore",
864 Ord = "ignore",
865 Hash = "ignore"
866 )]
867 reducer_spec: Option<ReducerSpec>,
868 phantom: PhantomData<M>,
869}
870
871impl<M: RemoteMessage> PortRef<M> {
872 pub fn attest(port_id: PortId) -> Self {
875 Self {
876 port_id,
877 reducer_spec: None,
878 phantom: PhantomData,
879 }
880 }
881
882 pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
885 Self {
886 port_id,
887 reducer_spec,
888 phantom: PhantomData,
889 }
890 }
891
892 pub fn attest_message_port(actor: &ActorId) -> Self {
895 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
896 }
897
898 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
901 &self.reducer_spec
902 }
903
904 pub fn port_id(&self) -> &PortId {
906 &self.port_id
907 }
908
909 pub fn into_port_id(self) -> PortId {
911 self.port_id
912 }
913
914 pub fn into_once(self) -> OncePortRef<M> {
917 OncePortRef::attest(self.into_port_id())
918 }
919
920 #[allow(clippy::result_large_err)] pub fn send(&self, caps: &impl cap::CanSend, message: M) -> Result<(), MailboxSenderError> {
924 self.send_with_headers(caps, Attrs::new(), message)
925 }
926
927 #[allow(clippy::result_large_err)] pub fn send_with_headers(
932 &self,
933 caps: &impl cap::CanSend,
934 headers: Attrs,
935 message: M,
936 ) -> Result<(), MailboxSenderError> {
937 let serialized = Serialized::serialize(&message).map_err(|err| {
938 MailboxSenderError::new_bound(
939 self.port_id.clone(),
940 MailboxSenderErrorKind::Serialize(err.into()),
941 )
942 })?;
943 self.send_serialized(caps, serialized, headers);
944 Ok(())
945 }
946
947 pub fn send_serialized(&self, caps: &impl cap::CanSend, message: Serialized, headers: Attrs) {
950 caps.post(self.port_id.clone(), headers, message);
951 }
952
953 pub fn into_sink<C: cap::CanSend>(self, caps: C) -> PortSink<C, M> {
955 PortSink::new(caps, self)
956 }
957}
958
959impl<M: RemoteMessage> Clone for PortRef<M> {
960 fn clone(&self) -> Self {
961 Self {
962 port_id: self.port_id.clone(),
963 reducer_spec: self.reducer_spec.clone(),
964 phantom: PhantomData,
965 }
966 }
967}
968
969impl<M: RemoteMessage> fmt::Display for PortRef<M> {
970 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
971 fmt::Display::fmt(&self.port_id, f)
972 }
973}
974
975#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
977pub struct UnboundPort(pub PortId, pub Option<ReducerSpec>);
978
979impl UnboundPort {
980 pub fn update(&mut self, port_id: PortId) {
982 self.0 = port_id;
983 }
984}
985
986impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
987 fn from(port_ref: &PortRef<M>) -> Self {
988 UnboundPort(port_ref.port_id.clone(), port_ref.reducer_spec.clone())
989 }
990}
991
992impl<M: RemoteMessage> Unbind for PortRef<M> {
993 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
994 bindings.push_back(&UnboundPort::from(self))
995 }
996}
997
998impl<M: RemoteMessage> Bind for PortRef<M> {
999 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1000 let bound = bindings.try_pop_front::<UnboundPort>()?;
1001 self.port_id = bound.0;
1002 self.reducer_spec = bound.1;
1003 Ok(())
1004 }
1005}
1006
1007#[derive(Debug, Serialize, Deserialize, PartialEq)]
1011pub struct OncePortRef<M: RemoteMessage> {
1012 port_id: PortId,
1013 phantom: PhantomData<M>,
1014}
1015
1016impl<M: RemoteMessage> OncePortRef<M> {
1017 pub(crate) fn attest(port_id: PortId) -> Self {
1018 Self {
1019 port_id,
1020 phantom: PhantomData,
1021 }
1022 }
1023
1024 pub fn port_id(&self) -> &PortId {
1026 &self.port_id
1027 }
1028
1029 pub fn into_port_id(self) -> PortId {
1031 self.port_id
1032 }
1033
1034 #[allow(clippy::result_large_err)] pub fn send(self, caps: &impl cap::CanSend, message: M) -> Result<(), MailboxSenderError> {
1038 self.send_with_headers(caps, Attrs::new(), message)
1039 }
1040
1041 #[allow(clippy::result_large_err)] pub fn send_with_headers(
1045 self,
1046 caps: &impl cap::CanSend,
1047 headers: Attrs,
1048 message: M,
1049 ) -> Result<(), MailboxSenderError> {
1050 let serialized = Serialized::serialize(&message).map_err(|err| {
1051 MailboxSenderError::new_bound(
1052 self.port_id.clone(),
1053 MailboxSenderErrorKind::Serialize(err.into()),
1054 )
1055 })?;
1056 caps.post(self.port_id.clone(), headers, serialized);
1057 Ok(())
1058 }
1059}
1060
1061impl<M: RemoteMessage> Clone for OncePortRef<M> {
1062 fn clone(&self) -> Self {
1063 Self {
1064 port_id: self.port_id.clone(),
1065 phantom: PhantomData,
1066 }
1067 }
1068}
1069
1070impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1071 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1072 fmt::Display::fmt(&self.port_id, f)
1073 }
1074}
1075
1076impl<M: RemoteMessage> Named for OncePortRef<M> {
1077 fn typename() -> &'static str {
1078 crate::data::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
1079 }
1080}
1081
1082impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1086 fn unbind(&self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1087 Ok(())
1088 }
1089}
1090
1091impl<M: RemoteMessage> Bind for OncePortRef<M> {
1092 fn bind(&mut self, _bindings: &mut Bindings) -> anyhow::Result<()> {
1093 Ok(())
1094 }
1095}
1096
1097#[derive(
1099 Debug,
1100 Serialize,
1101 Deserialize,
1102 Clone,
1103 PartialEq,
1104 Eq,
1105 PartialOrd,
1106 Hash,
1107 Ord,
1108 Named
1109)]
1110pub struct GangId(pub WorldId, pub String);
1111
1112impl GangId {
1113 pub(crate) fn expand(&self, world_size: usize) -> impl Iterator<Item = ActorId> + '_ {
1114 (0..world_size).map(|rank| ActorId(ProcId::Ranked(self.0.clone(), rank), self.1.clone(), 0))
1115 }
1116
1117 pub fn world_id(&self) -> &WorldId {
1119 &self.0
1120 }
1121
1122 pub fn name(&self) -> &str {
1124 &self.1
1125 }
1126
1127 pub fn actor_id(&self, rank: Index) -> ActorId {
1130 ActorId(
1131 ProcId::Ranked(self.world_id().clone(), rank),
1132 self.name().to_string(),
1133 0,
1134 )
1135 }
1136}
1137
1138impl fmt::Display for GangId {
1139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1140 let GangId(world_id, name) = self;
1141 write!(f, "{}.{}", world_id, name)
1142 }
1143}
1144
1145impl FromStr for GangId {
1146 type Err = ReferenceParsingError;
1147
1148 fn from_str(addr: &str) -> Result<Self, Self::Err> {
1149 match addr.parse()? {
1150 Reference::Gang(gang_id) => Ok(gang_id),
1151 _ => Err(ReferenceParsingError::WrongType("gang".into())),
1152 }
1153 }
1154}
1155
1156fn chop<'a>(mut s: &'a str, delims: &'a [&'a str]) -> impl Iterator<Item = &'a str> + 'a {
1158 std::iter::from_fn(move || {
1159 if s.is_empty() {
1160 return None;
1161 }
1162
1163 match delims
1164 .iter()
1165 .enumerate()
1166 .flat_map(|(index, d)| s.find(d).map(|pos| (index, pos)))
1167 .min_by_key(|&(_, v)| v)
1168 {
1169 Some((index, 0)) => {
1170 let delim = delims[index];
1171 s = &s[delim.len()..];
1172 Some(delim)
1173 }
1174 Some((_, pos)) => {
1175 let token = &s[..pos];
1176 s = &s[pos..];
1177 Some(token.trim())
1178 }
1179 None => {
1180 let token = s;
1181 s = "";
1182 Some(token.trim())
1183 }
1184 }
1185 })
1186}
1187
1188#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash, Ord)]
1190pub struct GangRef<A: RemoteActor> {
1191 gang_id: GangId,
1192 phantom: PhantomData<A>,
1193}
1194
1195impl<A: RemoteActor> GangRef<A> {
1196 pub fn rank(&self, rank: Index) -> ActorRef<A> {
1200 let GangRef {
1201 gang_id: GangId(world_id, name),
1202 ..
1203 } = self;
1204 ActorRef::attest(ActorId(
1205 ProcId::Ranked(world_id.clone(), rank),
1206 name.clone(),
1207 0,
1208 ))
1209 }
1210
1211 pub fn gang_id(&self) -> &GangId {
1213 &self.gang_id
1214 }
1215}
1216
1217impl<A: RemoteActor> Clone for GangRef<A> {
1218 fn clone(&self) -> Self {
1219 Self {
1220 gang_id: self.gang_id.clone(),
1221 phantom: PhantomData,
1222 }
1223 }
1224}
1225
1226impl<A: RemoteActor> From<GangId> for GangRef<A> {
1228 fn from(gang_id: GangId) -> Self {
1229 Self {
1230 gang_id,
1231 phantom: PhantomData,
1232 }
1233 }
1234}
1235
1236impl<A: RemoteActor> From<GangRef<A>> for GangId {
1237 fn from(gang_ref: GangRef<A>) -> Self {
1238 gang_ref.gang_id
1239 }
1240}
1241
1242impl<'a, A: RemoteActor> From<&'a GangRef<A>> for &'a GangId {
1243 fn from(gang_ref: &'a GangRef<A>) -> Self {
1244 &gang_ref.gang_id
1245 }
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250 use rand::seq::SliceRandom;
1251 use rand::thread_rng;
1252
1253 use super::*;
1254
1255 #[test]
1256 fn test_reference_parse() {
1257 let cases: Vec<(&str, Reference)> = vec![
1258 ("test", WorldId("test".into()).into()),
1259 (
1260 "test[234]",
1261 ProcId::Ranked(WorldId("test".into()), 234).into(),
1262 ),
1263 (
1264 "test[234].testactor[6]",
1265 ActorId(
1266 ProcId::Ranked(WorldId("test".into()), 234),
1267 "testactor".into(),
1268 6,
1269 )
1270 .into(),
1271 ),
1272 (
1273 "test[234].testactor[6][1]",
1274 PortId(
1275 ActorId(
1276 ProcId::Ranked(WorldId("test".into()), 234),
1277 "testactor".into(),
1278 6,
1279 ),
1280 1,
1281 )
1282 .into(),
1283 ),
1284 (
1285 "test.testactor",
1286 GangId(WorldId("test".into()), "testactor".into()).into(),
1287 ),
1288 (
1289 "tcp:[::1]:1234,test,testactor[123]",
1290 ActorId(
1291 ProcId::Direct("tcp:[::1]:1234".parse().unwrap(), "test".to_string()),
1292 "testactor".to_string(),
1293 123,
1294 )
1295 .into(),
1296 ),
1297 ];
1298
1299 for (s, expected) in cases {
1300 let got: Reference = s.parse().unwrap();
1301 assert_eq!(got, expected);
1302 }
1303 }
1304
1305 #[test]
1306 fn test_reference_parse_error() {
1307 let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)"];
1308
1309 for s in cases {
1310 let result: Result<Reference, ReferenceParsingError> = s.parse();
1311 assert!(result.is_err());
1312 }
1313 }
1314
1315 #[test]
1316 fn test_id_macro() {
1317 assert_eq!(id!(hello), WorldId("hello".into()));
1318 assert_eq!(id!(hello[0]), ProcId::Ranked(WorldId("hello".into()), 0));
1319 assert_eq!(
1320 id!(hello[0].actor),
1321 ActorId(
1322 ProcId::Ranked(WorldId("hello".into()), 0),
1323 "actor".into(),
1324 0
1325 )
1326 );
1327 assert_eq!(
1328 id!(hello[0].actor[1]),
1329 ActorId(
1330 ProcId::Ranked(WorldId("hello".into()), 0),
1331 "actor".into(),
1332 1
1333 )
1334 );
1335 assert_eq!(
1336 id!(hello.actor),
1337 GangId(WorldId("hello".into()), "actor".into())
1338 );
1339 }
1340
1341 #[test]
1342 fn test_reference_ord() {
1343 let expected: Vec<Reference> = [
1344 "first",
1345 "second",
1346 "second.actor1",
1347 "second.actor2",
1348 "second[1]",
1349 "second[1].actor1",
1350 "second[1].actor2",
1351 "second[2]",
1352 "second[2].actor100",
1353 "third",
1354 "third.actor",
1355 "third[2]",
1356 "third[2].actor",
1357 "third[2].actor[1]",
1358 ]
1359 .into_iter()
1360 .map(|s| s.parse().unwrap())
1361 .collect();
1362
1363 let mut sorted = expected.to_vec();
1364 sorted.shuffle(&mut thread_rng());
1365 sorted.sort();
1366
1367 assert_eq!(sorted, expected);
1368 }
1369}