1#![allow(dead_code)] use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future;
74use std::future::Future;
75use std::ops::Bound::Excluded;
76use std::pin::Pin;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::sync::Mutex;
80use std::sync::OnceLock;
81use std::sync::RwLock;
82use std::sync::Weak;
83use std::sync::atomic::AtomicU64;
84use std::sync::atomic::AtomicUsize;
85use std::sync::atomic::Ordering;
86use std::task::Context;
87use std::task::Poll;
88
89use async_trait::async_trait;
90use dashmap::DashMap;
91use dashmap::mapref::entry::Entry;
92use futures::Sink;
93use futures::Stream;
94use serde::Deserialize;
95use serde::Serialize;
96use serde::de::DeserializeOwned;
97use tokio::sync::mpsc;
98use tokio::sync::oneshot;
99use tokio::sync::watch;
100use tokio::task::JoinHandle;
101use tokio_util::sync::CancellationToken;
102
103use crate as hyperactor; use crate::Named;
105use crate::OncePortRef;
106use crate::PortRef;
107use crate::accum::Accumulator;
108use crate::accum::ReducerOpts;
109use crate::accum::ReducerSpec;
110use crate::actor::Signal;
111use crate::actor::remote::USER_PORT_OFFSET;
112use crate::attrs::Attrs;
113use crate::channel;
114use crate::channel::ChannelAddr;
115use crate::channel::ChannelError;
116use crate::channel::SendError;
117use crate::channel::TxStatus;
118use crate::context;
119use crate::data::Serialized;
120use crate::id;
121use crate::metrics;
122use crate::reference::ActorId;
123use crate::reference::PortId;
124use crate::reference::Reference;
125
126mod undeliverable;
127pub use undeliverable::Undeliverable;
129pub use undeliverable::UndeliverableMessageError;
130pub use undeliverable::custom_monitored_return_handle;
131pub use undeliverable::monitored_return_handle; pub use undeliverable::supervise_undeliverable_messages;
133pub use undeliverable::supervise_undeliverable_messages_with;
134pub mod mailbox_admin_message;
136pub use mailbox_admin_message::MailboxAdminMessage;
137pub use mailbox_admin_message::MailboxAdminMessageHandler;
138pub mod durable_mailbox_sender;
140pub use durable_mailbox_sender::log;
141use durable_mailbox_sender::log::*;
142pub mod headers;
144
/// Marker trait for values that can be carried as messages.
///
/// It declares no methods; it only bundles the bounds
/// `Debug + Send + Sync + 'static`.
pub trait Message: Debug + Send + Sync + 'static {}
// Blanket implementation: every type satisfying the bounds is a `Message`.
impl<M: Debug + Send + Sync + 'static> Message for M {}
149
/// Marker trait for messages that are additionally `Named`, `Serialize`
/// and `DeserializeOwned`.
///
/// It declares no methods of its own.
pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}

// Blanket implementation: every type satisfying the bounds is a
// `RemoteMessage`.
impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
156
/// Alias for an owned byte buffer.
pub type Data = Vec<u8>;
159
/// Reasons recorded on a [`MessageEnvelope`] when it could not be
/// delivered.
///
/// The type is serializable and cloneable, so it can be stored in the
/// envelope's error list.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    Named,
    Clone,
    PartialEq,
    Eq
)]
pub enum DeliveryError {
    /// The destination address could not be routed to; carries a
    /// free-form description.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A link on the delivery path was broken; carries a free-form
    /// description.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// The destination mailbox reported an error; carries its rendered
    /// message.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast delivery error; carries a free-form description.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The envelope's time-to-live counter was already zero when a hop
    /// tried to decrement it.
    #[error("ttl expired")]
    TtlExpired,
}
193
/// A serialized message together with the metadata needed to deliver it
/// (or to report that it could not be delivered).
#[derive(Debug, Serialize, Deserialize, Clone, Named)]
pub struct MessageEnvelope {
    /// Actor the message is attributed to.
    sender: ActorId,

    /// Port the message is addressed to.
    dest: PortId,

    /// Serialized message payload.
    data: Serialized,

    /// Delivery errors accumulated so far (empty for a fresh envelope).
    errors: Vec<DeliveryError>,

    /// Headers attached to the message.
    headers: Attrs,

    /// Remaining time-to-live; decremented by `dec_ttl_or_err`.
    ttl: u8,

    /// Flag exposed through `return_undeliverable()`; constructors
    /// initialise it to `true`.
    return_undeliverable: bool,
}
222
223impl MessageEnvelope {
224 pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
226 Self {
227 sender,
228 dest,
229 data,
230 errors: Vec::new(),
231 headers,
232 ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
233 return_undeliverable: true,
235 }
236 }
237
238 pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
240 Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
241 }
242
243 pub fn serialize<T: Serialize + Named>(
245 source: ActorId,
246 dest: PortId,
247 value: &T,
248 headers: Attrs,
249 ) -> Result<Self, crate::data::Error> {
250 Ok(Self {
251 headers,
252 data: Serialized::serialize(value)?,
253 sender: source,
254 dest,
255 errors: Vec::new(),
256 ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
257 return_undeliverable: true,
259 })
260 }
261
262 pub fn ttl(&self) -> u8 {
268 self.ttl
269 }
270
271 pub fn set_ttl(mut self, ttl: u8) -> Self {
281 self.ttl = ttl;
282 self
283 }
284
285 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
293 if self.ttl == 0 {
294 Err(DeliveryError::TtlExpired)
295 } else {
296 self.ttl -= 1;
297 Ok(())
298 }
299 }
300
301 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
303 self.data.deserialized()
304 }
305
306 pub fn data(&self) -> &Serialized {
308 &self.data
309 }
310
311 pub fn sender(&self) -> &ActorId {
313 &self.sender
314 }
315
316 pub fn dest(&self) -> &PortId {
318 &self.dest
319 }
320
321 pub fn headers(&self) -> &Attrs {
323 &self.headers
324 }
325
326 pub fn is_signal(&self) -> bool {
328 self.dest.index() == Signal::port()
329 }
330
331 pub fn set_error(&mut self, error: DeliveryError) {
334 self.errors.push(error)
335 }
336
337 pub fn update_sender(&mut self, sender: ActorId) {
341 self.sender = sender;
342 }
343
344 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
348 self.return_undeliverable = return_undeliverable;
349 }
350
351 pub fn undeliverable(
355 mut self,
356 error: DeliveryError,
357 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
358 ) {
359 tracing::error!(
360 name = "undelivered_message_attempt",
361 sender = self.sender.to_string(),
362 dest = self.dest.to_string(),
363 error = error.to_string(),
364 return_handle = %return_handle,
365 );
366 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
367 1,
368 hyperactor_telemetry::kv_pairs!(
369 "sender_actor_id" => self.sender.to_string(),
370 "dest_actor_id" => self.dest.to_string(),
371 "message_type" => self.data.typename().unwrap_or("unknown"),
372 "error_type" => error.to_string(),
373 ),
374 );
375
376 self.set_error(error);
377 undeliverable::return_undeliverable(return_handle, self);
378 }
379
380 pub fn errors(&self) -> &Vec<DeliveryError> {
383 &self.errors
384 }
385
386 pub fn error_msg(&self) -> Option<String> {
390 if self.errors.is_empty() {
391 None
392 } else {
393 Some(
394 self.errors
395 .iter()
396 .map(|e| e.to_string())
397 .collect::<Vec<_>>()
398 .join("; "),
399 )
400 }
401 }
402
403 fn open(self) -> (MessageMetadata, Serialized) {
404 let Self {
405 sender,
406 dest,
407 data,
408 errors,
409 headers,
410 ttl,
411 return_undeliverable,
412 } = self;
413
414 (
415 MessageMetadata {
416 sender,
417 dest,
418 errors,
419 headers,
420 ttl,
421 return_undeliverable,
422 },
423 data,
424 )
425 }
426
427 fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
428 let MessageMetadata {
429 sender,
430 dest,
431 errors,
432 headers,
433 ttl,
434 return_undeliverable,
435 } = metadata;
436
437 Self {
438 sender,
439 dest,
440 data,
441 errors,
442 headers,
443 ttl,
444 return_undeliverable,
445 }
446 }
447
448 fn return_undeliverable(&self) -> bool {
449 self.return_undeliverable
450 }
451}
452
453impl fmt::Display for MessageEnvelope {
454 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455 match &self.error_msg() {
456 None => write!(
457 f,
458 "{} > {}: {} {{{}}}",
459 self.sender, self.dest, self.data, self.headers
460 ),
461 Some(err) => write!(
462 f,
463 "{} > {}: {} {{{}}}: delivery error: {}",
464 self.sender, self.dest, self.data, self.headers, err
465 ),
466 }
467 }
468}
469
/// Everything in a [`MessageEnvelope`] except the serialized payload.
///
/// Produced by `MessageEnvelope::open` and consumed by
/// `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    /// Actor the message is attributed to.
    sender: ActorId,
    /// Port the message is addressed to.
    dest: PortId,
    /// Delivery errors accumulated so far.
    errors: Vec<DeliveryError>,
    /// Headers attached to the message.
    headers: Attrs,
    /// Remaining time-to-live.
    ttl: u8,
    /// Copy of the envelope's `return_undeliverable` flag.
    return_undeliverable: bool,
}
480
/// A mailbox error: a [`MailboxErrorKind`] tagged with the actor it is
/// attributed to.
#[derive(Debug)]
pub struct MailboxError {
    /// Actor the error is attributed to.
    actor_id: ActorId,
    /// What went wrong.
    kind: MailboxErrorKind,
}
488
489#[derive(thiserror::Error, Debug)]
492#[non_exhaustive]
493pub enum MailboxErrorKind {
494 #[error("mailbox closed")]
496 Closed,
497
498 #[error("invalid port: {0}")]
500 InvalidPort(PortId),
501
502 #[error("no sender for port: {0}")]
504 NoSenderForPort(PortId),
505
506 #[error("no local sender for port: {0}")]
509 NoLocalSenderForPort(PortId),
510
511 #[error("{0}: port closed")]
513 PortClosed(PortId),
514
515 #[error("send {0}: {1}")]
517 Send(PortId, #[source] anyhow::Error),
518
519 #[error("recv {0}: {1}")]
521 Recv(PortId, #[source] anyhow::Error),
522
523 #[error("serialize: {0}")]
525 Serialize(#[source] anyhow::Error),
526
527 #[error("deserialize {0}: {1}")]
529 Deserialize(&'static str, anyhow::Error),
530
531 #[error(transparent)]
533 Channel(#[from] ChannelError),
534}
535
536impl MailboxError {
537 pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
540 Self { actor_id, kind }
541 }
542
543 pub fn actor_id(&self) -> &ActorId {
545 &self.actor_id
546 }
547
548 pub fn kind(&self) -> &MailboxErrorKind {
550 &self.kind
551 }
552}
553
554impl fmt::Display for MailboxError {
555 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556 write!(f, "{}: ", self.actor_id)?;
557 fmt::Display::fmt(&self.kind, f)
558 }
559}
560
561impl std::error::Error for MailboxError {
562 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
563 self.kind.source()
564 }
565}
566
/// Identifies a port, either by its id or — when no id is available — by
/// its owning actor and a type name.
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// A port identified by its `PortId`.
    Bound(PortId),
    /// A port identified only by its owning actor and a type name.
    Unbound(ActorId, &'static str),
}
577
578impl PortLocation {
579 fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
580 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
581 }
582
583 fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
584 PortLocation::Unbound(actor_id, ty)
585 }
586
587 pub fn actor_id(&self) -> &ActorId {
589 match self {
590 PortLocation::Bound(port_id) => port_id.actor_id(),
591 PortLocation::Unbound(actor_id, _) => actor_id,
592 }
593 }
594}
595
596impl fmt::Display for PortLocation {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 match self {
599 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
600 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
601 }
602 }
603}
604
/// An error raised while sending through a port: a
/// [`MailboxSenderErrorKind`] tagged with the location of the port.
///
/// Both fields are boxed.
#[derive(Debug)]
pub struct MailboxSenderError {
    /// The port the send was aimed at.
    location: Box<PortLocation>,
    /// What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
612
/// The kinds of failure a [`MailboxSenderError`] can describe.
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// The message could not be serialized.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// The message could not be deserialized into the named type.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// The port is invalid.
    #[error("invalid port")]
    Invalid,

    /// The port is closed.
    #[error("port closed")]
    Closed,

    /// A mailbox error, displayed transparently.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// A channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// A message-log error, displayed transparently.
    #[error(transparent)]
    MessageLog(#[from] MessageLogError),

    /// Any other send failure.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination is unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
653
654impl MailboxSenderError {
655 pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
657 Self {
658 location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
659 kind: Box::new(kind),
660 }
661 }
662
663 pub fn new_unbound_type(
665 actor_id: ActorId,
666 kind: MailboxSenderErrorKind,
667 ty: &'static str,
668 ) -> Self {
669 Self {
670 location: Box::new(PortLocation::Unbound(actor_id, ty)),
671 kind: Box::new(kind),
672 }
673 }
674
675 pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
677 Self {
678 location: Box::new(PortLocation::Bound(port_id)),
679 kind: Box::new(kind),
680 }
681 }
682
683 pub fn location(&self) -> &PortLocation {
685 &self.location
686 }
687
688 pub fn kind(&self) -> &MailboxSenderErrorKind {
690 &self.kind
691 }
692}
693
694impl fmt::Display for MailboxSenderError {
695 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696 write!(f, "{}: ", self.location)?;
697 fmt::Display::fmt(&self.kind, f)
698 }
699}
700
701impl std::error::Error for MailboxSenderError {
702 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
703 self.kind.source()
704 }
705}
706
707pub trait MailboxSender: Send + Sync + Debug + Any {
710 fn post(
713 &self,
714 mut envelope: MessageEnvelope,
715 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
716 ) {
717 if let Err(err) = envelope.dec_ttl_or_err() {
718 envelope.undeliverable(err, return_handle);
719 return;
720 }
721 self.post_unchecked(envelope, return_handle);
722 }
723
724 fn post_unchecked(
726 &self,
727 envelope: MessageEnvelope,
728 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729 );
730}
731
732pub trait PortSender: MailboxSender {
735 fn serialize_and_send<M: RemoteMessage>(
737 &self,
738 port: &PortRef<M>,
739 message: M,
740 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
741 ) -> Result<(), MailboxSenderError> {
742 let serialized = Serialized::serialize(&message).map_err(|err| {
744 MailboxSenderError::new_bound(
745 port.port_id().clone(),
746 MailboxSenderErrorKind::Serialize(err.into()),
747 )
748 })?;
749 self.post(
750 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
751 return_handle,
752 );
753 Ok(())
754 }
755
756 fn serialize_and_send_once<M: RemoteMessage>(
759 &self,
760 once_port: OncePortRef<M>,
761 message: M,
762 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
763 ) -> Result<(), MailboxSenderError> {
764 let serialized = Serialized::serialize(&message).map_err(|err| {
765 MailboxSenderError::new_bound(
766 once_port.port_id().clone(),
767 MailboxSenderErrorKind::Serialize(err.into()),
768 )
769 })?;
770 self.post(
771 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
772 return_handle,
773 );
774 Ok(())
775 }
776}
777
778impl<T: ?Sized + MailboxSender> PortSender for T {}
779
780#[derive(Debug, Clone)]
784pub struct PanickingMailboxSender;
785
786impl MailboxSender for PanickingMailboxSender {
787 fn post_unchecked(
788 &self,
789 envelope: MessageEnvelope,
790 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
791 ) {
792 panic!("panic! in the mailbox! attempted post: {}", envelope)
793 }
794}
795
796#[derive(Debug)]
799pub struct UndeliverableMailboxSender;
800
801impl MailboxSender for UndeliverableMailboxSender {
802 fn post_unchecked(
803 &self,
804 envelope: MessageEnvelope,
805 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
806 ) {
807 let sender_name = envelope.sender.name();
808 let mut error_str = "".to_string();
809 if !envelope.errors.is_empty() {
810 error_str = envelope
811 .errors
812 .iter()
813 .map(|e| e.to_string())
814 .collect::<Vec<_>>()
815 .join("; ");
816 }
817 tracing::error!(
820 name = "undelivered_message_abandoned",
821 actor_name = sender_name,
822 actor_id = envelope.sender.to_string(),
823 dest = envelope.dest.to_string(),
824 headers = envelope.headers().to_string(), data = envelope.data().to_string(),
826 "message not delivered, {}",
827 error_str,
828 );
829 }
830}
831
832#[derive(Debug)]
833struct Buffer<T: Message> {
834 queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
835 processed: watch::Receiver<usize>,
836 seq: AtomicUsize,
837}
838
839impl<T: Message> Buffer<T> {
840 fn new<Fut>(
841 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
842 ) -> Self
843 where
844 Fut: Future<Output = ()> + Send + 'static,
845 {
846 let (queue, mut next) = mpsc::unbounded_channel();
847 let (last_processed, processed) = watch::channel(0);
848 crate::init::get_runtime().spawn(async move {
849 let mut seq = 0;
850 while let Some((msg, return_handle)) = next.recv().await {
851 process(msg, return_handle).await;
852 seq += 1;
853 let _ = last_processed.send(seq);
854 }
855 });
856 Self {
857 queue,
858 processed,
859 seq: AtomicUsize::new(0),
860 }
861 }
862
863 #[allow(clippy::result_large_err)]
864 fn send(
865 &self,
866 item: (T, PortHandle<Undeliverable<T>>),
867 ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
868 self.seq.fetch_add(1, Ordering::SeqCst);
869 self.queue.send(item)?;
870 Ok(())
871 }
872
873 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
874 let seq = self.seq.load(Ordering::SeqCst);
875 while *self.processed.borrow_and_update() < seq {
876 self.processed.changed().await?;
877 }
878 Ok(())
879 }
880}
881
/// Lazily-initialised, shared boxed [`PanickingMailboxSender`]; cloned by
/// `Mailbox::new_detached` to serve as the forwarder.
static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
884
885#[derive(Debug, Clone)]
891pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
892
893impl BoxedMailboxSender {
894 pub fn new(sender: impl MailboxSender + 'static) -> Self {
896 Self(Arc::new(sender))
897 }
898
899 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
902 (&*self.0 as &dyn Any).downcast_ref::<T>()
903 }
904}
905
906pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
908 fn boxed(&self) -> BoxedMailboxSender;
910}
911impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
912 fn boxed(&self) -> BoxedMailboxSender {
913 BoxedMailboxSender::new(self.clone())
914 }
915}
916
917pub trait IntoBoxedMailboxSender: MailboxSender {
919 fn into_boxed(self) -> BoxedMailboxSender;
921}
922impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
923 fn into_boxed(self) -> BoxedMailboxSender {
924 BoxedMailboxSender::new(self)
925 }
926}
927
928impl MailboxSender for BoxedMailboxSender {
929 fn post_unchecked(
930 &self,
931 envelope: MessageEnvelope,
932 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
933 ) {
934 self.0.post_unchecked(envelope, return_handle);
935 }
936}
937
/// Errors with which a mailbox server task can terminate.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// A channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// A mailbox-sender error, displayed transparently.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
949
/// Handle to a running mailbox server task.
///
/// The handle is itself a `Future` that resolves to the task's join
/// result, and it can signal the task to stop.
#[derive(Debug)]
pub struct MailboxServerHandle {
    /// Join handle of the serving task.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    /// Sender used by `stop` to publish the "stopped" flag.
    stopped_tx: watch::Sender<bool>,
}
957
958impl MailboxServerHandle {
959 pub fn stop(&self, reason: &str) {
964 tracing::info!("stopping mailbox server; reason: {}", reason);
965 self.stopped_tx.send(true).expect("stop called twice");
966 }
967}
968
969impl Future for MailboxServerHandle {
971 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
972
973 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
974 let join_handle_pinned =
976 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
977 join_handle_pinned.poll(cx)
978 }
979}
980
981fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
986 let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
987
988 tokio::task::spawn(async move {
989 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
990 if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
991 {
992 UndeliverableMailboxSender.post(e, monitored_return_handle());
994 continue;
995 }
996 envelope.set_error(DeliveryError::BrokenLink(
997 "message was undeliverable".to_owned(),
998 ));
999 server.post(
1000 MessageEnvelope::new(
1001 envelope.sender().clone(),
1002 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1003 envelope.sender(),
1004 )
1005 .port_id()
1006 .clone(),
1007 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1008 Attrs::new(),
1009 ),
1010 monitored_return_handle(),
1011 );
1012 }
1013 });
1014
1015 return_handle
1016}
1017
1018pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
1021 fn serve(
1025 self,
1026 mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
1027 ) -> MailboxServerHandle {
1028 let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
1033 let server = self.clone();
1034 tokio::task::spawn(async move {
1035 while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
1036 if let Ok(Undeliverable(e)) =
1037 envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1038 {
1039 UndeliverableMailboxSender.post(e, monitored_return_handle());
1041 continue;
1042 }
1043 envelope.set_error(DeliveryError::BrokenLink(
1044 "message was undeliverable".to_owned(),
1045 ));
1046 server.post(
1047 MessageEnvelope::new(
1048 envelope.sender().clone(),
1049 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1050 envelope.sender(),
1051 )
1052 .port_id()
1053 .clone(),
1054 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1055 Attrs::new(),
1056 ),
1057 monitored_return_handle(),
1058 );
1059 }
1060 });
1061
1062 let (stopped_tx, mut stopped_rx) = watch::channel(false);
1063 let join_handle = tokio::spawn(async move {
1064 let mut detached = false;
1065
1066 loop {
1067 if *stopped_rx.borrow_and_update() {
1068 break Ok(());
1069 }
1070
1071 tokio::select! {
1072 message = rx.recv() => {
1073 match message {
1074 Ok(envelope) => self.post(envelope, return_handle.clone()),
1076
1077 Err(ChannelError::Closed) => break Ok(()),
1080 Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1081 }
1082 }
1083 result = stopped_rx.changed(), if !detached => {
1084 detached = result.is_err();
1085 if detached {
1086 tracing::debug!(
1087 "the mailbox server is detached for Rx {}", rx.addr()
1088 );
1089 } else {
1090 tracing::debug!(
1091 "the mailbox server is stopped for Rx {}", rx.addr()
1092 );
1093 }
1094 }
1095 }
1096 }
1097 });
1098
1099 MailboxServerHandle {
1100 join_handle,
1101 stopped_tx,
1102 }
1103 }
1104}
1105
1106impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1107
/// A [`MailboxSender`] that forwards envelopes over a channel
/// transmitter, buffering them through a [`Buffer`].
#[derive(Debug)]
pub struct MailboxClient {
    /// Queue of envelopes waiting to be posted on the channel.
    buffer: Buffer<MessageEnvelope>,

    /// Clone of the cancellation token given to the tx health-monitoring
    /// task.
    // NOTE(review): nothing visible in this chunk calls `cancel()` on
    // this token — confirm how monitoring is meant to be cancelled.
    _tx_monitoring: CancellationToken,
}
1117
1118impl MailboxClient {
1119 pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1122 let addr = tx.addr();
1123 let tx = Arc::new(tx);
1124 let tx_status = tx.status().clone();
1125 let tx_monitoring = CancellationToken::new();
1126 let buffer = Buffer::new(move |envelope, return_handle| {
1127 let tx = Arc::clone(&tx);
1128 let (return_channel, return_receiver) =
1129 oneshot::channel::<SendError<MessageEnvelope>>();
1130 let return_handle_0 = return_handle.clone();
1132 tokio::spawn(async move {
1133 let result = return_receiver.await;
1134 if let Ok(SendError(e, message)) = result {
1135 message.undeliverable(
1136 DeliveryError::BrokenLink(format!(
1137 "failed to enqueue in MailboxClient when processing buffer: {e}"
1138 )),
1139 return_handle_0,
1140 );
1141 }
1142 });
1143 tx.try_post(envelope, return_channel);
1145 future::ready(())
1146 });
1147 let this = Self {
1148 buffer,
1149 _tx_monitoring: tx_monitoring.clone(),
1150 };
1151 Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1152 this
1153 }
1154
1155 pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1158 Ok(MailboxClient::new(channel::dial(addr)?))
1159 }
1160
1161 fn monitor_tx_health(
1163 mut rx: watch::Receiver<TxStatus>,
1164 cancel_token: CancellationToken,
1165 addr: ChannelAddr,
1166 ) {
1167 crate::init::get_runtime().spawn(async move {
1168 loop {
1169 tokio::select! {
1170 changed = rx.changed() => {
1171 if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1172 tracing::warn!("connection to {} lost", addr);
1173 break;
1176 }
1177 }
1178 _ = cancel_token.cancelled() => {
1179 break;
1180 }
1181 }
1182 }
1183 });
1184 }
1185}
1186
1187impl MailboxSender for MailboxClient {
1188 fn post_unchecked(
1189 &self,
1190 envelope: MessageEnvelope,
1191 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1192 ) {
1193 tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1194 if let Err(mpsc::error::SendError((envelope, return_handle))) =
1195 self.buffer.send((envelope, return_handle))
1196 {
1197 let err = DeliveryError::BrokenLink(
1198 "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1199 );
1200
1201 envelope.undeliverable(err, return_handle);
1203 }
1204 }
1205}
1206
/// Adapter that exposes a [`PortRef`] as a `futures::Sink`.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    /// Actor context passed to `PortRef::send` for each item.
    cx: C,
    /// Port the items are sent to.
    port: PortRef<M>,
}
1212
1213impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1214 pub fn new(cx: C, port: PortRef<M>) -> Self {
1216 Self { cx, port }
1217 }
1218}
1219
1220impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1221 type Error = MailboxSenderError;
1222
1223 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1224 Poll::Ready(Ok(()))
1225 }
1226
1227 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1228 self.port.send(&self.cx, item)
1229 }
1230
1231 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1232 Poll::Ready(Ok(()))
1233 }
1234
1235 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1236 Poll::Ready(Ok(()))
1237 }
1238}
1239
/// An actor's mailbox. Cloning is cheap: clones share the same
/// reference-counted state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    /// Shared mailbox state.
    inner: Arc<State>,
}
1246
1247impl Mailbox {
1248 pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1251 Self {
1252 inner: Arc::new(State::new(actor_id, forwarder)),
1253 }
1254 }
1255
1256 pub fn new_detached(actor_id: ActorId) -> Self {
1258 Self {
1259 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1260 }
1261 }
1262
1263 pub fn actor_id(&self) -> &ActorId {
1265 &self.inner.actor_id
1266 }
1267
1268 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1273 let port_index = self.inner.allocate_port();
1274 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1275 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1276 tracing::trace!(
1277 name = "open_port",
1278 "opening port for {} at {}",
1279 self.inner.actor_id,
1280 port_id
1281 );
1282 (
1283 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1284 PortReceiver::new(receiver, port_id, false, self.clone()),
1285 )
1286 }
1287
1288 pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1295 let (handle, receiver) = self.open_port();
1296 handle.bind_actor_port();
1297 (handle, receiver)
1298 }
1299
1300 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1303 where
1304 A: Accumulator + Send + Sync + 'static,
1305 A::Update: Message,
1306 A::State: Message + Default + Clone,
1307 {
1308 self.open_accum_port_opts(accum, None)
1309 }
1310
1311 pub fn open_accum_port_opts<A>(
1319 &self,
1320 accum: A,
1321 reducer_opts: Option<ReducerOpts>,
1322 ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1323 where
1324 A: Accumulator + Send + Sync + 'static,
1325 A::Update: Message,
1326 A::State: Message + Default + Clone,
1327 {
1328 let port_index = self.inner.allocate_port();
1329 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1330 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1331 let state = Mutex::new(A::State::default());
1332 let reducer_spec = accum.reducer_spec();
1333 let enqueue = move |_, update: A::Update| {
1334 let mut state = state.lock().unwrap();
1335 accum.accumulate(&mut state, update)?;
1336 let _ = sender.send(state.clone());
1337 Ok(())
1338 };
1339 (
1340 PortHandle {
1341 mailbox: self.clone(),
1342 port_index,
1343 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1344 bound: Arc::new(OnceLock::new()),
1345 reducer_spec,
1346 reducer_opts,
1347 },
1348 PortReceiver::new(receiver, port_id, true, self.clone()),
1349 )
1350 }
1351
1352 pub(crate) fn open_enqueue_port<M: Message>(
1356 &self,
1357 enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1358 ) -> PortHandle<M> {
1359 PortHandle {
1360 mailbox: self.clone(),
1361 port_index: self.inner.allocate_port(),
1362 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1363 bound: Arc::new(OnceLock::new()),
1364 reducer_spec: None,
1365 reducer_opts: None,
1366 }
1367 }
1368
1369 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1373 let port_index = self.inner.allocate_port();
1374 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1375 let (sender, receiver) = oneshot::channel::<M>();
1376 (
1377 OncePortHandle {
1378 mailbox: self.clone(),
1379 port_index,
1380 port_id: port_id.clone(),
1381 sender,
1382 },
1383 OncePortReceiver {
1384 receiver: Some(receiver),
1385 port_id,
1386 mailbox: self.clone(),
1387 },
1388 )
1389 }
1390
1391 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1392 MailboxError::new(self.inner.actor_id.clone(), err)
1393 }
1394
1395 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1396 let port_index = M::port();
1397 self.inner.ports.get(&port_index).and_then(|boxed| {
1398 boxed
1399 .as_any()
1400 .downcast_ref::<UnboundedSender<M>>()
1401 .map(|s| {
1402 assert_eq!(
1403 s.port_id,
1404 self.actor_id().port_id(port_index),
1405 "port_id mismatch in downcasted UnboundedSender"
1406 );
1407 s.sender.clone()
1408 })
1409 })
1410 }
1411
1412 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1414 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1415 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1416 }
1417
1418 pub(crate) fn allocate_port(&self) -> u64 {
1419 self.inner.allocate_port()
1420 }
1421
1422 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1423 assert_eq!(
1424 handle.mailbox.actor_id(),
1425 self.actor_id(),
1426 "port does not belong to mailbox"
1427 );
1428
1429 let port_id = self.actor_id().port_id(handle.port_index);
1432 match self.inner.ports.entry(handle.port_index) {
1433 Entry::Vacant(entry) => {
1434 entry.insert(Box::new(UnboundedSender::new(
1435 handle.sender.clone(),
1436 port_id.clone(),
1437 )));
1438 }
1439 Entry::Occupied(_entry) => {}
1440 }
1441
1442 PortRef::attest(port_id)
1443 }
1444
1445 fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1446 assert_eq!(
1447 handle.mailbox.actor_id(),
1448 self.actor_id(),
1449 "port does not belong to mailbox"
1450 );
1451
1452 let port_index = M::port();
1453 let port_id = self.actor_id().port_id(port_index);
1454 match self.inner.ports.entry(port_index) {
1455 Entry::Vacant(entry) => {
1456 entry.insert(Box::new(UnboundedSender::new(
1457 handle.sender.clone(),
1458 port_id,
1459 )));
1460 }
1461 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1462 }
1463 }
1464
1465 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1466 let port_id = handle.port_id().clone();
1467 match self.inner.ports.entry(handle.port_index) {
1468 Entry::Vacant(entry) => {
1469 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1470 }
1471 Entry::Occupied(_entry) => {}
1472 }
1473 }
1474
1475 pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1476 assert_eq!(
1477 port_id.actor_id(),
1478 self.actor_id(),
1479 "port does not belong to mailbox"
1480 );
1481
1482 match self.inner.ports.entry(port_id.index()) {
1483 Entry::Vacant(entry) => {
1484 entry.insert(Box::new(sender));
1485 }
1486 Entry::Occupied(_entry) => {}
1487 }
1488 }
1489}
1490
/// A mailbox is its own mailbox context.
impl context::Mailbox for Mailbox {
    /// Returns `self`.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1496
1497pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1502 cx.mailbox().open_port()
1503}
1504
1505pub fn open_once_port<M: Message>(
1508 cx: &impl context::Mailbox,
1509) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1510 cx.mailbox().open_once_port()
1511}
1512
impl MailboxSender for Mailbox {
    /// Delivers `envelope` to a port of this mailbox, or hands it to the
    /// mailbox's forwarder when it is addressed to a different actor.
    ///
    /// Messages that cannot be delivered (unbound port, deserialization or
    /// send failure) are returned through `return_handle`.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        metrics::MAILBOX_POSTS.add(
            1,
            hyperactor_telemetry::kv_pairs!(
                "actor_id" => envelope.sender.to_string(),
                "dest_actor_id" => envelope.dest.0.to_string(),
            ),
        );
        tracing::trace!(
            name = "post",
            actor_name = envelope.sender.name(),
            actor_id = envelope.sender.to_string(),
            "posting message to {}",
            envelope.dest
        );

        // Not addressed to this mailbox's actor: let the forwarder route it.
        if envelope.dest().actor_id() != &self.inner.actor_id {
            return self.inner.forwarder.post(envelope, return_handle);
        }

        match self.inner.ports.entry(envelope.dest().index()) {
            Entry::Vacant(_) => {
                // No sender is registered under the destination port index.
                let err = DeliveryError::Unroutable(format!(
                    "port not bound in mailbox; port id: {}; message type: {}",
                    envelope.dest().index(),
                    envelope.data().typename().map_or_else(
                        || format!("unregistered type hash {}", envelope.data().typehash()),
                        |s| s.to_string(),
                    )
                ));

                envelope.undeliverable(err, return_handle);
            }
            Entry::Occupied(entry) => {
                // Take the envelope apart so headers and payload can be handed
                // to the port; the remaining metadata is kept to re-seal the
                // envelope if delivery fails.
                let (metadata, data) = envelope.open();
                let MessageMetadata {
                    headers,
                    sender,
                    dest,
                    errors: metadata_errors,
                    ttl,
                    return_undeliverable,
                } = metadata;

                match entry.get().send_serialized(headers, data) {
                    // `false` means the port should not stay registered.
                    Ok(false) => {
                        entry.remove();
                    }
                    Ok(true) => (),
                    Err(SerializedSenderError {
                        data,
                        error: sender_error,
                        headers,
                    }) => {
                        let err = DeliveryError::Mailbox(format!("{}", sender_error));

                        // Rebuild the original envelope from the parts handed
                        // back by the sender and return it as undeliverable.
                        MessageEnvelope::seal(
                            MessageMetadata {
                                headers,
                                sender,
                                dest,
                                errors: metadata_errors,
                                ttl,
                                return_undeliverable,
                            },
                            data,
                        )
                        .undeliverable(err, return_handle)
                    }
                }
            }
        }
    }
}
1601
/// A local, typed handle for sending messages of type `M` to a port of a
/// mailbox. Handles can be cloned and, for remote messages, bound to obtain a
/// `PortRef`.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// The mailbox this port was opened on.
    mailbox: Mailbox,
    /// The port index allocated for this handle within the mailbox.
    port_index: u64,
    /// Delivers messages sent through this handle.
    sender: UnboundedPortSender<M>,
    /// The port id this handle is bound to, set at most once and shared by
    /// all clones of the handle.
    bound: Arc<OnceLock<PortId>>,
    /// Reducer spec attached to the `PortRef` produced by `bind`, if any.
    reducer_spec: Option<ReducerSpec>,
    /// NOTE(review): not read in the code visible here; presumably options
    /// for the reducer above — confirm.
    reducer_opts: Option<ReducerOpts>,
}
1627
1628impl<M: Message> PortHandle<M> {
1629 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1630 Self {
1631 mailbox,
1632 port_index,
1633 sender,
1634 bound: Arc::new(OnceLock::new()),
1635 reducer_spec: None,
1636 reducer_opts: None,
1637 }
1638 }
1639
1640 fn location(&self) -> PortLocation {
1641 match self.bound.get() {
1642 Some(port_id) => PortLocation::Bound(port_id.clone()),
1643 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1644 }
1645 }
1646
1647 pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1649 let mut headers = Attrs::new();
1650
1651 crate::mailbox::headers::set_send_timestamp(&mut headers);
1652 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1653
1654 self.sender.send(headers, message).map_err(|err| {
1655 MailboxSenderError::new_unbound::<M>(
1656 self.mailbox.actor_id().clone(),
1657 MailboxSenderErrorKind::Other(err),
1658 )
1659 })
1660 }
1661
1662 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1665 where
1666 R: Message,
1667 F: Fn(R) -> M + Send + Sync + 'static,
1668 {
1669 let port_index = self.mailbox.inner.allocate_port();
1670 let sender = self.sender.clone();
1671 PortHandle::new(
1672 self.mailbox.clone(),
1673 port_index,
1674 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1675 sender.send(headers, unmap(value))
1676 })),
1677 )
1678 }
1679}
1680
1681impl<M: RemoteMessage> PortHandle<M> {
1682 pub fn bind(&self) -> PortRef<M> {
1684 PortRef::attest_reducible(
1685 self.bound
1686 .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1687 .clone(),
1688 self.reducer_spec.clone(),
1689 )
1690 }
1691
1692 pub(crate) fn bind_actor_port(&self) {
1698 let port_id = self.mailbox.actor_id().port_id(M::port());
1699 self.bound
1700 .set(port_id)
1701 .map_err(|p| {
1702 format!(
1703 "could not bind port handle {} as {p}: already bound",
1704 self.port_index
1705 )
1706 })
1707 .unwrap();
1708 self.mailbox.bind_to_actor_port(self);
1709 }
1710}
1711
1712impl<M: Message> Clone for PortHandle<M> {
1713 fn clone(&self) -> Self {
1714 Self {
1715 mailbox: self.mailbox.clone(),
1716 port_index: self.port_index,
1717 sender: self.sender.clone(),
1718 bound: self.bound.clone(),
1719 reducer_spec: self.reducer_spec.clone(),
1720 reducer_opts: self.reducer_opts.clone(),
1721 }
1722 }
1723}
1724
1725impl<M: Message> fmt::Display for PortHandle<M> {
1726 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1727 fmt::Display::fmt(&self.location(), f)
1728 }
1729}
1730
/// A local handle to a one-shot port: it can deliver at most one message of
/// type `M`, and is consumed by sending or binding.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port was opened on.
    mailbox: Mailbox,
    /// The port index of this handle within the mailbox.
    port_index: u64,
    /// The full id of the port.
    port_id: PortId,
    /// One-shot channel used to deliver the message.
    sender: oneshot::Sender<M>,
}
1739
1740impl<M: Message> OncePortHandle<M> {
1741 pub fn port_id(&self) -> &PortId {
1744 &self.port_id
1745 }
1746
1747 pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1750 let actor_id = self.mailbox.actor_id().clone();
1751 self.sender.send(message).map_err(|_| {
1752 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1757 })?;
1758 Ok(())
1759 }
1760}
1761
1762impl<M: RemoteMessage> OncePortHandle<M> {
1763 pub fn bind(self) -> OncePortRef<M> {
1768 let port_id = self.port_id().clone();
1769 self.mailbox.clone().bind_once(self);
1770 OncePortRef::attest(port_id)
1771 }
1772}
1773
1774impl<M: Message> fmt::Display for OncePortHandle<M> {
1775 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1776 fmt::Display::fmt(&self.port_id(), f)
1777 }
1778}
1779
/// The receiving end of a port opened on a mailbox.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// Channel on which messages for this port arrive.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The id of the port being received from.
    port_id: PortId,
    /// When set, receive operations drop all but the most recently queued
    /// message.
    coalesce: bool,
    /// The owning mailbox; used to unregister the port when the receiver is
    /// dropped.
    mailbox: Mailbox,
}
1793
1794impl<M> PortReceiver<M> {
1795 fn new(
1796 receiver: mpsc::UnboundedReceiver<M>,
1797 port_id: PortId,
1798 coalesce: bool,
1799 mailbox: Mailbox,
1800 ) -> Self {
1801 Self {
1802 receiver,
1803 port_id,
1804 coalesce,
1805 mailbox,
1806 }
1807 }
1808
1809 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1814 let mut next = self.receiver.try_recv();
1815 if self.coalesce
1817 && let Some(latest) = self.drain().pop()
1818 {
1819 next = Ok(latest);
1820 }
1821 match next {
1822 Ok(msg) => Ok(Some(msg)),
1823 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1824 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1825 self.actor_id().clone(),
1826 MailboxErrorKind::Closed,
1827 )),
1828 }
1829 }
1830
1831 pub async fn recv(&mut self) -> Result<M, MailboxError> {
1834 let mut next = self.receiver.recv().await;
1835 if self.coalesce
1838 && let Some(latest) = self.drain().pop()
1839 {
1840 next = Some(latest);
1841 }
1842 next.ok_or(MailboxError::new(
1843 self.actor_id().clone(),
1844 MailboxErrorKind::Closed,
1845 ))
1846 }
1847
1848 pub fn drain(&mut self) -> Vec<M> {
1850 let mut drained: Vec<M> = Vec::new();
1851 while let Ok(msg) = self.receiver.try_recv() {
1852 if self.coalesce {
1854 drained.pop();
1855 }
1856 drained.push(msg);
1857 }
1858 drained
1859 }
1860
1861 fn port(&self) -> u64 {
1862 self.port_id.1
1863 }
1864
1865 fn actor_id(&self) -> &ActorId {
1866 &self.port_id.0
1867 }
1868}
1869
1870impl<M> Drop for PortReceiver<M> {
1871 fn drop(&mut self) {
1872 self.mailbox.inner.ports.remove(&self.port());
1876 }
1877}
1878
/// Exposes the receiver as a stream of `recv` results.
impl<M> Stream for PortReceiver<M> {
    type Item = Result<M, MailboxError>;

    /// Polls a fresh `recv` future once and wraps its output in `Some`.
    ///
    /// NOTE(review): this never yields `None`; once the port is closed each
    /// poll produces `Some(Err(..))` — confirm consumers stop on the first
    /// error.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // A new future is created on every poll and dropped when this call
        // returns.
        std::pin::pin!(self.recv()).poll(cx).map(Some)
    }
}
1886
/// The receiving end of a one-shot port.
pub struct OncePortReceiver<M> {
    /// One-shot channel the message arrives on. Held in an `Option` so that
    /// `recv` can take it out of the receiver, which implements `Drop`.
    receiver: Option<oneshot::Receiver<M>>,
    /// The id of the port being received from.
    port_id: PortId,

    /// The owning mailbox; used to unregister the port on drop.
    mailbox: Mailbox,
}
1896
1897impl<M> OncePortReceiver<M> {
1898 pub async fn recv(mut self) -> Result<M, MailboxError> {
1902 std::mem::take(&mut self.receiver)
1903 .unwrap()
1904 .await
1905 .map_err(|err| {
1906 MailboxError::new(
1907 self.actor_id().clone(),
1908 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1909 )
1910 })
1911 }
1912
1913 fn port(&self) -> u64 {
1914 self.port_id.1
1915 }
1916
1917 fn actor_id(&self) -> &ActorId {
1918 &self.port_id.0
1919 }
1920}
1921
1922impl<M> Drop for OncePortReceiver<M> {
1923 fn drop(&mut self) {
1924 self.mailbox.inner.ports.remove(&self.port());
1928 }
1929}
1930
/// Error returned when a serialized message could not be delivered to a port.
/// It hands the message parts back to the caller so the original envelope can
/// be reconstructed.
pub struct SerializedSenderError {
    /// The headers that accompanied the message.
    pub headers: Attrs,
    /// The serialized message that could not be delivered.
    pub data: Serialized,
    /// The reason delivery failed.
    pub error: MailboxSenderError,
}
1940
/// Type-erased port sender stored in a mailbox's port table: it accepts
/// serialized messages for a port.
trait SerializedSender: Send + Sync {
    /// Returns the sender as `&dyn Any` (presumably to allow downcasting to
    /// the concrete sender type — confirm against callers).
    fn as_any(&self) -> &dyn Any;

    /// Delivers the serialized message together with its headers.
    ///
    /// Returns `Ok(true)` if the port should remain registered and
    /// `Ok(false)` if the mailbox should remove it after this message. On
    /// failure the message is handed back inside the error.
    #[allow(clippy::result_large_err)]
    fn send_serialized(
        &self,
        headers: Attrs,
        serialized: Serialized,
    ) -> Result<bool, SerializedSenderError>;
}
1966
/// The ways a typed message can be delivered to an unbounded port.
enum UnboundedPortSender<M: Message> {
    /// Enqueue the message on an unbounded channel (headers are not passed
    /// along).
    Mpsc(mpsc::UnboundedSender<M>),
    /// Hand the headers and message to an arbitrary function.
    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
1974
1975impl<M: Message> UnboundedPortSender<M> {
1976 fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1977 match self {
1978 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1979 Self::Func(func) => func(headers, message),
1980 }
1981 }
1982}
1983
1984impl<M: Message> Clone for UnboundedPortSender<M> {
1987 fn clone(&self) -> Self {
1988 match self {
1989 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1990 Self::Func(func) => Self::Func(func.clone()),
1991 }
1992 }
1993}
1994
1995impl<M: Message> Debug for UnboundedPortSender<M> {
1996 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1997 match self {
1998 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1999 Self::Func(_) => f
2000 .debug_tuple("UnboundedPortSender::Func")
2001 .field(&"..")
2002 .finish(),
2003 }
2004 }
2005}
2006
/// A typed port sender paired with the id of the port it is bound to; this is
/// what a mailbox registers for a bound unbounded port.
struct UnboundedSender<M: Message> {
    /// Delivers the (deserialized) messages.
    sender: UnboundedPortSender<M>,
    /// The bound port, reported in errors.
    port_id: PortId,
}
2011
2012impl<M: Message> UnboundedSender<M> {
2013 fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
2016 Self { sender, port_id }
2017 }
2018
2019 fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
2020 self.sender.send(headers, message).map_err(|err| {
2021 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2022 })
2023 }
2024}
2025
2026impl<M: Message> Clone for UnboundedSender<M> {
2030 fn clone(&self) -> Self {
2031 Self {
2032 sender: self.sender.clone(),
2033 port_id: self.port_id.clone(),
2034 }
2035 }
2036}
2037
2038impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2039 fn as_any(&self) -> &dyn Any {
2040 self
2041 }
2042
2043 fn send_serialized(
2044 &self,
2045 headers: Attrs,
2046 serialized: Serialized,
2047 ) -> Result<bool, SerializedSenderError> {
2048 match serialized.deserialized_unchecked() {
2054 Ok(message) => {
2055 self.sender.send(headers.clone(), message).map_err(|err| {
2056 SerializedSenderError {
2057 data: serialized,
2058 error: MailboxSenderError::new_bound(
2059 self.port_id.clone(),
2060 MailboxSenderErrorKind::Other(err),
2061 ),
2062 headers,
2063 }
2064 })?;
2065
2066 Ok(true)
2067 }
2068 Err(err) => Err(SerializedSenderError {
2069 data: serialized,
2070 error: MailboxSenderError::new_bound(
2071 self.port_id.clone(),
2072 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2073 ),
2074 headers,
2075 }),
2076 }
2077 }
2078}
2079
/// The sender a mailbox registers for a bound one-shot port.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot channel; `None` once it has been used. Shared between
    /// clones so that only one of them can send.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The bound port, reported in errors.
    port_id: PortId,
}
2087
2088impl<M: Message> OnceSender<M> {
2089 fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
2092 Self {
2093 sender: Arc::new(Mutex::new(Some(sender))),
2094 port_id,
2095 }
2096 }
2097
2098 fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2099 match self.sender.lock().unwrap().take() {
2101 None => Err(MailboxSenderError::new_bound(
2102 self.port_id.clone(),
2103 MailboxSenderErrorKind::Closed,
2104 )),
2105 Some(sender) => {
2106 sender.send(message).map_err(|_| {
2107 MailboxSenderError::new_bound(
2112 self.port_id.clone(),
2113 MailboxSenderErrorKind::Closed,
2114 )
2115 })?;
2116 Ok(false)
2117 }
2118 }
2119 }
2120}
2121
2122impl<M: Message> Clone for OnceSender<M> {
2126 fn clone(&self) -> Self {
2127 Self {
2128 sender: self.sender.clone(),
2129 port_id: self.port_id.clone(),
2130 }
2131 }
2132}
2133
2134impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2135 fn as_any(&self) -> &dyn Any {
2136 self
2137 }
2138
2139 fn send_serialized(
2140 &self,
2141 headers: Attrs,
2142 serialized: Serialized,
2143 ) -> Result<bool, SerializedSenderError> {
2144 match serialized.deserialized() {
2145 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2146 data: serialized,
2147 error: e,
2148 headers,
2149 }),
2150 Err(err) => Err(SerializedSenderError {
2151 data: serialized,
2152 error: MailboxSenderError::new_bound(
2153 self.port_id.clone(),
2154 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2155 ),
2156 headers,
2157 }),
2158 }
2159 }
2160}
2161
/// A port sender that forwards serialized messages without deserializing
/// them.
pub(crate) struct UntypedUnboundedSender {
    /// Receives each serialized message; on failure it returns the message
    /// together with the error.
    pub(crate) sender:
        Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
    /// The bound port, reported in errors.
    pub(crate) port_id: PortId,
}
2168
2169impl SerializedSender for UntypedUnboundedSender {
2170 fn as_any(&self) -> &dyn Any {
2171 self
2172 }
2173
2174 fn send_serialized(
2175 &self,
2176 headers: Attrs,
2177 serialized: Serialized,
2178 ) -> Result<bool, SerializedSenderError> {
2179 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2180 data,
2181 error: MailboxSenderError::new_bound(
2182 self.port_id.clone(),
2183 MailboxSenderErrorKind::Other(err),
2184 ),
2185 headers,
2186 })?;
2187
2188 Ok(true)
2189 }
2190}
2191
/// The state behind a `Mailbox`.
struct State {
    /// The actor this mailbox belongs to.
    actor_id: ActorId,

    /// The registered ports, keyed by port index.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port index to hand out.
    next_port: AtomicU64,

    /// Receives messages posted to this mailbox that are addressed to a
    /// different actor.
    forwarder: BoxedMailboxSender,
}
2208
2209impl State {
2210 fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2212 Self {
2213 actor_id,
2214 ports: DashMap::new(),
2215 next_port: AtomicU64::new(USER_PORT_OFFSET),
2218 forwarder,
2219 }
2220 }
2221
2222 fn allocate_port(&self) -> u64 {
2224 self.next_port.fetch_add(1, Ordering::SeqCst)
2225 }
2226}
2227
2228impl fmt::Debug for State {
2229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2230 f.debug_struct("State")
2231 .field("actor_id", &self.actor_id)
2232 .field(
2233 "open_ports",
2234 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2235 )
2236 .field("next_port", &self.next_port)
2237 .finish()
2238 }
2239}
2240
/// A mailbox sender that dispatches each message to the sender registered for
/// exactly the destination actor. Clones share the same registrations.
#[derive(Debug, Clone)]
pub struct MailboxMuxer {
    /// The registered senders, keyed by actor id.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2248
2249impl Default for MailboxMuxer {
2250 fn default() -> Self {
2251 Self::new()
2252 }
2253}
2254
2255impl MailboxMuxer {
2256 pub fn new() -> Self {
2258 Self {
2259 mailboxes: Arc::new(DashMap::new()),
2260 }
2261 }
2262
2263 pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2268 match self.mailboxes.entry(actor_id) {
2269 Entry::Occupied(_) => false,
2270 Entry::Vacant(entry) => {
2271 entry.insert(Box::new(sender));
2272 true
2273 }
2274 }
2275 }
2276
2277 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2279 self.bind(mailbox.actor_id().clone(), mailbox)
2280 }
2281
2282 pub(crate) fn unbind(&self, actor_id: &ActorId) {
2286 self.mailboxes.remove(actor_id);
2287 }
2288
2289 pub fn bound_actors(&self) -> Vec<ActorId> {
2291 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2292 }
2293}
2294
2295impl MailboxSender for MailboxMuxer {
2296 fn post_unchecked(
2297 &self,
2298 envelope: MessageEnvelope,
2299 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2300 ) {
2301 let dest_actor_id = envelope.dest().actor_id();
2302 match self.mailboxes.get(envelope.dest().actor_id()) {
2303 None => {
2304 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2305 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2306 }
2307 Some(sender) => sender.post(envelope, return_handle),
2308 }
2309 }
2310}
2311
/// A mailbox sender that routes messages by reference prefix: a message is
/// posted to the sender whose bound reference is a prefix of the destination
/// actor. Clones share the same routing table.
#[derive(Debug, Clone)]
pub struct MailboxRouter {
    /// The routing table, ordered by reference.
    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2318
2319impl Default for MailboxRouter {
2320 fn default() -> Self {
2321 Self::new()
2322 }
2323}
2324
2325impl MailboxRouter {
2326 pub fn new() -> Self {
2328 Self {
2329 entries: Arc::new(RwLock::new(BTreeMap::new())),
2330 }
2331 }
2332
2333 pub fn downgrade(&self) -> WeakMailboxRouter {
2335 WeakMailboxRouter(Arc::downgrade(&self.entries))
2336 }
2337
2338 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2342 FallbackMailboxRouter {
2343 router: self.clone(),
2344 default,
2345 }
2346 }
2347
2348 pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2352 let mut w = self.entries.write().unwrap();
2353 w.insert(dest, Arc::new(sender));
2354 }
2355
2356 fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2357 match self
2358 .entries
2359 .read()
2360 .unwrap()
2361 .lower_bound(Excluded(&actor_id.clone().into()))
2362 .prev()
2363 {
2364 None => None,
2365 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2366 Some(sender.clone())
2367 }
2368 Some(_) => None,
2369 }
2370 }
2371}
2372
2373impl MailboxSender for MailboxRouter {
2374 fn post_unchecked(
2375 &self,
2376 envelope: MessageEnvelope,
2377 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2378 ) {
2379 match self.sender(envelope.dest().actor_id()) {
2380 None => envelope.undeliverable(
2381 DeliveryError::Unroutable(
2382 "no destination found for actor in routing table".to_string(),
2383 ),
2384 return_handle,
2385 ),
2386 Some(sender) => sender.post(envelope, return_handle),
2387 }
2388 }
2389}
2390
/// A `MailboxRouter` combined with a sender that receives messages the router
/// has no route for.
#[derive(Debug, Clone)]
struct FallbackMailboxRouter {
    /// Consulted first for a route.
    router: MailboxRouter,
    /// Receives messages without a matching route.
    default: BoxedMailboxSender,
}
2396
2397impl MailboxSender for FallbackMailboxRouter {
2398 fn post_unchecked(
2399 &self,
2400 envelope: MessageEnvelope,
2401 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2402 ) {
2403 match self.router.sender(envelope.dest().actor_id()) {
2404 Some(sender) => sender.post(envelope, return_handle),
2405 None => self.default.post(envelope, return_handle),
2406 }
2407 }
2408}
2409
/// A weak handle to a `MailboxRouter`'s routing table, obtained from
/// `MailboxRouter::downgrade`. It does not keep the table alive.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2422
2423impl WeakMailboxRouter {
2424 pub fn upgrade(&self) -> Option<MailboxRouter> {
2426 self.0.upgrade().map(|entries| MailboxRouter { entries })
2427 }
2428}
2429
2430impl MailboxSender for WeakMailboxRouter {
2431 fn post_unchecked(
2432 &self,
2433 envelope: MessageEnvelope,
2434 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2435 ) {
2436 match self.upgrade() {
2437 Some(router) => router.post(envelope, return_handle),
2438 None => envelope.undeliverable(
2439 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2440 return_handle,
2441 ),
2442 }
2443 }
2444}
2445
/// A mailbox sender that routes by reference prefix to channel addresses and
/// dials those addresses on demand, caching the resulting clients. Clones
/// share the address book and the cache.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
    /// Maps bound references to the address serving them.
    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
    /// Clients that have been dialed, keyed by address.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    /// Receives messages for which no address can be determined.
    default: BoxedMailboxSender,

    /// When set, direct-addressed procs without an address book entry are
    /// only resolved to their own address if its transport is remote.
    direct_addressed_remote_only: bool,
}
2472
2473impl Default for DialMailboxRouter {
2474 fn default() -> Self {
2475 Self::new()
2476 }
2477}
2478
2479impl DialMailboxRouter {
2480 pub fn new() -> Self {
2482 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2483 }
2484
2485 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2490 Self {
2491 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2492 sender_cache: Arc::new(DashMap::new()),
2493 default,
2494 direct_addressed_remote_only: false,
2495 }
2496 }
2497
2498 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2503 Self {
2504 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2505 sender_cache: Arc::new(DashMap::new()),
2506 default,
2507 direct_addressed_remote_only: true,
2508 }
2509 }
2510
2511 pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2517 if let Ok(mut w) = self.address_book.write() {
2518 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2519 && old_addr != addr
2520 {
2521 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2522 self.sender_cache.remove(&old_addr);
2523 }
2524 } else {
2525 tracing::error!("address book poisoned during bind of {:?}", dest);
2526 }
2527 }
2528
2529 pub fn unbind(&self, dest: &Reference) {
2535 if let Ok(mut w) = self.address_book.write() {
2536 let to_remove: Vec<(Reference, ChannelAddr)> = w
2537 .range(dest..)
2538 .take_while(|(key, _)| dest.is_prefix_of(key))
2539 .map(|(key, addr)| (key.clone(), addr.clone()))
2540 .collect();
2541
2542 for (key, addr) in to_remove {
2543 tracing::info!("unbinding {:?} from {:?}", key, addr);
2544 w.remove(&key);
2545 self.sender_cache.remove(&addr);
2546 }
2547 } else {
2548 tracing::error!("address book poisoned during unbind of {:?}", dest);
2549 }
2550 }
2551
2552 pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2554 let address_book = self.address_book.read().unwrap();
2555 let found = address_book
2556 .lower_bound(Excluded(&actor_id.clone().into()))
2557 .prev();
2558
2559 if let Some((key, addr)) = found
2562 && key.is_prefix_of(&actor_id.clone().into())
2563 {
2564 Some(addr.clone())
2565 } else if actor_id.proc_id().is_direct() {
2566 let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2567 if self.direct_addressed_remote_only {
2568 addr.transport().is_remote().then_some(addr)
2569 } else {
2570 Some(addr)
2571 }
2572 } else {
2573 None
2574 }
2575 }
2576
2577 pub fn prefixes(&self) -> BTreeSet<Reference> {
2580 let addrs = self.address_book.read().unwrap();
2581 let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2582 for (reference, _) in addrs.iter() {
2583 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2584 Some(candidate) if candidate.is_prefix_of(reference) => (),
2585 _ => {
2586 prefixes.insert(reference.clone());
2587 }
2588 }
2589 }
2590
2591 prefixes
2592 }
2593
2594 fn dial(
2595 &self,
2596 addr: &ChannelAddr,
2597 actor_id: &ActorId,
2598 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2599 match self.sender_cache.entry(addr.clone()) {
2603 Entry::Occupied(entry) => Ok(entry.get().clone()),
2604 Entry::Vacant(entry) => {
2605 let tx = channel::dial(addr.clone()).map_err(|err| {
2606 MailboxSenderError::new_unbound_type(
2607 actor_id.clone(),
2608 MailboxSenderErrorKind::Channel(err),
2609 "unknown",
2610 )
2611 })?;
2612 let sender = MailboxClient::new(tx);
2613 Ok(entry.insert(Arc::new(sender)).value().clone())
2614 }
2615 }
2616 }
2617}
2618
2619impl MailboxSender for DialMailboxRouter {
2620 fn post_unchecked(
2621 &self,
2622 envelope: MessageEnvelope,
2623 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2624 ) {
2625 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2626 self.default.post(envelope, return_handle);
2627 return;
2628 };
2629
2630 match self.dial(&addr, envelope.dest().actor_id()) {
2631 Err(err) => envelope.undeliverable(
2632 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2633 return_handle,
2634 ),
2635 Ok(sender) => sender.post(envelope, return_handle),
2636 }
2637 }
2638}
2639
/// A mailbox sender that delivers nothing: every message is returned to its
/// return handle as unroutable.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2644
2645impl MailboxSender for UnroutableMailboxSender {
2646 fn post_unchecked(
2647 &self,
2648 envelope: MessageEnvelope,
2649 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2650 ) {
2651 envelope.undeliverable(
2652 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2653 return_handle,
2654 );
2655 }
2656}
2657
2658#[cfg(test)]
2659mod tests {
2660
2661 use std::assert_matches::assert_matches;
2662 use std::mem::drop;
2663 use std::str::FromStr;
2664 use std::sync::atomic::AtomicUsize;
2665 use std::time::Duration;
2666
2667 use timed_test::async_timed_test;
2668
2669 use super::*;
2670 use crate::Actor;
2671 use crate::ActorHandle;
2672 use crate::Instance;
2673 use crate::PortId;
2674 use crate::accum;
2675 use crate::channel::ChannelTransport;
2676 use crate::channel::dial;
2677 use crate::channel::serve;
2678 use crate::channel::sim::SimAddr;
2679 use crate::clock::Clock;
2680 use crate::clock::RealClock;
2681 use crate::data::Serialized;
2682 use crate::id;
2683 use crate::proc::Proc;
2684 use crate::reference::ProcId;
2685 use crate::reference::WorldId;
2686 use crate::simnet;
2687
    // A closed-mailbox error is displayed as "<actor id>: mailbox closed".
    #[test]
    fn test_error() {
        let err = MailboxError::new(
            ActorId(
                ProcId::Ranked(WorldId("myworld".to_string()), 2),
                "myactor".to_string(),
                5,
            ),
            MailboxErrorKind::Closed,
        );
        assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
    }
2700
    // Messages sent to a bound port arrive in order, whether they are sent
    // through `serialize_and_send` or posted as a hand-built envelope.
    #[tokio::test]
    async fn test_mailbox_basic() {
        let mbox = Mailbox::new_detached(id!(test[0].test));
        let (port, mut receiver) = mbox.open_port::<u64>();
        let port = port.bind();

        mbox.serialize_and_send(&port, 123, monitored_return_handle())
            .unwrap();
        mbox.serialize_and_send(&port, 321, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);
        assert_eq!(receiver.recv().await.unwrap(), 321u64);

        // Post a manually serialized message straight to the mailbox.
        let serialized = Serialized::serialize(&999u64).unwrap();
        mbox.post(
            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
            monitored_return_handle(),
        );
        assert_eq!(receiver.recv().await.unwrap(), 999u64);
    }
2721
    // A `max` accumulator port always reports the largest value sent so far.
    #[tokio::test]
    async fn test_mailbox_accum() {
        let mbox = Mailbox::new_detached(id!(test[0].test));
        let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());

        // Increasing values: each one becomes the new maximum.
        for i in -3..4 {
            port.send(i).unwrap();
            let received: accum::Max<i64> = receiver.recv().await.unwrap();
            let msg = received.get();
            assert_eq!(msg, &i);
        }
        // Replaying the same values does not lower the maximum.
        for i in -3..4 {
            port.send(i).unwrap();
            assert_eq!(receiver.recv().await.unwrap().get(), &3);
        }
        port.send(4).unwrap();
        assert_eq!(receiver.recv().await.unwrap().get(), &4);

        // Several sends before a receive: only the accumulated maximum is
        // observed.
        for i in 5..10 {
            port.send(i).unwrap();
        }
        assert_eq!(receiver.recv().await.unwrap().get(), &9);
        port.send(1).unwrap();
        port.send(3).unwrap();
        port.send(2).unwrap();
        assert_eq!(receiver.recv().await.unwrap().get(), &9);
    }
2752
    // The reducer spec of an accumulator port is carried over to the bound
    // port reference; a plain port has none.
    #[test]
    fn test_port_and_reducer() {
        let mbox = Mailbox::new_detached(id!(test[0].test));
        {
            // Accumulator port: handle and reference carry the reducer spec.
            let accumulator = accum::max::<u64>();
            let reducer_spec = accumulator.reducer_spec().unwrap();
            let (port, _) = mbox.open_accum_port(accum::max::<u64>());
            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
            let port_ref = port.bind();
            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
        }
        {
            // Plain port: no reducer spec anywhere.
            let (port, _) = mbox.open_port::<u64>();
            assert_eq!(port.reducer_spec, None);
            let port_ref = port.bind();
            assert_eq!(port_ref.reducer_spec(), &None);
        }
    }
2773
    // A message sent through a one-shot handle is received by its receiver.
    // NOTE(review): the test is ignored; the reason is not stated here.
    #[tokio::test]
    #[ignore]
    async fn test_mailbox_once() {
        let mbox = Mailbox::new_detached(id!(test[0].test));

        let (port, receiver) = mbox.open_once_port::<u64>();

        port.send(123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

    }
2797
    // After the receiver is dropped, sending to the bound port is expected to
    // fail with a `Closed` error located at the bound port.
    // NOTE(review): the test is ignored; the reason is not stated here.
    #[tokio::test]
    #[ignore]
    async fn test_mailbox_receiver_drop() {
        let mbox = Mailbox::new_detached(id!(test[0].test));
        let (port, mut receiver) = mbox.open_port::<u64>();
        let port = port.bind();
        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);
        drop(receiver);
        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
            panic!();
        };

        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
    }
2816
    // Once every sent message has been received, `drain` returns nothing.
    #[tokio::test]
    async fn test_drain() {
        let mbox = Mailbox::new_detached(id!(test[0].test));

        let (port, mut receiver) = mbox.open_port();
        let port = port.bind();

        for i in 0..10 {
            mbox.serialize_and_send(&port, i, monitored_return_handle())
                .unwrap();
        }

        for i in 0..10 {
            assert_eq!(receiver.recv().await.unwrap(), i);
        }

        assert!(receiver.drain().is_empty());
    }
2835
    // Registers two mailboxes with a muxer, then sends and receives on a
    // one-shot port of the first mailbox.
    // NOTE(review): the message is sent through the local handle, so the
    // muxer's routing is not exercised by the visible assertions.
    #[tokio::test]
    async fn test_mailbox_muxer() {
        let muxer = MailboxMuxer::new();

        let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
        let mbox1 = Mailbox::new_detached(id!(test[0].actor2));

        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());

        let (port, receiver) = mbox0.open_once_port::<u64>();

        port.send(123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

    }
2861
2862 #[tokio::test]
2863 async fn test_local_client_server() {
2864 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2865 let (tx, rx) = channel::local::new();
2866 let serve_handle = mbox.clone().serve(rx);
2867 let client = MailboxClient::new(tx);
2868
2869 let (port, receiver) = mbox.open_once_port::<u64>();
2870 let port = port.bind();
2871
2872 client
2873 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2874 .unwrap();
2875 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2876 serve_handle.stop("fromt test");
2877 serve_handle.await.unwrap().unwrap();
2878 }
2879
    // Same as the local client/server test, but over a simulated channel.
    #[tokio::test]
    async fn test_sim_client_server() {
        simnet::start();
        let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
        let src_to_dst = ChannelAddr::Sim(
            SimAddr::new_with_src(
                "local:0".parse::<ChannelAddr>().unwrap(),
                dst_addr.addr().clone(),
            )
            .unwrap(),
        );

        // Serve the mailbox on the destination address and dial it from the
        // source address.
        let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
        let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
        let mbox = Mailbox::new_detached(id!(test[0].actor0));
        let serve_handle = mbox.clone().serve(rx);
        let client = MailboxClient::new(tx);
        let (port, receiver) = mbox.open_once_port::<u64>();
        let port = port.bind();
        let msg: u64 = 123;
        client
            .serialize_and_send_once(port, msg, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), msg);
        serve_handle.stop("from test");
        serve_handle.await.unwrap().unwrap();
    }
2907
    // Messages are routed by reference prefix; destinations without a route
    // are returned as undeliverable unless a fallback sender is installed.
    #[tokio::test]
    async fn test_mailbox_router() {
        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));

        // One bound one-shot port (and its receiver) per mailbox.
        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
            [&mbox0, &mbox1, &mbox2, &mbox3]
                .into_iter()
                .map(|mbox| {
                    let (port, receiver) = mbox.open_once_port::<u64>();
                    (port.bind(), receiver)
                })
                .collect();

        let router = MailboxRouter::new();

        // Bind at world, proc and actor granularity.
        router.bind(id!(world0).into(), mbox0);
        router.bind(id!(world1[0]).into(), mbox1);
        router.bind(id!(world1[1]).into(), mbox2);
        router.bind(id!(world1[1].actor1).into(), mbox3);

        for (i, (port, receiver)) in comms.into_iter().enumerate() {
            router
                .serialize_and_send_once(port, i as u64, monitored_return_handle())
                .unwrap();
            assert_eq!(receiver.recv().await.unwrap(), i as u64);
        }

        let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));

        // No route for `fallback[0].actor`: the message comes back on the
        // return port.
        let (return_handle, mut return_receiver) =
            crate::mailbox::undeliverable::new_undeliverable_port();
        let (port, _receiver) = mbox4.open_once_port();
        router
            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
            .unwrap();
        assert!(return_receiver.recv().await.is_ok());

        // With a fallback installed, the same destination is delivered.
        let router = router.fallback(mbox4.clone().into_boxed());
        let (port, receiver) = mbox4.open_once_port();
        router
            .serialize_and_send_once(port.bind(), 0, return_handle)
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 0);
    }
2957
2958 #[tokio::test]
2959 async fn test_dial_mailbox_router() {
2960 let router = DialMailboxRouter::new();
2961
2962 router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2963 router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2964 router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2965 router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2966 router.bind(
2968 "unix:@4,my_proc,my_actor".parse().unwrap(),
2969 "unix:@5".parse().unwrap(),
2970 );
2971
2972 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2974 router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2975
2976 let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
2977 .unwrap()
2978 .into_actor()
2979 .unwrap();
2980 assert_eq!(
2981 router.lookup_addr(&actor_id).unwrap(),
2982 "unix!@5".parse().unwrap(),
2983 );
2984 router.unbind(&actor_id.clone().into());
2985 assert_eq!(
2986 router.lookup_addr(&actor_id).unwrap(),
2987 "unix!@4".parse().unwrap(),
2988 );
2989
2990 router.unbind(&id!(world1).into());
2992 assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
2993 assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
2994 assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
2995 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2996 router.unbind(&id!(world0).into());
2997 assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
2998 }
2999
3000 #[tokio::test]
3001 #[ignore] async fn test_dial_mailbox_router_default() {
3003 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
3004 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
3005 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
3006 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
3007
3008 let root = MailboxRouter::new();
3011 let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3012 let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3013
3014 root.bind(id!(world0).into(), world0_router.clone());
3015 root.bind(id!(world1).into(), world1_router.clone());
3016
3017 let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3018
3019 let mut handles = Vec::new(); for mbox in mailboxes.iter() {
3021 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3022 let handle = (*mbox).clone().serve(rx);
3023 handles.push(handle);
3024
3025 eprintln!("{}: {}", mbox.actor_id(), addr);
3026 if mbox.actor_id().world_name() == "world0" {
3027 world0_router.bind(mbox.actor_id().clone().into(), addr);
3028 } else {
3029 world1_router.bind(mbox.actor_id().clone().into(), addr);
3030 }
3031 }
3032
3033 for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3035 for mbox in mailboxes.iter() {
3036 let (port, receiver) = mbox.open_once_port::<u64>();
3037 let port = port.bind();
3038 router
3039 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3040 .unwrap();
3041 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3042 }
3043 }
3044 }
3045
3046 #[tokio::test]
3047 async fn test_enqueue_port() {
3048 let mbox = Mailbox::new_detached(id!(test[0].test));
3049
3050 let count = Arc::new(AtomicUsize::new(0));
3051 let count_clone = count.clone();
3052 let port = mbox.open_enqueue_port(move |_, n| {
3053 count_clone.fetch_add(n, Ordering::SeqCst);
3054 Ok(())
3055 });
3056
3057 port.send(10).unwrap();
3058 port.send(5).unwrap();
3059 port.send(1).unwrap();
3060 port.send(0).unwrap();
3061
3062 assert_eq!(count.load(Ordering::SeqCst), 16);
3063 }
3064
/// Unit message type with no `#[named]` override;
/// `test_remote_message_macros` checks the type name its `Named` derive
/// produces.
#[derive(Clone, Debug, Serialize, Deserialize, Named)]
struct TestMessage;
3067
/// Unit message type whose type name is overridden through
/// `#[named(name = ...)]`; `test_remote_message_macros` checks that the
/// override is what `typename()` reports.
#[derive(Clone, Debug, Serialize, Deserialize, Named)]
#[named(name = "some::custom::path")]
struct TestMessage2;
3071
/// Checks the type names reported for `TestMessage` (derived) and
/// `TestMessage2` (explicitly overridden).
#[test]
fn test_remote_message_macros() {
    let derived_name = TestMessage::typename();
    let overridden_name = TestMessage2::typename();

    assert_eq!(derived_name, "hyperactor::mailbox::tests::TestMessage");
    assert_eq!(overridden_name, "some::custom::path");
}
3080
/// Checks the `Display` rendering of a `MessageEnvelope` that carries a
/// registered, serialized struct.
#[test]
fn test_message_envelope_display() {
    #[derive(Named, Serialize, Deserialize)]
    struct MyTest {
        a: u64,
        b: String,
    }
    crate::register_type!(MyTest);

    let payload = MyTest {
        a: 123,
        b: "hello".into(),
    };
    let envelope = MessageEnvelope::serialize(
        id!(source[0].actor),
        id!(dest[1].actor[0][123]),
        &payload,
        Attrs::new(),
    )
    .unwrap();

    let rendered = envelope.to_string();
    assert_eq!(
        rendered,
        r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
    );
}
3106
/// Unit actor type spawned by `test_actor_delivery_failure`.
#[derive(Debug, Default, Actor)]
struct Foo;
3109
3110 #[tokio::test]
3113 async fn test_actor_delivery_failure() {
3114 use crate::actor::ActorStatus;
3117 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
3118
3119 let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3120 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3121 ));
3122 let proc_id = id!(quux[0]);
3123 let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
3124 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3125
3126 let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
3127 let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3128 let message = MessageEnvelope::new(
3129 foo.actor_id().clone(),
3130 PortId(id!(corge[0].bar), 9999u64),
3131 Serialized::serialize(&1u64).unwrap(),
3132 Attrs::new(),
3133 );
3134 return_handle.send(Undeliverable(message)).unwrap();
3135
3136 RealClock
3137 .sleep(tokio::time::Duration::from_millis(100))
3138 .await;
3139
3140 let foo_status = foo.status();
3141 assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3142 let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3143 unreachable!()
3144 };
3145 assert!(msg.to_string().contains(
3146 "a message from \
3147 quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
3148 ));
3149
3150 proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
3151 .await
3152 .unwrap();
3153 }
3154
3155 #[tokio::test]
3156 async fn test_detached_return_handle() {
3157 let (return_handle, mut return_receiver) =
3158 crate::mailbox::undeliverable::new_undeliverable_port();
3159 let envelope = MessageEnvelope::new(
3161 id!(foo[0].bar),
3162 PortId(id!(baz[0].corge), 9999u64),
3163 Serialized::serialize(&1u64).unwrap(),
3164 Attrs::new(),
3165 );
3166 return_handle.send(Undeliverable(envelope.clone())).unwrap();
3167 assert!(
3169 RealClock
3170 .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3171 .await
3172 .is_ok()
3173 );
3174 let monitor_handle = tokio::spawn(async move {
3177 while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3178 envelope.set_error(DeliveryError::BrokenLink(
3179 "returned in unit test".to_string(),
3180 ));
3181 UndeliverableMailboxSender
3182 .post(envelope, monitored_return_handle());
3183 }
3184 });
3185 drop(return_handle);
3186 assert!(
3187 RealClock
3188 .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3189 .await
3190 .is_ok()
3191 );
3192 }
3193
3194 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3195 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3196 let dummy_state =
3199 State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3200 let dummy_port_id = PortId(id!(world[0].actor), 0);
3201 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3202 let receiver = PortReceiver {
3203 receiver,
3204 port_id: dummy_port_id,
3205 coalesce,
3206 mailbox: Mailbox {
3207 inner: Arc::new(dummy_state),
3208 },
3209 };
3210 (sender, receiver)
3211 }
3212
3213 {
3215 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3216 assert!(receiver.drain().is_empty());
3217
3218 sender.send(0).unwrap();
3219 sender.send(1).unwrap();
3220 sender.send(2).unwrap();
3221 sender.send(3).unwrap();
3222 sender.send(4).unwrap();
3223 sender.send(5).unwrap();
3224 sender.send(6).unwrap();
3225 sender.send(7).unwrap();
3226
3227 if drop_sender {
3228 drop(sender);
3229 }
3230
3231 if !coalesce {
3232 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3233 } else {
3234 assert_eq!(receiver.drain(), vec![7]);
3235 }
3236
3237 assert!(receiver.drain().is_empty());
3238 assert!(receiver.drain().is_empty());
3239 }
3240
3241 {
3243 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3244 assert!(receiver.try_recv().unwrap().is_none());
3245
3246 sender.send(0).unwrap();
3247 sender.send(1).unwrap();
3248 sender.send(2).unwrap();
3249 sender.send(3).unwrap();
3250
3251 if drop_sender {
3252 drop(sender);
3253 }
3254
3255 if !coalesce {
3256 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3257 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3258 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3259 }
3260 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3261 if drop_sender {
3262 assert_matches!(
3263 receiver.try_recv().unwrap_err().kind(),
3264 MailboxErrorKind::Closed
3265 );
3266 assert_matches!(
3268 receiver.try_recv().unwrap_err().kind(),
3269 MailboxErrorKind::Closed
3270 );
3271 } else {
3272 assert!(receiver.try_recv().unwrap().is_none());
3273 assert!(receiver.try_recv().unwrap().is_none());
3275 }
3276 }
3277 {
3279 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3280 assert!(
3281 RealClock
3282 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3283 .await
3284 .is_err()
3285 );
3286
3287 sender.send(4).unwrap();
3288 sender.send(5).unwrap();
3289 sender.send(6).unwrap();
3290 sender.send(7).unwrap();
3291
3292 if drop_sender {
3293 drop(sender);
3294 }
3295
3296 if !coalesce {
3297 assert_eq!(receiver.recv().await.unwrap(), 4);
3298 assert_eq!(receiver.recv().await.unwrap(), 5);
3299 assert_eq!(receiver.recv().await.unwrap(), 6);
3300 }
3301 assert_eq!(receiver.recv().await.unwrap(), 7);
3302 if drop_sender {
3303 assert_matches!(
3304 receiver.recv().await.unwrap_err().kind(),
3305 MailboxErrorKind::Closed
3306 );
3307 assert_matches!(
3309 receiver.recv().await.unwrap_err().kind(),
3310 MailboxErrorKind::Closed
3311 );
3312 } else {
3313 assert!(
3314 RealClock
3315 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3316 .await
3317 .is_err()
3318 );
3319 }
3320 }
3321 }
3322
3323 #[tokio::test]
3324 async fn test_receiver_basic_default() {
3325 verify_receiver(false, false).await
3326 }
3327
3328 #[tokio::test]
3329 async fn test_receiver_basic_latest() {
3330 verify_receiver(true, false).await
3331 }
3332
3333 #[tokio::test]
3334 async fn test_receiver_after_sender_drop_default() {
3335 verify_receiver(false, true).await
3336 }
3337
3338 #[tokio::test]
3339 async fn test_receiver_after_sender_drop_latest() {
3340 verify_receiver(true, true).await
3341 }
3342
/// Fixture returned by `setup_split_port_ids` for the split-port tests.
struct Setup {
    /// Receiving end of the `u64` port opened on `actor0`.
    receiver: PortReceiver<u64>,
    /// Instance created as `"actor0"`; owns the original port.
    actor0: Instance<()>,
    /// Instance created as `"actor1"`; the splits are made through it.
    actor1: Instance<()>,
    /// Handle returned alongside `actor0`.
    // NOTE(review): presumably held only to keep the actor alive - confirm.
    _actor0_handle: ActorHandle<()>,
    /// Handle returned alongside `actor1`.
    _actor1_handle: ActorHandle<()>,
    /// Id of the bound port opened on `actor0`.
    port_id: PortId,
    /// First split of `port_id`.
    port_id1: PortId,
    /// Second split of `port_id`.
    port_id2: PortId,
    /// Split of `port_id2`.
    port_id2_1: PortId,
}
3354
3355 async fn setup_split_port_ids(
3356 reducer_spec: Option<ReducerSpec>,
3357 reducer_opts: Option<ReducerOpts>,
3358 ) -> Setup {
3359 let proc = Proc::local();
3360 let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3361 let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3362
3363 let (port_handle, receiver) = actor0.open_port::<u64>();
3365 let port_id = port_handle.bind().port_id().clone();
3366
3367 let port_id1 = port_id
3369 .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3370 .unwrap();
3371 let port_id2 = port_id
3372 .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3373 .unwrap();
3374
3375 let port_id2_1 = port_id2
3377 .split(&actor1, reducer_spec, reducer_opts.clone(), true)
3378 .unwrap();
3379
3380 Setup {
3381 receiver,
3382 actor0,
3383 actor1,
3384 _actor0_handle: actor0_handle,
3385 _actor1_handle: actor1_handle,
3386 port_id,
3387 port_id1,
3388 port_id2,
3389 port_id2_1,
3390 }
3391 }
3392
3393 fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3394 let serialized = Serialized::serialize(&msg).unwrap();
3395 port_id.send(cx, serialized);
3396 }
3397
3398 #[async_timed_test(timeout_secs = 30)]
3399 #[cfg_attr(not(fbcode_build), ignore)]
3401 async fn test_split_port_id_no_reducer() {
3402 let Setup {
3403 mut receiver,
3404 actor0,
3405 actor1,
3406 port_id,
3407 port_id1,
3408 port_id2,
3409 port_id2_1,
3410 ..
3411 } = setup_split_port_ids(None, None).await;
3412 post(&actor0, port_id.clone(), 1);
3414 assert_eq!(receiver.recv().await.unwrap(), 1);
3415 post(&actor1, port_id1.clone(), 2);
3416 assert_eq!(receiver.recv().await.unwrap(), 2);
3417 post(&actor1, port_id2.clone(), 3);
3418 assert_eq!(receiver.recv().await.unwrap(), 3);
3419 post(&actor1, port_id2_1.clone(), 4);
3420 assert_eq!(receiver.recv().await.unwrap(), 4);
3421
3422 RealClock.sleep(Duration::from_secs(2)).await;
3424 let msg = receiver.try_recv().unwrap();
3425 assert_eq!(msg, None);
3426 }
3427
3428 async fn wait_for(
3429 receiver: &mut PortReceiver<u64>,
3430 expected_size: usize,
3431 timeout_duration: Duration,
3432 ) -> anyhow::Result<Vec<u64>> {
3433 let mut messeges = vec![];
3434
3435 RealClock
3436 .timeout(timeout_duration, async {
3437 loop {
3438 let msg = receiver.recv().await.unwrap();
3439 messeges.push(msg);
3440 if messeges.len() == expected_size {
3441 break;
3442 }
3443 }
3444 })
3445 .await?;
3446 Ok(messeges)
3447 }
3448
3449 #[async_timed_test(timeout_secs = 30)]
3450 async fn test_split_port_id_sum_reducer() {
3451 let config = crate::config::global::lock();
3452 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3453
3454 let sum_accumulator = accum::sum::<u64>();
3455 let reducer_spec = sum_accumulator.reducer_spec();
3456 let Setup {
3457 mut receiver,
3458 actor0,
3459 actor1,
3460 port_id,
3461 port_id1,
3462 port_id2,
3463 port_id2_1,
3464 ..
3465 } = setup_split_port_ids(reducer_spec, None).await;
3466 post(&actor0, port_id.clone(), 4);
3467 post(&actor1, port_id1.clone(), 2);
3468 post(&actor1, port_id2.clone(), 3);
3469 post(&actor1, port_id2_1.clone(), 1);
3470 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3471 .await
3472 .unwrap();
3473 messages.sort();
3476 assert_eq!(messages, vec![1, 2, 3, 4]);
3477
3478 RealClock.sleep(Duration::from_secs(2)).await;
3480 let msg = receiver.try_recv().unwrap();
3481 assert_eq!(msg, None);
3482 }
3483
3484 #[async_timed_test(timeout_secs = 30)]
3485 #[cfg_attr(not(fbcode_build), ignore)]
3487 async fn test_split_port_id_every_n_messages() {
3488 let config = crate::config::global::lock();
3489 let _config_guard = config.override_key(
3490 crate::config::SPLIT_MAX_BUFFER_AGE,
3491 Duration::from_secs(600),
3492 );
3493 let proc = Proc::local();
3494 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3495 let (port_handle, mut receiver) = actor.open_port::<u64>();
3496 let port_id = port_handle.bind().port_id().clone();
3497 let reducer_spec = accum::sum::<u64>().reducer_spec();
3499 let split_port_id = port_id.split(&actor, reducer_spec, None, true).unwrap();
3500
3501 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3503 post(&actor, split_port_id.clone(), msg);
3504 }
3505 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3508 .await
3509 .unwrap();
3510 assert_eq!(messages, vec![15]);
3511
3512 RealClock.sleep(Duration::from_secs(2)).await;
3515 let msg = receiver.try_recv().unwrap();
3516 assert_eq!(msg, None);
3517 }
3518
3519 #[async_timed_test(timeout_secs = 30)]
3520 async fn test_split_port_timeout_flush() {
3521 let config = crate::config::global::lock();
3522 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3523
3524 let Setup {
3525 mut receiver,
3526 actor0: _,
3527 actor1,
3528 port_id: _,
3529 port_id1,
3530 port_id2: _,
3531 port_id2_1: _,
3532 ..
3533 } = setup_split_port_ids(
3534 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3535 Some(ReducerOpts {
3536 max_update_interval: Some(Duration::from_millis(50)),
3537 }),
3538 )
3539 .await;
3540
3541 post(&actor1, port_id1.clone(), 10);
3542 post(&actor1, port_id1.clone(), 20);
3543 post(&actor1, port_id1.clone(), 30);
3544
3545 RealClock.sleep(Duration::from_millis(10)).await;
3547 let msg = receiver.try_recv().unwrap();
3548 assert_eq!(msg, None);
3549
3550 RealClock.sleep(Duration::from_millis(100)).await;
3552
3553 let msg = receiver.recv().await.unwrap();
3555 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
3559 assert_eq!(msg, None);
3560 }
3561
3562 #[async_timed_test(timeout_secs = 30)]
3563 async fn test_split_port_timeout_and_size_flush() {
3564 let config = crate::config::global::lock();
3565 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3566
3567 let Setup {
3568 mut receiver,
3569 actor0: _,
3570 actor1,
3571 port_id: _,
3572 port_id1,
3573 port_id2: _,
3574 port_id2_1: _,
3575 ..
3576 } = setup_split_port_ids(
3577 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3578 Some(ReducerOpts {
3579 max_update_interval: Some(Duration::from_millis(50)),
3580 }),
3581 )
3582 .await;
3583
3584 post(&actor1, port_id1.clone(), 10);
3585 post(&actor1, port_id1.clone(), 20);
3586 post(&actor1, port_id1.clone(), 30);
3587 post(&actor1, port_id1.clone(), 40);
3588
3589 let msg = receiver.recv().await.unwrap();
3591 assert_eq!(msg, 60);
3592
3593 let msg = receiver.recv().await.unwrap();
3595 assert_eq!(msg, 40);
3596
3597 let msg = receiver.try_recv().unwrap();
3599 assert_eq!(msg, None);
3600 }
3601
/// A router with no bindings reports no prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_empty() {
    let router = DialMailboxRouter::new();
    let prefix_count = router.prefixes().len();
    assert_eq!(prefix_count, 0);
}
3606
/// A router with one binding reports exactly that reference as its prefix.
#[test]
fn test_dial_mailbox_router_prefixes_single_entry() {
    let router = DialMailboxRouter::new();
    router.bind(id!(world0).into(), "unix!@1".parse().unwrap());

    let found = Vec::<Reference>::from_iter(router.prefixes());
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], id!(world0).into());
}
3616
/// Bindings for three unrelated worlds are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_no_overlap() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 3] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world1).into(), "unix!@2"),
        (id!(world2).into(), "unix!@3"),
    ];
    for (prefix, addr) in bindings {
        router.bind(prefix, addr.parse().unwrap());
    }

    let mut found: Vec<Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut wanted: Vec<Reference> =
        vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
    wanted.sort();

    assert_eq!(found, wanted);
}
3632
/// When a world and procs inside that world are both bound, only the world
/// references are reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_with_overlaps() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 5] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world0[0]).into(), "unix!@2"),
        (id!(world0[1]).into(), "unix!@3"),
        (id!(world1).into(), "unix!@4"),
        (id!(world1[0]).into(), "unix!@5"),
    ];
    for (prefix, addr) in bindings {
        router.bind(prefix, addr.parse().unwrap());
    }

    let mut found: Vec<Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut wanted: Vec<Reference> = vec![id!(world0).into(), id!(world1).into()];
    wanted.sort();

    assert_eq!(found, wanted);
}
3651
/// Mixed world-, proc- and actor-level bindings: a reference is reported as
/// a prefix only when no bound reference above it covers it.
#[test]
fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 6] = [
        (id!(world0).into(), "unix!@1"),
        (id!(world0[0]).into(), "unix!@2"),
        (id!(world0[0].actor1).into(), "unix!@3"),
        (id!(world1[0]).into(), "unix!@4"),
        (id!(world1[1]).into(), "unix!@5"),
        (id!(world2[0].actor0).into(), "unix!@6"),
    ];
    for (prefix, addr) in bindings {
        router.bind(prefix, addr.parse().unwrap());
    }

    let mut found: Vec<Reference> = router.prefixes().into_iter().collect();
    found.sort();

    // Compared against the literal order below without sorting it.
    let wanted: Vec<Reference> = vec![
        id!(world0).into(),
        id!(world1[0]).into(),
        id!(world1[1]).into(),
        id!(world2[0].actor0).into(),
    ];

    assert_eq!(found, wanted);
}
3679
/// Sibling procs of the same world, with no world-level binding, are each
/// reported as a prefix.
#[test]
fn test_dial_mailbox_router_prefixes_same_level() {
    let router = DialMailboxRouter::new();
    let bindings: [(Reference, &str); 3] = [
        (id!(world0[0]).into(), "unix!@1"),
        (id!(world0[1]).into(), "unix!@2"),
        (id!(world0[2]).into(), "unix!@3"),
    ];
    for (prefix, addr) in bindings {
        router.bind(prefix, addr.parse().unwrap());
    }

    let mut found: Vec<Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut wanted: Vec<Reference> = vec![
        id!(world0[0]).into(),
        id!(world0[1]).into(),
        id!(world0[2]).into(),
    ];
    wanted.sort();

    assert_eq!(found, wanted);
}
3700
/// Test-only `MailboxSender` whose `post_unchecked` re-posts every envelope
/// to itself from a spawned task. Used as the forwarder in
/// `message_ttl_expires_in_routing_loop_returns_to_sender`.
#[derive(Clone, Debug)]
struct AsyncLoopForwarder;
3706
3707 impl MailboxSender for AsyncLoopForwarder {
3708 fn post_unchecked(
3709 &self,
3710 envelope: MessageEnvelope,
3711 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3712 ) {
3713 let me = self.clone();
3714 tokio::spawn(async move {
3715 me.post(envelope, return_handle);
3717 });
3718 }
3719 }
3720
3721 #[tokio::test]
3722 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3723 let actor_id = ActorId(
3724 ProcId::Ranked(id!(test_world), 0),
3725 "ttl_actor".to_string(),
3726 0,
3727 );
3728 let mailbox = Mailbox::new(
3729 actor_id.clone(),
3730 BoxedMailboxSender::new(AsyncLoopForwarder),
3731 );
3732 let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3733
3734 let remote_actor = ActorId(
3737 ProcId::Ranked(id!(remote_world), 1),
3738 "remote".to_string(),
3739 0,
3740 );
3741 let dest = PortId(remote_actor.clone(), 4242);
3742
3743 let payload = 1234_u64;
3746 let envelope =
3747 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3748 .expect("serialize");
3749
3750 let return_handle = ret_port.clone();
3753 mailbox.post(envelope, return_handle);
3754
3755 #[allow(clippy::disallowed_methods)]
3757 let Undeliverable(undelivered) =
3758 tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3759 .await
3760 .expect("timed out waiting for undeliverable")
3761 .expect("channel closed");
3762
3763 let got: u64 = undelivered.deserialized().expect("deserialize");
3765 assert_eq!(got, payload, "payload preserved");
3766 }
3767
3768 #[tokio::test]
3769 async fn message_ttl_success_local_delivery() {
3770 let actor_id = ActorId(
3771 ProcId::Ranked(id!(test_world), 0),
3772 "ttl_actor".to_string(),
3773 0,
3774 );
3775 let mailbox = Mailbox::new(
3776 actor_id.clone(),
3777 BoxedMailboxSender::new(PanickingMailboxSender),
3778 );
3779 let (_undeliverable_tx, mut undeliverable_rx) =
3780 mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3781
3782 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3784
3785 let payload = 0xC0FFEE_u64;
3787 let envelope = MessageEnvelope::serialize(
3788 actor_id.clone(),
3789 user_port.bind().port_id().clone(),
3790 &payload,
3791 Attrs::new(),
3792 )
3793 .expect("serialize");
3794
3795 let return_handle = mailbox
3798 .bound_return_handle()
3799 .unwrap_or(monitored_return_handle());
3800 mailbox.post(envelope, return_handle);
3801
3802 #[allow(clippy::disallowed_methods)]
3804 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3805 .await
3806 .expect("timed out waiting for local delivery")
3807 .expect("user port closed");
3808 assert_eq!(got, payload);
3809
3810 #[allow(clippy::disallowed_methods)]
3812 let no_undeliverable =
3813 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3814 assert!(
3815 no_undeliverable.is_err(),
3816 "unexpected undeliverable returned on successful local delivery"
3817 );
3818 }
3819
3820 #[tokio::test]
3821 async fn test_port_contramap() {
3822 let mbox = Mailbox::new_detached(id!(test[0].test));
3823 let (handle, mut rx) = mbox.open_port();
3824
3825 handle
3826 .contramap(|m| (1, m))
3827 .send("hello".to_string())
3828 .unwrap();
3829 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3830 }
3831
/// Calling `bind_actor_port` a second time on the same handle is expected to
/// panic with "already bound".
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_actor_port_twice() {
    let mailbox = Mailbox::new_detached(id!(test[0].test));
    let (port, _rx) = mailbox.open_port::<String>();

    for _ in 0..2 {
        port.bind_actor_port();
    }
}
3840
/// After `bind_actor_port`, the handle's location is the actor's `String`
/// port, and later `bind` calls leave that location unchanged.
#[test]
fn test_bind_port_handle_to_actor_port() {
    let mailbox = Mailbox::new_detached(id!(test[0].test));
    let actor_port = mailbox.actor_id().port_id(String::port());
    let (port, _rx) = mailbox.open_port::<String>();

    // The freshly opened port has a different index than the actor port.
    assert_ne!(actor_port.index(), port.port_index);

    port.bind_actor_port();
    assert_matches!(port.location(), PortLocation::Bound(bound) if bound == actor_port);

    // Binding again (twice) keeps the existing location.
    port.bind();
    port.bind();
    assert_matches!(port.location(), PortLocation::Bound(bound) if bound == actor_port);
}
3856
/// A handle already bound through `bind` is located at its own port index,
/// and calling `bind_actor_port` on it is expected to panic with
/// "already bound".
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_actor_port_when_already_bound() {
    let mailbox = Mailbox::new_detached(id!(test[0].test));
    let (port, _rx) = mailbox.open_port::<String>();

    port.bind();
    assert_matches!(
        port.location(),
        PortLocation::Bound(bound) if bound.index() == port.port_index
    );

    // Expected to panic.
    port.bind_actor_port();
}
3868}