1use std::any::Any;
71use std::collections::BTreeMap;
72use std::collections::BTreeSet;
73use std::fmt;
74use std::fmt::Debug;
75use std::future;
76use std::future::Future;
77use std::ops::Bound::Excluded;
78use std::pin::Pin;
79use std::sync::Arc;
80use std::sync::LazyLock;
81use std::sync::Mutex;
82use std::sync::RwLock;
83use std::sync::Weak;
84use std::sync::atomic::AtomicU64;
85use std::sync::atomic::AtomicUsize;
86use std::sync::atomic::Ordering;
87use std::task::Context;
88use std::task::Poll;
89
90use async_trait::async_trait;
91use dashmap::DashMap;
92use dashmap::mapref::entry::Entry;
93use futures::Sink;
94use futures::Stream;
95use hyperactor_config::Flattrs;
96use hyperactor_telemetry::hash_to_u64;
97use serde::Deserialize;
98use serde::Serialize;
99use serde::de::DeserializeOwned;
100use tokio::sync::mpsc;
101use tokio::sync::oneshot;
102use tokio::sync::watch;
103use tokio::task::JoinHandle;
104use tokio_util::sync::CancellationToken;
105use typeuri::Named;
106
107use crate::Proc;
109use crate::accum::Accumulator;
110use crate::accum::ReducerSpec;
111use crate::accum::StreamingReducerOpts;
112use crate::actor::ActorStatus;
113use crate::actor::Signal;
114use crate::actor::remote::USER_PORT_OFFSET;
115use crate::channel;
116use crate::channel::ChannelAddr;
117use crate::channel::ChannelError;
118use crate::channel::ChannelTransport;
119use crate::channel::SendError;
120use crate::channel::TxStatus;
121use crate::context;
122use crate::metrics;
123use crate::ordering::SEQ_INFO;
124use crate::ordering::SeqInfo;
125use crate::reference;
126
127mod undeliverable;
128pub use undeliverable::Undeliverable;
130pub use undeliverable::UndeliverableMessageError;
131pub use undeliverable::custom_monitored_return_handle;
132pub use undeliverable::monitored_return_handle; pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141pub mod headers;
143
/// Marker trait for values that can be carried by a mailbox port.
///
/// It has no methods and is implemented automatically for every type that
/// is `Send + Sync + 'static`, so it never needs a manual implementation.
pub trait Message: Send + Sync + 'static {}

impl<M> Message for M where M: Send + Sync + 'static {}
148
149pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
/// An owned buffer of raw bytes.
pub type Data = Vec<u8>;
158
/// Reasons recorded on a [`MessageEnvelope`] when it could not be delivered.
///
/// The type derives `Serialize`/`Deserialize`, so the variant order is part
/// of its encoded form and should not be rearranged casually.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    typeuri::Named,
    Clone,
    PartialEq,
    Eq
)]
pub enum DeliveryError {
    /// The destination address is not routable; the string gives details.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A link on the delivery path is broken; the string gives details.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// A mailbox-level failure; the string gives details.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast-related failure; the string gives details.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The envelope's TTL reached zero before it could be posted further.
    #[error("ttl expired")]
    TtlExpired,
}
192
/// A serialized message together with the metadata needed to route it.
///
/// The struct derives `Serialize`/`Deserialize`; field order is therefore
/// part of its encoded form.
#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
pub struct MessageEnvelope {
    /// The actor recorded as the sender of this message.
    sender: reference::ActorId,

    /// The port this message is addressed to.
    dest: reference::PortId,

    /// The serialized message payload.
    data: wirevalue::Any,

    /// Delivery errors accumulated so far (see `set_error`).
    errors: Vec<DeliveryError>,

    /// Headers attached to the message.
    headers: Flattrs,

    /// Remaining time-to-live; decremented by `dec_ttl_or_err` on each post.
    ttl: u8,

    /// Flag exposed through `return_undeliverable()`; initialised to `true`.
    /// NOTE(review): presumably controls whether an undeliverable message is
    /// returned to its sender — confirm in the `undeliverable` module.
    return_undeliverable: bool,
}
wirevalue::register_type!(MessageEnvelope);
222
223impl MessageEnvelope {
224 pub fn new(
226 sender: reference::ActorId,
227 dest: reference::PortId,
228 data: wirevalue::Any,
229 headers: Flattrs,
230 ) -> Self {
231 Self {
232 sender,
233 dest,
234 data,
235 errors: Vec::new(),
236 headers,
237 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
238 return_undeliverable: true,
240 }
241 }
242
243 pub(crate) fn new_unknown(dest: reference::PortId, data: wirevalue::Any) -> Self {
245 let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
247 let unknown_proc_id = crate::reference::ProcId::unique(unknown_addr, "unknown");
248 let unknown_actor_id =
249 crate::reference::ActorId::root(unknown_proc_id, "unknown".to_string());
250 Self::new(unknown_actor_id, dest, data, Flattrs::new())
251 }
252
253 pub fn serialize<T: Serialize + Named>(
255 source: reference::ActorId,
256 dest: reference::PortId,
257 value: &T,
258 headers: Flattrs,
259 ) -> Result<Self, wirevalue::Error> {
260 Ok(Self {
261 headers,
262 data: wirevalue::Any::serialize(value)?,
263 sender: source,
264 dest,
265 errors: Vec::new(),
266 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
267 return_undeliverable: true,
269 })
270 }
271
272 pub fn ttl(&self) -> u8 {
278 self.ttl
279 }
280
281 pub fn set_ttl(mut self, ttl: u8) -> Self {
291 self.ttl = ttl;
292 self
293 }
294
295 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
303 if self.ttl == 0 {
304 Err(DeliveryError::TtlExpired)
305 } else {
306 self.ttl -= 1;
307 Ok(())
308 }
309 }
310
311 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
313 Ok(self.data.deserialized()?)
314 }
315
316 pub fn data(&self) -> &wirevalue::Any {
318 &self.data
319 }
320
321 pub fn sender(&self) -> &reference::ActorId {
323 &self.sender
324 }
325
326 pub fn dest(&self) -> &reference::PortId {
328 &self.dest
329 }
330
331 pub fn headers(&self) -> &Flattrs {
333 &self.headers
334 }
335
336 pub fn is_signal(&self) -> bool {
338 self.dest.index() == Signal::port()
339 }
340
341 pub fn set_error(&mut self, error: DeliveryError) {
344 self.errors.push(error)
345 }
346
347 pub fn update_sender(&mut self, sender: reference::ActorId) {
351 self.sender = sender;
352 }
353
354 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
358 self.return_undeliverable = return_undeliverable;
359 }
360
361 pub fn undeliverable(
365 mut self,
366 error: DeliveryError,
367 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
368 ) {
369 tracing::debug!(
370 name = "undelivered_message_attempt",
371 sender = self.sender.to_string(),
372 dest = self.dest.to_string(),
373 error = error.to_string(),
374 return_handle = %return_handle,
375 );
376 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
377 1,
378 hyperactor_telemetry::kv_pairs!(
379 "sender_actor_id" => self.sender.to_string(),
380 "dest_actor_id" => self.dest.to_string(),
381 "message_type" => self.data.typename().unwrap_or("unknown"),
382 "error_type" => error.to_string(),
383 ),
384 );
385
386 self.set_error(error);
387 undeliverable::return_undeliverable(return_handle, self);
388 }
389
390 pub fn errors(&self) -> &Vec<DeliveryError> {
393 &self.errors
394 }
395
396 pub fn error_msg(&self) -> Option<String> {
400 if self.errors.is_empty() {
401 None
402 } else {
403 Some(
404 self.errors
405 .iter()
406 .map(|e| e.to_string())
407 .collect::<Vec<_>>()
408 .join("; "),
409 )
410 }
411 }
412
413 fn open(self) -> (MessageMetadata, wirevalue::Any) {
414 let Self {
415 sender,
416 dest,
417 data,
418 errors,
419 headers,
420 ttl,
421 return_undeliverable,
422 } = self;
423
424 (
425 MessageMetadata {
426 sender,
427 dest,
428 errors,
429 headers,
430 ttl,
431 return_undeliverable,
432 },
433 data,
434 )
435 }
436
437 fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
438 let MessageMetadata {
439 sender,
440 dest,
441 errors,
442 headers,
443 ttl,
444 return_undeliverable,
445 } = metadata;
446
447 Self {
448 sender,
449 dest,
450 data,
451 errors,
452 headers,
453 ttl,
454 return_undeliverable,
455 }
456 }
457
458 fn return_undeliverable(&self) -> bool {
459 self.return_undeliverable
460 }
461
462 pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
464 self.headers.set(key, value);
465 }
466}
467
468impl fmt::Display for MessageEnvelope {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 match &self.error_msg() {
471 None => write!(
472 f,
473 "{} > {}: {} {{{}}}",
474 self.sender, self.dest, self.data, self.headers
475 ),
476 Some(err) => write!(
477 f,
478 "{} > {}: {} {{{}}}: delivery error: {}",
479 self.sender, self.dest, self.data, self.headers, err
480 ),
481 }
482 }
483}
484
/// Everything in a [`MessageEnvelope`] except its serialized payload.
///
/// Produced by `MessageEnvelope::open` and consumed by
/// `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    /// The actor recorded as the sender.
    sender: reference::ActorId,
    /// The destination port.
    dest: reference::PortId,
    /// Delivery errors accumulated so far.
    errors: Vec<DeliveryError>,
    /// Headers attached to the message.
    headers: Flattrs,
    /// Remaining time-to-live.
    ttl: u8,
    /// Copy of the envelope's `return_undeliverable` flag.
    return_undeliverable: bool,
}
495
/// A mailbox error: the kind of failure plus the actor it is attributed to.
#[derive(Debug)]
pub struct MailboxError {
    /// The actor this error is attributed to.
    actor_id: reference::ActorId,
    /// What went wrong.
    kind: MailboxErrorKind,
}
503
/// The kinds of failure carried by a [`MailboxError`].
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum MailboxErrorKind {
    /// The mailbox is closed.
    #[error("mailbox closed")]
    Closed,

    /// The given port is invalid.
    #[error("invalid port: {0}")]
    InvalidPort(reference::PortId),

    /// No sender is registered for the given port.
    #[error("no sender for port: {0}")]
    NoSenderForPort(reference::PortId),

    /// No local sender is registered for the given port.
    #[error("no local sender for port: {0}")]
    NoLocalSenderForPort(reference::PortId),

    /// The given port is closed.
    #[error("{0}: port closed")]
    PortClosed(reference::PortId),

    /// Sending to the given port failed; the second field is the cause.
    #[error("send {0}: {1}")]
    Send(reference::PortId, #[source] anyhow::Error),

    /// Receiving from the given port failed; the second field is the cause.
    #[error("recv {0}: {1}")]
    Recv(reference::PortId, #[source] anyhow::Error),

    /// Serialization failed.
    #[error("serialize: {0}")]
    Serialize(#[source] anyhow::Error),

    /// Deserialization failed; the first field is shown before the cause in
    /// the message (presumably a type name — confirm at the construction
    /// sites).
    #[error("deserialize {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// The owner terminated with the given status.
    #[error("owner terminated: {0}")]
    OwnerTerminated(ActorStatus),
}
554
555impl MailboxError {
556 pub fn new(actor_id: reference::ActorId, kind: MailboxErrorKind) -> Self {
559 Self { actor_id, kind }
560 }
561
562 pub fn actor_id(&self) -> &reference::ActorId {
564 &self.actor_id
565 }
566
567 pub fn kind(&self) -> &MailboxErrorKind {
569 &self.kind
570 }
571}
572
573impl fmt::Display for MailboxError {
574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575 write!(f, "{}: ", self.actor_id)?;
576 fmt::Display::fmt(&self.kind, f)
577 }
578}
579
580impl std::error::Error for MailboxError {
581 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
582 self.kind.source()
583 }
584}
585
/// Where a port lives, used to label sender errors.
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// A port that has a port id.
    Bound(reference::PortId),
    /// A port without a port id: the owning actor plus a static string that
    /// the constructors in this file fill with a message type name.
    Unbound(reference::ActorId, &'static str),
}
596
597impl PortLocation {
598 fn new_unbound<M: Message>(actor_id: reference::ActorId) -> Self {
599 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
600 }
601
602 #[allow(dead_code)]
603 fn new_unbound_type(actor_id: reference::ActorId, ty: &'static str) -> Self {
604 PortLocation::Unbound(actor_id, ty)
605 }
606
607 pub fn actor_id(&self) -> &reference::ActorId {
609 match self {
610 PortLocation::Bound(port_id) => port_id.actor_id(),
611 PortLocation::Unbound(actor_id, _) => actor_id,
612 }
613 }
614}
615
616impl fmt::Display for PortLocation {
617 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618 match self {
619 PortLocation::Bound(port_id) => write!(f, "{}", port_id),
620 PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
621 }
622 }
623}
624
/// An error raised while sending through a port: where it happened plus what
/// went wrong. Both parts are boxed.
#[derive(Debug)]
pub struct MailboxSenderError {
    /// The port the failed send was addressed to.
    location: Box<PortLocation>,
    /// What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
632
/// The kinds of failure carried by a [`MailboxSenderError`].
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// The message could not be serialized.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// The message could not be deserialized as the named type.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// The port is invalid.
    #[error("invalid port")]
    Invalid,

    /// The port is closed.
    #[error("port closed")]
    Closed,

    /// An underlying mailbox error, displayed transparently.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying message-log error, displayed transparently.
    #[error(transparent)]
    MessageLog(#[from] MessageLogError),

    /// Any other send failure.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination is unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
673
674impl MailboxSenderError {
675 pub fn new_unbound<M>(actor_id: reference::ActorId, kind: MailboxSenderErrorKind) -> Self {
677 Self {
678 location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
679 kind: Box::new(kind),
680 }
681 }
682
683 pub fn new_unbound_type(
685 actor_id: reference::ActorId,
686 kind: MailboxSenderErrorKind,
687 ty: &'static str,
688 ) -> Self {
689 Self {
690 location: Box::new(PortLocation::Unbound(actor_id, ty)),
691 kind: Box::new(kind),
692 }
693 }
694
695 pub fn new_bound(port_id: reference::PortId, kind: MailboxSenderErrorKind) -> Self {
697 Self {
698 location: Box::new(PortLocation::Bound(port_id)),
699 kind: Box::new(kind),
700 }
701 }
702
703 pub fn location(&self) -> &PortLocation {
705 &self.location
706 }
707
708 pub fn kind(&self) -> &MailboxSenderErrorKind {
710 &self.kind
711 }
712}
713
714impl fmt::Display for MailboxSenderError {
715 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716 write!(f, "{}: ", self.location)?;
717 fmt::Display::fmt(&self.kind, f)
718 }
719}
720
721impl std::error::Error for MailboxSenderError {
722 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
723 self.kind.source()
724 }
725}
726
727pub trait MailboxSender: Send + Sync + Any {
730 fn post(
733 &self,
734 mut envelope: MessageEnvelope,
735 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
736 ) {
737 if let Err(err) = envelope.dec_ttl_or_err() {
738 envelope.undeliverable(err, return_handle);
739 return;
740 }
741 self.post_unchecked(envelope, return_handle);
742 }
743
744 fn post_unchecked(
746 &self,
747 envelope: MessageEnvelope,
748 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
749 );
750}
751
752pub trait PortSender: MailboxSender {
755 fn serialize_and_send<M: RemoteMessage>(
757 &self,
758 port: &reference::PortRef<M>,
759 message: M,
760 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
761 ) -> Result<(), MailboxSenderError> {
762 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
764 MailboxSenderError::new_bound(
765 port.port_id().clone(),
766 MailboxSenderErrorKind::Serialize(err.into()),
767 )
768 })?;
769 self.post(
770 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
771 return_handle,
772 );
773 Ok(())
774 }
775
776 fn serialize_and_send_once<M: RemoteMessage>(
779 &self,
780 once_port: reference::OncePortRef<M>,
781 message: M,
782 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
783 ) -> Result<(), MailboxSenderError> {
784 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
785 MailboxSenderError::new_bound(
786 once_port.port_id().clone(),
787 MailboxSenderErrorKind::Serialize(err.into()),
788 )
789 })?;
790 self.post(
791 MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
792 return_handle,
793 );
794 Ok(())
795 }
796}
797
798impl<T: ?Sized + MailboxSender> PortSender for T {}
799
/// A [`MailboxSender`] that panics whenever a message is posted to it.
#[derive(Debug, Clone)]
pub struct PanickingMailboxSender;

impl MailboxSender for PanickingMailboxSender {
    /// Always panics, including the rejected envelope in the panic message.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        panic!("panic! in the mailbox! attempted post: {}", envelope)
    }
}
815
816#[derive(Debug)]
819pub struct UndeliverableMailboxSender;
820
821impl MailboxSender for UndeliverableMailboxSender {
822 fn post_unchecked(
823 &self,
824 envelope: MessageEnvelope,
825 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
826 ) {
827 let sender_name = envelope.sender.name();
828 let error_str = envelope.error_msg().unwrap_or("".to_string());
829 tracing::error!(
832 name = "undelivered_message_abandoned",
833 actor_name = sender_name,
834 actor_id = envelope.sender.to_string(),
835 dest = envelope.dest.to_string(),
836 headers = envelope.headers().to_string(), data = envelope.data().to_string(),
838 return_handle = %return_handle,
839 "message not delivered, {}",
840 error_str,
841 );
842 }
843}
844
/// An unbounded queue of `(message, return handle)` pairs that are processed
/// one at a time by a background task (see `Buffer::new`).
struct Buffer<T: Message> {
    /// Producer side of the queue drained by the background task.
    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
    /// Number of items the background task has finished processing.
    #[allow(dead_code)]
    processed: watch::Receiver<usize>,
    /// Number of `send` calls made so far (incremented before enqueueing).
    seq: AtomicUsize,
}
851
852impl<T: Message> Buffer<T> {
853 fn new<Fut>(
854 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
855 ) -> Self
856 where
857 Fut: Future<Output = ()> + Send + 'static,
858 {
859 let (queue, mut next) = mpsc::unbounded_channel();
860 let (last_processed, processed) = watch::channel(0);
861 crate::init::get_runtime().spawn(async move {
862 let mut seq = 0;
863 while let Some((msg, return_handle)) = next.recv().await {
864 process(msg, return_handle).await;
865 seq += 1;
866 let _ = last_processed.send(seq);
867 }
868 });
869 Self {
870 queue,
871 processed,
872 seq: AtomicUsize::new(0),
873 }
874 }
875
876 #[allow(clippy::result_large_err)]
877 fn send(
878 &self,
879 item: (T, PortHandle<Undeliverable<T>>),
880 ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
881 self.seq.fetch_add(1, Ordering::SeqCst);
882 self.queue.send(item)?;
883 Ok(())
884 }
885
886 #[allow(dead_code)]
887 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
888 let seq = self.seq.load(Ordering::SeqCst);
889 while *self.processed.borrow_and_update() < seq {
890 self.processed.changed().await?;
891 }
892 Ok(())
893 }
894}
895
/// A lazily created, shared boxed [`PanickingMailboxSender`]; used as the
/// forwarder of detached mailboxes (see `Mailbox::new_detached`).
static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
898
/// A type-erased, reference-counted [`MailboxSender`]. Cloning shares the
/// same underlying sender.
#[derive(Clone)]
pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
906
907impl fmt::Debug for BoxedMailboxSender {
908 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
909 f.debug_struct("BoxedMailboxSender")
910 .field("sender", &"<dyn MailboxSender>")
911 .finish()
912 }
913}
914
915impl BoxedMailboxSender {
916 pub fn new(sender: impl MailboxSender + 'static) -> Self {
918 Self(Arc::new(sender))
919 }
920
921 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
924 (&*self.0 as &dyn Any).downcast_ref::<T>()
925 }
926}
927
928pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
930 fn boxed(&self) -> BoxedMailboxSender;
932}
933impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
934 fn boxed(&self) -> BoxedMailboxSender {
935 BoxedMailboxSender::new(self.clone())
936 }
937}
938
939pub trait IntoBoxedMailboxSender: MailboxSender {
941 fn into_boxed(self) -> BoxedMailboxSender;
943}
944impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
945 fn into_boxed(self) -> BoxedMailboxSender {
946 BoxedMailboxSender::new(self)
947 }
948}
949
950impl MailboxSender for BoxedMailboxSender {
951 fn post_unchecked(
952 &self,
953 envelope: MessageEnvelope,
954 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
955 ) {
956 self.0.post_unchecked(envelope, return_handle);
957 }
958}
959
/// Errors with which a mailbox server task can terminate.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error, displayed transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error, displayed transparently.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
971
/// Handle to a running mailbox server, as returned by `MailboxServer::serve`.
///
/// It can be used to request a stop and, being a `Future`, awaited for the
/// server task's result.
#[derive(Debug)]
pub struct MailboxServerHandle {
    /// Join handle of the spawned serving task.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    /// Sending `true` on this channel asks the serving task to stop.
    stopped_tx: watch::Sender<bool>,
}
979
980impl MailboxServerHandle {
981 pub fn stop(&self, reason: &str) {
986 tracing::info!("stopping mailbox server; reason: {}", reason);
987 self.stopped_tx.send(true).expect("stop called twice");
988 }
989}
990
991impl Future for MailboxServerHandle {
993 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
994
995 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
996 let join_handle_pinned =
998 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
999 join_handle_pinned.poll(cx)
1000 }
1001}
1002
/// Serving of incoming envelopes; implemented for every cloneable
/// [`MailboxSender`].
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
    /// Starts serving envelopes received on `rx` by posting each one through
    /// `self`, and returns a handle for stopping and joining the server.
    ///
    /// Two tasks are spawned:
    ///
    /// 1. an undeliverable-handling task that receives envelopes reported on
    ///    this server's return handle and tries to send them back to their
    ///    original sender;
    /// 2. the serving task, which loops until a stop is requested, the
    ///    channel closes (`Ok(())`), or the channel fails (`Err`).
    fn serve(
        self,
        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
    ) -> MailboxServerHandle {
        // Port on which senders report envelopes they could not deliver.
        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
        let server = self.clone();
        tokio::task::spawn(async move {
            // Process-wide counter so each server gets a distinct proc name.
            static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
            let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
            let addr = ChannelAddr::any(ChannelTransport::Local);
            let proc_id = reference::ProcId::unique(addr, format!("mailbox_server_{}", rank));
            // A proc that forwards through a clone of this server; its client
            // instance is used to send returned envelopes.
            let proc = Proc::configured(proc_id, BoxedMailboxSender::new(server));
            let (client, _) = proc.instance("undeliverable_supervisor").unwrap();
            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
                // If the payload is itself an `Undeliverable` envelope, do
                // not try to return it again: hand it to the sender that
                // logs and drops.
                if let Ok(Undeliverable(e)) =
                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
                {
                    UndeliverableMailboxSender.post(e, monitored_return_handle());
                    continue;
                }
                envelope.set_error(DeliveryError::BrokenLink(
                    "message was undeliverable".to_owned(),
                ));
                // Address the `Undeliverable` message port of the original
                // sender and send the wrapped envelope there.
                let return_port =
                    reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
                        envelope.sender(),
                    );
                return_port.send_serialized(
                    &client,
                    Flattrs::new(),
                    wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
                );
            }
        });

        let (stopped_tx, mut stopped_rx) = watch::channel(false);
        let join_handle = tokio::spawn(async move {
            // Set once the stop sender (held by the handle) has been dropped;
            // from then on the stop branch of the select is disabled.
            let mut detached = false;

            let result = loop {
                // A stop request ends the loop successfully.
                if *stopped_rx.borrow_and_update() {
                    break Ok(());
                }

                tokio::select! {
                    message = rx.recv() => {
                        match message {
                            Ok(envelope) => self.post(envelope, return_handle.clone()),

                            // A closed channel is a normal shutdown.
                            Err(ChannelError::Closed) => break Ok(()),
                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
                        }
                    }
                    result = stopped_rx.changed(), if !detached => {
                        // An error here means the sender was dropped, not
                        // that a stop was requested: keep serving.
                        detached = result.is_err();
                        if detached {
                            tracing::debug!(
                                "the mailbox server is detached for Rx {}", rx.addr()
                            );
                        } else {
                            tracing::debug!(
                                "the mailbox server is stopped for Rx {}", rx.addr()
                            );
                        }
                    }
                }
            };

            // Wait for the receiver to finish before reporting the result.
            rx.join().await;

            result
        });

        MailboxServerHandle {
            join_handle,
            stopped_tx,
        }
    }
}

impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1103
/// A [`MailboxSender`] that forwards envelopes over a channel transmitter
/// through an internal buffer.
pub struct MailboxClient {
    /// Queue of envelopes waiting to be posted on the channel.
    buffer: Buffer<MessageEnvelope>,

    /// Clone of the cancellation token given to the tx health monitor task.
    /// NOTE(review): nothing in the visible code cancels this token — confirm
    /// whether cancellation on drop is intended.
    _tx_monitoring: CancellationToken,
}
1112
1113impl fmt::Debug for MailboxClient {
1114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1115 f.debug_struct("MailboxClient")
1116 .field("buffer", &"<Buffer>")
1117 .finish()
1118 }
1119}
1120
impl MailboxClient {
    /// Creates a client that posts envelopes on `tx`.
    ///
    /// Envelopes are queued in a [`Buffer`] and posted with `try_post` in
    /// order. If the channel later reports a send failure for an envelope,
    /// that envelope is marked undeliverable with a `BrokenLink` error and
    /// returned through the handle it was posted with. A background task
    /// also watches the transmitter's status (see `monitor_tx_health`).
    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
        let addr = tx.addr();
        let tx = Arc::new(tx);
        let tx_status = tx.status().clone();
        let tx_monitoring = CancellationToken::new();
        let buffer = Buffer::new(move |envelope, return_handle| {
            let tx = Arc::clone(&tx);
            // One-shot channel on which the transmitter reports a failed
            // send, handing the envelope back.
            let (return_channel, return_receiver) =
                oneshot::channel::<SendError<MessageEnvelope>>();
            let return_handle_0 = return_handle.clone();
            // The listener is spawned before posting so a failure report
            // always has a receiver. If the one-shot sender is dropped
            // without reporting, the task simply ends.
            tokio::spawn(async move {
                if let Ok(SendError {
                    error,
                    message,
                    reason,
                }) = return_receiver.await
                {
                    message.undeliverable(
                        DeliveryError::BrokenLink(format!(
                            "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
                        )),
                        return_handle_0,
                    );
                }
            });
            tx.try_post(envelope, return_channel);
            // Posting does not await, so the per-item future is already done.
            future::ready(())
        });
        let this = Self {
            buffer,
            _tx_monitoring: tx_monitoring.clone(),
        };
        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
        this
    }

    /// Dials `addr` and wraps the resulting transmitter in a client.
    ///
    /// # Errors
    ///
    /// Returns the `ChannelError` produced by `channel::dial`.
    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
        Ok(MailboxClient::new(channel::dial(addr)?))
    }

    /// Spawns a task on the crate runtime that watches the transmitter
    /// status `rx` for the channel to `addr`.
    ///
    /// The task logs a warning and exits when the status becomes
    /// `TxStatus::Closed` or the status channel itself closes; it exits
    /// silently when `cancel_token` is cancelled.
    fn monitor_tx_health(
        mut rx: watch::Receiver<TxStatus>,
        cancel_token: CancellationToken,
        addr: ChannelAddr,
    ) {
        crate::init::get_runtime().spawn(async move {
            loop {
                tokio::select! {
                    changed = rx.changed() => {
                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
                            tracing::warn!("connection to {} lost", addr);
                            break;
                        }
                    }
                    _ = cancel_token.cancelled() => {
                        break;
                    }
                }
            }
        });
    }
}
1193
1194impl MailboxSender for MailboxClient {
1195 #[tracing::instrument(level = "debug", skip_all)]
1196 fn post_unchecked(
1197 &self,
1198 envelope: MessageEnvelope,
1199 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1200 ) {
1201 tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1202 if let Err(mpsc::error::SendError((envelope, return_handle))) =
1203 self.buffer.send((envelope, return_handle))
1204 {
1205 let err = DeliveryError::BrokenLink(
1206 "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1207 );
1208
1209 envelope.undeliverable(err, return_handle);
1211 }
1212 }
1213}
1214
/// Adapts a port reference to the `futures::Sink` interface: each item sent
/// into the sink is sent to the port.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    /// The actor context used for every send.
    cx: C,
    /// The destination port.
    port: reference::PortRef<M>,
}
1220
1221impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1222 pub fn new(cx: C, port: reference::PortRef<M>) -> Self {
1224 Self { cx, port }
1225 }
1226}
1227
1228impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1229 type Error = MailboxSenderError;
1230
1231 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1232 Poll::Ready(Ok(()))
1233 }
1234
1235 fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1236 self.port.send(&self.cx, item)
1237 }
1238
1239 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1240 Poll::Ready(Ok(()))
1241 }
1242
1243 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1244 Poll::Ready(Ok(()))
1245 }
1246}
1247
/// An actor's mailbox: the owner of its ports. Cloning is cheap and yields
/// another handle to the same shared state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    /// Shared mailbox state.
    inner: Arc<State>,
}
1254
1255impl Mailbox {
1256 pub fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
1259 Self {
1260 inner: Arc::new(State::new(actor_id, forwarder)),
1261 }
1262 }
1263
1264 pub fn new_detached(actor_id: reference::ActorId) -> Self {
1266 Self {
1267 inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1268 }
1269 }
1270
1271 pub fn actor_id(&self) -> &reference::ActorId {
1273 &self.inner.actor_id
1274 }
1275
1276 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1281 let port_index = self.inner.allocate_port();
1282 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1283 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1284 tracing::trace!(
1285 name = "open_port",
1286 "opening port for {} at {}",
1287 self.inner.actor_id,
1288 port_id
1289 );
1290 (
1291 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1292 PortReceiver::new(receiver, port_id, false, self.clone()),
1293 )
1294 }
1295
1296 pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1303 let (handle, receiver) = self.open_port();
1304 handle.bind_actor_port();
1305 (handle, receiver)
1306 }
1307
1308 pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1311 where
1312 A: Accumulator + Send + Sync + 'static,
1313 A::Update: Message,
1314 A::State: Message + Default + Clone,
1315 {
1316 self.open_accum_port_opts(accum, StreamingReducerOpts::default())
1317 }
1318
1319 pub fn open_accum_port_opts<A>(
1327 &self,
1328 accum: A,
1329 streaming_opts: StreamingReducerOpts,
1330 ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1331 where
1332 A: Accumulator + Send + Sync + 'static,
1333 A::Update: Message,
1334 A::State: Message + Default + Clone,
1335 {
1336 let port_index = self.inner.allocate_port();
1337 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1338 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1339 let state = Mutex::new(A::State::default());
1340 let reducer_spec = accum.reducer_spec();
1341 let enqueue = move |_, update: A::Update| {
1342 let mut state = state.lock().unwrap();
1343 accum.accumulate(&mut state, update)?;
1344 let _ = sender.send(state.clone());
1345 Ok(())
1346 };
1347 (
1348 PortHandle {
1349 mailbox: self.clone(),
1350 port_index,
1351 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1352 bound: Arc::new(RwLock::new(None)),
1353 reducer_spec,
1354 streaming_opts,
1355 },
1356 PortReceiver::new(receiver, port_id, true, self.clone()),
1357 )
1358 }
1359
1360 pub(crate) fn open_enqueue_port<M: Message>(
1364 &self,
1365 enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1366 ) -> PortHandle<M> {
1367 PortHandle {
1368 mailbox: self.clone(),
1369 port_index: self.inner.allocate_port(),
1370 sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1371 bound: Arc::new(RwLock::new(None)),
1372 reducer_spec: None,
1373 streaming_opts: StreamingReducerOpts::default(),
1374 }
1375 }
1376
1377 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1381 let port_index = self.inner.allocate_port();
1382 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1383 let (sender, receiver) = oneshot::channel::<M>();
1384 (
1385 OncePortHandle {
1386 mailbox: self.clone(),
1387 port_index,
1388 port_id: port_id.clone(),
1389 sender,
1390 reducer_spec: None,
1391 },
1392 OncePortReceiver {
1393 receiver: Some(receiver),
1394 port_id,
1395 mailbox: self.clone(),
1396 },
1397 )
1398 }
1399
1400 pub fn open_reduce_port<A, T>(
1414 &self,
1415 accum: A,
1416 ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
1417 where
1418 A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
1419 T: Message + Default + Clone,
1420 {
1421 let port_index = self.inner.allocate_port();
1422 let (sender, receiver) = oneshot::channel::<T>();
1423 let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1424 let reducer_spec = accum.reducer_spec();
1425 assert!(
1426 reducer_spec.is_some(),
1427 "cannot use a reduce port without a ReducerSpec"
1428 );
1429
1430 (
1431 OncePortHandle {
1432 mailbox: self.clone(),
1433 port_index,
1434 port_id: port_id.clone(),
1435 sender,
1436 reducer_spec,
1437 },
1438 OncePortReceiver {
1439 receiver: Some(receiver),
1440 port_id,
1441 mailbox: self.clone(),
1442 },
1443 )
1444 }
1445
1446 #[allow(dead_code)]
1447 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1448 MailboxError::new(self.inner.actor_id.clone(), err)
1449 }
1450
1451 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1452 let port_index = M::port();
1453 self.inner.ports.get(&port_index).and_then(|boxed| {
1454 boxed
1455 .as_any()
1456 .downcast_ref::<UnboundedSender<M>>()
1457 .map(|s| {
1458 assert_eq!(
1459 s.port_id,
1460 self.actor_id().port_id(port_index),
1461 "port_id mismatch in downcasted UnboundedSender"
1462 );
1463 s.sender.clone()
1464 })
1465 })
1466 }
1467
1468 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1470 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1471 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1472 }
1473
1474 pub(crate) fn allocate_port(&self) -> u64 {
1475 self.inner.allocate_port()
1476 }
1477
1478 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> reference::PortRef<M> {
1479 assert_eq!(
1480 handle.mailbox.actor_id(),
1481 self.actor_id(),
1482 "port does not belong to mailbox"
1483 );
1484
1485 let port_id = self.actor_id().port_id(handle.port_index);
1488 match self.inner.ports.entry(handle.port_index) {
1489 Entry::Vacant(entry) => {
1490 entry.insert(Box::new(UnboundedSender::new(
1491 handle.sender.clone(),
1492 port_id.clone(),
1493 )));
1494 }
1495 Entry::Occupied(_entry) => {}
1496 }
1497
1498 reference::PortRef::attest(port_id)
1499 }
1500
1501 fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1502 assert_eq!(
1503 handle.mailbox.actor_id(),
1504 self.actor_id(),
1505 "port does not belong to mailbox"
1506 );
1507
1508 let port_index = M::port();
1509 let port_id = self.actor_id().port_id(port_index);
1510 match self.inner.ports.entry(port_index) {
1511 Entry::Vacant(entry) => {
1512 entry.insert(Box::new(UnboundedSender::new(
1513 handle.sender.clone(),
1514 port_id,
1515 )));
1516 }
1517 Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1518 }
1519 }
1520
1521 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1522 let port_id = handle.port_id().clone();
1523 match self.inner.ports.entry(handle.port_index) {
1524 Entry::Vacant(entry) => {
1525 entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1526 }
1527 Entry::Occupied(_entry) => {}
1528 }
1529 }
1530
1531 pub(crate) fn bind_untyped(&self, port_id: &reference::PortId, sender: UntypedUnboundedSender) {
1532 assert_eq!(
1533 port_id.actor_id(),
1534 self.actor_id(),
1535 "port does not belong to mailbox"
1536 );
1537
1538 match self.inner.ports.entry(port_id.index()) {
1539 Entry::Vacant(entry) => {
1540 entry.insert(Box::new(sender));
1541 }
1542 Entry::Occupied(_entry) => {}
1543 }
1544 }
1545
1546 pub(crate) fn close(&self, status: ActorStatus) {
1547 let mut closed = self.inner.closed.write().unwrap();
1548 if closed.is_some() {
1549 panic!("mailbox with owner {} already closed", self.actor_id());
1550 }
1551 let _ = closed.insert(status);
1552 }
1553}
1554
/// A `Mailbox` is trivially its own mailbox context.
impl context::Mailbox for Mailbox {
    /// Returns `self`.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1560
/// Opens a new port for messages of type `M` on the mailbox provided by `cx`,
/// returning the handle/receiver pair produced by `Mailbox::open_port`.
pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
    cx.mailbox().open_port()
}
1568
/// Opens a new one-shot port for a message of type `M` on the mailbox
/// provided by `cx`, returning the handle/receiver pair produced by
/// `Mailbox::open_once_port`.
pub fn open_once_port<M: Message>(
    cx: &impl context::Mailbox,
) -> (OncePortHandle<M>, OncePortReceiver<M>) {
    cx.mailbox().open_once_port()
}
1576
1577impl MailboxSender for Mailbox {
1578 fn post_unchecked(
1581 &self,
1582 envelope: MessageEnvelope,
1583 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1584 ) {
1585 metrics::MAILBOX_POSTS.add(
1586 1,
1587 hyperactor_telemetry::kv_pairs!(
1588 "actor_id" => envelope.sender.to_string(),
1589 "dest_actor_id" => envelope.dest.actor_id().to_string(),
1590 ),
1591 );
1592 tracing::trace!(
1593 name = "post",
1594 actor_name = envelope.sender.name(),
1595 actor_id = envelope.sender.to_string(),
1596 "posting message to {}",
1597 envelope.dest
1598 );
1599
1600 if envelope.dest().actor_id() != &self.inner.actor_id {
1601 return self.inner.forwarder.post(envelope, return_handle);
1602 }
1603
1604 match self.inner.ports.entry(envelope.dest().index()) {
1605 Entry::Vacant(_) => {
1606 let err = DeliveryError::Unroutable(format!(
1607 "port not bound in mailbox; port id: {}; message type: {}",
1608 envelope.dest().index(),
1609 envelope.data().typename().map_or_else(
1610 || format!("unregistered type hash {}", envelope.data().typehash()),
1611 |s| s.to_string(),
1612 )
1613 ));
1614
1615 envelope.undeliverable(err, return_handle);
1616 }
1617 Entry::Occupied(entry) => {
1618 let closed = self.inner.closed.read().unwrap();
1619 if let Some(status) = &*closed {
1620 match status {
1621 ActorStatus::Stopped(reason) => {
1622 let err = format!(
1623 "mailbox owner {} is stopped: {}",
1624 self.inner.actor_id, reason
1625 );
1626 return envelope
1627 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1628 }
1629 ActorStatus::Failed(actor_error) => {
1630 let err = format!(
1631 "mailbox owner {} failed: {}",
1632 self.inner.actor_id, actor_error
1633 );
1634 return envelope
1635 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1636 }
1637 _ => {
1638 let err = format!(
1639 "mailbox owner {} closed unexpectedly: {:?}",
1640 self.inner.actor_id, status
1641 );
1642 return envelope
1643 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1644 }
1645 }
1646 }
1647
1648 let (metadata, data) = envelope.open();
1649 let MessageMetadata {
1650 mut headers,
1651 sender,
1652 dest,
1653 errors: metadata_errors,
1654 ttl,
1655 return_undeliverable,
1656 } = metadata;
1657
1658 let to_actor_id = hash_to_u64(&dest);
1659 let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1660 headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1661 if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1664 headers.set(
1665 crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1666 hash_to_u64(&sender),
1667 );
1668 }
1669 headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1670
1671 match entry.get().send_serialized(headers, data) {
1679 Ok(false) => {
1680 hyperactor_telemetry::notify_message_status(
1681 hyperactor_telemetry::MessageStatusEvent {
1682 timestamp: std::time::SystemTime::now(),
1683 id: hyperactor_telemetry::generate_status_event_id(message_id),
1684 message_id,
1685 status: "queued".to_string(),
1686 },
1687 );
1688 entry.remove();
1689 }
1690 Ok(true) => {
1691 hyperactor_telemetry::notify_message_status(
1692 hyperactor_telemetry::MessageStatusEvent {
1693 timestamp: std::time::SystemTime::now(),
1694 id: hyperactor_telemetry::generate_status_event_id(message_id),
1695 message_id,
1696 status: "queued".to_string(),
1697 },
1698 );
1699 }
1700 Err(SerializedSenderError {
1701 data,
1702 error: sender_error,
1703 headers,
1704 }) => {
1705 entry.remove();
1706 let err = DeliveryError::Mailbox(format!("{}", sender_error));
1707
1708 MessageEnvelope::seal(
1709 MessageMetadata {
1710 headers,
1711 sender,
1712 dest,
1713 errors: metadata_errors,
1714 ttl,
1715 return_undeliverable,
1716 },
1717 data,
1718 )
1719 .undeliverable(err, return_handle)
1720 }
1721 }
1722 }
1723 }
1724 }
1725}
1726
#[derive(Debug)]
/// A handle for sending `M`-typed messages to a port on a [`Mailbox`].
pub struct PortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The port index allocated for this handle.
    port_index: u64,
    /// The underlying sender used to deliver messages.
    sender: UnboundedPortSender<M>,
    /// The port id this handle has been bound to, if any. Shared between
    /// clones of the handle.
    bound: Arc<RwLock<Option<reference::PortId>>>,
    /// Optional reducer spec, attached to the port reference produced by
    /// `bind`.
    reducer_spec: Option<ReducerSpec>,
    /// Streaming reducer options, attached to the port reference produced by
    /// `bind`.
    streaming_opts: StreamingReducerOpts,
}
1752
1753impl<M: Message> PortHandle<M> {
1754 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1755 Self {
1756 mailbox,
1757 port_index,
1758 sender,
1759 bound: Arc::new(RwLock::new(None)),
1760 reducer_spec: None,
1761 streaming_opts: StreamingReducerOpts::default(),
1762 }
1763 }
1764
1765 pub(crate) fn location(&self) -> PortLocation {
1766 match self.bound.read().unwrap().as_ref() {
1767 Some(port_id) => PortLocation::Bound(port_id.clone()),
1768 None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1769 }
1770 }
1771
1772 pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1774 let closed = self.mailbox.inner.closed.read().unwrap();
1775
1776 if let Some(status) = &*closed {
1777 let err = MailboxError {
1778 actor_id: self.mailbox.actor_id().clone(),
1779 kind: MailboxErrorKind::OwnerTerminated(status.clone()),
1780 };
1781 return Err(MailboxSenderError::new_unbound::<M>(
1782 self.mailbox.actor_id().clone(),
1783 MailboxSenderErrorKind::Mailbox(err),
1784 ));
1785 }
1786
1787 let mut headers = Flattrs::new();
1788
1789 crate::mailbox::headers::set_send_timestamp(&mut headers);
1790 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1791 let bound_guard = self.bound.read().unwrap();
1795 if let Some(bound_port) = bound_guard.as_ref() {
1796 let sequencer = cx.instance().sequencer();
1797 let seq_info = sequencer.assign_seq(bound_port);
1798 headers.set(SEQ_INFO, seq_info);
1799 } else {
1800 headers.set(SEQ_INFO, SeqInfo::Direct);
1805 }
1806 self.sender.send(headers, message).map_err(|err| {
1818 MailboxSenderError::new_unbound::<M>(
1819 self.mailbox.actor_id().clone(),
1820 MailboxSenderErrorKind::Other(err),
1821 )
1822 })
1823 }
1824
1825 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1828 where
1829 R: Message,
1830 F: Fn(R) -> M + Send + Sync + 'static,
1831 {
1832 let port_index = self.mailbox.inner.allocate_port();
1833 let sender = self.sender.clone();
1834 PortHandle::new(
1835 self.mailbox.clone(),
1836 port_index,
1837 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1838 sender.send(headers, unmap(value))
1839 })),
1840 )
1841 }
1842}
1843
1844impl<M: RemoteMessage> PortHandle<M> {
1845 pub fn bind(&self) -> reference::PortRef<M> {
1847 let port_id = {
1848 let mut guard = self.bound.write().unwrap();
1849 guard
1850 .get_or_insert_with(|| self.mailbox.bind(self).port_id().clone())
1851 .clone()
1852 };
1853 reference::PortRef::attest_reducible(
1854 port_id,
1855 self.reducer_spec.clone(),
1856 self.streaming_opts.clone(),
1857 )
1858 }
1859
1860 pub(crate) fn bind_actor_port(&self) {
1866 let port_id = self.mailbox.actor_id().port_id(M::port());
1867 {
1868 let mut guard = self.bound.write().unwrap();
1869 if guard.is_some() {
1870 panic!(
1871 "could not bind port handle {} as {port_id}: already bound",
1872 self.port_index
1873 );
1874 }
1875 *guard = Some(port_id);
1876 }
1877 self.mailbox.bind_to_actor_port(self);
1878 }
1879}
1880
1881impl<M: Message> Clone for PortHandle<M> {
1882 fn clone(&self) -> Self {
1883 Self {
1884 mailbox: self.mailbox.clone(),
1885 port_index: self.port_index,
1886 sender: self.sender.clone(),
1887 bound: self.bound.clone(),
1888 reducer_spec: self.reducer_spec.clone(),
1889 streaming_opts: self.streaming_opts.clone(),
1890 }
1891 }
1892}
1893
/// Displays the handle as its current location (see `PortHandle::location`).
impl<M: Message> fmt::Display for PortHandle<M> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.location(), f)
    }
}
1899
#[derive(Debug)]
/// A handle for sending a single `M`-typed message to a one-shot port.
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The port index allocated for this handle.
    port_index: u64,
    /// The full port id of this port.
    port_id: reference::PortId,
    /// The one-shot sender; consumed by `send` or by binding.
    sender: oneshot::Sender<M>,
    /// Optional reducer spec, attached to the reference produced by `bind`.
    reducer_spec: Option<ReducerSpec>,
}
1909
1910impl<M: Message> OncePortHandle<M> {
1911 pub fn port_id(&self) -> &reference::PortId {
1914 &self.port_id
1915 }
1916
1917 pub fn send(self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1920 assert!(
1923 !self.port_id().is_actor_port(),
1924 "OncePortHandle currently does not support actor ports; a \
1925 prerequisite of that support is to assign seq to messages \
1926 if the port is actor port."
1927 );
1928
1929 let actor_id = self.mailbox.actor_id().clone();
1930 self.sender.send(message).map_err(|_| {
1931 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1936 })?;
1937 Ok(())
1938 }
1939}
1940
1941impl<M: RemoteMessage> OncePortHandle<M> {
1942 pub fn bind(self) -> reference::OncePortRef<M> {
1947 let port_id = self.port_id().clone();
1948 let reducer_spec = self.reducer_spec.clone();
1949 self.mailbox.clone().bind_once(self);
1950 reference::OncePortRef::attest_reducible(port_id, reducer_spec)
1951 }
1952}
1953
/// Displays the handle as its port id.
impl<M: Message> fmt::Display for OncePortHandle<M> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.port_id(), f)
    }
}
1959
#[derive(Debug)]
/// The receiving end of a port. Dropping the receiver removes the port's
/// entry from the owning mailbox.
pub struct PortReceiver<M> {
    /// The channel messages arrive on.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The id of the port this receiver serves.
    port_id: reference::PortId,
    /// When set, receive operations return only the most recent of the
    /// messages that are already queued.
    coalesce: bool,
    /// The owning mailbox, used to remove the port entry on drop.
    mailbox: Mailbox,
}
1973
1974impl<M> PortReceiver<M> {
1975 fn new(
1976 receiver: mpsc::UnboundedReceiver<M>,
1977 port_id: reference::PortId,
1978 coalesce: bool,
1979 mailbox: Mailbox,
1980 ) -> Self {
1981 Self {
1982 receiver,
1983 port_id,
1984 coalesce,
1985 mailbox,
1986 }
1987 }
1988
1989 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1994 let mut next = self.receiver.try_recv();
1995 if self.coalesce
1997 && let Some(latest) = self.drain().pop()
1998 {
1999 next = Ok(latest);
2000 }
2001 match next {
2002 Ok(msg) => Ok(Some(msg)),
2003 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2004 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2005 self.actor_id().clone(),
2006 MailboxErrorKind::Closed,
2007 )),
2008 }
2009 }
2010
2011 pub async fn recv(&mut self) -> Result<M, MailboxError> {
2014 let mut next = self.receiver.recv().await;
2015 if self.coalesce
2018 && let Some(latest) = self.drain().pop()
2019 {
2020 next = Some(latest);
2021 }
2022 next.ok_or(MailboxError::new(
2023 self.actor_id().clone(),
2024 MailboxErrorKind::Closed,
2025 ))
2026 }
2027
2028 pub fn drain(&mut self) -> Vec<M> {
2030 let mut drained: Vec<M> = Vec::new();
2031 while let Ok(msg) = self.receiver.try_recv() {
2032 if self.coalesce {
2034 drained.pop();
2035 }
2036 drained.push(msg);
2037 }
2038 drained
2039 }
2040
2041 fn port(&self) -> u64 {
2042 self.port_id.index()
2043 }
2044
2045 fn actor_id(&self) -> &reference::ActorId {
2046 self.port_id.actor_id()
2047 }
2048}
2049
impl<M> Drop for PortReceiver<M> {
    /// Removes this receiver's port entry from the owning mailbox.
    fn drop(&mut self) {
        self.mailbox.inner.ports.remove(&self.port());
    }
}
2058
/// Exposes the receiver as a stream of `recv` results.
impl<M> Stream for PortReceiver<M> {
    type Item = Result<M, MailboxError>;

    /// Builds a fresh `recv` future, pins it on the stack, and polls it once.
    /// Every ready result is wrapped in `Some`, so a closed receiver is
    /// surfaced as `Some(Err(..))` and this stream never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        std::pin::pin!(self.recv()).poll(cx).map(Some)
    }
}
2066
/// The receiving end of a one-shot port. Dropping the receiver removes the
/// port's entry from the owning mailbox.
pub struct OncePortReceiver<M> {
    /// The one-shot channel; taken out (leaving `None`) by `recv`.
    receiver: Option<oneshot::Receiver<M>>,
    /// The id of the port this receiver serves.
    port_id: reference::PortId,

    /// The owning mailbox, used to remove the port entry on drop.
    mailbox: Mailbox,
}
2076
2077impl<M> OncePortReceiver<M> {
2078 pub async fn recv(mut self) -> Result<M, MailboxError> {
2082 std::mem::take(&mut self.receiver)
2083 .unwrap()
2084 .await
2085 .map_err(|err| {
2086 MailboxError::new(
2087 self.actor_id().clone(),
2088 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2089 )
2090 })
2091 }
2092
2093 fn port(&self) -> u64 {
2094 self.port_id.index()
2095 }
2096
2097 fn actor_id(&self) -> &reference::ActorId {
2098 self.port_id.actor_id()
2099 }
2100}
2101
impl<M> Drop for OncePortReceiver<M> {
    /// Removes this receiver's port entry from the owning mailbox.
    fn drop(&mut self) {
        self.mailbox.inner.ports.remove(&self.port());
    }
}
2110
/// Error returned by `SerializedSender::send_serialized`. It hands the
/// headers and serialized payload back to the caller together with the
/// underlying send error.
pub struct SerializedSenderError {
    /// The headers that accompanied the failed send.
    pub headers: Flattrs,
    /// The serialized message that could not be delivered.
    pub data: wirevalue::Any,
    /// The underlying sender error.
    pub error: MailboxSenderError,
}
2120
/// A type-erased port sender that accepts serialized messages. Mailboxes
/// store bound ports as boxed `SerializedSender`s.
trait SerializedSender: Send + Sync {
    /// Returns `self` as `&dyn Any`, so callers can downcast to the concrete
    /// sender type.
    fn as_any(&self) -> &dyn Any;

    /// Delivers `serialized` with the given `headers`.
    ///
    /// The `bool` result tells the caller whether the port should stay
    /// bound: `Mailbox::post_unchecked` removes the port entry on
    /// `Ok(false)`. On failure the headers and payload are returned in the
    /// error.
    #[allow(clippy::result_large_err)]
    fn send_serialized(
        &self,
        headers: Flattrs,
        serialized: wirevalue::Any,
    ) -> Result<bool, SerializedSenderError>;
}
2146
/// The delivery mechanism behind a port handle.
enum UnboundedPortSender<M: Message> {
    /// Delivers messages into an unbounded tokio mpsc channel.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Delivers messages by calling a function with the headers and message.
    Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
2154
2155impl<M: Message> UnboundedPortSender<M> {
2156 fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2157 match self {
2158 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2159 Self::Func(func) => func(headers, message),
2160 }
2161 }
2162}
2163
2164impl<M: Message> Clone for UnboundedPortSender<M> {
2167 fn clone(&self) -> Self {
2168 match self {
2169 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2170 Self::Func(func) => Self::Func(func.clone()),
2171 }
2172 }
2173}
2174
2175impl<M: Message> Debug for UnboundedPortSender<M> {
2176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2177 match self {
2178 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2179 Self::Func(_) => f
2180 .debug_tuple("UnboundedPortSender::Func")
2181 .field(&"..")
2182 .finish(),
2183 }
2184 }
2185}
2186
/// A port sender paired with the id of the port it is bound to. This is the
/// form in which multi-use ports are stored in a mailbox.
struct UnboundedSender<M: Message> {
    /// The underlying sender.
    sender: UnboundedPortSender<M>,
    /// The id of the bound port; used when reporting errors.
    port_id: reference::PortId,
}
2191
2192impl<M: Message> UnboundedSender<M> {
2193 fn new(sender: UnboundedPortSender<M>, port_id: reference::PortId) -> Self {
2196 Self { sender, port_id }
2197 }
2198
2199 #[allow(dead_code)]
2200 fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2201 self.sender.send(headers, message).map_err(|err| {
2202 MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2203 })
2204 }
2205}
2206
2207impl<M: Message> Clone for UnboundedSender<M> {
2211 fn clone(&self) -> Self {
2212 Self {
2213 sender: self.sender.clone(),
2214 port_id: self.port_id.clone(),
2215 }
2216 }
2217}
2218
2219impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2220 fn as_any(&self) -> &dyn Any {
2221 self
2222 }
2223
2224 fn send_serialized(
2225 &self,
2226 headers: Flattrs,
2227 serialized: wirevalue::Any,
2228 ) -> Result<bool, SerializedSenderError> {
2229 match serialized.deserialized_unchecked() {
2235 Ok(message) => {
2236 self.sender.send(headers.clone(), message).map_err(|err| {
2237 SerializedSenderError {
2238 data: serialized,
2239 error: MailboxSenderError::new_bound(
2240 self.port_id.clone(),
2241 MailboxSenderErrorKind::Other(err),
2242 ),
2243 headers,
2244 }
2245 })?;
2246
2247 Ok(true)
2248 }
2249 Err(err) => Err(SerializedSenderError {
2250 data: serialized,
2251 error: MailboxSenderError::new_bound(
2252 self.port_id.clone(),
2253 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2254 ),
2255 headers,
2256 }),
2257 }
2258 }
2259}
2260
#[derive(Debug)]
/// A one-shot sender paired with the id of the port it is bound to. This is
/// the form in which one-shot ports are stored in a mailbox.
struct OnceSender<M: Message> {
    /// The one-shot sender; `None` once it has been used. Shared between
    /// clones.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The id of the bound port; used when reporting errors.
    port_id: reference::PortId,
}
2268
2269impl<M: Message> OnceSender<M> {
2270 fn new(sender: oneshot::Sender<M>, port_id: reference::PortId) -> Self {
2273 Self {
2274 sender: Arc::new(Mutex::new(Some(sender))),
2275 port_id,
2276 }
2277 }
2278
2279 fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2280 match self.sender.lock().unwrap().take() {
2282 None => Err(MailboxSenderError::new_bound(
2283 self.port_id.clone(),
2284 MailboxSenderErrorKind::Closed,
2285 )),
2286 Some(sender) => {
2287 sender.send(message).map_err(|_| {
2288 MailboxSenderError::new_bound(
2293 self.port_id.clone(),
2294 MailboxSenderErrorKind::Closed,
2295 )
2296 })?;
2297 Ok(false)
2298 }
2299 }
2300 }
2301}
2302
2303impl<M: Message> Clone for OnceSender<M> {
2307 fn clone(&self) -> Self {
2308 Self {
2309 sender: self.sender.clone(),
2310 port_id: self.port_id.clone(),
2311 }
2312 }
2313}
2314
2315impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2316 fn as_any(&self) -> &dyn Any {
2317 self
2318 }
2319
2320 fn send_serialized(
2321 &self,
2322 headers: Flattrs,
2323 serialized: wirevalue::Any,
2324 ) -> Result<bool, SerializedSenderError> {
2325 match serialized.deserialized() {
2326 Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2327 data: serialized,
2328 error: e,
2329 headers,
2330 }),
2331 Err(err) => Err(SerializedSenderError {
2332 data: serialized,
2333 error: MailboxSenderError::new_bound(
2334 self.port_id.clone(),
2335 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2336 ),
2337 headers,
2338 }),
2339 }
2340 }
2341}
2342
/// A port sender that works directly on serialized messages, without
/// deserializing them to a concrete type.
pub(crate) struct UntypedUnboundedSender {
    /// The delivery function. On failure it returns the serialized message
    /// together with the error.
    pub(crate) sender:
        Box<dyn Fn(wirevalue::Any) -> Result<bool, (wirevalue::Any, anyhow::Error)> + Send + Sync>,
    /// The id of the bound port; used when reporting errors.
    pub(crate) port_id: reference::PortId,
}
2349
2350impl SerializedSender for UntypedUnboundedSender {
2351 fn as_any(&self) -> &dyn Any {
2352 self
2353 }
2354
2355 fn send_serialized(
2356 &self,
2357 headers: Flattrs,
2358 serialized: wirevalue::Any,
2359 ) -> Result<bool, SerializedSenderError> {
2360 (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2361 data,
2362 error: MailboxSenderError::new_bound(
2363 self.port_id.clone(),
2364 MailboxSenderErrorKind::Other(err),
2365 ),
2366 headers,
2367 })
2368 }
2369}
2370
/// The shared state behind a [`Mailbox`].
struct State {
    /// The actor that owns this mailbox.
    actor_id: reference::ActorId,

    /// The ports currently bound in this mailbox, keyed by port index.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port index to hand out.
    next_port: AtomicU64,

    /// Receives messages that are not addressed to this mailbox's owner.
    forwarder: BoxedMailboxSender,

    /// `None` while the mailbox is open; set to the owner's status once the
    /// mailbox has been closed.
    closed: RwLock<Option<ActorStatus>>,
}
2391
2392impl State {
2393 fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
2395 Self {
2396 actor_id,
2397 ports: DashMap::new(),
2398 next_port: AtomicU64::new(USER_PORT_OFFSET),
2401 forwarder,
2402 closed: RwLock::new(None),
2403 }
2404 }
2405
2406 fn allocate_port(&self) -> u64 {
2408 self.next_port.fetch_add(1, Ordering::SeqCst)
2409 }
2410}
2411
2412impl fmt::Debug for State {
2413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2414 f.debug_struct("State")
2415 .field("actor_id", &self.actor_id)
2416 .field(
2417 "open_ports",
2418 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2419 )
2420 .field("next_port", &self.next_port)
2421 .finish()
2422 }
2423}
2424
#[derive(Clone)]
/// Routes each message to the sender registered for its destination actor.
/// Clones share the same table.
pub struct MailboxMuxer {
    /// The registered senders, keyed by actor id.
    mailboxes: Arc<DashMap<reference::ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2432
impl Default for MailboxMuxer {
    /// Equivalent to `MailboxMuxer::new()`.
    fn default() -> Self {
        Self::new()
    }
}
2438
2439impl MailboxMuxer {
2440 pub fn new() -> Self {
2442 Self {
2443 mailboxes: Arc::new(DashMap::new()),
2444 }
2445 }
2446
2447 pub fn bind(&self, actor_id: reference::ActorId, sender: impl MailboxSender + 'static) -> bool {
2452 match self.mailboxes.entry(actor_id) {
2453 Entry::Occupied(_) => false,
2454 Entry::Vacant(entry) => {
2455 entry.insert(Box::new(sender));
2456 true
2457 }
2458 }
2459 }
2460
2461 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2463 self.bind(mailbox.actor_id().clone(), mailbox)
2464 }
2465
2466 #[allow(dead_code)]
2470 pub(crate) fn unbind(&self, actor_id: &reference::ActorId) {
2471 self.mailboxes.remove(actor_id);
2472 }
2473
2474 pub fn bound_actors(&self) -> Vec<reference::ActorId> {
2476 self.mailboxes.iter().map(|e| e.key().clone()).collect()
2477 }
2478}
2479
2480impl MailboxSender for MailboxMuxer {
2481 fn post_unchecked(
2482 &self,
2483 envelope: MessageEnvelope,
2484 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2485 ) {
2486 let dest_actor_id = envelope.dest().actor_id();
2487 match self.mailboxes.get(dest_actor_id) {
2488 None => {
2489 let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2490 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2491 }
2492 Some(sender) => sender.post(envelope, return_handle),
2493 }
2494 }
2495}
2496
#[derive(Clone)]
/// Routes messages using a table of senders keyed by reference. A message is
/// routed by the entry that is a prefix of its destination actor. Clones
/// share the same table.
pub struct MailboxRouter {
    /// The routing table.
    entries: Arc<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2503
impl Default for MailboxRouter {
    /// Equivalent to `MailboxRouter::new()`.
    fn default() -> Self {
        Self::new()
    }
}
2509
2510impl MailboxRouter {
2511 pub fn new() -> Self {
2513 Self {
2514 entries: Arc::new(RwLock::new(BTreeMap::new())),
2515 }
2516 }
2517
2518 pub fn downgrade(&self) -> WeakMailboxRouter {
2520 WeakMailboxRouter(Arc::downgrade(&self.entries))
2521 }
2522
2523 pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2527 FallbackMailboxRouter {
2528 router: self.clone(),
2529 default,
2530 }
2531 }
2532
2533 pub fn bind(&self, dest: reference::Reference, sender: impl MailboxSender + 'static) {
2537 let mut w = self.entries.write().unwrap();
2538 w.insert(dest, Arc::new(sender));
2539 }
2540
2541 fn sender(
2542 &self,
2543 actor_id: &reference::ActorId,
2544 ) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2545 match self
2546 .entries
2547 .read()
2548 .unwrap()
2549 .lower_bound(Excluded(&actor_id.clone().into()))
2550 .prev()
2551 {
2552 None => None,
2553 Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2554 Some(sender.clone())
2555 }
2556 Some(_) => None,
2557 }
2558 }
2559}
2560
2561impl MailboxSender for MailboxRouter {
2562 fn post_unchecked(
2563 &self,
2564 envelope: MessageEnvelope,
2565 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2566 ) {
2567 match self.sender(envelope.dest().actor_id()) {
2568 None => envelope.undeliverable(
2569 DeliveryError::Unroutable(
2570 "no destination found for actor in routing table".to_string(),
2571 ),
2572 return_handle,
2573 ),
2574 Some(sender) => sender.post(envelope, return_handle),
2575 }
2576 }
2577}
2578
#[derive(Clone)]
/// A router with a default sender for destinations it has no route for.
struct FallbackMailboxRouter {
    /// The router consulted first.
    router: MailboxRouter,
    /// Receives messages the router has no route for.
    default: BoxedMailboxSender,
}
2584
2585impl MailboxSender for FallbackMailboxRouter {
2586 fn post_unchecked(
2587 &self,
2588 envelope: MessageEnvelope,
2589 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2590 ) {
2591 match self.router.sender(envelope.dest().actor_id()) {
2592 Some(sender) => sender.post(envelope, return_handle),
2593 None => self.default.post(envelope, return_handle),
2594 }
2595 }
2596}
2597
#[derive(Debug, Clone)]
/// A weak handle to a [`MailboxRouter`]'s routing table. It does not keep
/// the table alive; see `MailboxRouter::downgrade`.
pub struct WeakMailboxRouter(
    Weak<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2610
2611impl WeakMailboxRouter {
2612 pub fn upgrade(&self) -> Option<MailboxRouter> {
2614 self.0.upgrade().map(|entries| MailboxRouter { entries })
2615 }
2616}
2617
2618impl MailboxSender for WeakMailboxRouter {
2619 fn post_unchecked(
2620 &self,
2621 envelope: MessageEnvelope,
2622 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2623 ) {
2624 match self.upgrade() {
2625 Some(router) => router.post(envelope, return_handle),
2626 None => envelope.undeliverable(
2627 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2628 return_handle,
2629 ),
2630 }
2631 }
2632}
2633
#[derive(Clone)]
/// A router that resolves destinations to channel addresses and dials them
/// on demand, caching one client per address. Clones share the address book
/// and the client cache.
pub struct DialMailboxRouter {
    /// Maps references to the channel address that serves them.
    address_book: Arc<RwLock<BTreeMap<reference::Reference, ChannelAddr>>>,
    /// Clients that have already been dialed, keyed by address.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    /// Receives messages for which no address could be resolved.
    default: BoxedMailboxSender,

    /// When set, a destination that is missing from the address book is
    /// resolved to its proc's own address only if that address uses a remote
    /// transport.
    direct_addressed_remote_only: bool,
}
2660
impl Default for DialMailboxRouter {
    /// Equivalent to `DialMailboxRouter::new()`.
    fn default() -> Self {
        Self::new()
    }
}
2666
2667impl DialMailboxRouter {
2668 pub fn new() -> Self {
2670 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2671 }
2672
2673 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2678 Self {
2679 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2680 sender_cache: Arc::new(DashMap::new()),
2681 default,
2682 direct_addressed_remote_only: false,
2683 }
2684 }
2685
2686 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2691 Self {
2692 address_book: Arc::new(RwLock::new(BTreeMap::new())),
2693 sender_cache: Arc::new(DashMap::new()),
2694 default,
2695 direct_addressed_remote_only: true,
2696 }
2697 }
2698
2699 pub fn bind(&self, dest: reference::Reference, addr: ChannelAddr) {
2705 if let Ok(mut w) = self.address_book.write() {
2706 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2707 && old_addr != addr
2708 {
2709 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2710 self.sender_cache.remove(&old_addr);
2711 }
2712 } else {
2713 tracing::error!("address book poisoned during bind of {:?}", dest);
2714 }
2715 }
2716
2717 pub fn unbind(&self, dest: &reference::Reference) {
2723 if let Ok(mut w) = self.address_book.write() {
2724 let to_remove: Vec<(reference::Reference, ChannelAddr)> = w
2725 .range(dest..)
2726 .take_while(|(key, _)| dest.is_prefix_of(key))
2727 .map(|(key, addr)| (key.clone(), addr.clone()))
2728 .collect();
2729
2730 for (key, addr) in to_remove {
2731 tracing::info!("unbinding {:?} from {:?}", key, addr);
2732 w.remove(&key);
2733 self.sender_cache.remove(&addr);
2734 }
2735 } else {
2736 tracing::error!("address book poisoned during unbind of {:?}", dest);
2737 }
2738 }
2739
2740 pub fn lookup_addr(&self, actor_id: &reference::ActorId) -> Option<ChannelAddr> {
2742 let address_book = self.address_book.read().unwrap();
2743 let found = address_book
2744 .lower_bound(Excluded(&actor_id.clone().into()))
2745 .prev();
2746
2747 if let Some((key, addr)) = found
2750 && key.is_prefix_of(&actor_id.clone().into())
2751 {
2752 Some(addr.clone())
2753 } else {
2754 let addr = actor_id.proc_id().addr().clone();
2755 if self.direct_addressed_remote_only {
2756 addr.transport().is_remote().then_some(addr)
2757 } else {
2758 Some(addr)
2759 }
2760 }
2761 }
2762
2763 pub fn prefixes(&self) -> BTreeSet<reference::Reference> {
2766 let addrs = self.address_book.read().unwrap();
2767 let mut prefixes: BTreeSet<reference::Reference> = BTreeSet::new();
2768 for (reference, _) in addrs.iter() {
2769 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2770 Some(candidate) if candidate.is_prefix_of(reference) => (),
2771 _ => {
2772 prefixes.insert(reference.clone());
2773 }
2774 }
2775 }
2776
2777 prefixes
2778 }
2779
2780 fn dial(
2781 &self,
2782 addr: &ChannelAddr,
2783 actor_id: &reference::ActorId,
2784 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2785 match self.sender_cache.entry(addr.clone()) {
2789 Entry::Occupied(entry) => Ok(entry.get().clone()),
2790 Entry::Vacant(entry) => {
2791 let tx = channel::dial(addr.clone()).map_err(|err| {
2792 MailboxSenderError::new_unbound_type(
2793 actor_id.clone(),
2794 MailboxSenderErrorKind::Channel(err),
2795 "unknown",
2796 )
2797 })?;
2798 let sender = MailboxClient::new(tx);
2799 Ok(entry.insert(Arc::new(sender)).value().clone())
2800 }
2801 }
2802 }
2803}
2804
2805impl MailboxSender for DialMailboxRouter {
2806 fn post_unchecked(
2807 &self,
2808 envelope: MessageEnvelope,
2809 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2810 ) {
2811 let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2812 self.default.post(envelope, return_handle);
2813 return;
2814 };
2815
2816 match self.dial(&addr, envelope.dest().actor_id()) {
2817 Err(err) => envelope.undeliverable(
2818 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2819 return_handle,
2820 ),
2821 Ok(sender) => sender.post(envelope, return_handle),
2822 }
2823 }
2824}
2825
#[derive(Debug)]
/// A sender that rejects every message as unroutable.
pub struct UnroutableMailboxSender;
2830
2831impl MailboxSender for UnroutableMailboxSender {
2832 fn post_unchecked(
2833 &self,
2834 envelope: MessageEnvelope,
2835 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2836 ) {
2837 envelope.undeliverable(
2838 DeliveryError::Unroutable("destination not found in routing table".to_string()),
2839 return_handle,
2840 );
2841 }
2842}
2843
2844#[cfg(test)]
2845mod tests {
2846
2847 use std::assert_matches::assert_matches;
2848 use std::mem::drop;
2849 use std::str::FromStr;
2850 use std::sync::atomic::AtomicUsize;
2851 use std::time::Duration;
2852
2853 use timed_test::async_timed_test;
2854
2855 use super::*;
2856 use crate::Actor;
2857 use crate::ActorHandle;
2858 use crate::Instance;
2859 use crate::accum;
2860 use crate::accum::ReducerMode;
2861 use crate::channel::ChannelTransport;
2862 use crate::context::Mailbox as MailboxContext;
2863 use crate::proc::Proc;
2864 use crate::testing::ids::test_actor_id;
2865 use crate::testing::ids::test_port_id;
2866 use crate::testing::ids::test_proc_id;
2867
2868 fn test_proc_ref(name: &str) -> reference::Reference {
2869 reference::Reference::Proc(test_proc_id(name))
2870 }
2871
2872 fn test_actor_ref(proc_name: &str, actor_name: &str) -> reference::Reference {
2873 reference::Reference::Actor(test_actor_id(proc_name, actor_name))
2874 }
2875
2876 #[test]
2877 fn test_error() {
2878 use crate::testing::ids::test_actor_id_with_pid;
2879 let err = MailboxError::new(
2880 test_actor_id_with_pid("myworld_2", "myactor", 5),
2881 MailboxErrorKind::Closed,
2882 );
2883 assert!(format!("{}", err).ends_with(",test_myworld_2,myactor[5]: mailbox closed"));
2886 }
2887
2888 #[tokio::test]
2889 async fn test_mailbox_basic() {
2890 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2891 let (port, mut receiver) = mbox.open_port::<u64>();
2892 let port = port.bind();
2893
2894 mbox.serialize_and_send(&port, 123, monitored_return_handle())
2895 .unwrap();
2896 mbox.serialize_and_send(&port, 321, monitored_return_handle())
2897 .unwrap();
2898 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2899 assert_eq!(receiver.recv().await.unwrap(), 321u64);
2900
2901 let serialized = wirevalue::Any::serialize(&999u64).unwrap();
2902 mbox.post(
2903 MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2904 monitored_return_handle(),
2905 );
2906 assert_eq!(receiver.recv().await.unwrap(), 999u64);
2907 }
2908
2909 #[tokio::test]
2910 async fn test_mailbox_accum() {
2911 let proc = Proc::local();
2912 let (client, _) = proc.instance("client").unwrap();
2913 let (port, mut receiver) = client
2914 .mailbox()
2915 .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());
2916
2917 for i in -3..4 {
2918 port.send(&client, accum::Max(i)).unwrap();
2919 let received: accum::Max<i64> = receiver.recv().await.unwrap();
2920 let msg = received.get();
2921 assert_eq!(msg, &i);
2922 }
2923 for i in -3..4 {
2925 port.send(&client, accum::Max(i)).unwrap();
2926 assert_eq!(receiver.recv().await.unwrap().get(), &3);
2927 }
2928 port.send(&client, accum::Max(4)).unwrap();
2930 assert_eq!(receiver.recv().await.unwrap().get(), &4);
2931
2932 for i in 5..10 {
2934 port.send(&client, accum::Max(i)).unwrap();
2935 }
2936 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2937 port.send(&client, accum::Max(1)).unwrap();
2938 port.send(&client, accum::Max(3)).unwrap();
2939 port.send(&client, accum::Max(2)).unwrap();
2940 assert_eq!(receiver.recv().await.unwrap().get(), &9);
2941 }
2942
/// Checks that the reducer spec is carried from a port handle to its bound
/// reference: present for accumulator ports, absent for plain ports.
#[test]
fn test_port_and_reducer() {
    let mailbox = Mailbox::new_detached(test_actor_id("0", "test"));

    // Accumulator port: handle and bound ref expose the accumulator's spec.
    {
        let expected = accum::join_semilattice::<accum::Max<u64>>()
            .reducer_spec()
            .unwrap();
        let (handle, _) = mailbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
        assert_eq!(handle.reducer_spec, Some(expected.clone()));
        let bound = handle.bind();
        assert_eq!(bound.reducer_spec(), &Some(expected));
    }

    // Plain port: no reducer spec on either side.
    {
        let (handle, _) = mailbox.open_port::<u64>();
        assert_eq!(handle.reducer_spec, None);
        let bound = handle.bind();
        assert_eq!(bound.reducer_spec(), &None);
    }
}
2963
2964 #[tokio::test]
2965 #[ignore] async fn test_mailbox_once() {
2967 let proc = Proc::local();
2968 let (client, _) = proc.instance("client").unwrap();
2969
2970 let (port, receiver) = client.open_once_port::<u64>();
2971
2972 port.send(&client, 123u64).unwrap();
2975 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2976
2977 }
2988
2989 #[tokio::test]
2990 #[ignore] async fn test_mailbox_receiver_drop() {
2992 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2993 let (port, mut receiver) = mbox.open_port::<u64>();
2994 let port = port.bind();
2996 mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2997 .unwrap();
2998 assert_eq!(receiver.recv().await.unwrap(), 123u64);
2999 drop(receiver);
3000 let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
3001 panic!();
3002 };
3003
3004 assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
3005 assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
3006 }
3007
3008 #[tokio::test]
3009 async fn test_drain() {
3010 let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
3011
3012 let (port, mut receiver) = mbox.open_port();
3013 let port = port.bind();
3014
3015 for i in 0..10 {
3016 mbox.serialize_and_send(&port, i, monitored_return_handle())
3017 .unwrap();
3018 }
3019
3020 for i in 0..10 {
3021 assert_eq!(receiver.recv().await.unwrap(), i);
3022 }
3023
3024 assert!(receiver.drain().is_empty());
3025 }
3026
3027 #[tokio::test]
3028 async fn test_mailbox_muxer() {
3029 let muxer = MailboxMuxer::new();
3030
3031 let mbox0 = Mailbox::new_detached(test_actor_id("0", "actor1"));
3032 let mbox1 = Mailbox::new_detached(test_actor_id("0", "actor2"));
3033
3034 muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
3035 muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
3036
3037 let (port, receiver) = mbox0.open_once_port::<u64>();
3038
3039 let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
3040 let (client, _) = proc.instance("client").unwrap();
3041
3042 port.send(&client, 123u64).unwrap();
3043 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3044
3045 }
3055
3056 #[tokio::test]
3057 async fn test_local_client_server() {
3058 let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
3059 let (tx, rx) = channel::local::new();
3060 let serve_handle = mbox.clone().serve(rx);
3061 let client = MailboxClient::new(tx);
3062
3063 let (port, receiver) = mbox.open_once_port::<u64>();
3064 let port = port.bind();
3065
3066 client
3067 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3068 .unwrap();
3069 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3070 serve_handle.stop("fromt test");
3071 serve_handle.await.unwrap().unwrap();
3072 }
3073
3074 #[tokio::test]
3075 async fn test_mailbox_router() {
3076 let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
3077 let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
3078 let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
3079 let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));
3080
3081 let comms: Vec<(reference::OncePortRef<u64>, OncePortReceiver<u64>)> =
3082 [&mbox0, &mbox1, &mbox2, &mbox3]
3083 .into_iter()
3084 .map(|mbox| {
3085 let (port, receiver) = mbox.open_once_port::<u64>();
3086 (port.bind(), receiver)
3087 })
3088 .collect();
3089
3090 let router = MailboxRouter::new();
3091
3092 router.bind(test_proc_id("world0_0").into(), mbox0);
3093 router.bind(test_proc_id("world1_0").into(), mbox1);
3094 router.bind(test_proc_id("world1_1").into(), mbox2);
3095 router.bind(test_actor_id("world1_1", "actor1").into(), mbox3);
3096
3097 for (i, (port, receiver)) in comms.into_iter().enumerate() {
3098 router
3099 .serialize_and_send_once(port, i as u64, monitored_return_handle())
3100 .unwrap();
3101 assert_eq!(receiver.recv().await.unwrap(), i as u64);
3102 }
3103
3104 let mbox4 = Mailbox::new_detached(test_actor_id("fallback_0", "actor"));
3107
3108 let (return_handle, mut return_receiver) =
3109 crate::mailbox::undeliverable::new_undeliverable_port();
3110 let (port, _receiver) = mbox4.open_once_port();
3111 router
3112 .serialize_and_send_once(port.bind(), 0, return_handle.clone())
3113 .unwrap();
3114 assert!(return_receiver.recv().await.is_ok());
3115
3116 let router = router.fallback(mbox4.clone().into_boxed());
3117 let (port, receiver) = mbox4.open_once_port();
3118 router
3119 .serialize_and_send_once(port.bind(), 0, return_handle)
3120 .unwrap();
3121 assert_eq!(receiver.recv().await.unwrap(), 0);
3122 }
3123
/// Exercises binding, address lookup and unbinding on a
/// [`DialMailboxRouter`].
#[tokio::test]
async fn test_dial_mailbox_router() {
    let router = DialMailboxRouter::new();

    // Three proc-level bindings and one actor-level binding, each with its
    // own address.
    router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
    router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
    router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
    router.bind(
        test_actor_ref("world1_1", "actor1"),
        "unix!@4".parse().unwrap(),
    );
    // A binding whose reference is parsed from its textual form.
    router.bind(
        "unix:@4,my_proc,my_actor".parse().unwrap(),
        "unix:@5".parse().unwrap(),
    );

    // Actors on bound procs must resolve to some address.
    router
        .lookup_addr(&test_actor_id("world0_0", "actor"))
        .unwrap();
    router
        .lookup_addr(&test_actor_id("world1_0", "actor"))
        .unwrap();

    // The explicitly bound actor resolves to the address it was bound to.
    let actor_id = reference::Reference::from_str("unix:@4,my_proc,my_actor")
        .unwrap()
        .into_actor()
        .unwrap();
    assert_eq!(
        router.lookup_addr(&actor_id).unwrap(),
        "unix!@5".parse().unwrap(),
    );
    // Once unbound, the lookup yields "unix!@4".
    // NOTE(review): presumably the address embedded in the reference itself
    // ("unix:@4,...") — confirm against `lookup_addr`.
    router.unbind(&actor_id.clone().into());
    assert_eq!(
        router.lookup_addr(&actor_id).unwrap(),
        "unix!@4".parse().unwrap(),
    );

    // After unbinding a proc, lookups for its actors are expected to yield
    // `fallback`.
    // NOTE(review): why the lookup equals `ChannelAddr::any(Local)` is not
    // visible in this block — presumably it is the address carried by the
    // test ids; confirm.
    let fallback = ChannelAddr::any(ChannelTransport::Local);
    router.unbind(&test_proc_ref("world1_0"));
    router.unbind(&test_proc_ref("world1_1"));
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world1_0", "actor1"))
            .unwrap(),
        fallback,
    );
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world1_1", "actor1"))
            .unwrap(),
        fallback,
    );
    // "world0_0" is still bound at this point, so its lookup must succeed.
    router
        .lookup_addr(&test_actor_id("world0_0", "actor"))
        .unwrap();
    // After unbinding it too, the lookup is expected to yield `fallback`.
    router.unbind(&test_proc_ref("world0_0"));
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world0_0", "actor"))
            .unwrap(),
        fallback,
    );
}
3193
/// Builds a root [`MailboxRouter`] with two [`DialMailboxRouter`]s that use
/// the root as their default, serves four mailboxes on local channels, and
/// checks that a message sent through any of the three routers reaches every
/// mailbox.
///
/// The test is marked `#[ignore]`; the reason is not recorded in this block.
#[tokio::test]
#[ignore]
async fn test_dial_mailbox_router_default() {
    let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
    let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
    let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
    let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));

    // Each dial router defers to the root router as its default sender.
    let root = MailboxRouter::new();
    let world0_router = DialMailboxRouter::new_with_default(root.boxed());
    let world1_router = DialMailboxRouter::new_with_default(root.boxed());

    // The root, in turn, routes each world to its dial router.
    root.bind(test_proc_ref("world0"), world0_router.clone());
    root.bind(test_proc_ref("world1"), world1_router.clone());

    let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];

    // Serve every mailbox on its own local channel and register the
    // resulting address with the dial router of the matching world. The
    // serve handles are collected so they stay alive for the whole test.
    let mut handles = Vec::new();
    for mbox in mailboxes.iter() {
        let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
        let handle = (*mbox).clone().serve(rx);
        handles.push(handle);

        eprintln!("{}: {}", mbox.actor_id(), addr);
        if mbox.actor_id().proc_id().name().starts_with("world0") {
            world0_router.bind(mbox.actor_id().clone().into(), addr);
        } else {
            world1_router.bind(mbox.actor_id().clone().into(), addr);
        }
    }

    // Every router must be able to reach every mailbox.
    for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
        for mbox in mailboxes.iter() {
            let (port, receiver) = mbox.open_once_port::<u64>();
            let port = port.bind();
            router
                .serialize_and_send_once(port, 123u64, monitored_return_handle())
                .unwrap();
            assert_eq!(receiver.recv().await.unwrap(), 123u64);
        }
    }
}
3239
3240 #[tokio::test]
3241 async fn test_enqueue_port() {
3242 let proc = Proc::local();
3243 let (client, _) = proc.instance("client").unwrap();
3244
3245 let count = Arc::new(AtomicUsize::new(0));
3246 let count_clone = count.clone();
3247 let port = client.mailbox().open_enqueue_port(move |_, n| {
3248 count_clone.fetch_add(n, Ordering::SeqCst);
3249 Ok(())
3250 });
3251
3252 port.send(&client, 10).unwrap();
3253 port.send(&client, 5).unwrap();
3254 port.send(&client, 1).unwrap();
3255 port.send(&client, 0).unwrap();
3256
3257 assert_eq!(count.load(Ordering::SeqCst), 16);
3258 }
3259
/// Unit message type deriving `typeuri::Named` without a name override; used
/// to check the derived type name.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
struct TestMessage;
3262
/// Unit message type deriving `typeuri::Named` with an explicit
/// `#[named(name = ...)]` override; used to check that the override is
/// honored.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
#[named(name = "some::custom::path")]
struct TestMessage2;
3266
/// Checks the type names produced by the `typeuri::Named` derive, with and
/// without a custom name.
#[test]
fn test_remote_message_macros() {
    let derived_name = TestMessage::typename();
    let custom_name = TestMessage2::typename();

    assert_eq!(derived_name, "hyperactor::mailbox::tests::TestMessage");
    assert_eq!(custom_name, "some::custom::path");
}
3275
/// Checks that the `Display` rendering of a [`MessageEnvelope`] contains the
/// payload's type name followed by its fields.
#[test]
fn test_message_envelope_display() {
    #[derive(typeuri::Named, Serialize, Deserialize)]
    struct MyTest {
        a: u64,
        b: String,
    }
    wirevalue::register_type!(MyTest);

    let payload = MyTest {
        a: 123,
        b: "hello".into(),
    };
    let sender = test_actor_id("source_0", "actor");
    let dest = test_port_id("dest_1", "actor", 123);
    let envelope = MessageEnvelope::serialize(sender, dest, &payload, Flattrs::new()).unwrap();

    let rendered = envelope.to_string();
    assert!(rendered.contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
}
3299
/// Stateless actor used by the tests below.
#[derive(Debug, Default)]
struct Foo;

// The impl body is empty: `Foo` relies entirely on `Actor`'s provided
// behavior.
impl Actor for Foo {}
3304
/// Sends an `Undeliverable` envelope to a spawned `Foo` actor's
/// undeliverable-message port and checks that the actor ends up `Failed`
/// with a message naming the sender and the destination.
#[tokio::test]
async fn test_actor_delivery_failure() {
    use crate::actor::ActorStatus;
    use crate::testing::proc_supervison::ProcSupervisionCoordinator;

    // Proc whose forwarder is a dial router defaulting to the panicking
    // sender.
    let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
        BOXED_PANICKING_MAILBOX_SENDER.clone(),
    ));
    let proc_id = test_proc_id("quux_0");
    let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
    // Both bindings are kept (not `_`) so they live until the end of the
    // test.
    let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
    let (client, _) = proc.instance("client").unwrap();

    // Hand `foo` an undeliverable envelope that claims `foo` as its sender
    // and an arbitrary port on another proc as its destination.
    let foo = proc.spawn("foo", Foo).unwrap();
    let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
    let message = MessageEnvelope::new(
        foo.actor_id().clone(),
        test_port_id("corge_0", "bar", 9999),
        wirevalue::Any::serialize(&1u64).unwrap(),
        Flattrs::new(),
    );
    return_handle.send(&client, Undeliverable(message)).unwrap();

    // Give the actor time to process the message before inspecting status.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    let foo_status = foo.status();
    assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
    // The assertion above already established the variant, hence
    // `unreachable!()` in the else branch.
    let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
        unreachable!()
    };
    let msg_str = msg.to_string();
    assert!(msg_str.contains("undeliverable message error"));
    assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
    assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));

    proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None, "test cleanup")
        .await
        .unwrap();
}
3349
/// Checks that an undeliverable port created with `new_undeliverable_port`
/// receives what is sent to it, and that a task draining its receiver
/// terminates once the handle is dropped.
#[tokio::test]
async fn test_detached_return_handle() {
    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();
    let envelope = MessageEnvelope::new(
        test_actor_id("foo_0", "bar"),
        test_port_id("baz_0", "corge", 9999),
        wirevalue::Any::serialize(&1u64).unwrap(),
        Flattrs::new(),
    );
    let proc = Proc::local();
    let (client, _) = proc.instance("client").unwrap();
    return_handle
        .send(&client, Undeliverable(envelope.clone()))
        .unwrap();
    // The message must show up on the receiver within one second.
    assert!(
        tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
            .await
            .is_ok()
    );
    // Drain loop: stamps every returned envelope with a `BrokenLink` error
    // and posts it to `UndeliverableMailboxSender`. It ends when `recv`
    // stops returning `Ok`.
    let monitor_handle = tokio::spawn(async move {
        while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
            envelope.set_error(DeliveryError::BrokenLink(
                "returned in unit test".to_string(),
            ));
            UndeliverableMailboxSender
                .post(envelope, monitored_return_handle());
        }
    });
    // Dropping the handle is expected to end the drain loop, so the task
    // must complete within one second.
    drop(return_handle);
    assert!(
        tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
            .await
            .is_ok()
    );
}
3390
3391 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3392 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3393 let dummy_actor_id = test_actor_id("world_0", "actor");
3396 let dummy_state = State::new(
3397 dummy_actor_id.clone(),
3398 BOXED_PANICKING_MAILBOX_SENDER.clone(),
3399 );
3400 let dummy_port_id = reference::PortId::new(dummy_actor_id, 0);
3401 let (sender, receiver) = mpsc::unbounded_channel::<M>();
3402 let receiver = PortReceiver {
3403 receiver,
3404 port_id: dummy_port_id,
3405 coalesce,
3406 mailbox: Mailbox {
3407 inner: Arc::new(dummy_state),
3408 },
3409 };
3410 (sender, receiver)
3411 }
3412
3413 {
3415 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3416 assert!(receiver.drain().is_empty());
3417
3418 sender.send(0).unwrap();
3419 sender.send(1).unwrap();
3420 sender.send(2).unwrap();
3421 sender.send(3).unwrap();
3422 sender.send(4).unwrap();
3423 sender.send(5).unwrap();
3424 sender.send(6).unwrap();
3425 sender.send(7).unwrap();
3426
3427 if drop_sender {
3428 drop(sender);
3429 }
3430
3431 if !coalesce {
3432 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3433 } else {
3434 assert_eq!(receiver.drain(), vec![7]);
3435 }
3436
3437 assert!(receiver.drain().is_empty());
3438 assert!(receiver.drain().is_empty());
3439 }
3440
3441 {
3443 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3444 assert!(receiver.try_recv().unwrap().is_none());
3445
3446 sender.send(0).unwrap();
3447 sender.send(1).unwrap();
3448 sender.send(2).unwrap();
3449 sender.send(3).unwrap();
3450
3451 if drop_sender {
3452 drop(sender);
3453 }
3454
3455 if !coalesce {
3456 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3457 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3458 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3459 }
3460 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3461 if drop_sender {
3462 assert_matches!(
3463 receiver.try_recv().unwrap_err().kind(),
3464 MailboxErrorKind::Closed
3465 );
3466 assert_matches!(
3468 receiver.try_recv().unwrap_err().kind(),
3469 MailboxErrorKind::Closed
3470 );
3471 } else {
3472 assert!(receiver.try_recv().unwrap().is_none());
3473 assert!(receiver.try_recv().unwrap().is_none());
3475 }
3476 }
3477 {
3479 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3480 assert!(
3481 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3482 .await
3483 .is_err()
3484 );
3485
3486 sender.send(4).unwrap();
3487 sender.send(5).unwrap();
3488 sender.send(6).unwrap();
3489 sender.send(7).unwrap();
3490
3491 if drop_sender {
3492 drop(sender);
3493 }
3494
3495 if !coalesce {
3496 assert_eq!(receiver.recv().await.unwrap(), 4);
3497 assert_eq!(receiver.recv().await.unwrap(), 5);
3498 assert_eq!(receiver.recv().await.unwrap(), 6);
3499 }
3500 assert_eq!(receiver.recv().await.unwrap(), 7);
3501 if drop_sender {
3502 assert_matches!(
3503 receiver.recv().await.unwrap_err().kind(),
3504 MailboxErrorKind::Closed
3505 );
3506 assert_matches!(
3508 receiver.recv().await.unwrap_err().kind(),
3509 MailboxErrorKind::Closed
3510 );
3511 } else {
3512 assert!(
3513 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3514 .await
3515 .is_err()
3516 );
3517 }
3518 }
3519 }
3520
3521 #[tokio::test]
3522 async fn test_receiver_basic_default() {
3523 verify_receiver(false, false).await
3524 }
3525
3526 #[tokio::test]
3527 async fn test_receiver_basic_latest() {
3528 verify_receiver(true, false).await
3529 }
3530
3531 #[tokio::test]
3532 async fn test_receiver_after_sender_drop_default() {
3533 verify_receiver(false, true).await
3534 }
3535
3536 #[tokio::test]
3537 async fn test_receiver_after_sender_drop_latest() {
3538 verify_receiver(true, true).await
3539 }
3540
/// Fixture returned by `setup_split_port_ids`: two actor instances, a port
/// opened on the first one, and a small tree of ports split from it.
struct Setup {
    /// Receiving end of the port opened on `actor0`.
    receiver: PortReceiver<u64>,
    /// Instance on which the original port was opened.
    actor0: Instance<()>,
    /// Instance used as the context for every `split` call.
    actor1: Instance<()>,
    /// Handle of `actor0`; stored so it lives as long as the fixture.
    _actor0_handle: ActorHandle<()>,
    /// Handle of `actor1`; stored so it lives as long as the fixture.
    _actor1_handle: ActorHandle<()>,
    /// Id of the original bound port.
    port_id: reference::PortId,
    /// First split of `port_id`.
    port_id1: reference::PortId,
    /// Second split of `port_id`.
    port_id2: reference::PortId,
    /// Split of `port_id2`.
    port_id2_1: reference::PortId,
}
3552
3553 async fn setup_split_port_ids(
3554 reducer_spec: Option<ReducerSpec>,
3555 reducer_mode: ReducerMode,
3556 ) -> Setup {
3557 let proc = Proc::local();
3558 let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3559 let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3560
3561 let (port_handle, receiver) = actor0.open_port::<u64>();
3563 let port_id = port_handle.bind().port_id().clone();
3564
3565 let port_id1 = port_id
3567 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3568 .unwrap();
3569 let port_id2 = port_id
3570 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3571 .unwrap();
3572
3573 let port_id2_1 = port_id2
3575 .split(&actor1, reducer_spec, reducer_mode.clone(), true)
3576 .unwrap();
3577
3578 Setup {
3579 receiver,
3580 actor0,
3581 actor1,
3582 _actor0_handle: actor0_handle,
3583 _actor1_handle: actor1_handle,
3584 port_id,
3585 port_id1,
3586 port_id2,
3587 port_id2_1,
3588 }
3589 }
3590
3591 fn post(cx: &impl context::Actor, port_id: reference::PortId, msg: u64) {
3592 let serialized = wirevalue::Any::serialize(&msg).unwrap();
3593 port_id.send(cx, serialized);
3594 }
3595
3596 #[async_timed_test(timeout_secs = 30)]
3597 #[cfg_attr(not(fbcode_build), ignore)]
3599 async fn test_split_port_id_no_reducer() {
3600 let Setup {
3601 mut receiver,
3602 actor0,
3603 actor1,
3604 port_id,
3605 port_id1,
3606 port_id2,
3607 port_id2_1,
3608 ..
3609 } = setup_split_port_ids(None, ReducerMode::default()).await;
3610 post(&actor0, port_id.clone(), 1);
3612 assert_eq!(receiver.recv().await.unwrap(), 1);
3613 post(&actor1, port_id1.clone(), 2);
3614 assert_eq!(receiver.recv().await.unwrap(), 2);
3615 post(&actor1, port_id2.clone(), 3);
3616 assert_eq!(receiver.recv().await.unwrap(), 3);
3617 post(&actor1, port_id2_1.clone(), 4);
3618 assert_eq!(receiver.recv().await.unwrap(), 4);
3619
3620 tokio::time::sleep(Duration::from_secs(2)).await;
3622 let msg = receiver.try_recv().unwrap();
3623 assert_eq!(msg, None);
3624 }
3625
3626 async fn wait_for(
3627 receiver: &mut PortReceiver<u64>,
3628 expected_size: usize,
3629 timeout_duration: Duration,
3630 ) -> anyhow::Result<Vec<u64>> {
3631 let mut messeges = vec![];
3632
3633 tokio::time::timeout(timeout_duration, async {
3634 loop {
3635 let msg = receiver.recv().await.unwrap();
3636 messeges.push(msg);
3637 if messeges.len() == expected_size {
3638 break;
3639 }
3640 }
3641 })
3642 .await?;
3643 Ok(messeges)
3644 }
3645
3646 #[async_timed_test(timeout_secs = 30)]
3647 async fn test_split_port_id_sum_reducer() {
3648 let config = hyperactor_config::global::lock();
3649 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3650
3651 let sum_accumulator = accum::sum::<u64>();
3652 let reducer_spec = sum_accumulator.reducer_spec();
3653 let Setup {
3654 mut receiver,
3655 actor0,
3656 actor1,
3657 port_id,
3658 port_id1,
3659 port_id2,
3660 port_id2_1,
3661 ..
3662 } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
3663 post(&actor0, port_id.clone(), 4);
3664 post(&actor1, port_id1.clone(), 2);
3665 post(&actor1, port_id2.clone(), 3);
3666 post(&actor1, port_id2_1.clone(), 1);
3667 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3668 .await
3669 .unwrap();
3670 messages.sort();
3673 assert_eq!(messages, vec![1, 2, 3, 4]);
3674
3675 tokio::time::sleep(Duration::from_secs(2)).await;
3677 let msg = receiver.try_recv().unwrap();
3678 assert_eq!(msg, None);
3679 }
3680
3681 #[async_timed_test(timeout_secs = 30)]
3682 #[cfg_attr(not(fbcode_build), ignore)]
3684 async fn test_split_port_id_every_n_messages() {
3685 let config = hyperactor_config::global::lock();
3686 let _config_guard =
3687 config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3688 let proc = Proc::local();
3689 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3690 let (port_handle, mut receiver) = actor.open_port::<u64>();
3691 let port_id = port_handle.bind().port_id().clone();
3692 let reducer_spec = accum::sum::<u64>().reducer_spec();
3694 let split_port_id = port_id
3695 .split(
3696 &actor,
3697 reducer_spec,
3698 ReducerMode::Streaming(accum::StreamingReducerOpts {
3699 max_update_interval: Some(Duration::from_mins(10)),
3700 initial_update_interval: Some(Duration::from_mins(10)),
3701 }),
3702 true,
3703 )
3704 .unwrap();
3705
3706 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3708 post(&actor, split_port_id.clone(), msg);
3709 }
3710 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3713 .await
3714 .unwrap();
3715 assert_eq!(messages, vec![15]);
3716
3717 tokio::time::sleep(Duration::from_secs(2)).await;
3720 let msg = receiver.try_recv().unwrap();
3721 assert_eq!(msg, None);
3722 }
3723
3724 #[async_timed_test(timeout_secs = 30)]
3725 async fn test_split_port_timeout_flush() {
3726 let config = hyperactor_config::global::lock();
3727 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3728
3729 let Setup {
3730 mut receiver,
3731 actor0: _,
3732 actor1,
3733 port_id: _,
3734 port_id1,
3735 port_id2: _,
3736 port_id2_1: _,
3737 ..
3738 } = setup_split_port_ids(
3739 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3740 ReducerMode::Streaming(accum::StreamingReducerOpts {
3741 max_update_interval: Some(Duration::from_millis(50)),
3742 initial_update_interval: Some(Duration::from_millis(50)),
3743 }),
3744 )
3745 .await;
3746
3747 post(&actor1, port_id1.clone(), 10);
3748 post(&actor1, port_id1.clone(), 20);
3749 post(&actor1, port_id1.clone(), 30);
3750
3751 tokio::time::sleep(Duration::from_millis(10)).await;
3753 let msg = receiver.try_recv().unwrap();
3754 assert_eq!(msg, None);
3755
3756 tokio::time::sleep(Duration::from_millis(100)).await;
3758
3759 let msg = receiver.recv().await.unwrap();
3761 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
3765 assert_eq!(msg, None);
3766 }
3767
3768 #[async_timed_test(timeout_secs = 30)]
3769 async fn test_split_port_timeout_and_size_flush() {
3770 let config = hyperactor_config::global::lock();
3771 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3772
3773 let Setup {
3774 mut receiver,
3775 actor0: _,
3776 actor1,
3777 port_id: _,
3778 port_id1,
3779 port_id2: _,
3780 port_id2_1: _,
3781 ..
3782 } = setup_split_port_ids(
3783 Some(accum::sum::<u64>().reducer_spec().unwrap()),
3784 ReducerMode::Streaming(accum::StreamingReducerOpts {
3785 max_update_interval: Some(Duration::from_millis(50)),
3786 initial_update_interval: Some(Duration::from_millis(50)),
3787 }),
3788 )
3789 .await;
3790
3791 post(&actor1, port_id1.clone(), 10);
3792 post(&actor1, port_id1.clone(), 20);
3793 post(&actor1, port_id1.clone(), 30);
3794 post(&actor1, port_id1.clone(), 40);
3795
3796 let msg = receiver.recv().await.unwrap();
3798 assert_eq!(msg, 60);
3799
3800 let msg = receiver.recv().await.unwrap();
3802 assert_eq!(msg, 40);
3803
3804 let msg = receiver.try_recv().unwrap();
3806 assert_eq!(msg, None);
3807 }
3808
3809 #[async_timed_test(timeout_secs = 30)]
3810 async fn test_split_port_once_mode_basic() {
3811 let proc = Proc::local();
3812 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3813 let (port_handle, mut receiver) = actor.open_port::<u64>();
3814 let port_id = port_handle.bind().port_id().clone();
3815
3816 let reducer_spec = accum::sum::<u64>().reducer_spec();
3818 let split_port_id = port_id
3819 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3820 .unwrap();
3821
3822 post(&actor, split_port_id.clone(), 10);
3824 post(&actor, split_port_id.clone(), 20);
3825 post(&actor, split_port_id.clone(), 30);
3826
3827 let msg = receiver.recv().await.unwrap();
3829 assert_eq!(msg, 60); tokio::time::sleep(Duration::from_millis(100)).await;
3833 let msg = receiver.try_recv().unwrap();
3834 assert_eq!(msg, None);
3835 }
3836
3837 #[async_timed_test(timeout_secs = 30)]
3838 async fn test_split_port_once_mode_teardown() {
3839 let proc = Proc::local();
3840 let (actor, _actor_handle) = proc.instance("actor").unwrap();
3841 let (port_handle, mut receiver) = actor.open_port::<u64>();
3842 let port_id = port_handle.bind().port_id().clone();
3843
3844 let (undeliverable_handle, mut undeliverable_receiver) =
3846 undeliverable::new_undeliverable_port();
3847
3848 let reducer_spec = accum::sum::<u64>().reducer_spec();
3850 let split_port_id = port_id
3851 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3852 .unwrap();
3853
3854 post(&actor, split_port_id.clone(), 10);
3856 post(&actor, split_port_id.clone(), 20);
3857 post(&actor, split_port_id.clone(), 30);
3858
3859 let msg = receiver.recv().await.unwrap();
3861 assert_eq!(msg, 60); let serialized = wirevalue::Any::serialize(&100u64).unwrap();
3865 let envelope = MessageEnvelope::new(
3866 actor.mailbox().actor_id().clone(),
3867 split_port_id.clone(),
3868 serialized,
3869 Flattrs::new(),
3870 );
3871 actor.mailbox().post(envelope, undeliverable_handle);
3872
3873 let undeliverable =
3875 tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
3876 .await
3877 .expect("should receive undeliverable message")
3878 .expect("receiver should not be closed");
3879
3880 assert_eq!(undeliverable.0.dest(), &split_port_id);
3882
3883 let msg = receiver.try_recv().unwrap();
3885 assert_eq!(msg, None);
3886 }
3887
/// A freshly created router reports no prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_empty() {
    let router = DialMailboxRouter::new();
    let prefix_count = router.prefixes().len();
    assert_eq!(prefix_count, 0);
}
3892
/// A router with one binding reports exactly that reference as its prefix.
#[test]
fn test_dial_mailbox_router_prefixes_single_entry() {
    let router = DialMailboxRouter::new();
    router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());

    let found: Vec<reference::Reference> = router.prefixes().into_iter().collect();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], test_proc_ref("world0"));
}
3902
/// Three unrelated proc bindings are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_no_overlap() {
    let router = DialMailboxRouter::new();
    for (name, addr) in [
        ("world0", "unix!@1"),
        ("world1", "unix!@2"),
        ("world2", "unix!@3"),
    ] {
        router.bind(test_proc_ref(name), addr.parse().unwrap());
    }

    let mut found: Vec<reference::Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut expected: Vec<reference::Reference> = ["world0", "world1", "world2"]
        .into_iter()
        .map(test_proc_ref)
        .collect();
    expected.sort();

    assert_eq!(found, expected);
}
3922
/// Proc references whose names merely share a textual prefix (`world0` vs
/// `world0_0`) are all expected to be reported: all five bindings appear in
/// the result.
#[test]
fn test_dial_mailbox_router_prefixes_with_overlaps() {
    let bindings = [
        ("world0", "unix!@1"),
        ("world0_0", "unix!@2"),
        ("world0_1", "unix!@3"),
        ("world1", "unix!@4"),
        ("world1_0", "unix!@5"),
    ];

    let router = DialMailboxRouter::new();
    for (name, addr) in bindings {
        router.bind(test_proc_ref(name), addr.parse().unwrap());
    }

    let mut found: Vec<reference::Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut expected: Vec<reference::Reference> = bindings
        .into_iter()
        .map(|(name, _)| test_proc_ref(name))
        .collect();
    expected.sort();

    assert_eq!(found, expected);
}
3947
/// Mixes proc-level and actor-level bindings. The actor bound on the
/// already-bound proc `world0_0` is expected to be absent from the prefixes,
/// while the actor on the otherwise unbound proc `world2_0` is expected to
/// be present.
#[test]
fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
    let router = DialMailboxRouter::new();
    let bindings = [
        (test_proc_ref("world0"), "unix!@1"),
        (test_proc_ref("world0_0"), "unix!@2"),
        (test_actor_ref("world0_0", "actor1"), "unix!@3"),
        (test_proc_ref("world1_0"), "unix!@4"),
        (test_proc_ref("world1_1"), "unix!@5"),
        (test_actor_ref("world2_0", "actor0"), "unix!@6"),
    ];
    for (reference, addr) in bindings {
        router.bind(reference, addr.parse().unwrap());
    }

    let mut found: Vec<reference::Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut expected = Vec::from([
        test_proc_ref("world0"),
        test_proc_ref("world0_0"),
        test_proc_ref("world1_0"),
        test_proc_ref("world1_1"),
        test_actor_ref("world2_0", "actor0"),
    ]);
    expected.sort();

    assert_eq!(found, expected);
}
3985
/// Sibling proc bindings are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_same_level() {
    let names = ["world0_0", "world0_1", "world0_2"];
    let addrs = ["unix!@1", "unix!@2", "unix!@3"];

    let router = DialMailboxRouter::new();
    for (name, addr) in names.into_iter().zip(addrs) {
        router.bind(test_proc_ref(name), addr.parse().unwrap());
    }

    let mut found: Vec<reference::Reference> = router.prefixes().into_iter().collect();
    found.sort();

    let mut expected: Vec<reference::Reference> =
        names.into_iter().map(test_proc_ref).collect();
    expected.sort();

    assert_eq!(found, expected);
}
4006
4007 #[derive(Clone, Debug)]
4011 struct AsyncLoopForwarder;
4012
4013 impl MailboxSender for AsyncLoopForwarder {
4014 fn post_unchecked(
4015 &self,
4016 envelope: MessageEnvelope,
4017 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4018 ) {
4019 let me = self.clone();
4020 tokio::spawn(async move {
4021 me.post(envelope, return_handle);
4023 });
4024 }
4025 }
4026
4027 #[tokio::test]
4028 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4029 let actor_id = test_actor_id("world_0", "ttl_actor");
4030 let mailbox = Mailbox::new(
4031 actor_id.clone(),
4032 BoxedMailboxSender::new(AsyncLoopForwarder),
4033 );
4034 let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4035
4036 let remote_actor = test_actor_id("remote_world_1", "remote");
4039 let dest = reference::PortId::new(remote_actor.clone(), 4242);
4040
4041 let payload = 1234_u64;
4044 let envelope =
4045 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4046 .expect("serialize");
4047
4048 let return_handle = ret_port.clone();
4051 mailbox.post(envelope, return_handle);
4052
4053 let Undeliverable(undelivered) =
4055 tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4056 .await
4057 .expect("timed out waiting for undeliverable")
4058 .expect("channel closed");
4059
4060 let got: u64 = undelivered.deserialized().expect("deserialize");
4062 assert_eq!(got, payload, "payload preserved");
4063 }
4064
4065 #[tokio::test]
4066 async fn message_ttl_success_local_delivery() {
4067 let actor_id = test_actor_id("world_0", "ttl_actor");
4068 let mailbox = Mailbox::new(
4069 actor_id.clone(),
4070 BoxedMailboxSender::new(PanickingMailboxSender),
4071 );
4072 let (_undeliverable_tx, mut undeliverable_rx) =
4073 mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4074
4075 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4077
4078 let payload = 0xC0FFEE_u64;
4080 let envelope = MessageEnvelope::serialize(
4081 actor_id.clone(),
4082 user_port.bind().port_id().clone(),
4083 &payload,
4084 Flattrs::new(),
4085 )
4086 .expect("serialize");
4087
4088 let return_handle = mailbox
4091 .bound_return_handle()
4092 .unwrap_or(monitored_return_handle());
4093 mailbox.post(envelope, return_handle);
4094
4095 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4097 .await
4098 .expect("timed out waiting for local delivery")
4099 .expect("user port closed");
4100 assert_eq!(got, payload);
4101
4102 let no_undeliverable =
4104 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4105 assert!(
4106 no_undeliverable.is_err(),
4107 "unexpected undeliverable returned on successful local delivery"
4108 );
4109 }
4110
4111 #[tokio::test]
4112 async fn test_port_contramap() {
4113 let proc = Proc::local();
4114 let (client, _) = proc.instance("client").unwrap();
4115 let (handle, mut rx) = client.open_port();
4116
4117 handle
4118 .contramap(|m| (1, m))
4119 .send(&client, "hello".to_string())
4120 .unwrap();
4121 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4122 }
4123
/// Calling `bind_actor_port` twice on the same handle is expected to panic
/// with a message containing "already bound".
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_actor_port_twice() {
    let mailbox = Mailbox::new_detached(test_actor_id("0", "test"));
    let (port, _rx) = mailbox.open_port::<String>();
    // The second iteration is the one expected to panic.
    for _ in 0..2 {
        port.bind_actor_port();
    }
}
4132
/// Binding a freshly opened handle to the actor port must relocate it to the
/// actor's `String` port id, and later `bind()` calls must leave that
/// location untouched.
#[test]
fn test_bind_port_handle_to_actor_port() {
    let mailbox = Mailbox::new_detached(test_actor_id("0", "test"));
    let actor_port = mailbox.actor_id().port_id(String::port());
    let (handle, _rx) = mailbox.open_port::<String>();

    // Before binding, the handle's own index differs from the actor port's.
    assert_ne!(actor_port.index(), handle.port_index);

    handle.bind_actor_port();
    assert_matches!(
        handle.location(),
        PortLocation::Bound(port) if port == actor_port
    );

    // Repeated plain binds must not move the already-bound port.
    for _ in 0..2 {
        handle.bind();
    }
    assert_matches!(
        handle.location(),
        PortLocation::Bound(port) if port == actor_port
    );
}
4148
/// Once a handle has been bound with plain `bind()`, a later
/// `bind_actor_port()` is expected to panic with "already bound".
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_actor_port_when_already_bound() {
    let mailbox = Mailbox::new_detached(test_actor_id("0", "test"));
    let (port, _rx) = mailbox.open_port::<String>();

    port.bind();
    // A plain bind keeps the handle's own port index.
    assert_matches!(
        port.location(),
        PortLocation::Bound(bound) if bound.index() == port.port_index
    );

    // Expected to panic: the handle is already bound.
    port.bind_actor_port();
}
4160
4161 #[tokio::test]
4162 async fn test_mailbox_post_fails_when_actor_stopped() {
4163 let actor_id = test_actor_id("0", "stopped_actor");
4164
4165 let mailbox = Mailbox::new(
4166 actor_id.clone(),
4167 BoxedMailboxSender::new(PanickingMailboxSender),
4168 );
4169
4170 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4171
4172 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4173
4174 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4177
4178 let envelope = MessageEnvelope::serialize(
4179 actor_id.clone(),
4180 user_port.bind().port_id().clone(),
4181 &42u64,
4182 Flattrs::new(),
4183 )
4184 .expect("serialize");
4185
4186 mailbox.post(envelope, return_handle);
4187
4188 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4189 .await
4190 .expect("timed out waiting for undeliverable")
4191 .expect("return port closed");
4192
4193 let err = undeliverable.0.error_msg().expect("expected error");
4194 assert!(
4195 err.contains(&format!("owner {} is stopped", actor_id)),
4196 "error should indicate actor stopped: {}",
4197 err
4198 );
4199 }
4200
4201 #[tokio::test]
4202 async fn test_mailbox_post_fails_when_actor_failed() {
4203 use crate::actor::ActorErrorKind;
4204
4205 let actor_id = test_actor_id("0", "failed_actor");
4206
4207 let mailbox = Mailbox::new(
4208 actor_id.clone(),
4209 BoxedMailboxSender::new(PanickingMailboxSender),
4210 );
4211
4212 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4213
4214 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4215 "test failure".to_string(),
4216 )));
4217
4218 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4221
4222 let envelope = MessageEnvelope::serialize(
4223 actor_id.clone(),
4224 user_port.bind().port_id().clone(),
4225 &42u64,
4226 Flattrs::new(),
4227 )
4228 .expect("serialize");
4229
4230 mailbox.post(envelope, return_handle);
4231
4232 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4233 .await
4234 .expect("timed out waiting for undeliverable")
4235 .expect("return port closed");
4236
4237 let err = undeliverable.0.error_msg().expect("expected error");
4238 assert!(
4239 err.contains(&format!("owner {} failed", actor_id)),
4240 "error should indicate actor failed: {}",
4241 err
4242 );
4243 }
4244
4245 #[tokio::test]
4246 async fn test_port_handle_send_fails_when_actor_stopped() {
4247 let actor_id = test_actor_id("0", "stopped_actor");
4248
4249 let mailbox = Mailbox::new(
4250 actor_id.clone(),
4251 BoxedMailboxSender::new(PanickingMailboxSender),
4252 );
4253
4254 let (port_handle, _rx) = mailbox.open_port::<u64>();
4255 let proc = Proc::local();
4256 let (client, _) = proc.instance("client").unwrap();
4257
4258 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4259
4260 let result = port_handle.send(&client, 42u64);
4261
4262 assert!(result.is_err(), "send should fail when actor is stopped");
4263 let err = result.unwrap_err();
4264 assert_matches!(
4265 err.kind(),
4266 MailboxSenderErrorKind::Mailbox(mailbox_err)
4267 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4268 );
4269 }
4270
4271 #[tokio::test]
4272 async fn test_port_handle_send_fails_when_actor_failed() {
4273 use crate::actor::ActorErrorKind;
4274
4275 let actor_id = test_actor_id("0", "failed_actor");
4276
4277 let mailbox = Mailbox::new(
4278 actor_id.clone(),
4279 BoxedMailboxSender::new(PanickingMailboxSender),
4280 );
4281
4282 let (port_handle, _rx) = mailbox.open_port::<u64>();
4283 let proc = Proc::local();
4284 let (client, _) = proc.instance("client").unwrap();
4285
4286 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4287 "test failure".to_string(),
4288 )));
4289
4290 let result = port_handle.send(&client, 42u64);
4291
4292 assert!(result.is_err(), "send should fail when actor is failed");
4293 let err = result.unwrap_err();
4294 assert_matches!(
4295 err.kind(),
4296 MailboxSenderErrorKind::Mailbox(mailbox_err)
4297 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4298 );
4299 }
4300
4301 #[async_timed_test(timeout_secs = 30)]
4302 async fn test_open_reduce_port() {
4303 let proc = Proc::local();
4304 let (client, _) = proc.instance("client").unwrap();
4305
4306 let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4308
4309 let port_ref = port_handle.bind();
4311 assert!(port_ref.reducer_spec().is_some());
4312
4313 port_ref.send(&client, 42).unwrap();
4315
4316 let result = receiver.recv().await.unwrap();
4318 assert_eq!(result, 42);
4319 }
4320
4321 #[async_timed_test(timeout_secs = 30)]
4322 async fn test_open_reduce_port_reducer_spec_preserved() {
4323 let proc = Proc::local();
4324 let (client, _) = proc.instance("client").unwrap();
4325
4326 let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4328 let sum_ref = sum_handle.bind();
4329 let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
4330
4331 let (max_handle, _) = client
4332 .mailbox()
4333 .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
4334 let max_ref = max_handle.bind();
4335 let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
4336
4337 assert_ne!(sum_typehash, max_typehash);
4339 }
4340}