1#![allow(dead_code)] use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future::Future;
74use std::ops::Bound::Excluded;
75use std::pin::Pin;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::Mutex;
79use std::sync::OnceLock;
80use std::sync::RwLock;
81use std::sync::Weak;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::AtomicUsize;
84use std::sync::atomic::Ordering;
85use std::task::Context;
86use std::task::Poll;
87
88use async_trait::async_trait;
89use dashmap::DashMap;
90use dashmap::mapref::entry::Entry;
91use futures::Sink;
92use futures::Stream;
93use serde::Deserialize;
94use serde::Serialize;
95use serde::de::DeserializeOwned;
96use tokio::sync::mpsc;
97use tokio::sync::oneshot;
98use tokio::sync::watch;
99use tokio::task::JoinHandle;
100use tokio_util::sync::CancellationToken;
101
102use crate as hyperactor; use crate::Named;
104use crate::OncePortRef;
105use crate::PortRef;
106use crate::accum::Accumulator;
107use crate::accum::ReducerOpts;
108use crate::accum::ReducerSpec;
109use crate::actor::Signal;
110use crate::actor::remote::USER_PORT_OFFSET;
111use crate::attrs::Attrs;
112use crate::channel;
113use crate::channel::ChannelAddr;
114use crate::channel::ChannelError;
115use crate::channel::SendError;
116use crate::channel::TxStatus;
117use crate::context;
118use crate::data::Serialized;
119use crate::id;
120use crate::metrics;
121use crate::reference::ActorId;
122use crate::reference::PortId;
123use crate::reference::Reference;
124
125mod undeliverable;
126pub use undeliverable::Undeliverable;
128pub use undeliverable::UndeliverableMessageError;
129pub use undeliverable::custom_monitored_return_handle;
130pub use undeliverable::monitored_return_handle; pub use undeliverable::supervise_undeliverable_messages;
132pub use undeliverable::supervise_undeliverable_messages_with;
133pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141pub mod headers;
143
/// Marker trait for values that can travel through ports as messages.
///
/// It declares no methods; it is shorthand for `Debug + Send + Sync + 'static`.
pub trait Message: Debug + Send + Sync + 'static {}

/// Every type that satisfies the bounds is automatically a [`Message`].
impl<M> Message for M where M: Debug + Send + Sync + 'static {}
148
149pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
/// Alias for an owned byte buffer (`Vec<u8>`).
pub type Data = Vec<u8>;
158
/// Reasons a message could not be delivered.
///
/// Values of this type are accumulated on a [`MessageEnvelope`] (see
/// `MessageEnvelope::set_error`) and travel with it, which is why the type is
/// serializable.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    Named,
    Clone,
    PartialEq
)]
pub enum DeliveryError {
    /// The destination address is not routable; the payload describes why.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A link on the delivery path is broken; the payload describes why.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// A mailbox reported an error; the payload is the rendered error.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast delivery error; the payload describes why.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The envelope's TTL was already zero when it was posted again.
    #[error("ttl expired")]
    TtlExpired,
}
191
/// A serialized message together with its routing metadata.
#[derive(Debug, Serialize, Deserialize, Clone, Named)]
pub struct MessageEnvelope {
    /// The actor the message originates from.
    sender: ActorId,

    /// The port the message is addressed to.
    dest: PortId,

    /// The serialized message payload.
    data: Serialized,

    /// Delivery errors recorded so far, oldest first.
    errors: Vec<DeliveryError>,

    /// Headers attached to the message.
    headers: Attrs,

    /// Remaining time-to-live; decremented by `dec_ttl_or_err`, which fails
    /// once it reaches zero.
    ttl: u8,
}
216
217impl MessageEnvelope {
218 pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
220 Self {
221 sender,
222 dest,
223 data,
224 errors: Vec::new(),
225 headers,
226 ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
227 }
228 }
229
230 pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
232 Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
233 }
234
235 pub fn serialize<T: Serialize + Named>(
237 source: ActorId,
238 dest: PortId,
239 value: &T,
240 headers: Attrs,
241 ) -> Result<Self, crate::data::Error> {
242 Ok(Self {
243 headers,
244 data: Serialized::serialize(value)?,
245 sender: source,
246 dest,
247 errors: Vec::new(),
248 ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
249 })
250 }
251
252 pub fn ttl(&self) -> u8 {
258 self.ttl
259 }
260
261 pub fn set_ttl(mut self, ttl: u8) -> Self {
271 self.ttl = ttl;
272 self
273 }
274
275 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
283 if self.ttl == 0 {
284 Err(DeliveryError::TtlExpired)
285 } else {
286 self.ttl -= 1;
287 Ok(())
288 }
289 }
290
291 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
293 self.data.deserialized()
294 }
295
296 pub fn data(&self) -> &Serialized {
298 &self.data
299 }
300
301 pub fn sender(&self) -> &ActorId {
303 &self.sender
304 }
305
306 pub fn dest(&self) -> &PortId {
308 &self.dest
309 }
310
311 pub fn headers(&self) -> &Attrs {
313 &self.headers
314 }
315
316 pub fn is_signal(&self) -> bool {
318 self.dest.index() == Signal::port()
319 }
320
321 pub fn set_error(&mut self, error: DeliveryError) {
324 self.errors.push(error)
325 }
326
327 pub fn undeliverable(
331 mut self,
332 error: DeliveryError,
333 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
334 ) {
335 tracing::error!(
336 name = "undelivered_message_attempt",
337 sender = self.sender.to_string(),
338 dest = self.dest.to_string(),
339 error = error.to_string(),
340 return_handle = %return_handle,
341 );
342 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
343 1,
344 hyperactor_telemetry::kv_pairs!(
345 "sender_actor_id" => self.sender.to_string(),
346 "dest_actor_id" => self.dest.to_string(),
347 "message_type" => self.data.typename().unwrap_or("unknown"),
348 "error_type" => error.to_string(),
349 ),
350 );
351
352 self.set_error(error);
353 undeliverable::return_undeliverable(return_handle, self);
354 }
355
356 pub fn errors(&self) -> &Vec<DeliveryError> {
359 &self.errors
360 }
361
362 pub fn error_msg(&self) -> Option<String> {
366 if self.errors.is_empty() {
367 None
368 } else {
369 Some(
370 self.errors
371 .iter()
372 .map(|e| e.to_string())
373 .collect::<Vec<_>>()
374 .join("; "),
375 )
376 }
377 }
378
379 fn open(self) -> (MessageMetadata, Serialized) {
380 let Self {
381 sender,
382 dest,
383 data,
384 errors,
385 headers,
386 ttl,
387 } = self;
388
389 (
390 MessageMetadata {
391 sender,
392 dest,
393 errors,
394 headers,
395 ttl,
396 },
397 data,
398 )
399 }
400
401 fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
402 let MessageMetadata {
403 sender,
404 dest,
405 errors,
406 headers,
407 ttl,
408 } = metadata;
409
410 Self {
411 sender,
412 dest,
413 data,
414 errors,
415 headers,
416 ttl,
417 }
418 }
419}
420
421impl fmt::Display for MessageEnvelope {
422 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423 match &self.error_msg() {
424 None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
425 Some(err) => write!(
426 f,
427 "{} > {}: {}: delivery error: {}",
428 self.sender, self.dest, self.data, err
429 ),
430 }
431 }
432}
433
/// Everything in a [`MessageEnvelope`] except the serialized payload.
///
/// Produced by `MessageEnvelope::open` and consumed by
/// `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    /// The actor the message originates from.
    sender: ActorId,
    /// The port the message is addressed to.
    dest: PortId,
    /// Delivery errors recorded so far.
    errors: Vec<DeliveryError>,
    /// Headers attached to the message.
    headers: Attrs,
    /// Remaining time-to-live.
    ttl: u8,
}
443
/// An error raised by a mailbox, tagged with the actor the mailbox belongs to.
#[derive(Debug)]
pub struct MailboxError {
    /// The actor whose mailbox produced the error.
    actor_id: ActorId,
    /// What went wrong.
    kind: MailboxErrorKind,
}
451
/// The kinds of failure a [`MailboxError`] can carry.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum MailboxErrorKind {
    /// The mailbox is closed.
    #[error("mailbox closed")]
    Closed,

    /// The given port is invalid.
    #[error("invalid port: {0}")]
    InvalidPort(PortId),

    /// No sender is registered for the given port.
    #[error("no sender for port: {0}")]
    NoSenderForPort(PortId),

    /// No local sender is registered for the given port.
    #[error("no local sender for port: {0}")]
    NoLocalSenderForPort(PortId),

    /// The given port is closed.
    #[error("{0}: port closed")]
    PortClosed(PortId),

    /// Sending to the given port failed; the second field is the cause.
    #[error("send {0}: {1}")]
    Send(PortId, #[source] anyhow::Error),

    /// Receiving from the given port failed; the second field is the cause.
    #[error("recv {0}: {1}")]
    Recv(PortId, #[source] anyhow::Error),

    /// Serialization failed.
    #[error("serialize: {0}")]
    Serialize(#[source] anyhow::Error),

    /// Deserialization failed; the first field is rendered before the cause
    /// (presumably a type name — confirm against callers).
    /// NOTE(review): unlike the variants above, the cause is not marked
    /// `#[source]`, so it is not exposed through `Error::source`.
    #[error("deserialize {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),
}
498
499impl MailboxError {
500 pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
503 Self { actor_id, kind }
504 }
505
506 pub fn actor_id(&self) -> &ActorId {
508 &self.actor_id
509 }
510
511 pub fn kind(&self) -> &MailboxErrorKind {
513 &self.kind
514 }
515}
516
517impl fmt::Display for MailboxError {
518 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519 write!(f, "{}: ", self.actor_id)?;
520 fmt::Display::fmt(&self.kind, f)
521 }
522}
523
524impl std::error::Error for MailboxError {
525 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
526 self.kind.source()
527 }
528}
529
/// Identifies a port in error reports, whether or not it has a [`PortId`].
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// The port has a concrete [`PortId`].
    Bound(PortId),
    /// The port has no [`PortId`]; it is described by its owning actor and a
    /// type name (a `std::any::type_name` when built via `new_unbound`).
    Unbound(ActorId, &'static str),
}
540
541impl PortLocation {
542 fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
543 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
544 }
545
546 fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
547 PortLocation::Unbound(actor_id, ty)
548 }
549
550 pub fn actor_id(&self) -> &ActorId {
552 match self {
553 PortLocation::Bound(port_id) => port_id.actor_id(),
554 PortLocation::Unbound(actor_id, _) => actor_id,
555 }
556 }
557}
558
559impl fmt::Display for PortLocation {
560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561 match self {
562 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
563 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
564 }
565 }
566}
567
/// An error raised while sending to a port, tagged with the port's location.
///
/// Both fields are boxed, which keeps the error itself two pointers wide.
#[derive(Debug)]
pub struct MailboxSenderError {
    /// The port the send was addressed to.
    location: Box<PortLocation>,
    /// What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
575
/// The kinds of failure a [`MailboxSenderError`] can carry.
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// The message could not be serialized.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// The message could not be deserialized as the named type.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// The port is invalid.
    #[error("invalid port")]
    Invalid,

    /// The port is closed.
    #[error("port closed")]
    Closed,

    /// An underlying mailbox error, displayed transparently.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying message-log error, displayed transparently.
    #[error(transparent)]
    MessageLog(#[from] MessageLogError),

    /// Any other send failure.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination is unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
616
617impl MailboxSenderError {
618 pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
620 Self {
621 location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
622 kind: Box::new(kind),
623 }
624 }
625
626 pub fn new_unbound_type(
628 actor_id: ActorId,
629 kind: MailboxSenderErrorKind,
630 ty: &'static str,
631 ) -> Self {
632 Self {
633 location: Box::new(PortLocation::Unbound(actor_id, ty)),
634 kind: Box::new(kind),
635 }
636 }
637
638 pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
640 Self {
641 location: Box::new(PortLocation::Bound(port_id)),
642 kind: Box::new(kind),
643 }
644 }
645
646 pub fn location(&self) -> &PortLocation {
648 &self.location
649 }
650
651 pub fn kind(&self) -> &MailboxSenderErrorKind {
653 &self.kind
654 }
655}
656
657impl fmt::Display for MailboxSenderError {
658 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
659 write!(f, "{}: ", self.location)?;
660 fmt::Display::fmt(&self.kind, f)
661 }
662}
663
664impl std::error::Error for MailboxSenderError {
665 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
666 self.kind.source()
667 }
668}
669
670pub trait MailboxSender: Send + Sync + Debug + Any {
673 fn post(
676 &self,
677 mut envelope: MessageEnvelope,
678 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
679 ) {
680 if let Err(err) = envelope.dec_ttl_or_err() {
681 envelope.undeliverable(err, return_handle);
682 return;
683 }
684 self.post_unchecked(envelope, return_handle);
685 }
686
687 fn post_unchecked(
689 &self,
690 envelope: MessageEnvelope,
691 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
692 );
693}
694
695pub trait PortSender: MailboxSender {
698 fn serialize_and_send<M: RemoteMessage>(
700 &self,
701 port: &PortRef<M>,
702 message: M,
703 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
704 ) -> Result<(), MailboxSenderError> {
705 let serialized = Serialized::serialize(&message).map_err(|err| {
707 MailboxSenderError::new_bound(
708 port.port_id().clone(),
709 MailboxSenderErrorKind::Serialize(err.into()),
710 )
711 })?;
712 self.post(
713 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
714 return_handle,
715 );
716 Ok(())
717 }
718
719 fn serialize_and_send_once<M: RemoteMessage>(
722 &self,
723 once_port: OncePortRef<M>,
724 message: M,
725 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
726 ) -> Result<(), MailboxSenderError> {
727 let serialized = Serialized::serialize(&message).map_err(|err| {
728 MailboxSenderError::new_bound(
729 once_port.port_id().clone(),
730 MailboxSenderErrorKind::Serialize(err.into()),
731 )
732 })?;
733 self.post(
734 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
735 return_handle,
736 );
737 Ok(())
738 }
739}
740
741impl<T: ?Sized + MailboxSender> PortSender for T {}
742
/// A [`MailboxSender`] that panics whenever an envelope reaches it.
#[derive(Debug, Clone)]
pub struct PanickingMailboxSender;

impl MailboxSender for PanickingMailboxSender {
    /// Always panics, including the rendered envelope in the panic message.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        panic!("panic! in the mailbox! attempted post: {}", envelope)
    }
}
758
759#[derive(Debug)]
762pub struct UndeliverableMailboxSender;
763
764impl MailboxSender for UndeliverableMailboxSender {
765 fn post_unchecked(
766 &self,
767 envelope: MessageEnvelope,
768 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
769 ) {
770 let sender_name = envelope.sender.name();
771 let mut error_str = "".to_string();
772 if !envelope.errors.is_empty() {
773 error_str = envelope
774 .errors
775 .iter()
776 .map(|e| e.to_string())
777 .collect::<Vec<_>>()
778 .join("; ");
779 }
780 tracing::error!(
783 name = "undelivered_message_abandoned",
784 actor_name = sender_name,
785 actor_id = envelope.sender.to_string(),
786 dest = envelope.dest.to_string(),
787 headers = envelope.headers().to_string(), data = envelope.data().to_string(),
789 "message not delivered, {}",
790 error_str,
791 );
792 }
793}
794
795#[derive(Debug)]
796struct Buffer<T: Message> {
797 queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
798 processed: watch::Receiver<usize>,
799 seq: AtomicUsize,
800}
801
802impl<T: Message> Buffer<T> {
803 fn new<Fut>(
804 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
805 ) -> Self
806 where
807 Fut: Future<Output = ()> + Send + 'static,
808 {
809 let (queue, mut next) = mpsc::unbounded_channel();
810 let (last_processed, processed) = watch::channel(0);
811 crate::init::get_runtime().spawn(async move {
812 let mut seq = 0;
813 while let Some((msg, return_handle)) = next.recv().await {
814 process(msg, return_handle).await;
815 seq += 1;
816 let _ = last_processed.send(seq);
817 }
818 });
819 Self {
820 queue,
821 processed,
822 seq: AtomicUsize::new(0),
823 }
824 }
825
826 #[allow(clippy::result_large_err)]
827 fn send(
828 &self,
829 item: (T, PortHandle<Undeliverable<T>>),
830 ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
831 self.seq.fetch_add(1, Ordering::SeqCst);
832 self.queue.send(item)?;
833 Ok(())
834 }
835
836 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
837 let seq = self.seq.load(Ordering::SeqCst);
838 while *self.processed.borrow_and_update() < seq {
839 self.processed.changed().await?;
840 }
841 Ok(())
842 }
843}
844
/// Lazily-created, shared boxed [`PanickingMailboxSender`]; cloned by
/// `Mailbox::new_detached` to serve as the forwarder of detached mailboxes.
static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
847
848#[derive(Debug, Clone)]
854pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
855
856impl BoxedMailboxSender {
857 pub fn new(sender: impl MailboxSender + 'static) -> Self {
859 Self(Arc::new(sender))
860 }
861
862 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
865 (&*self.0 as &dyn Any).downcast_ref::<T>()
866 }
867}
868
869pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
871 fn boxed(&self) -> BoxedMailboxSender;
873}
874impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
875 fn boxed(&self) -> BoxedMailboxSender {
876 BoxedMailboxSender::new(self.clone())
877 }
878}
879
880pub trait IntoBoxedMailboxSender: MailboxSender {
882 fn into_boxed(self) -> BoxedMailboxSender;
884}
885impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
886 fn into_boxed(self) -> BoxedMailboxSender {
887 BoxedMailboxSender::new(self)
888 }
889}
890
891impl MailboxSender for BoxedMailboxSender {
892 fn post_unchecked(
893 &self,
894 envelope: MessageEnvelope,
895 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
896 ) {
897 self.0.post_unchecked(envelope, return_handle);
898 }
899}
900
/// Errors with which a mailbox server task can terminate.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying sender error, displayed transparently.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
912
/// Handle to a running mailbox server task.
///
/// The handle is a future that resolves with the task's join result, and it
/// can ask the task to stop via `stop`.
#[derive(Debug)]
pub struct MailboxServerHandle {
    /// Join handle of the spawned serve task.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    /// Set to `true` to request that the serve task stop.
    stopped_tx: watch::Sender<bool>,
}
920
921impl MailboxServerHandle {
922 pub fn stop(&self, reason: &str) {
927 tracing::info!("stopping mailbox server; reason: {}", reason);
928 self.stopped_tx.send(true).expect("stop called twice");
929 }
930}
931
932impl Future for MailboxServerHandle {
934 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
935
936 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
937 let join_handle_pinned =
939 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
940 join_handle_pinned.poll(cx)
941 }
942}
943
944fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
949 let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
950
951 tokio::task::spawn(async move {
952 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
953 if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
954 {
955 UndeliverableMailboxSender.post(e, monitored_return_handle());
957 continue;
958 }
959 envelope.set_error(DeliveryError::BrokenLink(
960 "message was undeliverable".to_owned(),
961 ));
962 server.post(
963 MessageEnvelope::new(
964 envelope.sender().clone(),
965 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
966 envelope.sender(),
967 )
968 .port_id()
969 .clone(),
970 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
971 Attrs::new(),
972 ),
973 monitored_return_handle(),
974 );
975 }
976 });
977
978 return_handle
979}
980
981pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
984 fn serve(
988 self,
989 mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
990 ) -> MailboxServerHandle {
991 let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
996 let server = self.clone();
997 tokio::task::spawn(async move {
998 while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
999 if let Ok(Undeliverable(e)) =
1000 envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1001 {
1002 UndeliverableMailboxSender.post(e, monitored_return_handle());
1004 continue;
1005 }
1006 envelope.set_error(DeliveryError::BrokenLink(
1007 "message was undeliverable".to_owned(),
1008 ));
1009 server.post(
1010 MessageEnvelope::new(
1011 envelope.sender().clone(),
1012 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1013 envelope.sender(),
1014 )
1015 .port_id()
1016 .clone(),
1017 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1018 Attrs::new(),
1019 ),
1020 monitored_return_handle(),
1021 );
1022 }
1023 });
1024
1025 let (stopped_tx, mut stopped_rx) = watch::channel(false);
1026 let join_handle = tokio::spawn(async move {
1027 let mut detached = false;
1028
1029 loop {
1030 if *stopped_rx.borrow_and_update() {
1031 break Ok(());
1032 }
1033
1034 tokio::select! {
1035 message = rx.recv() => {
1036 match message {
1037 Ok(envelope) => self.post(envelope, return_handle.clone()),
1039
1040 Err(ChannelError::Closed) => break Ok(()),
1043 Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1044 }
1045 }
1046 result = stopped_rx.changed(), if !detached => {
1047 tracing::debug!(
1048 "the mailbox server is stopped"
1049 );
1050 detached = result.is_err();
1051 }
1052 }
1053 }
1054 });
1055
1056 MailboxServerHandle {
1057 join_handle,
1058 stopped_tx,
1059 }
1060 }
1061}
1062
1063impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1064
/// A [`MailboxSender`] that queues envelopes in a buffer whose background
/// task posts them on a channel transmitter.
#[derive(Debug)]
pub struct MailboxClient {
    /// Queue of envelopes waiting to be posted on the channel.
    buffer: Buffer<MessageEnvelope>,

    /// Cancellation token shared with the channel-health monitoring task
    /// started in `new`.
    _tx_monitoring: CancellationToken,
}
1074
1075impl MailboxClient {
1076 pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1079 let addr = tx.addr();
1080 let tx = Arc::new(tx);
1081 let tx_status = tx.status().clone();
1082 let tx_monitoring = CancellationToken::new();
1083 let buffer = Buffer::new(move |envelope, return_handle| {
1084 let tx = Arc::clone(&tx);
1085 let (return_channel, return_receiver) = oneshot::channel();
1086 let return_handle_0 = return_handle.clone();
1088 tokio::spawn(async move {
1089 let result = return_receiver.await;
1090 if let Ok(message) = result {
1091 let _ = return_handle_0.send(Undeliverable(message));
1092 } else {
1093 }
1095 });
1096 let return_handle_1 = return_handle.clone();
1098 async move {
1099 if let Err(SendError(_, envelope)) = tx.try_post(envelope, return_channel) {
1100 envelope.undeliverable(
1102 DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string()),
1103 return_handle_1.clone(),
1104 );
1105 }
1106 }
1107 });
1108 let this = Self {
1109 buffer,
1110 _tx_monitoring: tx_monitoring.clone(),
1111 };
1112 Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1113 this
1114 }
1115
1116 pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1119 Ok(MailboxClient::new(channel::dial(addr)?))
1120 }
1121
1122 fn monitor_tx_health(
1124 mut rx: watch::Receiver<TxStatus>,
1125 cancel_token: CancellationToken,
1126 addr: ChannelAddr,
1127 ) {
1128 crate::init::get_runtime().spawn(async move {
1129 loop {
1130 tokio::select! {
1131 changed = rx.changed() => {
1132 if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1133 tracing::warn!("connection to {} lost", addr);
1134 break;
1137 }
1138 }
1139 _ = cancel_token.cancelled() => {
1140 break;
1141 }
1142 }
1143 }
1144 });
1145 }
1146}
1147
1148impl MailboxSender for MailboxClient {
1149 fn post_unchecked(
1150 &self,
1151 envelope: MessageEnvelope,
1152 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1153 ) {
1154 tracing::event!(target:"messages", tracing::Level::DEBUG, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1155 if let Err(mpsc::error::SendError((envelope, return_handle))) =
1156 self.buffer.send((envelope, return_handle))
1157 {
1158 let err = DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string());
1159
1160 envelope.undeliverable(err, return_handle);
1162 }
1163 }
1164}
1165
1166pub struct PortSink<C: context::Actor, M: RemoteMessage> {
1168 cx: C,
1169 port: PortRef<M>,
1170}
1171
1172impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1173 pub fn new(cx: C, port: PortRef<M>) -> Self {
1175 Self { cx, port }
1176 }
1177}
1178
1179impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1180 type Error = MailboxSenderError;
1181
1182 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1183 Poll::Ready(Ok(()))
1184 }
1185
1186 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1187 self.port.send(&self.cx, item)
1188 }
1189
1190 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1191 Poll::Ready(Ok(()))
1192 }
1193
1194 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1195 Poll::Ready(Ok(()))
1196 }
1197}
1198
/// An actor's mailbox. Clones share the same underlying state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    /// Shared mailbox state (actor id, bound ports, forwarder).
    inner: Arc<State>,
}
1205
1206impl Mailbox {
1207 pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1210 Self {
1211 inner: Arc::new(State::new(actor_id, forwarder)),
1212 }
1213 }
1214
1215 pub fn new_detached(actor_id: ActorId) -> Self {
1217 Self {
1218 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1219 }
1220 }
1221
1222 pub fn actor_id(&self) -> &ActorId {
1224 &self.inner.actor_id
1225 }
1226
1227 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1232 let port_index = self.inner.allocate_port();
1233 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1234 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1235 tracing::trace!(
1236 name = "open_port",
1237 "opening port for {} at {}",
1238 self.inner.actor_id,
1239 port_id
1240 );
1241 (
1242 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1243 PortReceiver::new(receiver, port_id, false, self.clone()),
1244 )
1245 }
1246
1247 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1253 where
1254 A: Accumulator + Send + Sync + 'static,
1255 A::Update: Message,
1256 A::State: Message + Default + Clone,
1257 {
1258 let port_index = self.inner.allocate_port();
1259 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1260 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1261 let state = Mutex::new(A::State::default());
1262 let reducer_spec = accum.reducer_spec();
1263 let enqueue = move |_, update: A::Update| {
1264 let mut state = state.lock().unwrap();
1265 accum.accumulate(&mut state, update)?;
1266 let _ = sender.send(state.clone());
1267 Ok(())
1268 };
1269 (
1270 PortHandle {
1271 mailbox: self.clone(),
1272 port_index,
1273 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1274 bound: Arc::new(OnceLock::new()),
1275 reducer_spec,
1276 reducer_opts: None, },
1278 PortReceiver::new(receiver, port_id, true, self.clone()),
1279 )
1280 }
1281
1282 pub(crate) fn open_enqueue_port<M: Message>(
1286 &self,
1287 enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1288 ) -> PortHandle<M> {
1289 PortHandle {
1290 mailbox: self.clone(),
1291 port_index: self.inner.allocate_port(),
1292 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1293 bound: Arc::new(OnceLock::new()),
1294 reducer_spec: None,
1295 reducer_opts: None,
1296 }
1297 }
1298
1299 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1303 let port_index = self.inner.allocate_port();
1304 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1305 let (sender, receiver) = oneshot::channel::<M>();
1306 (
1307 OncePortHandle {
1308 mailbox: self.clone(),
1309 port_index,
1310 port_id: port_id.clone(),
1311 sender,
1312 },
1313 OncePortReceiver {
1314 receiver: Some(receiver),
1315 port_id,
1316 mailbox: self.clone(),
1317 },
1318 )
1319 }
1320
1321 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1322 MailboxError::new(self.inner.actor_id.clone(), err)
1323 }
1324
1325 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1326 let port_index = M::port();
1327 self.inner.ports.get(&port_index).and_then(|boxed| {
1328 boxed
1329 .as_any()
1330 .downcast_ref::<UnboundedSender<M>>()
1331 .map(|s| {
1332 assert_eq!(
1333 s.port_id,
1334 self.actor_id().port_id(port_index),
1335 "port_id mismatch in downcasted UnboundedSender"
1336 );
1337 s.sender.clone()
1338 })
1339 })
1340 }
1341
1342 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1344 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1345 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1346 }
1347
1348 pub(crate) fn allocate_port(&self) -> u64 {
1349 self.inner.allocate_port()
1350 }
1351
1352 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1353 assert_eq!(
1354 handle.mailbox.actor_id(),
1355 self.actor_id(),
1356 "port does not belong to mailbox"
1357 );
1358
1359 let port_id = self.actor_id().port_id(handle.port_index);
1362 match self.inner.ports.entry(handle.port_index) {
1363 Entry::Vacant(entry) => {
1364 entry.insert(Box::new(UnboundedSender::new(
1365 handle.sender.clone(),
1366 port_id.clone(),
1367 )));
1368 }
1369 Entry::Occupied(_entry) => {}
1370 }
1371
1372 PortRef::attest(port_id)
1373 }
1374
1375 fn bind_to<M: RemoteMessage>(&self, handle: &PortHandle<M>, port_index: u64) {
1376 assert_eq!(
1377 handle.mailbox.actor_id(),
1378 self.actor_id(),
1379 "port does not belong to mailbox"
1380 );
1381
1382 let port_id = self.actor_id().port_id(port_index);
1383 match self.inner.ports.entry(port_index) {
1384 Entry::Vacant(entry) => {
1385 entry.insert(Box::new(UnboundedSender::new(
1386 handle.sender.clone(),
1387 port_id,
1388 )));
1389 }
1390 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1391 }
1392 }
1393
1394 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1395 let port_id = handle.port_id().clone();
1396 match self.inner.ports.entry(handle.port_index) {
1397 Entry::Vacant(entry) => {
1398 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1399 }
1400 Entry::Occupied(_entry) => {}
1401 }
1402 }
1403
1404 pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1405 assert_eq!(
1406 port_id.actor_id(),
1407 self.actor_id(),
1408 "port does not belong to mailbox"
1409 );
1410
1411 match self.inner.ports.entry(port_id.index()) {
1412 Entry::Vacant(entry) => {
1413 entry.insert(Box::new(sender));
1414 }
1415 Entry::Occupied(_entry) => {}
1416 }
1417 }
1418}
1419
/// A mailbox is its own mailbox context.
impl context::Mailbox for Mailbox {
    /// Returns `self`.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1425
1426pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1431 cx.mailbox().open_port()
1432}
1433
1434pub fn open_once_port<M: Message>(
1437 cx: &impl context::Mailbox,
1438) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1439 cx.mailbox().open_once_port()
1440}
1441
/// Delivery of envelopes into a [`Mailbox`].
///
/// Envelopes addressed to a different actor are handed to the mailbox's
/// forwarder; envelopes addressed to this actor are dispatched to the
/// serialized sender registered for the destination port index.
impl MailboxSender for Mailbox {
    /// Posts `envelope` to its destination port.
    ///
    /// Any failure to deliver (unbound port, or an error reported by the
    /// port's sender) is reported by returning the envelope through
    /// `return_handle` as undeliverable.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        // Count the post, tagged with the sender and destination actor ids.
        metrics::MAILBOX_POSTS.add(
            1,
            hyperactor_telemetry::kv_pairs!(
                "actor_id" => envelope.sender.to_string(),
                "dest_actor_id" => envelope.dest.0.to_string(),
            ),
        );
        tracing::trace!(
            name = "post",
            actor_name = envelope.sender.name(),
            actor_id = envelope.sender.to_string(),
            "posting message to {}",
            envelope.dest
        );

        // Not addressed to this mailbox's actor: delegate to the forwarder.
        if envelope.dest().actor_id() != &self.inner.actor_id {
            return self.inner.forwarder.post(envelope, return_handle);
        }

        // The map entry is held for the duration of the send so that the
        // port can be removed in place if the sender asks for it.
        match self.inner.ports.entry(envelope.dest().index()) {
            Entry::Vacant(_) => {
                let err = DeliveryError::Unroutable("port not bound in mailbox".to_string());

                envelope.undeliverable(err, return_handle);
            }
            Entry::Occupied(entry) => {
                // Take the envelope apart: the sender consumes headers and
                // payload, while the remaining metadata is kept so the
                // envelope can be resealed if delivery fails.
                let (metadata, data) = envelope.open();
                let MessageMetadata {
                    headers,
                    sender,
                    dest,
                    errors: metadata_errors,
                    ttl,
                } = metadata;

                match entry.get().send_serialized(headers, data) {
                    // `false` means the sender should not be kept (e.g. a
                    // one-shot sender that has now fired): drop the port.
                    Ok(false) => {
                        entry.remove();
                    }
                    // `true` means the port stays registered.
                    Ok(true) => (),
                    // The sender hands back the headers and payload on
                    // failure; rebuild the envelope and return it.
                    Err(SerializedSenderError {
                        data,
                        error: sender_error,
                        headers,
                    }) => {
                        let err = DeliveryError::Mailbox(format!("{}", sender_error));

                        MessageEnvelope::seal(
                            MessageMetadata {
                                headers,
                                sender,
                                dest,
                                errors: metadata_errors,
                                ttl,
                            },
                            data,
                        )
                        .undeliverable(err, return_handle)
                    }
                }
            }
        }
    }
}
1521
/// A typed handle used to send messages of type `M` to a port of a
/// [`Mailbox`].
///
/// Handles can be cloned, and can be bound (see `bind`) to obtain a
/// `PortRef`.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// The mailbox this port belongs to. It is used for binding, for
    /// allocating port indices of derived handles, and to name the owning
    /// actor in errors.
    mailbox: Mailbox,
    /// The port index supplied when the handle was constructed.
    port_index: u64,
    /// The typed sender that `send` delivers messages through.
    sender: UnboundedPortSender<M>,
    /// The bound port id, set at most once (by `bind`). The cell is behind
    /// an `Arc`, so clones of a handle share the same binding.
    bound: Arc<OnceLock<PortId>>,
    /// Reducer specification attached to the `PortRef` produced by `bind`.
    reducer_spec: Option<ReducerSpec>,
    /// NOTE(review): not read in this part of the file; presumably options
    /// accompanying `reducer_spec` — confirm against the accumulator code.
    reducer_opts: Option<ReducerOpts>,
}
1547
1548impl<M: Message> PortHandle<M> {
1549 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1550 Self {
1551 mailbox,
1552 port_index,
1553 sender,
1554 bound: Arc::new(OnceLock::new()),
1555 reducer_spec: None,
1556 reducer_opts: None,
1557 }
1558 }
1559
1560 fn location(&self) -> PortLocation {
1561 match self.bound.get() {
1562 Some(port_id) => PortLocation::Bound(port_id.clone()),
1563 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1564 }
1565 }
1566
1567 pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1569 let mut headers = Attrs::new();
1570
1571 crate::mailbox::headers::set_send_timestamp(&mut headers);
1572
1573 self.sender.send(headers, message).map_err(|err| {
1574 MailboxSenderError::new_unbound::<M>(
1575 self.mailbox.actor_id().clone(),
1576 MailboxSenderErrorKind::Other(err),
1577 )
1578 })
1579 }
1580
1581 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1584 where
1585 R: Message,
1586 F: Fn(R) -> M + Send + Sync + 'static,
1587 {
1588 let port_index = self.mailbox.inner.allocate_port();
1589 let sender = self.sender.clone();
1590 PortHandle::new(
1591 self.mailbox.clone(),
1592 port_index,
1593 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1594 sender.send(headers, unmap(value))
1595 })),
1596 )
1597 }
1598}
1599
1600impl<M: RemoteMessage> PortHandle<M> {
1601 pub fn bind(&self) -> PortRef<M> {
1603 PortRef::attest_reducible(
1604 self.bound
1605 .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1606 .clone(),
1607 self.reducer_spec.clone(),
1608 )
1609 }
1610
1611 pub fn bind_to(&self, port_index: u64) {
1614 self.mailbox.bind_to(self, port_index);
1615 }
1616}
1617
1618impl<M: Message> Clone for PortHandle<M> {
1619 fn clone(&self) -> Self {
1620 Self {
1621 mailbox: self.mailbox.clone(),
1622 port_index: self.port_index,
1623 sender: self.sender.clone(),
1624 bound: self.bound.clone(),
1625 reducer_spec: self.reducer_spec.clone(),
1626 reducer_opts: self.reducer_opts.clone(),
1627 }
1628 }
1629}
1630
1631impl<M: Message> fmt::Display for PortHandle<M> {
1632 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1633 fmt::Display::fmt(&self.location(), f)
1634 }
1635}
1636
/// A typed handle to a one-shot port: sending or binding consumes the
/// handle, so at most one message can be delivered through it.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The index of this port.
    port_index: u64,
    /// The full id of this port, as returned by `port_id()`.
    port_id: PortId,
    /// The one-shot channel sender that delivers the message.
    sender: oneshot::Sender<M>,
}
1645
1646impl<M: Message> OncePortHandle<M> {
1647 pub fn port_id(&self) -> &PortId {
1650 &self.port_id
1651 }
1652
1653 pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1656 let actor_id = self.mailbox.actor_id().clone();
1657 self.sender.send(message).map_err(|_| {
1658 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1663 })?;
1664 Ok(())
1665 }
1666}
1667
1668impl<M: RemoteMessage> OncePortHandle<M> {
1669 pub fn bind(self) -> OncePortRef<M> {
1674 let port_id = self.port_id().clone();
1675 self.mailbox.clone().bind_once(self);
1676 OncePortRef::attest(port_id)
1677 }
1678}
1679
1680impl<M: Message> fmt::Display for OncePortHandle<M> {
1681 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1682 fmt::Display::fmt(&self.port_id(), f)
1683 }
1684}
1685
/// The receiving end of a port: yields the messages of type `M` that were
/// sent to it.
///
/// Dropping the receiver removes the port's entry from the mailbox.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// The channel the port's messages arrive on.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The id of the port this receiver serves.
    port_id: PortId,
    /// When set, buffered messages are coalesced: `recv`, `try_recv` and
    /// `drain` keep only the most recent buffered message.
    coalesce: bool,
    /// The mailbox owning the port; used to deregister the port on drop.
    mailbox: Mailbox,
}
1699
1700impl<M> PortReceiver<M> {
1701 fn new(
1702 receiver: mpsc::UnboundedReceiver<M>,
1703 port_id: PortId,
1704 coalesce: bool,
1705 mailbox: Mailbox,
1706 ) -> Self {
1707 Self {
1708 receiver,
1709 port_id,
1710 coalesce,
1711 mailbox,
1712 }
1713 }
1714
1715 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1720 let mut next = self.receiver.try_recv();
1721 if self.coalesce
1723 && let Some(latest) = self.drain().pop()
1724 {
1725 next = Ok(latest);
1726 }
1727 match next {
1728 Ok(msg) => Ok(Some(msg)),
1729 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1730 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1731 self.actor_id().clone(),
1732 MailboxErrorKind::Closed,
1733 )),
1734 }
1735 }
1736
1737 pub async fn recv(&mut self) -> Result<M, MailboxError> {
1740 let mut next = self.receiver.recv().await;
1741 if self.coalesce
1744 && let Some(latest) = self.drain().pop()
1745 {
1746 next = Some(latest);
1747 }
1748 next.ok_or(MailboxError::new(
1749 self.actor_id().clone(),
1750 MailboxErrorKind::Closed,
1751 ))
1752 }
1753
1754 pub fn drain(&mut self) -> Vec<M> {
1756 let mut drained: Vec<M> = Vec::new();
1757 while let Ok(msg) = self.receiver.try_recv() {
1758 if self.coalesce {
1760 drained.pop();
1761 }
1762 drained.push(msg);
1763 }
1764 drained
1765 }
1766
1767 fn port(&self) -> u64 {
1768 self.port_id.1
1769 }
1770
1771 fn actor_id(&self) -> &ActorId {
1772 &self.port_id.0
1773 }
1774}
1775
1776impl<M> Drop for PortReceiver<M> {
1777 fn drop(&mut self) {
1778 self.mailbox.inner.ports.remove(&self.port());
1782 }
1783}
1784
/// Exposes a [`PortReceiver`] as a stream of receive results.
impl<M> Stream for PortReceiver<M> {
    type Item = Result<M, MailboxError>;

    /// Polls for the next message by creating a `recv()` future and
    /// polling it once.
    ///
    /// A ready result is always wrapped in `Some`, so receive errors are
    /// yielded as `Some(Err(..))` items; this method never produces `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        std::pin::pin!(self.recv()).poll(cx).map(Some)
    }
}
1792
/// The receiving end of a one-shot port.
///
/// Dropping the receiver removes the port's entry from the mailbox.
pub struct OncePortReceiver<M> {
    /// The one-shot channel receiver. Held in an `Option` so that `recv`
    /// can take it out of the struct.
    receiver: Option<oneshot::Receiver<M>>,
    /// The id of the port this receiver serves.
    port_id: PortId,

    /// The mailbox owning the port; used to deregister the port on drop.
    mailbox: Mailbox,
}
1802
1803impl<M> OncePortReceiver<M> {
1804 pub async fn recv(mut self) -> Result<M, MailboxError> {
1808 std::mem::take(&mut self.receiver)
1809 .unwrap()
1810 .await
1811 .map_err(|err| {
1812 MailboxError::new(
1813 self.actor_id().clone(),
1814 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1815 )
1816 })
1817 }
1818
1819 fn port(&self) -> u64 {
1820 self.port_id.1
1821 }
1822
1823 fn actor_id(&self) -> &ActorId {
1824 &self.port_id.0
1825 }
1826}
1827
1828impl<M> Drop for OncePortReceiver<M> {
1829 fn drop(&mut self) {
1830 self.mailbox.inner.ports.remove(&self.port());
1834 }
1835}
1836
/// Error returned by a [`SerializedSender`]. Besides the failure itself, it
/// hands back the headers and payload of the message that could not be
/// sent, so the caller can reconstruct the original envelope.
pub struct SerializedSenderError {
    /// The headers of the message that failed to send.
    pub headers: Attrs,
    /// The serialized payload of the message that failed to send.
    pub data: Serialized,
    /// The reason the send failed.
    pub error: MailboxSenderError,
}
1846
/// A type-erased sender that accepts serialized messages. These are the
/// values stored in a mailbox's port table.
trait SerializedSender: Send + Sync {
    /// Returns the sender as `Any`, allowing callers to downcast to the
    /// concrete sender type.
    fn as_any(&self) -> &dyn Any;

    /// Sends a serialized message with the given headers.
    ///
    /// On success the returned flag tells the caller whether to keep the
    /// sender: `Mailbox::post_unchecked` removes the port when `false` is
    /// returned. On failure the headers and payload are handed back inside
    /// the error.
    // `SerializedSenderError` trips clippy's `result_large_err`; the lint
    // is silenced here rather than boxing the error.
    #[allow(clippy::result_large_err)]
    fn send_serialized(
        &self,
        headers: Attrs,
        serialized: Serialized,
    ) -> Result<bool, SerializedSenderError>;
}
1872
/// The typed delivery mechanism behind a [`PortHandle`].
enum UnboundedPortSender<M: Message> {
    /// Delivers messages into an unbounded mpsc channel. Headers are not
    /// forwarded by this variant.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Delivers messages, together with their headers, by calling a
    /// function.
    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
1880
1881impl<M: Message> UnboundedPortSender<M> {
1882 fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1883 match self {
1884 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1885 Self::Func(func) => func(headers, message),
1886 }
1887 }
1888}
1889
1890impl<M: Message> Clone for UnboundedPortSender<M> {
1893 fn clone(&self) -> Self {
1894 match self {
1895 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1896 Self::Func(func) => Self::Func(func.clone()),
1897 }
1898 }
1899}
1900
1901impl<M: Message> Debug for UnboundedPortSender<M> {
1902 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1903 match self {
1904 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1905 Self::Func(_) => f
1906 .debug_tuple("UnboundedPortSender::Func")
1907 .field(&"..")
1908 .finish(),
1909 }
1910 }
1911}
1912
/// A typed port sender paired with the id of the port it is bound to, so
/// that errors can be attributed to that port.
struct UnboundedSender<M: Message> {
    /// The underlying typed sender.
    sender: UnboundedPortSender<M>,
    /// The id of the bound port; reported in errors.
    port_id: PortId,
}
1917
1918impl<M: Message> UnboundedSender<M> {
1919 fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
1922 Self { sender, port_id }
1923 }
1924
1925 fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
1926 self.sender.send(headers, message).map_err(|err| {
1927 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
1928 })
1929 }
1930}
1931
1932impl<M: Message> Clone for UnboundedSender<M> {
1936 fn clone(&self) -> Self {
1937 Self {
1938 sender: self.sender.clone(),
1939 port_id: self.port_id.clone(),
1940 }
1941 }
1942}
1943
1944impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1945 fn as_any(&self) -> &dyn Any {
1946 self
1947 }
1948
1949 fn send_serialized(
1950 &self,
1951 headers: Attrs,
1952 serialized: Serialized,
1953 ) -> Result<bool, SerializedSenderError> {
1954 match serialized.deserialized_unchecked() {
1960 Ok(message) => {
1961 self.sender.send(headers.clone(), message).map_err(|err| {
1962 SerializedSenderError {
1963 data: serialized,
1964 error: MailboxSenderError::new_bound(
1965 self.port_id.clone(),
1966 MailboxSenderErrorKind::Other(err),
1967 ),
1968 headers,
1969 }
1970 })?;
1971
1972 Ok(true)
1973 }
1974 Err(err) => Err(SerializedSenderError {
1975 data: serialized,
1976 error: MailboxSenderError::new_bound(
1977 self.port_id.clone(),
1978 MailboxSenderErrorKind::Deserialize(M::typename(), err),
1979 ),
1980 headers,
1981 }),
1982 }
1983 }
1984}
1985
/// A sender for a bound one-shot port. The underlying channel sender can be
/// used only once; it is taken out of its slot on the first send attempt.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot channel sender, shared between clones; `None` once a
    /// send has been attempted.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The id of the bound port; reported in errors.
    port_id: PortId,
}
1993
1994impl<M: Message> OnceSender<M> {
1995 fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
1998 Self {
1999 sender: Arc::new(Mutex::new(Some(sender))),
2000 port_id,
2001 }
2002 }
2003
2004 fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2005 match self.sender.lock().unwrap().take() {
2007 None => Err(MailboxSenderError::new_bound(
2008 self.port_id.clone(),
2009 MailboxSenderErrorKind::Closed,
2010 )),
2011 Some(sender) => {
2012 sender.send(message).map_err(|_| {
2013 MailboxSenderError::new_bound(
2018 self.port_id.clone(),
2019 MailboxSenderErrorKind::Closed,
2020 )
2021 })?;
2022 Ok(false)
2023 }
2024 }
2025 }
2026}
2027
2028impl<M: Message> Clone for OnceSender<M> {
2032 fn clone(&self) -> Self {
2033 Self {
2034 sender: self.sender.clone(),
2035 port_id: self.port_id.clone(),
2036 }
2037 }
2038}
2039
2040impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2041 fn as_any(&self) -> &dyn Any {
2042 self
2043 }
2044
2045 fn send_serialized(
2046 &self,
2047 headers: Attrs,
2048 serialized: Serialized,
2049 ) -> Result<bool, SerializedSenderError> {
2050 match serialized.deserialized() {
2051 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2052 data: serialized,
2053 error: e,
2054 headers,
2055 }),
2056 Err(err) => Err(SerializedSenderError {
2057 data: serialized,
2058 error: MailboxSenderError::new_bound(
2059 self.port_id.clone(),
2060 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2061 ),
2062 headers,
2063 }),
2064 }
2065 }
2066}
2067
/// A port sender that forwards serialized messages as-is, without
/// deserializing them.
pub(crate) struct UntypedUnboundedSender {
    /// The function that accepts the serialized message. On failure it
    /// hands the message back together with the error.
    pub(crate) sender:
        Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
    /// The id of the bound port; reported in errors.
    pub(crate) port_id: PortId,
}
2074
2075impl SerializedSender for UntypedUnboundedSender {
2076 fn as_any(&self) -> &dyn Any {
2077 self
2078 }
2079
2080 fn send_serialized(
2081 &self,
2082 headers: Attrs,
2083 serialized: Serialized,
2084 ) -> Result<bool, SerializedSenderError> {
2085 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2086 data,
2087 error: MailboxSenderError::new_bound(
2088 self.port_id.clone(),
2089 MailboxSenderErrorKind::Other(err),
2090 ),
2091 headers,
2092 })?;
2093
2094 Ok(true)
2095 }
2096}
2097
/// The internal state of a mailbox.
struct State {
    /// The actor this mailbox belongs to.
    actor_id: ActorId,

    /// The bound ports, keyed by port index.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port index to hand out; starts at `USER_PORT_OFFSET`.
    next_port: AtomicU64,

    /// The sender used for envelopes that are not addressed to this
    /// mailbox's actor.
    forwarder: BoxedMailboxSender,
}
2114
2115impl State {
2116 fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2118 Self {
2119 actor_id,
2120 ports: DashMap::new(),
2121 next_port: AtomicU64::new(USER_PORT_OFFSET),
2124 forwarder,
2125 }
2126 }
2127
2128 fn allocate_port(&self) -> u64 {
2130 self.next_port.fetch_add(1, Ordering::SeqCst)
2131 }
2132}
2133
2134impl fmt::Debug for State {
2135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2136 f.debug_struct("State")
2137 .field("actor_id", &self.actor_id)
2138 .field(
2139 "open_ports",
2140 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2141 )
2142 .field("next_port", &self.next_port)
2143 .finish()
2144 }
2145}
2146
/// A [`MailboxSender`] that dispatches each envelope to the sender
/// registered for the envelope's destination actor.
///
/// Clones share the same registration table.
#[derive(Debug, Clone)]
pub struct MailboxMuxer {
    /// The registered senders, keyed by actor id.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2154
2155impl Default for MailboxMuxer {
2156 fn default() -> Self {
2157 Self::new()
2158 }
2159}
2160
2161impl MailboxMuxer {
2162 pub fn new() -> Self {
2164 Self {
2165 mailboxes: Arc::new(DashMap::new()),
2166 }
2167 }
2168
2169 pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2174 match self.mailboxes.entry(actor_id) {
2175 Entry::Occupied(_) => false,
2176 Entry::Vacant(entry) => {
2177 entry.insert(Box::new(sender));
2178 true
2179 }
2180 }
2181 }
2182
2183 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2185 self.bind(mailbox.actor_id().clone(), mailbox)
2186 }
2187
2188 pub(crate) fn unbind(&self, actor_id: &ActorId) {
2192 self.mailboxes.remove(actor_id);
2193 }
2194
2195 pub fn bound_actors(&self) -> Vec<ActorId> {
2197 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2198 }
2199}
2200
2201impl MailboxSender for MailboxMuxer {
2202 fn post_unchecked(
2203 &self,
2204 envelope: MessageEnvelope,
2205 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2206 ) {
2207 let dest_actor_id = envelope.dest().actor_id();
2208 match self.mailboxes.get(envelope.dest().actor_id()) {
2209 None => {
2210 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2211 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2212 }
2213 Some(sender) => sender.post(envelope, return_handle),
2214 }
2215 }
2216}
2217
/// A [`MailboxSender`] that routes envelopes by reference prefix: an
/// envelope goes to the sender bound to a reference that is a prefix of
/// the destination actor's id.
///
/// Clones share the same routing table.
#[derive(Debug, Clone)]
pub struct MailboxRouter {
    /// The routing table, ordered by reference.
    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2224
2225impl Default for MailboxRouter {
2226 fn default() -> Self {
2227 Self::new()
2228 }
2229}
2230
2231impl MailboxRouter {
2232 pub fn new() -> Self {
2234 Self {
2235 entries: Arc::new(RwLock::new(BTreeMap::new())),
2236 }
2237 }
2238
2239 pub fn downgrade(&self) -> WeakMailboxRouter {
2241 WeakMailboxRouter(Arc::downgrade(&self.entries))
2242 }
2243
2244 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2248 FallbackMailboxRouter {
2249 router: self.clone(),
2250 default,
2251 }
2252 }
2253
2254 pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2258 let mut w = self.entries.write().unwrap();
2259 w.insert(dest, Arc::new(sender));
2260 }
2261
2262 fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2263 match self
2264 .entries
2265 .read()
2266 .unwrap()
2267 .lower_bound(Excluded(&actor_id.clone().into()))
2268 .prev()
2269 {
2270 None => None,
2271 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2272 Some(sender.clone())
2273 }
2274 Some(_) => None,
2275 }
2276 }
2277}
2278
2279impl MailboxSender for MailboxRouter {
2280 fn post_unchecked(
2281 &self,
2282 envelope: MessageEnvelope,
2283 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2284 ) {
2285 match self.sender(envelope.dest().actor_id()) {
2286 None => envelope.undeliverable(
2287 DeliveryError::Unroutable(
2288 "no destination found for actor in routing table".to_string(),
2289 ),
2290 return_handle,
2291 ),
2292 Some(sender) => sender.post(envelope, return_handle),
2293 }
2294 }
2295}
2296
/// A [`MailboxRouter`] combined with a default sender that receives every
/// envelope the router has no route for. Created by
/// [`MailboxRouter::fallback`].
#[derive(Debug, Clone)]
struct FallbackMailboxRouter {
    /// The router consulted first.
    router: MailboxRouter,
    /// The sender used when the router has no matching route.
    default: BoxedMailboxSender,
}
2302
2303impl MailboxSender for FallbackMailboxRouter {
2304 fn post_unchecked(
2305 &self,
2306 envelope: MessageEnvelope,
2307 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2308 ) {
2309 match self.router.sender(envelope.dest().actor_id()) {
2310 Some(sender) => sender.post(envelope, return_handle),
2311 None => self.default.post(envelope, return_handle),
2312 }
2313 }
2314}
2315
/// A weak counterpart of [`MailboxRouter`]: it refers to a router's
/// routing table without keeping it alive. Obtained from
/// [`MailboxRouter::downgrade`].
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    /// Weak reference to the routing table of the originating router.
    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2328
2329impl WeakMailboxRouter {
2330 pub fn upgrade(&self) -> Option<MailboxRouter> {
2332 self.0.upgrade().map(|entries| MailboxRouter { entries })
2333 }
2334}
2335
2336impl MailboxSender for WeakMailboxRouter {
2337 fn post_unchecked(
2338 &self,
2339 envelope: MessageEnvelope,
2340 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2341 ) {
2342 match self.upgrade() {
2343 Some(router) => router.post(envelope, return_handle),
2344 None => envelope.undeliverable(
2345 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2346 return_handle,
2347 ),
2348 }
2349 }
2350}
2351
/// A [`MailboxSender`] that resolves the destination actor to a channel
/// address, dials that address (caching the resulting client), and posts
/// the envelope through it.
///
/// Clones share the same address book and client cache.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
    /// Maps references to the channel address they are served at.
    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
    /// Already-dialed clients, keyed by channel address.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    /// The sender used when no address can be resolved for a destination.
    default: BoxedMailboxSender,

    /// When set, a directly-addressed destination that has no address book
    /// entry resolves to its own address only if that address's transport
    /// is remote.
    direct_addressed_remote_only: bool,
}
2378
2379impl Default for DialMailboxRouter {
2380 fn default() -> Self {
2381 Self::new()
2382 }
2383}
2384
2385impl DialMailboxRouter {
2386 pub fn new() -> Self {
2388 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2389 }
2390
2391 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2396 Self {
2397 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2398 sender_cache: Arc::new(DashMap::new()),
2399 default,
2400 direct_addressed_remote_only: false,
2401 }
2402 }
2403
2404 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2409 Self {
2410 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2411 sender_cache: Arc::new(DashMap::new()),
2412 default,
2413 direct_addressed_remote_only: true,
2414 }
2415 }
2416
2417 pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2423 if let Ok(mut w) = self.address_book.write() {
2424 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2425 && old_addr != addr
2426 {
2427 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2428 self.sender_cache.remove(&old_addr);
2429 }
2430 } else {
2431 tracing::error!("address book poisoned during bind of {:?}", dest);
2432 }
2433 }
2434
2435 pub fn unbind(&self, dest: &Reference) {
2441 if let Ok(mut w) = self.address_book.write() {
2442 let to_remove: Vec<(Reference, ChannelAddr)> = w
2443 .range(dest..)
2444 .take_while(|(key, _)| dest.is_prefix_of(key))
2445 .map(|(key, addr)| (key.clone(), addr.clone()))
2446 .collect();
2447
2448 for (key, addr) in to_remove {
2449 tracing::info!("unbinding {:?} from {:?}", key, addr);
2450 w.remove(&key);
2451 self.sender_cache.remove(&addr);
2452 }
2453 } else {
2454 tracing::error!("address book poisoned during unbind of {:?}", dest);
2455 }
2456 }
2457
2458 pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2460 let address_book = self.address_book.read().unwrap();
2461 let found = address_book
2462 .lower_bound(Excluded(&actor_id.clone().into()))
2463 .prev();
2464
2465 if let Some((key, addr)) = found
2468 && key.is_prefix_of(&actor_id.clone().into())
2469 {
2470 Some(addr.clone())
2471 } else if actor_id.proc_id().is_direct() {
2472 let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2473 if self.direct_addressed_remote_only {
2474 addr.transport().is_remote().then_some(addr)
2475 } else {
2476 Some(addr)
2477 }
2478 } else {
2479 None
2480 }
2481 }
2482
2483 pub fn prefixes(&self) -> BTreeSet<Reference> {
2486 let addrs = self.address_book.read().unwrap();
2487 let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2488 for (reference, _) in addrs.iter() {
2489 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2490 Some(candidate) if candidate.is_prefix_of(reference) => (),
2491 _ => {
2492 prefixes.insert(reference.clone());
2493 }
2494 }
2495 }
2496
2497 prefixes
2498 }
2499
2500 fn dial(
2501 &self,
2502 addr: &ChannelAddr,
2503 actor_id: &ActorId,
2504 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2505 match self.sender_cache.entry(addr.clone()) {
2509 Entry::Occupied(entry) => Ok(entry.get().clone()),
2510 Entry::Vacant(entry) => {
2511 let tx = channel::dial(addr.clone()).map_err(|err| {
2512 MailboxSenderError::new_unbound_type(
2513 actor_id.clone(),
2514 MailboxSenderErrorKind::Channel(err),
2515 "unknown",
2516 )
2517 })?;
2518 let sender = MailboxClient::new(tx);
2519 Ok(entry.insert(Arc::new(sender)).value().clone())
2520 }
2521 }
2522 }
2523}
2524
2525impl MailboxSender for DialMailboxRouter {
2526 fn post_unchecked(
2527 &self,
2528 envelope: MessageEnvelope,
2529 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2530 ) {
2531 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2532 self.default.post(envelope, return_handle);
2533 return;
2534 };
2535
2536 match self.dial(&addr, envelope.dest().actor_id()) {
2537 Err(err) => envelope.undeliverable(
2538 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2539 return_handle,
2540 ),
2541 Ok(sender) => sender.post(envelope, return_handle),
2542 }
2543 }
2544}
2545
/// A [`MailboxSender`] that delivers nothing: every envelope posted to it
/// is returned as unroutable.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2550
2551impl MailboxSender for UnroutableMailboxSender {
2552 fn post_unchecked(
2553 &self,
2554 envelope: MessageEnvelope,
2555 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2556 ) {
2557 envelope.undeliverable(
2558 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2559 return_handle,
2560 );
2561 }
2562}
2563
2564#[cfg(test)]
2565mod tests {
2566
2567 use std::assert_matches::assert_matches;
2568 use std::mem::drop;
2569 use std::str::FromStr;
2570 use std::sync::atomic::AtomicUsize;
2571 use std::time::Duration;
2572
2573 use timed_test::async_timed_test;
2574
2575 use super::*;
2576 use crate::Actor;
2577 use crate::ActorHandle;
2578 use crate::Instance;
2579 use crate::PortId;
2580 use crate::accum;
2581 use crate::channel::ChannelTransport;
2582 use crate::channel::dial;
2583 use crate::channel::serve;
2584 use crate::channel::sim::SimAddr;
2585 use crate::clock::Clock;
2586 use crate::clock::RealClock;
2587 use crate::data::Serialized;
2588 use crate::id;
2589 use crate::proc::Proc;
2590 use crate::reference::ProcId;
2591 use crate::reference::WorldId;
2592 use crate::simnet;
2593
2594 #[test]
2595 fn test_error() {
2596 let err = MailboxError::new(
2597 ActorId(
2598 ProcId::Ranked(WorldId("myworld".to_string()), 2),
2599 "myactor".to_string(),
2600 5,
2601 ),
2602 MailboxErrorKind::Closed,
2603 );
2604 assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2605 }
2606
2607 #[tokio::test]
2608 async fn test_mailbox_basic() {
2609 let mbox = Mailbox::new_detached(id!(test[0].test));
2610 let (port, mut receiver) = mbox.open_port::<u64>();
2611 let port = port.bind();
2612
2613 mbox.serialize_and_send(&port, 123, monitored_return_handle())
2614 .unwrap();
2615 mbox.serialize_and_send(&port, 321, monitored_return_handle())
2616 .unwrap();
2617 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2618 assert_eq!(receiver.recv().await.unwrap(), 321u64);
2619
2620 let serialized = Serialized::serialize(&999u64).unwrap();
2621 mbox.post(
2622 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2623 monitored_return_handle(),
2624 );
2625 assert_eq!(receiver.recv().await.unwrap(), 999u64);
2626 }
2627
2628 #[tokio::test]
2629 async fn test_mailbox_accum() {
2630 let mbox = Mailbox::new_detached(id!(test[0].test));
2631 let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2632
2633 for i in -3..4 {
2634 port.send(i).unwrap();
2635 let received: accum::Max<i64> = receiver.recv().await.unwrap();
2636 let msg = received.get();
2637 assert_eq!(msg, &i);
2638 }
2639 for i in -3..4 {
2641 port.send(i).unwrap();
2642 assert_eq!(receiver.recv().await.unwrap().get(), &3);
2643 }
2644 port.send(4).unwrap();
2646 assert_eq!(receiver.recv().await.unwrap().get(), &4);
2647
2648 for i in 5..10 {
2650 port.send(i).unwrap();
2651 }
2652 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2653 port.send(1).unwrap();
2654 port.send(3).unwrap();
2655 port.send(2).unwrap();
2656 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2657 }
2658
2659 #[test]
2660 fn test_port_and_reducer() {
2661 let mbox = Mailbox::new_detached(id!(test[0].test));
2662 {
2664 let accumulator = accum::max::<u64>();
2665 let reducer_spec = accumulator.reducer_spec().unwrap();
2666 let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2667 assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2668 let port_ref = port.bind();
2669 assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2670 }
2671 {
2673 let (port, _) = mbox.open_port::<u64>();
2674 assert_eq!(port.reducer_spec, None);
2675 let port_ref = port.bind();
2676 assert_eq!(port_ref.reducer_spec(), &None);
2677 }
2678 }
2679
2680 #[tokio::test]
2681 #[ignore] async fn test_mailbox_once() {
2683 let mbox = Mailbox::new_detached(id!(test[0].test));
2684
2685 let (port, receiver) = mbox.open_once_port::<u64>();
2686
2687 port.send(123u64).unwrap();
2690 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2691
2692 }
2703
2704 #[tokio::test]
2705 #[ignore] async fn test_mailbox_receiver_drop() {
2707 let mbox = Mailbox::new_detached(id!(test[0].test));
2708 let (port, mut receiver) = mbox.open_port::<u64>();
2709 let port = port.bind();
2711 mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2712 .unwrap();
2713 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2714 drop(receiver);
2715 let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2716 panic!();
2717 };
2718
2719 assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2720 assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2721 }
2722
2723 #[tokio::test]
2724 async fn test_drain() {
2725 let mbox = Mailbox::new_detached(id!(test[0].test));
2726
2727 let (port, mut receiver) = mbox.open_port();
2728 let port = port.bind();
2729
2730 for i in 0..10 {
2731 mbox.serialize_and_send(&port, i, monitored_return_handle())
2732 .unwrap();
2733 }
2734
2735 for i in 0..10 {
2736 assert_eq!(receiver.recv().await.unwrap(), i);
2737 }
2738
2739 assert!(receiver.drain().is_empty());
2740 }
2741
2742 #[tokio::test]
2743 async fn test_mailbox_muxer() {
2744 let muxer = MailboxMuxer::new();
2745
2746 let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2747 let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2748
2749 muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2750 muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2751
2752 let (port, receiver) = mbox0.open_once_port::<u64>();
2753
2754 port.send(123u64).unwrap();
2755 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2756
2757 }
2767
2768 #[tokio::test]
2769 async fn test_local_client_server() {
2770 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2771 let (tx, rx) = channel::local::new();
2772 let serve_handle = mbox.clone().serve(rx);
2773 let client = MailboxClient::new(tx);
2774
2775 let (port, receiver) = mbox.open_once_port::<u64>();
2776 let port = port.bind();
2777
2778 client
2779 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2780 .unwrap();
2781 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2782 serve_handle.stop("fromt test");
2783 serve_handle.await.unwrap().unwrap();
2784 }
2785
2786 #[tokio::test]
2787 async fn test_sim_client_server() {
2788 simnet::start();
2789 let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2790 let src_to_dst = ChannelAddr::Sim(
2791 SimAddr::new_with_src(
2792 "local:0".parse::<ChannelAddr>().unwrap(),
2793 dst_addr.addr().clone(),
2794 )
2795 .unwrap(),
2796 );
2797
2798 let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2799 let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2800 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2801 let serve_handle = mbox.clone().serve(rx);
2802 let client = MailboxClient::new(tx);
2803 let (port, receiver) = mbox.open_once_port::<u64>();
2804 let port = port.bind();
2805 let msg: u64 = 123;
2806 client
2807 .serialize_and_send_once(port, msg, monitored_return_handle())
2808 .unwrap();
2809 assert_eq!(receiver.recv().await.unwrap(), msg);
2810 serve_handle.stop("from test");
2811 serve_handle.await.unwrap().unwrap();
2812 }
2813
2814 #[tokio::test]
2815 async fn test_mailbox_router() {
2816 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2817 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2818 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2819 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2820
2821 let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2822 [&mbox0, &mbox1, &mbox2, &mbox3]
2823 .into_iter()
2824 .map(|mbox| {
2825 let (port, receiver) = mbox.open_once_port::<u64>();
2826 (port.bind(), receiver)
2827 })
2828 .collect();
2829
2830 let router = MailboxRouter::new();
2831
2832 router.bind(id!(world0).into(), mbox0);
2833 router.bind(id!(world1[0]).into(), mbox1);
2834 router.bind(id!(world1[1]).into(), mbox2);
2835 router.bind(id!(world1[1].actor1).into(), mbox3);
2836
2837 for (i, (port, receiver)) in comms.into_iter().enumerate() {
2838 router
2839 .serialize_and_send_once(port, i as u64, monitored_return_handle())
2840 .unwrap();
2841 assert_eq!(receiver.recv().await.unwrap(), i as u64);
2842 }
2843
2844 let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2847
2848 let (return_handle, mut return_receiver) =
2849 crate::mailbox::undeliverable::new_undeliverable_port();
2850 let (port, _receiver) = mbox4.open_once_port();
2851 router
2852 .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2853 .unwrap();
2854 assert!(return_receiver.recv().await.is_ok());
2855
2856 let router = router.fallback(mbox4.clone().into_boxed());
2857 let (port, receiver) = mbox4.open_once_port();
2858 router
2859 .serialize_and_send_once(port.bind(), 0, return_handle)
2860 .unwrap();
2861 assert_eq!(receiver.recv().await.unwrap(), 0);
2862 }
2863
2864 #[tokio::test]
2865 async fn test_dial_mailbox_router() {
2866 let router = DialMailboxRouter::new();
2867
2868 router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2869 router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2870 router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2871 router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2872 router.bind(
2874 "unix:@4,my_proc,my_actor".parse().unwrap(),
2875 "unix:@5".parse().unwrap(),
2876 );
2877
2878 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2880 router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2881
2882 let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
2883 .unwrap()
2884 .into_actor()
2885 .unwrap();
2886 assert_eq!(
2887 router.lookup_addr(&actor_id).unwrap(),
2888 "unix!@5".parse().unwrap(),
2889 );
2890 router.unbind(&actor_id.clone().into());
2891 assert_eq!(
2892 router.lookup_addr(&actor_id).unwrap(),
2893 "unix!@4".parse().unwrap(),
2894 );
2895
2896 router.unbind(&id!(world1).into());
2898 assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
2899 assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
2900 assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
2901 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2902 router.unbind(&id!(world0).into());
2903 assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
2904 }
2905
2906 #[tokio::test]
2907 #[ignore] async fn test_dial_mailbox_router_default() {
2909 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2910 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2911 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2912 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2913
2914 let root = MailboxRouter::new();
2917 let world0_router = DialMailboxRouter::new_with_default(root.boxed());
2918 let world1_router = DialMailboxRouter::new_with_default(root.boxed());
2919
2920 root.bind(id!(world0).into(), world0_router.clone());
2921 root.bind(id!(world1).into(), world1_router.clone());
2922
2923 let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
2924
2925 let mut handles = Vec::new(); for mbox in mailboxes.iter() {
2927 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
2928 let handle = (*mbox).clone().serve(rx);
2929 handles.push(handle);
2930
2931 eprintln!("{}: {}", mbox.actor_id(), addr);
2932 if mbox.actor_id().world_name() == "world0" {
2933 world0_router.bind(mbox.actor_id().clone().into(), addr);
2934 } else {
2935 world1_router.bind(mbox.actor_id().clone().into(), addr);
2936 }
2937 }
2938
2939 for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
2941 for mbox in mailboxes.iter() {
2942 let (port, receiver) = mbox.open_once_port::<u64>();
2943 let port = port.bind();
2944 router
2945 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2946 .unwrap();
2947 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2948 }
2949 }
2950 }
2951
2952 #[tokio::test]
2953 async fn test_enqueue_port() {
2954 let mbox = Mailbox::new_detached(id!(test[0].test));
2955
2956 let count = Arc::new(AtomicUsize::new(0));
2957 let count_clone = count.clone();
2958 let port = mbox.open_enqueue_port(move |_, n| {
2959 count_clone.fetch_add(n, Ordering::SeqCst);
2960 Ok(())
2961 });
2962
2963 port.send(10).unwrap();
2964 port.send(5).unwrap();
2965 port.send(1).unwrap();
2966 port.send(0).unwrap();
2967
2968 assert_eq!(count.load(Ordering::SeqCst), 16);
2969 }
2970
2971 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2972 struct TestMessage;
2973
2974 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2975 #[named(name = "some::custom::path")]
2976 struct TestMessage2;
2977
/// Checks the names reported by `Named::typename`: the module path of the
/// type when no name is given, and the custom name when one is.
#[test]
fn test_remote_message_macros() {
    let derived_name = TestMessage::typename();
    assert_eq!(derived_name, "hyperactor::mailbox::tests::TestMessage");

    let overridden_name = TestMessage2::typename();
    assert_eq!(overridden_name, "some::custom::path");
}
2986
/// Checks the `Display` rendering of a `MessageEnvelope` carrying a
/// registered type: sender, destination port, type name and payload fields.
#[test]
fn test_message_envelope_display() {
    #[derive(Named, Serialize, Deserialize)]
    struct MyTest {
        a: u64,
        b: String,
    }
    crate::register_type!(MyTest);

    let payload = MyTest {
        a: 123,
        b: "hello".into(),
    };
    let envelope = MessageEnvelope::serialize(
        id!(source[0].actor),
        id!(dest[1].actor[0][123]),
        &payload,
        Attrs::new(),
    )
    .unwrap();

    let rendered = format!("{envelope}");
    assert_eq!(
        rendered,
        r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
    );
}
3012
3013 #[derive(Debug, Default, Actor)]
3014 struct Foo;
3015
3016 #[tokio::test]
3019 async fn test_actor_delivery_failure() {
3020 use crate::actor::ActorStatus;
3023 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
3024
3025 let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3026 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3027 ));
3028 let proc_id = id!(quux[0]);
3029 let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
3030 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3031
3032 let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
3033 let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3034 let message = MessageEnvelope::new(
3035 foo.actor_id().clone(),
3036 PortId(id!(corge[0].bar), 9999u64),
3037 Serialized::serialize(&1u64).unwrap(),
3038 Attrs::new(),
3039 );
3040 return_handle.send(Undeliverable(message)).unwrap();
3041
3042 RealClock
3043 .sleep(tokio::time::Duration::from_millis(100))
3044 .await;
3045
3046 let foo_status = foo.status();
3047 assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3048 let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3049 unreachable!()
3050 };
3051 assert!(msg.as_str().contains(
3052 "serving quux[0].foo[0]: processing error: a message from \
3053 quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
3054 ));
3055
3056 proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
3057 .await
3058 .unwrap();
3059 }
3060
3061 #[tokio::test]
3062 async fn test_detached_return_handle() {
3063 let (return_handle, mut return_receiver) =
3064 crate::mailbox::undeliverable::new_undeliverable_port();
3065 let envelope = MessageEnvelope::new(
3067 id!(foo[0].bar),
3068 PortId(id!(baz[0].corge), 9999u64),
3069 Serialized::serialize(&1u64).unwrap(),
3070 Attrs::new(),
3071 );
3072 return_handle.send(Undeliverable(envelope.clone())).unwrap();
3073 assert!(
3075 RealClock
3076 .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3077 .await
3078 .is_ok()
3079 );
3080 let monitor_handle = tokio::spawn(async move {
3083 while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3084 envelope.set_error(DeliveryError::BrokenLink(
3085 "returned in unit test".to_string(),
3086 ));
3087 UndeliverableMailboxSender
3088 .post(envelope, monitored_return_handle());
3089 }
3090 });
3091 drop(return_handle);
3092 assert!(
3093 RealClock
3094 .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3095 .await
3096 .is_ok()
3097 );
3098 }
3099
3100 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3101 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3102 let dummy_state =
3105 State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3106 let dummy_port_id = PortId(id!(world[0].actor), 0);
3107 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3108 let receiver = PortReceiver {
3109 receiver,
3110 port_id: dummy_port_id,
3111 coalesce,
3112 mailbox: Mailbox {
3113 inner: Arc::new(dummy_state),
3114 },
3115 };
3116 (sender, receiver)
3117 }
3118
3119 {
3121 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3122 assert!(receiver.drain().is_empty());
3123
3124 sender.send(0).unwrap();
3125 sender.send(1).unwrap();
3126 sender.send(2).unwrap();
3127 sender.send(3).unwrap();
3128 sender.send(4).unwrap();
3129 sender.send(5).unwrap();
3130 sender.send(6).unwrap();
3131 sender.send(7).unwrap();
3132
3133 if drop_sender {
3134 drop(sender);
3135 }
3136
3137 if !coalesce {
3138 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3139 } else {
3140 assert_eq!(receiver.drain(), vec![7]);
3141 }
3142
3143 assert!(receiver.drain().is_empty());
3144 assert!(receiver.drain().is_empty());
3145 }
3146
3147 {
3149 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3150 assert!(receiver.try_recv().unwrap().is_none());
3151
3152 sender.send(0).unwrap();
3153 sender.send(1).unwrap();
3154 sender.send(2).unwrap();
3155 sender.send(3).unwrap();
3156
3157 if drop_sender {
3158 drop(sender);
3159 }
3160
3161 if !coalesce {
3162 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3163 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3164 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3165 }
3166 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3167 if drop_sender {
3168 assert_matches!(
3169 receiver.try_recv().unwrap_err().kind(),
3170 MailboxErrorKind::Closed
3171 );
3172 assert_matches!(
3174 receiver.try_recv().unwrap_err().kind(),
3175 MailboxErrorKind::Closed
3176 );
3177 } else {
3178 assert!(receiver.try_recv().unwrap().is_none());
3179 assert!(receiver.try_recv().unwrap().is_none());
3181 }
3182 }
3183 {
3185 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3186 assert!(
3187 RealClock
3188 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3189 .await
3190 .is_err()
3191 );
3192
3193 sender.send(4).unwrap();
3194 sender.send(5).unwrap();
3195 sender.send(6).unwrap();
3196 sender.send(7).unwrap();
3197
3198 if drop_sender {
3199 drop(sender);
3200 }
3201
3202 if !coalesce {
3203 assert_eq!(receiver.recv().await.unwrap(), 4);
3204 assert_eq!(receiver.recv().await.unwrap(), 5);
3205 assert_eq!(receiver.recv().await.unwrap(), 6);
3206 }
3207 assert_eq!(receiver.recv().await.unwrap(), 7);
3208 if drop_sender {
3209 assert_matches!(
3210 receiver.recv().await.unwrap_err().kind(),
3211 MailboxErrorKind::Closed
3212 );
3213 assert_matches!(
3215 receiver.recv().await.unwrap_err().kind(),
3216 MailboxErrorKind::Closed
3217 );
3218 } else {
3219 assert!(
3220 RealClock
3221 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3222 .await
3223 .is_err()
3224 );
3225 }
3226 }
3227 }
3228
3229 #[tokio::test]
3230 async fn test_receiver_basic_default() {
3231 verify_receiver(false, false).await
3232 }
3233
3234 #[tokio::test]
3235 async fn test_receiver_basic_latest() {
3236 verify_receiver(true, false).await
3237 }
3238
3239 #[tokio::test]
3240 async fn test_receiver_after_sender_drop_default() {
3241 verify_receiver(false, true).await
3242 }
3243
3244 #[tokio::test]
3245 async fn test_receiver_after_sender_drop_latest() {
3246 verify_receiver(true, true).await
3247 }
3248
/// Fixture produced by `setup_split_port_ids` for the split-port tests.
struct Setup {
    /// Receiver of the `u64` port opened on `actor0`.
    receiver: PortReceiver<u64>,
    /// Instance that owns the original port.
    actor0: Instance<()>,
    /// Instance used to create the split ports.
    actor1: Instance<()>,
    /// Handle returned alongside `actor0`; stored but not read by the tests.
    _actor0_handle: ActorHandle<()>,
    /// Handle returned alongside `actor1`; stored but not read by the tests.
    _actor1_handle: ActorHandle<()>,
    /// Id of the original bound port.
    port_id: PortId,
    /// First split of `port_id`.
    port_id1: PortId,
    /// Second split of `port_id`.
    port_id2: PortId,
    /// Split of `port_id2`.
    port_id2_1: PortId,
}
3260
3261 async fn setup_split_port_ids(
3262 reducer_spec: Option<ReducerSpec>,
3263 reducer_opts: Option<ReducerOpts>,
3264 ) -> Setup {
3265 let proc = Proc::local();
3266 let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3267 let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3268
3269 let (port_handle, receiver) = actor0.open_port::<u64>();
3271 let port_id = port_handle.bind().port_id().clone();
3272
3273 let port_id1 = port_id
3275 .split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3276 .unwrap();
3277 let port_id2 = port_id
3278 .split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3279 .unwrap();
3280
3281 let port_id2_1 = port_id2
3283 .split(&actor1, reducer_spec, reducer_opts.clone())
3284 .unwrap();
3285
3286 Setup {
3287 receiver,
3288 actor0,
3289 actor1,
3290 _actor0_handle: actor0_handle,
3291 _actor1_handle: actor1_handle,
3292 port_id,
3293 port_id1,
3294 port_id2,
3295 port_id2_1,
3296 }
3297 }
3298
3299 fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3300 let serialized = Serialized::serialize(&msg).unwrap();
3301 port_id.send(cx, serialized);
3302 }
3303
3304 #[async_timed_test(timeout_secs = 30)]
3305 async fn test_split_port_id_no_reducer() {
3306 let Setup {
3307 mut receiver,
3308 actor0,
3309 actor1,
3310 port_id,
3311 port_id1,
3312 port_id2,
3313 port_id2_1,
3314 ..
3315 } = setup_split_port_ids(None, None).await;
3316 post(&actor0, port_id.clone(), 1);
3318 assert_eq!(receiver.recv().await.unwrap(), 1);
3319 post(&actor1, port_id1.clone(), 2);
3320 assert_eq!(receiver.recv().await.unwrap(), 2);
3321 post(&actor1, port_id2.clone(), 3);
3322 assert_eq!(receiver.recv().await.unwrap(), 3);
3323 post(&actor1, port_id2_1.clone(), 4);
3324 assert_eq!(receiver.recv().await.unwrap(), 4);
3325
3326 RealClock.sleep(Duration::from_secs(2)).await;
3328 let msg = receiver.try_recv().unwrap();
3329 assert_eq!(msg, None);
3330 }
3331
3332 async fn wait_for(
3333 receiver: &mut PortReceiver<u64>,
3334 expected_size: usize,
3335 timeout_duration: Duration,
3336 ) -> anyhow::Result<Vec<u64>> {
3337 let mut messeges = vec![];
3338
3339 RealClock
3340 .timeout(timeout_duration, async {
3341 loop {
3342 let msg = receiver.recv().await.unwrap();
3343 messeges.push(msg);
3344 if messeges.len() == expected_size {
3345 break;
3346 }
3347 }
3348 })
3349 .await?;
3350 Ok(messeges)
3351 }
3352
3353 #[async_timed_test(timeout_secs = 30)]
3354 async fn test_split_port_id_sum_reducer() {
3355 let config = crate::config::global::lock();
3356 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3357
3358 let sum_accumulator = accum::sum::<u64>();
3359 let reducer_spec = sum_accumulator.reducer_spec();
3360 let Setup {
3361 mut receiver,
3362 actor0,
3363 actor1,
3364 port_id,
3365 port_id1,
3366 port_id2,
3367 port_id2_1,
3368 ..
3369 } = setup_split_port_ids(reducer_spec, None).await;
3370 post(&actor0, port_id.clone(), 4);
3371 post(&actor1, port_id1.clone(), 2);
3372 post(&actor1, port_id2.clone(), 3);
3373 post(&actor1, port_id2_1.clone(), 1);
3374 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3375 .await
3376 .unwrap();
3377 messages.sort();
3380 assert_eq!(messages, vec![1, 2, 3, 4]);
3381
3382 RealClock.sleep(Duration::from_secs(2)).await;
3384 let msg = receiver.try_recv().unwrap();
3385 assert_eq!(msg, None);
3386 }
3387
3388 #[async_timed_test(timeout_secs = 30)]
3389 async fn test_split_port_id_every_n_messages() {
3390 let config = crate::config::global::lock();
3391 let _config_guard = config.override_key(
3392 crate::config::SPLIT_MAX_BUFFER_AGE,
3393 Duration::from_secs(600),
3394 );
3395 let proc = Proc::local();
3396 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3397 let (port_handle, mut receiver) = actor.open_port::<u64>();
3398 let port_id = port_handle.bind().port_id().clone();
3399 let reducer_spec = accum::sum::<u64>().reducer_spec();
3401 let split_port_id = port_id.split(&actor, reducer_spec, None).unwrap();
3402
3403 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3405 post(&actor, split_port_id.clone(), msg);
3406 }
3407 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3410 .await
3411 .unwrap();
3412 assert_eq!(messages, vec![15]);
3413
3414 RealClock.sleep(Duration::from_secs(2)).await;
3417 let msg = receiver.try_recv().unwrap();
3418 assert_eq!(msg, None);
3419 }
3420
3421 #[async_timed_test(timeout_secs = 30)]
3422 async fn test_split_port_timeout_flush() {
3423 let config = crate::config::global::lock();
3424 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3425
3426 let Setup {
3427 mut receiver,
3428 actor0: _,
3429 actor1,
3430 port_id: _,
3431 port_id1,
3432 port_id2: _,
3433 port_id2_1: _,
3434 ..
3435 } = setup_split_port_ids(
3436 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3437 Some(ReducerOpts {
3438 max_update_interval: Some(Duration::from_millis(50)),
3439 }),
3440 )
3441 .await;
3442
3443 post(&actor1, port_id1.clone(), 10);
3444 post(&actor1, port_id1.clone(), 20);
3445 post(&actor1, port_id1.clone(), 30);
3446
3447 RealClock.sleep(Duration::from_millis(10)).await;
3449 let msg = receiver.try_recv().unwrap();
3450 assert_eq!(msg, None);
3451
3452 RealClock.sleep(Duration::from_millis(100)).await;
3454
3455 let msg = receiver.recv().await.unwrap();
3457 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
3461 assert_eq!(msg, None);
3462 }
3463
3464 #[async_timed_test(timeout_secs = 30)]
3465 async fn test_split_port_timeout_and_size_flush() {
3466 let config = crate::config::global::lock();
3467 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3468
3469 let Setup {
3470 mut receiver,
3471 actor0: _,
3472 actor1,
3473 port_id: _,
3474 port_id1,
3475 port_id2: _,
3476 port_id2_1: _,
3477 ..
3478 } = setup_split_port_ids(
3479 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3480 Some(ReducerOpts {
3481 max_update_interval: Some(Duration::from_millis(50)),
3482 }),
3483 )
3484 .await;
3485
3486 post(&actor1, port_id1.clone(), 10);
3487 post(&actor1, port_id1.clone(), 20);
3488 post(&actor1, port_id1.clone(), 30);
3489 post(&actor1, port_id1.clone(), 40);
3490
3491 let msg = receiver.recv().await.unwrap();
3493 assert_eq!(msg, 60);
3494
3495 let msg = receiver.recv().await.unwrap();
3497 assert_eq!(msg, 40);
3498
3499 let msg = receiver.try_recv().unwrap();
3501 assert_eq!(msg, None);
3502 }
3503
/// A router with no bindings reports no prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_empty() {
    let router = DialMailboxRouter::new();
    let prefix_count = router.prefixes().len();
    assert_eq!(prefix_count, 0);
}
3508
/// A router with a single binding reports exactly that reference.
#[test]
fn test_dial_mailbox_router_prefixes_single_entry() {
    let router = DialMailboxRouter::new();
    router.bind(id!(world0).into(), "unix!@1".parse().unwrap());

    let prefixes = router.prefixes().into_iter().collect::<Vec<Reference>>();
    let expected: Vec<Reference> = vec![id!(world0).into()];
    assert_eq!(prefixes, expected);
}
3518
/// Bindings for three distinct worlds are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_no_overlap() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 3] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world1).into(), "unix!@2"),
        (id!(world2).into(), "unix!@3"),
    ];
    for (dest, addr) in bindings {
        router.bind(dest, addr.parse().unwrap());
    }

    let mut prefixes = router.prefixes().into_iter().collect::<Vec<Reference>>();
    prefixes.sort();

    let mut expected: Vec<Reference> =
        vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
    expected.sort();

    assert_eq!(prefixes, expected);
}
3534
/// When a world and procs inside that world are both bound, only the world
/// references are reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_with_overlaps() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 5] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world0[0]).into(), "unix!@2"),
        (id!(world0[1]).into(), "unix!@3"),
        (id!(world1).into(), "unix!@4"),
        (id!(world1[0]).into(), "unix!@5"),
    ];
    for (dest, addr) in bindings {
        router.bind(dest, addr.parse().unwrap());
    }

    let mut prefixes = router.prefixes().into_iter().collect::<Vec<Reference>>();
    prefixes.sort();

    // The proc-level entries are covered by their worlds.
    let mut expected: Vec<Reference> = vec![id!(world0).into(), id!(world1).into()];
    expected.sort();

    assert_eq!(prefixes, expected);
}
3553
/// Mixed world-, proc- and actor-level bindings: an entry is reported only
/// when no other bound entry above it covers it.
#[test]
fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 6] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world0[0]).into(), "unix!@2"),
        (id!(world0[0].actor1).into(), "unix!@3"),
        (id!(world1[0]).into(), "unix!@4"),
        (id!(world1[1]).into(), "unix!@5"),
        (id!(world2[0].actor0).into(), "unix!@6"),
    ];
    for (dest, addr) in bindings {
        router.bind(dest, addr.parse().unwrap());
    }

    let mut prefixes = router.prefixes().into_iter().collect::<Vec<Reference>>();
    prefixes.sort();

    // Compared as written, without sorting.
    let expected: Vec<Reference> = vec![
        id!(world0).into(),
        id!(world1[0]).into(),
        id!(world1[1]).into(),
        id!(world2[0].actor0).into(),
    ];

    assert_eq!(prefixes, expected);
}
3581
/// Sibling procs of the same world do not cover each other, so all of them
/// are reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_same_level() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 3] = [
        (id!(world0[0]).into(), "unix!@1"),
        (id!(world0[1]).into(), "unix!@2"),
        (id!(world0[2]).into(), "unix!@3"),
    ];
    for (dest, addr) in bindings {
        router.bind(dest, addr.parse().unwrap());
    }

    let mut prefixes = router.prefixes().into_iter().collect::<Vec<Reference>>();
    prefixes.sort();

    let mut expected: Vec<Reference> = vec![
        id!(world0[0]).into(),
        id!(world0[1]).into(),
        id!(world0[2]).into(),
    ];
    expected.sort();

    assert_eq!(prefixes, expected);
}
3602
/// Test-only mailbox sender; its `MailboxSender` implementation re-posts
/// every message to itself from a spawned task.
#[derive(Debug, Clone)]
struct AsyncLoopForwarder;
3608
3609 impl MailboxSender for AsyncLoopForwarder {
3610 fn post_unchecked(
3611 &self,
3612 envelope: MessageEnvelope,
3613 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3614 ) {
3615 let me = self.clone();
3616 tokio::spawn(async move {
3617 me.post(envelope, return_handle);
3619 });
3620 }
3621 }
3622
3623 #[tokio::test]
3624 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3625 let actor_id = ActorId(
3626 ProcId::Ranked(id!(test_world), 0),
3627 "ttl_actor".to_string(),
3628 0,
3629 );
3630 let mailbox = Mailbox::new(
3631 actor_id.clone(),
3632 BoxedMailboxSender::new(AsyncLoopForwarder),
3633 );
3634 let (ret_port, mut ret_rx) = mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3635 ret_port.bind_to(Undeliverable::<MessageEnvelope>::port());
3636
3637 let remote_actor = ActorId(
3640 ProcId::Ranked(id!(remote_world), 1),
3641 "remote".to_string(),
3642 0,
3643 );
3644 let dest = PortId(remote_actor.clone(), 4242);
3645
3646 let payload = 1234_u64;
3649 let envelope =
3650 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3651 .expect("serialize");
3652
3653 let return_handle = ret_port.clone();
3656 mailbox.post(envelope, return_handle);
3657
3658 #[allow(clippy::disallowed_methods)]
3660 let Undeliverable(undelivered) =
3661 tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3662 .await
3663 .expect("timed out waiting for undeliverable")
3664 .expect("channel closed");
3665
3666 let got: u64 = undelivered.deserialized().expect("deserialize");
3668 assert_eq!(got, payload, "payload preserved");
3669 }
3670
3671 #[tokio::test]
3672 async fn message_ttl_success_local_delivery() {
3673 let actor_id = ActorId(
3674 ProcId::Ranked(id!(test_world), 0),
3675 "ttl_actor".to_string(),
3676 0,
3677 );
3678 let mailbox = Mailbox::new(
3679 actor_id.clone(),
3680 BoxedMailboxSender::new(PanickingMailboxSender),
3681 );
3682 let (undeliverable_tx, mut undeliverable_rx) =
3683 mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3684 undeliverable_tx.bind_to(Undeliverable::<MessageEnvelope>::port());
3685
3686 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3688
3689 let payload = 0xC0FFEE_u64;
3691 let envelope = MessageEnvelope::serialize(
3692 actor_id.clone(),
3693 user_port.bind().port_id().clone(),
3694 &payload,
3695 Attrs::new(),
3696 )
3697 .expect("serialize");
3698
3699 let return_handle = mailbox
3702 .bound_return_handle()
3703 .unwrap_or(monitored_return_handle());
3704 mailbox.post(envelope, return_handle);
3705
3706 #[allow(clippy::disallowed_methods)]
3708 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3709 .await
3710 .expect("timed out waiting for local delivery")
3711 .expect("user port closed");
3712 assert_eq!(got, payload);
3713
3714 #[allow(clippy::disallowed_methods)]
3716 let no_undeliverable =
3717 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3718 assert!(
3719 no_undeliverable.is_err(),
3720 "unexpected undeliverable returned on successful local delivery"
3721 );
3722 }
3723
3724 #[tokio::test]
3725 async fn test_port_contramap() {
3726 let mbox = Mailbox::new_detached(id!(test[0].test));
3727 let (handle, mut rx) = mbox.open_port();
3728
3729 handle
3730 .contramap(|m| (1, m))
3731 .send("hello".to_string())
3732 .unwrap();
3733 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3734 }
3735}