1#![allow(dead_code)] use std::cmp::Ord;
26use std::cmp::Ordering;
27use std::cmp::PartialOrd;
28use std::convert::From;
29use std::fmt;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::marker::PhantomData;
33use std::num::ParseIntError;
34use std::str::FromStr;
35
36use derivative::Derivative;
37use enum_as_inner::EnumAsInner;
38use hyperactor_config::Flattrs;
39use serde::Deserialize;
40use serde::Deserializer;
41use serde::Serialize;
42use serde::Serializer;
43use typeuri::ACTOR_PORT_BIT;
44use typeuri::Named;
45use wirevalue::TypeInfo;
46
47use crate::Actor;
48use crate::ActorHandle;
49use crate::RemoteHandles;
50use crate::RemoteMessage;
51use crate::accum::ReducerMode;
52use crate::accum::ReducerSpec;
53use crate::accum::StreamingReducerOpts;
54use crate::actor::Referable;
55use crate::channel::ChannelAddr;
56use crate::context;
57use crate::context::MailboxExt;
58use crate::mailbox::MailboxSenderError;
59use crate::mailbox::MailboxSenderErrorKind;
60use crate::mailbox::PortSink;
61use crate::message::Bind;
62use crate::message::Bindings;
63use crate::message::Unbind;
64
65pub mod lex;
66pub mod name;
67mod parse;
68
69use parse::Lexer;
70use parse::ParseError;
71use parse::Token;
72pub use parse::is_valid_ident;
73use parse::parse;
74
/// The kinds of reference, one per variant of [`Reference`].
///
/// Derives `strum::Display`, so a kind can be formatted with `{}`.
#[derive(strum::Display)]
pub enum ReferenceKind {
    /// Kind reported for [`Reference::Proc`].
    Proc,
    /// Kind reported for [`Reference::Actor`].
    Actor,
    /// Kind reported for [`Reference::Port`].
    Port,
}
85
/// A reference to a proc, an actor, or a port.
///
/// The textual forms accepted by the `FromStr` impl in this file are
/// `addr,proc`, `addr,proc,actor[pid]` and `addr,proc,actor[pid][port]`,
/// where `addr` is a channel address.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    Hash,
    typeuri::Named,
    EnumAsInner
)]
pub enum Reference {
    /// A reference to a proc.
    Proc(ProcId),
    /// A reference to an actor.
    Actor(ActorId),
    /// A reference to a port of an actor.
    Port(PortId),
}
118
119impl Reference {
120 pub fn is_prefix_of(&self, other: &Reference) -> bool {
122 match self {
123 Self::Proc(_) => self.proc_id() == other.proc_id(),
124 Self::Actor(_) => self == other,
125 Self::Port(_) => self == other,
126 }
127 }
128
129 pub fn proc_id(&self) -> Option<&ProcId> {
131 match self {
132 Self::Proc(proc_id) => Some(proc_id),
133 Self::Actor(actor_id) => Some(actor_id.proc_id()),
134 Self::Port(port_id) => Some(port_id.actor_id().proc_id()),
135 }
136 }
137
138 pub fn actor_id(&self) -> Option<&ActorId> {
140 match self {
141 Self::Proc(_) => None,
142 Self::Actor(actor_id) => Some(actor_id),
143 Self::Port(port_id) => Some(port_id.actor_id()),
144 }
145 }
146
147 fn actor_name(&self) -> Option<&str> {
149 match self {
150 Self::Proc(_) => None,
151 Self::Actor(actor_id) => Some(actor_id.name()),
152 Self::Port(port_id) => Some(port_id.actor_id().name()),
153 }
154 }
155
156 fn pid(&self) -> Option<Index> {
158 match self {
159 Self::Proc(_) => None,
160 Self::Actor(actor_id) => Some(actor_id.pid()),
161 Self::Port(port_id) => Some(port_id.actor_id().pid()),
162 }
163 }
164
165 fn port(&self) -> Option<u64> {
167 match self {
168 Self::Proc(_) => None,
169 Self::Actor(_) => None,
170 Self::Port(port_id) => Some(port_id.index()),
171 }
172 }
173
174 pub fn kind(&self) -> ReferenceKind {
176 match self {
177 Self::Proc(_) => ReferenceKind::Proc,
178 Self::Actor(_) => ReferenceKind::Actor,
179 Self::Port(_) => ReferenceKind::Port,
180 }
181 }
182}
183
184impl PartialOrd for Reference {
185 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
186 Some(self.cmp(other))
187 }
188}
189
190impl Ord for Reference {
191 fn cmp(&self, other: &Self) -> Ordering {
192 (self.proc_id(), self.actor_name(), self.pid(), self.port()).cmp(&(
194 other.proc_id(),
195 other.actor_name(),
196 other.pid(),
197 other.port(),
198 ))
199 }
200}
201
202impl fmt::Display for Reference {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 match self {
205 Self::Proc(proc_id) => fmt::Display::fmt(proc_id, f),
206 Self::Actor(actor_id) => fmt::Display::fmt(actor_id, f),
207 Self::Port(port_id) => fmt::Display::fmt(port_id, f),
208 }
209 }
210}
211
/// Errors that can occur while parsing a reference from a string.
#[derive(thiserror::Error, Debug)]
pub enum ReferenceParsingError {
    /// A token was expected but none was available.
    #[error("expected token")]
    Empty,

    /// The input did not have the expected shape; the payload describes what
    /// was found.
    #[error("unexpected token: {0}")]
    Unexpected(String),

    /// An integer component failed to parse.
    #[error(transparent)]
    ParseInt(#[from] ParseIntError),

    /// An error reported by the `parse` module.
    #[error("parse: {0}")]
    Parse(#[from] ParseError),

    /// The string parsed as a reference of a different kind than requested;
    /// the payload names the expected kind.
    #[error("wrong reference type: expected {0}")]
    WrongType(String),

    /// The leading channel address (first field) could not be parsed. Carries
    /// the offending text and the underlying error.
    #[error("invalid channel address {0}: {1}")]
    InvalidChannelAddress(String, anyhow::Error),
}
239
240impl FromStr for Reference {
241 type Err = ReferenceParsingError;
242
243 fn from_str(addr: &str) -> Result<Self, Self::Err> {
244 match addr.split_once(",") {
248 Some((channel_addr, rest)) => {
249 let channel_addr = channel_addr.parse().map_err(|err| {
250 ReferenceParsingError::InvalidChannelAddress(channel_addr.to_string(), err)
251 })?;
252
253 Ok(parse! {
254 Lexer::new(rest);
255
256 Token::Elem(proc_name) =>
258 Self::Proc(ProcId::with_name(channel_addr, proc_name)),
259
260 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name) =>
262 Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, 0)),
263
264 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
266 Token::LeftBracket Token::Uint(pid) Token::RightBracket =>
267 Self::Actor(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid)),
268
269 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
271 Token::LeftBracket Token::Uint(pid) Token::RightBracket
272 Token::LeftBracket Token::Uint(index) Token::RightBracket =>
273 Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
274
275 Token::Elem(proc_name) Token::Comma Token::Elem(actor_name)
277 Token::LeftBracket Token::Uint(pid) Token::RightBracket
278 Token::LeftBracket Token::Uint(index)
279 Token::LessThan Token::Elem(_type) Token::GreaterThan
280 Token::RightBracket =>
281 Self::Port(PortId::new(ActorId::new(ProcId::with_name(channel_addr, proc_name), actor_name, pid), index as u64)),
282 }?)
283 }
284
285 None => Err(ReferenceParsingError::Unexpected(format!(
286 "expected a comma-separated reference, got: {}",
287 addr
288 ))),
289 }
290 }
291}
292
293impl From<ProcId> for Reference {
294 fn from(proc_id: ProcId) -> Self {
295 Self::Proc(proc_id)
296 }
297}
298
299impl From<ActorId> for Reference {
300 fn from(actor_id: ActorId) -> Self {
301 Self::Actor(actor_id)
302 }
303}
304
305impl From<PortId> for Reference {
306 fn from(port_id: PortId) -> Self {
307 Self::Port(port_id)
308 }
309}
310
/// Index type used in this module for actor pids (see [`ActorId::pid`]).
pub type Index = usize;
314
/// Identifies a proc by a channel address (field 0) and a name (field 1).
///
/// Displayed as `addr,name`.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ProcId(ChannelAddr, String);
331
332fn addr_hash_suffix(addr: &ChannelAddr) -> String {
334 use std::collections::hash_map::DefaultHasher;
335 let mut hasher = DefaultHasher::new();
336 addr.hash(&mut hasher);
337 format!("{:08x}", hasher.finish() as u32)
338}
339
340impl ProcId {
341 pub fn unique(addr: ChannelAddr, base_name: impl Into<String>) -> Self {
346 let base = base_name.into();
347 let suffix = addr_hash_suffix(&addr);
348 Self(addr, format!("{}-{}", base, suffix))
349 }
350
351 pub fn with_name(addr: ChannelAddr, name: impl Into<String>) -> Self {
356 Self(addr, name.into())
357 }
358
359 pub fn base_name(&self) -> &str {
364 match self.1.rsplit_once('-') {
365 Some((base, suffix))
366 if suffix.len() == 8 && suffix.chars().all(|c| c.is_ascii_hexdigit()) =>
367 {
368 base
369 }
370 _ => &self.1,
371 }
372 }
373
374 pub fn actor_id(&self, name: impl Into<String>, pid: Index) -> ActorId {
376 ActorId(self.clone(), name.into(), pid)
377 }
378
379 pub fn addr(&self) -> &ChannelAddr {
381 &self.0
382 }
383
384 pub fn name(&self) -> &str {
386 &self.1
387 }
388}
389
390impl fmt::Display for ProcId {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 write!(f, "{},{}", self.0, self.1)
393 }
394}
395
396impl FromStr for ProcId {
397 type Err = ReferenceParsingError;
398
399 fn from_str(addr: &str) -> Result<Self, Self::Err> {
400 match addr.parse()? {
401 Reference::Proc(proc_id) => Ok(proc_id),
402 _ => Err(ReferenceParsingError::WrongType("proc".into())),
403 }
404 }
405}
406
/// Identifies an actor by its proc (field 0), a name (field 1) and a pid
/// (field 2).
///
/// Displayed as `proc,name[pid]`.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct ActorId(ProcId, String, Index);

// Presumably lets `ActorId` be used as a config attribute value — confirm
// against `hyperactor_config::impl_attrvalue!`.
hyperactor_config::impl_attrvalue!(ActorId);
423
424impl ActorId {
425 pub fn new(proc_id: ProcId, name: impl Into<String>, pid: Index) -> Self {
427 Self(proc_id, name.into(), pid)
428 }
429
430 pub fn port_id(&self, port: u64) -> PortId {
432 PortId(self.clone(), port)
433 }
434
435 pub fn child_id(&self, pid: Index) -> Self {
437 Self(self.0.clone(), self.1.clone(), pid)
438 }
439
440 pub fn root(proc_id: ProcId, name: String) -> Self {
442 Self(proc_id, name, 0)
443 }
444
445 pub fn proc_id(&self) -> &ProcId {
447 &self.0
448 }
449
450 pub fn name(&self) -> &str {
452 &self.1
453 }
454
455 pub fn pid(&self) -> Index {
457 self.2
458 }
459}
460
461impl fmt::Display for ActorId {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 write!(f, "{},{}[{}]", self.0, self.1, self.2)
464 }
465}
466impl<A: Referable> From<ActorRef<A>> for ActorId {
467 fn from(actor_ref: ActorRef<A>) -> Self {
468 actor_ref.actor_id.clone()
469 }
470}
471
472impl<'a, A: Referable> From<&'a ActorRef<A>> for &'a ActorId {
473 fn from(actor_ref: &'a ActorRef<A>) -> Self {
474 &actor_ref.actor_id
475 }
476}
477
478impl FromStr for ActorId {
479 type Err = ReferenceParsingError;
480
481 fn from_str(addr: &str) -> Result<Self, Self::Err> {
482 match addr.parse()? {
483 Reference::Actor(actor_id) => Ok(actor_id),
484 _ => Err(ReferenceParsingError::WrongType("actor".into())),
485 }
486 }
487}
488
/// A typed reference to an actor of type `A`.
///
/// Only the [`ActorId`] is stored; `A` is carried purely at the type level.
#[derive(typeuri::Named)]
pub struct ActorRef<A: Referable> {
    /// The id of the referenced actor.
    pub(crate) actor_id: ActorId,
    /// Type-level marker for `A`; no value of `A` is held.
    phantom: PhantomData<fn() -> A>,
}
496
497impl<A: Referable> ActorRef<A> {
498 pub fn port<M: RemoteMessage>(&self) -> PortRef<M>
500 where
501 A: RemoteHandles<M>,
502 {
503 PortRef::attest(self.actor_id.port_id(<M as Named>::port()))
504 }
505
506 pub fn send<M: RemoteMessage>(
508 &self,
509 cx: &impl context::Actor,
510 message: M,
511 ) -> Result<(), MailboxSenderError>
512 where
513 A: RemoteHandles<M>,
514 {
515 self.port().send(cx, message)
516 }
517
518 pub fn send_with_headers<M: RemoteMessage>(
521 &self,
522 cx: &impl context::Actor,
523 headers: Flattrs,
524 message: M,
525 ) -> Result<(), MailboxSenderError>
526 where
527 A: RemoteHandles<M>,
528 {
529 self.port().send_with_headers(cx, headers, message)
530 }
531
532 pub fn attest(actor_id: ActorId) -> Self {
537 Self {
538 actor_id,
539 phantom: PhantomData,
540 }
541 }
542
543 pub fn actor_id(&self) -> &ActorId {
545 &self.actor_id
546 }
547
548 pub fn into_actor_id(self) -> ActorId {
550 self.actor_id
551 }
552
553 pub fn downcast_handle(&self, cx: &impl context::Actor) -> Option<ActorHandle<A>>
557 where
558 A: Actor,
559 {
560 cx.instance().proc().resolve_actor_ref(self)
561 }
562}
563
564impl<A: Referable> Serialize for ActorRef<A> {
566 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
567 where
568 S: Serializer,
569 {
570 self.actor_id().serialize(serializer)
572 }
573}
574
575impl<'de, A: Referable> Deserialize<'de> for ActorRef<A> {
577 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
578 where
579 D: Deserializer<'de>,
580 {
581 let actor_id = <ActorId>::deserialize(deserializer)?;
582 Ok(ActorRef {
583 actor_id,
584 phantom: PhantomData,
585 })
586 }
587}
588
589impl<A: Referable> fmt::Debug for ActorRef<A> {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 f.debug_struct("ActorRef")
593 .field("actor_id", &self.actor_id)
594 .field("type", &std::any::type_name::<A>())
595 .finish()
596 }
597}
598
599impl<A: Referable> fmt::Display for ActorRef<A> {
600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601 fmt::Display::fmt(&self.actor_id, f)?;
602 write!(f, "<{}>", std::any::type_name::<A>())
603 }
604}
605
606impl<A: Referable> Clone for ActorRef<A> {
608 fn clone(&self) -> Self {
609 Self {
610 actor_id: self.actor_id.clone(),
611 phantom: PhantomData,
612 }
613 }
614}
615
616impl<A: Referable> PartialEq for ActorRef<A> {
617 fn eq(&self, other: &Self) -> bool {
618 self.actor_id == other.actor_id
619 }
620}
621
622impl<A: Referable> Eq for ActorRef<A> {}
623
624impl<A: Referable> PartialOrd for ActorRef<A> {
625 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
626 Some(self.cmp(other))
627 }
628}
629
630impl<A: Referable> Ord for ActorRef<A> {
631 fn cmp(&self, other: &Self) -> Ordering {
632 self.actor_id.cmp(&other.actor_id)
633 }
634}
635
636impl<A: Referable> Hash for ActorRef<A> {
637 fn hash<H: Hasher>(&self, state: &mut H) {
638 self.actor_id.hash(state);
639 }
640}
641
/// Identifies a port by its owning actor (field 0) and a port index
/// (field 1).
///
/// Displayed as `actor[port]`, or `actor[port<type>]` when the index has
/// `ACTOR_PORT_BIT` set.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Hash,
    Ord,
    typeuri::Named
)]
pub struct PortId(ActorId, u64);
659
660impl PortId {
661 pub fn new(actor_id: ActorId, port: u64) -> Self {
663 Self(actor_id, port)
664 }
665
666 pub fn actor_id(&self) -> &ActorId {
668 &self.0
669 }
670
671 pub fn into_actor_id(self) -> ActorId {
673 self.0
674 }
675
676 pub fn index(&self) -> u64 {
678 self.1
679 }
680
681 pub(crate) fn is_actor_port(&self) -> bool {
682 self.1 & ACTOR_PORT_BIT != 0
683 }
684
685 pub fn send(&self, cx: &impl context::Actor, serialized: wirevalue::Any) {
689 let mut headers = Flattrs::new();
690 crate::mailbox::headers::set_send_timestamp(&mut headers);
691 cx.post(
692 self.clone(),
693 headers,
694 serialized,
695 true,
696 context::SeqInfoPolicy::AssignNew,
697 );
698 }
699
700 pub fn send_with_headers(
704 &self,
705 cx: &impl context::Actor,
706 serialized: wirevalue::Any,
707 mut headers: Flattrs,
708 ) {
709 crate::mailbox::headers::set_send_timestamp(&mut headers);
710 cx.post(
711 self.clone(),
712 headers,
713 serialized,
714 true,
715 context::SeqInfoPolicy::AssignNew,
716 );
717 }
718
719 pub fn split(
722 &self,
723 cx: &impl context::Actor,
724 reducer_spec: Option<ReducerSpec>,
725 reducer_mode: ReducerMode,
726 return_undeliverable: bool,
727 ) -> anyhow::Result<PortId> {
728 cx.split(
729 self.clone(),
730 reducer_spec,
731 reducer_mode,
732 return_undeliverable,
733 )
734 }
735}
736
737impl FromStr for PortId {
738 type Err = ReferenceParsingError;
739
740 fn from_str(addr: &str) -> Result<Self, Self::Err> {
741 match addr.parse()? {
742 Reference::Port(port_id) => Ok(port_id),
743 _ => Err(ReferenceParsingError::WrongType("port".into())),
744 }
745 }
746}
747
748impl fmt::Display for PortId {
749 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750 let PortId(actor_id, port) = self;
751 if self.is_actor_port() {
752 let type_info = TypeInfo::get(*port).or_else(|| TypeInfo::get(*port & !ACTOR_PORT_BIT));
753 let typename = type_info.map_or("unknown", TypeInfo::typename);
754 write!(f, "{}[{}<{}>]", actor_id, port, typename)
755 } else {
756 write!(f, "{}[{}]", actor_id, port)
757 }
758 }
759}
760
/// A typed reference to a port accepting messages of type `M`.
///
/// Equality, ordering and hashing are derived through `derivative`; the
/// `reducer_spec` and `streaming_opts` fields are excluded from all of them.
#[derive(Debug, Serialize, Deserialize, Derivative, typeuri::Named)]
#[derivative(PartialEq, Eq, PartialOrd, Hash, Ord)]
pub struct PortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer spec associated with the port; ignored by
    /// comparisons and hashing.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    reducer_spec: Option<ReducerSpec>,
    /// Streaming reducer options; ignored by comparisons and hashing.
    #[derivative(
        PartialEq = "ignore",
        PartialOrd = "ignore",
        Ord = "ignore",
        Hash = "ignore"
    )]
    streaming_opts: StreamingReducerOpts,
    /// Type-level marker for the message type.
    phantom: PhantomData<M>,
    /// Flag passed to `post` when sending; defaults to `true` in the
    /// constructors.
    return_undeliverable: bool,
}
784
785impl<M: RemoteMessage> PortRef<M> {
786 pub fn attest(port_id: PortId) -> Self {
789 Self {
790 port_id,
791 reducer_spec: None,
792 streaming_opts: StreamingReducerOpts::default(),
793 phantom: PhantomData,
794 return_undeliverable: true,
795 }
796 }
797
798 pub fn attest_reducible(
801 port_id: PortId,
802 reducer_spec: Option<ReducerSpec>,
803 streaming_opts: StreamingReducerOpts,
804 ) -> Self {
805 Self {
806 port_id,
807 reducer_spec,
808 streaming_opts,
809 phantom: PhantomData,
810 return_undeliverable: true,
811 }
812 }
813
814 pub fn attest_message_port(actor: &ActorId) -> Self {
817 PortRef::<M>::attest(actor.port_id(<M as Named>::port()))
818 }
819
820 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
823 &self.reducer_spec
824 }
825
826 pub fn port_id(&self) -> &PortId {
828 &self.port_id
829 }
830
831 pub fn into_port_id(self) -> PortId {
833 self.port_id
834 }
835
836 pub fn into_once(self) -> OncePortRef<M> {
839 let return_undeliverable = self.return_undeliverable;
840 let mut once = OncePortRef::attest(self.into_port_id());
841 once.return_undeliverable = return_undeliverable;
842 once
843 }
844
845 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
848 self.send_with_headers(cx, Flattrs::new(), message)
849 }
850
851 pub fn send_with_headers(
855 &self,
856 cx: &impl context::Actor,
857 headers: Flattrs,
858 message: M,
859 ) -> Result<(), MailboxSenderError> {
860 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
861 MailboxSenderError::new_bound(
862 self.port_id.clone(),
863 MailboxSenderErrorKind::Serialize(err.into()),
864 )
865 })?;
866 self.send_serialized(cx, headers, serialized);
867 Ok(())
868 }
869
870 pub fn send_serialized(
873 &self,
874 cx: &impl context::Actor,
875 mut headers: Flattrs,
876 message: wirevalue::Any,
877 ) {
878 crate::mailbox::headers::set_send_timestamp(&mut headers);
879 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
880 cx.post(
881 self.port_id.clone(),
882 headers,
883 message,
884 self.return_undeliverable,
885 context::SeqInfoPolicy::AssignNew,
886 );
887 }
888
889 pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
891 PortSink::new(cx, self)
892 }
893
894 pub fn get_return_undeliverable(&self) -> bool {
897 self.return_undeliverable
898 }
899
900 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
903 self.return_undeliverable = return_undeliverable;
904 }
905}
906
907impl<M: RemoteMessage> Clone for PortRef<M> {
908 fn clone(&self) -> Self {
909 Self {
910 port_id: self.port_id.clone(),
911 reducer_spec: self.reducer_spec.clone(),
912 streaming_opts: self.streaming_opts.clone(),
913 phantom: PhantomData,
914 return_undeliverable: self.return_undeliverable,
915 }
916 }
917}
918
919impl<M: RemoteMessage> fmt::Display for PortRef<M> {
920 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921 fmt::Display::fmt(&self.port_id, f)
922 }
923}
924
/// Which kind of port reference an [`UnboundPort`] was produced from.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
pub enum UnboundPortKind {
    /// Produced from a [`PortRef`]; optionally carries its streaming reducer
    /// options. When binding, `None` falls back to the default options.
    Streaming(Option<StreamingReducerOpts>),
    /// Produced from a [`OncePortRef`].
    Once,
}
933
/// The data of a port reference in unbound form, as pushed to and popped
/// from `Bindings` by the `Unbind`/`Bind` impls in this file.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
pub struct UnboundPort(
    /// The port id.
    pub PortId,
    /// The reducer spec, if any.
    pub Option<ReducerSpec>,
    /// The return-undeliverable flag.
    pub bool,
    /// The kind of port reference this was produced from.
    pub UnboundPortKind,
);
wirevalue::register_type!(UnboundPort);
943
944impl UnboundPort {
945 pub fn update(&mut self, port_id: PortId) {
947 self.0 = port_id;
948 }
949}
950
951impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
952 fn from(port_ref: &PortRef<M>) -> Self {
953 UnboundPort(
954 port_ref.port_id.clone(),
955 port_ref.reducer_spec.clone(),
956 port_ref.return_undeliverable,
957 UnboundPortKind::Streaming(Some(port_ref.streaming_opts.clone())),
958 )
959 }
960}
961
962impl<M: RemoteMessage> Unbind for PortRef<M> {
963 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
964 bindings.push_back(&UnboundPort::from(self))
965 }
966}
967
968impl<M: RemoteMessage> Bind for PortRef<M> {
969 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
970 let UnboundPort(port_id, reducer_spec, return_undeliverable, port_kind) =
971 bindings.try_pop_front::<UnboundPort>()?;
972 self.port_id = port_id;
973 self.reducer_spec = reducer_spec;
974 self.return_undeliverable = return_undeliverable;
975 self.streaming_opts = match port_kind {
976 UnboundPortKind::Streaming(opts) => opts.unwrap_or_default(),
977 UnboundPortKind::Once => {
978 anyhow::bail!("OncePortRef cannot be bound to PortRef")
979 }
980 };
981 Ok(())
982 }
983}
984
/// A typed reference to a port accepting a message of type `M`, whose send
/// methods consume the reference.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct OncePortRef<M> {
    /// The id of the referenced port.
    port_id: PortId,
    /// Optional reducer spec associated with the port.
    reducer_spec: Option<ReducerSpec>,
    /// Flag passed to `post` when sending; defaults to `true` in the
    /// constructors.
    return_undeliverable: bool,
    /// Type-level marker for the message type.
    phantom: PhantomData<M>,
}
995
996impl<M: RemoteMessage> OncePortRef<M> {
997 pub(crate) fn attest(port_id: PortId) -> Self {
998 Self {
999 port_id,
1000 reducer_spec: None,
1001 return_undeliverable: true,
1002 phantom: PhantomData,
1003 }
1004 }
1005
1006 pub fn attest_reducible(port_id: PortId, reducer_spec: Option<ReducerSpec>) -> Self {
1009 Self {
1010 port_id,
1011 reducer_spec,
1012 return_undeliverable: true,
1013 phantom: PhantomData,
1014 }
1015 }
1016
1017 pub fn reducer_spec(&self) -> &Option<ReducerSpec> {
1019 &self.reducer_spec
1020 }
1021
1022 pub fn port_id(&self) -> &PortId {
1024 &self.port_id
1025 }
1026
1027 pub fn into_port_id(self) -> PortId {
1029 self.port_id
1030 }
1031
1032 pub fn send(self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1035 self.send_with_headers(cx, Flattrs::new(), message)
1036 }
1037
1038 pub fn send_with_headers(
1041 self,
1042 cx: &impl context::Actor,
1043 mut headers: Flattrs,
1044 message: M,
1045 ) -> Result<(), MailboxSenderError> {
1046 crate::mailbox::headers::set_send_timestamp(&mut headers);
1047 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
1048 MailboxSenderError::new_bound(
1049 self.port_id.clone(),
1050 MailboxSenderErrorKind::Serialize(err.into()),
1051 )
1052 })?;
1053 cx.post(
1054 self.port_id.clone(),
1055 headers,
1056 serialized,
1057 self.return_undeliverable,
1058 context::SeqInfoPolicy::AssignNew,
1059 );
1060 Ok(())
1061 }
1062
1063 pub fn get_return_undeliverable(&self) -> bool {
1066 self.return_undeliverable
1067 }
1068
1069 pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1072 self.return_undeliverable = return_undeliverable;
1073 }
1074}
1075
1076impl<M: RemoteMessage> Clone for OncePortRef<M> {
1077 fn clone(&self) -> Self {
1078 Self {
1079 port_id: self.port_id.clone(),
1080 reducer_spec: self.reducer_spec.clone(),
1081 return_undeliverable: self.return_undeliverable,
1082 phantom: PhantomData,
1083 }
1084 }
1085}
1086
1087impl<M: RemoteMessage> fmt::Display for OncePortRef<M> {
1088 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089 fmt::Display::fmt(&self.port_id, f)
1090 }
1091}
1092
impl<M: RemoteMessage> Named for OncePortRef<M> {
    /// The type name, built from the template
    /// `hyperactor::mailbox::OncePortRef<{}>` with `M` as the parameter.
    fn typename() -> &'static str {
        wirevalue::intern_typename!(Self, "hyperactor::mailbox::OncePortRef<{}>", M)
    }
}
1098
1099impl<M: RemoteMessage> From<&OncePortRef<M>> for UnboundPort {
1100 fn from(port_ref: &OncePortRef<M>) -> Self {
1101 UnboundPort(
1102 port_ref.port_id.clone(),
1103 port_ref.reducer_spec.clone(),
1104 true, UnboundPortKind::Once,
1106 )
1107 }
1108}
1109
1110impl<M: RemoteMessage> Unbind for OncePortRef<M> {
1111 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
1112 bindings.push_back(&UnboundPort::from(self))
1113 }
1114}
1115
1116impl<M: RemoteMessage> Bind for OncePortRef<M> {
1117 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
1118 let UnboundPort(port_id, reducer_spec, _return_undeliverable, port_kind) =
1119 bindings.try_pop_front::<UnboundPort>()?;
1120 match port_kind {
1121 UnboundPortKind::Once => {
1122 self.port_id = port_id;
1123 self.reducer_spec = reducer_spec;
1124 Ok(())
1125 }
1126 UnboundPortKind::Streaming(_) => {
1127 anyhow::bail!("PortRef cannot be bound to OncePortRef")
1128 }
1129 }
1130 }
1131}
1132
// Unit tests for reference parsing, ordering, display, and for the sequence
// info attached to messages sent through port handles, refs and ids.
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use tokio::sync::mpsc;
    use uuid::Uuid;

    use super::*;
    use crate::Proc;
    use crate::context::Mailbox as _;
    use crate::mailbox::PortLocation;
    use crate::ordering::SEQ_INFO;
    use crate::ordering::SeqInfo;

    // Each supported textual form parses to the expected `Reference`.
    #[test]
    fn test_reference_parse() {
        let cases: Vec<(&str, Reference)> = vec![
            (
                "tcp:[::1]:1234,test",
                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test").into(),
            ),
            (
                "tcp:[::1]:1234,test,testactor[123]",
                ActorId::new(
                    ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                    "testactor",
                    123,
                )
                .into(),
            ),
            (
                // The `<my::type>` annotation is accepted and ignored.
                "tcp:[::1]:1234,test,testactor[0][123<my::type>]",
                PortId::new(
                    ActorId::new(
                        ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                        "testactor",
                        0,
                    ),
                    123,
                )
                .into(),
            ),
        ];

        for (s, expected) in cases {
            assert_eq!(s.parse::<Reference>().unwrap(), expected, "for {}", s);
        }
    }

    // Malformed inputs are rejected.
    #[test]
    fn test_reference_parse_error() {
        let cases: Vec<&str> = vec!["(blah)", "world(1, 2, 3)", "test"];

        for s in cases {
            let result: Result<Reference, ReferenceParsingError> = s.parse();
            assert!(result.is_err(), "expected error for: {}", s);
        }
    }

    // Sorting a shuffled list of references restores the expected order.
    #[test]
    fn test_reference_ord() {
        let expected: Vec<Reference> = [
            "tcp:[::1]:1234,first",
            "tcp:[::1]:1234,second",
            "tcp:[::1]:1234,third",
        ]
        .into_iter()
        .map(|s| s.parse().unwrap())
        .collect();

        let mut sorted = expected.to_vec();
        sorted.shuffle(&mut thread_rng());
        sorted.sort();

        assert_eq!(sorted, expected);
    }

    // A port id whose index comes from a registered type displays with the
    // type name annotation.
    #[test]
    fn test_port_type_annotation() {
        #[derive(typeuri::Named, Serialize, Deserialize)]
        struct MyType;
        wirevalue::register_type!(MyType);
        let port_id = PortId::new(
            ActorId::new(
                ProcId::with_name("tcp:[::1]:1234".parse::<ChannelAddr>().unwrap(), "test"),
                "testactor",
                1,
            ),
            MyType::port(),
        );
        assert_eq!(
            port_id.to_string(),
            "tcp:[::1]:1234,test,testactor[1][17867850292987402005<hyperactor::reference::tests::MyType>]"
        );
    }

    // Messages sent to an actor port via the handle, a `PortRef`, or the raw
    // `PortId` share one session and one increasing sequence.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_ref_and_id() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        // Forward the SEQ_INFO header of every received message to `rx`.
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        // Before the handle is bound, sends are tagged `Direct`.
        port_handle.send(&client, ()).unwrap();
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        port_handle.bind_actor_port();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(port_id.is_actor_port());
        let port_ref = PortRef::attest(port_id.clone());

        // The first send after binding carries session info with seq 1.
        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            mut seq,
        } = rx.try_recv().unwrap().unwrap()
        else {
            panic!("expected session info");
        };
        assert_eq!(session_id, client.sequencer().session_id());
        assert_eq!(seq, 1);

        // Advances `seq` by one and checks the next received message carries
        // that sequence number in the same session.
        fn assert_seq_info(
            rx: &mut mpsc::UnboundedReceiver<Option<SeqInfo>>,
            session_id: Uuid,
            seq: &mut u64,
        ) {
            *seq += 1;
            let SeqInfo::Session {
                session_id: rcved_session_id,
                seq: rcved_seq,
            } = rx.try_recv().unwrap().unwrap()
            else {
                panic!("expected session info");
            };
            assert_eq!(rcved_session_id, session_id);
            assert_eq!(rcved_seq, *seq);
        }

        // Interleave the three send paths.
        for _ in 0..10 {
            port_handle.send(&client, ()).unwrap();
            assert_seq_info(&mut rx, session_id, &mut seq);

            for _ in 0..2 {
                port_ref.send(&client, ()).unwrap();
                assert_seq_info(&mut rx, session_id, &mut seq);
            }

            for _ in 0..3 {
                port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
                assert_seq_info(&mut rx, session_id, &mut seq);
            }
        }

        // Nothing unexpected is left in the channel.
        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
    }

    // Same as above, but for a handle bound with `bind()`, which yields a
    // port that is not an actor port.
    #[tokio::test]
    async fn test_sequencing_from_port_handle_bound_to_allocated_port() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (tx, mut rx) = mpsc::unbounded_channel();
        // Forward the SEQ_INFO header of every received message to `rx`.
        let port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
            let seq_info = headers.get(SEQ_INFO);
            tx.send(seq_info).unwrap();
            Ok(())
        });
        // Before the handle is bound, sends are tagged `Direct`.
        port_handle.send(&client, ()).unwrap();
        assert_eq!(rx.try_recv().unwrap().unwrap(), SeqInfo::Direct);

        port_handle.bind();
        let port_id = match port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(!port_id.is_actor_port());

        // Via the handle: seq 1.
        port_handle.send(&client, ()).unwrap();
        let SeqInfo::Session {
            session_id,
            seq: seq1,
        } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq1, 1);
        assert_eq!(session_id, client.sequencer().session_id());

        // Via a `PortRef`: seq 2.
        let port_ref = PortRef::attest(port_id.clone());
        port_ref.send(&client, ()).unwrap();
        let SeqInfo::Session { seq: seq2, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq2, 2);

        // Via the raw `PortId`: seq 3.
        port_id.send(&client, wirevalue::Any::serialize(&()).unwrap());
        let SeqInfo::Session { seq: seq3, .. } = rx
            .try_recv()
            .unwrap()
            .expect("non-actor port should have seq info")
        else {
            panic!("expected Session variant");
        };
        assert_eq!(seq3, 3);
    }
}