1#![allow(dead_code)] use std::any::Any;
69use std::collections::BTreeMap;
70use std::fmt;
71use std::fmt::Debug;
72use std::future::Future;
73use std::ops::Bound::Excluded;
74use std::pin::Pin;
75use std::sync::Arc;
76use std::sync::LazyLock;
77use std::sync::Mutex;
78use std::sync::OnceLock;
79use std::sync::RwLock;
80use std::sync::Weak;
81use std::sync::atomic::AtomicU64;
82use std::sync::atomic::AtomicUsize;
83use std::sync::atomic::Ordering;
84use std::task::Context;
85use std::task::Poll;
86
87use async_trait::async_trait;
88use dashmap::DashMap;
89use dashmap::DashSet;
90use dashmap::mapref::entry::Entry;
91use futures::Sink;
92use futures::Stream;
93use serde::Deserialize;
94use serde::Serialize;
95use serde::de::DeserializeOwned;
96use tokio::sync::mpsc;
97use tokio::sync::oneshot;
98use tokio::sync::watch;
99use tokio::task::JoinHandle;
100use tokio_util::sync::CancellationToken;
101
102use crate as hyperactor; use crate::Named;
104use crate::OncePortRef;
105use crate::PortRef;
106use crate::accum;
107use crate::accum::Accumulator;
108use crate::accum::ReducerSpec;
109use crate::actor::Signal;
110use crate::actor::remote::USER_PORT_OFFSET;
111use crate::attrs::Attrs;
112use crate::cap;
113use crate::cap::CanSend;
114use crate::channel;
115use crate::channel::ChannelAddr;
116use crate::channel::ChannelError;
117use crate::channel::SendError;
118use crate::channel::TxStatus;
119use crate::data::Serialized;
120use crate::id;
121use crate::metrics;
122use crate::reference::ActorId;
123use crate::reference::PortId;
124use crate::reference::Reference;
125
126mod undeliverable;
127pub use undeliverable::Undeliverable;
129pub use undeliverable::UndeliverableMessageError;
130pub use undeliverable::custom_monitored_return_handle;
131pub use undeliverable::monitored_return_handle; pub use undeliverable::supervise_undeliverable_messages;
133pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141
/// Marker trait for values that can travel through a mailbox.
///
/// The trait has no methods; it only bundles the bounds
/// `Debug + Send + Sync + 'static`.
pub trait Message: Debug + Send + Sync + 'static {}

/// Blanket implementation: every type meeting the bounds is a [`Message`].
impl<M> Message for M where M: Debug + Send + Sync + 'static {}
146
147pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
151
152impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
153
/// Alias for an owned byte buffer.
pub type Data = Vec<u8>;
156
157#[derive(
159 thiserror::Error,
160 Debug,
161 Serialize,
162 Deserialize,
163 Named,
164 Clone,
165 PartialEq
166)]
167pub enum DeliveryError {
168 #[error("address not routable: {0}")]
170 Unroutable(String),
171
172 #[error("broken link: {0}")]
175 BrokenLink(String),
176
177 #[error("mailbox error: {0}")]
179 Mailbox(String),
180}
181
182#[derive(Debug, Serialize, Deserialize, Clone, Named)]
186pub struct MessageEnvelope {
187 sender: ActorId,
189
190 dest: PortId,
192
193 data: Serialized,
195
196 error: Option<DeliveryError>,
198
199 headers: Attrs,
201 }
203
204impl MessageEnvelope {
205 pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
207 Self {
208 sender,
209 dest,
210 data,
211 error: None,
212 headers,
213 }
214 }
215
216 pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
218 Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
219 }
220
221 pub fn serialize<T: Serialize + Named>(
223 source: ActorId,
224 dest: PortId,
225 value: &T,
226 headers: Attrs,
227 ) -> Result<Self, bincode::Error> {
228 Ok(Self {
229 headers,
230 data: Serialized::serialize(value)?,
231 sender: source,
232 dest,
233 error: None,
234 })
235 }
236
237 pub fn deserialized<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
239 self.data.deserialized()
240 }
241
242 pub fn data(&self) -> &Serialized {
244 &self.data
245 }
246
247 pub fn sender(&self) -> &ActorId {
249 &self.sender
250 }
251
252 pub fn dest(&self) -> &PortId {
254 &self.dest
255 }
256
257 pub fn headers(&self) -> &Attrs {
259 &self.headers
260 }
261
262 pub fn is_signal(&self) -> bool {
264 self.dest.index() == Signal::port()
265 }
266
267 pub fn try_set_error(&mut self, error: DeliveryError) {
270 if self.error.is_none() {
271 self.error = Some(error);
272 }
273 }
274
275 pub fn undeliverable(
279 mut self,
280 error: DeliveryError,
281 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
282 ) {
283 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
284 1,
285 hyperactor_telemetry::kv_pairs!(
286 "actor_id" => self.sender.to_string(),
287 "dest_actor_id" => self.dest.0.to_string(),
288 "message_type" => self.data.typename().unwrap_or("unknown"),
289 "error_type" => error.to_string(),
290 ),
291 );
292
293 self.try_set_error(error);
294 undeliverable::return_undeliverable(return_handle, self);
295 }
296
297 pub fn error(&self) -> Option<&DeliveryError> {
300 self.error.as_ref()
301 }
302
303 fn open(self) -> (MessageMetadata, Serialized) {
304 let Self {
305 sender,
306 dest,
307 data,
308 error,
309 headers,
310 } = self;
311
312 (
313 MessageMetadata {
314 sender,
315 dest,
316 error,
317 headers,
318 },
319 data,
320 )
321 }
322
323 fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
324 let MessageMetadata {
325 sender,
326 dest,
327 error,
328 headers,
329 } = metadata;
330
331 Self {
332 sender,
333 dest,
334 data,
335 error,
336 headers,
337 }
338 }
339}
340
341impl fmt::Display for MessageEnvelope {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 match &self.error {
344 None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
345 Some(err) => write!(
346 f,
347 "{} > {}: {}: delivery error: {}",
348 self.sender, self.dest, self.data, err
349 ),
350 }
351 }
352}
353
354#[derive(Clone)]
356pub struct MessageMetadata {
357 sender: ActorId,
358 dest: PortId,
359 error: Option<DeliveryError>,
360 headers: Attrs,
361}
362
363#[derive(Debug)]
366pub struct MailboxError {
367 actor_id: ActorId,
368 kind: MailboxErrorKind,
369}
370
371#[derive(thiserror::Error, Debug)]
374#[non_exhaustive]
375pub enum MailboxErrorKind {
376 #[error("mailbox closed")]
378 Closed,
379
380 #[error("invalid port: {0}")]
382 InvalidPort(PortId),
383
384 #[error("no sender for port: {0}")]
386 NoSenderForPort(PortId),
387
388 #[error("no local sender for port: {0}")]
391 NoLocalSenderForPort(PortId),
392
393 #[error("{0}: port closed")]
395 PortClosed(PortId),
396
397 #[error("send {0}: {1}")]
399 Send(PortId, #[source] anyhow::Error),
400
401 #[error("recv {0}: {1}")]
403 Recv(PortId, #[source] anyhow::Error),
404
405 #[error("serialize: {0}")]
407 Serialize(#[source] anyhow::Error),
408
409 #[error("deserialize {0}: {1}")]
411 Deserialize(&'static str, anyhow::Error),
412
413 #[error(transparent)]
415 Channel(#[from] ChannelError),
416}
417
418impl MailboxError {
419 pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
422 Self { actor_id, kind }
423 }
424
425 pub fn actor_id(&self) -> &ActorId {
427 &self.actor_id
428 }
429
430 pub fn kind(&self) -> &MailboxErrorKind {
432 &self.kind
433 }
434}
435
436impl fmt::Display for MailboxError {
437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 write!(f, "{}: ", self.actor_id)?;
439 fmt::Display::fmt(&self.kind, f)
440 }
441}
442
443impl std::error::Error for MailboxError {
444 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
445 self.kind.source()
446 }
447}
448
449#[derive(Debug, Clone)]
453pub enum PortLocation {
454 Bound(PortId),
456 Unbound(ActorId, &'static str),
458}
459
460impl PortLocation {
461 fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
462 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
463 }
464
465 fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
466 PortLocation::Unbound(actor_id, ty)
467 }
468
469 pub fn actor_id(&self) -> &ActorId {
471 match self {
472 PortLocation::Bound(port_id) => port_id.actor_id(),
473 PortLocation::Unbound(actor_id, _) => actor_id,
474 }
475 }
476}
477
478impl fmt::Display for PortLocation {
479 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
480 match self {
481 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
482 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
483 }
484 }
485}
486
487#[derive(Debug)]
490pub struct MailboxSenderError {
491 location: PortLocation,
492 kind: MailboxSenderErrorKind,
493}
494
495#[derive(thiserror::Error, Debug)]
497pub enum MailboxSenderErrorKind {
498 #[error("serialization error: {0}")]
500 Serialize(anyhow::Error),
501
502 #[error("deserialization error for type {0}: {1}")]
504 Deserialize(&'static str, anyhow::Error),
505
506 #[error("invalid port")]
508 Invalid,
509
510 #[error("port closed")]
512 Closed,
513
514 #[error(transparent)]
517 Mailbox(#[from] MailboxError),
518
519 #[error(transparent)]
521 Channel(#[from] ChannelError),
522
523 #[error(transparent)]
525 MessageLog(#[from] MessageLogError),
526
527 #[error("send error: {0}")]
529 Other(#[from] anyhow::Error),
530
531 #[error("unreachable: {0}")]
533 Unreachable(anyhow::Error),
534}
535
536impl MailboxSenderError {
537 pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
539 Self {
540 location: PortLocation::Unbound(actor_id, std::any::type_name::<M>()),
541 kind,
542 }
543 }
544
545 pub fn new_unbound_type(
547 actor_id: ActorId,
548 kind: MailboxSenderErrorKind,
549 ty: &'static str,
550 ) -> Self {
551 Self {
552 location: PortLocation::Unbound(actor_id, ty),
553 kind,
554 }
555 }
556
557 pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
559 Self {
560 location: PortLocation::Bound(port_id),
561 kind,
562 }
563 }
564
565 pub fn location(&self) -> &PortLocation {
567 &self.location
568 }
569
570 pub fn kind(&self) -> &MailboxSenderErrorKind {
572 &self.kind
573 }
574}
575
576impl fmt::Display for MailboxSenderError {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 write!(f, "{}: ", self.location)?;
579 fmt::Display::fmt(&self.kind, f)
580 }
581}
582
583impl std::error::Error for MailboxSenderError {
584 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
585 self.kind.source()
586 }
587}
588
589pub trait MailboxSender: Send + Sync + Debug + Any {
592 fn post(
596 &self,
597 envelope: MessageEnvelope,
598 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
599 );
600}
601
602pub trait PortSender: MailboxSender {
608 #[allow(clippy::result_large_err)] fn serialize_and_send<M: RemoteMessage>(
611 &self,
612 port: &PortRef<M>,
613 message: M,
614 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
615 ) -> Result<(), MailboxSenderError> {
616 let serialized = Serialized::serialize(&message).map_err(|err| {
618 MailboxSenderError::new_bound(
619 port.port_id().clone(),
620 MailboxSenderErrorKind::Serialize(err.into()),
621 )
622 })?;
623 self.post(
624 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
625 return_handle,
626 );
627 Ok(())
628 }
629
630 #[allow(clippy::result_large_err)] fn serialize_and_send_once<M: RemoteMessage>(
634 &self,
635 once_port: OncePortRef<M>,
636 message: M,
637 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
638 ) -> Result<(), MailboxSenderError> {
639 let serialized = Serialized::serialize(&message).map_err(|err| {
640 MailboxSenderError::new_bound(
641 once_port.port_id().clone(),
642 MailboxSenderErrorKind::Serialize(err.into()),
643 )
644 })?;
645 self.post(
646 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
647 return_handle,
648 );
649 Ok(())
650 }
651}
652
653impl<T: ?Sized + MailboxSender> PortSender for T {}
654
655#[derive(Debug, Clone)]
659pub struct PanickingMailboxSender;
660
661impl MailboxSender for PanickingMailboxSender {
662 fn post(
663 &self,
664 envelope: MessageEnvelope,
665 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
666 ) {
667 panic!("panic! in the mailbox! attempted post: {}", envelope)
668 }
669}
670
671#[derive(Debug)]
674pub struct UndeliverableMailboxSender;
675
676impl MailboxSender for UndeliverableMailboxSender {
677 fn post(
678 &self,
679 envelope: MessageEnvelope,
680 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
681 ) {
682 tracing::error!("message not delivered: {}", envelope);
683 }
684}
685
686#[derive(Debug)]
687struct Buffer<T: Message> {
688 queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
689 processed: watch::Receiver<usize>,
690 seq: AtomicUsize,
691}
692
693impl<T: Message> Buffer<T> {
694 fn new<Fut>(
695 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
696 ) -> Self
697 where
698 Fut: Future<Output = ()> + Send + 'static,
699 {
700 let (queue, mut next) = mpsc::unbounded_channel();
701 let (last_processed, processed) = watch::channel(0);
702 crate::init::get_runtime().spawn(async move {
703 let mut seq = 0;
704 while let Some((msg, return_handle)) = next.recv().await {
705 process(msg, return_handle).await;
706 seq += 1;
707 let _ = last_processed.send(seq);
708 }
709 });
710 Self {
711 queue,
712 processed,
713 seq: AtomicUsize::new(0),
714 }
715 }
716
717 fn send(
718 &self,
719 item: (T, PortHandle<Undeliverable<T>>),
720 ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
721 self.seq.fetch_add(1, Ordering::SeqCst);
722 self.queue.send(item)?;
723 Ok(())
724 }
725
726 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
727 let seq = self.seq.load(Ordering::SeqCst);
728 while *self.processed.borrow_and_update() < seq {
729 self.processed.changed().await?;
730 }
731 Ok(())
732 }
733}
734
735static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
736 LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
737
738#[derive(Debug, Clone)]
744pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
745
746impl BoxedMailboxSender {
747 pub fn new(sender: impl MailboxSender + 'static) -> Self {
749 Self(Arc::new(sender))
750 }
751
752 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
755 (&*self.0 as &dyn Any).downcast_ref::<T>()
756 }
757}
758
759pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
761 fn boxed(&self) -> BoxedMailboxSender;
763}
764impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
765 fn boxed(&self) -> BoxedMailboxSender {
766 BoxedMailboxSender::new(self.clone())
767 }
768}
769
770pub trait IntoBoxedMailboxSender: MailboxSender {
772 fn into_boxed(self) -> BoxedMailboxSender;
774}
775impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
776 fn into_boxed(self) -> BoxedMailboxSender {
777 BoxedMailboxSender::new(self)
778 }
779}
780
781impl MailboxSender for BoxedMailboxSender {
782 fn post(
783 &self,
784 envelope: MessageEnvelope,
785 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
786 ) {
787 metrics::MAILBOX_POSTS.add(
788 1,
789 hyperactor_telemetry::kv_pairs!(
790 "actor_id" => envelope.sender.to_string(),
791 "dest_actor_id" => envelope.dest.0.to_string(),
792 ),
793 );
794 self.0.post(envelope, return_handle);
795 }
796}
797
798#[derive(thiserror::Error, Debug)]
800pub enum MailboxServerError {
801 #[error(transparent)]
803 Channel(#[from] ChannelError),
804
805 #[error(transparent)]
807 MailboxSender(#[from] MailboxSenderError),
808}
809
810#[derive(Debug)]
813pub struct MailboxServerHandle {
814 join_handle: JoinHandle<Result<(), MailboxServerError>>,
815 stopped_tx: watch::Sender<bool>,
816}
817
818impl MailboxServerHandle {
819 pub fn stop(&self, reason: &str) {
824 tracing::info!("stopping mailbox server; reason: {}", reason);
825 self.stopped_tx.send(true).expect("stop called twice");
826 }
827}
828
829impl Future for MailboxServerHandle {
831 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
832
833 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
834 let join_handle_pinned =
836 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
837 join_handle_pinned.poll(cx)
838 }
839}
840
841fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
846 let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
847
848 tokio::task::spawn(async move {
849 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
850 if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
851 {
852 UndeliverableMailboxSender.post(e, monitored_return_handle());
854 continue;
855 }
856 envelope.try_set_error(DeliveryError::BrokenLink(
857 "message was undeliverable".to_owned(),
858 ));
859 server.post(
860 MessageEnvelope::new(
861 envelope.sender().clone(),
862 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
863 envelope.sender(),
864 )
865 .port_id()
866 .clone(),
867 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
868 Attrs::new(),
869 ),
870 monitored_return_handle(),
871 );
872 }
873 });
874
875 return_handle
876}
877
878pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
881 fn serve(
885 self,
886 mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
887 ) -> MailboxServerHandle {
888 let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
893 let server = self.clone();
894 tokio::task::spawn(async move {
895 while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
896 if let Ok(Undeliverable(e)) =
897 envelope.deserialized::<Undeliverable<MessageEnvelope>>()
898 {
899 UndeliverableMailboxSender.post(e, monitored_return_handle());
901 continue;
902 }
903 envelope.try_set_error(DeliveryError::BrokenLink(
904 "message was undeliverable".to_owned(),
905 ));
906 server.post(
907 MessageEnvelope::new(
908 envelope.sender().clone(),
909 PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
910 envelope.sender(),
911 )
912 .port_id()
913 .clone(),
914 Serialized::serialize(&Undeliverable(envelope)).unwrap(),
915 Attrs::new(),
916 ),
917 monitored_return_handle(),
918 );
919 }
920 });
921
922 let (stopped_tx, mut stopped_rx) = watch::channel(false);
923 let join_handle = tokio::spawn(async move {
924 let mut detached = false;
925
926 loop {
927 if *stopped_rx.borrow_and_update() {
928 break Ok(());
929 }
930
931 tokio::select! {
932 message = rx.recv() => {
933 match message {
934 Ok(envelope) => self.post(envelope, return_handle.clone()),
936
937 Err(ChannelError::Closed) => break Ok(()),
940 Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
941 }
942 }
943 result = stopped_rx.changed(), if !detached => {
944 tracing::debug!(
945 "the mailbox server is stopped"
946 );
947 detached = result.is_err();
948 }
949 }
950 }
951 });
952
953 MailboxServerHandle {
954 join_handle,
955 stopped_tx,
956 }
957 }
958}
959
960impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
961
962#[derive(Debug)]
964pub struct MailboxClient {
965 buffer: Buffer<MessageEnvelope>,
967
968 _tx_monitoring: CancellationToken,
970}
971
972impl MailboxClient {
973 pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
976 let addr = tx.addr();
977 let tx = Arc::new(tx);
978 let tx_status = tx.status().clone();
979 let tx_monitoring = CancellationToken::new();
980 let buffer = Buffer::new(move |envelope, return_handle| {
981 let tx = Arc::clone(&tx);
982 let (return_channel, return_receiver) = oneshot::channel();
983 let return_handle_0 = return_handle.clone();
985 tokio::spawn(async move {
986 let result = return_receiver.await;
987 if let Ok(message) = result {
988 let _ = return_handle_0.send(Undeliverable(message));
989 } else {
990 }
992 });
993 let return_handle_1 = return_handle.clone();
995 async move {
996 if let Err(SendError(_, envelope)) = tx.try_post(envelope, return_channel) {
997 envelope.undeliverable(
999 DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string()),
1000 return_handle_1.clone(),
1001 );
1002 }
1003 }
1004 });
1005 let this = Self {
1006 buffer,
1007 _tx_monitoring: tx_monitoring.clone(),
1008 };
1009 Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1010 this
1011 }
1012
1013 fn monitor_tx_health(
1015 mut rx: watch::Receiver<TxStatus>,
1016 cancel_token: CancellationToken,
1017 addr: ChannelAddr,
1018 ) {
1019 crate::init::get_runtime().spawn(async move {
1020 loop {
1021 tokio::select! {
1022 changed = rx.changed() => {
1023 if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1024 tracing::warn!("connection to {} lost", addr);
1025 break;
1028 }
1029 }
1030 _ = cancel_token.cancelled() => {
1031 break;
1032 }
1033 }
1034 }
1035 });
1036 }
1037}
1038
1039impl MailboxSender for MailboxClient {
1040 fn post(
1041 &self,
1042 envelope: MessageEnvelope,
1043 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1044 ) {
1045 tracing::event!(target:"messages", tracing::Level::DEBUG, "crc"=envelope.data.crc(), "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1047
1048 if let Err(mpsc::error::SendError((envelope, return_handle))) =
1049 self.buffer.send((envelope, return_handle))
1050 {
1051 let err = DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string());
1052 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
1053 1,
1054 hyperactor_telemetry::kv_pairs!(
1055 "actor_id" => envelope.sender.to_string(),
1056 "dest_actor_id" => envelope.dest.0.to_string(),
1057 "message_type" => envelope.data.typename().unwrap_or("unknown"),
1058 "reason" => err.to_string(),
1059 ),
1060 );
1061
1062 envelope.undeliverable(err, return_handle);
1064 }
1065 }
1066}
1067
1068pub struct PortSink<C: CanSend, M: RemoteMessage> {
1070 caps: C,
1071 port: PortRef<M>,
1072}
1073
1074impl<C: CanSend, M: RemoteMessage> PortSink<C, M> {
1075 pub fn new(caps: C, port: PortRef<M>) -> Self {
1077 Self { caps, port }
1078 }
1079}
1080
1081impl<C: CanSend, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1082 type Error = MailboxSenderError;
1083
1084 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1085 Poll::Ready(Ok(()))
1086 }
1087
1088 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1089 self.port.send(&self.caps, item)
1090 }
1091
1092 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1093 Poll::Ready(Ok(()))
1094 }
1095
1096 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1097 Poll::Ready(Ok(()))
1098 }
1099}
1100
1101#[derive(Clone, Debug)]
1104pub struct Mailbox {
1105 inner: Arc<State>,
1106}
1107
1108impl Mailbox {
1109 pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1112 Self {
1113 inner: Arc::new(State::new(actor_id, forwarder)),
1114 }
1115 }
1116
1117 pub fn new_detached(actor_id: ActorId) -> Self {
1119 Self {
1120 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1121 }
1122 }
1123
1124 pub fn actor_id(&self) -> &ActorId {
1126 &self.inner.actor_id
1127 }
1128
1129 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1134 let port_index = self.inner.allocate_port();
1135 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1136 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1137 tracing::trace!(
1138 name = "open_port",
1139 "opening port for {} at {}",
1140 self.inner.actor_id,
1141 port_id
1142 );
1143 (
1144 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1145 PortReceiver::new(receiver, port_id, false, self.clone()),
1146 )
1147 }
1148
1149 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1155 where
1156 A: Accumulator + Send + Sync + 'static,
1157 A::Update: Message,
1158 A::State: Message + Default + Clone,
1159 {
1160 let port_index = self.inner.allocate_port();
1161 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1162 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1163 let state = Mutex::new(A::State::default());
1164 let reducer_spec = accum.reducer_spec();
1165 let enqueue = move |_, update: A::Update| {
1166 let mut state = state.lock().unwrap();
1167 accum.accumulate(&mut state, update)?;
1168 let _ = sender.send(state.clone());
1169 Ok(())
1170 };
1171 (
1172 PortHandle {
1173 mailbox: self.clone(),
1174 port_index,
1175 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1176 bound: Arc::new(OnceLock::new()),
1177 reducer_spec,
1178 },
1179 PortReceiver::new(receiver, port_id, true, self.clone()),
1180 )
1181 }
1182
1183 pub(crate) fn open_enqueue_port<M: Message>(
1187 &self,
1188 enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1189 ) -> PortHandle<M> {
1190 PortHandle {
1191 mailbox: self.clone(),
1192 port_index: self.inner.allocate_port(),
1193 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1194 bound: Arc::new(OnceLock::new()),
1195 reducer_spec: None,
1196 }
1197 }
1198
1199 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1203 let port_index = self.inner.allocate_port();
1204 let port_id = PortId(self.inner.actor_id.clone(), port_index);
1205 let (sender, receiver) = oneshot::channel::<M>();
1206 (
1207 OncePortHandle {
1208 mailbox: self.clone(),
1209 port_index,
1210 port_id: port_id.clone(),
1211 sender,
1212 },
1213 OncePortReceiver {
1214 receiver: Some(receiver),
1215 port_id,
1216 mailbox: self.clone(),
1217 },
1218 )
1219 }
1220
1221 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1222 MailboxError::new(self.inner.actor_id.clone(), err)
1223 }
1224
1225 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1226 let port_index = M::port();
1227 self.inner.ports.get(&port_index).and_then(|boxed| {
1228 boxed
1229 .as_any()
1230 .downcast_ref::<UnboundedSender<M>>()
1231 .map(|s| {
1232 assert_eq!(
1233 s.port_id,
1234 self.actor_id().port_id(port_index),
1235 "port_id mismatch in downcasted UnboundedSender"
1236 );
1237 s.sender.clone()
1238 })
1239 })
1240 }
1241
1242 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1244 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1245 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1246 }
1247
1248 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1249 assert_eq!(
1250 handle.mailbox.actor_id(),
1251 self.actor_id(),
1252 "port does not belong to mailbox"
1253 );
1254
1255 let port_id = self.actor_id().port_id(handle.port_index);
1258 match self.inner.ports.entry(handle.port_index) {
1259 Entry::Vacant(entry) => {
1260 entry.insert(Box::new(UnboundedSender::new(
1261 handle.sender.clone(),
1262 port_id.clone(),
1263 )));
1264 }
1265 Entry::Occupied(_entry) => {}
1266 }
1267
1268 PortRef::attest(port_id)
1269 }
1270
1271 fn bind_to<M: RemoteMessage>(&self, handle: &PortHandle<M>, port_index: u64) {
1272 assert_eq!(
1273 handle.mailbox.actor_id(),
1274 self.actor_id(),
1275 "port does not belong to mailbox"
1276 );
1277
1278 let port_id = self.actor_id().port_id(port_index);
1279 match self.inner.ports.entry(port_index) {
1280 Entry::Vacant(entry) => {
1281 entry.insert(Box::new(UnboundedSender::new(
1282 handle.sender.clone(),
1283 port_id,
1284 )));
1285 }
1286 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1287 }
1288 }
1289
1290 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1291 let port_id = handle.port_id().clone();
1292 match self.inner.ports.entry(handle.port_index) {
1293 Entry::Vacant(entry) => {
1294 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1295 }
1296 Entry::Occupied(_entry) => {}
1297 }
1298 }
1299
1300 fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1301 assert_eq!(
1302 port_id.actor_id(),
1303 self.actor_id(),
1304 "port does not belong to mailbox"
1305 );
1306
1307 match self.inner.ports.entry(port_id.index()) {
1308 Entry::Vacant(entry) => {
1309 entry.insert(Box::new(sender));
1310 }
1311 Entry::Occupied(_entry) => {}
1312 }
1313 }
1314}
1315
1316pub fn open_port<M: Message>(caps: &impl cap::CanOpenPort) -> (PortHandle<M>, PortReceiver<M>) {
1321 caps.mailbox().open_port()
1322}
1323
1324pub fn open_once_port<M: Message>(
1327 caps: &impl cap::CanOpenPort,
1328) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1329 caps.mailbox().open_once_port()
1330}
1331
1332impl MailboxSender for Mailbox {
1333 fn post(
1336 &self,
1337 envelope: MessageEnvelope,
1338 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1339 ) {
1340 tracing::trace!(name = "post", "posting message to {}", envelope.dest);
1341
1342 if envelope.dest().actor_id() != &self.inner.actor_id {
1343 return self.inner.forwarder.post(envelope, return_handle);
1344 }
1345
1346 match self.inner.ports.entry(envelope.dest().index()) {
1347 Entry::Vacant(_) => {
1348 let err = DeliveryError::Unroutable("port not bound in mailbox".to_string());
1349 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
1350 1,
1351 hyperactor_telemetry::kv_pairs!(
1352 "actor_id" => envelope.sender.to_string(),
1353 "dest_actor_id" => envelope.dest.0.to_string(),
1354 "message_type" => envelope.data.typename().unwrap_or("unknown"),
1355 "reason" => err.to_string(),
1356 ),
1357 );
1358
1359 envelope.undeliverable(err, return_handle);
1360 }
1361 Entry::Occupied(entry) => {
1362 let (metadata, data) = envelope.open();
1363 let MessageMetadata {
1364 headers,
1365 sender,
1366 dest,
1367 error: metadata_error,
1368 } = metadata;
1369
1370 match entry.get().send_serialized(headers, data) {
1378 Ok(false) => {
1379 entry.remove();
1380 }
1381 Ok(true) => (),
1382 Err(SerializedSenderError {
1383 data,
1384 error: sender_error,
1385 headers,
1386 }) => {
1387 let err = DeliveryError::Mailbox(format!("{}", sender_error));
1388 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
1389 1,
1390 hyperactor_telemetry::kv_pairs!(
1391 "actor_id" => sender.to_string(),
1392 "dest_actor_id" => dest.0.to_string(),
1393 "message_type" => data.typename().unwrap_or("unknown"),
1394 "reason" => err.to_string(),
1395 ),
1396 );
1397
1398 MessageEnvelope::seal(
1399 MessageMetadata {
1400 headers,
1401 sender,
1402 dest,
1403 error: metadata_error,
1404 },
1405 data,
1406 )
1407 .undeliverable(err, return_handle)
1408 }
1409 }
1410 }
1411 }
1412 }
1413}
1414
1415static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
1420
1421impl cap::sealed::CanSend for Mailbox {
1422 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1423 let return_handle = self.bound_return_handle().unwrap_or_else(|| {
1424 let actor_id = self.actor_id();
1425 if CAN_SEND_WARNED_MAILBOXES
1426 .get_or_init(DashSet::new)
1427 .insert(actor_id.clone())
1428 {
1429 let bt = std::backtrace::Backtrace::force_capture();
1430 tracing::warn!(
1431 actor_id = ?actor_id,
1432 backtrace = ?bt,
1433 "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
1434 );
1435 }
1436 monitored_return_handle()
1437 });
1438
1439 let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data, headers);
1440 MailboxSender::post(self, envelope, return_handle);
1441 }
1442 fn actor_id(&self) -> &ActorId {
1443 self.actor_id()
1444 }
1445}
1446impl cap::sealed::CanSend for &Mailbox {
1447 fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1448 cap::sealed::CanSend::post(*self, dest, headers, data)
1449 }
1450 fn actor_id(&self) -> &ActorId {
1451 (**self).actor_id()
1452 }
1453}
1454
1455impl cap::sealed::CanOpenPort for &Mailbox {
1456 fn mailbox(&self) -> &Mailbox {
1457 self
1458 }
1459}
1460
impl cap::sealed::CanOpenPort for Mailbox {
    /// A mailbox serves as its own port-opening capability: returns `self`.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1466
/// Holds serialized updates until enough have been collected to hand them
/// back as one batch (see `push`).
#[derive(Default)]
struct SplitPortBuffer(Vec<Serialized>);
1469
1470impl SplitPortBuffer {
1471 fn push(&mut self, serialized: Serialized) -> Option<Vec<Serialized>> {
1474 let limit = crate::config::global::get(crate::config::SPLIT_MAX_BUFFER_SIZE);
1475
1476 self.0.push(serialized);
1477 if self.0.len() >= limit {
1478 Some(std::mem::take(&mut self.0))
1479 } else {
1480 None
1481 }
1482 }
1483}
1484
1485impl cap::sealed::CanSplitPort for Mailbox {
1486 fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1487 fn post(mailbox: &Mailbox, port_id: PortId, msg: Serialized) {
1488 mailbox.post(
1489 MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new()),
1490 monitored_return_handle(),
1495 );
1496 }
1497
1498 let port_index = self.inner.allocate_port();
1499 let split_port = self.actor_id().port_id(port_index);
1500 let mailbox = self.clone();
1501 let reducer = reducer_spec
1502 .map(
1503 |ReducerSpec {
1504 typehash,
1505 builder_params,
1506 }| { accum::resolve_reducer(typehash, builder_params) },
1507 )
1508 .transpose()?
1509 .flatten();
1510 let enqueue: Box<
1511 dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
1512 > = match reducer {
1513 None => Box::new(move |serialized: Serialized| {
1514 post(&mailbox, port_id.clone(), serialized);
1515 Ok(())
1516 }),
1517 Some(r) => {
1518 let buffer = Mutex::new(SplitPortBuffer::default());
1519 Box::new(move |serialized: Serialized| {
1520 let mut buf = buffer.lock().unwrap();
1524 if let Some(buffered) = buf.push(serialized) {
1525 let reduced = r.reduce_updates(buffered).map_err(|(e, mut b)| {
1526 (
1527 b.pop()
1528 .expect("there should be at least one update from buffer"),
1529 e,
1530 )
1531 })?;
1532 post(&mailbox, port_id.clone(), reduced);
1533 }
1534 Ok(())
1535 })
1536 }
1537 };
1538 self.bind_untyped(
1539 &split_port,
1540 UntypedUnboundedSender {
1541 sender: enqueue,
1542 port_id: split_port.clone(),
1543 },
1544 );
1545 Ok(split_port)
1546 }
1547}
1548
/// A cloneable handle for sending messages of type `M` to a port of a
/// mailbox. The handle can be bound to obtain a [`PortRef`].
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// The mailbox this port belongs to; used for binding and for error
    /// locations.
    mailbox: Mailbox,
    /// NOTE(review): presumably the index of this port within `mailbox` —
    /// confirm against the constructor's callers.
    port_index: u64,
    /// Delivers messages sent through this handle.
    sender: UnboundedPortSender<M>,
    /// The port id this handle was bound to, set at most once by `bind` and
    /// shared between all clones of the handle.
    bound: Arc<OnceLock<PortId>>,
    /// Reducer spec attached to the `PortRef` produced by `bind`, if any.
    reducer_spec: Option<ReducerSpec>,
}
1572
1573impl<M: Message> PortHandle<M> {
1574 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1575 Self {
1576 mailbox,
1577 port_index,
1578 sender,
1579 bound: Arc::new(OnceLock::new()),
1580 reducer_spec: None,
1581 }
1582 }
1583
1584 fn location(&self) -> PortLocation {
1585 match self.bound.get() {
1586 Some(port_id) => PortLocation::Bound(port_id.clone()),
1587 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1588 }
1589 }
1590
1591 #[allow(clippy::result_large_err)] pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1594 self.sender.send(Attrs::new(), message).map_err(|err| {
1595 MailboxSenderError::new_unbound::<M>(
1596 self.mailbox.actor_id().clone(),
1597 MailboxSenderErrorKind::Other(err),
1598 )
1599 })
1600 }
1601}
1602
1603impl<M: RemoteMessage> PortHandle<M> {
1604 pub fn bind(&self) -> PortRef<M> {
1606 PortRef::attest_reducible(
1607 self.bound
1608 .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1609 .clone(),
1610 self.reducer_spec.clone(),
1611 )
1612 }
1613
1614 pub fn bind_to(&self, port_index: u64) {
1617 self.mailbox.bind_to(self, port_index);
1618 }
1619}
1620
1621impl<M: Message> Clone for PortHandle<M> {
1622 fn clone(&self) -> Self {
1623 Self {
1624 mailbox: self.mailbox.clone(),
1625 port_index: self.port_index,
1626 sender: self.sender.clone(),
1627 bound: self.bound.clone(),
1628 reducer_spec: self.reducer_spec.clone(),
1629 }
1630 }
1631}
1632
1633impl<M: Message> fmt::Display for PortHandle<M> {
1634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1635 fmt::Display::fmt(&self.location(), f)
1636 }
1637}
1638
/// A handle to a port that accepts a single message of type `M`; sending or
/// binding consumes the handle.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// NOTE(review): presumably the index of this port within `mailbox` —
    /// confirm against the constructor.
    port_index: u64,
    /// The id of this port.
    port_id: PortId,
    /// The one-shot channel the message is delivered on.
    sender: oneshot::Sender<M>,
}
1647
1648impl<M: Message> OncePortHandle<M> {
1649 pub fn port_id(&self) -> &PortId {
1652 &self.port_id
1653 }
1654
1655 #[allow(clippy::result_large_err)] pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1659 let actor_id = self.mailbox.actor_id().clone();
1660 self.sender.send(message).map_err(|_| {
1661 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1666 })?;
1667 Ok(())
1668 }
1669}
1670
1671impl<M: RemoteMessage> OncePortHandle<M> {
1672 pub fn bind(self) -> OncePortRef<M> {
1677 let port_id = self.port_id().clone();
1678 self.mailbox.clone().bind_once(self);
1679 OncePortRef::attest(port_id)
1680 }
1681}
1682
1683impl<M: Message> fmt::Display for OncePortHandle<M> {
1684 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1685 fmt::Display::fmt(&self.port_id(), f)
1686 }
1687}
1688
/// The receiving end of a port: yields the messages of type `M` that were
/// delivered to it.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// The channel messages arrive on.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The id of the port this receiver belongs to.
    port_id: PortId,
    /// When set, receive operations skip ahead to the most recently queued
    /// message, discarding older ones.
    coalesce: bool,
    /// The owning mailbox; the port's entry is removed from it when the
    /// receiver is dropped.
    mailbox: Mailbox,
}
1702
1703impl<M> PortReceiver<M> {
1704 fn new(
1705 receiver: mpsc::UnboundedReceiver<M>,
1706 port_id: PortId,
1707 coalesce: bool,
1708 mailbox: Mailbox,
1709 ) -> Self {
1710 Self {
1711 receiver,
1712 port_id,
1713 coalesce,
1714 mailbox,
1715 }
1716 }
1717
1718 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1723 let mut next = self.receiver.try_recv();
1724 if self.coalesce {
1726 if let Some(latest) = self.drain().pop() {
1727 next = Ok(latest);
1728 }
1729 }
1730 match next {
1731 Ok(msg) => Ok(Some(msg)),
1732 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1733 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1734 self.actor_id().clone(),
1735 MailboxErrorKind::Closed,
1736 )),
1737 }
1738 }
1739
1740 pub async fn recv(&mut self) -> Result<M, MailboxError> {
1743 let mut next = self.receiver.recv().await;
1744 if self.coalesce {
1747 if let Some(latest) = self.drain().pop() {
1748 next = Some(latest);
1749 }
1750 }
1751 next.ok_or(MailboxError::new(
1752 self.actor_id().clone(),
1753 MailboxErrorKind::Closed,
1754 ))
1755 }
1756
1757 pub fn drain(&mut self) -> Vec<M> {
1759 let mut drained: Vec<M> = Vec::new();
1760 while let Ok(msg) = self.receiver.try_recv() {
1761 if self.coalesce {
1763 drained.pop();
1764 }
1765 drained.push(msg);
1766 }
1767 drained
1768 }
1769
1770 fn port(&self) -> u64 {
1771 self.port_id.1
1772 }
1773
1774 fn actor_id(&self) -> &ActorId {
1775 &self.port_id.0
1776 }
1777}
1778
1779impl<M> Drop for PortReceiver<M> {
1780 fn drop(&mut self) {
1781 self.mailbox.inner.ports.remove(&self.port());
1785 }
1786}
1787
1788impl<M> Stream for PortReceiver<M> {
1789 type Item = Result<M, MailboxError>;
1790
1791 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1792 std::pin::pin!(self.recv()).poll(cx).map(Some)
1793 }
1794}
1795
/// The receiving end of a once-port: yields at most one message of type `M`.
pub struct OncePortReceiver<M> {
    /// The one-shot channel; taken out of the option by `recv`.
    receiver: Option<oneshot::Receiver<M>>,
    /// The id of the port this receiver belongs to.
    port_id: PortId,

    /// The owning mailbox; the port's entry is removed from it when the
    /// receiver is dropped.
    mailbox: Mailbox,
}
1805
1806impl<M> OncePortReceiver<M> {
1807 pub async fn recv(mut self) -> Result<M, MailboxError> {
1811 std::mem::take(&mut self.receiver)
1812 .unwrap()
1813 .await
1814 .map_err(|err| {
1815 MailboxError::new(
1816 self.actor_id().clone(),
1817 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1818 )
1819 })
1820 }
1821
1822 fn port(&self) -> u64 {
1823 self.port_id.1
1824 }
1825
1826 fn actor_id(&self) -> &ActorId {
1827 &self.port_id.0
1828 }
1829}
1830
1831impl<M> Drop for OncePortReceiver<M> {
1832 fn drop(&mut self) {
1833 self.mailbox.inner.ports.remove(&self.port());
1837 }
1838}
1839
/// Error returned when a serialized message could not be sent. It hands the
/// message's headers and data back to the caller together with the cause.
pub struct SerializedSenderError {
    /// The headers of the message that failed to send.
    pub headers: Attrs,
    /// The serialized payload of the message that failed to send.
    pub data: Serialized,
    /// Why the send failed.
    pub error: MailboxSenderError,
}
1849
/// A type-erased port sender that accepts messages in serialized form.
trait SerializedSender: Send + Sync {
    /// Exposes the sender as [`Any`].
    /// NOTE(review): presumably so callers can downcast to the concrete
    /// sender type — confirm against the call sites.
    fn as_any(&self) -> &dyn Any;

    /// Sends `serialized` with the given `headers`. On failure the headers
    /// and data are handed back inside the [`SerializedSenderError`].
    ///
    /// NOTE(review): the meaning of the returned `bool` is not defined here.
    /// In this file the unbounded senders return `true` and `OnceSender`
    /// returns `false`, which suggests it reports whether the port stays
    /// usable after the send — confirm against the caller.
    #[allow(clippy::result_large_err)]
    fn send_serialized(
        &self,
        headers: Attrs,
        serialized: Serialized,
    ) -> Result<bool, SerializedSenderError>;
}
1875
/// How a port delivers messages of type `M`.
enum UnboundedPortSender<M: Message> {
    /// Deliver by sending on an unbounded mpsc channel; headers are not
    /// forwarded on this path.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Deliver by invoking a function with the headers and the message.
    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
1883
1884impl<M: Message> UnboundedPortSender<M> {
1885 fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1886 match self {
1887 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1888 Self::Func(func) => func(headers, message),
1889 }
1890 }
1891}
1892
1893impl<M: Message> Clone for UnboundedPortSender<M> {
1896 fn clone(&self) -> Self {
1897 match self {
1898 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1899 Self::Func(func) => Self::Func(func.clone()),
1900 }
1901 }
1902}
1903
1904impl<M: Message> Debug for UnboundedPortSender<M> {
1905 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1906 match self {
1907 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1908 Self::Func(_) => f
1909 .debug_tuple("UnboundedPortSender::Func")
1910 .field(&"..")
1911 .finish(),
1912 }
1913 }
1914}
1915
/// An [`UnboundedPortSender`] paired with the id of the port it serves, so
/// that send failures can be reported against that port.
struct UnboundedSender<M: Message> {
    /// Performs the actual delivery.
    sender: UnboundedPortSender<M>,
    /// The port reported in errors.
    port_id: PortId,
}
1920
1921impl<M: Message> UnboundedSender<M> {
1922 fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
1925 Self { sender, port_id }
1926 }
1927
1928 #[allow(clippy::result_large_err)] fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
1930 self.sender.send(headers, message).map_err(|err| {
1931 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
1932 })
1933 }
1934}
1935
1936impl<M: Message> Clone for UnboundedSender<M> {
1940 fn clone(&self) -> Self {
1941 Self {
1942 sender: self.sender.clone(),
1943 port_id: self.port_id.clone(),
1944 }
1945 }
1946}
1947
1948impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1949 fn as_any(&self) -> &dyn Any {
1950 self
1951 }
1952
1953 fn send_serialized(
1954 &self,
1955 headers: Attrs,
1956 serialized: Serialized,
1957 ) -> Result<bool, SerializedSenderError> {
1958 match serialized.deserialized() {
1959 Ok(message) => {
1960 self.sender.send(headers.clone(), message).map_err(|err| {
1961 SerializedSenderError {
1962 data: serialized,
1963 error: MailboxSenderError::new_bound(
1964 self.port_id.clone(),
1965 MailboxSenderErrorKind::Other(err),
1966 ),
1967 headers,
1968 }
1969 })?;
1970
1971 Ok(true)
1972 }
1973 Err(err) => Err(SerializedSenderError {
1974 data: serialized,
1975 error: MailboxSenderError::new_bound(
1976 self.port_id.clone(),
1977 MailboxSenderErrorKind::Deserialize(M::typename(), err),
1978 ),
1979 headers,
1980 }),
1981 }
1982 }
1983}
1984
/// A sender that delivers at most one message of type `M` to a port.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot channel, shared between clones; it is taken out of the
    /// option by the first send, leaving `None` behind.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The port reported in errors.
    port_id: PortId,
}
1992
1993impl<M: Message> OnceSender<M> {
1994 fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
1997 Self {
1998 sender: Arc::new(Mutex::new(Some(sender))),
1999 port_id,
2000 }
2001 }
2002
2003 #[allow(clippy::result_large_err)] fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2005 match self.sender.lock().unwrap().take() {
2007 None => Err(MailboxSenderError::new_bound(
2008 self.port_id.clone(),
2009 MailboxSenderErrorKind::Closed,
2010 )),
2011 Some(sender) => {
2012 sender.send(message).map_err(|_| {
2013 MailboxSenderError::new_bound(
2018 self.port_id.clone(),
2019 MailboxSenderErrorKind::Closed,
2020 )
2021 })?;
2022 Ok(false)
2023 }
2024 }
2025 }
2026}
2027
2028impl<M: Message> Clone for OnceSender<M> {
2032 fn clone(&self) -> Self {
2033 Self {
2034 sender: self.sender.clone(),
2035 port_id: self.port_id.clone(),
2036 }
2037 }
2038}
2039
2040impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2041 fn as_any(&self) -> &dyn Any {
2042 self
2043 }
2044
2045 fn send_serialized(
2046 &self,
2047 headers: Attrs,
2048 serialized: Serialized,
2049 ) -> Result<bool, SerializedSenderError> {
2050 match serialized.deserialized() {
2051 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2052 data: serialized,
2053 error: e,
2054 headers,
2055 }),
2056 Err(err) => Err(SerializedSenderError {
2057 data: serialized,
2058 error: MailboxSenderError::new_bound(
2059 self.port_id.clone(),
2060 MailboxSenderErrorKind::Deserialize(M::typename(), err),
2061 ),
2062 headers,
2063 }),
2064 }
2065 }
2066}
2067
/// A sender that handles messages in serialized form without deserializing
/// them.
struct UntypedUnboundedSender {
    /// Handles one serialized message; on failure it returns a serialized
    /// message together with the error.
    sender: Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
    /// The port reported in errors.
    port_id: PortId,
}
2073
2074impl SerializedSender for UntypedUnboundedSender {
2075 fn as_any(&self) -> &dyn Any {
2076 self
2077 }
2078
2079 fn send_serialized(
2080 &self,
2081 headers: Attrs,
2082 serialized: Serialized,
2083 ) -> Result<bool, SerializedSenderError> {
2084 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2085 data,
2086 error: MailboxSenderError::new_bound(
2087 self.port_id.clone(),
2088 MailboxSenderErrorKind::Other(err),
2089 ),
2090 headers,
2091 })?;
2092
2093 Ok(true)
2094 }
2095}
2096
/// Internal state of a mailbox.
struct State {
    /// The id of the actor owning the mailbox.
    actor_id: ActorId,

    /// Type-erased senders for the mailbox's ports, keyed by port index.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port index to hand out; starts at `USER_PORT_OFFSET`.
    next_port: AtomicU64,

    /// NOTE(review): not used within this section; presumably the sender for
    /// messages that are not addressed to this mailbox — confirm.
    forwarder: BoxedMailboxSender,
}
2113
2114impl State {
2115 fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2117 Self {
2118 actor_id,
2119 ports: DashMap::new(),
2120 next_port: AtomicU64::new(USER_PORT_OFFSET),
2123 forwarder,
2124 }
2125 }
2126
2127 fn allocate_port(&self) -> u64 {
2129 self.next_port.fetch_add(1, Ordering::SeqCst)
2130 }
2131}
2132
2133impl fmt::Debug for State {
2134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2135 f.debug_struct("State")
2136 .field("actor_id", &self.actor_id)
2137 .field(
2138 "open_ports",
2139 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2140 )
2141 .field("next_port", &self.next_port)
2142 .finish()
2143 }
2144}
2145
/// Routes each message to the sender registered for the message's
/// destination actor id. Clones share the same registration table.
#[derive(Debug, Clone)]
pub struct MailboxMuxer {
    /// Registered senders, keyed by actor id.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2153
2154impl MailboxMuxer {
2155 pub fn new() -> Self {
2157 Self {
2158 mailboxes: Arc::new(DashMap::new()),
2159 }
2160 }
2161
2162 pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2167 match self.mailboxes.entry(actor_id) {
2168 Entry::Occupied(_) => false,
2169 Entry::Vacant(entry) => {
2170 entry.insert(Box::new(sender));
2171 true
2172 }
2173 }
2174 }
2175
2176 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2178 self.bind(mailbox.actor_id().clone(), mailbox)
2179 }
2180
2181 pub(crate) fn unbind(&self, actor_id: &ActorId) {
2185 self.mailboxes.remove(actor_id);
2186 }
2187
2188 pub fn bound_actors(&self) -> Vec<ActorId> {
2190 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2191 }
2192}
2193
2194impl MailboxSender for MailboxMuxer {
2195 fn post(
2196 &self,
2197 envelope: MessageEnvelope,
2198 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2199 ) {
2200 let dest_actor_id = envelope.dest().actor_id();
2201 match self.mailboxes.get(envelope.dest().actor_id()) {
2202 None => {
2203 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2204 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2205 }
2206 Some(sender) => sender.post(envelope, return_handle),
2207 }
2208 }
2209}
2210
/// Routes messages by reference prefix: a message goes to the sender bound
/// to a reference that is a prefix of the destination actor's reference.
/// Clones share the same routing table.
#[derive(Debug, Clone)]
pub struct MailboxRouter {
    /// The routing table, ordered by reference.
    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2217
2218impl MailboxRouter {
2219 pub fn new() -> Self {
2221 Self {
2222 entries: Arc::new(RwLock::new(BTreeMap::new())),
2223 }
2224 }
2225
2226 pub fn downgrade(&self) -> WeakMailboxRouter {
2228 WeakMailboxRouter(Arc::downgrade(&self.entries))
2229 }
2230
2231 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2235 FallbackMailboxRouter {
2236 router: self.clone(),
2237 default,
2238 }
2239 }
2240
2241 pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2245 let mut w = self.entries.write().unwrap();
2246 w.insert(dest, Arc::new(sender));
2247 }
2248
2249 fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2250 match self
2251 .entries
2252 .read()
2253 .unwrap()
2254 .lower_bound(Excluded(&actor_id.clone().into()))
2255 .prev()
2256 {
2257 None => None,
2258 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2259 Some(sender.clone())
2260 }
2261 Some(_) => None,
2262 }
2263 }
2264}
2265
2266impl MailboxSender for MailboxRouter {
2267 fn post(
2268 &self,
2269 envelope: MessageEnvelope,
2270 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2271 ) {
2272 match self.sender(envelope.dest().actor_id()) {
2273 None => envelope.undeliverable(
2274 DeliveryError::Unroutable(
2275 "no destination found for actor in routing table".to_string(),
2276 ),
2277 return_handle,
2278 ),
2279 Some(sender) => sender.post(envelope, return_handle),
2280 }
2281 }
2282}
2283
/// A [`MailboxRouter`] combined with a default sender that receives every
/// message the router has no route for.
#[derive(Debug, Clone)]
struct FallbackMailboxRouter {
    /// Consulted first for a route.
    router: MailboxRouter,
    /// Used when `router` has no route for the destination.
    default: BoxedMailboxSender,
}
2289
2290impl MailboxSender for FallbackMailboxRouter {
2291 fn post(
2292 &self,
2293 envelope: MessageEnvelope,
2294 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2295 ) {
2296 match self.router.sender(envelope.dest().actor_id()) {
2297 Some(sender) => sender.post(envelope, return_handle),
2298 None => self.default.post(envelope, return_handle),
2299 }
2300 }
2301}
2302
/// A weak handle to a [`MailboxRouter`]'s routing table. It does not keep
/// the table alive and must be upgraded before it can route.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2315
2316impl WeakMailboxRouter {
2317 pub fn upgrade(&self) -> Option<MailboxRouter> {
2319 self.0.upgrade().map(|entries| MailboxRouter { entries })
2320 }
2321}
2322
2323impl MailboxSender for WeakMailboxRouter {
2324 fn post(
2325 &self,
2326 envelope: MessageEnvelope,
2327 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2328 ) {
2329 match self.upgrade() {
2330 Some(router) => router.post(envelope, return_handle),
2331 None => envelope.undeliverable(
2332 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2333 return_handle,
2334 ),
2335 }
2336 }
2337}
2338
/// A router that maps references to channel addresses and dials those
/// addresses to deliver messages, caching one client per address.
/// Destinations without an address are posted to a default sender.
/// Clones share the address book and the client cache.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
    /// Maps references to the channel address that serves them.
    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
    /// Dialed clients, keyed by channel address.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    /// Receives messages whose destination has no address in the book.
    default: BoxedMailboxSender,
}
2357
2358impl DialMailboxRouter {
2359 pub fn new() -> Self {
2361 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2362 }
2363
2364 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2368 Self {
2369 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2370 sender_cache: Arc::new(DashMap::new()),
2371 default,
2372 }
2373 }
2374
2375 pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2381 if let Ok(mut w) = self.address_book.write() {
2382 if let Some(old_addr) = w.insert(dest.clone(), addr.clone()) {
2383 if old_addr != addr {
2384 tracing::info!("Rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2385 self.sender_cache.remove(&old_addr);
2386 }
2387 }
2388 } else {
2389 tracing::error!("Address book poisoned during bind of {:?}", dest);
2390 }
2391 }
2392
2393 pub fn unbind(&self, dest: &Reference) {
2399 if let Ok(mut w) = self.address_book.write() {
2400 let to_remove: Vec<(Reference, ChannelAddr)> = w
2401 .range(dest..)
2402 .take_while(|(key, _)| dest.is_prefix_of(key))
2403 .map(|(key, addr)| (key.clone(), addr.clone()))
2404 .collect();
2405
2406 for (key, addr) in to_remove {
2407 tracing::info!("Unbinding {:?} from {:?}", key, addr);
2408 w.remove(&key);
2409 self.sender_cache.remove(&addr);
2410 }
2411 } else {
2412 tracing::error!("Address book poisoned during unbind of {:?}", dest);
2413 }
2414 }
2415
2416 pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2418 let address_book = self.address_book.read().unwrap();
2419 address_book
2420 .lower_bound(Excluded(&actor_id.clone().into()))
2421 .prev()
2422 .and_then(|(key, addr)| {
2423 if key.is_prefix_of(&actor_id.clone().into()) {
2424 Some(addr.clone())
2425 } else {
2426 None
2427 }
2428 })
2429 }
2430
2431 #[allow(clippy::result_large_err)] fn dial(
2433 &self,
2434 addr: &ChannelAddr,
2435 actor_id: &ActorId,
2436 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2437 match self.sender_cache.entry(addr.clone()) {
2441 Entry::Occupied(entry) => Ok(entry.get().clone()),
2442 Entry::Vacant(entry) => {
2443 let tx = channel::dial(addr.clone()).map_err(|err| {
2444 MailboxSenderError::new_unbound_type(
2445 actor_id.clone(),
2446 MailboxSenderErrorKind::Channel(err),
2447 "unknown",
2448 )
2449 })?;
2450 let sender = MailboxClient::new(tx);
2451 Ok(entry.insert(Arc::new(sender)).value().clone())
2452 }
2453 }
2454 }
2455}
2456
2457impl MailboxSender for DialMailboxRouter {
2458 fn post(
2459 &self,
2460 envelope: MessageEnvelope,
2461 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2462 ) {
2463 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2464 self.default.post(envelope, return_handle);
2465 return;
2466 };
2467
2468 match self.dial(&addr, envelope.dest().actor_id()) {
2469 Err(err) => envelope.undeliverable(
2470 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2471 return_handle,
2472 ),
2473 Ok(sender) => sender.post(envelope, return_handle),
2474 }
2475 }
2476}
2477
/// A sender that delivers nothing: every posted message is returned as
/// undeliverable with an `Unroutable` error.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2482
2483impl MailboxSender for UnroutableMailboxSender {
2484 fn post(
2485 &self,
2486 envelope: MessageEnvelope,
2487 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2488 ) {
2489 envelope.undeliverable(
2490 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2491 return_handle,
2492 );
2493 }
2494}
2495
2496#[cfg(test)]
2497mod tests {
2498
2499 use std::assert_matches::assert_matches;
2500 use std::mem::drop;
2501 use std::sync::atomic::AtomicUsize;
2502 use std::time::Duration;
2503
2504 use timed_test::async_timed_test;
2505
2506 use super::*;
2507 use crate::Actor;
2508 use crate::PortId;
2509 use crate::accum;
2510 use crate::channel::ChannelTransport;
2511 use crate::channel::dial;
2512 use crate::channel::serve;
2513 use crate::channel::sim::SimAddr;
2514 use crate::clock::Clock;
2515 use crate::clock::RealClock;
2516 use crate::data::Serialized;
2517 use crate::id;
2518 use crate::proc::Proc;
2519 use crate::reference::ProcId;
2520 use crate::reference::WorldId;
2521 use crate::simnet;
2522
2523 #[test]
2524 fn test_error() {
2525 let err = MailboxError::new(
2526 ActorId(
2527 ProcId::Ranked(WorldId("myworld".to_string()), 2),
2528 "myactor".to_string(),
2529 5,
2530 ),
2531 MailboxErrorKind::Closed,
2532 );
2533 assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2534 }
2535
2536 #[tokio::test]
2537 async fn test_mailbox_basic() {
2538 let mbox = Mailbox::new_detached(id!(test[0].test));
2539 let (port, mut receiver) = mbox.open_port::<u64>();
2540 let port = port.bind();
2541
2542 mbox.serialize_and_send(&port, 123, monitored_return_handle())
2543 .unwrap();
2544 mbox.serialize_and_send(&port, 321, monitored_return_handle())
2545 .unwrap();
2546 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2547 assert_eq!(receiver.recv().await.unwrap(), 321u64);
2548
2549 let serialized = Serialized::serialize(&999u64).unwrap();
2550 mbox.post(
2551 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2552 monitored_return_handle(),
2553 );
2554 assert_eq!(receiver.recv().await.unwrap(), 999u64);
2555 }
2556
2557 #[tokio::test]
2558 async fn test_mailbox_accum() {
2559 let mbox = Mailbox::new_detached(id!(test[0].test));
2560 let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2561
2562 for i in -3..4 {
2563 port.send(i).unwrap();
2564 let received: accum::Max<i64> = receiver.recv().await.unwrap();
2565 let msg = received.get();
2566 assert_eq!(msg, &i);
2567 }
2568 for i in -3..4 {
2570 port.send(i).unwrap();
2571 assert_eq!(receiver.recv().await.unwrap().get(), &3);
2572 }
2573 port.send(4).unwrap();
2575 assert_eq!(receiver.recv().await.unwrap().get(), &4);
2576
2577 for i in 5..10 {
2579 port.send(i).unwrap();
2580 }
2581 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2582 port.send(1).unwrap();
2583 port.send(3).unwrap();
2584 port.send(2).unwrap();
2585 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2586 }
2587
2588 #[test]
2589 fn test_port_and_reducer() {
2590 let mbox = Mailbox::new_detached(id!(test[0].test));
2591 {
2593 let accumulator = accum::max::<u64>();
2594 let reducer_spec = accumulator.reducer_spec().unwrap();
2595 let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2596 assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2597 let port_ref = port.bind();
2598 assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2599 }
2600 {
2602 let (port, _) = mbox.open_port::<u64>();
2603 assert_eq!(port.reducer_spec, None);
2604 let port_ref = port.bind();
2605 assert_eq!(port_ref.reducer_spec(), &None);
2606 }
2607 }
2608
2609 #[tokio::test]
2610 #[ignore] async fn test_mailbox_once() {
2612 let mbox = Mailbox::new_detached(id!(test[0].test));
2613
2614 let (port, receiver) = mbox.open_once_port::<u64>();
2615
2616 port.send(123u64).unwrap();
2619 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2620
2621 }
2632
2633 #[tokio::test]
2634 #[ignore] async fn test_mailbox_receiver_drop() {
2636 let mbox = Mailbox::new_detached(id!(test[0].test));
2637 let (port, mut receiver) = mbox.open_port::<u64>();
2638 let port = port.bind();
2640 mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2641 .unwrap();
2642 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2643 drop(receiver);
2644 let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2645 panic!();
2646 };
2647
2648 assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2649 assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2650 }
2651
2652 #[tokio::test]
2653 async fn test_drain() {
2654 let mbox = Mailbox::new_detached(id!(test[0].test));
2655
2656 let (port, mut receiver) = mbox.open_port();
2657 let port = port.bind();
2658
2659 for i in 0..10 {
2660 mbox.serialize_and_send(&port, i, monitored_return_handle())
2661 .unwrap();
2662 }
2663
2664 for i in 0..10 {
2665 assert_eq!(receiver.recv().await.unwrap(), i);
2666 }
2667
2668 assert!(receiver.drain().is_empty());
2669 }
2670
2671 #[tokio::test]
2672 async fn test_mailbox_muxer() {
2673 let muxer = MailboxMuxer::new();
2674
2675 let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2676 let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2677
2678 muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2679 muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2680
2681 let (port, receiver) = mbox0.open_once_port::<u64>();
2682
2683 port.send(123u64).unwrap();
2684 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2685
2686 }
2696
2697 #[tokio::test]
2698 async fn test_local_client_server() {
2699 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2700 let (tx, rx) = channel::local::new();
2701 let serve_handle = mbox.clone().serve(rx);
2702 let client = MailboxClient::new(tx);
2703
2704 let (port, receiver) = mbox.open_once_port::<u64>();
2705 let port = port.bind();
2706
2707 client
2708 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2709 .unwrap();
2710 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2711 serve_handle.stop("fromt test");
2712 serve_handle.await.unwrap().unwrap();
2713 }
2714
2715 #[tokio::test]
2716 async fn test_sim_client_server() {
2717 simnet::start();
2718 let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2719 let src_to_dst = ChannelAddr::Sim(
2720 SimAddr::new_with_src(
2721 "local:0".parse::<ChannelAddr>().unwrap(),
2722 dst_addr.addr().clone(),
2723 )
2724 .unwrap(),
2725 );
2726
2727 let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone()))
2728 .await
2729 .unwrap();
2730 let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2731 let mbox = Mailbox::new_detached(id!(test[0].actor0));
2732 let serve_handle = mbox.clone().serve(rx);
2733 let client = MailboxClient::new(tx);
2734 let (port, receiver) = mbox.open_once_port::<u64>();
2735 let port = port.bind();
2736 let msg: u64 = 123;
2737 client
2738 .serialize_and_send_once(port, msg, monitored_return_handle())
2739 .unwrap();
2740 assert_eq!(receiver.recv().await.unwrap(), msg);
2741 serve_handle.stop("from test");
2742 serve_handle.await.unwrap().unwrap();
2743 }
2744
2745 #[tokio::test]
2746 async fn test_mailbox_router() {
2747 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2748 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2749 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2750 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2751
2752 let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2753 [&mbox0, &mbox1, &mbox2, &mbox3]
2754 .into_iter()
2755 .map(|mbox| {
2756 let (port, receiver) = mbox.open_once_port::<u64>();
2757 (port.bind(), receiver)
2758 })
2759 .collect();
2760
2761 let router = MailboxRouter::new();
2762
2763 router.bind(id!(world0).into(), mbox0);
2764 router.bind(id!(world1[0]).into(), mbox1);
2765 router.bind(id!(world1[1]).into(), mbox2);
2766 router.bind(id!(world1[1].actor1).into(), mbox3);
2767
2768 for (i, (port, receiver)) in comms.into_iter().enumerate() {
2769 router
2770 .serialize_and_send_once(port, i as u64, monitored_return_handle())
2771 .unwrap();
2772 assert_eq!(receiver.recv().await.unwrap(), i as u64);
2773 }
2774
2775 let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2778
2779 let (return_handle, mut return_receiver) =
2780 crate::mailbox::undeliverable::new_undeliverable_port();
2781 let (port, _receiver) = mbox4.open_once_port();
2782 router
2783 .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2784 .unwrap();
2785 assert!(return_receiver.recv().await.is_ok());
2786
2787 let router = router.fallback(mbox4.clone().into_boxed());
2788 let (port, receiver) = mbox4.open_once_port();
2789 router
2790 .serialize_and_send_once(port.bind(), 0, return_handle)
2791 .unwrap();
2792 assert_eq!(receiver.recv().await.unwrap(), 0);
2793 }
2794
2795 #[tokio::test]
2796 async fn test_dial_mailbox_router() {
2797 let router = DialMailboxRouter::new();
2798
2799 router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2800 router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2801 router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2802 router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2803
2804 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2806 router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2807
2808 router.unbind(&id!(world1).into());
2810 assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
2811 assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
2812 assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
2813 router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2814 router.unbind(&id!(world0).into());
2815 assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
2816 }
2817
2818 #[tokio::test]
2819 #[ignore] async fn test_dial_mailbox_router_default() {
2821 let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2822 let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2823 let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2824 let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2825
2826 let root = MailboxRouter::new();
2829 let world0_router = DialMailboxRouter::new_with_default(root.boxed());
2830 let world1_router = DialMailboxRouter::new_with_default(root.boxed());
2831
2832 root.bind(id!(world0).into(), world0_router.clone());
2833 root.bind(id!(world1).into(), world1_router.clone());
2834
2835 let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
2836
2837 let mut handles = Vec::new(); for mbox in mailboxes.iter() {
2839 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local))
2840 .await
2841 .unwrap();
2842 let handle = (*mbox).clone().serve(rx);
2843 handles.push(handle);
2844
2845 eprintln!("{}: {}", mbox.actor_id(), addr);
2846 if mbox.actor_id().world_name() == "world0" {
2847 world0_router.bind(mbox.actor_id().clone().into(), addr);
2848 } else {
2849 world1_router.bind(mbox.actor_id().clone().into(), addr);
2850 }
2851 }
2852
2853 for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
2855 for mbox in mailboxes.iter() {
2856 let (port, receiver) = mbox.open_once_port::<u64>();
2857 let port = port.bind();
2858 router
2859 .serialize_and_send_once(port, 123u64, monitored_return_handle())
2860 .unwrap();
2861 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2862 }
2863 }
2864 }
2865
2866 #[tokio::test]
2867 async fn test_enqueue_port() {
2868 let mbox = Mailbox::new_detached(id!(test[0].test));
2869
2870 let count = Arc::new(AtomicUsize::new(0));
2871 let count_clone = count.clone();
2872 let port = mbox.open_enqueue_port(move |_, n| {
2873 count_clone.fetch_add(n, Ordering::SeqCst);
2874 Ok(())
2875 });
2876
2877 port.send(10).unwrap();
2878 port.send(5).unwrap();
2879 port.send(1).unwrap();
2880 port.send(0).unwrap();
2881
2882 assert_eq!(count.load(Ordering::SeqCst), 16);
2883 }
2884
2885 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2886 struct TestMessage;
2887
2888 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2889 #[named(name = "some::custom::path")]
2890 struct TestMessage2;
2891
/// Checks the type names produced by `#[derive(Named)]`, both with and
/// without an explicit `#[named(name = ...)]` override.
#[test]
fn test_remote_message_macros() {
    let derived_name = TestMessage::typename();
    let overridden_name = TestMessage2::typename();

    assert_eq!(derived_name, "hyperactor::mailbox::tests::TestMessage");
    assert_eq!(overridden_name, "some::custom::path");
}
2900
/// Checks the `Display` rendering of a `MessageEnvelope` carrying a
/// registered type: sender, destination port, type name and a JSON-like
/// rendering of the payload.
#[test]
fn test_message_envelope_display() {
    #[derive(Named, Serialize, Deserialize)]
    struct MyTest {
        a: u64,
        b: String,
    }
    crate::register_type!(MyTest);

    let payload = MyTest {
        a: 123,
        b: "hello".into(),
    };
    let envelope = MessageEnvelope::serialize(
        id!(source[0].actor),
        id!(dest[1].actor[0][123]),
        &payload,
        Attrs::new(),
    )
    .unwrap();

    let rendered = envelope.to_string();
    assert_eq!(
        rendered,
        r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
    );
}
2926
2927 #[derive(Debug, Default, Actor)]
2928 struct Foo;
2929
2930 #[tokio::test]
2933 async fn test_actor_delivery_failure() {
2934 use crate::actor::ActorStatus;
2937 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2938
2939 let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
2940 BOXED_PANICKING_MAILBOX_SENDER.clone(),
2941 ));
2942 let proc_id = id!(quux[0]);
2943 let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
2944 ProcSupervisionCoordinator::set(&proc).await.unwrap();
2945
2946 let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
2947 let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
2948 let message = MessageEnvelope::new(
2949 foo.actor_id().clone(),
2950 PortId(id!(corge[0].bar), 9999u64),
2951 Serialized::serialize(&1u64).unwrap(),
2952 Attrs::new(),
2953 );
2954 return_handle.send(Undeliverable(message)).unwrap();
2955
2956 RealClock
2957 .sleep(tokio::time::Duration::from_millis(100))
2958 .await;
2959
2960 let foo_status = foo.status();
2961 assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
2962 let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
2963 unreachable!()
2964 };
2965 assert!(msg.as_str().contains(
2966 "serving quux[0].foo[0]: processing error: a message from \
2967 quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
2968 ));
2969
2970 proc.destroy_and_wait(tokio::time::Duration::from_secs(1), None)
2971 .await
2972 .unwrap();
2973 }
2974
2975 #[tokio::test]
2976 async fn test_detached_return_handle() {
2977 let (return_handle, mut return_receiver) =
2978 crate::mailbox::undeliverable::new_undeliverable_port();
2979 let envelope = MessageEnvelope::new(
2981 id!(foo[0].bar),
2982 PortId(id!(baz[0].corge), 9999u64),
2983 Serialized::serialize(&1u64).unwrap(),
2984 Attrs::new(),
2985 );
2986 return_handle.send(Undeliverable(envelope.clone())).unwrap();
2987 assert!(
2989 RealClock
2990 .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
2991 .await
2992 .is_ok()
2993 );
2994 let monitor_handle = tokio::spawn(async move {
2997 while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
2998 envelope.try_set_error(DeliveryError::BrokenLink(
2999 "returned in unit test".to_string(),
3000 ));
3001 UndeliverableMailboxSender
3002 .post(envelope, monitored_return_handle());
3003 }
3004 });
3005 drop(return_handle);
3006 assert!(
3007 RealClock
3008 .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3009 .await
3010 .is_ok()
3011 );
3012 }
3013
3014 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3015 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3016 let dummy_state =
3019 State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3020 let dummy_port_id = PortId(id!(world[0].actor), 0);
3021 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3022 let receiver = PortReceiver {
3023 receiver,
3024 port_id: dummy_port_id,
3025 coalesce,
3026 mailbox: Mailbox {
3027 inner: Arc::new(dummy_state),
3028 },
3029 };
3030 (sender, receiver)
3031 }
3032
3033 {
3035 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3036 assert!(receiver.drain().is_empty());
3037
3038 sender.send(0).unwrap();
3039 sender.send(1).unwrap();
3040 sender.send(2).unwrap();
3041 sender.send(3).unwrap();
3042 sender.send(4).unwrap();
3043 sender.send(5).unwrap();
3044 sender.send(6).unwrap();
3045 sender.send(7).unwrap();
3046
3047 if drop_sender {
3048 drop(sender);
3049 }
3050
3051 if !coalesce {
3052 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3053 } else {
3054 assert_eq!(receiver.drain(), vec![7]);
3055 }
3056
3057 assert!(receiver.drain().is_empty());
3058 assert!(receiver.drain().is_empty());
3059 }
3060
3061 {
3063 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3064 assert!(receiver.try_recv().unwrap().is_none());
3065
3066 sender.send(0).unwrap();
3067 sender.send(1).unwrap();
3068 sender.send(2).unwrap();
3069 sender.send(3).unwrap();
3070
3071 if drop_sender {
3072 drop(sender);
3073 }
3074
3075 if !coalesce {
3076 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3077 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3078 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3079 }
3080 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3081 if drop_sender {
3082 assert_matches!(
3083 receiver.try_recv().unwrap_err().kind(),
3084 MailboxErrorKind::Closed
3085 );
3086 assert_matches!(
3088 receiver.try_recv().unwrap_err().kind(),
3089 MailboxErrorKind::Closed
3090 );
3091 } else {
3092 assert!(receiver.try_recv().unwrap().is_none());
3093 assert!(receiver.try_recv().unwrap().is_none());
3095 }
3096 }
3097 {
3099 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3100 assert!(
3101 RealClock
3102 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3103 .await
3104 .is_err()
3105 );
3106
3107 sender.send(4).unwrap();
3108 sender.send(5).unwrap();
3109 sender.send(6).unwrap();
3110 sender.send(7).unwrap();
3111
3112 if drop_sender {
3113 drop(sender);
3114 }
3115
3116 if !coalesce {
3117 assert_eq!(receiver.recv().await.unwrap(), 4);
3118 assert_eq!(receiver.recv().await.unwrap(), 5);
3119 assert_eq!(receiver.recv().await.unwrap(), 6);
3120 }
3121 assert_eq!(receiver.recv().await.unwrap(), 7);
3122 if drop_sender {
3123 assert_matches!(
3124 receiver.recv().await.unwrap_err().kind(),
3125 MailboxErrorKind::Closed
3126 );
3127 assert_matches!(
3129 receiver.recv().await.unwrap_err().kind(),
3130 MailboxErrorKind::Closed
3131 );
3132 } else {
3133 assert!(
3134 RealClock
3135 .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3136 .await
3137 .is_err()
3138 );
3139 }
3140 }
3141 }
3142
3143 #[tokio::test]
3144 async fn test_receiver_basic_default() {
3145 verify_receiver(false, false).await
3146 }
3147
3148 #[tokio::test]
3149 async fn test_receiver_basic_latest() {
3150 verify_receiver(true, false).await
3151 }
3152
3153 #[tokio::test]
3154 async fn test_receiver_after_sender_drop_default() {
3155 verify_receiver(false, true).await
3156 }
3157
3158 #[tokio::test]
3159 async fn test_receiver_after_sender_drop_latest() {
3160 verify_receiver(true, true).await
3161 }
3162
3163 struct Setup {
3164 receiver: PortReceiver<u64>,
3165 actor0: Mailbox,
3166 actor1: Mailbox,
3167 port_id: PortId,
3168 port_id1: PortId,
3169 port_id2: PortId,
3170 port_id2_1: PortId,
3171 }
3172
3173 async fn setup_split_port_ids(reducer_spec: Option<ReducerSpec>) -> Setup {
3174 let muxer = MailboxMuxer::new();
3175 let actor0 = Mailbox::new(id!(test[0].actor), BoxedMailboxSender::new(muxer.clone()));
3176 let actor1 = Mailbox::new(id!(test[1].actor1), BoxedMailboxSender::new(muxer.clone()));
3177 muxer.bind_mailbox(actor0.clone());
3178 muxer.bind_mailbox(actor1.clone());
3179
3180 let (port_handle, receiver) = actor0.open_port::<u64>();
3182 let port_id = port_handle.bind().port_id().clone();
3183
3184 let port_id1 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
3186 let port_id2 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
3187
3188 let port_id2_1 = port_id2.split(&actor1, reducer_spec).unwrap();
3190
3191 Setup {
3192 receiver,
3193 actor0,
3194 actor1,
3195 port_id,
3196 port_id1,
3197 port_id2,
3198 port_id2_1,
3199 }
3200 }
3201
3202 fn post(mailbox: &Mailbox, port_id: PortId, msg: u64) {
3203 mailbox.post(
3204 MessageEnvelope::new_unknown(port_id.clone(), Serialized::serialize(&msg).unwrap()),
3205 monitored_return_handle(),
3206 );
3207 }
3208
3209 #[async_timed_test(timeout_secs = 30)]
3210 async fn test_split_port_id_no_reducer() {
3211 let Setup {
3212 mut receiver,
3213 actor0,
3214 actor1,
3215 port_id,
3216 port_id1,
3217 port_id2,
3218 port_id2_1,
3219 } = setup_split_port_ids(None).await;
3220 post(&actor0, port_id.clone(), 1);
3222 assert_eq!(receiver.recv().await.unwrap(), 1);
3223 post(&actor1, port_id1.clone(), 2);
3224 assert_eq!(receiver.recv().await.unwrap(), 2);
3225 post(&actor1, port_id2.clone(), 3);
3226 assert_eq!(receiver.recv().await.unwrap(), 3);
3227 post(&actor1, port_id2_1.clone(), 4);
3228 assert_eq!(receiver.recv().await.unwrap(), 4);
3229
3230 RealClock.sleep(Duration::from_secs(2)).await;
3232 let msg = receiver.try_recv().unwrap();
3233 assert_eq!(msg, None);
3234 }
3235
3236 async fn wait_for(
3237 receiver: &mut PortReceiver<u64>,
3238 expected_size: usize,
3239 timeout_duration: Duration,
3240 ) -> anyhow::Result<Vec<u64>> {
3241 let mut messeges = vec![];
3242
3243 RealClock
3244 .timeout(timeout_duration, async {
3245 loop {
3246 let msg = receiver.recv().await.unwrap();
3247 messeges.push(msg);
3248 if messeges.len() == expected_size {
3249 break;
3250 }
3251 }
3252 })
3253 .await?;
3254 Ok(messeges)
3255 }
3256
3257 #[async_timed_test(timeout_secs = 30)]
3258 async fn test_split_port_id_sum_reducer() {
3259 let config = crate::config::global::lock();
3260 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3261
3262 let sum_accumulator = accum::sum::<u64>();
3263 let reducer_spec = sum_accumulator.reducer_spec();
3264 let Setup {
3265 mut receiver,
3266 actor0,
3267 actor1,
3268 port_id,
3269 port_id1,
3270 port_id2,
3271 port_id2_1,
3272 } = setup_split_port_ids(reducer_spec).await;
3273 post(&actor0, port_id.clone(), 4);
3274 post(&actor1, port_id1.clone(), 2);
3275 post(&actor1, port_id2.clone(), 3);
3276 post(&actor1, port_id2_1.clone(), 1);
3277 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3278 .await
3279 .unwrap();
3280 messages.sort();
3283 assert_eq!(messages, vec![1, 2, 3, 4]);
3284
3285 RealClock.sleep(Duration::from_secs(2)).await;
3287 let msg = receiver.try_recv().unwrap();
3288 assert_eq!(msg, None);
3289 }
3290
3291 #[async_timed_test(timeout_secs = 30)]
3292 async fn test_split_port_id_every_n_messages() {
3293 let actor = Mailbox::new(
3294 id!(test[0].actor),
3295 BoxedMailboxSender::new(PanickingMailboxSender),
3296 );
3297 let (port_handle, mut receiver) = actor.open_port::<u64>();
3298 let port_id = port_handle.bind().port_id().clone();
3299 let reducer_spec = accum::sum::<u64>().reducer_spec();
3301 let split_port_id = port_id.split(&actor, reducer_spec).unwrap();
3302
3303 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3305 post(&actor, split_port_id.clone(), msg);
3306 }
3307 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3310 .await
3311 .unwrap();
3312 assert_eq!(messages, vec![15]);
3313
3314 RealClock.sleep(Duration::from_secs(2)).await;
3317 let msg = receiver.try_recv().unwrap();
3318 assert_eq!(msg, None);
3319 }
3320}