1#![allow(dead_code)] use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future;
74use std::future::Future;
75use std::ops::Bound::Excluded;
76use std::pin::Pin;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::sync::Mutex;
80use std::sync::OnceLock;
81use std::sync::RwLock;
82use std::sync::Weak;
83use std::sync::atomic::AtomicU64;
84use std::sync::atomic::AtomicUsize;
85use std::sync::atomic::Ordering;
86use std::task::Context;
87use std::task::Poll;
88
89use async_trait::async_trait;
90use dashmap::DashMap;
91use dashmap::mapref::entry::Entry;
92use futures::Sink;
93use futures::Stream;
94use hyperactor_config::attrs::Attrs;
95use serde::Deserialize;
96use serde::Serialize;
97use serde::de::DeserializeOwned;
98use tokio::sync::mpsc;
99use tokio::sync::oneshot;
100use tokio::sync::watch;
101use tokio::task::JoinHandle;
102use tokio_util::sync::CancellationToken;
103use typeuri::Named;
104
105use crate as hyperactor; use crate::OncePortRef;
107use crate::PortRef;
108use crate::accum::Accumulator;
109use crate::accum::ReducerOpts;
110use crate::accum::ReducerSpec;
111use crate::actor::Signal;
112use crate::actor::remote::USER_PORT_OFFSET;
113use crate::channel;
114use crate::channel::ChannelAddr;
115use crate::channel::ChannelError;
116use crate::channel::SendError;
117use crate::channel::TxStatus;
118use crate::context;
119use crate::id;
120use crate::metrics;
121use crate::reference::ActorId;
122use crate::reference::PortId;
123use crate::reference::Reference;
124
125mod undeliverable;
126pub use undeliverable::Undeliverable;
128pub use undeliverable::UndeliverableMessageError;
129pub use undeliverable::custom_monitored_return_handle;
130pub use undeliverable::monitored_return_handle; pub use undeliverable::supervise_undeliverable_messages;
132pub use undeliverable::supervise_undeliverable_messages_with;
133pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141pub mod headers;
143
144pub trait Message: Send + Sync + 'static {}
147impl<M: Send + Sync + 'static> Message for M {}
148
149pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
156pub type Data = Vec<u8>;
158
159#[derive(
161 thiserror::Error,
162 Debug,
163 Serialize,
164 Deserialize,
165 typeuri::Named,
166 Clone,
167 PartialEq,
168 Eq
169)]
170pub enum DeliveryError {
171 #[error("address not routable: {0}")]
173 Unroutable(String),
174
175 #[error("broken link: {0}")]
178 BrokenLink(String),
179
180 #[error("mailbox error: {0}")]
182 Mailbox(String),
183
184 #[error("multicast error: {0}")]
186 Multicast(String),
187
188 #[error("ttl expired")]
190 TtlExpired,
191}
192
193#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
197pub struct MessageEnvelope {
198 sender: ActorId,
200
201 dest: PortId,
203
204 data: wirevalue::Any,
206
207 errors: Vec<DeliveryError>,
209
210 headers: Attrs,
212
213 ttl: u8,
215
216 return_undeliverable: bool,
219 }
221wirevalue::register_type!(MessageEnvelope);
222
223impl MessageEnvelope {
224 pub fn new(sender: ActorId, dest: PortId, data: wirevalue::Any, headers: Attrs) -> Self {
226 Self {
227 sender,
228 dest,
229 data,
230 errors: Vec::new(),
231 headers,
232 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
233 return_undeliverable: true,
235 }
236 }
237
238 pub(crate) fn new_unknown(dest: PortId, data: wirevalue::Any) -> Self {
240 Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
241 }
242
243 pub fn serialize<T: Serialize + Named>(
245 source: ActorId,
246 dest: PortId,
247 value: &T,
248 headers: Attrs,
249 ) -> Result<Self, wirevalue::Error> {
250 Ok(Self {
251 headers,
252 data: wirevalue::Any::serialize(value)?,
253 sender: source,
254 dest,
255 errors: Vec::new(),
256 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
257 return_undeliverable: true,
259 })
260 }
261
262 pub fn ttl(&self) -> u8 {
268 self.ttl
269 }
270
271 pub fn set_ttl(mut self, ttl: u8) -> Self {
281 self.ttl = ttl;
282 self
283 }
284
285 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
293 if self.ttl == 0 {
294 Err(DeliveryError::TtlExpired)
295 } else {
296 self.ttl -= 1;
297 Ok(())
298 }
299 }
300
301 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
303 self.data.deserialized()
304 }
305
306 pub fn data(&self) -> &wirevalue::Any {
308 &self.data
309 }
310
311 pub fn sender(&self) -> &ActorId {
313 &self.sender
314 }
315
316 pub fn dest(&self) -> &PortId {
318 &self.dest
319 }
320
321 pub fn headers(&self) -> &Attrs {
323 &self.headers
324 }
325
326 pub fn is_signal(&self) -> bool {
328 self.dest.index() == Signal::port()
329 }
330
331 pub fn set_error(&mut self, error: DeliveryError) {
334 self.errors.push(error)
335 }
336
337 pub fn update_sender(&mut self, sender: ActorId) {
341 self.sender = sender;
342 }
343
344 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
348 self.return_undeliverable = return_undeliverable;
349 }
350
351 pub fn undeliverable(
355 mut self,
356 error: DeliveryError,
357 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
358 ) {
359 tracing::error!(
360 name = "undelivered_message_attempt",
361 sender = self.sender.to_string(),
362 dest = self.dest.to_string(),
363 error = error.to_string(),
364 return_handle = %return_handle,
365 );
366 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
367 1,
368 hyperactor_telemetry::kv_pairs!(
369 "sender_actor_id" => self.sender.to_string(),
370 "dest_actor_id" => self.dest.to_string(),
371 "message_type" => self.data.typename().unwrap_or("unknown"),
372 "error_type" => error.to_string(),
373 ),
374 );
375
376 self.set_error(error);
377 undeliverable::return_undeliverable(return_handle, self);
378 }
379
380 pub fn errors(&self) -> &Vec<DeliveryError> {
383 &self.errors
384 }
385
386 pub fn error_msg(&self) -> Option<String> {
390 if self.errors.is_empty() {
391 None
392 } else {
393 Some(
394 self.errors
395 .iter()
396 .map(|e| e.to_string())
397 .collect::<Vec<_>>()
398 .join("; "),
399 )
400 }
401 }
402
403 fn open(self) -> (MessageMetadata, wirevalue::Any) {
404 let Self {
405 sender,
406 dest,
407 data,
408 errors,
409 headers,
410 ttl,
411 return_undeliverable,
412 } = self;
413
414 (
415 MessageMetadata {
416 sender,
417 dest,
418 errors,
419 headers,
420 ttl,
421 return_undeliverable,
422 },
423 data,
424 )
425 }
426
427 fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
428 let MessageMetadata {
429 sender,
430 dest,
431 errors,
432 headers,
433 ttl,
434 return_undeliverable,
435 } = metadata;
436
437 Self {
438 sender,
439 dest,
440 data,
441 errors,
442 headers,
443 ttl,
444 return_undeliverable,
445 }
446 }
447
448 fn return_undeliverable(&self) -> bool {
449 self.return_undeliverable
450 }
451}
452
453impl fmt::Display for MessageEnvelope {
454 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455 match &self.error_msg() {
456 None => write!(
457 f,
458 "{} > {}: {} {{{}}}",
459 self.sender, self.dest, self.data, self.headers
460 ),
461 Some(err) => write!(
462 f,
463 "{} > {}: {} {{{}}}: delivery error: {}",
464 self.sender, self.dest, self.data, self.headers, err
465 ),
466 }
467 }
468}
469
470#[derive(Clone)]
472pub struct MessageMetadata {
473 sender: ActorId,
474 dest: PortId,
475 errors: Vec<DeliveryError>,
476 headers: Attrs,
477 ttl: u8,
478 return_undeliverable: bool,
479}
480
481#[derive(Debug)]
484pub struct MailboxError {
485 actor_id: ActorId,
486 kind: MailboxErrorKind,
487}
488
489#[derive(thiserror::Error, Debug)]
492#[non_exhaustive]
493pub enum MailboxErrorKind {
494 #[error("mailbox closed")]
496 Closed,
497
498 #[error("invalid port: {0}")]
500 InvalidPort(PortId),
501
502 #[error("no sender for port: {0}")]
504 NoSenderForPort(PortId),
505
506 #[error("no local sender for port: {0}")]
509 NoLocalSenderForPort(PortId),
510
511 #[error("{0}: port closed")]
513 PortClosed(PortId),
514
515 #[error("send {0}: {1}")]
517 Send(PortId, #[source] anyhow::Error),
518
519 #[error("recv {0}: {1}")]
521 Recv(PortId, #[source] anyhow::Error),
522
523 #[error("serialize: {0}")]
525 Serialize(#[source] anyhow::Error),
526
527 #[error("deserialize {0}: {1}")]
529 Deserialize(&'static str, anyhow::Error),
530
531 #[error(transparent)]
533 Channel(#[from] ChannelError),
534}
535
536impl MailboxError {
537 pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
540 Self { actor_id, kind }
541 }
542
543 pub fn actor_id(&self) -> &ActorId {
545 &self.actor_id
546 }
547
548 pub fn kind(&self) -> &MailboxErrorKind {
550 &self.kind
551 }
552}
553
554impl fmt::Display for MailboxError {
555 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556 write!(f, "{}: ", self.actor_id)?;
557 fmt::Display::fmt(&self.kind, f)
558 }
559}
560
561impl std::error::Error for MailboxError {
562 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
563 self.kind.source()
564 }
565}
566
567#[derive(Debug, Clone)]
571pub enum PortLocation {
572 Bound(PortId),
574 Unbound(ActorId, &'static str),
576}
577
578impl PortLocation {
579 fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
580 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
581 }
582
583 fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
584 PortLocation::Unbound(actor_id, ty)
585 }
586
587 pub fn actor_id(&self) -> &ActorId {
589 match self {
590 PortLocation::Bound(port_id) => port_id.actor_id(),
591 PortLocation::Unbound(actor_id, _) => actor_id,
592 }
593 }
594}
595
596impl fmt::Display for PortLocation {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 match self {
599 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
600 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
601 }
602 }
603}
604
605#[derive(Debug)]
608pub struct MailboxSenderError {
609 location: Box<PortLocation>,
610 kind: Box<MailboxSenderErrorKind>,
611}
612
613#[derive(thiserror::Error, Debug)]
615pub enum MailboxSenderErrorKind {
616 #[error("serialization error: {0}")]
618 Serialize(anyhow::Error),
619
620 #[error("deserialization error for type {0}: {1}")]
622 Deserialize(&'static str, anyhow::Error),
623
624 #[error("invalid port")]
626 Invalid,
627
628 #[error("port closed")]
630 Closed,
631
632 #[error(transparent)]
635 Mailbox(#[from] MailboxError),
636
637 #[error(transparent)]
639 Channel(#[from] ChannelError),
640
641 #[error(transparent)]
643 MessageLog(#[from] MessageLogError),
644
645 #[error("send error: {0}")]
647 Other(#[from] anyhow::Error),
648
649 #[error("unreachable: {0}")]
651 Unreachable(anyhow::Error),
652}
653
654impl MailboxSenderError {
655 pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
657 Self {
658 location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
659 kind: Box::new(kind),
660 }
661 }
662
663 pub fn new_unbound_type(
665 actor_id: ActorId,
666 kind: MailboxSenderErrorKind,
667 ty: &'static str,
668 ) -> Self {
669 Self {
670 location: Box::new(PortLocation::Unbound(actor_id, ty)),
671 kind: Box::new(kind),
672 }
673 }
674
675 pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
677 Self {
678 location: Box::new(PortLocation::Bound(port_id)),
679 kind: Box::new(kind),
680 }
681 }
682
683 pub fn location(&self) -> &PortLocation {
685 &self.location
686 }
687
688 pub fn kind(&self) -> &MailboxSenderErrorKind {
690 &self.kind
691 }
692}
693
694impl fmt::Display for MailboxSenderError {
695 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696 write!(f, "{}: ", self.location)?;
697 fmt::Display::fmt(&self.kind, f)
698 }
699}
700
701impl std::error::Error for MailboxSenderError {
702 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
703 self.kind.source()
704 }
705}
706
707pub trait MailboxSender: Send + Sync + Any {
710 fn post(
713 &self,
714 mut envelope: MessageEnvelope,
715 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
716 ) {
717 if let Err(err) = envelope.dec_ttl_or_err() {
718 envelope.undeliverable(err, return_handle);
719 return;
720 }
721 self.post_unchecked(envelope, return_handle);
722 }
723
724 fn post_unchecked(
726 &self,
727 envelope: MessageEnvelope,
728 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729 );
730}
731
732pub trait PortSender: MailboxSender {
735 fn serialize_and_send<M: RemoteMessage>(
737 &self,
738 port: &PortRef<M>,
739 message: M,
740 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
741 ) -> Result<(), MailboxSenderError> {
742 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
744 MailboxSenderError::new_bound(
745 port.port_id().clone(),
746 MailboxSenderErrorKind::Serialize(err.into()),
747 )
748 })?;
749 self.post(
750 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
751 return_handle,
752 );
753 Ok(())
754 }
755
756 fn serialize_and_send_once<M: RemoteMessage>(
759 &self,
760 once_port: OncePortRef<M>,
761 message: M,
762 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
763 ) -> Result<(), MailboxSenderError> {
764 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
765 MailboxSenderError::new_bound(
766 once_port.port_id().clone(),
767 MailboxSenderErrorKind::Serialize(err.into()),
768 )
769 })?;
770 self.post(
771 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
772 return_handle,
773 );
774 Ok(())
775 }
776}
777
778impl<T: ?Sized + MailboxSender> PortSender for T {}
779
780#[derive(Debug, Clone)]
784pub struct PanickingMailboxSender;
785
786impl MailboxSender for PanickingMailboxSender {
787 fn post_unchecked(
788 &self,
789 envelope: MessageEnvelope,
790 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
791 ) {
792 panic!("panic! in the mailbox! attempted post: {}", envelope)
793 }
794}
795
796#[derive(Debug)]
799pub struct UndeliverableMailboxSender;
800
801impl MailboxSender for UndeliverableMailboxSender {
802 fn post_unchecked(
803 &self,
804 envelope: MessageEnvelope,
805 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
806 ) {
807 let sender_name = envelope.sender.name();
808 let error_str = envelope.error_msg().unwrap_or("".to_string());
809 tracing::error!(
812 name = "undelivered_message_abandoned",
813 actor_name = sender_name,
814 actor_id = envelope.sender.to_string(),
815 dest = envelope.dest.to_string(),
816 headers = envelope.headers().to_string(), data = envelope.data().to_string(),
818 return_handle = %return_handle,
819 "message not delivered, {}",
820 error_str,
821 );
822 }
823}
824
825struct Buffer<T: Message> {
826 queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
827 processed: watch::Receiver<usize>,
828 seq: AtomicUsize,
829}
830
831impl<T: Message> Buffer<T> {
832 fn new<Fut>(
833 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
834 ) -> Self
835 where
836 Fut: Future<Output = ()> + Send + 'static,
837 {
838 let (queue, mut next) = mpsc::unbounded_channel();
839 let (last_processed, processed) = watch::channel(0);
840 crate::init::get_runtime().spawn(async move {
841 let mut seq = 0;
842 while let Some((msg, return_handle)) = next.recv().await {
843 process(msg, return_handle).await;
844 seq += 1;
845 let _ = last_processed.send(seq);
846 }
847 });
848 Self {
849 queue,
850 processed,
851 seq: AtomicUsize::new(0),
852 }
853 }
854
855 #[allow(clippy::result_large_err)]
856 fn send(
857 &self,
858 item: (T, PortHandle<Undeliverable<T>>),
859 ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
860 self.seq.fetch_add(1, Ordering::SeqCst);
861 self.queue.send(item)?;
862 Ok(())
863 }
864
865 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
866 let seq = self.seq.load(Ordering::SeqCst);
867 while *self.processed.borrow_and_update() < seq {
868 self.processed.changed().await?;
869 }
870 Ok(())
871 }
872}
873
874static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
875 LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
876
877#[derive(Clone)]
883pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
884
885impl fmt::Debug for BoxedMailboxSender {
886 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
887 f.debug_struct("BoxedMailboxSender")
888 .field("sender", &"<dyn MailboxSender>")
889 .finish()
890 }
891}
892
893impl BoxedMailboxSender {
894 pub fn new(sender: impl MailboxSender + 'static) -> Self {
896 Self(Arc::new(sender))
897 }
898
899 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
902 (&*self.0 as &dyn Any).downcast_ref::<T>()
903 }
904}
905
906pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
908 fn boxed(&self) -> BoxedMailboxSender;
910}
911impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
912 fn boxed(&self) -> BoxedMailboxSender {
913 BoxedMailboxSender::new(self.clone())
914 }
915}
916
917pub trait IntoBoxedMailboxSender: MailboxSender {
919 fn into_boxed(self) -> BoxedMailboxSender;
921}
922impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
923 fn into_boxed(self) -> BoxedMailboxSender {
924 BoxedMailboxSender::new(self)
925 }
926}
927
928impl MailboxSender for BoxedMailboxSender {
929 fn post_unchecked(
930 &self,
931 envelope: MessageEnvelope,
932 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
933 ) {
934 self.0.post_unchecked(envelope, return_handle);
935 }
936}
937
938#[derive(thiserror::Error, Debug)]
940pub enum MailboxServerError {
941 #[error(transparent)]
943 Channel(#[from] ChannelError),
944
945 #[error(transparent)]
947 MailboxSender(#[from] MailboxSenderError),
948}
949
950#[derive(Debug)]
953pub struct MailboxServerHandle {
954 join_handle: JoinHandle<Result<(), MailboxServerError>>,
955 stopped_tx: watch::Sender<bool>,
956}
957
958impl MailboxServerHandle {
959 pub fn stop(&self, reason: &str) {
964 tracing::info!("stopping mailbox server; reason: {}", reason);
965 self.stopped_tx.send(true).expect("stop called twice");
966 }
967}
968
969impl Future for MailboxServerHandle {
971 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
972
973 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
974 let join_handle_pinned =
976 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
977 join_handle_pinned.poll(cx)
978 }
979}
980
981fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
986 let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
987
988 tokio::task::spawn(async move {
989 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
990 if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
991 {
992 UndeliverableMailboxSender.post(e, monitored_return_handle());
994 continue;
995 }
996 envelope.set_error(DeliveryError::BrokenLink(
997 "message was undeliverable".to_owned(),
998 ));
999 server.post(
1000 MessageEnvelope::new(
1001 envelope.sender().clone(),
1002 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1003 envelope.sender(),
1004 )
1005 .port_id()
1006 .clone(),
1007 wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
1008 Attrs::new(),
1009 ),
1010 monitored_return_handle(),
1011 );
1012 }
1013 });
1014
1015 return_handle
1016}
1017
1018pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
1021 fn serve(
1025 self,
1026 mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
1027 ) -> MailboxServerHandle {
1028 let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
1033 let server = self.clone();
1034 tokio::task::spawn(async move {
1035 while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
1036 if let Ok(Undeliverable(e)) =
1037 envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1038 {
1039 UndeliverableMailboxSender.post(e, monitored_return_handle());
1041 continue;
1042 }
1043 envelope.set_error(DeliveryError::BrokenLink(
1044 "message was undeliverable".to_owned(),
1045 ));
1046 server.post(
1047 MessageEnvelope::new(
1048 envelope.sender().clone(),
1049 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1050 envelope.sender(),
1051 )
1052 .port_id()
1053 .clone(),
1054 wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
1055 Attrs::new(),
1056 ),
1057 monitored_return_handle(),
1058 );
1059 }
1060 });
1061
1062 let (stopped_tx, mut stopped_rx) = watch::channel(false);
1063 let join_handle = tokio::spawn(async move {
1064 let mut detached = false;
1065
1066 loop {
1067 if *stopped_rx.borrow_and_update() {
1068 break Ok(());
1069 }
1070
1071 tokio::select! {
1072 message = rx.recv() => {
1073 match message {
1074 Ok(envelope) => self.post(envelope, return_handle.clone()),
1076
1077 Err(ChannelError::Closed) => break Ok(()),
1080 Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1081 }
1082 }
1083 result = stopped_rx.changed(), if !detached => {
1084 detached = result.is_err();
1085 if detached {
1086 tracing::debug!(
1087 "the mailbox server is detached for Rx {}", rx.addr()
1088 );
1089 } else {
1090 tracing::debug!(
1091 "the mailbox server is stopped for Rx {}", rx.addr()
1092 );
1093 }
1094 }
1095 }
1096 }
1097 });
1098
1099 MailboxServerHandle {
1100 join_handle,
1101 stopped_tx,
1102 }
1103 }
1104}
1105
1106impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1107
1108pub struct MailboxClient {
1110 buffer: Buffer<MessageEnvelope>,
1112
1113 _tx_monitoring: CancellationToken,
1115}
1116
1117impl fmt::Debug for MailboxClient {
1118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1119 f.debug_struct("MailboxClient")
1120 .field("buffer", &"<Buffer>")
1121 .finish()
1122 }
1123}
1124
1125impl MailboxClient {
1126 pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1129 let addr = tx.addr();
1130 let tx = Arc::new(tx);
1131 let tx_status = tx.status().clone();
1132 let tx_monitoring = CancellationToken::new();
1133 let buffer = Buffer::new(move |envelope, return_handle| {
1134 let tx = Arc::clone(&tx);
1135 let (return_channel, return_receiver) =
1136 oneshot::channel::<SendError<MessageEnvelope>>();
1137 let return_handle_0 = return_handle.clone();
1139 tokio::spawn(async move {
1140 if let Ok(SendError {
1141 error,
1142 message,
1143 reason,
1144 }) = return_receiver.await
1145 {
1146 message.undeliverable(
1147 DeliveryError::BrokenLink(format!(
1148 "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
1149 )),
1150 return_handle_0,
1151 );
1152 }
1153 });
1154 tx.try_post(envelope, return_channel);
1156 future::ready(())
1157 });
1158 let this = Self {
1159 buffer,
1160 _tx_monitoring: tx_monitoring.clone(),
1161 };
1162 Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1163 this
1164 }
1165
1166 pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1169 Ok(MailboxClient::new(channel::dial(addr)?))
1170 }
1171
1172 fn monitor_tx_health(
1174 mut rx: watch::Receiver<TxStatus>,
1175 cancel_token: CancellationToken,
1176 addr: ChannelAddr,
1177 ) {
1178 crate::init::get_runtime().spawn(async move {
1179 loop {
1180 tokio::select! {
1181 changed = rx.changed() => {
1182 if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1183 tracing::warn!("connection to {} lost", addr);
1184 break;
1187 }
1188 }
1189 _ = cancel_token.cancelled() => {
1190 break;
1191 }
1192 }
1193 }
1194 });
1195 }
1196}
1197
1198impl MailboxSender for MailboxClient {
1199 #[hyperactor::instrument_infallible]
1200 fn post_unchecked(
1201 &self,
1202 envelope: MessageEnvelope,
1203 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1204 ) {
1205 tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1206 if let Err(mpsc::error::SendError((envelope, return_handle))) =
1207 self.buffer.send((envelope, return_handle))
1208 {
1209 let err = DeliveryError::BrokenLink(
1210 "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1211 );
1212
1213 envelope.undeliverable(err, return_handle);
1215 }
1216 }
1217}
1218
1219pub struct PortSink<C: context::Actor, M: RemoteMessage> {
1221 cx: C,
1222 port: PortRef<M>,
1223}
1224
1225impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1226 pub fn new(cx: C, port: PortRef<M>) -> Self {
1228 Self { cx, port }
1229 }
1230}
1231
1232impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1233 type Error = MailboxSenderError;
1234
1235 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1236 Poll::Ready(Ok(()))
1237 }
1238
1239 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1240 self.port.send(&self.cx, item)
1241 }
1242
1243 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1244 Poll::Ready(Ok(()))
1245 }
1246
1247 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1248 Poll::Ready(Ok(()))
1249 }
1250}
1251
1252#[derive(Clone, Debug)]
1255pub struct Mailbox {
1256 inner: Arc<State>,
1257}
1258
1259impl Mailbox {
1260 pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1263 Self {
1264 inner: Arc::new(State::new(actor_id, forwarder)),
1265 }
1266 }
1267
1268 pub fn new_detached(actor_id: ActorId) -> Self {
1270 Self {
1271 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1272 }
1273 }
1274
1275 pub fn actor_id(&self) -> &ActorId {
1277 &self.inner.actor_id
1278 }
1279
1280 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1285 let port_index = self.inner.allocate_port();
1286 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1287 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1288 tracing::trace!(
1289 name = "open_port",
1290 "opening port for {} at {}",
1291 self.inner.actor_id,
1292 port_id
1293 );
1294 (
1295 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1296 PortReceiver::new(receiver, port_id, false, self.clone()),
1297 )
1298 }
1299
1300 pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1307 let (handle, receiver) = self.open_port();
1308 handle.bind_actor_port();
1309 (handle, receiver)
1310 }
1311
1312 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1315 where
1316 A: Accumulator + Send + Sync + 'static,
1317 A::Update: Message,
1318 A::State: Message + Default + Clone,
1319 {
1320 self.open_accum_port_opts(accum, None)
1321 }
1322
1323 pub fn open_accum_port_opts<A>(
1331 &self,
1332 accum: A,
1333 reducer_opts: Option<ReducerOpts>,
1334 ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1335 where
1336 A: Accumulator + Send + Sync + 'static,
1337 A::Update: Message,
1338 A::State: Message + Default + Clone,
1339 {
1340 let port_index = self.inner.allocate_port();
1341 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1342 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1343 let state = Mutex::new(A::State::default());
1344 let reducer_spec = accum.reducer_spec();
1345 let enqueue = move |_, update: A::Update| {
1346 let mut state = state.lock().unwrap();
1347 accum.accumulate(&mut state, update)?;
1348 let _ = sender.send(state.clone());
1349 Ok(())
1350 };
1351 (
1352 PortHandle {
1353 mailbox: self.clone(),
1354 port_index,
1355 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1356 bound: Arc::new(OnceLock::new()),
1357 reducer_spec,
1358 reducer_opts,
1359 },
1360 PortReceiver::new(receiver, port_id, true, self.clone()),
1361 )
1362 }
1363
1364 pub(crate) fn open_enqueue_port<M: Message>(
1368 &self,
1369 enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1370 ) -> PortHandle<M> {
1371 PortHandle {
1372 mailbox: self.clone(),
1373 port_index: self.inner.allocate_port(),
1374 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1375 bound: Arc::new(OnceLock::new()),
1376 reducer_spec: None,
1377 reducer_opts: None,
1378 }
1379 }
1380
1381 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1385 let port_index = self.inner.allocate_port();
1386 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1387 let (sender, receiver) = oneshot::channel::<M>();
1388 (
1389 OncePortHandle {
1390 mailbox: self.clone(),
1391 port_index,
1392 port_id: port_id.clone(),
1393 sender,
1394 },
1395 OncePortReceiver {
1396 receiver: Some(receiver),
1397 port_id,
1398 mailbox: self.clone(),
1399 },
1400 )
1401 }
1402
1403 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1404 MailboxError::new(self.inner.actor_id.clone(), err)
1405 }
1406
1407 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1408 let port_index = M::port();
1409 self.inner.ports.get(&port_index).and_then(|boxed| {
1410 boxed
1411 .as_any()
1412 .downcast_ref::<UnboundedSender<M>>()
1413 .map(|s| {
1414 assert_eq!(
1415 s.port_id,
1416 self.actor_id().port_id(port_index),
1417 "port_id mismatch in downcasted UnboundedSender"
1418 );
1419 s.sender.clone()
1420 })
1421 })
1422 }
1423
1424 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1426 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1427 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1428 }
1429
1430 pub(crate) fn allocate_port(&self) -> u64 {
1431 self.inner.allocate_port()
1432 }
1433
1434 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1435 assert_eq!(
1436 handle.mailbox.actor_id(),
1437 self.actor_id(),
1438 "port does not belong to mailbox"
1439 );
1440
1441 let port_id = self.actor_id().port_id(handle.port_index);
1444 match self.inner.ports.entry(handle.port_index) {
1445 Entry::Vacant(entry) => {
1446 entry.insert(Box::new(UnboundedSender::new(
1447 handle.sender.clone(),
1448 port_id.clone(),
1449 )));
1450 }
1451 Entry::Occupied(_entry) => {}
1452 }
1453
1454 PortRef::attest(port_id)
1455 }
1456
1457 fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1458 assert_eq!(
1459 handle.mailbox.actor_id(),
1460 self.actor_id(),
1461 "port does not belong to mailbox"
1462 );
1463
1464 let port_index = M::port();
1465 let port_id = self.actor_id().port_id(port_index);
1466 match self.inner.ports.entry(port_index) {
1467 Entry::Vacant(entry) => {
1468 entry.insert(Box::new(UnboundedSender::new(
1469 handle.sender.clone(),
1470 port_id,
1471 )));
1472 }
1473 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1474 }
1475 }
1476
1477 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1478 let port_id = handle.port_id().clone();
1479 match self.inner.ports.entry(handle.port_index) {
1480 Entry::Vacant(entry) => {
1481 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1482 }
1483 Entry::Occupied(_entry) => {}
1484 }
1485 }
1486
1487 pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1488 assert_eq!(
1489 port_id.actor_id(),
1490 self.actor_id(),
1491 "port does not belong to mailbox"
1492 );
1493
1494 match self.inner.ports.entry(port_id.index()) {
1495 Entry::Vacant(entry) => {
1496 entry.insert(Box::new(sender));
1497 }
1498 Entry::Occupied(_entry) => {}
1499 }
1500 }
1501}
1502
1503impl context::Mailbox for Mailbox {
1504 fn mailbox(&self) -> &Mailbox {
1505 self
1506 }
1507}
1508
1509pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1514 cx.mailbox().open_port()
1515}
1516
1517pub fn open_once_port<M: Message>(
1520 cx: &impl context::Mailbox,
1521) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1522 cx.mailbox().open_once_port()
1523}
1524
1525impl MailboxSender for Mailbox {
1526 fn post_unchecked(
1529 &self,
1530 envelope: MessageEnvelope,
1531 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1532 ) {
1533 metrics::MAILBOX_POSTS.add(
1534 1,
1535 hyperactor_telemetry::kv_pairs!(
1536 "actor_id" => envelope.sender.to_string(),
1537 "dest_actor_id" => envelope.dest.0.to_string(),
1538 ),
1539 );
1540 tracing::trace!(
1541 name = "post",
1542 actor_name = envelope.sender.name(),
1543 actor_id = envelope.sender.to_string(),
1544 "posting message to {}",
1545 envelope.dest
1546 );
1547
1548 if envelope.dest().actor_id() != &self.inner.actor_id {
1549 return self.inner.forwarder.post(envelope, return_handle);
1550 }
1551
1552 match self.inner.ports.entry(envelope.dest().index()) {
1553 Entry::Vacant(_) => {
1554 let err = DeliveryError::Unroutable(format!(
1555 "port not bound in mailbox; port id: {}; message type: {}",
1556 envelope.dest().index(),
1557 envelope.data().typename().map_or_else(
1558 || format!("unregistered type hash {}", envelope.data().typehash()),
1559 |s| s.to_string(),
1560 )
1561 ));
1562
1563 envelope.undeliverable(err, return_handle);
1564 }
1565 Entry::Occupied(entry) => {
1566 let (metadata, data) = envelope.open();
1567 let MessageMetadata {
1568 headers,
1569 sender,
1570 dest,
1571 errors: metadata_errors,
1572 ttl,
1573 return_undeliverable,
1574 } = metadata;
1575
1576 match entry.get().send_serialized(headers, data) {
1584 Ok(false) => {
1585 entry.remove();
1586 }
1587 Ok(true) => (),
1588 Err(SerializedSenderError {
1589 data,
1590 error: sender_error,
1591 headers,
1592 }) => {
1593 let err = DeliveryError::Mailbox(format!("{}", sender_error));
1594
1595 MessageEnvelope::seal(
1596 MessageMetadata {
1597 headers,
1598 sender,
1599 dest,
1600 errors: metadata_errors,
1601 ttl,
1602 return_undeliverable,
1603 },
1604 data,
1605 )
1606 .undeliverable(err, return_handle)
1607 }
1608 }
1609 }
1610 }
1611 }
1612}
1613
1614#[derive(Debug)]
1622pub struct PortHandle<M: Message> {
1623 mailbox: Mailbox,
1624 port_index: u64,
1625 sender: UnboundedPortSender<M>,
1626 bound: Arc<OnceLock<PortId>>,
1633 reducer_spec: Option<ReducerSpec>,
1636 reducer_opts: Option<ReducerOpts>,
1638}
1639
1640impl<M: Message> PortHandle<M> {
1641 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1642 Self {
1643 mailbox,
1644 port_index,
1645 sender,
1646 bound: Arc::new(OnceLock::new()),
1647 reducer_spec: None,
1648 reducer_opts: None,
1649 }
1650 }
1651
1652 fn location(&self) -> PortLocation {
1653 match self.bound.get() {
1654 Some(port_id) => PortLocation::Bound(port_id.clone()),
1655 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1656 }
1657 }
1658
1659 pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1661 let mut headers = Attrs::new();
1662
1663 crate::mailbox::headers::set_send_timestamp(&mut headers);
1664 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1665
1666 self.sender.send(headers, message).map_err(|err| {
1667 MailboxSenderError::new_unbound::<M>(
1668 self.mailbox.actor_id().clone(),
1669 MailboxSenderErrorKind::Other(err),
1670 )
1671 })
1672 }
1673
1674 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1677 where
1678 R: Message,
1679 F: Fn(R) -> M + Send + Sync + 'static,
1680 {
1681 let port_index = self.mailbox.inner.allocate_port();
1682 let sender = self.sender.clone();
1683 PortHandle::new(
1684 self.mailbox.clone(),
1685 port_index,
1686 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1687 sender.send(headers, unmap(value))
1688 })),
1689 )
1690 }
1691}
1692
1693impl<M: RemoteMessage> PortHandle<M> {
1694 pub fn bind(&self) -> PortRef<M> {
1696 PortRef::attest_reducible(
1697 self.bound
1698 .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1699 .clone(),
1700 self.reducer_spec.clone(),
1701 self.reducer_opts.clone(),
1702 )
1703 }
1704
1705 pub(crate) fn bind_actor_port(&self) {
1711 let port_id = self.mailbox.actor_id().port_id(M::port());
1712 self.bound
1713 .set(port_id)
1714 .map_err(|p| {
1715 format!(
1716 "could not bind port handle {} as {p}: already bound",
1717 self.port_index
1718 )
1719 })
1720 .unwrap();
1721 self.mailbox.bind_to_actor_port(self);
1722 }
1723}
1724
1725impl<M: Message> Clone for PortHandle<M> {
1726 fn clone(&self) -> Self {
1727 Self {
1728 mailbox: self.mailbox.clone(),
1729 port_index: self.port_index,
1730 sender: self.sender.clone(),
1731 bound: self.bound.clone(),
1732 reducer_spec: self.reducer_spec.clone(),
1733 reducer_opts: self.reducer_opts.clone(),
1734 }
1735 }
1736}
1737
1738impl<M: Message> fmt::Display for PortHandle<M> {
1739 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1740 fmt::Display::fmt(&self.location(), f)
1741 }
1742}
1743
1744#[derive(Debug)]
1746pub struct OncePortHandle<M: Message> {
1747 mailbox: Mailbox,
1748 port_index: u64,
1749 port_id: PortId,
1750 sender: oneshot::Sender<M>,
1751}
1752
1753impl<M: Message> OncePortHandle<M> {
1754 pub fn port_id(&self) -> &PortId {
1757 &self.port_id
1758 }
1759
1760 pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1763 let actor_id = self.mailbox.actor_id().clone();
1764 self.sender.send(message).map_err(|_| {
1765 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1770 })?;
1771 Ok(())
1772 }
1773}
1774
1775impl<M: RemoteMessage> OncePortHandle<M> {
1776 pub fn bind(self) -> OncePortRef<M> {
1781 let port_id = self.port_id().clone();
1782 self.mailbox.clone().bind_once(self);
1783 OncePortRef::attest(port_id)
1784 }
1785}
1786
1787impl<M: Message> fmt::Display for OncePortHandle<M> {
1788 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1789 fmt::Display::fmt(&self.port_id(), f)
1790 }
1791}
1792
1793#[derive(Debug)]
1796pub struct PortReceiver<M> {
1797 receiver: mpsc::UnboundedReceiver<M>,
1798 port_id: PortId,
1799 coalesce: bool,
1802 mailbox: Mailbox,
1805}
1806
1807impl<M> PortReceiver<M> {
1808 fn new(
1809 receiver: mpsc::UnboundedReceiver<M>,
1810 port_id: PortId,
1811 coalesce: bool,
1812 mailbox: Mailbox,
1813 ) -> Self {
1814 Self {
1815 receiver,
1816 port_id,
1817 coalesce,
1818 mailbox,
1819 }
1820 }
1821
1822 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1827 let mut next = self.receiver.try_recv();
1828 if self.coalesce
1830 && let Some(latest) = self.drain().pop()
1831 {
1832 next = Ok(latest);
1833 }
1834 match next {
1835 Ok(msg) => Ok(Some(msg)),
1836 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1837 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1838 self.actor_id().clone(),
1839 MailboxErrorKind::Closed,
1840 )),
1841 }
1842 }
1843
1844 pub async fn recv(&mut self) -> Result<M, MailboxError> {
1847 let mut next = self.receiver.recv().await;
1848 if self.coalesce
1851 && let Some(latest) = self.drain().pop()
1852 {
1853 next = Some(latest);
1854 }
1855 next.ok_or(MailboxError::new(
1856 self.actor_id().clone(),
1857 MailboxErrorKind::Closed,
1858 ))
1859 }
1860
1861 pub fn drain(&mut self) -> Vec<M> {
1863 let mut drained: Vec<M> = Vec::new();
1864 while let Ok(msg) = self.receiver.try_recv() {
1865 if self.coalesce {
1867 drained.pop();
1868 }
1869 drained.push(msg);
1870 }
1871 drained
1872 }
1873
1874 fn port(&self) -> u64 {
1875 self.port_id.1
1876 }
1877
1878 fn actor_id(&self) -> &ActorId {
1879 &self.port_id.0
1880 }
1881}
1882
1883impl<M> Drop for PortReceiver<M> {
1884 fn drop(&mut self) {
1885 self.mailbox.inner.ports.remove(&self.port());
1889 }
1890}
1891
1892impl<M> Stream for PortReceiver<M> {
1893 type Item = Result<M, MailboxError>;
1894
1895 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1896 std::pin::pin!(self.recv()).poll(cx).map(Some)
1897 }
1898}
1899
1900pub struct OncePortReceiver<M> {
1902 receiver: Option<oneshot::Receiver<M>>,
1903 port_id: PortId,
1904
1905 mailbox: Mailbox,
1908}
1909
1910impl<M> OncePortReceiver<M> {
1911 pub async fn recv(mut self) -> Result<M, MailboxError> {
1915 std::mem::take(&mut self.receiver)
1916 .unwrap()
1917 .await
1918 .map_err(|err| {
1919 MailboxError::new(
1920 self.actor_id().clone(),
1921 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1922 )
1923 })
1924 }
1925
1926 fn port(&self) -> u64 {
1927 self.port_id.1
1928 }
1929
1930 fn actor_id(&self) -> &ActorId {
1931 &self.port_id.0
1932 }
1933}
1934
1935impl<M> Drop for OncePortReceiver<M> {
1936 fn drop(&mut self) {
1937 self.mailbox.inner.ports.remove(&self.port());
1941 }
1942}
1943
1944pub struct SerializedSenderError {
1946 pub headers: Attrs,
1948 pub data: wirevalue::Any,
1950 pub error: MailboxSenderError,
1952}
1953
1954trait SerializedSender: Send + Sync {
1959 fn as_any(&self) -> &dyn Any;
1965
1966 #[allow(clippy::result_large_err)] fn send_serialized(
1974 &self,
1975 headers: Attrs,
1976 serialized: wirevalue::Any,
1977 ) -> Result<bool, SerializedSenderError>;
1978}
1979
1980enum UnboundedPortSender<M: Message> {
1982 Mpsc(mpsc::UnboundedSender<M>),
1984 Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
1986}
1987
1988impl<M: Message> UnboundedPortSender<M> {
1989 fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1990 match self {
1991 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1992 Self::Func(func) => func(headers, message),
1993 }
1994 }
1995}
1996
1997impl<M: Message> Clone for UnboundedPortSender<M> {
2000 fn clone(&self) -> Self {
2001 match self {
2002 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2003 Self::Func(func) => Self::Func(func.clone()),
2004 }
2005 }
2006}
2007
2008impl<M: Message> Debug for UnboundedPortSender<M> {
2009 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2010 match self {
2011 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2012 Self::Func(_) => f
2013 .debug_tuple("UnboundedPortSender::Func")
2014 .field(&"..")
2015 .finish(),
2016 }
2017 }
2018}
2019
2020struct UnboundedSender<M: Message> {
2021 sender: UnboundedPortSender<M>,
2022 port_id: PortId,
2023}
2024
2025impl<M: Message> UnboundedSender<M> {
2026 fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
2029 Self { sender, port_id }
2030 }
2031
2032 fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
2033 self.sender.send(headers, message).map_err(|err| {
2034 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2035 })
2036 }
2037}
2038
2039impl<M: Message> Clone for UnboundedSender<M> {
2043 fn clone(&self) -> Self {
2044 Self {
2045 sender: self.sender.clone(),
2046 port_id: self.port_id.clone(),
2047 }
2048 }
2049}
2050
2051impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2052 fn as_any(&self) -> &dyn Any {
2053 self
2054 }
2055
2056 fn send_serialized(
2057 &self,
2058 headers: Attrs,
2059 serialized: wirevalue::Any,
2060 ) -> Result<bool, SerializedSenderError> {
2061 match serialized.deserialized_unchecked() {
2067 Ok(message) => {
2068 self.sender.send(headers.clone(), message).map_err(|err| {
2069 SerializedSenderError {
2070 data: serialized,
2071 error: MailboxSenderError::new_bound(
2072 self.port_id.clone(),
2073 MailboxSenderErrorKind::Other(err),
2074 ),
2075 headers,
2076 }
2077 })?;
2078
2079 Ok(true)
2080 }
2081 Err(err) => Err(SerializedSenderError {
2082 data: serialized,
2083 error: MailboxSenderError::new_bound(
2084 self.port_id.clone(),
2085 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2086 ),
2087 headers,
2088 }),
2089 }
2090 }
2091}
2092
2093#[derive(Debug)]
2096struct OnceSender<M: Message> {
2097 sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
2098 port_id: PortId,
2099}
2100
2101impl<M: Message> OnceSender<M> {
2102 fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
2105 Self {
2106 sender: Arc::new(Mutex::new(Some(sender))),
2107 port_id,
2108 }
2109 }
2110
2111 fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2112 match self.sender.lock().unwrap().take() {
2114 None => Err(MailboxSenderError::new_bound(
2115 self.port_id.clone(),
2116 MailboxSenderErrorKind::Closed,
2117 )),
2118 Some(sender) => {
2119 sender.send(message).map_err(|_| {
2120 MailboxSenderError::new_bound(
2125 self.port_id.clone(),
2126 MailboxSenderErrorKind::Closed,
2127 )
2128 })?;
2129 Ok(false)
2130 }
2131 }
2132 }
2133}
2134
2135impl<M: Message> Clone for OnceSender<M> {
2139 fn clone(&self) -> Self {
2140 Self {
2141 sender: self.sender.clone(),
2142 port_id: self.port_id.clone(),
2143 }
2144 }
2145}
2146
2147impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2148 fn as_any(&self) -> &dyn Any {
2149 self
2150 }
2151
2152 fn send_serialized(
2153 &self,
2154 headers: Attrs,
2155 serialized: wirevalue::Any,
2156 ) -> Result<bool, SerializedSenderError> {
2157 match serialized.deserialized() {
2158 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2159 data: serialized,
2160 error: e,
2161 headers,
2162 }),
2163 Err(err) => Err(SerializedSenderError {
2164 data: serialized,
2165 error: MailboxSenderError::new_bound(
2166 self.port_id.clone(),
2167 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2168 ),
2169 headers,
2170 }),
2171 }
2172 }
2173}
2174
2175pub(crate) struct UntypedUnboundedSender {
2177 pub(crate) sender:
2178 Box<dyn Fn(wirevalue::Any) -> Result<(), (wirevalue::Any, anyhow::Error)> + Send + Sync>,
2179 pub(crate) port_id: PortId,
2180}
2181
2182impl SerializedSender for UntypedUnboundedSender {
2183 fn as_any(&self) -> &dyn Any {
2184 self
2185 }
2186
2187 fn send_serialized(
2188 &self,
2189 headers: Attrs,
2190 serialized: wirevalue::Any,
2191 ) -> Result<bool, SerializedSenderError> {
2192 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2193 data,
2194 error: MailboxSenderError::new_bound(
2195 self.port_id.clone(),
2196 MailboxSenderErrorKind::Other(err),
2197 ),
2198 headers,
2199 })?;
2200
2201 Ok(true)
2202 }
2203}
2204
2205struct State {
2207 actor_id: ActorId,
2209
2210 ports: DashMap<u64, Box<dyn SerializedSender>>,
2214
2215 next_port: AtomicU64,
2217
2218 forwarder: BoxedMailboxSender,
2220}
2221
2222impl State {
2223 fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2225 Self {
2226 actor_id,
2227 ports: DashMap::new(),
2228 next_port: AtomicU64::new(USER_PORT_OFFSET),
2231 forwarder,
2232 }
2233 }
2234
2235 fn allocate_port(&self) -> u64 {
2237 self.next_port.fetch_add(1, Ordering::SeqCst)
2238 }
2239}
2240
2241impl fmt::Debug for State {
2242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2243 f.debug_struct("State")
2244 .field("actor_id", &self.actor_id)
2245 .field(
2246 "open_ports",
2247 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2248 )
2249 .field("next_port", &self.next_port)
2250 .finish()
2251 }
2252}
2253
2254#[derive(Clone)]
2258pub struct MailboxMuxer {
2259 mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
2260}
2261
2262impl Default for MailboxMuxer {
2263 fn default() -> Self {
2264 Self::new()
2265 }
2266}
2267
2268impl MailboxMuxer {
2269 pub fn new() -> Self {
2271 Self {
2272 mailboxes: Arc::new(DashMap::new()),
2273 }
2274 }
2275
2276 pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2281 match self.mailboxes.entry(actor_id) {
2282 Entry::Occupied(_) => false,
2283 Entry::Vacant(entry) => {
2284 entry.insert(Box::new(sender));
2285 true
2286 }
2287 }
2288 }
2289
2290 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2292 self.bind(mailbox.actor_id().clone(), mailbox)
2293 }
2294
2295 pub(crate) fn unbind(&self, actor_id: &ActorId) {
2299 self.mailboxes.remove(actor_id);
2300 }
2301
2302 pub fn bound_actors(&self) -> Vec<ActorId> {
2304 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2305 }
2306}
2307
2308impl MailboxSender for MailboxMuxer {
2309 fn post_unchecked(
2310 &self,
2311 envelope: MessageEnvelope,
2312 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2313 ) {
2314 let dest_actor_id = envelope.dest().actor_id();
2315 match self.mailboxes.get(envelope.dest().actor_id()) {
2316 None => {
2317 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2318 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2319 }
2320 Some(sender) => sender.post(envelope, return_handle),
2321 }
2322 }
2323}
2324
2325#[derive(Clone)]
2328pub struct MailboxRouter {
2329 entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2330}
2331
2332impl Default for MailboxRouter {
2333 fn default() -> Self {
2334 Self::new()
2335 }
2336}
2337
2338impl MailboxRouter {
2339 pub fn new() -> Self {
2341 Self {
2342 entries: Arc::new(RwLock::new(BTreeMap::new())),
2343 }
2344 }
2345
2346 pub fn downgrade(&self) -> WeakMailboxRouter {
2348 WeakMailboxRouter(Arc::downgrade(&self.entries))
2349 }
2350
2351 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2355 FallbackMailboxRouter {
2356 router: self.clone(),
2357 default,
2358 }
2359 }
2360
2361 pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2365 let mut w = self.entries.write().unwrap();
2366 w.insert(dest, Arc::new(sender));
2367 }
2368
2369 fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2370 match self
2371 .entries
2372 .read()
2373 .unwrap()
2374 .lower_bound(Excluded(&actor_id.clone().into()))
2375 .prev()
2376 {
2377 None => None,
2378 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2379 Some(sender.clone())
2380 }
2381 Some(_) => None,
2382 }
2383 }
2384}
2385
2386impl MailboxSender for MailboxRouter {
2387 fn post_unchecked(
2388 &self,
2389 envelope: MessageEnvelope,
2390 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2391 ) {
2392 match self.sender(envelope.dest().actor_id()) {
2393 None => envelope.undeliverable(
2394 DeliveryError::Unroutable(
2395 "no destination found for actor in routing table".to_string(),
2396 ),
2397 return_handle,
2398 ),
2399 Some(sender) => sender.post(envelope, return_handle),
2400 }
2401 }
2402}
2403
2404#[derive(Clone)]
2405struct FallbackMailboxRouter {
2406 router: MailboxRouter,
2407 default: BoxedMailboxSender,
2408}
2409
2410impl MailboxSender for FallbackMailboxRouter {
2411 fn post_unchecked(
2412 &self,
2413 envelope: MessageEnvelope,
2414 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2415 ) {
2416 match self.router.sender(envelope.dest().actor_id()) {
2417 Some(sender) => sender.post(envelope, return_handle),
2418 None => self.default.post(envelope, return_handle),
2419 }
2420 }
2421}
2422
2423#[derive(Debug, Clone)]
2432pub struct WeakMailboxRouter(
2433 Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2434);
2435
2436impl WeakMailboxRouter {
2437 pub fn upgrade(&self) -> Option<MailboxRouter> {
2439 self.0.upgrade().map(|entries| MailboxRouter { entries })
2440 }
2441}
2442
2443impl MailboxSender for WeakMailboxRouter {
2444 fn post_unchecked(
2445 &self,
2446 envelope: MessageEnvelope,
2447 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2448 ) {
2449 match self.upgrade() {
2450 Some(router) => router.post(envelope, return_handle),
2451 None => envelope.undeliverable(
2452 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2453 return_handle,
2454 ),
2455 }
2456 }
2457}
2458
2459#[derive(Clone)]
2473pub struct DialMailboxRouter {
2474 address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
2475 sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,
2476
2477 default: BoxedMailboxSender,
2480
2481 direct_addressed_remote_only: bool,
2484}
2485
2486impl Default for DialMailboxRouter {
2487 fn default() -> Self {
2488 Self::new()
2489 }
2490}
2491
2492impl DialMailboxRouter {
2493 pub fn new() -> Self {
2495 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2496 }
2497
2498 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2503 Self {
2504 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2505 sender_cache: Arc::new(DashMap::new()),
2506 default,
2507 direct_addressed_remote_only: false,
2508 }
2509 }
2510
2511 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2516 Self {
2517 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2518 sender_cache: Arc::new(DashMap::new()),
2519 default,
2520 direct_addressed_remote_only: true,
2521 }
2522 }
2523
2524 pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2530 if let Ok(mut w) = self.address_book.write() {
2531 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2532 && old_addr != addr
2533 {
2534 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2535 self.sender_cache.remove(&old_addr);
2536 }
2537 } else {
2538 tracing::error!("address book poisoned during bind of {:?}", dest);
2539 }
2540 }
2541
2542 pub fn unbind(&self, dest: &Reference) {
2548 if let Ok(mut w) = self.address_book.write() {
2549 let to_remove: Vec<(Reference, ChannelAddr)> = w
2550 .range(dest..)
2551 .take_while(|(key, _)| dest.is_prefix_of(key))
2552 .map(|(key, addr)| (key.clone(), addr.clone()))
2553 .collect();
2554
2555 for (key, addr) in to_remove {
2556 tracing::info!("unbinding {:?} from {:?}", key, addr);
2557 w.remove(&key);
2558 self.sender_cache.remove(&addr);
2559 }
2560 } else {
2561 tracing::error!("address book poisoned during unbind of {:?}", dest);
2562 }
2563 }
2564
2565 pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2567 let address_book = self.address_book.read().unwrap();
2568 let found = address_book
2569 .lower_bound(Excluded(&actor_id.clone().into()))
2570 .prev();
2571
2572 if let Some((key, addr)) = found
2575 && key.is_prefix_of(&actor_id.clone().into())
2576 {
2577 Some(addr.clone())
2578 } else if actor_id.proc_id().is_direct() {
2579 let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2580 if self.direct_addressed_remote_only {
2581 addr.transport().is_remote().then_some(addr)
2582 } else {
2583 Some(addr)
2584 }
2585 } else {
2586 None
2587 }
2588 }
2589
2590 pub fn prefixes(&self) -> BTreeSet<Reference> {
2593 let addrs = self.address_book.read().unwrap();
2594 let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2595 for (reference, _) in addrs.iter() {
2596 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2597 Some(candidate) if candidate.is_prefix_of(reference) => (),
2598 _ => {
2599 prefixes.insert(reference.clone());
2600 }
2601 }
2602 }
2603
2604 prefixes
2605 }
2606
2607 fn dial(
2608 &self,
2609 addr: &ChannelAddr,
2610 actor_id: &ActorId,
2611 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2612 match self.sender_cache.entry(addr.clone()) {
2616 Entry::Occupied(entry) => Ok(entry.get().clone()),
2617 Entry::Vacant(entry) => {
2618 let tx = channel::dial(addr.clone()).map_err(|err| {
2619 MailboxSenderError::new_unbound_type(
2620 actor_id.clone(),
2621 MailboxSenderErrorKind::Channel(err),
2622 "unknown",
2623 )
2624 })?;
2625 let sender = MailboxClient::new(tx);
2626 Ok(entry.insert(Arc::new(sender)).value().clone())
2627 }
2628 }
2629 }
2630}
2631
2632impl MailboxSender for DialMailboxRouter {
2633 fn post_unchecked(
2634 &self,
2635 envelope: MessageEnvelope,
2636 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2637 ) {
2638 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2639 self.default.post(envelope, return_handle);
2640 return;
2641 };
2642
2643 match self.dial(&addr, envelope.dest().actor_id()) {
2644 Err(err) => envelope.undeliverable(
2645 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2646 return_handle,
2647 ),
2648 Ok(sender) => sender.post(envelope, return_handle),
2649 }
2650 }
2651}
2652
2653#[derive(Debug)]
2656pub struct UnroutableMailboxSender;
2657
2658impl MailboxSender for UnroutableMailboxSender {
2659 fn post_unchecked(
2660 &self,
2661 envelope: MessageEnvelope,
2662 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2663 ) {
2664 envelope.undeliverable(
2665 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2666 return_handle,
2667 );
2668 }
2669}
2670
2671#[cfg(test)]
2672mod tests {
2673
2674 use std::assert_matches::assert_matches;
2675 use std::mem::drop;
2676 use std::str::FromStr;
2677 use std::sync::atomic::AtomicUsize;
2678 use std::time::Duration;
2679
2680 use timed_test::async_timed_test;
2681
2682 use super::*;
2683 use crate::Actor;
2684 use crate::ActorHandle;
2685 use crate::Instance;
2686 use crate::PortId;
2687 use crate::accum;
2688 use crate::channel::ChannelTransport;
2689 use crate::channel::dial;
2690 use crate::channel::serve;
2691 use crate::channel::sim::SimAddr;
2692 use crate::clock::Clock;
2693 use crate::clock::RealClock;
2694 use crate::id;
2695 use crate::proc::Proc;
2696 use crate::reference::ProcId;
2697 use crate::reference::WorldId;
2698 use crate::simnet;
2699
2700 #[test]
2701 fn test_error() {
2702 let err = MailboxError::new(
2703 ActorId(
2704 ProcId::Ranked(WorldId("myworld".to_string()), 2),
2705 "myactor".to_string(),
2706 5,
2707 ),
2708 MailboxErrorKind::Closed,
2709 );
2710 assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2711 }
2712
2713 #[tokio::test]
2714 async fn test_mailbox_basic() {
2715 let mbox = Mailbox::new_detached(id!(test[0].test));
2716 let (port, mut receiver) = mbox.open_port::<u64>();
2717 let port = port.bind();
2718
2719 mbox.serialize_and_send(&port, 123, monitored_return_handle())
2720 .unwrap();
2721 mbox.serialize_and_send(&port, 321, monitored_return_handle())
2722 .unwrap();
2723 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2724 assert_eq!(receiver.recv().await.unwrap(), 321u64);
2725
2726 let serialized = wirevalue::Any::serialize(&999u64).unwrap();
2727 mbox.post(
2728 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2729 monitored_return_handle(),
2730 );
2731 assert_eq!(receiver.recv().await.unwrap(), 999u64);
2732 }
2733
2734 #[tokio::test]
2735 async fn test_mailbox_accum() {
2736 let mbox = Mailbox::new_detached(id!(test[0].test));
2737 let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2738
2739 for i in -3..4 {
2740 port.send(i).unwrap();
2741 let received: accum::Max<i64> = receiver.recv().await.unwrap();
2742 let msg = received.get();
2743 assert_eq!(msg, &i);
2744 }
2745 for i in -3..4 {
2747 port.send(i).unwrap();
2748 assert_eq!(receiver.recv().await.unwrap().get(), &3);
2749 }
2750 port.send(4).unwrap();
2752 assert_eq!(receiver.recv().await.unwrap().get(), &4);
2753
2754 for i in 5..10 {
2756 port.send(i).unwrap();
2757 }
2758 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2759 port.send(1).unwrap();
2760 port.send(3).unwrap();
2761 port.send(2).unwrap();
2762 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2763 }
2764
2765 #[test]
2766 fn test_port_and_reducer() {
2767 let mbox = Mailbox::new_detached(id!(test[0].test));
2768 {
2770 let accumulator = accum::max::<u64>();
2771 let reducer_spec = accumulator.reducer_spec().unwrap();
2772 let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2773 assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2774 let port_ref = port.bind();
2775 assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2776 }
2777 {
2779 let (port, _) = mbox.open_port::<u64>();
2780 assert_eq!(port.reducer_spec, None);
2781 let port_ref = port.bind();
2782 assert_eq!(port_ref.reducer_spec(), &None);
2783 }
2784 }
2785
2786 #[tokio::test]
2787 #[ignore] async fn test_mailbox_once() {
2789 let mbox = Mailbox::new_detached(id!(test[0].test));
2790
2791 let (port, receiver) = mbox.open_once_port::<u64>();
2792
2793 port.send(123u64).unwrap();
2796 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2797
2798 }
2809
2810 #[tokio::test]
2811 #[ignore] async fn test_mailbox_receiver_drop() {
2813 let mbox = Mailbox::new_detached(id!(test[0].test));
2814 let (port, mut receiver) = mbox.open_port::<u64>();
2815 let port = port.bind();
2817 mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2818 .unwrap();
2819 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2820 drop(receiver);
2821 let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2822 panic!();
2823 };
2824
2825 assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2826 assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2827 }
2828
2829 #[tokio::test]
2830 async fn test_drain() {
2831 let mbox = Mailbox::new_detached(id!(test[0].test));
2832
2833 let (port, mut receiver) = mbox.open_port();
2834 let port = port.bind();
2835
2836 for i in 0..10 {
2837 mbox.serialize_and_send(&port, i, monitored_return_handle())
2838 .unwrap();
2839 }
2840
2841 for i in 0..10 {
2842 assert_eq!(receiver.recv().await.unwrap(), i);
2843 }
2844
2845 assert!(receiver.drain().is_empty());
2846 }
2847
2848 #[tokio::test]
2849 async fn test_mailbox_muxer() {
2850 let muxer = MailboxMuxer::new();
2851
2852 let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2853 let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2854
2855 muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2856 muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2857
2858 let (port, receiver) = mbox0.open_once_port::<u64>();
2859
2860 port.send(123u64).unwrap();
2861 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2862
2863 }
2873
2874 #[tokio::test]
2875 async fn test_local_client_server() {
2876 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2877 let (tx, rx) = channel::local::new();
2878 let serve_handle = mbox.clone().serve(rx);
2879 let client = MailboxClient::new(tx);
2880
2881 let (port, receiver) = mbox.open_once_port::<u64>();
2882 let port = port.bind();
2883
2884 client
2885 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2886 .unwrap();
2887 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2888 serve_handle.stop("fromt test");
2889 serve_handle.await.unwrap().unwrap();
2890 }
2891
2892 #[tokio::test]
2893 async fn test_sim_client_server() {
2894 simnet::start();
2895 let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2896 let src_to_dst = ChannelAddr::Sim(
2897 SimAddr::new_with_src(
2898 "local:0".parse::<ChannelAddr>().unwrap(),
2899 dst_addr.addr().clone(),
2900 )
2901 .unwrap(),
2902 );
2903
2904 let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2905 let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2906 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2907 let serve_handle = mbox.clone().serve(rx);
2908 let client = MailboxClient::new(tx);
2909 let (port, receiver) = mbox.open_once_port::<u64>();
2910 let port = port.bind();
2911 let msg: u64 = 123;
2912 client
2913 .serialize_and_send_once(port, msg, monitored_return_handle())
2914 .unwrap();
2915 assert_eq!(receiver.recv().await.unwrap(), msg);
2916 serve_handle.stop("from test");
2917 serve_handle.await.unwrap().unwrap();
2918 }
2919
2920 #[tokio::test]
2921 async fn test_mailbox_router() {
2922 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2923 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2924 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2925 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2926
2927 let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2928 [&mbox0, &mbox1, &mbox2, &mbox3]
2929 .into_iter()
2930 .map(|mbox| {
2931 let (port, receiver) = mbox.open_once_port::<u64>();
2932 (port.bind(), receiver)
2933 })
2934 .collect();
2935
2936 let router = MailboxRouter::new();
2937
2938 router.bind(id!(world0).into(), mbox0);
2939 router.bind(id!(world1[0]).into(), mbox1);
2940 router.bind(id!(world1[1]).into(), mbox2);
2941 router.bind(id!(world1[1].actor1).into(), mbox3);
2942
2943 for (i, (port, receiver)) in comms.into_iter().enumerate() {
2944 router
2945 .serialize_and_send_once(port, i as u64, monitored_return_handle())
2946 .unwrap();
2947 assert_eq!(receiver.recv().await.unwrap(), i as u64);
2948 }
2949
2950 let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2953
2954 let (return_handle, mut return_receiver) =
2955 crate::mailbox::undeliverable::new_undeliverable_port();
2956 let (port, _receiver) = mbox4.open_once_port();
2957 router
2958 .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2959 .unwrap();
2960 assert!(return_receiver.recv().await.is_ok());
2961
2962 let router = router.fallback(mbox4.clone().into_boxed());
2963 let (port, receiver) = mbox4.open_once_port();
2964 router
2965 .serialize_and_send_once(port.bind(), 0, return_handle)
2966 .unwrap();
2967 assert_eq!(receiver.recv().await.unwrap(), 0);
2968 }
2969
2970 #[tokio::test]
2971 async fn test_dial_mailbox_router() {
2972 let router = DialMailboxRouter::new();
2973
2974 router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2975 router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2976 router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2977 router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2978 router.bind(
2980 "unix:@4,my_proc,my_actor".parse().unwrap(),
2981 "unix:@5".parse().unwrap(),
2982 );
2983
2984 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2986 router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2987
2988 let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
2989 .unwrap()
2990 .into_actor()
2991 .unwrap();
2992 assert_eq!(
2993 router.lookup_addr(&actor_id).unwrap(),
2994 "unix!@5".parse().unwrap(),
2995 );
2996 router.unbind(&actor_id.clone().into());
2997 assert_eq!(
2998 router.lookup_addr(&actor_id).unwrap(),
2999 "unix!@4".parse().unwrap(),
3000 );
3001
3002 router.unbind(&id!(world1).into());
3004 assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
3005 assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
3006 assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
3007 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
3008 router.unbind(&id!(world0).into());
3009 assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
3010 }
3011
3012 #[tokio::test]
3013 #[ignore] async fn test_dial_mailbox_router_default() {
3015 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
3016 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
3017 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
3018 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
3019
3020 let root = MailboxRouter::new();
3023 let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3024 let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3025
3026 root.bind(id!(world0).into(), world0_router.clone());
3027 root.bind(id!(world1).into(), world1_router.clone());
3028
3029 let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3030
3031 let mut handles = Vec::new(); for mbox in mailboxes.iter() {
3033 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3034 let handle = (*mbox).clone().serve(rx);
3035 handles.push(handle);
3036
3037 eprintln!("{}: {}", mbox.actor_id(), addr);
3038 if mbox.actor_id().world_name() == "world0" {
3039 world0_router.bind(mbox.actor_id().clone().into(), addr);
3040 } else {
3041 world1_router.bind(mbox.actor_id().clone().into(), addr);
3042 }
3043 }
3044
3045 for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3047 for mbox in mailboxes.iter() {
3048 let (port, receiver) = mbox.open_once_port::<u64>();
3049 let port = port.bind();
3050 router
3051 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3052 .unwrap();
3053 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3054 }
3055 }
3056 }
3057
3058 #[tokio::test]
3059 async fn test_enqueue_port() {
3060 let mbox = Mailbox::new_detached(id!(test[0].test));
3061
3062 let count = Arc::new(AtomicUsize::new(0));
3063 let count_clone = count.clone();
3064 let port = mbox.open_enqueue_port(move |_, n| {
3065 count_clone.fetch_add(n, Ordering::SeqCst);
3066 Ok(())
3067 });
3068
3069 port.send(10).unwrap();
3070 port.send(5).unwrap();
3071 port.send(1).unwrap();
3072 port.send(0).unwrap();
3073
3074 assert_eq!(count.load(Ordering::SeqCst), 16);
3075 }
3076
3077 #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3078 struct TestMessage;
3079
3080 #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3081 #[named(name = "some::custom::path")]
3082 struct TestMessage2;
3083
3084 #[test]
3085 fn test_remote_message_macros() {
3086 assert_eq!(
3087 TestMessage::typename(),
3088 "hyperactor::mailbox::tests::TestMessage"
3089 );
3090 assert_eq!(TestMessage2::typename(), "some::custom::path");
3091 }
3092
3093 #[test]
3094 fn test_message_envelope_display() {
3095 #[derive(typeuri::Named, Serialize, Deserialize)]
3096 struct MyTest {
3097 a: u64,
3098 b: String,
3099 }
3100 wirevalue::register_type!(MyTest);
3101
3102 let envelope = MessageEnvelope::serialize(
3103 id!(source[0].actor),
3104 id!(dest[1].actor[0][123]),
3105 &MyTest {
3106 a: 123,
3107 b: "hello".into(),
3108 },
3109 Attrs::new(),
3110 )
3111 .unwrap();
3112
3113 assert_eq!(
3114 format!("{}", envelope),
3115 r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
3116 );
3117 }
3118
3119 #[derive(Debug, Default)]
3120 struct Foo;
3121
3122 impl Actor for Foo {}
3123
3124 #[tokio::test]
3127 async fn test_actor_delivery_failure() {
3128 use crate::actor::ActorStatus;
3131 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
3132
3133 let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3134 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3135 ));
3136 let proc_id = id!(quux[0]);
3137 let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
3138 ProcSupervisionCoordinator::set(&proc).await.unwrap();
3139
3140 let foo = proc.spawn("foo", Foo).unwrap();
3141 let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3142 let message = MessageEnvelope::new(
3143 foo.actor_id().clone(),
3144 PortId(id!(corge[0].bar), 9999u64),
3145 wirevalue::Any::serialize(&1u64).unwrap(),
3146 Attrs::new(),
3147 );
3148 return_handle.send(Undeliverable(message)).unwrap();
3149
3150 RealClock
3151 .sleep(tokio::time::Duration::from_millis(100))
3152 .await;
3153
3154 let foo_status = foo.status();
3155 assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3156 let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3157 unreachable!()
3158 };
3159 assert!(msg.to_string().contains(
3160 "a message from \
3161 quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
3162 ));
3163
3164 proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
3165 .await
3166 .unwrap();
3167 }
3168
3169 #[tokio::test]
3170 async fn test_detached_return_handle() {
3171 let (return_handle, mut return_receiver) =
3172 crate::mailbox::undeliverable::new_undeliverable_port();
3173 let envelope = MessageEnvelope::new(
3175 id!(foo[0].bar),
3176 PortId(id!(baz[0].corge), 9999u64),
3177 wirevalue::Any::serialize(&1u64).unwrap(),
3178 Attrs::new(),
3179 );
3180 return_handle.send(Undeliverable(envelope.clone())).unwrap();
3181 assert!(
3183 RealClock
3184 .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3185 .await
3186 .is_ok()
3187 );
3188 let monitor_handle = tokio::spawn(async move {
3191 while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3192 envelope.set_error(DeliveryError::BrokenLink(
3193 "returned in unit test".to_string(),
3194 ));
3195 UndeliverableMailboxSender
3196 .post(envelope, monitored_return_handle());
3197 }
3198 });
3199 drop(return_handle);
3200 assert!(
3201 RealClock
3202 .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3203 .await
3204 .is_ok()
3205 );
3206 }
3207
3208 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3209 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3210 let dummy_state =
3213 State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3214 let dummy_port_id = PortId(id!(world[0].actor), 0);
3215 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3216 let receiver = PortReceiver {
3217 receiver,
3218 port_id: dummy_port_id,
3219 coalesce,
3220 mailbox: Mailbox {
3221 inner: Arc::new(dummy_state),
3222 },
3223 };
3224 (sender, receiver)
3225 }
3226
3227 {
3229 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3230 assert!(receiver.drain().is_empty());
3231
3232 sender.send(0).unwrap();
3233 sender.send(1).unwrap();
3234 sender.send(2).unwrap();
3235 sender.send(3).unwrap();
3236 sender.send(4).unwrap();
3237 sender.send(5).unwrap();
3238 sender.send(6).unwrap();
3239 sender.send(7).unwrap();
3240
3241 if drop_sender {
3242 drop(sender);
3243 }
3244
3245 if !coalesce {
3246 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3247 } else {
3248 assert_eq!(receiver.drain(), vec![7]);
3249 }
3250
3251 assert!(receiver.drain().is_empty());
3252 assert!(receiver.drain().is_empty());
3253 }
3254
3255 {
3257 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3258 assert!(receiver.try_recv().unwrap().is_none());
3259
3260 sender.send(0).unwrap();
3261 sender.send(1).unwrap();
3262 sender.send(2).unwrap();
3263 sender.send(3).unwrap();
3264
3265 if drop_sender {
3266 drop(sender);
3267 }
3268
3269 if !coalesce {
3270 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3271 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3272 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3273 }
3274 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3275 if drop_sender {
3276 assert_matches!(
3277 receiver.try_recv().unwrap_err().kind(),
3278 MailboxErrorKind::Closed
3279 );
3280 assert_matches!(
3282 receiver.try_recv().unwrap_err().kind(),
3283 MailboxErrorKind::Closed
3284 );
3285 } else {
3286 assert!(receiver.try_recv().unwrap().is_none());
3287 assert!(receiver.try_recv().unwrap().is_none());
3289 }
3290 }
3291 {
3293 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3294 assert!(
3295 RealClock
3296 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3297 .await
3298 .is_err()
3299 );
3300
3301 sender.send(4).unwrap();
3302 sender.send(5).unwrap();
3303 sender.send(6).unwrap();
3304 sender.send(7).unwrap();
3305
3306 if drop_sender {
3307 drop(sender);
3308 }
3309
3310 if !coalesce {
3311 assert_eq!(receiver.recv().await.unwrap(), 4);
3312 assert_eq!(receiver.recv().await.unwrap(), 5);
3313 assert_eq!(receiver.recv().await.unwrap(), 6);
3314 }
3315 assert_eq!(receiver.recv().await.unwrap(), 7);
3316 if drop_sender {
3317 assert_matches!(
3318 receiver.recv().await.unwrap_err().kind(),
3319 MailboxErrorKind::Closed
3320 );
3321 assert_matches!(
3323 receiver.recv().await.unwrap_err().kind(),
3324 MailboxErrorKind::Closed
3325 );
3326 } else {
3327 assert!(
3328 RealClock
3329 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3330 .await
3331 .is_err()
3332 );
3333 }
3334 }
3335 }
3336
3337 #[tokio::test]
3338 async fn test_receiver_basic_default() {
3339 verify_receiver(false, false).await
3340 }
3341
3342 #[tokio::test]
3343 async fn test_receiver_basic_latest() {
3344 verify_receiver(true, false).await
3345 }
3346
3347 #[tokio::test]
3348 async fn test_receiver_after_sender_drop_default() {
3349 verify_receiver(false, true).await
3350 }
3351
3352 #[tokio::test]
3353 async fn test_receiver_after_sender_drop_latest() {
3354 verify_receiver(true, true).await
3355 }
3356
3357 struct Setup {
3358 receiver: PortReceiver<u64>,
3359 actor0: Instance<()>,
3360 actor1: Instance<()>,
3361 _actor0_handle: ActorHandle<()>,
3362 _actor1_handle: ActorHandle<()>,
3363 port_id: PortId,
3364 port_id1: PortId,
3365 port_id2: PortId,
3366 port_id2_1: PortId,
3367 }
3368
3369 async fn setup_split_port_ids(
3370 reducer_spec: Option<ReducerSpec>,
3371 reducer_opts: Option<ReducerOpts>,
3372 ) -> Setup {
3373 let proc = Proc::local();
3374 let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3375 let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3376
3377 let (port_handle, receiver) = actor0.open_port::<u64>();
3379 let port_id = port_handle.bind().port_id().clone();
3380
3381 let port_id1 = port_id
3383 .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3384 .unwrap();
3385 let port_id2 = port_id
3386 .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3387 .unwrap();
3388
3389 let port_id2_1 = port_id2
3391 .split(&actor1, reducer_spec, reducer_opts.clone(), true)
3392 .unwrap();
3393
3394 Setup {
3395 receiver,
3396 actor0,
3397 actor1,
3398 _actor0_handle: actor0_handle,
3399 _actor1_handle: actor1_handle,
3400 port_id,
3401 port_id1,
3402 port_id2,
3403 port_id2_1,
3404 }
3405 }
3406
3407 fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3408 let serialized = wirevalue::Any::serialize(&msg).unwrap();
3409 port_id.send(cx, serialized);
3410 }
3411
3412 #[async_timed_test(timeout_secs = 30)]
3413 #[cfg_attr(not(fbcode_build), ignore)]
3415 async fn test_split_port_id_no_reducer() {
3416 let Setup {
3417 mut receiver,
3418 actor0,
3419 actor1,
3420 port_id,
3421 port_id1,
3422 port_id2,
3423 port_id2_1,
3424 ..
3425 } = setup_split_port_ids(None, None).await;
3426 post(&actor0, port_id.clone(), 1);
3428 assert_eq!(receiver.recv().await.unwrap(), 1);
3429 post(&actor1, port_id1.clone(), 2);
3430 assert_eq!(receiver.recv().await.unwrap(), 2);
3431 post(&actor1, port_id2.clone(), 3);
3432 assert_eq!(receiver.recv().await.unwrap(), 3);
3433 post(&actor1, port_id2_1.clone(), 4);
3434 assert_eq!(receiver.recv().await.unwrap(), 4);
3435
3436 RealClock.sleep(Duration::from_secs(2)).await;
3438 let msg = receiver.try_recv().unwrap();
3439 assert_eq!(msg, None);
3440 }
3441
3442 async fn wait_for(
3443 receiver: &mut PortReceiver<u64>,
3444 expected_size: usize,
3445 timeout_duration: Duration,
3446 ) -> anyhow::Result<Vec<u64>> {
3447 let mut messeges = vec![];
3448
3449 RealClock
3450 .timeout(timeout_duration, async {
3451 loop {
3452 let msg = receiver.recv().await.unwrap();
3453 messeges.push(msg);
3454 if messeges.len() == expected_size {
3455 break;
3456 }
3457 }
3458 })
3459 .await?;
3460 Ok(messeges)
3461 }
3462
3463 #[async_timed_test(timeout_secs = 30)]
3464 async fn test_split_port_id_sum_reducer() {
3465 let config = hyperactor_config::global::lock();
3466 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3467
3468 let sum_accumulator = accum::sum::<u64>();
3469 let reducer_spec = sum_accumulator.reducer_spec();
3470 let Setup {
3471 mut receiver,
3472 actor0,
3473 actor1,
3474 port_id,
3475 port_id1,
3476 port_id2,
3477 port_id2_1,
3478 ..
3479 } = setup_split_port_ids(reducer_spec, None).await;
3480 post(&actor0, port_id.clone(), 4);
3481 post(&actor1, port_id1.clone(), 2);
3482 post(&actor1, port_id2.clone(), 3);
3483 post(&actor1, port_id2_1.clone(), 1);
3484 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3485 .await
3486 .unwrap();
3487 messages.sort();
3490 assert_eq!(messages, vec![1, 2, 3, 4]);
3491
3492 RealClock.sleep(Duration::from_secs(2)).await;
3494 let msg = receiver.try_recv().unwrap();
3495 assert_eq!(msg, None);
3496 }
3497
3498 #[async_timed_test(timeout_secs = 30)]
3499 #[cfg_attr(not(fbcode_build), ignore)]
3501 async fn test_split_port_id_every_n_messages() {
3502 let config = hyperactor_config::global::lock();
3503 let _config_guard =
3504 config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3505 let proc = Proc::local();
3506 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3507 let (port_handle, mut receiver) = actor.open_port::<u64>();
3508 let port_id = port_handle.bind().port_id().clone();
3509 let reducer_spec = accum::sum::<u64>().reducer_spec();
3511 let split_port_id = port_id
3512 .split(
3513 &actor,
3514 reducer_spec,
3515 Some(ReducerOpts {
3516 max_update_interval: Some(Duration::from_mins(10)),
3517 initial_update_interval: Some(Duration::from_mins(10)),
3518 }),
3519 true,
3520 )
3521 .unwrap();
3522
3523 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3525 post(&actor, split_port_id.clone(), msg);
3526 }
3527 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3530 .await
3531 .unwrap();
3532 assert_eq!(messages, vec![15]);
3533
3534 RealClock.sleep(Duration::from_secs(2)).await;
3537 let msg = receiver.try_recv().unwrap();
3538 assert_eq!(msg, None);
3539 }
3540
3541 #[async_timed_test(timeout_secs = 30)]
3542 async fn test_split_port_timeout_flush() {
3543 let config = hyperactor_config::global::lock();
3544 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3545
3546 let Setup {
3547 mut receiver,
3548 actor0: _,
3549 actor1,
3550 port_id: _,
3551 port_id1,
3552 port_id2: _,
3553 port_id2_1: _,
3554 ..
3555 } = setup_split_port_ids(
3556 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3557 Some(ReducerOpts {
3558 max_update_interval: Some(Duration::from_millis(50)),
3559 initial_update_interval: Some(Duration::from_millis(50)),
3560 }),
3561 )
3562 .await;
3563
3564 post(&actor1, port_id1.clone(), 10);
3565 post(&actor1, port_id1.clone(), 20);
3566 post(&actor1, port_id1.clone(), 30);
3567
3568 RealClock.sleep(Duration::from_millis(10)).await;
3570 let msg = receiver.try_recv().unwrap();
3571 assert_eq!(msg, None);
3572
3573 RealClock.sleep(Duration::from_millis(100)).await;
3575
3576 let msg = receiver.recv().await.unwrap();
3578 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
3582 assert_eq!(msg, None);
3583 }
3584
3585 #[async_timed_test(timeout_secs = 30)]
3586 async fn test_split_port_timeout_and_size_flush() {
3587 let config = hyperactor_config::global::lock();
3588 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3589
3590 let Setup {
3591 mut receiver,
3592 actor0: _,
3593 actor1,
3594 port_id: _,
3595 port_id1,
3596 port_id2: _,
3597 port_id2_1: _,
3598 ..
3599 } = setup_split_port_ids(
3600 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3601 Some(ReducerOpts {
3602 max_update_interval: Some(Duration::from_millis(50)),
3603 initial_update_interval: Some(Duration::from_millis(50)),
3604 }),
3605 )
3606 .await;
3607
3608 post(&actor1, port_id1.clone(), 10);
3609 post(&actor1, port_id1.clone(), 20);
3610 post(&actor1, port_id1.clone(), 30);
3611 post(&actor1, port_id1.clone(), 40);
3612
3613 let msg = receiver.recv().await.unwrap();
3615 assert_eq!(msg, 60);
3616
3617 let msg = receiver.recv().await.unwrap();
3619 assert_eq!(msg, 40);
3620
3621 let msg = receiver.try_recv().unwrap();
3623 assert_eq!(msg, None);
3624 }
3625
3626 #[test]
3627 fn test_dial_mailbox_router_prefixes_empty() {
3628 assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3629 }
3630
3631 #[test]
3632 fn test_dial_mailbox_router_prefixes_single_entry() {
3633 let router = DialMailboxRouter::new();
3634 router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3635
3636 let prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3637 assert_eq!(prefixes.len(), 1);
3638 assert_eq!(prefixes[0], id!(world0).into());
3639 }
3640
3641 #[test]
3642 fn test_dial_mailbox_router_prefixes_no_overlap() {
3643 let router = DialMailboxRouter::new();
3644 router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3645 router.bind(id!(world1).into(), "unix!@2".parse().unwrap());
3646 router.bind(id!(world2).into(), "unix!@3".parse().unwrap());
3647
3648 let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3649 prefixes.sort();
3650
3651 let mut expected = vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
3652 expected.sort();
3653
3654 assert_eq!(prefixes, expected);
3655 }
3656
3657 #[test]
3658 fn test_dial_mailbox_router_prefixes_with_overlaps() {
3659 let router = DialMailboxRouter::new();
3660 router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3661 router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3662 router.bind(id!(world0[1]).into(), "unix!@3".parse().unwrap());
3663 router.bind(id!(world1).into(), "unix!@4".parse().unwrap());
3664 router.bind(id!(world1[0]).into(), "unix!@5".parse().unwrap());
3665
3666 let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3667 prefixes.sort();
3668
3669 let mut expected = vec![id!(world0).into(), id!(world1).into()];
3671 expected.sort();
3672
3673 assert_eq!(prefixes, expected);
3674 }
3675
3676 #[test]
3677 fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
3678 let router = DialMailboxRouter::new();
3679 router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3680 router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3681 router.bind(id!(world0[0].actor1).into(), "unix!@3".parse().unwrap());
3682 router.bind(id!(world1[0]).into(), "unix!@4".parse().unwrap());
3683 router.bind(id!(world1[1]).into(), "unix!@5".parse().unwrap());
3684 router.bind(id!(world2[0].actor0).into(), "unix!@6".parse().unwrap());
3685
3686 let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3687 prefixes.sort();
3688
3689 let expected = vec![
3695 id!(world0).into(),
3696 id!(world1[0]).into(),
3697 id!(world1[1]).into(),
3698 id!(world2[0].actor0).into(),
3699 ];
3700
3701 assert_eq!(prefixes, expected);
3702 }
3703
3704 #[test]
3705 fn test_dial_mailbox_router_prefixes_same_level() {
3706 let router = DialMailboxRouter::new();
3707 router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
3708 router.bind(id!(world0[1]).into(), "unix!@2".parse().unwrap());
3709 router.bind(id!(world0[2]).into(), "unix!@3".parse().unwrap());
3710
3711 let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3712 prefixes.sort();
3713
3714 let mut expected = vec![
3716 id!(world0[0]).into(),
3717 id!(world0[1]).into(),
3718 id!(world0[2]).into(),
3719 ];
3720 expected.sort();
3721
3722 assert_eq!(prefixes, expected);
3723 }
3724
3725 #[derive(Clone, Debug)]
3729 struct AsyncLoopForwarder;
3730
3731 impl MailboxSender for AsyncLoopForwarder {
3732 fn post_unchecked(
3733 &self,
3734 envelope: MessageEnvelope,
3735 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3736 ) {
3737 let me = self.clone();
3738 tokio::spawn(async move {
3739 me.post(envelope, return_handle);
3741 });
3742 }
3743 }
3744
3745 #[tokio::test]
3746 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3747 let actor_id = ActorId(
3748 ProcId::Ranked(id!(test_world), 0),
3749 "ttl_actor".to_string(),
3750 0,
3751 );
3752 let mailbox = Mailbox::new(
3753 actor_id.clone(),
3754 BoxedMailboxSender::new(AsyncLoopForwarder),
3755 );
3756 let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3757
3758 let remote_actor = ActorId(
3761 ProcId::Ranked(id!(remote_world), 1),
3762 "remote".to_string(),
3763 0,
3764 );
3765 let dest = PortId(remote_actor.clone(), 4242);
3766
3767 let payload = 1234_u64;
3770 let envelope =
3771 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3772 .expect("serialize");
3773
3774 let return_handle = ret_port.clone();
3777 mailbox.post(envelope, return_handle);
3778
3779 #[allow(clippy::disallowed_methods)]
3781 let Undeliverable(undelivered) =
3782 tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3783 .await
3784 .expect("timed out waiting for undeliverable")
3785 .expect("channel closed");
3786
3787 let got: u64 = undelivered.deserialized().expect("deserialize");
3789 assert_eq!(got, payload, "payload preserved");
3790 }
3791
3792 #[tokio::test]
3793 async fn message_ttl_success_local_delivery() {
3794 let actor_id = ActorId(
3795 ProcId::Ranked(id!(test_world), 0),
3796 "ttl_actor".to_string(),
3797 0,
3798 );
3799 let mailbox = Mailbox::new(
3800 actor_id.clone(),
3801 BoxedMailboxSender::new(PanickingMailboxSender),
3802 );
3803 let (_undeliverable_tx, mut undeliverable_rx) =
3804 mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3805
3806 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3808
3809 let payload = 0xC0FFEE_u64;
3811 let envelope = MessageEnvelope::serialize(
3812 actor_id.clone(),
3813 user_port.bind().port_id().clone(),
3814 &payload,
3815 Attrs::new(),
3816 )
3817 .expect("serialize");
3818
3819 let return_handle = mailbox
3822 .bound_return_handle()
3823 .unwrap_or(monitored_return_handle());
3824 mailbox.post(envelope, return_handle);
3825
3826 #[allow(clippy::disallowed_methods)]
3828 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3829 .await
3830 .expect("timed out waiting for local delivery")
3831 .expect("user port closed");
3832 assert_eq!(got, payload);
3833
3834 #[allow(clippy::disallowed_methods)]
3836 let no_undeliverable =
3837 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3838 assert!(
3839 no_undeliverable.is_err(),
3840 "unexpected undeliverable returned on successful local delivery"
3841 );
3842 }
3843
3844 #[tokio::test]
3845 async fn test_port_contramap() {
3846 let mbox = Mailbox::new_detached(id!(test[0].test));
3847 let (handle, mut rx) = mbox.open_port();
3848
3849 handle
3850 .contramap(|m| (1, m))
3851 .send("hello".to_string())
3852 .unwrap();
3853 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3854 }
3855
3856 #[test]
3857 #[should_panic(expected = "already bound")]
3858 fn test_bind_port_handle_to_actor_port_twice() {
3859 let mbox = Mailbox::new_detached(id!(test[0].test));
3860 let (handle, _rx) = mbox.open_port::<String>();
3861 handle.bind_actor_port();
3862 handle.bind_actor_port();
3863 }
3864
3865 #[test]
3866 fn test_bind_port_handle_to_actor_port() {
3867 let mbox = Mailbox::new_detached(id!(test[0].test));
3868 let default_port = mbox.actor_id().port_id(String::port());
3869 let (handle, _rx) = mbox.open_port::<String>();
3870 assert_ne!(default_port.index(), handle.port_index);
3872 handle.bind_actor_port();
3874 assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3875 handle.bind();
3877 handle.bind();
3878 assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3879 }
3880
3881 #[test]
3882 #[should_panic(expected = "already bound")]
3883 fn test_bind_port_handle_to_actor_port_when_already_bound() {
3884 let mbox = Mailbox::new_detached(id!(test[0].test));
3885 let (handle, _rx) = mbox.open_port::<String>();
3886 handle.bind();
3888 assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
3889 handle.bind_actor_port();
3891 }
3892}