1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::Flattrs;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43use typeuri::ACTOR_PORT_BIT;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerMode;
52use crate::accum::ReducerSpec;
53use crate::accum::StreamingReducerOpts;
54use crate::actor::Referable;
55use crate::channel::ChannelAddr;
56use crate::context;
57use crate::context::MailboxExt;
58use crate::mailbox::MailboxSenderError;
59use crate::mailbox::MailboxSenderErrorKind;
60use crate::mailbox::PortSink;
61use crate::message::Bind;
62use crate::message::Bindings;
63use crate::message::Unbind;
64
65pub mod lex;
66pub mod name;
67mod parse;
68
69use parse::Lexer;
70use parse::ParseError;
71use parse::Token;
72pub use parse::is_valid_ident;
73use parse::parse;
74
75#[derive(strum::Display)]
77pub enum ReferenceKind {
78 Proc,
80 Actor,
82 Port,
84}
85
86#[derive(
100 Debug,
101 Serialize,
102 Deserialize,
103 Clone,
104 PartialEq,
105 Eq,
106 Hash,
107 typeuri::Named,
108 EnumAsInner
109)]
110pub enum Reference {
111 Proc(ProcId),
113 Actor(ActorId), Port(PortId),
117}
118
119impl Reference {
120 pub fn is_prefix_of(&self, other: &Reference) -> bool {
122 match self {
123 Self::Proc(_) => self.proc_id() == other.proc_id(),
124 Self::Actor(_) => self == other,
125 Self::Port(_) => self == other,
126 }
127 }
128
129 pub fn proc_id(&self) -> Option<&ProcId> {
131 match self {
132 Self::Proc(proc_id) => Some(proc_id),
133 Self::Actor(actor_id) => Some(actor_id.proc_id()),
134 Self::Port(port_id) => Some(port_id.actor_id().proc_id()),
135 }
136 }
137
138 pub fn actor_id(&self) -> Option<&ActorId> {
140 match self {
141 Self::Proc(_) => None,
142 Self::Actor(actor_id) => Some(actor_id),
143 Self::Port(port_id) => Some(port_id.actor_id()),
144 }
145 }
146
147 fn actor_name(&self) -> Option<&str> {
149 match self {
150 Self::Proc(_) => None,
151 Self::Actor(actor_id) => Some(actor_id.name()),
152 Self::Port(port_id) => Some(port_id.actor_id().name()),
153 }
154 }
155
156 fn pid(&self) -> Option<Index> {
158 match self {
159 Self::Proc(_) => None,
160 Self::Actor(actor_id) => Some(actor_id.pid()),
161 Self::Port(port_id) => Some(port_id.actor_id().pid()),
162 }
163 }
164
165 fn port(&self) -> Option<u64> {
167 match self {
168 Self::Proc(_) => None,
169 Self::Actor(_) => None,
170 Self::Port(port_id) => Some(port_id.index()),
171 }
172 }
173
174 pub fn kind(&self) -> ReferenceKind {
176 match self {
177 Self::Proc(_) => ReferenceKind::Proc,
178 Self::Actor(_) => ReferenceKind::Actor,
179 Self::Port(_) => ReferenceKind::Port,
180 }
181 }
182}
183
184impl PartialOrd for Reference {
185 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
186 Some(self.cmp(other))
187 }
188}
189
190impl Ord for Reference {
191 fn cmp(&self, other: &Self) -> Ordering {
192 (self.proc_id(), self.actor_name(), self.pid(), self.port()).cmp(&(
194 other.proc_id(),
195 other.actor_name(),
196 other.pid(),
197 other.port(),
198 ))
199 }
200}
201
202impl fmt::Display for Reference {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 match self {
205 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
206 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
207 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
208 }
209 }
210}
211
212#[derive(thiserror::Error, Debug)]
214pub enum ReferenceParsingError {
215 #[error("expected token")]
217 Empty,
218
219 #[error("unexpected token: {0}")]
221 Unexpected(String),
222
223 #[error(transparent)]
225 ParseInt(#[from] ParseIntError),
226
227 #[error("parse: {0}")]
229 Parse(#[from] ParseError),
230
231 #[error("wrong reference type: expected {0}")]
233 WrongType(String),
234
235 #[error("invalid channel address {0}: {1}")]
237 InvalidChannelAddress(String, anyhow::Error),
238}
239
240impl FromStr for Reference {
241 type Err = ReferenceParsingError;
242
243 fn from_str(addr: &str) -> Result<Self, Self::Err> {
244 match addr.split_once(",") {
248 Some((channel_addr, rest)) => {
249 let channel_addr = channel_addr.parse().map_err(|err| {
250 ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
251 })?;
252
253 Ok(parse! {
254 Lexer::new(rest);
255
256 Token::Elem(proc_name) =>
258 Self::Proc(ProcId::with_name(channel_addr, proc_name)),
259
260 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
262 Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, 0)),
263
264 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
266 Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
267 Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid)),
268
269 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
271 Token::LeftBracket Token::Uint(pid) Token::RightBracket
272 Token::LeftBracket Token::Uint(index) Token::RightBracket =>
273 Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
274
275 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
277 Token::LeftBracket Token::Uint(pid) Token::RightBracket
278 Token::LeftBracket Token::Uint(index)
279 Token::LessThan Token::Elem(_type) Token::GreaterThan
280 Token::RightBracket =>
281 Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
282 }?)
283 }
284
285 None => Err(ReferenceParsingError::Unexpected(format!(
286 "expected a comma-separated reference, got: {}",
287 addr
288 ))),
289 }
290 }
291}
292
293impl From<ProcId> for Reference {
294 fn from(proc_id: ProcId) -> Self {
295 Self::Proc(proc_id)
296 }
297}
298
299impl From<ActorId> for Reference {
300 fn from(actor_id: ActorId) -> Self {
301 Self::Actor(actor_id)
302 }
303}
304
305impl From<PortId> for Reference {
306 fn from(port_id: PortId) -> Self {
307 Self::Port(port_id)
308 }
309}
310
311pub type Index = usize;
314
315#[derive(
319 Debug,
320 Serialize,
321 Deserialize,
322 Clone,
323 PartialEq,
324 Eq,
325 PartialOrd,
326 Hash,
327 Ord,
328 typeuri::Named
329)]
330pub struct ProcId(ChannelAddr, String);
331
332fn addr_hash_suffix(addr: &ChannelAddr) -> String {
334 use std::collections::hash_map::DefaultHasher;
335 let mut hasher = DefaultHasher::new();
336 addr.hash(&mut hasher);
337 format!("{:08x}", hasher.finish() as u32)
338}
339
340impl ProcId {
341 pub fn unique(addr: ChannelAddr, base_name: impl Into<String>) -> Self {
346 let base = base_name.into();
347 let suffix = addr_hash_suffix(&addr);
348 Self(addr, format!("{}-{}", base, suffix))
349 }
350
351 pub fn with_name(addr: ChannelAddr, name: impl Into<String>) -> Self {
356 Self(addr, name.into())
357 }
358
359 pub fn base_name(&self) -> &str {
364 match self.1.rsplit_once('-') {
365 Some((base, suffix))
366 if suffix.len() == 8 && suffix.chars().all(|c| c.is_ascii_hexdigit()) =>
367 {
368 base
369 }
370 _ => &self.1,
371 }
372 }
373
374 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
376 ActorId(self.clone(), name.into(), pid)
377 }
378
379 pub fn addr(&self) -> &ChannelAddr {
381 &self.0
382 }
383
384 pub fn name(&self) -> &str {
386 &self.1
387 }
388}
389
390impl fmt::Display for ProcId {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 write!(f, "{},{}", self.0, self.1)
393 }
394}
395
396impl FromStr for ProcId {
397 type Err = ReferenceParsingError;
398
399 fn from_str(addr: &str) -> Result<Self, Self::Err> {
400 match addr.parse()? {
401 Reference::Proc(proc_id) => Ok(proc_id),
402 _ => Err(ReferenceParsingError::WrongType("proc".into())),
403 }
404 }
405}
406
407#[derive(
409 Debug,
410 Serialize,
411 Deserialize,
412 Clone,
413 PartialEq,
414 Eq,
415 PartialOrd,
416 Hash,
417 Ord,
418 typeuri::Named
419)]
420pub struct ActorId(ProcId, String, Index);
421
422hyperactor_config::impl_attrvalue!(ActorId);
423
424impl ActorId {
425 pub fn new(proc_id: ProcId, name: impl Into<String>, pid: Index) -> Self {
427 Self(proc_id, name.into(), pid)
428 }
429
430 pub fn port_id(&self, port: u64) -> PortId {
432 PortId(self.clone(), port)
433 }
434
435 pub fn child_id(&self, pid: Index) -> Self {
437 Self(self.0.clone(), self.1.clone(), pid)
438 }
439
440 pub fn root(proc_id: ProcId, name: String) -> Self {
442 Self(proc_id, name, 0)
443 }
444
445 pub fn proc_id(&self) -> &ProcId {
447 &self.0
448 }
449
450 pub fn name(&self) -> &str {
452 &self.1
453 }
454
455 pub fn pid(&self) -> Index {
457 self.2
458 }
459}
460
461impl fmt::Display for ActorId {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 write!(f, "{},{}[{}]", self.0, self.1, self.2)
464 }
465}
466impl<A: Referable> From<ActorRef<A>> for ActorId {
467 fn from(actor_ref: ActorRef<A>) -> Self {
468 actor_ref.actor_id.clone()
469 }
470}
471
472impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
473 fn from(actor_ref: &'a ActorRef<A>) -> Self {
474 &actor_ref.actor_id
475 }
476}
477
478impl FromStr for ActorId {
479 type Err = ReferenceParsingError;
480
481 fn from_str(addr: &str) -> Result<Self, Self::Err> {
482 match addr.parse()? {
483 Reference::Actor(actor_id) => Ok(actor_id),
484 _ => Err(ReferenceParsingError::WrongType("actor".into())),
485 }
486 }
487}
488
489#[derive(typeuri::Named)]
491pub struct ActorRef<A: Referable> {
492 pub(crate) actor_id: ActorId,
493 phantom: PhantomData<fn() -> A>,
495}
496
497impl<A: Referable> ActorRef<A> {
498 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
500 where
501 A: RemoteHandles<M>,
502 {
503 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
504 }
505
506 pub fn send<M: RemoteMessage>(
508 &self,
509 cx: &impl context::Actor,
510 message: M,
511 ) -> Result<(), MailboxSenderError>
512 where
513 A: RemoteHandles<M>,
514 {
515 self.port().send(cx, message)
516 }
517
518 pub fn send_with_headers<M: RemoteMessage>(
521 &self,
522 cx: &impl context::Actor,
523 headers: Flattrs,
524 message: M,
525 ) -> Result<(), MailboxSenderError>
526 where
527 A: RemoteHandles<M>,
528 {
529 self.port().send_with_headers(cx, headers, message)
530 }
531
532 pub fn attest(actor_id: ActorId) -> Self {
537 Self {
538 actor_id,
539 phantom: PhantomData,
540 }
541 }
542
543 pub fn actor_id(&self) -> &ActorId {
545 &self.actor_id
546 }
547
548 pub fn into_actor_id(self) -> ActorId {
550 self.actor_id
551 }
552
553 pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
557 where
558 A: Actor,
559 {
560 cx.instance().proc().resolve_actor_ref(self)
561 }
562}
563
564impl<A: Referable> Serialize for ActorRef<A> {
566 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
567 where
568 S: Serializer,
569 {
570 self.actor_id().serialize(serializer)
572 }
573}
574
575impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
577 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
578 where
579 D: Deserializer<'de>,
580 {
581 let actor_id = <ActorId>::deserialize(deserializer)?;
582 Ok(ActorRef {
583 actor_id,
584 phantom: PhantomData,
585 })
586 }
587}
588
589impl<A: Referable> fmt::Debug for ActorRef<A> {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 f.debug_struct("ActorRef")
593 .field("actor_id", &self.actor_id)
594 .field("type", &std::any::type_name::<A>())
595 .finish()
596 }
597}
598
599impl<A: Referable> fmt::Display for ActorRef<A> {
600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601 fmt::Display::fmt(&self.actor_id, f)?;
602 write!(f, "<{}>", std::any::type_name::<A>())
603 }
604}
605
606impl<A: Referable> Clone for ActorRef<A> {
608 fn clone(&self) -> Self {
609 Self {
610 actor_id: self.actor_id.clone(),
611 phantom: PhantomData,
612 }
613 }
614}
615
616impl<A: Referable> PartialEq for ActorRef<A> {
617 fn eq(&self, other: &Self) -> bool {
618 self.actor_id == other.actor_id
619 }
620}
621
622impl<A: Referable> Eq for ActorRef<A> {}
623
624impl<A: Referable> PartialOrd for ActorRef<A> {
625 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
626 Some(self.cmp(other))
627 }
628}
629
630impl<A: Referable> Ord for ActorRef<A> {
631 fn cmp(&self, other: &Self) -> Ordering {
632 self.actor_id.cmp(&other.actor_id)
633 }
634}
635
636impl<A: Referable> Hash for ActorRef<A> {
637 fn hash<H: Hasher>(&self, state: &mut H) {
638 self.actor_id.hash(state);
639 }
640}
641
642#[derive(
647 Debug,
648 Serialize,
649 Deserialize,
650 Clone,
651 PartialEq,
652 Eq,
653 PartialOrd,
654 Hash,
655 Ord,
656 typeuri::Named
657)]
658pub struct PortId(ActorId, u64);
659
660impl PortId {
661 pub fn new(actor_id: ActorId, port: u64) -> Self {
663 Self(actor_id, port)
664 }
665
666 pub fn actor_id(&self) -> &ActorId {
668 &self.0
669 }
670
671 pub fn into_actor_id(self) -> ActorId {
673 self.0
674 }
675
676 pub fn index(&self) -> u64 {
678 self.1
679 }
680
681 pub(crate) fn is_actor_port(&self) -> bool {
682 self.1 & ACTOR_PORT_BIT != 0
683 }
684
685 pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
689 let mut headers = Flattrs::new();
690 crate::mailbox::headers::set_send_timestamp(&mut headers);
691 cx.post(
692 self.clone(),
693 headers,
694 serialized,
695 true,
696 context::SeqInfoPolicy::AssignNew,
697 );
698 }
699
700 pub fn send_with_headers(
704 &self,
705 cx: &impl context::Actor,
706 serialized: wirevalue::Any,
707 mut headers: Flattrs,
708 ) {
709 crate::mailbox::headers::set_send_timestamp(&mut headers);
710 cx.post(
711 self.clone(),
712 headers,
713 serialized,
714 true,
715 context::SeqInfoPolicy::AssignNew,
716 );
717 }
718
719 pub fn split(
722 &self,
723 cx: &impl context::Actor,
724 reducer_spec: Option<ReducerSpec>,
725 reducer_mode: ReducerMode,
726 return_undeliverable: bool,
727 ) -> anyhow::Result<PortId> {
728 cx.split(
729 self.clone(),
730 reducer_spec,
731 reducer_mode,
732 return_undeliverable,
733 )
734 }
735}
736
737impl FromStr for PortId {
738 type Err = ReferenceParsingError;
739
740 fn from_str(addr: &str) -> Result<Self, Self::Err> {
741 match addr.parse()? {
742 Reference::Port(port_id) => Ok(port_id),
743 _ => Err(ReferenceParsingError::WrongType("port".into())),
744 }
745 }
746}
747
748impl fmt::Display for PortId {
749 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750 let PortId(actor_id, port) = self;
751 if self.is_actor_port() {
752 let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !ACTOR_PORT_BIT));
753 let typename = type_info.map_or("unknown", TypeInfo::typename);
754 write!(f, "{}[{}<{}>]", actor_id, port, typename)
755 } else {
756 write!(f, "{}[{}]", actor_id, port)
757 }
758 }
759}
760
761#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
764#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
765pub struct PortRef<M> {
766 port_id: PortId,
767 #[derivative(
768 PartialEq = "ignore",
769 PartialOrd = "ignore",
770 Ord = "ignore",
771 Hash = "ignore"
772 )]
773 reducer_spec: Option<ReducerSpec>,
774 #[derivative(
775 PartialEq = "ignore",
776 PartialOrd = "ignore",
777 Ord = "ignore",
778 Hash = "ignore"
779 )]
780 streaming_opts: StreamingReducerOpts,
781 phantom: PhantomData<M>,
782 return_undeliverable: bool,
783 #[derivative(
784 PartialEq = "ignore",
785 PartialOrd = "ignore",
786 Ord = "ignore",
787 Hash = "ignore"
788 )]
789 unsplit: bool,
790}
791
792impl<M: RemoteMessage> PortRef<M> {
793 pub fn attest(port_id: PortId) -> Self {
796 Self {
797 port_id,
798 reducer_spec: None,
799 streaming_opts: StreamingReducerOpts::default(),
800 phantom: PhantomData,
801 return_undeliverable: true,
802 unsplit: false,
803 }
804 }
805
806 pub fn attest_reducible(
809 port_id: PortId,
810 reducer_spec: Option<ReducerSpec>,
811 streaming_opts: StreamingReducerOpts,
812 ) -> Self {
813 Self {
814 port_id,
815 reducer_spec,
816 streaming_opts,
817 phantom: PhantomData,
818 return_undeliverable: true,
819 unsplit: false,
820 }
821 }
822
823 pub fn unsplit(mut self) -> Self {
825 self.unsplit = true;
826 self
827 }
828
829 pub fn attest_message_port(actor: &ActorId) -> Self {
832 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
833 }
834
835 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
838 &self.reducer_spec
839 }
840
841 pub fn port_id(&self) -> &PortId {
843 &self.port_id
844 }
845
846 pub fn into_port_id(self) -> PortId {
848 self.port_id
849 }
850
851 pub fn into_once(self) -> OncePortRef<M> {
854 let return_undeliverable = self.return_undeliverable;
855 let unsplit = self.unsplit;
856 let mut once = OncePortRef::attest(self.into_port_id());
857 once.return_undeliverable = return_undeliverable;
858 once.unsplit = unsplit;
859 once
860 }
861
862 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
865 self.send_with_headers(cx, Flattrs::new(), message)
866 }
867
868 pub fn send_with_headers(
872 &self,
873 cx: &impl context::Actor,
874 headers: Flattrs,
875 message: M,
876 ) -> Result<(), MailboxSenderError> {
877 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
878 MailboxSenderError::new_bound(
879 self.port_id.clone(),
880 MailboxSenderErrorKind::Serialize(err.into()),
881 )
882 })?;
883 self.send_serialized(cx, headers, serialized);
884 Ok(())
885 }
886
887 pub fn send_serialized(
890 &self,
891 cx: &impl context::Actor,
892 mut headers: Flattrs,
893 message: wirevalue::Any,
894 ) {
895 crate::mailbox::headers::set_send_timestamp(&mut headers);
896 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
897 cx.post(
898 self.port_id.clone(),
899 headers,
900 message,
901 self.return_undeliverable,
902 context::SeqInfoPolicy::AssignNew,
903 );
904 }
905
906 pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
908 PortSink::new(cx, self)
909 }
910
911 pub fn get_return_undeliverable(&self) -> bool {
914 self.return_undeliverable
915 }
916
917 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
920 self.return_undeliverable = return_undeliverable;
921 }
922}
923
924impl<M: RemoteMessage> Clone for PortRef<M> {
925 fn clone(&self) -> Self {
926 Self {
927 port_id: self.port_id.clone(),
928 reducer_spec: self.reducer_spec.clone(),
929 streaming_opts: self.streaming_opts.clone(),
930 phantom: PhantomData,
931 return_undeliverable: self.return_undeliverable,
932 unsplit: self.unsplit,
933 }
934 }
935}
936
937impl<M: RemoteMessage> fmt::Display for PortRef<M> {
938 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939 fmt::Display::fmt(&self.port_id, f)
940 }
941}
942
943#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
945pub enum UnboundPortKind {
946 Streaming(Option<StreamingReducerOpts>),
948 Once,
950}
951
952#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
954pub struct UnboundPort(
955 pub PortId,
956 pub Option<ReducerSpec>,
957 pub bool, pub UnboundPortKind,
959 pub bool, );
961wirevalue::register_type!(UnboundPort);
962
963impl UnboundPort {
964 pub fn update(&mut self, port_id: PortId) {
966 self.0 = port_id;
967 }
968}
969
970impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
971 fn from(port_ref: &PortRef<M>) -> Self {
972 UnboundPort(
973 port_ref.port_id.clone(),
974 port_ref.reducer_spec.clone(),
975 port_ref.return_undeliverable,
976 UnboundPortKind::Streaming(Some(port_ref.streaming_opts.clone())),
977 port_ref.unsplit,
978 )
979 }
980}
981
982impl<M: RemoteMessage> Unbind for PortRef<M> {
983 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
984 bindings.push_back(&UnboundPort::from(self))
985 }
986}
987
988impl<M: RemoteMessage> Bind for PortRef<M> {
989 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
990 let UnboundPort(port_id, reducer_spec, return_undeliverable, port_kind, unsplit) =
991 bindings.try_pop_front::<UnboundPort>()?;
992 self.port_id = port_id;
993 self.reducer_spec = reducer_spec;
994 self.return_undeliverable = return_undeliverable;
995 self.unsplit = unsplit;
996 self.streaming_opts = match port_kind {
997 UnboundPortKind::Streaming(opts) => opts.unwrap_or_default(),
998 UnboundPortKind::Once => {
999 anyhow::bail!("OncePortRef cannot be bound to PortRef")
1000 }
1001 };
1002 Ok(())
1003 }
1004}
1005
1006#[derive(Debug, Serialize, Deserialize, PartialEq)]
1010pub struct OncePortRef<M> {
1011 port_id: PortId,
1012 reducer_spec: Option<ReducerSpec>,
1013 return_undeliverable: bool,
1014 unsplit: bool,
1015 phantom: PhantomData<M>,
1016}
1017
1018impl<M: RemoteMessage> OncePortRef<M> {
1019 pub(crate) fn attest(port_id: PortId) -> Self {
1020 Self {
1021 port_id,
1022 reducer_spec: None,
1023 return_undeliverable: true,
1024 unsplit: false,
1025 phantom: PhantomData,
1026 }
1027 }
1028
1029 pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
1032 Self {
1033 port_id,
1034 reducer_spec,
1035 return_undeliverable: true,
1036 unsplit: false,
1037 phantom: PhantomData,
1038 }
1039 }
1040
1041 pub fn unsplit(mut self) -> Self {
1043 self.unsplit = true;
1044 self
1045 }
1046
1047 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1049 &self.reducer_spec
1050 }
1051
1052 pub fn port_id(&self) -> &PortId {
1054 &self.port_id
1055 }
1056
1057 pub fn into_port_id(self) -> PortId {
1059 self.port_id
1060 }
1061
1062 pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1065 self.send_with_headers(cx, Flattrs::new(), message)
1066 }
1067
1068 pub fn send_with_headers(
1071 self,
1072 cx: &impl context::Actor,
1073 mut headers: Flattrs,
1074 message: M,
1075 ) -> Result<(), MailboxSenderError> {
1076 crate::mailbox::headers::set_send_timestamp(&mut headers);
1077 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1078 MailboxSenderError::new_bound(
1079 self.port_id.clone(),
1080 MailboxSenderErrorKind::Serialize(err.into()),
1081 )
1082 })?;
1083 cx.post(
1084 self.port_id.clone(),
1085 headers,
1086 serialized,
1087 self.return_undeliverable,
1088 context::SeqInfoPolicy::AssignNew,
1089 );
1090 Ok(())
1091 }
1092
1093 pub fn get_return_undeliverable(&self) -> bool {
1096 self.return_undeliverable
1097 }
1098
1099 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1102 self.return_undeliverable = return_undeliverable;
1103 }
1104}
1105
1106impl<M: RemoteMessage> Clone for OncePortRef<M> {
1107 fn clone(&self) -> Self {
1108 Self {
1109 port_id: self.port_id.clone(),
1110 reducer_spec: self.reducer_spec.clone(),
1111 return_undeliverable: self.return_undeliverable,
1112 unsplit: self.unsplit,
1113 phantom: PhantomData,
1114 }
1115 }
1116}
1117
1118impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1120 fmt::Display::fmt(&self.port_id, f)
1121 }
1122}
1123
1124impl<M: RemoteMessage> Named for OncePortRef<M> {
1125 fn typename() -> &'static str {
1126 wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
1127 }
1128}
1129
1130impl<M: RemoteMessage> From<&OncePortRef<M>> for UnboundPort {
1131 fn from(port_ref: &OncePortRef<M>) -> Self {
1132 UnboundPort(
1133 port_ref.port_id.clone(),
1134 port_ref.reducer_spec.clone(),
1135 true, UnboundPortKind::Once,
1137 port_ref.unsplit,
1138 )
1139 }
1140}
1141
1142impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1143 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1144 bindings.push_back(&UnboundPort::from(self))
1145 }
1146}
1147
1148impl<M: RemoteMessage> Bind for OncePortRef<M> {
1149 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1150 let UnboundPort(port_id, reducer_spec, _return_undeliverable, port_kind, unsplit) =
1151 bindings.try_pop_front::<UnboundPort>()?;
1152 match port_kind {
1153 UnboundPortKind::Once => {
1154 self.port_id = port_id;
1155 self.reducer_spec = reducer_spec;
1156 self.unsplit = unsplit;
1157 Ok(())
1158 }
1159 UnboundPortKind::Streaming(_) => {
1160 anyhow::bail!("PortRef cannot be bound to OncePortRef")
1161 }
1162 }
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use rand::seq::SliceRandom;
1169 use tokio::sync::mpsc;
1170 use uuid::Uuid;
1171
1172 use super::*;
1173 use crate::Proc;
1175 use crate::context::Mailbox as _;
1176 use crate::mailbox::PortLocation;
1177 use crate::ordering::SEQ_INFO;
1178 use crate::ordering::SeqInfo;
1179
1180 #[test]
1181 fn test_reference_parse() {
1182 let cases: Vec<(&str, Reference)> = vec![
1183 (
1184 "tcp:[::1]:1234,test",
1185 ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test").into(),
1186 ),
1187 (
1188 "tcp:[::1]:1234,test,testactor[123]",
1189 ActorId::new(
1190 ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1191 "testactor",
1192 123,
1193 )
1194 .into(),
1195 ),
1196 (
1197 "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
1199 PortId::new(
1200 ActorId::new(
1201 ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1202 "testactor",
1203 0,
1204 ),
1205 123,
1206 )
1207 .into(),
1208 ),
1209 ];
1210
1211 for (s, expected) in cases {
1212 assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
1213 }
1214 }
1215
1216 #[test]
1217 fn test_reference_parse_error() {
1218 let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)", "test"];
1219
1220 for s in cases {
1221 let result: Result<Reference, ReferenceParsingError> = s.parse();
1222 assert!(result.is_err(), "expected error for: {}", s);
1223 }
1224 }
1225
1226 #[test]
1227 fn test_reference_ord() {
1228 let expected: Vec<Reference> = [
1229 "tcp:[::1]:1234,first",
1230 "tcp:[::1]:1234,second",
1231 "tcp:[::1]:1234,third",
1232 ]
1233 .into_iter()
1234 .map(|s| s.parse().unwrap())
1235 .collect();
1236
1237 let mut sorted = expected.to_vec();
1238 sorted.shuffle(&mut rand::rng());
1239 sorted.sort();
1240
1241 assert_eq!(sorted, expected);
1242 }
1243
1244 #[test]
1245 fn test_port_type_annotation() {
1246 #[derive(typeuri::Named, Serialize, Deserialize)]
1247 struct MyType;
1248 wirevalue::register_type!(MyType);
1249 let port_id = PortId::new(
1250 ActorId::new(
1251 ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
1252 "testactor",
1253 1,
1254 ),
1255 MyType::port(),
1256 );
1257 assert_eq!(
1258 port_id.to_string(),
1259 "tcp:[::1]:1234,test,testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
1260 );
1261 }
1262
1263 #[tokio::test]
1264 async fn test_sequencing_from_port_handle_ref_and_id() {
1265 let proc = Proc::local();
1266 let (client, _) = proc.instance("client").unwrap();
1267 let (tx, mut rx) = mpsc::unbounded_channel();
1268 let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1269 let seq_info = headers.get(SEQ_INFO);
1270 tx.send(seq_info).unwrap();
1271 Ok(())
1272 });
1273 port_handle.send(&client, ()).unwrap();
1274 assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);
1277
1278 port_handle.bind_actor_port();
1279 let port_id = match port_handle.location() {
1280 PortLocation::Bound(port_id) => port_id,
1281 _ => panic!("port_handle should be bound"),
1282 };
1283 assert!(port_id.is_actor_port());
1284 let port_ref = PortRef::attest(port_id.clone());
1285
1286 port_handle.send(&client, ()).unwrap();
1287 let SeqInfo::Session {
1288 session_id,
1289 mut seq,
1290 } = rx.try_recv().unwrap().unwrap()
1291 else {
1292 panic!("expected session info");
1293 };
1294 assert_eq!(session_id, client.sequencer().session_id());
1295 assert_eq!(seq, 1);
1296
1297 fn assert_seq_info(
1298 rx: &mut mpsc::UnboundedReceiver<Option<SeqInfo>>,
1299 session_id: Uuid,
1300 seq: &mut u64,
1301 ) {
1302 *seq += 1;
1303 let SeqInfo::Session {
1304 session_id: rcved_session_id,
1305 seq: rcved_seq,
1306 } = rx.try_recv().unwrap().unwrap()
1307 else {
1308 panic!("expected session info");
1309 };
1310 assert_eq!(rcved_session_id, session_id);
1311 assert_eq!(rcved_seq, *seq);
1312 }
1313
1314 for _ in 0..10 {
1316 port_handle.send(&client, ()).unwrap();
1318 assert_seq_info(&mut rx, session_id, &mut seq);
1319
1320 for _ in 0..2 {
1322 port_ref.send(&client, ()).unwrap();
1323 assert_seq_info(&mut rx, session_id, &mut seq);
1324 }
1325
1326 for _ in 0..3 {
1328 port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
1329 assert_seq_info(&mut rx, session_id, &mut seq);
1330 }
1331 }
1332
1333 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1334 }
1335
1336 #[tokio::test]
1337 async fn test_sequencing_from_port_handle_bound_to_allocated_port() {
1338 let proc = Proc::local();
1339 let (client, _) = proc.instance("client").unwrap();
1340 let (tx, mut rx) = mpsc::unbounded_channel();
1341 let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1342 let seq_info = headers.get(SEQ_INFO);
1343 tx.send(seq_info).unwrap();
1344 Ok(())
1345 });
1346 port_handle.send(&client, ()).unwrap();
1347 assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);
1350
1351 port_handle.bind();
1353 let port_id = match port_handle.location() {
1354 PortLocation::Bound(port_id) => port_id,
1355 _ => panic!("port_handle should be bound"),
1356 };
1357 assert!(!port_id.is_actor_port());
1359
1360 port_handle.send(&client, ()).unwrap();
1362 let SeqInfo::Session {
1363 session_id,
1364 seq: seq1,
1365 } = rx
1366 .try_recv()
1367 .unwrap()
1368 .expect("non-actor port should have seq info")
1369 else {
1370 panic!("expected Session variant");
1371 };
1372 assert_eq!(seq1, 1);
1373 assert_eq!(session_id, client.sequencer().session_id());
1374
1375 let port_ref = PortRef::attest(port_id.clone());
1376 port_ref.send(&client, ()).unwrap();
1377 let SeqInfo::Session { seq: seq2, .. } = rx
1378 .try_recv()
1379 .unwrap()
1380 .expect("non-actor port should have seq info")
1381 else {
1382 panic!("expected Session variant");
1383 };
1384 assert_eq!(seq2, 2);
1385
1386 port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
1387 let SeqInfo::Session { seq: seq3, .. } = rx
1388 .try_recv()
1389 .unwrap()
1390 .expect("non-actor port should have seq info")
1391 else {
1392 panic!("expected Session variant");
1393 };
1394 assert_eq!(seq3, 3);
1395 }
1396}