1use std::any::Any;
71use std::collections::BTreeMap;
72use std::collections::BTreeSet;
73use std::fmt;
74use std::fmt::Debug;
75use std::future;
76use std::future::Future;
77use std::ops::Bound::Excluded;
78use std::pin::Pin;
79use std::sync::Arc;
80use std::sync::LazyLock;
81use std::sync::Mutex;
82use std::sync::RwLock;
83use std::sync::Weak;
84use std::sync::atomic::AtomicU64;
85use std::sync::atomic::AtomicUsize;
86use std::sync::atomic::Ordering;
87use std::task::Context;
88use std::task::Poll;
89
90use async_trait::async_trait;
91use dashmap::DashMap;
92use dashmap::mapref::entry::Entry;
93use futures::Sink;
94use futures::Stream;
95use hyperactor_config::Flattrs;
96use hyperactor_telemetry::hash_to_u64;
97use serde::Deserialize;
98use serde::Serialize;
99use serde::de::DeserializeOwned;
100use tokio::sync::mpsc;
101use tokio::sync::oneshot;
102use tokio::sync::watch;
103use tokio::task::JoinHandle;
104use tokio_util::sync::CancellationToken;
105use typeuri::Named;
106
107use crate::Proc;
109use crate::accum::Accumulator;
110use crate::accum::ReducerSpec;
111use crate::accum::StreamingReducerOpts;
112use crate::actor::ActorStatus;
113use crate::actor::Signal;
114use crate::actor::remote::USER_PORT_OFFSET;
115use crate::channel;
116use crate::channel::ChannelAddr;
117use crate::channel::ChannelError;
118use crate::channel::ChannelTransport;
119use crate::channel::SendError;
120use crate::channel::TxStatus;
121use crate::context;
122use crate::metrics;
123use crate::ordering::SEQ_INFO;
124use crate::ordering::SeqInfo;
125use crate::reference;
126
127mod undeliverable;
128pub use undeliverable::Undeliverable;
130pub use undeliverable::UndeliverableMessageError;
131pub use undeliverable::custom_monitored_return_handle;
132pub use undeliverable::monitored_return_handle; pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137pub mod headers;
139
140pub trait Message: Send + Sync + 'static {}
143impl<M: Send + Sync + 'static> Message for M {}
144
145pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
149
150impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
151
152pub type Data = Vec<u8>;
154
155#[derive(
157 thiserror::Error,
158 Debug,
159 Serialize,
160 Deserialize,
161 typeuri::Named,
162 Clone,
163 PartialEq,
164 Eq
165)]
166pub enum DeliveryError {
167 #[error("address not routable: {0}")]
169 Unroutable(String),
170
171 #[error("broken link: {0}")]
174 BrokenLink(String),
175
176 #[error("mailbox error: {0}")]
178 Mailbox(String),
179
180 #[error("multicast error: {0}")]
182 Multicast(String),
183
184 #[error("ttl expired")]
186 TtlExpired,
187}
188
189#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
193pub struct MessageEnvelope {
194 sender: reference::ActorId,
196
197 dest: reference::PortId,
199
200 data: wirevalue::Any,
202
203 errors: Vec<DeliveryError>,
205
206 headers: Flattrs,
208
209 ttl: u8,
211
212 return_undeliverable: bool,
215 }
217wirevalue::register_type!(MessageEnvelope);
218
219impl MessageEnvelope {
220 pub fn new(
222 sender: reference::ActorId,
223 dest: reference::PortId,
224 data: wirevalue::Any,
225 headers: Flattrs,
226 ) -> Self {
227 Self {
228 sender,
229 dest,
230 data,
231 errors: Vec::new(),
232 headers,
233 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
234 return_undeliverable: true,
236 }
237 }
238
239 pub(crate) fn new_unknown(dest: reference::PortId, data: wirevalue::Any) -> Self {
241 let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
243 let unknown_proc_id = crate::reference::ProcId::unique(unknown_addr, "unknown");
244 let unknown_actor_id =
245 crate::reference::ActorId::root(unknown_proc_id, "unknown".to_string());
246 Self::new(unknown_actor_id, dest, data, Flattrs::new())
247 }
248
249 pub fn serialize<T: Serialize + Named>(
251 source: reference::ActorId,
252 dest: reference::PortId,
253 value: &T,
254 headers: Flattrs,
255 ) -> Result<Self, wirevalue::Error> {
256 Ok(Self {
257 headers,
258 data: wirevalue::Any::serialize(value)?,
259 sender: source,
260 dest,
261 errors: Vec::new(),
262 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
263 return_undeliverable: true,
265 })
266 }
267
268 pub fn ttl(&self) -> u8 {
274 self.ttl
275 }
276
277 pub fn set_ttl(mut self, ttl: u8) -> Self {
287 self.ttl = ttl;
288 self
289 }
290
291 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
299 if self.ttl == 0 {
300 Err(DeliveryError::TtlExpired)
301 } else {
302 self.ttl -= 1;
303 Ok(())
304 }
305 }
306
307 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
309 Ok(self.data.deserialized()?)
310 }
311
312 pub fn data(&self) -> &wirevalue::Any {
314 &self.data
315 }
316
317 pub fn sender(&self) -> &reference::ActorId {
319 &self.sender
320 }
321
322 pub fn dest(&self) -> &reference::PortId {
324 &self.dest
325 }
326
327 pub fn headers(&self) -> &Flattrs {
329 &self.headers
330 }
331
332 pub fn is_signal(&self) -> bool {
334 self.dest.index() == Signal::port()
335 }
336
337 pub fn set_error(&mut self, error: DeliveryError) {
340 self.errors.push(error)
341 }
342
343 pub fn update_sender(&mut self, sender: reference::ActorId) {
347 self.sender = sender;
348 }
349
350 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
354 self.return_undeliverable = return_undeliverable;
355 }
356
357 pub fn undeliverable(
361 mut self,
362 error: DeliveryError,
363 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
364 ) {
365 tracing::debug!(
366 name = "undelivered_message_attempt",
367 sender = self.sender.to_string(),
368 dest = self.dest.to_string(),
369 error = error.to_string(),
370 return_handle = %return_handle,
371 );
372 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
373 1,
374 hyperactor_telemetry::kv_pairs!(
375 "sender_actor_id" => self.sender.to_string(),
376 "dest_actor_id" => self.dest.to_string(),
377 "message_type" => self.data.typename().unwrap_or("unknown"),
378 "error_type" => error.to_string(),
379 ),
380 );
381
382 self.set_error(error);
383 undeliverable::return_undeliverable(return_handle, self);
384 }
385
386 pub fn errors(&self) -> &Vec<DeliveryError> {
389 &self.errors
390 }
391
392 pub fn error_msg(&self) -> Option<String> {
396 if self.errors.is_empty() {
397 None
398 } else {
399 Some(
400 self.errors
401 .iter()
402 .map(|e| e.to_string())
403 .collect::<Vec<_>>()
404 .join("; "),
405 )
406 }
407 }
408
409 fn open(self) -> (MessageMetadata, wirevalue::Any) {
410 let Self {
411 sender,
412 dest,
413 data,
414 errors,
415 headers,
416 ttl,
417 return_undeliverable,
418 } = self;
419
420 (
421 MessageMetadata {
422 sender,
423 dest,
424 errors,
425 headers,
426 ttl,
427 return_undeliverable,
428 },
429 data,
430 )
431 }
432
433 fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
434 let MessageMetadata {
435 sender,
436 dest,
437 errors,
438 headers,
439 ttl,
440 return_undeliverable,
441 } = metadata;
442
443 Self {
444 sender,
445 dest,
446 data,
447 errors,
448 headers,
449 ttl,
450 return_undeliverable,
451 }
452 }
453
454 fn return_undeliverable(&self) -> bool {
455 self.return_undeliverable
456 }
457
458 pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
460 self.headers.set(key, value);
461 }
462}
463
464impl fmt::Display for MessageEnvelope {
465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466 match &self.error_msg() {
467 None => write!(
468 f,
469 "{} > {}: {} {{{}}}",
470 self.sender, self.dest, self.data, self.headers
471 ),
472 Some(err) => write!(
473 f,
474 "{} > {}: {} {{{}}}: delivery error: {}",
475 self.sender, self.dest, self.data, self.headers, err
476 ),
477 }
478 }
479}
480
481#[derive(Clone)]
483pub struct MessageMetadata {
484 sender: reference::ActorId,
485 dest: reference::PortId,
486 errors: Vec<DeliveryError>,
487 headers: Flattrs,
488 ttl: u8,
489 return_undeliverable: bool,
490}
491
492#[derive(Debug)]
495pub struct MailboxError {
496 actor_id: reference::ActorId,
497 kind: MailboxErrorKind,
498}
499
500#[derive(thiserror::Error, Debug)]
503#[non_exhaustive]
504pub enum MailboxErrorKind {
505 #[error("mailbox closed")]
507 Closed,
508
509 #[error("invalid port: {0}")]
511 InvalidPort(reference::PortId),
512
513 #[error("no sender for port: {0}")]
515 NoSenderForPort(reference::PortId),
516
517 #[error("no local sender for port: {0}")]
520 NoLocalSenderForPort(reference::PortId),
521
522 #[error("{0}: port closed")]
524 PortClosed(reference::PortId),
525
526 #[error("send {0}: {1}")]
528 Send(reference::PortId, #[source] anyhow::Error),
529
530 #[error("recv {0}: {1}")]
532 Recv(reference::PortId, #[source] anyhow::Error),
533
534 #[error("serialize: {0}")]
536 Serialize(#[source] anyhow::Error),
537
538 #[error("deserialize {0}: {1}")]
540 Deserialize(&'static str, anyhow::Error),
541
542 #[error(transparent)]
544 Channel(#[from] ChannelError),
545
546 #[error("owner terminated: {0}")]
548 OwnerTerminated(ActorStatus),
549}
550
551impl MailboxError {
552 pub fn new(actor_id: reference::ActorId, kind: MailboxErrorKind) -> Self {
555 Self { actor_id, kind }
556 }
557
558 pub fn actor_id(&self) -> &reference::ActorId {
560 &self.actor_id
561 }
562
563 pub fn kind(&self) -> &MailboxErrorKind {
565 &self.kind
566 }
567}
568
569impl fmt::Display for MailboxError {
570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571 write!(f, "{}: ", self.actor_id)?;
572 fmt::Display::fmt(&self.kind, f)
573 }
574}
575
576impl std::error::Error for MailboxError {
577 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
578 self.kind.source()
579 }
580}
581
582#[derive(Debug, Clone)]
586pub enum PortLocation {
587 Bound(reference::PortId),
589 Unbound(reference::ActorId, &'static str),
591}
592
593impl PortLocation {
594 fn new_unbound<M: Message>(actor_id: reference::ActorId) -> Self {
595 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
596 }
597
598 #[allow(dead_code)]
599 fn new_unbound_type(actor_id: reference::ActorId, ty: &'static str) -> Self {
600 PortLocation::Unbound(actor_id, ty)
601 }
602
603 pub fn actor_id(&self) -> &reference::ActorId {
605 match self {
606 PortLocation::Bound(port_id) => port_id.actor_id(),
607 PortLocation::Unbound(actor_id, _) => actor_id,
608 }
609 }
610}
611
612impl fmt::Display for PortLocation {
613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614 match self {
615 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
616 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
617 }
618 }
619}
620
621#[derive(Debug)]
624pub struct MailboxSenderError {
625 location: Box<PortLocation>,
626 kind: Box<MailboxSenderErrorKind>,
627}
628
629#[derive(thiserror::Error, Debug)]
631pub enum MailboxSenderErrorKind {
632 #[error("serialization error: {0}")]
634 Serialize(anyhow::Error),
635
636 #[error("deserialization error for type {0}: {1}")]
638 Deserialize(&'static str, anyhow::Error),
639
640 #[error("invalid port")]
642 Invalid,
643
644 #[error("port closed")]
646 Closed,
647
648 #[error(transparent)]
651 Mailbox(#[from] MailboxError),
652
653 #[error(transparent)]
655 Channel(#[from] ChannelError),
656
657 #[error("send error: {0}")]
659 Other(#[from] anyhow::Error),
660
661 #[error("unreachable: {0}")]
663 Unreachable(anyhow::Error),
664}
665
666impl MailboxSenderError {
667 pub fn new_unbound<M>(actor_id: reference::ActorId, kind: MailboxSenderErrorKind) -> Self {
669 Self {
670 location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
671 kind: Box::new(kind),
672 }
673 }
674
675 pub fn new_unbound_type(
677 actor_id: reference::ActorId,
678 kind: MailboxSenderErrorKind,
679 ty: &'static str,
680 ) -> Self {
681 Self {
682 location: Box::new(PortLocation::Unbound(actor_id, ty)),
683 kind: Box::new(kind),
684 }
685 }
686
687 pub fn new_bound(port_id: reference::PortId, kind: MailboxSenderErrorKind) -> Self {
689 Self {
690 location: Box::new(PortLocation::Bound(port_id)),
691 kind: Box::new(kind),
692 }
693 }
694
695 pub fn location(&self) -> &PortLocation {
697 &self.location
698 }
699
700 pub fn kind(&self) -> &MailboxSenderErrorKind {
702 &self.kind
703 }
704}
705
706impl fmt::Display for MailboxSenderError {
707 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708 write!(f, "{}: ", self.location)?;
709 fmt::Display::fmt(&self.kind, f)
710 }
711}
712
713impl std::error::Error for MailboxSenderError {
714 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
715 self.kind.source()
716 }
717}
718
719#[async_trait]
722pub trait MailboxSender: Send + Sync + Any {
723 fn post(
726 &self,
727 mut envelope: MessageEnvelope,
728 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729 ) {
730 if let Err(err) = envelope.dec_ttl_or_err() {
731 envelope.undeliverable(err, return_handle);
732 return;
733 }
734 self.post_unchecked(envelope, return_handle);
735 }
736
737 fn post_unchecked(
739 &self,
740 envelope: MessageEnvelope,
741 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
742 );
743
744 async fn flush(&self) -> Result<(), anyhow::Error> {
749 Ok(())
750 }
751}
752
753pub trait PortSender: MailboxSender {
756 fn serialize_and_send<M: RemoteMessage>(
758 &self,
759 port: &reference::PortRef<M>,
760 message: M,
761 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
762 ) -> Result<(), MailboxSenderError> {
763 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
765 MailboxSenderError::new_bound(
766 port.port_id().clone(),
767 MailboxSenderErrorKind::Serialize(err.into()),
768 )
769 })?;
770 self.post(
771 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
772 return_handle,
773 );
774 Ok(())
775 }
776
777 fn serialize_and_send_once<M: RemoteMessage>(
780 &self,
781 once_port: reference::OncePortRef<M>,
782 message: M,
783 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
784 ) -> Result<(), MailboxSenderError> {
785 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
786 MailboxSenderError::new_bound(
787 once_port.port_id().clone(),
788 MailboxSenderErrorKind::Serialize(err.into()),
789 )
790 })?;
791 self.post(
792 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
793 return_handle,
794 );
795 Ok(())
796 }
797}
798
799impl<T: ?Sized + MailboxSender> PortSender for T {}
800
801#[derive(Debug, Clone)]
805pub struct PanickingMailboxSender;
806
807#[async_trait]
808impl MailboxSender for PanickingMailboxSender {
809 fn post_unchecked(
810 &self,
811 envelope: MessageEnvelope,
812 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
813 ) {
814 panic!("panic! in the mailbox! attempted post: {}", envelope)
815 }
816}
817
818#[derive(Debug)]
821pub struct UndeliverableMailboxSender;
822
823#[async_trait]
824impl MailboxSender for UndeliverableMailboxSender {
825 fn post_unchecked(
826 &self,
827 envelope: MessageEnvelope,
828 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
829 ) {
830 let sender_name = envelope.sender.name();
831 let error_str = envelope.error_msg().unwrap_or("".to_string());
832 tracing::error!(
835 name = "undelivered_message_abandoned",
836 actor_name = sender_name,
837 actor_id = envelope.sender.to_string(),
838 dest = envelope.dest.to_string(),
839 headers = envelope.headers().to_string(), data = envelope.data().to_string(),
841 return_handle = %return_handle,
842 "message not delivered, {}",
843 error_str,
844 );
845 }
846}
847
848static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
849 LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
850
851#[derive(Clone)]
857pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
858
859impl fmt::Debug for BoxedMailboxSender {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 f.debug_struct("BoxedMailboxSender")
862 .field("sender", &"<dyn MailboxSender>")
863 .finish()
864 }
865}
866
867impl BoxedMailboxSender {
868 pub fn new(sender: impl MailboxSender + 'static) -> Self {
870 Self(Arc::new(sender))
871 }
872
873 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
876 (&*self.0 as &dyn Any).downcast_ref::<T>()
877 }
878}
879
880pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
882 fn boxed(&self) -> BoxedMailboxSender;
884}
885impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
886 fn boxed(&self) -> BoxedMailboxSender {
887 BoxedMailboxSender::new(self.clone())
888 }
889}
890
891pub trait IntoBoxedMailboxSender: MailboxSender {
893 fn into_boxed(self) -> BoxedMailboxSender;
895}
896impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
897 fn into_boxed(self) -> BoxedMailboxSender {
898 BoxedMailboxSender::new(self)
899 }
900}
901
902#[async_trait]
903impl MailboxSender for BoxedMailboxSender {
904 fn post_unchecked(
905 &self,
906 envelope: MessageEnvelope,
907 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
908 ) {
909 self.0.post_unchecked(envelope, return_handle);
910 }
911
912 async fn flush(&self) -> Result<(), anyhow::Error> {
913 self.0.flush().await
914 }
915}
916
917#[derive(thiserror::Error, Debug)]
919pub enum MailboxServerError {
920 #[error(transparent)]
922 Channel(#[from] ChannelError),
923
924 #[error(transparent)]
926 MailboxSender(#[from] MailboxSenderError),
927}
928
929#[derive(Debug)]
932pub struct MailboxServerHandle {
933 join_handle: JoinHandle<Result<(), MailboxServerError>>,
934 stopped_tx: watch::Sender<bool>,
935}
936
937impl MailboxServerHandle {
938 pub fn stop(&self, reason: &str) {
943 tracing::info!("stopping mailbox server; reason: {}", reason);
944 self.stopped_tx.send(true).expect("stop called twice");
945 }
946}
947
948impl Future for MailboxServerHandle {
950 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
951
952 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
953 let join_handle_pinned =
955 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
956 join_handle_pinned.poll(cx)
957 }
958}
959
960pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
963 fn serve(
967 self,
968 mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
969 ) -> MailboxServerHandle {
970 let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
975 let server = self.clone();
976 tokio::task::spawn(async move {
977 static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
979 let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
980 let addr = ChannelAddr::any(ChannelTransport::Local);
981 let proc_id = reference::ProcId::unique(addr, format!("mailbox_server_{}", rank));
982 let proc = Proc::configured(proc_id, BoxedMailboxSender::new(server));
985 let (client, _) = proc.instance("undeliverable_supervisor").unwrap();
986 while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
987 if let Ok(Undeliverable(e)) =
988 envelope.deserialized::<Undeliverable<MessageEnvelope>>()
989 {
990 UndeliverableMailboxSender.post(e, monitored_return_handle());
992 continue;
993 }
994 envelope.set_error(DeliveryError::BrokenLink(
995 "message was undeliverable".to_owned(),
996 ));
997 let return_port =
998 reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
999 envelope.sender(),
1000 );
1001 return_port.send_serialized(
1002 &client,
1003 Flattrs::new(),
1004 wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
1005 );
1006 }
1007 });
1008
1009 let (stopped_tx, mut stopped_rx) = watch::channel(false);
1010 let join_handle = tokio::spawn(async move {
1011 let mut detached = false;
1012
1013 let result = loop {
1014 if *stopped_rx.borrow_and_update() {
1015 break Ok(());
1016 }
1017
1018 tokio::select! {
1019 message = rx.recv() => {
1020 match message {
1021 Ok(envelope) => self.post(envelope, return_handle.clone()),
1023
1024 Err(ChannelError::Closed) => break Ok(()),
1027 Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1028 }
1029 }
1030 result = stopped_rx.changed(), if !detached => {
1031 detached = result.is_err();
1032 if detached {
1033 tracing::debug!(
1034 "the mailbox server is detached for Rx {}", rx.addr()
1035 );
1036 } else {
1037 tracing::debug!(
1038 "the mailbox server is stopped for Rx {}", rx.addr()
1039 );
1040 }
1041 }
1042 }
1043 };
1044
1045 rx.join().await;
1048
1049 result
1050 });
1051
1052 MailboxServerHandle {
1053 join_handle,
1054 stopped_tx,
1055 }
1056 }
1057}
1058
1059impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1060
1061struct Buffer<T: Message> {
1062 queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
1063 #[allow(dead_code)]
1064 processed: watch::Receiver<usize>,
1065 seq: AtomicUsize,
1066}
1067
1068impl<T: Message> Buffer<T> {
1069 fn new<Fut>(
1070 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
1071 ) -> Self
1072 where
1073 Fut: Future<Output = ()> + Send + 'static,
1074 {
1075 let (queue, mut next) = mpsc::unbounded_channel();
1076 let (last_processed, processed) = watch::channel(0);
1077 crate::init::get_runtime().spawn(async move {
1078 let mut seq = 0;
1079 while let Some((msg, return_handle)) = next.recv().await {
1080 process(msg, return_handle).await;
1081 seq += 1;
1082 let _ = last_processed.send(seq);
1083 }
1084 });
1085 Self {
1086 queue,
1087 processed,
1088 seq: AtomicUsize::new(0),
1089 }
1090 }
1091
1092 fn send(
1093 &self,
1094 item: (T, PortHandle<Undeliverable<T>>),
1095 ) -> Result<(), Box<mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>>> {
1096 self.seq.fetch_add(1, Ordering::SeqCst);
1097 self.queue.send(item).map_err(Box::new)?;
1098 Ok(())
1099 }
1100}
1101
1102pub struct MailboxClient {
1104 buffer: Buffer<MessageEnvelope>,
1106
1107 _tx_monitoring: CancellationToken,
1109
1110 submitted: Arc<AtomicUsize>,
1112 completed: Arc<AtomicUsize>,
1115 completed_notify: Arc<tokio::sync::Notify>,
1117}
1118
1119impl fmt::Debug for MailboxClient {
1120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1121 f.debug_struct("MailboxClient")
1122 .field("buffer", &"<Buffer>")
1123 .finish()
1124 }
1125}
1126
1127impl MailboxClient {
1128 pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1131 let addr = tx.addr();
1132 let tx = Arc::new(tx);
1133 let tx_status = tx.status().clone();
1134 let tx_monitoring = CancellationToken::new();
1135 let completed = Arc::new(AtomicUsize::new(0));
1136 let completed_notify = Arc::new(tokio::sync::Notify::new());
1137 let buffer = {
1138 let completed = completed.clone();
1139 let completed_notify = completed_notify.clone();
1140 Buffer::new(move |envelope, return_handle| {
1141 let tx = Arc::clone(&tx);
1142 let (return_channel, return_receiver) =
1143 oneshot::channel::<SendError<MessageEnvelope>>();
1144 let return_handle_0 = return_handle.clone();
1146 let completed = completed.clone();
1147 let completed_notify = completed_notify.clone();
1148 tokio::spawn(async move {
1149 match return_receiver.await {
1150 Ok(SendError {
1151 error,
1152 message,
1153 reason,
1154 }) => {
1155 message.undeliverable(
1156 DeliveryError::BrokenLink(format!(
1157 "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
1158 )),
1159 return_handle_0,
1160 );
1161 }
1162 Err(_) => {
1163 }
1165 }
1166 completed.fetch_add(1, Ordering::SeqCst);
1167 completed_notify.notify_waiters();
1168 });
1169 tx.try_post(envelope, return_channel);
1171 future::ready(())
1172 })
1173 };
1174 let this = Self {
1175 buffer,
1176 _tx_monitoring: tx_monitoring.clone(),
1177 submitted: Arc::new(AtomicUsize::new(0)),
1178 completed,
1179 completed_notify,
1180 };
1181 Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1182 this
1183 }
1184
1185 pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1188 Ok(MailboxClient::new(channel::dial(addr)?))
1189 }
1190
1191 fn monitor_tx_health(
1193 mut rx: watch::Receiver<TxStatus>,
1194 cancel_token: CancellationToken,
1195 addr: ChannelAddr,
1196 ) {
1197 crate::init::get_runtime().spawn(async move {
1198 loop {
1199 tokio::select! {
1200 changed = rx.changed() => {
1201 if changed.is_err() || rx.borrow().is_closed() {
1202 let reason = rx.borrow().as_closed().map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string());
1203 tracing::warn!("connection to {} lost: {}", addr, reason);
1204 break;
1207 }
1208 }
1209 _ = cancel_token.cancelled() => {
1210 break;
1211 }
1212 }
1213 }
1214 });
1215 }
1216}
1217
1218#[async_trait]
1219impl MailboxSender for MailboxClient {
1220 #[tracing::instrument(level = "debug", skip_all)]
1221 fn post_unchecked(
1222 &self,
1223 envelope: MessageEnvelope,
1224 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1225 ) {
1226 tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1227 if let Err(err) = self.buffer.send((envelope, return_handle)) {
1228 let mpsc::error::SendError((envelope, return_handle)) = *err;
1229 let err = DeliveryError::BrokenLink(
1230 "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1231 );
1232
1233 envelope.undeliverable(err, return_handle);
1235 } else {
1236 self.submitted.fetch_add(1, Ordering::SeqCst);
1237 }
1238 }
1239
1240 async fn flush(&self) -> Result<(), anyhow::Error> {
1241 let target = self.submitted.load(Ordering::SeqCst);
1242 loop {
1243 if self.completed.load(Ordering::SeqCst) >= target {
1244 return Ok(());
1245 }
1246 self.completed_notify.notified().await;
1247 }
1248 }
1249}
1250
1251pub struct PortSink<C: context::Actor, M: RemoteMessage> {
1253 cx: C,
1254 port: reference::PortRef<M>,
1255}
1256
1257impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1258 pub fn new(cx: C, port: reference::PortRef<M>) -> Self {
1260 Self { cx, port }
1261 }
1262}
1263
1264impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1265 type Error = MailboxSenderError;
1266
1267 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1268 Poll::Ready(Ok(()))
1269 }
1270
1271 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1272 self.port.send(&self.cx, item)
1273 }
1274
1275 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1276 Poll::Ready(Ok(()))
1277 }
1278
1279 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1280 Poll::Ready(Ok(()))
1281 }
1282}
1283
1284#[derive(Clone, Debug)]
1287pub struct Mailbox {
1288 inner: Arc<State>,
1289}
1290
1291impl Mailbox {
1292 pub fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
1295 Self {
1296 inner: Arc::new(State::new(actor_id, forwarder)),
1297 }
1298 }
1299
1300 pub fn new_detached(actor_id: reference::ActorId) -> Self {
1302 Self {
1303 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1304 }
1305 }
1306
1307 pub fn actor_id(&self) -> &reference::ActorId {
1309 &self.inner.actor_id
1310 }
1311
1312 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1317 let port_index = self.inner.allocate_port();
1318 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1319 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1320 tracing::trace!(
1321 name = "open_port",
1322 "opening port for {} at {}",
1323 self.inner.actor_id,
1324 port_id
1325 );
1326 (
1327 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1328 PortReceiver::new(receiver, port_id, false, self.clone()),
1329 )
1330 }
1331
1332 pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1339 let (handle, receiver) = self.open_port();
1340 handle.bind_actor_port();
1341 (handle, receiver)
1342 }
1343
1344 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1347 where
1348 A: Accumulator + Send + Sync + 'static,
1349 A::Update: Message,
1350 A::State: Message + Default + Clone,
1351 {
1352 self.open_accum_port_opts(accum, StreamingReducerOpts::default())
1353 }
1354
1355 pub fn open_accum_port_opts<A>(
1363 &self,
1364 accum: A,
1365 streaming_opts: StreamingReducerOpts,
1366 ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1367 where
1368 A: Accumulator + Send + Sync + 'static,
1369 A::Update: Message,
1370 A::State: Message + Default + Clone,
1371 {
1372 let port_index = self.inner.allocate_port();
1373 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1374 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1375 let state = Mutex::new(A::State::default());
1376 let reducer_spec = accum.reducer_spec();
1377 let enqueue = move |_, update: A::Update| {
1378 let mut state = state.lock().unwrap();
1379 accum.accumulate(&mut state, update)?;
1380 let _ = sender.send(state.clone());
1381 Ok(())
1382 };
1383 (
1384 PortHandle {
1385 mailbox: self.clone(),
1386 port_index,
1387 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1388 bound: Arc::new(RwLock::new(None)),
1389 reducer_spec,
1390 streaming_opts,
1391 },
1392 PortReceiver::new(receiver, port_id, true, self.clone()),
1393 )
1394 }
1395
1396 pub(crate) fn open_enqueue_port<M: Message>(
1400 &self,
1401 enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1402 ) -> PortHandle<M> {
1403 PortHandle {
1404 mailbox: self.clone(),
1405 port_index: self.inner.allocate_port(),
1406 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1407 bound: Arc::new(RwLock::new(None)),
1408 reducer_spec: None,
1409 streaming_opts: StreamingReducerOpts::default(),
1410 }
1411 }
1412
1413 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1417 let port_index = self.inner.allocate_port();
1418 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1419 let (sender, receiver) = oneshot::channel::<M>();
1420 (
1421 OncePortHandle {
1422 mailbox: self.clone(),
1423 port_index,
1424 port_id: port_id.clone(),
1425 sender,
1426 reducer_spec: None,
1427 },
1428 OncePortReceiver {
1429 receiver: Some(receiver),
1430 port_id,
1431 mailbox: self.clone(),
1432 },
1433 )
1434 }
1435
1436 pub fn open_reduce_port<A, T>(
1450 &self,
1451 accum: A,
1452 ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
1453 where
1454 A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
1455 T: Message + Default + Clone,
1456 {
1457 let port_index = self.inner.allocate_port();
1458 let (sender, receiver) = oneshot::channel::<T>();
1459 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1460 let reducer_spec = accum.reducer_spec();
1461 assert!(
1462 reducer_spec.is_some(),
1463 "cannot use a reduce port without a ReducerSpec"
1464 );
1465
1466 (
1467 OncePortHandle {
1468 mailbox: self.clone(),
1469 port_index,
1470 port_id: port_id.clone(),
1471 sender,
1472 reducer_spec,
1473 },
1474 OncePortReceiver {
1475 receiver: Some(receiver),
1476 port_id,
1477 mailbox: self.clone(),
1478 },
1479 )
1480 }
1481
1482 #[allow(dead_code)]
1483 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1484 MailboxError::new(self.inner.actor_id.clone(), err)
1485 }
1486
1487 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1488 let port_index = M::port();
1489 self.inner.ports.get(&port_index).and_then(|boxed| {
1490 boxed
1491 .as_any()
1492 .downcast_ref::<UnboundedSender<M>>()
1493 .map(|s| {
1494 assert_eq!(
1495 s.port_id,
1496 self.actor_id().port_id(port_index),
1497 "port_id mismatch in downcasted UnboundedSender"
1498 );
1499 s.sender.clone()
1500 })
1501 })
1502 }
1503
1504 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1506 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1507 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1508 }
1509
1510 pub(crate) fn allocate_port(&self) -> u64 {
1511 self.inner.allocate_port()
1512 }
1513
1514 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> reference::PortRef<M> {
1515 assert_eq!(
1516 handle.mailbox.actor_id(),
1517 self.actor_id(),
1518 "port does not belong to mailbox"
1519 );
1520
1521 let port_id = self.actor_id().port_id(handle.port_index);
1524 match self.inner.ports.entry(handle.port_index) {
1525 Entry::Vacant(entry) => {
1526 entry.insert(Box::new(UnboundedSender::new(
1527 handle.sender.clone(),
1528 port_id.clone(),
1529 )));
1530 }
1531 Entry::Occupied(_entry) => {}
1532 }
1533
1534 reference::PortRef::attest(port_id)
1535 }
1536
1537 fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1538 assert_eq!(
1539 handle.mailbox.actor_id(),
1540 self.actor_id(),
1541 "port does not belong to mailbox"
1542 );
1543
1544 let port_index = M::port();
1545 let port_id = self.actor_id().port_id(port_index);
1546 match self.inner.ports.entry(port_index) {
1547 Entry::Vacant(entry) => {
1548 entry.insert(Box::new(UnboundedSender::new(
1549 handle.sender.clone(),
1550 port_id,
1551 )));
1552 }
1553 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1554 }
1555 }
1556
1557 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1558 let port_id = handle.port_id().clone();
1559 match self.inner.ports.entry(handle.port_index) {
1560 Entry::Vacant(entry) => {
1561 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1562 }
1563 Entry::Occupied(_entry) => {}
1564 }
1565 }
1566
1567 pub(crate) fn bind_untyped(&self, port_id: &reference::PortId, sender: UntypedUnboundedSender) {
1568 assert_eq!(
1569 port_id.actor_id(),
1570 self.actor_id(),
1571 "port does not belong to mailbox"
1572 );
1573
1574 match self.inner.ports.entry(port_id.index()) {
1575 Entry::Vacant(entry) => {
1576 entry.insert(Box::new(sender));
1577 }
1578 Entry::Occupied(_entry) => {}
1579 }
1580 }
1581
1582 pub(crate) fn close(&self, status: ActorStatus) {
1583 let mut closed = self.inner.closed.write().unwrap();
1584 if closed.is_some() {
1585 panic!("mailbox with owner {} already closed", self.actor_id());
1586 }
1587 let _ = closed.insert(status);
1588 }
1589}
1590
1591impl context::Mailbox for Mailbox {
1592 fn mailbox(&self) -> &Mailbox {
1593 self
1594 }
1595}
1596
1597pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1602 cx.mailbox().open_port()
1603}
1604
1605pub fn open_once_port<M: Message>(
1608 cx: &impl context::Mailbox,
1609) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1610 cx.mailbox().open_once_port()
1611}
1612
1613#[async_trait]
1614impl MailboxSender for Mailbox {
1615 fn post_unchecked(
1618 &self,
1619 envelope: MessageEnvelope,
1620 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1621 ) {
1622 metrics::MAILBOX_POSTS.add(
1623 1,
1624 hyperactor_telemetry::kv_pairs!(
1625 "actor_id" => envelope.sender.to_string(),
1626 "dest_actor_id" => envelope.dest.actor_id().to_string(),
1627 ),
1628 );
1629 tracing::trace!(
1630 name = "post",
1631 actor_name = envelope.sender.name(),
1632 actor_id = envelope.sender.to_string(),
1633 "posting message to {}",
1634 envelope.dest
1635 );
1636
1637 if envelope.dest().actor_id() != &self.inner.actor_id {
1638 return self.inner.forwarder.post(envelope, return_handle);
1639 }
1640
1641 match self.inner.ports.entry(envelope.dest().index()) {
1642 Entry::Vacant(_) => {
1643 let err = DeliveryError::Unroutable(format!(
1644 "port not bound in mailbox; port id: {}; message type: {}",
1645 envelope.dest().index(),
1646 envelope.data().typename().map_or_else(
1647 || format!("unregistered type hash {}", envelope.data().typehash()),
1648 |s| s.to_string(),
1649 )
1650 ));
1651
1652 envelope.undeliverable(err, return_handle);
1653 }
1654 Entry::Occupied(entry) => {
1655 let closed = self.inner.closed.read().unwrap();
1656 if let Some(status) = &*closed {
1657 match status {
1658 ActorStatus::Stopped(reason) => {
1659 let err = format!(
1660 "mailbox owner {} is stopped: {}",
1661 self.inner.actor_id, reason
1662 );
1663 return envelope
1664 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1665 }
1666 ActorStatus::Failed(actor_error) => {
1667 let err = format!(
1668 "mailbox owner {} failed: {}",
1669 self.inner.actor_id, actor_error
1670 );
1671 return envelope
1672 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1673 }
1674 _ => {
1675 let err = format!(
1676 "mailbox owner {} closed unexpectedly: {:?}",
1677 self.inner.actor_id, status
1678 );
1679 return envelope
1680 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1681 }
1682 }
1683 }
1684
1685 let (metadata, data) = envelope.open();
1686 let MessageMetadata {
1687 mut headers,
1688 sender,
1689 dest,
1690 errors: metadata_errors,
1691 ttl,
1692 return_undeliverable,
1693 } = metadata;
1694
1695 let to_actor_id = hash_to_u64(&dest);
1696 let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1697 headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1698 if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1701 headers.set(
1702 crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1703 hash_to_u64(&sender),
1704 );
1705 }
1706 headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1707
1708 match entry.get().send_serialized(headers, data) {
1716 Ok(false) => {
1717 hyperactor_telemetry::notify_message_status(
1718 hyperactor_telemetry::MessageStatusEvent {
1719 timestamp: std::time::SystemTime::now(),
1720 id: hyperactor_telemetry::generate_status_event_id(message_id),
1721 message_id,
1722 status: "queued".to_string(),
1723 },
1724 );
1725 entry.remove();
1726 }
1727 Ok(true) => {
1728 hyperactor_telemetry::notify_message_status(
1729 hyperactor_telemetry::MessageStatusEvent {
1730 timestamp: std::time::SystemTime::now(),
1731 id: hyperactor_telemetry::generate_status_event_id(message_id),
1732 message_id,
1733 status: "queued".to_string(),
1734 },
1735 );
1736 }
1737 Err(SerializedSenderError {
1738 data,
1739 error: sender_error,
1740 headers,
1741 }) => {
1742 entry.remove();
1743 let err = DeliveryError::Mailbox(format!("{}", sender_error));
1744
1745 MessageEnvelope::seal(
1746 MessageMetadata {
1747 headers,
1748 sender,
1749 dest,
1750 errors: metadata_errors,
1751 ttl,
1752 return_undeliverable,
1753 },
1754 data,
1755 )
1756 .undeliverable(err, return_handle)
1757 }
1758 }
1759 }
1760 }
1761 }
1762}
1763
1764#[derive(Debug)]
1772pub struct PortHandle<M: Message> {
1773 mailbox: Mailbox,
1774 port_index: u64,
1775 sender: UnboundedPortSender<M>,
1776 bound: Arc<RwLock<Option<reference::PortId>>>,
1783 reducer_spec: Option<ReducerSpec>,
1786 streaming_opts: StreamingReducerOpts,
1788}
1789
1790impl<M: Message> PortHandle<M> {
1791 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1792 Self {
1793 mailbox,
1794 port_index,
1795 sender,
1796 bound: Arc::new(RwLock::new(None)),
1797 reducer_spec: None,
1798 streaming_opts: StreamingReducerOpts::default(),
1799 }
1800 }
1801
1802 pub(crate) fn location(&self) -> PortLocation {
1803 match self.bound.read().unwrap().as_ref() {
1804 Some(port_id) => PortLocation::Bound(port_id.clone()),
1805 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1806 }
1807 }
1808
1809 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1811 let closed = self.mailbox.inner.closed.read().unwrap();
1812
1813 if let Some(status) = &*closed {
1814 let err = MailboxError {
1815 actor_id: self.mailbox.actor_id().clone(),
1816 kind: MailboxErrorKind::OwnerTerminated(status.clone()),
1817 };
1818 return Err(MailboxSenderError::new_unbound::<M>(
1819 self.mailbox.actor_id().clone(),
1820 MailboxSenderErrorKind::Mailbox(err),
1821 ));
1822 }
1823
1824 let mut headers = Flattrs::new();
1825
1826 crate::mailbox::headers::set_send_timestamp(&mut headers);
1827 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1828 let bound_guard = self.bound.read().unwrap();
1832 if let Some(bound_port) = bound_guard.as_ref() {
1833 let sequencer = cx.instance().sequencer();
1834 let seq_info = sequencer.assign_seq(bound_port);
1835 headers.set(SEQ_INFO, seq_info);
1836 } else {
1837 headers.set(SEQ_INFO, SeqInfo::Direct);
1842 }
1843 self.sender.send(headers, message).map_err(|err| {
1855 MailboxSenderError::new_unbound::<M>(
1856 self.mailbox.actor_id().clone(),
1857 MailboxSenderErrorKind::Other(err),
1858 )
1859 })
1860 }
1861
1862 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1865 where
1866 R: Message,
1867 F: Fn(R) -> M + Send + Sync + 'static,
1868 {
1869 let port_index = self.mailbox.inner.allocate_port();
1870 let sender = self.sender.clone();
1871 PortHandle::new(
1872 self.mailbox.clone(),
1873 port_index,
1874 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1875 sender.send(headers, unmap(value))
1876 })),
1877 )
1878 }
1879}
1880
1881impl<M: RemoteMessage> PortHandle<M> {
1882 pub fn bind(&self) -> reference::PortRef<M> {
1884 let port_id = {
1885 let mut guard = self.bound.write().unwrap();
1886 guard
1887 .get_or_insert_with(|| self.mailbox.bind(self).port_id().clone())
1888 .clone()
1889 };
1890 reference::PortRef::attest_reducible(
1891 port_id,
1892 self.reducer_spec.clone(),
1893 self.streaming_opts.clone(),
1894 )
1895 }
1896
1897 pub(crate) fn bind_actor_port(&self) {
1903 let port_id = self.mailbox.actor_id().port_id(M::port());
1904 {
1905 let mut guard = self.bound.write().unwrap();
1906 if guard.is_some() {
1907 panic!(
1908 "could not bind port handle {} as {port_id}: already bound",
1909 self.port_index
1910 );
1911 }
1912 *guard = Some(port_id);
1913 }
1914 self.mailbox.bind_to_actor_port(self);
1915 }
1916}
1917
1918impl<M: Message> Clone for PortHandle<M> {
1919 fn clone(&self) -> Self {
1920 Self {
1921 mailbox: self.mailbox.clone(),
1922 port_index: self.port_index,
1923 sender: self.sender.clone(),
1924 bound: self.bound.clone(),
1925 reducer_spec: self.reducer_spec.clone(),
1926 streaming_opts: self.streaming_opts.clone(),
1927 }
1928 }
1929}
1930
1931impl<M: Message> fmt::Display for PortHandle<M> {
1932 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1933 fmt::Display::fmt(&self.location(), f)
1934 }
1935}
1936
1937#[derive(Debug)]
1939pub struct OncePortHandle<M: Message> {
1940 mailbox: Mailbox,
1941 port_index: u64,
1942 port_id: reference::PortId,
1943 sender: oneshot::Sender<M>,
1944 reducer_spec: Option<ReducerSpec>,
1945}
1946
1947impl<M: Message> OncePortHandle<M> {
1948 pub fn port_id(&self) -> &reference::PortId {
1951 &self.port_id
1952 }
1953
1954 pub fn send(self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1957 assert!(
1960 !self.port_id().is_actor_port(),
1961 "OncePortHandle currently does not support actor ports; a \
1962 prerequisite of that support is to assign seq to messages \
1963 if the port is actor port."
1964 );
1965
1966 let actor_id = self.mailbox.actor_id().clone();
1967 self.sender.send(message).map_err(|_| {
1968 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1973 })?;
1974 Ok(())
1975 }
1976}
1977
1978impl<M: RemoteMessage> OncePortHandle<M> {
1979 pub fn bind(self) -> reference::OncePortRef<M> {
1984 let port_id = self.port_id().clone();
1985 let reducer_spec = self.reducer_spec.clone();
1986 self.mailbox.clone().bind_once(self);
1987 reference::OncePortRef::attest_reducible(port_id, reducer_spec)
1988 }
1989}
1990
1991impl<M: Message> fmt::Display for OncePortHandle<M> {
1992 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1993 fmt::Display::fmt(&self.port_id(), f)
1994 }
1995}
1996
1997#[derive(Debug)]
2000pub struct PortReceiver<M> {
2001 receiver: mpsc::UnboundedReceiver<M>,
2002 port_id: reference::PortId,
2003 coalesce: bool,
2006 mailbox: Mailbox,
2009}
2010
2011impl<M> PortReceiver<M> {
2012 fn new(
2013 receiver: mpsc::UnboundedReceiver<M>,
2014 port_id: reference::PortId,
2015 coalesce: bool,
2016 mailbox: Mailbox,
2017 ) -> Self {
2018 Self {
2019 receiver,
2020 port_id,
2021 coalesce,
2022 mailbox,
2023 }
2024 }
2025
2026 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
2031 let mut next = self.receiver.try_recv();
2032 if self.coalesce
2034 && let Some(latest) = self.drain().pop()
2035 {
2036 next = Ok(latest);
2037 }
2038 match next {
2039 Ok(msg) => Ok(Some(msg)),
2040 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2041 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2042 self.actor_id().clone(),
2043 MailboxErrorKind::Closed,
2044 )),
2045 }
2046 }
2047
2048 pub async fn recv(&mut self) -> Result<M, MailboxError> {
2051 let mut next = self.receiver.recv().await;
2052 if self.coalesce
2055 && let Some(latest) = self.drain().pop()
2056 {
2057 next = Some(latest);
2058 }
2059 next.ok_or(MailboxError::new(
2060 self.actor_id().clone(),
2061 MailboxErrorKind::Closed,
2062 ))
2063 }
2064
2065 pub fn drain(&mut self) -> Vec<M> {
2067 let mut drained: Vec<M> = Vec::new();
2068 while let Ok(msg) = self.receiver.try_recv() {
2069 if self.coalesce {
2071 drained.pop();
2072 }
2073 drained.push(msg);
2074 }
2075 drained
2076 }
2077
2078 fn port(&self) -> u64 {
2079 self.port_id.index()
2080 }
2081
2082 fn actor_id(&self) -> &reference::ActorId {
2083 self.port_id.actor_id()
2084 }
2085}
2086
2087impl<M> Drop for PortReceiver<M> {
2088 fn drop(&mut self) {
2089 self.mailbox.inner.ports.remove(&self.port());
2093 }
2094}
2095
2096impl<M> Stream for PortReceiver<M> {
2097 type Item = Result<M, MailboxError>;
2098
2099 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2100 std::pin::pin!(self.recv()).poll(cx).map(Some)
2101 }
2102}
2103
2104pub struct OncePortReceiver<M> {
2106 receiver: Option<oneshot::Receiver<M>>,
2107 port_id: reference::PortId,
2108
2109 mailbox: Mailbox,
2112}
2113
2114impl<M> OncePortReceiver<M> {
2115 pub async fn recv(mut self) -> Result<M, MailboxError> {
2119 std::mem::take(&mut self.receiver)
2120 .unwrap()
2121 .await
2122 .map_err(|err| {
2123 MailboxError::new(
2124 self.actor_id().clone(),
2125 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2126 )
2127 })
2128 }
2129
2130 fn port(&self) -> u64 {
2131 self.port_id.index()
2132 }
2133
2134 fn actor_id(&self) -> &reference::ActorId {
2135 self.port_id.actor_id()
2136 }
2137}
2138
2139impl<M> Drop for OncePortReceiver<M> {
2140 fn drop(&mut self) {
2141 self.mailbox.inner.ports.remove(&self.port());
2145 }
2146}
2147
2148pub struct SerializedSenderError {
2150 pub headers: Flattrs,
2152 pub data: wirevalue::Any,
2154 pub error: MailboxSenderError,
2156}
2157
2158trait SerializedSender: Send + Sync {
2163 fn as_any(&self) -> &dyn Any;
2169
2170 #[allow(clippy::result_large_err)] fn send_serialized(
2178 &self,
2179 headers: Flattrs,
2180 serialized: wirevalue::Any,
2181 ) -> Result<bool, SerializedSenderError>;
2182}
2183
2184enum UnboundedPortSender<M: Message> {
2186 Mpsc(mpsc::UnboundedSender<M>),
2188 Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
2190}
2191
2192impl<M: Message> UnboundedPortSender<M> {
2193 fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2194 match self {
2195 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2196 Self::Func(func) => func(headers, message),
2197 }
2198 }
2199}
2200
2201impl<M: Message> Clone for UnboundedPortSender<M> {
2204 fn clone(&self) -> Self {
2205 match self {
2206 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2207 Self::Func(func) => Self::Func(func.clone()),
2208 }
2209 }
2210}
2211
2212impl<M: Message> Debug for UnboundedPortSender<M> {
2213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2214 match self {
2215 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2216 Self::Func(_) => f
2217 .debug_tuple("UnboundedPortSender::Func")
2218 .field(&"..")
2219 .finish(),
2220 }
2221 }
2222}
2223
2224struct UnboundedSender<M: Message> {
2225 sender: UnboundedPortSender<M>,
2226 port_id: reference::PortId,
2227}
2228
2229impl<M: Message> UnboundedSender<M> {
2230 fn new(sender: UnboundedPortSender<M>, port_id: reference::PortId) -> Self {
2233 Self { sender, port_id }
2234 }
2235
2236 #[allow(dead_code)]
2237 fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2238 self.sender.send(headers, message).map_err(|err| {
2239 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2240 })
2241 }
2242}
2243
2244impl<M: Message> Clone for UnboundedSender<M> {
2248 fn clone(&self) -> Self {
2249 Self {
2250 sender: self.sender.clone(),
2251 port_id: self.port_id.clone(),
2252 }
2253 }
2254}
2255
2256impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2257 fn as_any(&self) -> &dyn Any {
2258 self
2259 }
2260
2261 fn send_serialized(
2262 &self,
2263 headers: Flattrs,
2264 serialized: wirevalue::Any,
2265 ) -> Result<bool, SerializedSenderError> {
2266 match serialized.deserialized_unchecked() {
2272 Ok(message) => {
2273 self.sender.send(headers.clone(), message).map_err(|err| {
2274 SerializedSenderError {
2275 data: serialized,
2276 error: MailboxSenderError::new_bound(
2277 self.port_id.clone(),
2278 MailboxSenderErrorKind::Other(err),
2279 ),
2280 headers,
2281 }
2282 })?;
2283
2284 Ok(true)
2285 }
2286 Err(err) => Err(SerializedSenderError {
2287 data: serialized,
2288 error: MailboxSenderError::new_bound(
2289 self.port_id.clone(),
2290 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2291 ),
2292 headers,
2293 }),
2294 }
2295 }
2296}
2297
2298#[derive(Debug)]
2301struct OnceSender<M: Message> {
2302 sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
2303 port_id: reference::PortId,
2304}
2305
2306impl<M: Message> OnceSender<M> {
2307 fn new(sender: oneshot::Sender<M>, port_id: reference::PortId) -> Self {
2310 Self {
2311 sender: Arc::new(Mutex::new(Some(sender))),
2312 port_id,
2313 }
2314 }
2315
2316 fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2317 match self.sender.lock().unwrap().take() {
2319 None => Err(MailboxSenderError::new_bound(
2320 self.port_id.clone(),
2321 MailboxSenderErrorKind::Closed,
2322 )),
2323 Some(sender) => {
2324 sender.send(message).map_err(|_| {
2325 MailboxSenderError::new_bound(
2330 self.port_id.clone(),
2331 MailboxSenderErrorKind::Closed,
2332 )
2333 })?;
2334 Ok(false)
2335 }
2336 }
2337 }
2338}
2339
2340impl<M: Message> Clone for OnceSender<M> {
2344 fn clone(&self) -> Self {
2345 Self {
2346 sender: self.sender.clone(),
2347 port_id: self.port_id.clone(),
2348 }
2349 }
2350}
2351
2352impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2353 fn as_any(&self) -> &dyn Any {
2354 self
2355 }
2356
2357 fn send_serialized(
2358 &self,
2359 headers: Flattrs,
2360 serialized: wirevalue::Any,
2361 ) -> Result<bool, SerializedSenderError> {
2362 match serialized.deserialized() {
2363 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2364 data: serialized,
2365 error: e,
2366 headers,
2367 }),
2368 Err(err) => Err(SerializedSenderError {
2369 data: serialized,
2370 error: MailboxSenderError::new_bound(
2371 self.port_id.clone(),
2372 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2373 ),
2374 headers,
2375 }),
2376 }
2377 }
2378}
2379
2380pub(crate) struct UntypedUnboundedSender {
2382 pub(crate) sender:
2383 Box<dyn Fn(wirevalue::Any) -> Result<bool, (wirevalue::Any, anyhow::Error)> + Send + Sync>,
2384 pub(crate) port_id: reference::PortId,
2385}
2386
2387impl SerializedSender for UntypedUnboundedSender {
2388 fn as_any(&self) -> &dyn Any {
2389 self
2390 }
2391
2392 fn send_serialized(
2393 &self,
2394 headers: Flattrs,
2395 serialized: wirevalue::Any,
2396 ) -> Result<bool, SerializedSenderError> {
2397 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2398 data,
2399 error: MailboxSenderError::new_bound(
2400 self.port_id.clone(),
2401 MailboxSenderErrorKind::Other(err),
2402 ),
2403 headers,
2404 })
2405 }
2406}
2407
2408struct State {
2410 actor_id: reference::ActorId,
2412
2413 ports: DashMap<u64, Box<dyn SerializedSender>>,
2417
2418 next_port: AtomicU64,
2420
2421 forwarder: BoxedMailboxSender,
2423
2424 closed: RwLock<Option<ActorStatus>>,
2427}
2428
2429impl State {
2430 fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
2432 Self {
2433 actor_id,
2434 ports: DashMap::new(),
2435 next_port: AtomicU64::new(USER_PORT_OFFSET),
2438 forwarder,
2439 closed: RwLock::new(None),
2440 }
2441 }
2442
2443 fn allocate_port(&self) -> u64 {
2445 self.next_port.fetch_add(1, Ordering::SeqCst)
2446 }
2447}
2448
2449impl fmt::Debug for State {
2450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2451 f.debug_struct("State")
2452 .field("actor_id", &self.actor_id)
2453 .field(
2454 "open_ports",
2455 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2456 )
2457 .field("next_port", &self.next_port)
2458 .finish()
2459 }
2460}
2461
2462#[derive(Clone)]
2466pub struct MailboxMuxer {
2467 mailboxes: Arc<DashMap<reference::ActorId, Box<dyn MailboxSender + Send + Sync>>>,
2468}
2469
2470impl Default for MailboxMuxer {
2471 fn default() -> Self {
2472 Self::new()
2473 }
2474}
2475
2476impl MailboxMuxer {
2477 pub fn new() -> Self {
2479 Self {
2480 mailboxes: Arc::new(DashMap::new()),
2481 }
2482 }
2483
2484 pub fn bind(&self, actor_id: reference::ActorId, sender: impl MailboxSender + 'static) -> bool {
2489 match self.mailboxes.entry(actor_id) {
2490 Entry::Occupied(_) => false,
2491 Entry::Vacant(entry) => {
2492 entry.insert(Box::new(sender));
2493 true
2494 }
2495 }
2496 }
2497
2498 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2500 self.bind(mailbox.actor_id().clone(), mailbox)
2501 }
2502
2503 #[allow(dead_code)]
2507 pub(crate) fn unbind(&self, actor_id: &reference::ActorId) {
2508 self.mailboxes.remove(actor_id);
2509 }
2510
2511 pub fn bound_actors(&self) -> Vec<reference::ActorId> {
2513 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2514 }
2515}
2516
2517#[async_trait]
2518impl MailboxSender for MailboxMuxer {
2519 fn post_unchecked(
2520 &self,
2521 envelope: MessageEnvelope,
2522 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2523 ) {
2524 let dest_actor_id = envelope.dest().actor_id();
2525 match self.mailboxes.get(dest_actor_id) {
2526 None => {
2527 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2528 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2529 }
2530 Some(sender) => sender.post(envelope, return_handle),
2531 }
2532 }
2533
2534 async fn flush(&self) -> Result<(), anyhow::Error> {
2535 let keys: Vec<_> = self
2536 .mailboxes
2537 .iter()
2538 .map(|entry| entry.key().clone())
2539 .collect();
2540 for key in keys {
2541 if let Some(sender) = self.mailboxes.get(&key) {
2542 sender.value().flush().await?;
2543 }
2544 }
2545 Ok(())
2546 }
2547}
2548
2549#[derive(Clone)]
2552pub struct MailboxRouter {
2553 entries: Arc<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2554}
2555
2556impl Default for MailboxRouter {
2557 fn default() -> Self {
2558 Self::new()
2559 }
2560}
2561
2562impl MailboxRouter {
2563 pub fn new() -> Self {
2565 Self {
2566 entries: Arc::new(RwLock::new(BTreeMap::new())),
2567 }
2568 }
2569
2570 pub fn downgrade(&self) -> WeakMailboxRouter {
2572 WeakMailboxRouter(Arc::downgrade(&self.entries))
2573 }
2574
2575 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2579 FallbackMailboxRouter {
2580 router: self.clone(),
2581 default,
2582 }
2583 }
2584
2585 pub fn bind(&self, dest: reference::Reference, sender: impl MailboxSender + 'static) {
2589 let mut w = self.entries.write().unwrap();
2590 w.insert(dest, Arc::new(sender));
2591 }
2592
2593 fn sender(
2594 &self,
2595 actor_id: &reference::ActorId,
2596 ) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2597 match self
2598 .entries
2599 .read()
2600 .unwrap()
2601 .lower_bound(Excluded(&actor_id.clone().into()))
2602 .prev()
2603 {
2604 None => None,
2605 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2606 Some(sender.clone())
2607 }
2608 Some(_) => None,
2609 }
2610 }
2611}
2612
2613#[async_trait]
2614impl MailboxSender for MailboxRouter {
2615 fn post_unchecked(
2616 &self,
2617 envelope: MessageEnvelope,
2618 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2619 ) {
2620 match self.sender(envelope.dest().actor_id()) {
2621 None => envelope.undeliverable(
2622 DeliveryError::Unroutable(
2623 "no destination found for actor in routing table".to_string(),
2624 ),
2625 return_handle,
2626 ),
2627 Some(sender) => sender.post(envelope, return_handle),
2628 }
2629 }
2630
2631 async fn flush(&self) -> Result<(), anyhow::Error> {
2632 let senders: Vec<_> = self.entries.read().unwrap().values().cloned().collect();
2633 let futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
2634 futures::future::try_join_all(futs).await?;
2635 Ok(())
2636 }
2637}
2638
2639#[derive(Clone)]
2640struct FallbackMailboxRouter {
2641 router: MailboxRouter,
2642 default: BoxedMailboxSender,
2643}
2644
2645#[async_trait]
2646impl MailboxSender for FallbackMailboxRouter {
2647 fn post_unchecked(
2648 &self,
2649 envelope: MessageEnvelope,
2650 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2651 ) {
2652 match self.router.sender(envelope.dest().actor_id()) {
2653 Some(sender) => sender.post(envelope, return_handle),
2654 None => self.default.post(envelope, return_handle),
2655 }
2656 }
2657
2658 async fn flush(&self) -> Result<(), anyhow::Error> {
2659 let (r1, r2) = futures::future::join(self.router.flush(), self.default.flush()).await;
2660 r1?;
2661 r2?;
2662 Ok(())
2663 }
2664}
2665
2666#[derive(Debug, Clone)]
2675pub struct WeakMailboxRouter(
2676 Weak<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2677);
2678
2679impl WeakMailboxRouter {
2680 pub fn upgrade(&self) -> Option<MailboxRouter> {
2682 self.0.upgrade().map(|entries| MailboxRouter { entries })
2683 }
2684}
2685
2686#[async_trait]
2687impl MailboxSender for WeakMailboxRouter {
2688 fn post_unchecked(
2689 &self,
2690 envelope: MessageEnvelope,
2691 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2692 ) {
2693 match self.upgrade() {
2694 Some(router) => router.post(envelope, return_handle),
2695 None => envelope.undeliverable(
2696 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2697 return_handle,
2698 ),
2699 }
2700 }
2701
2702 async fn flush(&self) -> Result<(), anyhow::Error> {
2703 match self.upgrade() {
2704 Some(router) => router.flush().await,
2705 None => Ok(()),
2706 }
2707 }
2708}
2709
2710#[derive(Clone)]
2724pub struct DialMailboxRouter {
2725 address_book: Arc<RwLock<BTreeMap<reference::Reference, ChannelAddr>>>,
2726 sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,
2727
2728 default: BoxedMailboxSender,
2731
2732 direct_addressed_remote_only: bool,
2735}
2736
2737impl Default for DialMailboxRouter {
2738 fn default() -> Self {
2739 Self::new()
2740 }
2741}
2742
2743impl DialMailboxRouter {
2744 pub fn new() -> Self {
2746 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2747 }
2748
2749 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2754 Self {
2755 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2756 sender_cache: Arc::new(DashMap::new()),
2757 default,
2758 direct_addressed_remote_only: false,
2759 }
2760 }
2761
2762 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2767 Self {
2768 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2769 sender_cache: Arc::new(DashMap::new()),
2770 default,
2771 direct_addressed_remote_only: true,
2772 }
2773 }
2774
2775 pub fn bind(&self, dest: reference::Reference, addr: ChannelAddr) {
2781 if let Ok(mut w) = self.address_book.write() {
2782 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2783 && old_addr != addr
2784 {
2785 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2786 self.sender_cache.remove(&old_addr);
2787 }
2788 } else {
2789 tracing::error!("address book poisoned during bind of {:?}", dest);
2790 }
2791 }
2792
2793 pub fn unbind(&self, dest: &reference::Reference) {
2799 if let Ok(mut w) = self.address_book.write() {
2800 let to_remove: Vec<(reference::Reference, ChannelAddr)> = w
2801 .range(dest..)
2802 .take_while(|(key, _)| dest.is_prefix_of(key))
2803 .map(|(key, addr)| (key.clone(), addr.clone()))
2804 .collect();
2805
2806 for (key, addr) in to_remove {
2807 tracing::info!("unbinding {:?} from {:?}", key, addr);
2808 w.remove(&key);
2809 self.sender_cache.remove(&addr);
2810 }
2811 } else {
2812 tracing::error!("address book poisoned during unbind of {:?}", dest);
2813 }
2814 }
2815
2816 pub fn lookup_addr(&self, actor_id: &reference::ActorId) -> Option<ChannelAddr> {
2818 let address_book = self.address_book.read().unwrap();
2819 let found = address_book
2820 .lower_bound(Excluded(&actor_id.clone().into()))
2821 .prev();
2822
2823 if let Some((key, addr)) = found
2826 && key.is_prefix_of(&actor_id.clone().into())
2827 {
2828 Some(addr.clone())
2829 } else {
2830 let addr = actor_id.proc_id().addr().clone();
2831 if self.direct_addressed_remote_only {
2832 addr.transport().is_remote().then_some(addr)
2833 } else {
2834 Some(addr)
2835 }
2836 }
2837 }
2838
2839 pub fn prefixes(&self) -> BTreeSet<reference::Reference> {
2842 let addrs = self.address_book.read().unwrap();
2843 let mut prefixes: BTreeSet<reference::Reference> = BTreeSet::new();
2844 for (reference, _) in addrs.iter() {
2845 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2846 Some(candidate) if candidate.is_prefix_of(reference) => (),
2847 _ => {
2848 prefixes.insert(reference.clone());
2849 }
2850 }
2851 }
2852
2853 prefixes
2854 }
2855
2856 fn dial(
2857 &self,
2858 addr: &ChannelAddr,
2859 actor_id: &reference::ActorId,
2860 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2861 match self.sender_cache.entry(addr.clone()) {
2865 Entry::Occupied(entry) => Ok(entry.get().clone()),
2866 Entry::Vacant(entry) => {
2867 let tx = channel::dial(addr.clone()).map_err(|err| {
2868 MailboxSenderError::new_unbound_type(
2869 actor_id.clone(),
2870 MailboxSenderErrorKind::Channel(err),
2871 "unknown",
2872 )
2873 })?;
2874 let sender = MailboxClient::new(tx);
2875 Ok(entry.insert(Arc::new(sender)).value().clone())
2876 }
2877 }
2878 }
2879}
2880
2881#[async_trait]
2882impl MailboxSender for DialMailboxRouter {
2883 fn post_unchecked(
2884 &self,
2885 envelope: MessageEnvelope,
2886 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2887 ) {
2888 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2889 self.default.post(envelope, return_handle);
2890 return;
2891 };
2892
2893 match self.dial(&addr, envelope.dest().actor_id()) {
2894 Err(err) => envelope.undeliverable(
2895 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2896 return_handle,
2897 ),
2898 Ok(sender) => sender.post(envelope, return_handle),
2899 }
2900 }
2901
2902 async fn flush(&self) -> Result<(), anyhow::Error> {
2903 let senders: Vec<_> = self
2904 .sender_cache
2905 .iter()
2906 .map(|entry| entry.value().clone())
2907 .collect();
2908 let mut futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
2909 futs.push(self.default.flush());
2910 futures::future::try_join_all(futs).await?;
2911 Ok(())
2912 }
2913}
2914
2915#[derive(Debug)]
2918pub struct UnroutableMailboxSender;
2919
2920#[async_trait]
2921impl MailboxSender for UnroutableMailboxSender {
2922 fn post_unchecked(
2923 &self,
2924 envelope: MessageEnvelope,
2925 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2926 ) {
2927 envelope.undeliverable(
2928 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2929 return_handle,
2930 );
2931 }
2932}
2933
2934#[cfg(test)]
2935mod tests {
2936
2937 use std::assert_matches::assert_matches;
2938 use std::mem::drop;
2939 use std::str::FromStr;
2940 use std::sync::atomic::AtomicUsize;
2941 use std::time::Duration;
2942
2943 use timed_test::async_timed_test;
2944
2945 use super::*;
2946 use crate::Actor;
2947 use crate::ActorHandle;
2948 use crate::Instance;
2949 use crate::accum;
2950 use crate::accum::ReducerMode;
2951 use crate::channel::ChannelTransport;
2952 use crate::context::Mailbox as MailboxContext;
2953 use crate::proc::Proc;
2954 use crate::testing::ids::test_actor_id;
2955 use crate::testing::ids::test_port_id;
2956 use crate::testing::ids::test_proc_id;
2957
2958 fn test_proc_ref(name: &str) -> reference::Reference {
2959 reference::Reference::Proc(test_proc_id(name))
2960 }
2961
2962 fn test_actor_ref(proc_name: &str, actor_name: &str) -> reference::Reference {
2963 reference::Reference::Actor(test_actor_id(proc_name, actor_name))
2964 }
2965
2966 #[test]
2967 fn test_error() {
2968 use crate::testing::ids::test_actor_id_with_pid;
2969 let err = MailboxError::new(
2970 test_actor_id_with_pid("myworld_2", "myactor", 5),
2971 MailboxErrorKind::Closed,
2972 );
2973 assert!(format!("{}", err).ends_with(",test_myworld_2,myactor[5]: mailbox closed"));
2976 }
2977
2978 #[tokio::test]
2979 async fn test_mailbox_basic() {
2980 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2981 let (port, mut receiver) = mbox.open_port::<u64>();
2982 let port = port.bind();
2983
2984 mbox.serialize_and_send(&port, 123, monitored_return_handle())
2985 .unwrap();
2986 mbox.serialize_and_send(&port, 321, monitored_return_handle())
2987 .unwrap();
2988 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2989 assert_eq!(receiver.recv().await.unwrap(), 321u64);
2990
2991 let serialized = wirevalue::Any::serialize(&999u64).unwrap();
2992 mbox.post(
2993 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2994 monitored_return_handle(),
2995 );
2996 assert_eq!(receiver.recv().await.unwrap(), 999u64);
2997 }
2998
2999 #[tokio::test]
3000 async fn test_mailbox_accum() {
3001 let proc = Proc::local();
3002 let (client, _) = proc.instance("client").unwrap();
3003 let (port, mut receiver) = client
3004 .mailbox()
3005 .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());
3006
3007 for i in -3..4 {
3008 port.send(&client, accum::Max(i)).unwrap();
3009 let received: accum::Max<i64> = receiver.recv().await.unwrap();
3010 let msg = received.get();
3011 assert_eq!(msg, &i);
3012 }
3013 for i in -3..4 {
3015 port.send(&client, accum::Max(i)).unwrap();
3016 assert_eq!(receiver.recv().await.unwrap().get(), &3);
3017 }
3018 port.send(&client, accum::Max(4)).unwrap();
3020 assert_eq!(receiver.recv().await.unwrap().get(), &4);
3021
3022 for i in 5..10 {
3024 port.send(&client, accum::Max(i)).unwrap();
3025 }
3026 assert_eq!(receiver.recv().await.unwrap().get(), &9);
3027 port.send(&client, accum::Max(1)).unwrap();
3028 port.send(&client, accum::Max(3)).unwrap();
3029 port.send(&client, accum::Max(2)).unwrap();
3030 assert_eq!(receiver.recv().await.unwrap().get(), &9);
3031 }
3032
3033 #[test]
3034 fn test_port_and_reducer() {
3035 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
3036 {
3038 let accumulator = accum::join_semilattice::<accum::Max<u64>>();
3039 let reducer_spec = accumulator.reducer_spec().unwrap();
3040 let (port, _) = mbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
3041 assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
3042 let port_ref = port.bind();
3043 assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
3044 }
3045 {
3047 let (port, _) = mbox.open_port::<u64>();
3048 assert_eq!(port.reducer_spec, None);
3049 let port_ref = port.bind();
3050 assert_eq!(port_ref.reducer_spec(), &None);
3051 }
3052 }
3053
3054 #[tokio::test]
3055 #[ignore] async fn test_mailbox_once() {
3057 let proc = Proc::local();
3058 let (client, _) = proc.instance("client").unwrap();
3059
3060 let (port, receiver) = client.open_once_port::<u64>();
3061
3062 port.send(&client, 123u64).unwrap();
3065 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3066
3067 }
3078
3079 #[tokio::test]
3080 #[ignore] async fn test_mailbox_receiver_drop() {
3082 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
3083 let (port, mut receiver) = mbox.open_port::<u64>();
3084 let port = port.bind();
3086 mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
3087 .unwrap();
3088 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3089 drop(receiver);
3090 let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
3091 panic!();
3092 };
3093
3094 assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
3095 assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
3096 }
3097
3098 #[tokio::test]
3099 async fn test_drain() {
3100 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
3101
3102 let (port, mut receiver) = mbox.open_port();
3103 let port = port.bind();
3104
3105 for i in 0..10 {
3106 mbox.serialize_and_send(&port, i, monitored_return_handle())
3107 .unwrap();
3108 }
3109
3110 for i in 0..10 {
3111 assert_eq!(receiver.recv().await.unwrap(), i);
3112 }
3113
3114 assert!(receiver.drain().is_empty());
3115 }
3116
3117 #[tokio::test]
3118 async fn test_mailbox_muxer() {
3119 let muxer = MailboxMuxer::new();
3120
3121 let mbox0 = Mailbox::new_detached(test_actor_id("0", "actor1"));
3122 let mbox1 = Mailbox::new_detached(test_actor_id("0", "actor2"));
3123
3124 muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
3125 muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
3126
3127 let (port, receiver) = mbox0.open_once_port::<u64>();
3128
3129 let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
3130 let (client, _) = proc.instance("client").unwrap();
3131
3132 port.send(&client, 123u64).unwrap();
3133 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3134
3135 }
3145
3146 #[tokio::test]
3147 async fn test_local_client_server() {
3148 let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
3149 let (tx, rx) = channel::local::new();
3150 let serve_handle = mbox.clone().serve(rx);
3151 let client = MailboxClient::new(tx);
3152
3153 let (port, receiver) = mbox.open_once_port::<u64>();
3154 let port = port.bind();
3155
3156 client
3157 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3158 .unwrap();
3159 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3160 serve_handle.stop("fromt test");
3161 serve_handle.await.unwrap().unwrap();
3162 }
3163
3164 #[tokio::test]
3165 async fn test_mailbox_router() {
3166 let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
3167 let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
3168 let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
3169 let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));
3170
3171 let comms: Vec<(reference::OncePortRef<u64>, OncePortReceiver<u64>)> =
3172 [&mbox0, &mbox1, &mbox2, &mbox3]
3173 .into_iter()
3174 .map(|mbox| {
3175 let (port, receiver) = mbox.open_once_port::<u64>();
3176 (port.bind(), receiver)
3177 })
3178 .collect();
3179
3180 let router = MailboxRouter::new();
3181
3182 router.bind(test_proc_id("world0_0").into(), mbox0);
3183 router.bind(test_proc_id("world1_0").into(), mbox1);
3184 router.bind(test_proc_id("world1_1").into(), mbox2);
3185 router.bind(test_actor_id("world1_1", "actor1").into(), mbox3);
3186
3187 for (i, (port, receiver)) in comms.into_iter().enumerate() {
3188 router
3189 .serialize_and_send_once(port, i as u64, monitored_return_handle())
3190 .unwrap();
3191 assert_eq!(receiver.recv().await.unwrap(), i as u64);
3192 }
3193
3194 let mbox4 = Mailbox::new_detached(test_actor_id("fallback_0", "actor"));
3197
3198 let (return_handle, mut return_receiver) =
3199 crate::mailbox::undeliverable::new_undeliverable_port();
3200 let (port, _receiver) = mbox4.open_once_port();
3201 router
3202 .serialize_and_send_once(port.bind(), 0, return_handle.clone())
3203 .unwrap();
3204 assert!(return_receiver.recv().await.is_ok());
3205
3206 let router = router.fallback(mbox4.clone().into_boxed());
3207 let (port, receiver) = mbox4.open_once_port();
3208 router
3209 .serialize_and_send_once(port.bind(), 0, return_handle)
3210 .unwrap();
3211 assert_eq!(receiver.recv().await.unwrap(), 0);
3212 }
3213
3214 #[tokio::test]
3215 async fn test_dial_mailbox_router() {
3216 let router = DialMailboxRouter::new();
3217
3218 router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
3219 router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
3220 router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
3221 router.bind(
3222 test_actor_ref("world1_1", "actor1"),
3223 "unix!@4".parse().unwrap(),
3224 );
3225 router.bind(
3227 "unix:@4,my_proc,my_actor".parse().unwrap(),
3228 "unix:@5".parse().unwrap(),
3229 );
3230
3231 router
3233 .lookup_addr(&test_actor_id("world0_0", "actor"))
3234 .unwrap();
3235 router
3236 .lookup_addr(&test_actor_id("world1_0", "actor"))
3237 .unwrap();
3238
3239 let actor_id = reference::Reference::from_str("unix:@4,my_proc,my_actor")
3240 .unwrap()
3241 .into_actor()
3242 .unwrap();
3243 assert_eq!(
3244 router.lookup_addr(&actor_id).unwrap(),
3245 "unix!@5".parse().unwrap(),
3246 );
3247 router.unbind(&actor_id.clone().into());
3248 assert_eq!(
3249 router.lookup_addr(&actor_id).unwrap(),
3250 "unix!@4".parse().unwrap(),
3251 );
3252
3253 let fallback = ChannelAddr::any(ChannelTransport::Local);
3258 router.unbind(&test_proc_ref("world1_0"));
3259 router.unbind(&test_proc_ref("world1_1"));
3260 assert_eq!(
3261 router
3262 .lookup_addr(&test_actor_id("world1_0", "actor1"))
3263 .unwrap(),
3264 fallback,
3265 );
3266 assert_eq!(
3267 router
3268 .lookup_addr(&test_actor_id("world1_1", "actor1"))
3269 .unwrap(),
3270 fallback,
3271 );
3272 router
3273 .lookup_addr(&test_actor_id("world0_0", "actor"))
3274 .unwrap();
3275 router.unbind(&test_proc_ref("world0_0"));
3276 assert_eq!(
3277 router
3278 .lookup_addr(&test_actor_id("world0_0", "actor"))
3279 .unwrap(),
3280 fallback,
3281 );
3282 }
3283
3284 #[tokio::test]
3285 #[ignore] async fn test_dial_mailbox_router_default() {
3287 let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
3288 let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
3289 let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
3290 let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));
3291
3292 let root = MailboxRouter::new();
3295 let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3296 let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3297
3298 root.bind(test_proc_ref("world0"), world0_router.clone());
3299 root.bind(test_proc_ref("world1"), world1_router.clone());
3300
3301 let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3302
3303 let mut handles = Vec::new(); for mbox in mailboxes.iter() {
3305 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3306 let handle = (*mbox).clone().serve(rx);
3307 handles.push(handle);
3308
3309 eprintln!("{}: {}", mbox.actor_id(), addr);
3310 if mbox.actor_id().proc_id().name().starts_with("world0") {
3311 world0_router.bind(mbox.actor_id().clone().into(), addr);
3312 } else {
3313 world1_router.bind(mbox.actor_id().clone().into(), addr);
3314 }
3315 }
3316
3317 for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3319 for mbox in mailboxes.iter() {
3320 let (port, receiver) = mbox.open_once_port::<u64>();
3321 let port = port.bind();
3322 router
3323 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3324 .unwrap();
3325 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3326 }
3327 }
3328 }
3329
3330 #[tokio::test]
3331 async fn test_enqueue_port() {
3332 let proc = Proc::local();
3333 let (client, _) = proc.instance("client").unwrap();
3334
3335 let count = Arc::new(AtomicUsize::new(0));
3336 let count_clone = count.clone();
3337 let port = client.mailbox().open_enqueue_port(move |_, n| {
3338 count_clone.fetch_add(n, Ordering::SeqCst);
3339 Ok(())
3340 });
3341
3342 port.send(&client, 10).unwrap();
3343 port.send(&client, 5).unwrap();
3344 port.send(&client, 1).unwrap();
3345 port.send(&client, 0).unwrap();
3346
3347 assert_eq!(count.load(Ordering::SeqCst), 16);
3348 }
3349
3350 #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3351 struct TestMessage;
3352
3353 #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3354 #[named(name = "some::custom::path")]
3355 struct TestMessage2;
3356
3357 #[test]
3358 fn test_remote_message_macros() {
3359 assert_eq!(
3360 TestMessage::typename(),
3361 "hyperactor::mailbox::tests::TestMessage"
3362 );
3363 assert_eq!(TestMessage2::typename(), "some::custom::path");
3364 }
3365
3366 #[test]
3367 fn test_message_envelope_display() {
3368 #[derive(typeuri::Named, Serialize, Deserialize)]
3369 struct MyTest {
3370 a: u64,
3371 b: String,
3372 }
3373 wirevalue::register_type!(MyTest);
3374
3375 let envelope = MessageEnvelope::serialize(
3376 test_actor_id("source_0", "actor"),
3377 test_port_id("dest_1", "actor", 123),
3378 &MyTest {
3379 a: 123,
3380 b: "hello".into(),
3381 },
3382 Flattrs::new(),
3383 )
3384 .unwrap();
3385
3386 assert!(format!("{}", envelope).contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
3388 }
3389
3390 #[derive(Debug, Default)]
3391 struct Foo;
3392
3393 impl Actor for Foo {}
3394
3395 #[tokio::test]
3398 async fn test_actor_delivery_failure() {
3399 use crate::actor::ActorStatus;
3402 use crate::testing::proc_supervison::ProcSupervisionCoordinator;
3403
3404 let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3405 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3406 ));
3407 let proc_id = test_proc_id("quux_0");
3408 let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
3409 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3410 let (client, _) = proc.instance("client").unwrap();
3411
3412 let foo = proc.spawn("foo", Foo).unwrap();
3413 let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3414 let message = MessageEnvelope::new(
3415 foo.actor_id().clone(),
3416 test_port_id("corge_0", "bar", 9999),
3417 wirevalue::Any::serialize(&1u64).unwrap(),
3418 Flattrs::new(),
3419 );
3420 return_handle.send(&client, Undeliverable(message)).unwrap();
3421
3422 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3423
3424 let foo_status = foo.status();
3425 assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3426 let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3427 unreachable!()
3428 };
3429 let msg_str = msg.to_string();
3430 assert!(msg_str.contains("undeliverable message error"));
3431 assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
3433 assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));
3434
3435 proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None, "test cleanup")
3436 .await
3437 .unwrap();
3438 }
3439
3440 #[tokio::test]
3441 async fn test_detached_return_handle() {
3442 let (return_handle, mut return_receiver) =
3443 crate::mailbox::undeliverable::new_undeliverable_port();
3444 let envelope = MessageEnvelope::new(
3446 test_actor_id("foo_0", "bar"),
3447 test_port_id("baz_0", "corge", 9999),
3448 wirevalue::Any::serialize(&1u64).unwrap(),
3449 Flattrs::new(),
3450 );
3451 let proc = Proc::local();
3452 let (client, _) = proc.instance("client").unwrap();
3453 return_handle
3454 .send(&client, Undeliverable(envelope.clone()))
3455 .unwrap();
3456 assert!(
3458 tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3459 .await
3460 .is_ok()
3461 );
3462 let monitor_handle = tokio::spawn(async move {
3465 while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3466 envelope.set_error(DeliveryError::BrokenLink(
3467 "returned in unit test".to_string(),
3468 ));
3469 UndeliverableMailboxSender
3470 .post(envelope, monitored_return_handle());
3471 }
3472 });
3473 drop(return_handle);
3474 assert!(
3475 tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3476 .await
3477 .is_ok()
3478 );
3479 }
3480
3481 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3482 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3483 let dummy_actor_id = test_actor_id("world_0", "actor");
3486 let dummy_state = State::new(
3487 dummy_actor_id.clone(),
3488 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3489 );
3490 let dummy_port_id = reference::PortId::new(dummy_actor_id, 0);
3491 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3492 let receiver = PortReceiver {
3493 receiver,
3494 port_id: dummy_port_id,
3495 coalesce,
3496 mailbox: Mailbox {
3497 inner: Arc::new(dummy_state),
3498 },
3499 };
3500 (sender, receiver)
3501 }
3502
3503 {
3505 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3506 assert!(receiver.drain().is_empty());
3507
3508 sender.send(0).unwrap();
3509 sender.send(1).unwrap();
3510 sender.send(2).unwrap();
3511 sender.send(3).unwrap();
3512 sender.send(4).unwrap();
3513 sender.send(5).unwrap();
3514 sender.send(6).unwrap();
3515 sender.send(7).unwrap();
3516
3517 if drop_sender {
3518 drop(sender);
3519 }
3520
3521 if !coalesce {
3522 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3523 } else {
3524 assert_eq!(receiver.drain(), vec![7]);
3525 }
3526
3527 assert!(receiver.drain().is_empty());
3528 assert!(receiver.drain().is_empty());
3529 }
3530
3531 {
3533 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3534 assert!(receiver.try_recv().unwrap().is_none());
3535
3536 sender.send(0).unwrap();
3537 sender.send(1).unwrap();
3538 sender.send(2).unwrap();
3539 sender.send(3).unwrap();
3540
3541 if drop_sender {
3542 drop(sender);
3543 }
3544
3545 if !coalesce {
3546 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3547 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3548 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3549 }
3550 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3551 if drop_sender {
3552 assert_matches!(
3553 receiver.try_recv().unwrap_err().kind(),
3554 MailboxErrorKind::Closed
3555 );
3556 assert_matches!(
3558 receiver.try_recv().unwrap_err().kind(),
3559 MailboxErrorKind::Closed
3560 );
3561 } else {
3562 assert!(receiver.try_recv().unwrap().is_none());
3563 assert!(receiver.try_recv().unwrap().is_none());
3565 }
3566 }
3567 {
3569 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3570 assert!(
3571 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3572 .await
3573 .is_err()
3574 );
3575
3576 sender.send(4).unwrap();
3577 sender.send(5).unwrap();
3578 sender.send(6).unwrap();
3579 sender.send(7).unwrap();
3580
3581 if drop_sender {
3582 drop(sender);
3583 }
3584
3585 if !coalesce {
3586 assert_eq!(receiver.recv().await.unwrap(), 4);
3587 assert_eq!(receiver.recv().await.unwrap(), 5);
3588 assert_eq!(receiver.recv().await.unwrap(), 6);
3589 }
3590 assert_eq!(receiver.recv().await.unwrap(), 7);
3591 if drop_sender {
3592 assert_matches!(
3593 receiver.recv().await.unwrap_err().kind(),
3594 MailboxErrorKind::Closed
3595 );
3596 assert_matches!(
3598 receiver.recv().await.unwrap_err().kind(),
3599 MailboxErrorKind::Closed
3600 );
3601 } else {
3602 assert!(
3603 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3604 .await
3605 .is_err()
3606 );
3607 }
3608 }
3609 }
3610
3611 #[tokio::test]
3612 async fn test_receiver_basic_default() {
3613 verify_receiver(false, false).await
3614 }
3615
3616 #[tokio::test]
3617 async fn test_receiver_basic_latest() {
3618 verify_receiver(true, false).await
3619 }
3620
3621 #[tokio::test]
3622 async fn test_receiver_after_sender_drop_default() {
3623 verify_receiver(false, true).await
3624 }
3625
3626 #[tokio::test]
3627 async fn test_receiver_after_sender_drop_latest() {
3628 verify_receiver(true, true).await
3629 }
3630
3631 struct Setup {
3632 receiver: PortReceiver<u64>,
3633 actor0: Instance<()>,
3634 actor1: Instance<()>,
3635 _actor0_handle: ActorHandle<()>,
3636 _actor1_handle: ActorHandle<()>,
3637 port_id: reference::PortId,
3638 port_id1: reference::PortId,
3639 port_id2: reference::PortId,
3640 port_id2_1: reference::PortId,
3641 }
3642
3643 async fn setup_split_port_ids(
3644 reducer_spec: Option<ReducerSpec>,
3645 reducer_mode: ReducerMode,
3646 ) -> Setup {
3647 let proc = Proc::local();
3648 let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3649 let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3650
3651 let (port_handle, receiver) = actor0.open_port::<u64>();
3653 let port_id = port_handle.bind().port_id().clone();
3654
3655 let port_id1 = port_id
3657 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3658 .unwrap();
3659 let port_id2 = port_id
3660 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3661 .unwrap();
3662
3663 let port_id2_1 = port_id2
3665 .split(&actor1, reducer_spec, reducer_mode.clone(), true)
3666 .unwrap();
3667
3668 Setup {
3669 receiver,
3670 actor0,
3671 actor1,
3672 _actor0_handle: actor0_handle,
3673 _actor1_handle: actor1_handle,
3674 port_id,
3675 port_id1,
3676 port_id2,
3677 port_id2_1,
3678 }
3679 }
3680
3681 fn post(cx: &impl context::Actor, port_id: reference::PortId, msg: u64) {
3682 let serialized = wirevalue::Any::serialize(&msg).unwrap();
3683 port_id.send(cx, serialized);
3684 }
3685
3686 #[async_timed_test(timeout_secs = 30)]
3687 #[cfg_attr(not(fbcode_build), ignore)]
3689 async fn test_split_port_id_no_reducer() {
3690 let Setup {
3691 mut receiver,
3692 actor0,
3693 actor1,
3694 port_id,
3695 port_id1,
3696 port_id2,
3697 port_id2_1,
3698 ..
3699 } = setup_split_port_ids(None, ReducerMode::default()).await;
3700 post(&actor0, port_id.clone(), 1);
3702 assert_eq!(receiver.recv().await.unwrap(), 1);
3703 post(&actor1, port_id1.clone(), 2);
3704 assert_eq!(receiver.recv().await.unwrap(), 2);
3705 post(&actor1, port_id2.clone(), 3);
3706 assert_eq!(receiver.recv().await.unwrap(), 3);
3707 post(&actor1, port_id2_1.clone(), 4);
3708 assert_eq!(receiver.recv().await.unwrap(), 4);
3709
3710 tokio::time::sleep(Duration::from_secs(2)).await;
3712 let msg = receiver.try_recv().unwrap();
3713 assert_eq!(msg, None);
3714 }
3715
3716 async fn wait_for(
3717 receiver: &mut PortReceiver<u64>,
3718 expected_size: usize,
3719 timeout_duration: Duration,
3720 ) -> anyhow::Result<Vec<u64>> {
3721 let mut messeges = vec![];
3722
3723 tokio::time::timeout(timeout_duration, async {
3724 loop {
3725 let msg = receiver.recv().await.unwrap();
3726 messeges.push(msg);
3727 if messeges.len() == expected_size {
3728 break;
3729 }
3730 }
3731 })
3732 .await?;
3733 Ok(messeges)
3734 }
3735
3736 #[async_timed_test(timeout_secs = 30)]
3737 async fn test_split_port_id_sum_reducer() {
3738 let config = hyperactor_config::global::lock();
3739 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3740
3741 let sum_accumulator = accum::sum::<u64>();
3742 let reducer_spec = sum_accumulator.reducer_spec();
3743 let Setup {
3744 mut receiver,
3745 actor0,
3746 actor1,
3747 port_id,
3748 port_id1,
3749 port_id2,
3750 port_id2_1,
3751 ..
3752 } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
3753 post(&actor0, port_id.clone(), 4);
3754 post(&actor1, port_id1.clone(), 2);
3755 post(&actor1, port_id2.clone(), 3);
3756 post(&actor1, port_id2_1.clone(), 1);
3757 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3758 .await
3759 .unwrap();
3760 messages.sort();
3763 assert_eq!(messages, vec![1, 2, 3, 4]);
3764
3765 tokio::time::sleep(Duration::from_secs(2)).await;
3767 let msg = receiver.try_recv().unwrap();
3768 assert_eq!(msg, None);
3769 }
3770
3771 #[async_timed_test(timeout_secs = 30)]
3772 #[cfg_attr(not(fbcode_build), ignore)]
3774 async fn test_split_port_id_every_n_messages() {
3775 let config = hyperactor_config::global::lock();
3776 let _config_guard =
3777 config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3778 let proc = Proc::local();
3779 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3780 let (port_handle, mut receiver) = actor.open_port::<u64>();
3781 let port_id = port_handle.bind().port_id().clone();
3782 let reducer_spec = accum::sum::<u64>().reducer_spec();
3784 let split_port_id = port_id
3785 .split(
3786 &actor,
3787 reducer_spec,
3788 ReducerMode::Streaming(accum::StreamingReducerOpts {
3789 max_update_interval: Some(Duration::from_mins(10)),
3790 initial_update_interval: Some(Duration::from_mins(10)),
3791 }),
3792 true,
3793 )
3794 .unwrap();
3795
3796 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3798 post(&actor, split_port_id.clone(), msg);
3799 }
3800 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3803 .await
3804 .unwrap();
3805 assert_eq!(messages, vec![15]);
3806
3807 tokio::time::sleep(Duration::from_secs(2)).await;
3810 let msg = receiver.try_recv().unwrap();
3811 assert_eq!(msg, None);
3812 }
3813
3814 #[async_timed_test(timeout_secs = 30)]
3815 async fn test_split_port_timeout_flush() {
3816 let config = hyperactor_config::global::lock();
3817 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3818
3819 let Setup {
3820 mut receiver,
3821 actor0: _,
3822 actor1,
3823 port_id: _,
3824 port_id1,
3825 port_id2: _,
3826 port_id2_1: _,
3827 ..
3828 } = setup_split_port_ids(
3829 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3830 ReducerMode::Streaming(accum::StreamingReducerOpts {
3831 max_update_interval: Some(Duration::from_millis(50)),
3832 initial_update_interval: Some(Duration::from_millis(50)),
3833 }),
3834 )
3835 .await;
3836
3837 post(&actor1, port_id1.clone(), 10);
3838 post(&actor1, port_id1.clone(), 20);
3839 post(&actor1, port_id1.clone(), 30);
3840
3841 tokio::time::sleep(Duration::from_millis(10)).await;
3843 let msg = receiver.try_recv().unwrap();
3844 assert_eq!(msg, None);
3845
3846 tokio::time::sleep(Duration::from_millis(100)).await;
3848
3849 let msg = receiver.recv().await.unwrap();
3851 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
3855 assert_eq!(msg, None);
3856 }
3857
3858 #[async_timed_test(timeout_secs = 30)]
3859 async fn test_split_port_timeout_and_size_flush() {
3860 let config = hyperactor_config::global::lock();
3861 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3862
3863 let Setup {
3864 mut receiver,
3865 actor0: _,
3866 actor1,
3867 port_id: _,
3868 port_id1,
3869 port_id2: _,
3870 port_id2_1: _,
3871 ..
3872 } = setup_split_port_ids(
3873 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3874 ReducerMode::Streaming(accum::StreamingReducerOpts {
3875 max_update_interval: Some(Duration::from_millis(50)),
3876 initial_update_interval: Some(Duration::from_millis(50)),
3877 }),
3878 )
3879 .await;
3880
3881 post(&actor1, port_id1.clone(), 10);
3882 post(&actor1, port_id1.clone(), 20);
3883 post(&actor1, port_id1.clone(), 30);
3884 post(&actor1, port_id1.clone(), 40);
3885
3886 let msg = receiver.recv().await.unwrap();
3888 assert_eq!(msg, 60);
3889
3890 let msg = receiver.recv().await.unwrap();
3892 assert_eq!(msg, 40);
3893
3894 let msg = receiver.try_recv().unwrap();
3896 assert_eq!(msg, None);
3897 }
3898
3899 #[async_timed_test(timeout_secs = 30)]
3900 async fn test_split_port_once_mode_basic() {
3901 let proc = Proc::local();
3902 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3903 let (port_handle, mut receiver) = actor.open_port::<u64>();
3904 let port_id = port_handle.bind().port_id().clone();
3905
3906 let reducer_spec = accum::sum::<u64>().reducer_spec();
3908 let split_port_id = port_id
3909 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3910 .unwrap();
3911
3912 post(&actor, split_port_id.clone(), 10);
3914 post(&actor, split_port_id.clone(), 20);
3915 post(&actor, split_port_id.clone(), 30);
3916
3917 let msg = receiver.recv().await.unwrap();
3919 assert_eq!(msg, 60); tokio::time::sleep(Duration::from_millis(100)).await;
3923 let msg = receiver.try_recv().unwrap();
3924 assert_eq!(msg, None);
3925 }
3926
3927 #[async_timed_test(timeout_secs = 30)]
3928 async fn test_split_port_once_mode_teardown() {
3929 let proc = Proc::local();
3930 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3931 let (port_handle, mut receiver) = actor.open_port::<u64>();
3932 let port_id = port_handle.bind().port_id().clone();
3933
3934 let (undeliverable_handle, mut undeliverable_receiver) =
3936 undeliverable::new_undeliverable_port();
3937
3938 let reducer_spec = accum::sum::<u64>().reducer_spec();
3940 let split_port_id = port_id
3941 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3942 .unwrap();
3943
3944 post(&actor, split_port_id.clone(), 10);
3946 post(&actor, split_port_id.clone(), 20);
3947 post(&actor, split_port_id.clone(), 30);
3948
3949 let msg = receiver.recv().await.unwrap();
3951 assert_eq!(msg, 60); let serialized = wirevalue::Any::serialize(&100u64).unwrap();
3955 let envelope = MessageEnvelope::new(
3956 actor.mailbox().actor_id().clone(),
3957 split_port_id.clone(),
3958 serialized,
3959 Flattrs::new(),
3960 );
3961 actor.mailbox().post(envelope, undeliverable_handle);
3962
3963 let undeliverable =
3965 tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
3966 .await
3967 .expect("should receive undeliverable message")
3968 .expect("receiver should not be closed");
3969
3970 assert_eq!(undeliverable.0.dest(), &split_port_id);
3972
3973 let msg = receiver.try_recv().unwrap();
3975 assert_eq!(msg, None);
3976 }
3977
3978 #[test]
3979 fn test_dial_mailbox_router_prefixes_empty() {
3980 assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3981 }
3982
3983 #[test]
3984 fn test_dial_mailbox_router_prefixes_single_entry() {
3985 let router = DialMailboxRouter::new();
3986 router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3987
3988 let prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3989 assert_eq!(prefixes.len(), 1);
3990 assert_eq!(prefixes[0], test_proc_ref("world0"));
3991 }
3992
3993 #[test]
3994 fn test_dial_mailbox_router_prefixes_no_overlap() {
3995 let router = DialMailboxRouter::new();
3996 router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3997 router.bind(test_proc_ref("world1"), "unix!@2".parse().unwrap());
3998 router.bind(test_proc_ref("world2"), "unix!@3".parse().unwrap());
3999
4000 let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4001 prefixes.sort();
4002
4003 let mut expected = vec![
4004 test_proc_ref("world0"),
4005 test_proc_ref("world1"),
4006 test_proc_ref("world2"),
4007 ];
4008 expected.sort();
4009
4010 assert_eq!(prefixes, expected);
4011 }
4012
4013 #[test]
4014 fn test_dial_mailbox_router_prefixes_with_overlaps() {
4015 let router = DialMailboxRouter::new();
4016 router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4018 router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4019 router.bind(test_proc_ref("world0_1"), "unix!@3".parse().unwrap());
4020 router.bind(test_proc_ref("world1"), "unix!@4".parse().unwrap());
4021 router.bind(test_proc_ref("world1_0"), "unix!@5".parse().unwrap());
4022
4023 let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4024 prefixes.sort();
4025
4026 let mut expected = vec![
4027 test_proc_ref("world0"),
4028 test_proc_ref("world0_0"),
4029 test_proc_ref("world0_1"),
4030 test_proc_ref("world1"),
4031 test_proc_ref("world1_0"),
4032 ];
4033 expected.sort();
4034
4035 assert_eq!(prefixes, expected);
4036 }
4037
4038 #[test]
4039 fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
4040 let router = DialMailboxRouter::new();
4041 router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4043 router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4044 router.bind(
4045 test_actor_ref("world0_0", "actor1"),
4046 "unix!@3".parse().unwrap(),
4047 );
4048 router.bind(test_proc_ref("world1_0"), "unix!@4".parse().unwrap());
4049 router.bind(test_proc_ref("world1_1"), "unix!@5".parse().unwrap());
4050 router.bind(
4051 test_actor_ref("world2_0", "actor0"),
4052 "unix!@6".parse().unwrap(),
4053 );
4054
4055 let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4056 prefixes.sort();
4057
4058 let mut expected = vec![
4065 test_proc_ref("world0"),
4066 test_proc_ref("world0_0"),
4067 test_proc_ref("world1_0"),
4068 test_proc_ref("world1_1"),
4069 test_actor_ref("world2_0", "actor0"),
4070 ];
4071 expected.sort();
4072
4073 assert_eq!(prefixes, expected);
4074 }
4075
4076 #[test]
4077 fn test_dial_mailbox_router_prefixes_same_level() {
4078 let router = DialMailboxRouter::new();
4079 router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
4080 router.bind(test_proc_ref("world0_1"), "unix!@2".parse().unwrap());
4081 router.bind(test_proc_ref("world0_2"), "unix!@3".parse().unwrap());
4082
4083 let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4084 prefixes.sort();
4085
4086 let mut expected = vec![
4088 test_proc_ref("world0_0"),
4089 test_proc_ref("world0_1"),
4090 test_proc_ref("world0_2"),
4091 ];
4092 expected.sort();
4093
4094 assert_eq!(prefixes, expected);
4095 }
4096
4097 #[derive(Clone, Debug)]
4101 struct AsyncLoopForwarder;
4102
4103 #[async_trait]
4104 impl MailboxSender for AsyncLoopForwarder {
4105 fn post_unchecked(
4106 &self,
4107 envelope: MessageEnvelope,
4108 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4109 ) {
4110 let me = self.clone();
4111 tokio::spawn(async move {
4112 me.post(envelope, return_handle);
4114 });
4115 }
4116 }
4117
4118 #[tokio::test]
4119 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4120 let actor_id = test_actor_id("world_0", "ttl_actor");
4121 let mailbox = Mailbox::new(
4122 actor_id.clone(),
4123 BoxedMailboxSender::new(AsyncLoopForwarder),
4124 );
4125 let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4126
4127 let remote_actor = test_actor_id("remote_world_1", "remote");
4130 let dest = reference::PortId::new(remote_actor.clone(), 4242);
4131
4132 let payload = 1234_u64;
4135 let envelope =
4136 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4137 .expect("serialize");
4138
4139 let return_handle = ret_port.clone();
4142 mailbox.post(envelope, return_handle);
4143
4144 let Undeliverable(undelivered) =
4146 tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4147 .await
4148 .expect("timed out waiting for undeliverable")
4149 .expect("channel closed");
4150
4151 let got: u64 = undelivered.deserialized().expect("deserialize");
4153 assert_eq!(got, payload, "payload preserved");
4154 }
4155
4156 #[tokio::test]
4157 async fn message_ttl_success_local_delivery() {
4158 let actor_id = test_actor_id("world_0", "ttl_actor");
4159 let mailbox = Mailbox::new(
4160 actor_id.clone(),
4161 BoxedMailboxSender::new(PanickingMailboxSender),
4162 );
4163 let (_undeliverable_tx, mut undeliverable_rx) =
4164 mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4165
4166 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4168
4169 let payload = 0xC0FFEE_u64;
4171 let envelope = MessageEnvelope::serialize(
4172 actor_id.clone(),
4173 user_port.bind().port_id().clone(),
4174 &payload,
4175 Flattrs::new(),
4176 )
4177 .expect("serialize");
4178
4179 let return_handle = mailbox
4182 .bound_return_handle()
4183 .unwrap_or(monitored_return_handle());
4184 mailbox.post(envelope, return_handle);
4185
4186 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4188 .await
4189 .expect("timed out waiting for local delivery")
4190 .expect("user port closed");
4191 assert_eq!(got, payload);
4192
4193 let no_undeliverable =
4195 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4196 assert!(
4197 no_undeliverable.is_err(),
4198 "unexpected undeliverable returned on successful local delivery"
4199 );
4200 }
4201
4202 #[tokio::test]
4203 async fn test_port_contramap() {
4204 let proc = Proc::local();
4205 let (client, _) = proc.instance("client").unwrap();
4206 let (handle, mut rx) = client.open_port();
4207
4208 handle
4209 .contramap(|m| (1, m))
4210 .send(&client, "hello".to_string())
4211 .unwrap();
4212 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4213 }
4214
4215 #[test]
4216 #[should_panic(expected = "already bound")]
4217 fn test_bind_port_handle_to_actor_port_twice() {
4218 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4219 let (handle, _rx) = mbox.open_port::<String>();
4220 handle.bind_actor_port();
4221 handle.bind_actor_port();
4222 }
4223
4224 #[test]
4225 fn test_bind_port_handle_to_actor_port() {
4226 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4227 let default_port = mbox.actor_id().port_id(String::port());
4228 let (handle, _rx) = mbox.open_port::<String>();
4229 assert_ne!(default_port.index(), handle.port_index);
4231 handle.bind_actor_port();
4233 assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4234 handle.bind();
4236 handle.bind();
4237 assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4238 }
4239
4240 #[test]
4241 #[should_panic(expected = "already bound")]
4242 fn test_bind_port_handle_to_actor_port_when_already_bound() {
4243 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4244 let (handle, _rx) = mbox.open_port::<String>();
4245 handle.bind();
4247 assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
4248 handle.bind_actor_port();
4250 }
4251
4252 #[tokio::test]
4253 async fn test_mailbox_post_fails_when_actor_stopped() {
4254 let actor_id = test_actor_id("0", "stopped_actor");
4255
4256 let mailbox = Mailbox::new(
4257 actor_id.clone(),
4258 BoxedMailboxSender::new(PanickingMailboxSender),
4259 );
4260
4261 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4262
4263 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4264
4265 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4268
4269 let envelope = MessageEnvelope::serialize(
4270 actor_id.clone(),
4271 user_port.bind().port_id().clone(),
4272 &42u64,
4273 Flattrs::new(),
4274 )
4275 .expect("serialize");
4276
4277 mailbox.post(envelope, return_handle);
4278
4279 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4280 .await
4281 .expect("timed out waiting for undeliverable")
4282 .expect("return port closed");
4283
4284 let err = undeliverable.0.error_msg().expect("expected error");
4285 assert!(
4286 err.contains(&format!("owner {} is stopped", actor_id)),
4287 "error should indicate actor stopped: {}",
4288 err
4289 );
4290 }
4291
4292 #[tokio::test]
4293 async fn test_mailbox_post_fails_when_actor_failed() {
4294 use crate::actor::ActorErrorKind;
4295
4296 let actor_id = test_actor_id("0", "failed_actor");
4297
4298 let mailbox = Mailbox::new(
4299 actor_id.clone(),
4300 BoxedMailboxSender::new(PanickingMailboxSender),
4301 );
4302
4303 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4304
4305 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4306 "test failure".to_string(),
4307 )));
4308
4309 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4312
4313 let envelope = MessageEnvelope::serialize(
4314 actor_id.clone(),
4315 user_port.bind().port_id().clone(),
4316 &42u64,
4317 Flattrs::new(),
4318 )
4319 .expect("serialize");
4320
4321 mailbox.post(envelope, return_handle);
4322
4323 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4324 .await
4325 .expect("timed out waiting for undeliverable")
4326 .expect("return port closed");
4327
4328 let err = undeliverable.0.error_msg().expect("expected error");
4329 assert!(
4330 err.contains(&format!("owner {} failed", actor_id)),
4331 "error should indicate actor failed: {}",
4332 err
4333 );
4334 }
4335
4336 #[tokio::test]
4337 async fn test_port_handle_send_fails_when_actor_stopped() {
4338 let actor_id = test_actor_id("0", "stopped_actor");
4339
4340 let mailbox = Mailbox::new(
4341 actor_id.clone(),
4342 BoxedMailboxSender::new(PanickingMailboxSender),
4343 );
4344
4345 let (port_handle, _rx) = mailbox.open_port::<u64>();
4346 let proc = Proc::local();
4347 let (client, _) = proc.instance("client").unwrap();
4348
4349 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4350
4351 let result = port_handle.send(&client, 42u64);
4352
4353 assert!(result.is_err(), "send should fail when actor is stopped");
4354 let err = result.unwrap_err();
4355 assert_matches!(
4356 err.kind(),
4357 MailboxSenderErrorKind::Mailbox(mailbox_err)
4358 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4359 );
4360 }
4361
4362 #[tokio::test]
4363 async fn test_port_handle_send_fails_when_actor_failed() {
4364 use crate::actor::ActorErrorKind;
4365
4366 let actor_id = test_actor_id("0", "failed_actor");
4367
4368 let mailbox = Mailbox::new(
4369 actor_id.clone(),
4370 BoxedMailboxSender::new(PanickingMailboxSender),
4371 );
4372
4373 let (port_handle, _rx) = mailbox.open_port::<u64>();
4374 let proc = Proc::local();
4375 let (client, _) = proc.instance("client").unwrap();
4376
4377 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4378 "test failure".to_string(),
4379 )));
4380
4381 let result = port_handle.send(&client, 42u64);
4382
4383 assert!(result.is_err(), "send should fail when actor is failed");
4384 let err = result.unwrap_err();
4385 assert_matches!(
4386 err.kind(),
4387 MailboxSenderErrorKind::Mailbox(mailbox_err)
4388 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4389 );
4390 }
4391
4392 #[async_timed_test(timeout_secs = 30)]
4393 async fn test_open_reduce_port() {
4394 let proc = Proc::local();
4395 let (client, _) = proc.instance("client").unwrap();
4396
4397 let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4399
4400 let port_ref = port_handle.bind();
4402 assert!(port_ref.reducer_spec().is_some());
4403
4404 port_ref.send(&client, 42).unwrap();
4406
4407 let result = receiver.recv().await.unwrap();
4409 assert_eq!(result, 42);
4410 }
4411
4412 #[async_timed_test(timeout_secs = 30)]
4413 async fn test_open_reduce_port_reducer_spec_preserved() {
4414 let proc = Proc::local();
4415 let (client, _) = proc.instance("client").unwrap();
4416
4417 let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4419 let sum_ref = sum_handle.bind();
4420 let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
4421
4422 let (max_handle, _) = client
4423 .mailbox()
4424 .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
4425 let max_ref = max_handle.bind();
4426 let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
4427
4428 assert_ne!(sum_typehash, max_typehash);
4430 }
4431
4432 #[tokio::test]
4437 async fn test_flush_over_unix_channel() {
4438 let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
4439
4440 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
4442 let serve_handle = mbox.clone().serve(rx);
4443
4444 let client = MailboxClient::dial(addr).unwrap();
4446
4447 let (port, mut receiver) = mbox.open_port::<u64>();
4449 let port = port.bind();
4450
4451 for i in 0..10u64 {
4453 client
4454 .serialize_and_send(&port, i, monitored_return_handle())
4455 .unwrap();
4456 }
4457
4458 client.flush().await.unwrap();
4461
4462 for i in 0..10u64 {
4464 let msg = receiver
4465 .try_recv()
4466 .expect("message should be available after flush")
4467 .expect("receiver should not be empty after flush");
4468 assert_eq!(msg, i);
4469 }
4470
4471 serve_handle.stop("test done");
4472 serve_handle.await.unwrap().unwrap();
4473 }
4474}