1use std::fmt;
34use std::str::FromStr;
35
36use enum_as_inner::EnumAsInner;
37use serde::Deserialize;
38use serde::Serialize;
39
40use crate::channel::ChannelAddr;
41use crate::context::MailboxExt;
42use crate::id;
43use crate::id::ActorId;
44use crate::id::IdParseError;
45use crate::id::Label;
46use crate::id::PortId;
47use crate::id::ProcId;
48use crate::id::Uid;
49use crate::parse;
50use crate::port::Port;
51
/// A location, stored as a thin wrapper around a [`ChannelAddr`].
///
/// Its `Display` form is the channel address's ZMQ-style URL, and its
/// `FromStr` impl parses that same form.
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Location(ChannelAddr);
55
56impl Location {
57 pub fn addr(&self) -> &ChannelAddr {
59 &self.0
60 }
61}
62
63impl From<ChannelAddr> for Location {
64 fn from(addr: ChannelAddr) -> Self {
65 Self(addr)
66 }
67}
68
69impl fmt::Display for Location {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.write_str(&self.0.to_zmq_url())
72 }
73}
74
75impl fmt::Debug for Location {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 fmt::Display::fmt(self, f)
78 }
79}
80
81impl FromStr for Location {
82 type Err = anyhow::Error;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 ChannelAddr::from_zmq_url(s).map(Self)
86 }
87}
88
/// Errors reported when a textual address of the form `id@location` fails to parse.
#[derive(Debug, thiserror::Error)]
pub enum AddrParseError {
    /// The input has no `@` between the id and the location.
    #[error("missing '@' separator between id and location")]
    MissingSeparator,
    /// The id portion could not be parsed.
    #[error("invalid id: {0}")]
    InvalidId(#[from] IdParseError),
    /// The location portion could not be parsed.
    #[error("invalid location: {0}")]
    InvalidLocation(#[source] anyhow::Error),
}
102
/// The address of a proc: a [`ProcId`] paired with a [`Location`].
///
/// Its `Display` form is `id@location`.
#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
pub struct ProcAddr {
    /// The proc's id.
    id: ProcId,
    /// The location paired with the id.
    location: Location,
}
109
110impl ProcAddr {
111 pub fn new(id: ProcId, location: Location) -> Self {
113 Self { id, location }
114 }
115
116 pub fn id(&self) -> &ProcId {
118 &self.id
119 }
120
121 pub fn location(&self) -> &Location {
123 &self.location
124 }
125
126 pub fn addr(&self) -> &ChannelAddr {
128 self.location.addr()
129 }
130
131 pub fn uid(&self) -> &Uid {
133 self.id.uid()
134 }
135
136 pub fn label(&self) -> Option<&Label> {
139 self.id.label()
140 }
141
142 pub fn anonymous(addr: ChannelAddr) -> Self {
144 Self::new(id::ProcId::anonymous(), Location::from(addr))
145 }
146
147 pub fn instance(addr: ChannelAddr, base_name: impl AsRef<str>) -> Self {
149 let label = Label::strip(base_name.as_ref());
150 Self::new(id::ProcId::instance(label), Location::from(addr))
151 }
152
153 pub fn singleton(addr: ChannelAddr, name: impl AsRef<str>) -> Self {
155 Self::new(
156 id::ProcId::singleton(Label::strip(name.as_ref())),
157 Location::from(addr),
158 )
159 }
160
161 pub fn actor_addr(&self, name: impl AsRef<str>) -> ActorAddr {
163 let uid = Uid::singleton(Label::strip(name.as_ref()));
164 self.actor_addr_uid(uid)
165 }
166
167 pub fn actor_addr_uid(&self, uid: Uid) -> ActorAddr {
169 ActorAddr::new_from_uid(self.clone(), uid)
170 }
171
172 pub fn log_name(&self) -> &str {
174 self.label().map(|l| l.as_str()).unwrap_or("?")
175 }
176}
177
178impl PartialEq for ProcAddr {
179 fn eq(&self, other: &Self) -> bool {
180 self.id == other.id && self.location == other.location
181 }
182}
183
// Marker impl: `ProcAddr` equality (id and location) is treated as a full equivalence relation.
impl Eq for ProcAddr {}
185
186impl std::hash::Hash for ProcAddr {
187 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
188 self.id.hash(state);
189 self.location.hash(state);
190 }
191}
192
193impl PartialOrd for ProcAddr {
194 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
195 Some(self.cmp(other))
196 }
197}
198
199impl Ord for ProcAddr {
200 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
201 self.id
202 .cmp(&other.id)
203 .then_with(|| self.location.cmp(&other.location))
204 }
205}
206
207impl fmt::Display for ProcAddr {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "{}@{}", self.id, self.location)
210 }
211}
212
213impl fmt::Debug for ProcAddr {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 match self.id.label() {
216 Some(label) => write!(f, "<'{}' {}@{}>", label, self.id, self.location),
217 None => write!(f, "<{}@{}>", self.id, self.location),
218 }
219 }
220}
221
222impl FromStr for ProcAddr {
223 type Err = AddrParseError;
224
225 fn from_str(s: &str) -> Result<Self, Self::Err> {
226 parse::addr::parse_proc_addr(s).map_err(|_| legacy_parse_proc_ref(s))
227 }
228}
229
/// The address of an actor: an [`ActorId`] paired with a [`Location`].
///
/// Its `Display` form is `id@location`.
#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
pub struct ActorAddr {
    /// The actor's id (which also carries the id of its proc).
    id: ActorId,
    /// The location paired with the id.
    location: Location,
}

// NOTE(review): presumably lets `ActorAddr` be used as a configuration
// attribute value — confirm against `hyperactor_config::impl_attrvalue!`.
hyperactor_config::impl_attrvalue!(ActorAddr);
238
239impl ActorAddr {
240 pub fn new(id: ActorId, location: Location) -> Self {
242 Self { id, location }
243 }
244
245 pub fn new_from_uid(proc_ref: ProcAddr, uid: Uid) -> Self {
247 let actor_id = id::ActorId::new(uid, proc_ref.id.clone(), None);
248 Self::new(actor_id, proc_ref.location)
249 }
250
251 pub fn id(&self) -> &ActorId {
253 &self.id
254 }
255
256 pub fn proc_id(&self) -> &ProcId {
258 self.id.proc_id()
259 }
260
261 pub fn location(&self) -> &Location {
263 &self.location
264 }
265
266 pub fn addr(&self) -> &ChannelAddr {
268 self.location.addr()
269 }
270
271 pub fn uid(&self) -> &Uid {
273 self.id.uid()
274 }
275
276 pub fn label(&self) -> Option<&Label> {
279 self.id.label()
280 }
281
282 pub fn proc_addr(&self) -> ProcAddr {
284 ProcAddr::new(self.id.proc_id().clone(), self.location.clone())
285 }
286
287 pub fn port_addr(&self, port: Port) -> PortAddr {
289 PortAddr::new(
290 id::PortId::new(self.id.clone(), port),
291 self.location.clone(),
292 )
293 }
294
295 pub fn root(proc_ref: ProcAddr, label: impl Into<Label>) -> Self {
297 let label = label.into();
298 let actor_id = id::ActorId::singleton(label, proc_ref.id.clone());
299 Self::new(actor_id, proc_ref.location)
300 }
301
302 pub fn anonymous_child(&self) -> Self {
304 let child_id = id::ActorId::anonymous(self.id.proc_id().clone());
305 Self::new(child_id, self.location.clone())
306 }
307
308 pub fn is_root(&self) -> bool {
310 self.id.uid().is_singleton()
311 }
312
313 pub fn log_name(&self) -> &str {
315 self.label().map(|l| l.as_str()).unwrap_or("?")
316 }
317}
318
319impl PartialEq for ActorAddr {
320 fn eq(&self, other: &Self) -> bool {
321 self.id == other.id && self.location == other.location
322 }
323}
324
// Marker impl: `ActorAddr` equality (id and location) is treated as a full equivalence relation.
impl Eq for ActorAddr {}
326
327impl std::hash::Hash for ActorAddr {
328 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
329 self.id.hash(state);
330 self.location.hash(state);
331 }
332}
333
334impl PartialOrd for ActorAddr {
335 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
340impl Ord for ActorAddr {
341 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
342 self.id
343 .cmp(&other.id)
344 .then_with(|| self.location.cmp(&other.location))
345 }
346}
347
348impl fmt::Display for ActorAddr {
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 write!(f, "{}@{}", self.id, self.location)
351 }
352}
353
354impl fmt::Debug for ActorAddr {
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356 match (self.id.label(), self.id.proc_id().label()) {
357 (Some(actor_label), Some(proc_label)) => {
358 write!(
359 f,
360 "<'{}.{}' {}@{}>",
361 actor_label, proc_label, self.id, self.location
362 )
363 }
364 (Some(actor_label), None) => {
365 write!(f, "<'{}' {}@{}>", actor_label, self.id, self.location)
366 }
367 (None, Some(proc_label)) => {
368 write!(f, "<'.{}' {}@{}>", proc_label, self.id, self.location)
369 }
370 (None, None) => {
371 write!(f, "<{}@{}>", self.id, self.location)
372 }
373 }
374 }
375}
376
377impl FromStr for ActorAddr {
378 type Err = AddrParseError;
379
380 fn from_str(s: &str) -> Result<Self, Self::Err> {
381 parse::addr::parse_actor_addr(s).map_err(|_| legacy_parse_actor_ref(s))
382 }
383}
384
/// The address of a port: a [`PortId`] paired with a [`Location`].
///
/// Its `Display` form is `id@location`.
#[derive(Clone, Serialize, Deserialize, typeuri::Named)]
pub struct PortAddr {
    /// The port's id (which also carries the id of its actor).
    id: PortId,
    /// The location paired with the id.
    location: Location,
}
391
392impl PortAddr {
393 pub fn new(id: PortId, location: Location) -> Self {
395 Self { id, location }
396 }
397
398 pub fn id(&self) -> &PortId {
400 &self.id
401 }
402
403 pub fn location(&self) -> &Location {
405 &self.location
406 }
407
408 pub fn actor_id(&self) -> &ActorId {
410 self.id.actor_id()
411 }
412
413 pub fn proc_id(&self) -> &ProcId {
415 self.id.actor_id().proc_id()
416 }
417
418 pub(crate) fn is_handler_port(&self) -> bool {
420 self.id.port().is_handler()
421 }
422
423 pub fn index(&self) -> u64 {
425 self.id.port().as_u64()
426 }
427
428 pub fn actor_addr(&self) -> ActorAddr {
430 ActorAddr::new(self.id.actor_id().clone(), self.location.clone())
431 }
432
433 pub fn actor_ref(&self) -> ActorAddr {
435 self.actor_addr()
436 }
437
438 pub fn send(&self, cx: &impl crate::context::Actor, serialized: wirevalue::Any) {
440 let mut headers = hyperactor_config::Flattrs::new();
441 crate::mailbox::headers::set_send_timestamp(&mut headers);
442 cx.post(
443 self.clone(),
444 headers,
445 serialized,
446 true,
447 crate::context::SeqInfoPolicy::AssignNew,
448 );
449 }
450
451 pub fn send_with_headers(
453 &self,
454 cx: &impl crate::context::Actor,
455 serialized: wirevalue::Any,
456 mut headers: hyperactor_config::Flattrs,
457 ) {
458 crate::mailbox::headers::set_send_timestamp(&mut headers);
459 cx.post(
460 self.clone(),
461 headers,
462 serialized,
463 true,
464 crate::context::SeqInfoPolicy::AssignNew,
465 );
466 }
467
468 pub fn split(
470 &self,
471 cx: &impl crate::context::Actor,
472 reducer_spec: Option<crate::accum::ReducerSpec>,
473 reducer_mode: crate::accum::ReducerMode,
474 return_undeliverable: bool,
475 ) -> anyhow::Result<PortAddr> {
476 cx.split(
477 self.clone(),
478 reducer_spec,
479 reducer_mode,
480 return_undeliverable,
481 )
482 }
483}
484
485impl PartialEq for PortAddr {
486 fn eq(&self, other: &Self) -> bool {
487 self.id == other.id && self.location == other.location
488 }
489}
490
// Marker impl: `PortAddr` equality (id and location) is treated as a full equivalence relation.
impl Eq for PortAddr {}
492
493impl std::hash::Hash for PortAddr {
494 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
495 self.id.hash(state);
496 self.location.hash(state);
497 }
498}
499
500impl PartialOrd for PortAddr {
501 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
502 Some(self.cmp(other))
503 }
504}
505
506impl Ord for PortAddr {
507 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
508 self.id
509 .cmp(&other.id)
510 .then_with(|| self.location.cmp(&other.location))
511 }
512}
513
514impl fmt::Display for PortAddr {
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 write!(f, "{}@{}", self.id, self.location)
517 }
518}
519
520impl fmt::Debug for PortAddr {
521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522 match (
523 self.id.actor_id().label(),
524 self.id.actor_id().proc_id().label(),
525 ) {
526 (Some(actor_label), Some(proc_label)) => {
527 write!(
528 f,
529 "<'{}.{}' {}@{}>",
530 actor_label, proc_label, self.id, self.location
531 )
532 }
533 (Some(actor_label), None) => {
534 write!(f, "<'{}' {}@{}>", actor_label, self.id, self.location)
535 }
536 (None, Some(proc_label)) => {
537 write!(f, "<'.{}' {}@{}>", proc_label, self.id, self.location)
538 }
539 (None, None) => {
540 write!(f, "<{}@{}>", self.id, self.location)
541 }
542 }
543 }
544}
545
546impl FromStr for PortAddr {
547 type Err = AddrParseError;
548
549 fn from_str(s: &str) -> Result<Self, Self::Err> {
550 parse::addr::parse_port_addr(s).map_err(|_| legacy_parse_port_ref(s))
551 }
552}
553
/// An address of any of the three kinds: proc, actor, or port.
#[derive(Debug, Clone, EnumAsInner, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Addr {
    /// The address of a proc.
    Proc(ProcAddr),
    /// The address of an actor.
    Actor(ActorAddr),
    /// The address of a port.
    Port(PortAddr),
}
568
569impl Addr {
570 pub fn is_prefix_of(&self, other: &Self) -> bool {
575 match (self, other) {
576 (Self::Proc(p), Self::Actor(a)) => *p == a.proc_addr(),
577 (Self::Proc(p), Self::Port(pt)) => *p == pt.actor_addr().proc_addr(),
578 (Self::Actor(a), Self::Port(pt)) => *a == pt.actor_addr(),
579 (Self::Proc(p1), Self::Proc(p2)) => p1 == p2,
580 (Self::Actor(a1), Self::Actor(a2)) => a1 == a2,
581 (Self::Port(p1), Self::Port(p2)) => p1 == p2,
582 _ => false,
583 }
584 }
585
586 pub fn proc_addr(&self) -> ProcAddr {
588 match self {
589 Self::Proc(p) => p.clone(),
590 Self::Actor(a) => a.proc_addr(),
591 Self::Port(p) => p.actor_addr().proc_addr(),
592 }
593 }
594}
595
596impl PartialOrd for Addr {
597 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
598 Some(self.cmp(other))
599 }
600}
601
602impl Ord for Addr {
603 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
604 let proc_ord = self.proc_addr().cmp(&other.proc_addr());
606 if proc_ord != std::cmp::Ordering::Equal {
607 return proc_ord;
608 }
609 let self_actor_uid = match self {
610 Self::Proc(_) => None,
611 Self::Actor(a) => Some(a.uid()),
612 Self::Port(p) => Some(p.actor_id().uid()),
613 };
614 let other_actor_uid = match other {
615 Self::Proc(_) => None,
616 Self::Actor(a) => Some(a.uid()),
617 Self::Port(p) => Some(p.actor_id().uid()),
618 };
619 let actor_ord = self_actor_uid.cmp(&other_actor_uid);
620 if actor_ord != std::cmp::Ordering::Equal {
621 return actor_ord;
622 }
623 let self_port = match self {
624 Self::Port(p) => Some(p.id().port()),
625 _ => None,
626 };
627 let other_port = match other {
628 Self::Port(p) => Some(p.id().port()),
629 _ => None,
630 };
631 self_port.cmp(&other_port)
632 }
633}
634
635impl fmt::Display for Addr {
636 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
637 match self {
638 Self::Proc(p) => fmt::Display::fmt(p, f),
639 Self::Actor(a) => fmt::Display::fmt(a, f),
640 Self::Port(p) => fmt::Display::fmt(p, f),
641 }
642 }
643}
644
645impl FromStr for Addr {
646 type Err = AddrParseError;
647
648 fn from_str(s: &str) -> Result<Self, Self::Err> {
649 parse::addr::parse_addr(s).map_err(|_| legacy_parse_reference(s))
650 }
651}
652
653fn split_ref_input(s: &str) -> Result<(&str, &str), AddrParseError> {
654 let Some((id_text, location_text)) = s.split_once('@') else {
655 return Err(AddrParseError::MissingSeparator);
656 };
657 Ok((id_text, location_text))
658}
659
660fn legacy_parse_proc_ref(s: &str) -> AddrParseError {
661 let Ok((id_text, location_text)) = split_ref_input(s) else {
662 return AddrParseError::MissingSeparator;
663 };
664 if let Err(err) = id_text.parse::<ProcId>() {
665 return AddrParseError::InvalidId(err);
666 }
667 let location = if location_text.is_empty() {
668 "@"
669 } else {
670 location_text
671 };
672 let err = location.parse::<Location>().unwrap_err();
673 AddrParseError::InvalidLocation(err)
674}
675
676fn legacy_parse_actor_ref(s: &str) -> AddrParseError {
677 let Ok((id_text, location_text)) = split_ref_input(s) else {
678 return AddrParseError::MissingSeparator;
679 };
680 if let Err(err) = id_text.parse::<ActorId>() {
681 return AddrParseError::InvalidId(err);
682 }
683 let location = if location_text.is_empty() {
684 "@"
685 } else {
686 location_text
687 };
688 let err = location.parse::<Location>().unwrap_err();
689 AddrParseError::InvalidLocation(err)
690}
691
692fn legacy_parse_port_ref(s: &str) -> AddrParseError {
693 let Ok((id_text, location_text)) = split_ref_input(s) else {
694 return AddrParseError::MissingSeparator;
695 };
696 if let Err(err) = id_text.parse::<PortId>() {
697 return AddrParseError::InvalidId(err);
698 }
699 let location = if location_text.is_empty() {
700 "@"
701 } else {
702 location_text
703 };
704 let err = location.parse::<Location>().unwrap_err();
705 AddrParseError::InvalidLocation(err)
706}
707
708fn legacy_parse_reference(s: &str) -> AddrParseError {
709 let Ok((id_text, location_text)) = split_ref_input(s) else {
710 return AddrParseError::MissingSeparator;
711 };
712 let location = if location_text.is_empty() {
713 "@"
714 } else {
715 location_text
716 };
717 let location_err = || {
718 let err = location.parse::<Location>().unwrap_err();
719 AddrParseError::InvalidLocation(err)
720 };
721
722 let port_result = id_text.parse::<PortId>();
723 if port_result.is_ok() {
724 return location_err();
725 }
726 let actor_result = id_text.parse::<ActorId>();
727 if actor_result.is_ok() {
728 return location_err();
729 }
730 let proc_result = id_text.parse::<ProcId>();
731 if proc_result.is_ok() {
732 return location_err();
733 }
734
735 if id_text.contains(':') {
736 return AddrParseError::InvalidId(port_result.unwrap_err());
737 }
738 if id_text.contains('.') {
739 return AddrParseError::InvalidId(actor_result.unwrap_err());
740 }
741
742 AddrParseError::InvalidId(proc_result.unwrap_err())
743}
744
745impl From<ProcAddr> for Addr {
746 fn from(p: ProcAddr) -> Self {
747 Self::Proc(p)
748 }
749}
750
751impl From<ActorAddr> for Addr {
752 fn from(a: ActorAddr) -> Self {
753 Self::Actor(a)
754 }
755}
756
757impl From<PortAddr> for Addr {
758 fn from(p: PortAddr) -> Self {
759 Self::Port(p)
760 }
761}
762
763#[cfg(test)]
764mod tests {
765 use std::hash::Hash;
766
767 use super::*;
768 use crate::id::Label;
769 use crate::id::Uid;
770 use crate::port::Port;
771
772 #[test]
773 fn test_location_display_fromstr_roundtrip() {
774 let loc: Location = ChannelAddr::Local(42).into();
775 let s = loc.to_string();
776 assert_eq!(s, "inproc://42");
777 let parsed: Location = s.parse().unwrap();
778 assert_eq!(loc, parsed);
779 }
780
781 #[test]
782 fn test_location_tcp() {
783 let addr: ChannelAddr = "tcp:127.0.0.1:8080".parse().unwrap();
784 let loc = Location::from(addr.clone());
785 assert_eq!(loc.to_string(), "tcp://127.0.0.1:8080");
786 assert_eq!(loc.addr(), &addr);
787 }
788
789 #[test]
790 fn test_location_debug_same_as_display() {
791 let loc: Location = ChannelAddr::Local(7).into();
792 assert_eq!(format!("{:?}", loc), format!("{}", loc));
793 }
794
795 #[test]
796 fn test_proc_ref_display() {
797 let pid = ProcId::new(
798 Uid::Instance(0xabc123, None),
799 Some(Label::new("my-proc").unwrap()),
800 );
801 let loc: Location = ChannelAddr::Local(42).into();
802 let pref = ProcAddr::new(pid, loc);
803 assert_eq!(pref.to_string(), format!("{}@inproc://42", pref.id()));
804 }
805
806 #[test]
807 fn test_proc_addr_identity_constructors() {
808 let anonymous = ProcAddr::anonymous(ChannelAddr::Local(1));
809 assert!(
810 matches!(anonymous.id().uid(), Uid::Instance(_, None)),
811 "anonymous proc addr must have an unlabeled instance id"
812 );
813 assert_eq!(anonymous.label(), None);
814 assert_eq!(*anonymous.location().addr(), ChannelAddr::Local(1));
815
816 let instance = ProcAddr::instance(ChannelAddr::Local(2), "worker");
817 assert!(
818 matches!(
819 instance.id().uid(),
820 Uid::Instance(_, Some(label)) if label.as_str() == "worker"
821 ),
822 "instance proc addr must have a labeled instance id"
823 );
824 assert_eq!(instance.label().map(|label| label.as_str()), Some("worker"));
825 assert_eq!(*instance.location().addr(), ChannelAddr::Local(2));
826
827 let singleton = ProcAddr::singleton(ChannelAddr::Local(3), "controller");
828 assert!(
829 matches!(
830 singleton.id().uid(),
831 Uid::Singleton(label) if label.as_str() == "controller"
832 ),
833 "singleton proc addr must have a singleton id"
834 );
835 assert_eq!(
836 singleton.label().map(|label| label.as_str()),
837 Some("controller")
838 );
839 assert_eq!(*singleton.location().addr(), ChannelAddr::Local(3));
840 }
841
842 #[test]
843 fn test_proc_ref_debug_with_label() {
844 let pid = ProcId::new(
845 Uid::Instance(0xabc123, None),
846 Some(Label::new("my-proc").unwrap()),
847 );
848 let loc: Location = ChannelAddr::Local(42).into();
849 let pref = ProcAddr::new(pid, loc);
850 assert_eq!(
851 format!("{:?}", pref),
852 format!("<'my-proc' {}@inproc://42>", pref.id())
853 );
854 }
855
856 #[test]
857 fn test_proc_ref_debug_without_label() {
858 let pid = ProcId::new(Uid::Instance(0xabc123, None), None);
859 let loc: Location = ChannelAddr::Local(42).into();
860 let pref = ProcAddr::new(pid, loc);
861 assert_eq!(
862 format!("{:?}", pref),
863 format!("<{}@inproc://42>", pref.id())
864 );
865 }
866
867 #[test]
868 fn test_proc_ref_fromstr_roundtrip() {
869 let pid = ProcId::new(
870 Uid::Instance(0xabc123, None),
871 Some(Label::new("my-proc").unwrap()),
872 );
873 let loc: Location = ChannelAddr::Local(42).into();
874 let pref = ProcAddr::new(pid, loc);
875 let s = pref.to_string();
876 let parsed: ProcAddr = s.parse().unwrap();
877 assert_eq!(pref, parsed);
878 }
879
880 #[test]
881 fn test_proc_ref_fromstr_tcp() {
882 let parsed: ProcAddr = format!(
883 "{}@tcp://127.0.0.1:8080",
884 ProcId::new(Uid::Instance(0xabc123, None), None)
885 )
886 .parse()
887 .unwrap();
888 assert_eq!(*parsed.id().uid(), Uid::Instance(0xabc123, None));
889 assert_eq!(
890 *parsed.location().addr(),
891 "tcp:127.0.0.1:8080".parse::<ChannelAddr>().unwrap()
892 );
893 }
894
895 #[test]
896 fn test_proc_ref_fromstr_examples() {
897 let parsed: ProcAddr = "local@inproc://0".parse().unwrap();
898 assert_eq!(
899 parsed.id().uid(),
900 &Uid::singleton(Label::new("local").unwrap())
901 );
902 assert_eq!(*parsed.location().addr(), ChannelAddr::Local(0));
903
904 let expected_uid = Uid::Instance(0xabc123, None);
905 let parsed: ProcAddr = format!("controller{}@tcp://[::1]:2345", expected_uid)
906 .parse()
907 .unwrap();
908 assert_eq!(parsed.id().uid(), &expected_uid);
909 assert_eq!(
910 parsed.id().label().map(|label| label.as_str()),
911 Some("controller")
912 );
913 assert_eq!(
914 *parsed.location().addr(),
915 "tcp:[::1]:2345".parse::<ChannelAddr>().unwrap()
916 );
917 }
918
919 #[test]
920 fn test_proc_ref_fromstr_missing_separator() {
921 let err = ProcId::new(Uid::Instance(0xabc123, None), None)
922 .to_string()
923 .parse::<ProcAddr>()
924 .unwrap_err();
925 assert!(matches!(err, AddrParseError::MissingSeparator));
926 }
927
928 #[test]
929 fn test_proc_ref_fromstr_invalid_location() {
930 let err = "local@tcp://".parse::<ProcAddr>().unwrap_err();
931 assert!(matches!(err, AddrParseError::InvalidLocation(_)));
932 }
933
934 #[test]
935 fn test_actor_ref_display() {
936 let aid = ActorId::new(
937 Uid::Instance(0xabc123, None),
938 ProcId::new(
939 Uid::Instance(0xdef456, None),
940 Some(Label::new("my-proc").unwrap()),
941 ),
942 Some(Label::new("my-actor").unwrap()),
943 );
944 let loc: Location = ChannelAddr::Local(42).into();
945 let aref = ActorAddr::new(aid, loc);
946 assert_eq!(aref.to_string(), format!("{}@inproc://42", aref.id()));
947 }
948
949 #[test]
950 fn test_actor_and_port_proc_id_accessors() {
951 let proc_id = ProcId::new(
952 Uid::Instance(0xdef456, None),
953 Some(Label::new("my-proc").unwrap()),
954 );
955 let actor_id = ActorId::new(
956 Uid::Instance(0xabc123, None),
957 proc_id.clone(),
958 Some(Label::new("my-actor").unwrap()),
959 );
960 let actor_addr = ActorAddr::new(actor_id, ChannelAddr::Local(42).into());
961 let port_addr = actor_addr.port_addr(Port::from(7));
962
963 assert_eq!(actor_addr.proc_id(), &proc_id);
964 assert_eq!(port_addr.actor_id(), actor_addr.id());
965 assert_eq!(port_addr.proc_id(), &proc_id);
966 }
967
968 #[test]
969 fn test_actor_ref_debug_all_labels() {
970 let aid = ActorId::new(
971 Uid::Instance(0xabc123, None),
972 ProcId::new(
973 Uid::Instance(0xdef456, None),
974 Some(Label::new("my-proc").unwrap()),
975 ),
976 Some(Label::new("my-actor").unwrap()),
977 );
978 let loc: Location = ChannelAddr::Local(42).into();
979 let aref = ActorAddr::new(aid, loc);
980 assert_eq!(
981 format!("{:?}", aref),
982 format!("<'my-actor.my-proc' {}@inproc://42>", aref.id())
983 );
984 }
985
986 #[test]
987 fn test_actor_ref_debug_no_labels() {
988 let aid = ActorId::new(
989 Uid::Instance(0xabc123, None),
990 ProcId::new(Uid::Instance(0xdef456, None), None),
991 None,
992 );
993 let loc: Location = ChannelAddr::Local(42).into();
994 let aref = ActorAddr::new(aid, loc);
995 assert_eq!(
996 format!("{:?}", aref),
997 format!("<{}@inproc://42>", aref.id())
998 );
999 }
1000
1001 #[test]
1002 fn test_actor_ref_debug_actor_label_only() {
1003 let aid = ActorId::new(
1004 Uid::Instance(0xabc123, None),
1005 ProcId::new(Uid::Instance(0xdef456, None), None),
1006 Some(Label::new("my-actor").unwrap()),
1007 );
1008 let loc: Location = ChannelAddr::Local(42).into();
1009 let aref = ActorAddr::new(aid, loc);
1010 assert_eq!(
1011 format!("{:?}", aref),
1012 format!("<'my-actor' {}@inproc://42>", aref.id())
1013 );
1014 }
1015
1016 #[test]
1017 fn test_actor_ref_debug_proc_label_only() {
1018 let aid = ActorId::new(
1019 Uid::Instance(0xabc123, None),
1020 ProcId::new(
1021 Uid::Instance(0xdef456, None),
1022 Some(Label::new("my-proc").unwrap()),
1023 ),
1024 None,
1025 );
1026 let loc: Location = ChannelAddr::Local(42).into();
1027 let aref = ActorAddr::new(aid, loc);
1028 assert_eq!(
1029 format!("{:?}", aref),
1030 format!("<'.my-proc' {}@inproc://42>", aref.id())
1031 );
1032 }
1033
1034 #[test]
1035 fn test_actor_ref_fromstr_roundtrip() {
1036 let aid = ActorId::new(
1037 Uid::Instance(0xabc123, None),
1038 ProcId::new(
1039 Uid::Instance(0xdef456, None),
1040 Some(Label::new("my-proc").unwrap()),
1041 ),
1042 Some(Label::new("my-actor").unwrap()),
1043 );
1044 let loc: Location = ChannelAddr::Local(42).into();
1045 let aref = ActorAddr::new(aid, loc);
1046 let s = aref.to_string();
1047 let parsed: ActorAddr = s.parse().unwrap();
1048 assert_eq!(aref, parsed);
1049 assert_eq!(parsed.id.label().map(|l| l.as_str()), Some("my-actor"));
1050 assert_eq!(
1051 parsed.id.proc_id().label().map(|l| l.as_str()),
1052 Some("my-proc")
1053 );
1054 }
1055
1056 #[test]
1057 fn test_actor_ref_fromstr_examples() {
1058 let expected_actor_uid = Uid::Instance(0xabc123, None);
1059 let parsed: ActorAddr = format!("controller{}.local@inproc://0", expected_actor_uid)
1060 .parse()
1061 .unwrap();
1062 assert_eq!(parsed.id().uid(), &expected_actor_uid);
1063 assert_eq!(
1064 parsed.id().label().map(|label| label.as_str()),
1065 Some("controller")
1066 );
1067 assert_eq!(
1068 parsed.id().proc_id().uid(),
1069 &Uid::singleton(Label::new("local").unwrap())
1070 );
1071 assert_eq!(*parsed.location().addr(), ChannelAddr::Local(0));
1072 }
1073
1074 #[test]
1075 fn test_actor_ref_fromstr_missing_separator() {
1076 let err = ActorId::new(
1077 Uid::Instance(0xabc123, None),
1078 ProcId::new(Uid::Instance(0xdef456, None), None),
1079 None,
1080 )
1081 .to_string()
1082 .parse::<ActorAddr>()
1083 .unwrap_err();
1084 assert!(matches!(err, AddrParseError::MissingSeparator));
1085 }
1086
1087 #[test]
1088 fn test_actor_ref_fromstr_invalid_location() {
1089 let err = "local.local@tcp://".parse::<ActorAddr>().unwrap_err();
1090 assert!(matches!(err, AddrParseError::InvalidLocation(_)));
1091 }
1092
1093 #[test]
1094 fn test_proc_ref_eq_and_hash() {
1095 use std::collections::hash_map::DefaultHasher;
1096 use std::hash::Hasher;
1097
1098 let pid = ProcId::new(Uid::Instance(0x42, None), Some(Label::new("proc").unwrap()));
1099 let loc: Location = ChannelAddr::Local(1).into();
1100 let a = ProcAddr::new(pid.clone(), loc.clone());
1101 let b = ProcAddr::new(pid, loc);
1102 assert_eq!(a, b);
1103
1104 let hash = |r: &ProcAddr| {
1105 let mut h = DefaultHasher::new();
1106 r.hash(&mut h);
1107 h.finish()
1108 };
1109 assert_eq!(hash(&a), hash(&b));
1110 }
1111
1112 #[test]
1113 fn test_proc_ref_neq_different_location() {
1114 let pid = ProcId::new(Uid::Instance(0x42, None), Some(Label::new("proc").unwrap()));
1115 let a = ProcAddr::new(pid.clone(), ChannelAddr::Local(1).into());
1116 let b = ProcAddr::new(pid, ChannelAddr::Local(2).into());
1117 assert_ne!(a, b);
1118 }
1119
1120 #[test]
1121 fn test_actor_ref_eq_and_hash() {
1122 use std::collections::hash_map::DefaultHasher;
1123 use std::hash::Hasher;
1124
1125 let aid = ActorId::new(
1126 Uid::Instance(0x42, None),
1127 ProcId::new(Uid::Instance(0x99, None), Some(Label::new("proc").unwrap())),
1128 Some(Label::new("actor").unwrap()),
1129 );
1130 let loc: Location = ChannelAddr::Local(1).into();
1131 let a = ActorAddr::new(aid.clone(), loc.clone());
1132 let b = ActorAddr::new(aid, loc);
1133 assert_eq!(a, b);
1134
1135 let hash = |r: &ActorAddr| {
1136 let mut h = DefaultHasher::new();
1137 r.hash(&mut h);
1138 h.finish()
1139 };
1140 assert_eq!(hash(&a), hash(&b));
1141 }
1142
1143 #[test]
1144 fn test_proc_ref_singleton() {
1145 let pid = ProcId::new(
1146 Uid::singleton(Label::new("my-proc").unwrap()),
1147 Some(Label::new("my-proc").unwrap()),
1148 );
1149 let loc: Location = ChannelAddr::Local(0).into();
1150 let pref = ProcAddr::new(pid, loc);
1151 let s = pref.to_string();
1152 assert_eq!(s, "my-proc@inproc://0");
1153 let parsed: ProcAddr = s.parse().unwrap();
1154 assert_eq!(pref, parsed);
1155 }
1156
1157 #[test]
1158 fn test_reference_prefix_relationships() {
1159 let proc_ref = ProcAddr::singleton(ChannelAddr::Local(42), "service");
1160 let actor_ref = proc_ref.actor_addr("host_agent");
1161 let port_ref = actor_ref.port_addr(Port::from(7u64));
1162
1163 assert!(Addr::Proc(proc_ref.clone()).is_prefix_of(&Addr::Actor(actor_ref.clone())));
1164 assert!(Addr::Proc(proc_ref.clone()).is_prefix_of(&Addr::Port(port_ref.clone())));
1165 assert!(Addr::Actor(actor_ref.clone()).is_prefix_of(&Addr::Port(port_ref)));
1166 }
1167
1168 #[test]
1169 fn test_location_serde_roundtrip() {
1170 let loc: Location = ChannelAddr::Local(42).into();
1171 let json = serde_json::to_string(&loc).unwrap();
1172 let parsed: Location = serde_json::from_str(&json).unwrap();
1173 assert_eq!(loc, parsed);
1174 }
1175
1176 #[test]
1177 fn test_proc_ref_serde_roundtrip() {
1178 let pid = ProcId::new(
1179 Uid::Instance(0xabcdef, None),
1180 Some(Label::new("my-proc").unwrap()),
1181 );
1182 let loc: Location = ChannelAddr::Local(42).into();
1183 let pref = ProcAddr::new(pid, loc);
1184 let json = serde_json::to_string(&pref).unwrap();
1185 let parsed: ProcAddr = serde_json::from_str(&json).unwrap();
1186 assert_eq!(pref, parsed);
1187 }
1188
1189 #[test]
1190 fn test_actor_ref_serde_roundtrip() {
1191 let aid = ActorId::new(
1192 Uid::Instance(0xabcdef, None),
1193 ProcId::new(
1194 Uid::Instance(0x123456, None),
1195 Some(Label::new("my-proc").unwrap()),
1196 ),
1197 Some(Label::new("my-actor").unwrap()),
1198 );
1199 let loc: Location = ChannelAddr::Local(42).into();
1200 let aref = ActorAddr::new(aid, loc);
1201 let json = serde_json::to_string(&aref).unwrap();
1202 let parsed: ActorAddr = serde_json::from_str(&json).unwrap();
1203 assert_eq!(aref, parsed);
1204 }
1205
1206 #[test]
1207 fn test_proc_ref_with_metatls_location() {
1208 use crate::channel::TlsAddr;
1209
1210 let pid = ProcId::new(Uid::Instance(0x42, None), None);
1211 let loc: Location = ChannelAddr::MetaTls(TlsAddr::new("example.com", 443)).into();
1212 let pref = ProcAddr::new(pid, loc);
1213 let s = pref.to_string();
1214 assert_eq!(s, format!("{}@metatls://example.com:443", pref.id()));
1215 let parsed: ProcAddr = s.parse().unwrap();
1216 assert_eq!(pref, parsed);
1217 }
1218
1219 #[test]
1220 fn test_port_ref_construction_and_accessors() {
1221 let aid = ActorId::new(
1222 Uid::Instance(0xabc123, None),
1223 ProcId::new(
1224 Uid::Instance(0xdef456, None),
1225 Some(Label::new("my-proc").unwrap()),
1226 ),
1227 Some(Label::new("my-actor").unwrap()),
1228 );
1229 let port_id = PortId::new(aid.clone(), Port::from(42));
1230 let loc: Location = ChannelAddr::Local(7).into();
1231 let pref = PortAddr::new(port_id.clone(), loc.clone());
1232 assert_eq!(pref.id(), &port_id);
1233 assert_eq!(pref.location(), &loc);
1234 assert_eq!(pref.actor_id(), &aid);
1235 }
1236
1237 #[test]
1238 fn test_port_ref_display() {
1239 let aid = ActorId::new(
1240 Uid::Instance(0xabc123, None),
1241 ProcId::new(
1242 Uid::Instance(0xdef456, None),
1243 Some(Label::new("my-proc").unwrap()),
1244 ),
1245 Some(Label::new("my-actor").unwrap()),
1246 );
1247 let port_id = PortId::new(aid, Port::from(42));
1248 let loc: Location = ChannelAddr::Local(7).into();
1249 let pref = PortAddr::new(port_id, loc);
1250 assert_eq!(pref.to_string(), format!("{}@inproc://7", pref.id()));
1251 }
1252
/// With both the actor and the proc labelled, `Debug` for `PortAddr` is
/// `<'actor-label.proc-label' id@location>`.
#[test]
fn test_port_ref_debug_all_labels() {
    let actor_label = Some(Label::new("my-actor").unwrap());
    let proc_label = Some(Label::new("my-proc").unwrap());
    let actor_id = ActorId::new(
        Uid::Instance(0xabc123, None),
        ProcId::new(Uid::Instance(0xdef456, None), proc_label),
        actor_label,
    );
    let addr = PortAddr::new(
        PortId::new(actor_id, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let rendered = format!("{:?}", addr);
    let expected = format!("<'my-actor.my-proc' {}@inproc://7>", addr.id());
    assert_eq!(rendered, expected);
}
1271
/// With neither the actor nor the proc labelled, `Debug` for `PortAddr`
/// has no quoted label prefix: it is just `<id@location>`.
#[test]
fn test_port_ref_debug_no_labels() {
    let unlabelled_proc = ProcId::new(Uid::Instance(0xdef456, None), None);
    let unlabelled_actor = ActorId::new(Uid::Instance(0xabc123, None), unlabelled_proc, None);
    let addr = PortAddr::new(
        PortId::new(unlabelled_actor, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let rendered = format!("{:?}", addr);
    let expected = format!("<{}@inproc://7>", addr.id());
    assert_eq!(rendered, expected);
}
1284
/// When only the actor carries a label, `Debug` for `PortAddr` shows just
/// that label in the quoted prefix: `<'actor-label' id@location>`.
#[test]
fn test_port_ref_debug_actor_label_only() {
    let unlabelled_proc = ProcId::new(Uid::Instance(0xdef456, None), None);
    let actor_id = ActorId::new(
        Uid::Instance(0xabc123, None),
        unlabelled_proc,
        Some(Label::new("my-actor").unwrap()),
    );
    let addr = PortAddr::new(
        PortId::new(actor_id, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let rendered = format!("{:?}", addr);
    let expected = format!("<'my-actor' {}@inproc://7>", addr.id());
    assert_eq!(rendered, expected);
}
1300
/// When only the proc carries a label, `Debug` for `PortAddr` keeps the
/// separating dot and leaves the actor part empty:
/// `<'.proc-label' id@location>`.
#[test]
fn test_port_ref_debug_proc_label_only() {
    let labelled_proc = ProcId::new(
        Uid::Instance(0xdef456, None),
        Some(Label::new("my-proc").unwrap()),
    );
    let actor_id = ActorId::new(Uid::Instance(0xabc123, None), labelled_proc, None);
    let addr = PortAddr::new(
        PortId::new(actor_id, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let rendered = format!("{:?}", addr);
    let expected = format!("<'.my-proc' {}@inproc://7>", addr.id());
    assert_eq!(rendered, expected);
}
1319
/// Rendering a `PortAddr` with `to_string` and parsing the result gives back
/// an equal value, with the actor and proc labels still present.
#[test]
fn test_port_ref_fromstr_roundtrip() {
    let proc_id = ProcId::new(
        Uid::Instance(0xdef456, None),
        Some(Label::new("my-proc").unwrap()),
    );
    let actor_id = ActorId::new(
        Uid::Instance(0xabc123, None),
        proc_id,
        Some(Label::new("my-actor").unwrap()),
    );
    let original = PortAddr::new(
        PortId::new(actor_id, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let rendered = original.to_string();
    let reparsed = rendered.parse::<PortAddr>().unwrap();
    assert_eq!(original, reparsed);

    // Check the labels explicitly on the parsed value, in addition to the
    // whole-value equality above.
    let reparsed_actor = reparsed.id.actor_id();
    let actor_label = reparsed_actor.label().map(|label| label.as_str());
    assert_eq!(actor_label, Some("my-actor"));

    let proc_label = reparsed_actor
        .proc_id()
        .label()
        .map(|label| label.as_str());
    assert_eq!(proc_label, Some("my-proc"));
}
1345
/// Parses a hand-assembled `actor.proc:port@tcp://[::1]:2345` string and
/// checks every component of the resulting `PortAddr`: actor uid, proc uid,
/// port number, and channel address.
#[test]
fn test_port_ref_fromstr_examples() {
    let actor_uid = Uid::Instance(0xabc123, None);
    let proc_uid = Uid::Instance(0xdef456, None);

    let text = format!("{}.{}:42@tcp://[::1]:2345", actor_uid, proc_uid);
    let addr = text.parse::<PortAddr>().unwrap();

    let id = addr.id();
    assert_eq!(id.actor_id().uid(), &actor_uid);
    assert_eq!(id.proc_id().uid(), &proc_uid);
    assert_eq!(id.port(), Port::from(42));

    // The expected channel address is written in `ChannelAddr`'s own
    // `FromStr` syntax (`tcp:[::1]:2345`), not the URL form used above.
    let expected_channel: ChannelAddr = "tcp:[::1]:2345".parse().unwrap();
    assert_eq!(*addr.location().addr(), expected_channel);
}
1364
/// Parsing the string form of a bare `PortId` (no location attached) as a
/// `PortAddr` must fail with `AddrParseError::MissingSeparator`.
#[test]
fn test_port_ref_fromstr_missing_separator() {
    let actor_id = ActorId::new(
        Uid::Instance(0xabc123, None),
        ProcId::new(Uid::Instance(0xdef456, None), None),
        None,
    );
    // Only the id is rendered here; no `@location` part is appended.
    let id_only = PortId::new(actor_id, Port::from(42)).to_string();

    let failure = id_only.parse::<PortAddr>().unwrap_err();
    assert!(matches!(failure, AddrParseError::MissingSeparator));
}
1380
/// The location part `tcp://` of an otherwise complete port address is
/// rejected with `AddrParseError::InvalidLocation`.
#[test]
fn test_port_ref_fromstr_invalid_location() {
    let input = "local.local:7@tcp://";
    let failure = input.parse::<PortAddr>().unwrap_err();
    assert!(matches!(failure, AddrParseError::InvalidLocation(_)));
}
1386
/// Parsing into the `Addr` enum picks the variant matching the shape of the
/// id: `a` is a proc, `a.b` is an actor, and `a.b:n` is a port.
#[test]
fn test_reference_fromstr_specificity() {
    let proc_form = "local@inproc://0".parse::<Addr>().unwrap();
    assert!(proc_form.is_proc());

    let actor_form = "local.local@inproc://0".parse::<Addr>().unwrap();
    assert!(actor_form.is_actor());

    let port_form = "local.local:7@inproc://0".parse::<Addr>().unwrap();
    assert!(port_form.is_port());
}
1398
/// Each of these inputs is malformed in a different component (port, actor
/// part, location) and must fail to parse as an `Addr`.
#[test]
fn test_reference_fromstr_rejects_malformed_specific_forms() {
    let malformed_inputs = [
        "local.local:not-a-port@inproc://0",
        "local.<bad!>@inproc://0",
        "local@tcp://",
    ];
    for input in malformed_inputs {
        assert!(input.parse::<Addr>().is_err());
    }
}
1405
/// A port-shaped input with a non-numeric port must be reported as
/// `InvalidId(InvalidPort)`. As the test name indicates, the parser is
/// expected not to fall back to a less specific address form.
#[test]
fn test_reference_fromstr_does_not_downcast_malformed_port_ref() {
    let input = "local.local:not-a-port@inproc://0";
    let failure = input.parse::<Addr>().unwrap_err();
    assert!(matches!(
        failure,
        AddrParseError::InvalidId(IdParseError::InvalidPort(_))
    ));
}
1416
/// An actor-shaped input whose second component is invalid must be reported
/// as `InvalidId(InvalidActorProcUid)`. As the test name indicates, the
/// parser is expected not to fall back to a less specific address form.
#[test]
fn test_reference_fromstr_does_not_downcast_malformed_actor_ref() {
    let input = "local.<bad!>@inproc://0";
    let failure = input.parse::<Addr>().unwrap_err();
    assert!(matches!(
        failure,
        AddrParseError::InvalidId(IdParseError::InvalidActorProcUid(_))
    ));
}
1425
/// Two `PortAddr`s built from the same id and location compare equal and
/// produce the same hash under `DefaultHasher`.
#[test]
fn test_port_ref_eq_and_hash() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    let proc_id = ProcId::new(Uid::Instance(0x99, None), Some(Label::new("proc").unwrap()));
    let actor_id = ActorId::new(
        Uid::Instance(0x42, None),
        proc_id,
        Some(Label::new("actor").unwrap()),
    );
    let port_id = PortId::new(actor_id, Port::from(10));
    let location = Location::from(ChannelAddr::Local(1));

    let first = PortAddr::new(port_id.clone(), location.clone());
    let second = PortAddr::new(port_id, location);
    assert_eq!(first, second);

    // Feed one address through a fresh hasher and return the digest.
    let digest = |addr: &PortAddr| -> u64 {
        let mut hasher = DefaultHasher::new();
        addr.hash(&mut hasher);
        hasher.finish()
    };
    let first_digest = digest(&first);
    let second_digest = digest(&second);
    assert_eq!(first_digest, second_digest);
}
1449
/// Two `PortAddr`s with the same port id but different locations must not
/// compare equal.
#[test]
fn test_port_ref_neq_different_location() {
    let proc_id = ProcId::new(Uid::Instance(0x99, None), Some(Label::new("proc").unwrap()));
    let actor_id = ActorId::new(
        Uid::Instance(0x42, None),
        proc_id,
        Some(Label::new("actor").unwrap()),
    );
    let port_id = PortId::new(actor_id, Port::from(10));

    // Same id on both sides; only the local channel number differs.
    let at_one = PortAddr::new(port_id.clone(), ChannelAddr::Local(1).into());
    let at_two = PortAddr::new(port_id, ChannelAddr::Local(2).into());
    assert_ne!(at_one, at_two);
}
1462
/// Serializing a `PortAddr` to JSON and deserializing it again yields an
/// equal value.
#[test]
fn test_port_ref_serde_roundtrip() {
    let proc_id = ProcId::new(
        Uid::Instance(0x123456, None),
        Some(Label::new("my-proc").unwrap()),
    );
    let actor_id = ActorId::new(
        Uid::Instance(0xabcdef, None),
        proc_id,
        Some(Label::new("my-actor").unwrap()),
    );
    let original = PortAddr::new(
        PortId::new(actor_id, Port::from(42)),
        Location::from(ChannelAddr::Local(7)),
    );

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<PortAddr>(&encoded).unwrap();
    assert_eq!(original, decoded);
}
1480}