1use std::any::Any;
108use std::collections::BTreeMap;
109use std::collections::BTreeSet;
110use std::fmt;
111use std::fmt::Debug;
112use std::future;
113use std::future::Future;
114use std::ops::Bound::Excluded;
115use std::pin::Pin;
116use std::sync::Arc;
117use std::sync::Condvar;
118use std::sync::Mutex;
119use std::sync::RwLock;
120use std::sync::Weak;
121use std::sync::atomic::AtomicU64;
122use std::sync::atomic::AtomicUsize;
123use std::sync::atomic::Ordering;
124use std::task::Context;
125use std::task::Poll;
126
127use async_trait::async_trait;
128use dashmap::DashMap;
129use dashmap::mapref::entry::Entry;
130use futures::Sink;
131use futures::Stream;
132use hyperactor_config::Flattrs;
133use hyperactor_telemetry::hash_to_u64;
134use serde::Deserialize;
135use serde::Serialize;
136use serde::de::DeserializeOwned;
137use tokio::sync::mpsc;
138use tokio::sync::oneshot;
139use tokio::sync::watch;
140use tokio::task::JoinHandle;
141use tokio_util::sync::CancellationToken;
142use typeuri::Named;
143
144use crate::ActorAddr;
145use crate::Addr;
146use crate::Endpoint;
147use crate::EndpointLocation;
148use crate::OncePortRef;
150use crate::PortAddr;
151use crate::PortRef;
152use crate::Proc;
153use crate::ProcAddr;
154use crate::accum::Accumulator;
155use crate::accum::ReducerSpec;
156use crate::accum::StreamingReducerOpts;
157use crate::actor::ActorStatus;
158use crate::actor::Signal;
159use crate::actor::remote::USER_PORT_OFFSET;
160use crate::channel;
161use crate::channel::ChannelAddr;
162use crate::channel::ChannelError;
163use crate::channel::ChannelTransport;
164use crate::channel::SendError;
165use crate::channel::TxStatus;
166use crate::context;
167use crate::gateway::Gateway;
168use crate::id::ActorId;
169use crate::metrics;
170use crate::ordering::SEQ_INFO;
171use crate::ordering::SeqInfo;
172use crate::port::Port;
173
174mod undeliverable;
175pub use undeliverable::LostMessage;
177pub use undeliverable::Undeliverable;
178pub use undeliverable::UndeliverableMessageError;
179pub use undeliverable::custom_monitored_return_handle;
180pub use undeliverable::monitored_return_handle; pub mod mailbox_admin_message;
183pub use mailbox_admin_message::MailboxAdminMessage;
184pub use mailbox_admin_message::MailboxAdminMessageHandler;
185pub mod headers;
187
/// Marker trait for values that can be carried as messages: any type that is
/// `Send + Sync + 'static`.
pub trait Message: Send + Sync + 'static {}
/// Blanket implementation: every `Send + Sync + 'static` type is a [`Message`].
impl<M: Send + Sync + 'static> Message for M {}
192
/// A [`Message`] that is additionally [`Named`], [`Serialize`], and
/// [`DeserializeOwned`].
pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}

/// Blanket implementation: every type satisfying the supertrait bounds is a
/// [`RemoteMessage`].
impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
199
/// Alias for an owned byte buffer.
pub type Data = Vec<u8>;
202
/// Reasons a message could not be delivered. Values are serializable and are
/// accumulated on a [`MessageEnvelope`] via `set_error`.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    typeuri::Named,
    Clone,
    PartialEq,
    Eq
)]
pub enum DeliveryError {
    /// The destination address is not routable; the payload is a free-form
    /// description.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A link on the delivery path is broken; the payload is a free-form
    /// description.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// A mailbox-level failure, rendered as text.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast failure, rendered as text.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The envelope's TTL was already zero when a decrement was attempted
    /// (see `MessageEnvelope::dec_ttl_or_err`).
    #[error("ttl expired")]
    TtlExpired,
}
236
/// A serialized message together with its routing and delivery metadata.
#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
pub struct MessageEnvelope {
    /// Address of the actor recorded as the sender.
    sender: ActorAddr,

    /// Address of the destination port.
    dest: PortAddr,

    /// The serialized message payload.
    data: wirevalue::Any,

    /// Delivery errors recorded so far, in the order they were added.
    errors: Vec<DeliveryError>,

    /// Headers attached to the message.
    headers: Flattrs,

    /// Remaining time-to-live; decremented by `dec_ttl_or_err`, which fails
    /// once this reaches zero.
    ttl: u8,

    /// Flag initialised to `true` by the constructors and changed through
    /// `set_return_undeliverable`.
    /// NOTE(review): presumably controls whether the envelope is returned to
    /// its sender when undeliverable — the consumer is not visible here.
    return_undeliverable: bool,
}
// Registers the envelope type with `wirevalue`.
wirevalue::register_type!(MessageEnvelope);
266
267impl MessageEnvelope {
268 pub fn new(
270 sender: impl Into<ActorAddr>,
271 dest: impl Into<PortAddr>,
272 data: wirevalue::Any,
273 headers: Flattrs,
274 ) -> Self {
275 let sender = sender.into();
276 let dest = dest.into();
277 Self {
278 sender,
279 dest,
280 data,
281 errors: Vec::new(),
282 headers,
283 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
284 return_undeliverable: true,
286 }
287 }
288
289 pub(crate) fn new_unknown(dest: impl Into<PortAddr>, data: wirevalue::Any) -> Self {
291 let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
293 let unknown_proc_ref = ProcAddr::instance(unknown_addr, "unknown");
294 let unknown_actor_ref =
295 ActorAddr::root(unknown_proc_ref, crate::id::Label::strip("unknown"));
296 Self::new(unknown_actor_ref, dest, data, Flattrs::new())
297 }
298
299 pub fn serialize<T: Serialize + Named>(
301 source: impl Into<ActorAddr>,
302 dest: impl Into<PortAddr>,
303 value: &T,
304 headers: Flattrs,
305 ) -> Result<Self, wirevalue::Error> {
306 Ok(Self {
307 headers,
308 data: wirevalue::Any::serialize(value)?,
309 sender: source.into(),
310 dest: dest.into(),
311 errors: Vec::new(),
312 ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
313 return_undeliverable: true,
315 })
316 }
317
318 pub fn ttl(&self) -> u8 {
324 self.ttl
325 }
326
327 pub fn set_ttl(mut self, ttl: u8) -> Self {
337 self.ttl = ttl;
338 self
339 }
340
341 fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
349 if self.ttl == 0 {
350 Err(DeliveryError::TtlExpired)
351 } else {
352 self.ttl -= 1;
353 Ok(())
354 }
355 }
356
357 pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
359 Ok(self.data.deserialized()?)
360 }
361
362 pub fn data(&self) -> &wirevalue::Any {
364 &self.data
365 }
366
367 pub fn sender(&self) -> &ActorAddr {
369 &self.sender
370 }
371
372 pub fn dest(&self) -> &PortAddr {
374 &self.dest
375 }
376
377 pub fn headers(&self) -> &Flattrs {
379 &self.headers
380 }
381
382 pub fn is_signal(&self) -> bool {
384 self.dest.index() == Signal::port()
385 }
386
387 pub fn set_error(&mut self, error: DeliveryError) {
390 self.errors.push(error)
391 }
392
393 pub fn update_sender(&mut self, sender: impl Into<ActorAddr>) {
397 self.sender = sender.into();
398 }
399
400 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
404 self.return_undeliverable = return_undeliverable;
405 }
406
407 pub fn undeliverable(
411 mut self,
412 error: DeliveryError,
413 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
414 ) {
415 tracing::debug!(
416 name = "undelivered_message_attempt",
417 sender = self.sender.to_string(),
418 dest = self.dest.to_string(),
419 error = error.to_string(),
420 return_handle = %return_handle,
421 );
422 metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
423 1,
424 hyperactor_telemetry::kv_pairs!(
425 "sender_actor_id" => self.sender.to_string(),
426 "dest_actor_id" => self.dest.to_string(),
427 "message_type" => self.data.typename().unwrap_or("unknown"),
428 "error_type" => error.to_string(),
429 ),
430 );
431
432 self.set_error(error);
433 undeliverable::return_undeliverable(return_handle, self);
434 }
435
436 pub fn errors(&self) -> &Vec<DeliveryError> {
439 &self.errors
440 }
441
442 pub fn error_msg(&self) -> Option<String> {
446 if self.errors.is_empty() {
447 None
448 } else {
449 Some(
450 self.errors
451 .iter()
452 .map(|e| e.to_string())
453 .collect::<Vec<_>>()
454 .join("; "),
455 )
456 }
457 }
458
459 fn open(self) -> (MessageMetadata, wirevalue::Any) {
460 let Self {
461 sender,
462 dest,
463 data,
464 errors,
465 headers,
466 ttl,
467 return_undeliverable,
468 } = self;
469
470 (
471 MessageMetadata {
472 sender,
473 dest,
474 errors,
475 headers,
476 ttl,
477 return_undeliverable,
478 },
479 data,
480 )
481 }
482
483 fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
484 let MessageMetadata {
485 sender,
486 dest,
487 errors,
488 headers,
489 ttl,
490 return_undeliverable,
491 } = metadata;
492
493 Self {
494 sender,
495 dest,
496 data,
497 errors,
498 headers,
499 ttl,
500 return_undeliverable,
501 }
502 }
503
504 fn return_undeliverable(&self) -> bool {
505 self.return_undeliverable
506 }
507
508 pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
510 self.headers.set(key, value);
511 }
512}
513
514impl fmt::Display for MessageEnvelope {
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 match &self.error_msg() {
517 None => write!(
518 f,
519 "{} > {}: {} {{{}}}",
520 self.sender, self.dest, self.data, self.headers
521 ),
522 Some(err) => write!(
523 f,
524 "{} > {}: {} {{{}}}: delivery error: {}",
525 self.sender, self.dest, self.data, self.headers, err
526 ),
527 }
528 }
529}
530
/// Everything in a [`MessageEnvelope`] except its payload. Produced by
/// `MessageEnvelope::open` and consumed by `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    // Field meanings mirror the same-named fields of `MessageEnvelope`.
    sender: ActorAddr,
    dest: PortAddr,
    errors: Vec<DeliveryError>,
    headers: Flattrs,
    ttl: u8,
    return_undeliverable: bool,
}
541
/// A mailbox error: a [`MailboxErrorKind`] tagged with the address of the
/// actor it is associated with.
#[derive(Debug)]
pub struct MailboxError {
    /// Address of the associated actor (exposed through `actor_addr`).
    actor_id: ActorAddr,
    /// What went wrong.
    kind: MailboxErrorKind,
}
549
/// The kinds of failure a [`MailboxError`] can carry.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum MailboxErrorKind {
    /// The mailbox is closed.
    #[error("mailbox closed")]
    Closed,

    /// The given port is invalid.
    #[error("invalid port: {0}")]
    InvalidPort(PortAddr),

    /// No sender is associated with the given port.
    #[error("no sender for port: {0}")]
    NoSenderForPort(PortAddr),

    /// No local sender is associated with the given port.
    #[error("no local sender for port: {0}")]
    NoLocalSenderForPort(PortAddr),

    /// The given port is closed.
    #[error("{0}: port closed")]
    PortClosed(PortAddr),

    /// Sending to the given port failed; the cause is exposed as the source.
    #[error("send {0}: {1}")]
    Send(PortAddr, #[source] anyhow::Error),

    /// Receiving from the given port failed; the cause is exposed as the
    /// source.
    #[error("recv {0}: {1}")]
    Recv(PortAddr, #[source] anyhow::Error),

    /// Serialization failed; the cause is exposed as the source.
    #[error("serialize: {0}")]
    Serialize(#[source] anyhow::Error),

    /// Deserialization failed for the named type. The cause is included in
    /// the message but is not marked as the error source.
    #[error("deserialize {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// An underlying channel error, reported transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// The owner terminated with the given status.
    #[error("owner terminated: {0}")]
    OwnerTerminated(ActorStatus),
}
600
601impl MailboxError {
602 pub fn new(actor_id: impl Into<ActorAddr>, kind: MailboxErrorKind) -> Self {
605 Self {
606 actor_id: actor_id.into(),
607 kind,
608 }
609 }
610
611 pub fn actor_addr(&self) -> &ActorAddr {
613 &self.actor_id
614 }
615
616 pub fn kind(&self) -> &MailboxErrorKind {
618 &self.kind
619 }
620}
621
622impl fmt::Display for MailboxError {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 write!(f, "{}: ", self.actor_id)?;
625 fmt::Display::fmt(&self.kind, f)
626 }
627}
628
629impl std::error::Error for MailboxError {
630 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
631 self.kind.source()
632 }
633}
634
/// Identifies a port either by its bound address or, when unbound, by its
/// owning actor plus a message type name.
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// A port with a concrete address.
    Bound(PortAddr),
    /// An unbound port: the owning actor's address and a type name (the
    /// constructors pass `std::any::type_name` of the message type).
    Unbound(ActorAddr, &'static str),
}
645
646impl PortLocation {
647 fn new_unbound<M: Message>(actor_id: ActorAddr) -> Self {
648 PortLocation::Unbound(actor_id, std::any::type_name::<M>())
649 }
650
651 #[allow(dead_code)]
652 fn new_unbound_type(actor_id: ActorAddr, ty: &'static str) -> Self {
653 PortLocation::Unbound(actor_id, ty)
654 }
655
656 pub fn actor_addr(&self) -> ActorAddr {
658 match self {
659 PortLocation::Bound(port_addr) => port_addr.actor_addr(),
660 PortLocation::Unbound(actor_addr, _) => actor_addr.clone(),
661 }
662 }
663}
664
665impl fmt::Display for PortLocation {
666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 match self {
668 PortLocation::Bound(port_ref) => write!(f, "{}", port_ref),
669 PortLocation::Unbound(actor_ref, name) => write!(f, "{}<{}>", actor_ref, name),
670 }
671 }
672}
673
/// An error raised while sending through a mailbox sender: the location of
/// the port involved plus the kind of failure. Both fields are boxed.
#[derive(Debug)]
pub struct MailboxSenderError {
    /// Where the failure occurred.
    location: Box<PortLocation>,
    /// What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
681
/// The kinds of failure a [`MailboxSenderError`] can carry.
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// The message could not be serialized.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// The message could not be deserialized as the named type.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// The port is invalid.
    #[error("invalid port")]
    Invalid,

    /// The port is closed.
    #[error("port closed")]
    Closed,

    /// An underlying mailbox error, reported transparently.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying channel error, reported transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// Any other send failure.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination was unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
718
719impl MailboxSenderError {
720 pub fn new_unbound<M>(actor_id: impl Into<ActorAddr>, kind: MailboxSenderErrorKind) -> Self {
722 Self {
723 location: Box::new(PortLocation::Unbound(
724 actor_id.into(),
725 std::any::type_name::<M>(),
726 )),
727 kind: Box::new(kind),
728 }
729 }
730
731 pub fn new_unbound_type(
733 actor_id: impl Into<ActorAddr>,
734 kind: MailboxSenderErrorKind,
735 ty: &'static str,
736 ) -> Self {
737 Self {
738 location: Box::new(PortLocation::Unbound(actor_id.into(), ty)),
739 kind: Box::new(kind),
740 }
741 }
742
743 pub fn new_bound(port_id: impl Into<PortAddr>, kind: MailboxSenderErrorKind) -> Self {
745 Self {
746 location: Box::new(PortLocation::Bound(port_id.into())),
747 kind: Box::new(kind),
748 }
749 }
750
751 pub fn location(&self) -> &PortLocation {
753 &self.location
754 }
755
756 pub fn kind(&self) -> &MailboxSenderErrorKind {
758 &self.kind
759 }
760}
761
762impl fmt::Display for MailboxSenderError {
763 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764 write!(f, "{}: ", self.location)?;
765 fmt::Display::fmt(&self.kind, f)
766 }
767}
768
769impl std::error::Error for MailboxSenderError {
770 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
771 self.kind.source()
772 }
773}
774
775#[async_trait]
778pub trait MailboxSender: Send + Sync + Any {
779 fn post(
782 &self,
783 mut envelope: MessageEnvelope,
784 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
785 ) {
786 if let Err(err) = envelope.dec_ttl_or_err() {
787 envelope.undeliverable(err, return_handle);
788 return;
789 }
790 self.post_unchecked(envelope, return_handle);
791 }
792
793 fn post_unchecked(
795 &self,
796 envelope: MessageEnvelope,
797 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
798 );
799
800 async fn flush(&self) -> Result<(), anyhow::Error> {
805 Ok(())
806 }
807}
808
809pub trait PortSender: MailboxSender {
812 fn serialize_and_send<M: RemoteMessage>(
814 &self,
815 port: &PortRef<M>,
816 message: M,
817 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
818 ) -> Result<(), MailboxSenderError> {
819 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
821 MailboxSenderError::new_bound(
822 port.port_addr().clone(),
823 MailboxSenderErrorKind::Serialize(err.into()),
824 )
825 })?;
826 self.post(
827 MessageEnvelope::new_unknown(port.port_addr().clone(), serialized),
828 return_handle,
829 );
830 Ok(())
831 }
832
833 fn serialize_and_send_once<M: RemoteMessage>(
836 &self,
837 once_port: OncePortRef<M>,
838 message: M,
839 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
840 ) -> Result<(), MailboxSenderError> {
841 let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
842 MailboxSenderError::new_bound(
843 once_port.port_addr().clone(),
844 MailboxSenderErrorKind::Serialize(err.into()),
845 )
846 })?;
847 self.post(
848 MessageEnvelope::new_unknown(once_port.port_addr().clone(), serialized),
849 return_handle,
850 );
851 Ok(())
852 }
853}
854
855impl<T: ?Sized + MailboxSender> PortSender for T {}
856
/// A mailbox sender that panics whenever a message is posted to it.
#[derive(Debug, Clone)]
pub struct PanickingMailboxSender;

#[async_trait]
impl MailboxSender for PanickingMailboxSender {
    /// Always panics, including the rendered envelope in the panic message.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        panic!("panic! in the mailbox! attempted post: {}", envelope)
    }
}
873
874#[derive(Debug)]
877pub struct UndeliverableMailboxSender;
878
879#[async_trait]
880impl MailboxSender for UndeliverableMailboxSender {
881 fn post_unchecked(
882 &self,
883 envelope: MessageEnvelope,
884 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
885 ) {
886 let sender_name = envelope
887 .sender
888 .label()
889 .map_or("?".to_string(), |l| l.to_string());
890 let error_str = envelope.error_msg().unwrap_or("".to_string());
891 let operation_endpoint = envelope.headers().get(headers::OPERATION_ENDPOINT);
892 let operation_adverb = envelope.headers().get(headers::OPERATION_ADVERB);
893 match &operation_endpoint {
895 Some(endpoint) => tracing::error!(
896 name = "undelivered_message_abandoned",
897 actor_name = sender_name,
898 actor_id = envelope.sender.to_string(),
899 dest = envelope.dest.to_string(),
900 message_type = envelope.data().typename().unwrap_or("unknown"),
901 data_len = envelope.data().len(),
902 endpoint = %endpoint,
903 adverb = operation_adverb.as_deref().unwrap_or(""),
904 error = %error_str,
905 "abandoned message for {}",
906 endpoint,
907 ),
908 None => tracing::error!(
909 name = "undelivered_message_abandoned",
910 actor_name = sender_name,
911 actor_id = envelope.sender.to_string(),
912 dest = envelope.dest.to_string(),
913 message_type = envelope.data().typename().unwrap_or("unknown"),
914 data_len = envelope.data().len(),
915 error = %error_str,
916 "message not delivered to {}",
917 envelope.dest,
918 ),
919 }
920 }
921}
922
/// A type-erased, reference-counted [`MailboxSender`]. Cloning shares the
/// same underlying sender.
#[derive(Clone)]
pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
930
931impl fmt::Debug for BoxedMailboxSender {
932 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
933 f.debug_struct("BoxedMailboxSender")
934 .field("sender", &"<dyn MailboxSender>")
935 .finish()
936 }
937}
938
impl BoxedMailboxSender {
    /// Wraps `sender` in a new boxed sender.
    pub fn new(sender: impl MailboxSender + 'static) -> Self {
        Self(Arc::new(sender))
    }

    /// Returns a reference to the wrapped sender if its concrete type is `T`,
    /// and `None` otherwise.
    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
        // View the trait object as `dyn Any` so `downcast_ref` can be used.
        (&*self.0 as &dyn Any).downcast_ref::<T>()
    }
}
951
/// Cloneable mailbox senders that can produce a [`BoxedMailboxSender`]
/// without being consumed.
pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
    /// Returns a boxed sender for this sender.
    fn boxed(&self) -> BoxedMailboxSender;
}
impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
    /// Boxes a clone of `self`.
    fn boxed(&self) -> BoxedMailboxSender {
        BoxedMailboxSender::new(self.clone())
    }
}
962
/// Mailbox senders that can be converted into a [`BoxedMailboxSender`] by
/// value.
pub trait IntoBoxedMailboxSender: MailboxSender {
    /// Consumes `self` and returns it boxed.
    fn into_boxed(self) -> BoxedMailboxSender;
}
impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
    /// Boxes `self` without cloning.
    fn into_boxed(self) -> BoxedMailboxSender {
        BoxedMailboxSender::new(self)
    }
}
973
/// A boxed sender forwards everything to the sender it wraps.
#[async_trait]
impl MailboxSender for BoxedMailboxSender {
    /// Forwards to the wrapped sender's `post_unchecked`.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        self.0.post_unchecked(envelope, return_handle);
    }

    /// Forwards to the wrapped sender's `flush`.
    async fn flush(&self) -> Result<(), anyhow::Error> {
        self.0.flush().await
    }
}
988
/// Errors a mailbox server task can terminate with.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error, reported transparently.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error, reported transparently.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
1000
/// Handle to a running mailbox server task. It can signal the task to stop
/// and, as a [`Future`], resolves to the task's join result.
#[derive(Debug)]
pub struct MailboxServerHandle {
    /// Join handle of the serving task.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    /// Watch channel whose value is set to `true` to request a stop.
    stopped_tx: watch::Sender<bool>,
}
1008
1009impl MailboxServerHandle {
1010 pub fn stop(&self, reason: &str) {
1015 tracing::info!("stopping mailbox server; reason: {}", reason);
1016 self.stopped_tx.send(true).expect("stop called twice");
1017 }
1018
1019 pub fn from_parts(
1024 join_handle: JoinHandle<Result<(), MailboxServerError>>,
1025 stopped_tx: watch::Sender<bool>,
1026 ) -> Self {
1027 Self {
1028 join_handle,
1029 stopped_tx,
1030 }
1031 }
1032}
1033
1034impl Future for MailboxServerHandle {
1036 type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
1037
1038 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1039 let join_handle_pinned =
1041 unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
1042 join_handle_pinned.poll(cx)
1043 }
1044}
1045
/// Serving support for mailbox senders: receive envelopes from a channel and
/// post them to `self`.
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
    /// Starts serving envelopes received on `rx` and returns a handle that
    /// can stop the server and be awaited for its result.
    ///
    /// Two tasks are spawned: one that handles envelopes reported as
    /// undeliverable, and one that forwards every received envelope to
    /// `self.post`.
    fn serve(
        self,
        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
    ) -> MailboxServerHandle {
        // Port on which envelopes posted by this server are reported back
        // when they cannot be delivered.
        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
        let server = self.clone();
        tokio::task::spawn(async move {
            // Each server gets a distinct local proc name from a
            // process-wide counter.
            static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
            let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
            let addr = ChannelAddr::any(ChannelTransport::Local);
            let proc_id = ProcAddr::instance(addr, format!("mailbox_server_{}", rank));
            // The proc's gateway is configured with this server as its
            // sender.
            let proc = Proc::builder()
                .proc_id(proc_id.id().clone())
                .shared_gateway(Gateway::configured(
                    proc_id.location().clone(),
                    BoxedMailboxSender::new(server),
                ))
                .build()
                .expect("mailbox server proc builder is valid");
            let (client, _) = proc.client("undeliverable_supervisor").unwrap();
            while let Ok(undeliverable) = undeliverable_rx.recv().await {
                match undeliverable {
                    Undeliverable::Message(mut envelope) => {
                        // If the payload is itself an undeliverable report,
                        // do not bounce it again: log it and move on.
                        match envelope.deserialized::<Undeliverable<MessageEnvelope>>() {
                            Ok(Undeliverable::Message(e)) => {
                                UndeliverableMailboxSender.post(e, monitored_return_handle());
                                continue;
                            }
                            Ok(Undeliverable::Lost(lost)) => {
                                tracing::error!(
                                    sender = %lost.sender,
                                    dest = %lost.dest,
                                    message_type = lost.message_type.as_deref().unwrap_or("unknown"),
                                    error = %lost.error,
                                    "lost message was undeliverable"
                                );
                                continue;
                            }
                            // Not an undeliverable report: fall through and
                            // return the envelope to its sender.
                            Err(_) => {}
                        }
                        envelope.set_error(DeliveryError::BrokenLink(
                            "message was undeliverable".to_owned(),
                        ));
                        // Send the envelope, wrapped as
                        // `Undeliverable::Message`, to the sender's handler
                        // port for undeliverable messages.
                        let sender_id: ActorAddr = envelope.sender().clone();
                        let return_port =
                            PortRef::<Undeliverable<MessageEnvelope>>::attest_handler_port(
                                &sender_id,
                            );
                        return_port.post_serialized(
                            &client,
                            Flattrs::new(),
                            wirevalue::Any::serialize(&Undeliverable::Message(envelope)).unwrap(),
                        );
                    }
                    Undeliverable::Lost(lost) => {
                        tracing::error!(
                            sender = %lost.sender,
                            dest = %lost.dest,
                            message_type = lost.message_type.as_deref().unwrap_or("unknown"),
                            error = %lost.error,
                            "lost message was undeliverable"
                        );
                    }
                }
            }
        });

        let (stopped_tx, mut stopped_rx) = watch::channel(false);
        let join_handle = tokio::spawn(async move {
            // Becomes true once the stop sender is dropped; the stop branch
            // below is then disabled and serving continues until `rx` ends.
            let mut detached = false;

            let result = loop {
                // A stop request ends the loop successfully.
                if *stopped_rx.borrow_and_update() {
                    break Ok(());
                }

                tokio::select! {
                    message = rx.recv() => {
                        match message {
                            Ok(envelope) => self.post(envelope, return_handle.clone()),

                            // A closed channel is a normal termination.
                            Err(ChannelError::Closed) => break Ok(()),
                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
                        }
                    }
                    result = stopped_rx.changed(), if !detached => {
                        // `changed` errs when the sender has been dropped.
                        detached = result.is_err();
                        if detached {
                            tracing::debug!(
                                "the mailbox server is detached for Rx {}", rx.addr()
                            );
                        } else {
                            tracing::debug!(
                                "the mailbox server is stopped for Rx {}", rx.addr()
                            );
                        }
                    }
                }
            };

            rx.join().await;

            result
        });

        MailboxServerHandle {
            join_handle,
            stopped_tx,
        }
    }
}

/// Every cloneable, thread-safe mailbox sender can be served.
impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1180
/// An unbounded queue of `(message, return handle)` pairs that are processed
/// one at a time by a background task (see `Buffer::new`).
struct Buffer<T: Message> {
    /// Sending half of the queue drained by the background task.
    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
    /// Receives the count of items the background task has processed.
    #[allow(dead_code)]
    processed: watch::Receiver<usize>,
    /// Incremented on every call to `send`.
    seq: AtomicUsize,
}
1187
1188impl<T: Message> Buffer<T> {
1189 fn new<Fut>(
1190 process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
1191 ) -> Self
1192 where
1193 Fut: Future<Output = ()> + Send + 'static,
1194 {
1195 let (queue, mut next) = mpsc::unbounded_channel();
1196 let (last_processed, processed) = watch::channel(0);
1197 crate::init::get_runtime().spawn(async move {
1198 let mut seq = 0;
1199 while let Some((msg, return_handle)) = next.recv().await {
1200 process(msg, return_handle).await;
1201 seq += 1;
1202 let _ = last_processed.send(seq);
1203 }
1204 });
1205 Self {
1206 queue,
1207 processed,
1208 seq: AtomicUsize::new(0),
1209 }
1210 }
1211
1212 fn send(
1213 &self,
1214 item: (T, PortHandle<Undeliverable<T>>),
1215 ) -> Result<(), Box<mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>>> {
1216 self.seq.fetch_add(1, Ordering::SeqCst);
1217 self.queue.send(item).map_err(Box::new)?;
1218 Ok(())
1219 }
1220}
1221
/// A [`MailboxSender`] that queues envelopes and posts them to a channel
/// transmitter in the background.
pub struct MailboxClient {
    /// Queue of envelopes awaiting posting to the channel.
    buffer: Buffer<MessageEnvelope>,

    /// Cancellation token shared with the task that monitors the
    /// transmitter's status.
    _tx_monitoring: CancellationToken,

    /// Number of envelopes successfully enqueued by `post_unchecked`.
    submitted: Arc<AtomicUsize>,
    /// Number of envelopes whose per-envelope completion task has finished.
    completed: Arc<AtomicUsize>,
    /// Notified each time `completed` is incremented; awaited by `flush`.
    completed_notify: Arc<tokio::sync::Notify>,
}
1238
1239impl fmt::Debug for MailboxClient {
1240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1241 f.debug_struct("MailboxClient")
1242 .field("buffer", &"<Buffer>")
1243 .finish()
1244 }
1245}
1246
impl MailboxClient {
    /// Creates a client that posts envelopes to `tx`.
    ///
    /// Also starts a background task that watches the transmitter's status
    /// (see `monitor_tx_health`).
    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
        let addr = tx.addr();
        let tx = Arc::new(tx);
        let tx_status = tx.status().clone();
        let tx_monitoring = CancellationToken::new();
        let completed = Arc::new(AtomicUsize::new(0));
        let completed_notify = Arc::new(tokio::sync::Notify::new());
        let buffer = {
            let completed = completed.clone();
            let completed_notify = completed_notify.clone();
            Buffer::new(move |envelope, return_handle| {
                let tx = Arc::clone(&tx);
                // The transmitter reports a failed post by sending the
                // failure on this one-shot channel.
                let (return_channel, return_receiver) =
                    oneshot::channel::<SendError<MessageEnvelope>>();
                let return_handle_0 = return_handle.clone();
                let completed = completed.clone();
                let completed_notify = completed_notify.clone();
                // Per-envelope completion task: waits for the outcome, then
                // bumps `completed` and wakes any `flush` waiters.
                tokio::spawn(async move {
                    match return_receiver.await {
                        Ok(SendError {
                            error,
                            message,
                            reason,
                        }) => {
                            // The post failed: return the envelope as
                            // undeliverable.
                            message.undeliverable(
                                DeliveryError::BrokenLink(format!(
                                    "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
                                )),
                                return_handle_0,
                            );
                        }
                        Err(_) => {
                            // The return channel was dropped without
                            // reporting a failure; nothing to return.
                        }
                    }
                    completed.fetch_add(1, Ordering::SeqCst);
                    completed_notify.notify_waiters();
                });
                tx.try_post(envelope, return_channel);
                // Nothing to await: the outcome is handled by the task above.
                future::ready(())
            })
        };
        let this = Self {
            buffer,
            _tx_monitoring: tx_monitoring.clone(),
            submitted: Arc::new(AtomicUsize::new(0)),
            completed,
            completed_notify,
        };
        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
        this
    }

    /// Dials `addr` and wraps the resulting transmitter in a client.
    ///
    /// # Errors
    ///
    /// Returns the channel error if dialing fails.
    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
        Ok(MailboxClient::new(channel::dial(addr)?))
    }

    /// Spawns a task on the crate runtime that watches the transmitter
    /// status `rx`. It logs a warning and exits when the status becomes
    /// closed or the status channel ends, and exits silently when
    /// `cancel_token` is cancelled.
    fn monitor_tx_health(
        mut rx: watch::Receiver<TxStatus>,
        cancel_token: CancellationToken,
        addr: ChannelAddr,
    ) {
        crate::init::get_runtime().spawn(async move {
            loop {
                tokio::select! {
                    changed = rx.changed() => {
                        if changed.is_err() || rx.borrow().is_closed() {
                            let reason = rx.borrow().as_closed().map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string());
                            tracing::warn!("connection to {} lost: {}", addr, reason);
                            break;
                        }
                    }
                    _ = cancel_token.cancelled() => {
                        break;
                    }
                }
            }
        });
    }
}
1337
1338#[async_trait]
1339impl MailboxSender for MailboxClient {
1340 #[tracing::instrument(level = "debug", skip_all)]
1341 fn post_unchecked(
1342 &self,
1343 envelope: MessageEnvelope,
1344 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1345 ) {
1346 tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_addr(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1347 if let Err(err) = self.buffer.send((envelope, return_handle)) {
1348 let mpsc::error::SendError((envelope, return_handle)) = *err;
1349 let err = DeliveryError::BrokenLink(
1350 "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1351 );
1352
1353 envelope.undeliverable(err, return_handle);
1355 } else {
1356 self.submitted.fetch_add(1, Ordering::SeqCst);
1357 }
1358 }
1359
1360 async fn flush(&self) -> Result<(), anyhow::Error> {
1361 let target = self.submitted.load(Ordering::SeqCst);
1362 loop {
1363 if self.completed.load(Ordering::SeqCst) >= target {
1364 return Ok(());
1365 }
1366 self.completed_notify.notified().await;
1367 }
1368 }
1369}
1370
/// Adapts a [`PortRef`] into a [`Sink`]: items sent into the sink are posted
/// to the port using the stored actor context.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    /// Context used when posting.
    cx: C,
    /// Destination port.
    port: PortRef<M>,
}

impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
    /// Creates a sink that posts to `port` using `cx`.
    pub fn new(cx: C, port: PortRef<M>) -> Self {
        Self { cx, port }
    }
}
1383
impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
    type Error = MailboxSenderError;

    /// Always ready.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Posts `item` to the port through [`Endpoint::post`] and reports
    /// success unconditionally.
    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
        crate::Endpoint::post(&self.port, &self.cx, item);
        Ok(())
    }

    /// Nothing is buffered by the sink itself, so flushing completes
    /// immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Closing completes immediately and does nothing.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
1404
/// An actor's mailbox: the owner of its ports. Cloning is cheap; all clones
/// share the same underlying state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    /// Shared mailbox state.
    inner: Arc<State>,
}
1411
1412impl Mailbox {
1413 pub fn new(actor_id: impl Into<ActorAddr>) -> Self {
1415 Self {
1416 inner: Arc::new(State::new(actor_id.into())),
1417 }
1418 }
1419
    /// Address of the actor that owns this mailbox.
    pub fn actor_addr(&self) -> &ActorAddr {
        &self.inner.actor_id
    }
1424
1425 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1430 let port_index = self.inner.allocate_port();
1431 let (sender, receiver) = mpsc::unbounded_channel::<M>();
1432 let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1433 tracing::trace!(
1434 name = "open_port",
1435 "opening port for {} at {}",
1436 self.inner.actor_id,
1437 port_id
1438 );
1439 (
1440 PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1441 PortReceiver::new(receiver, port_id, false, self.clone()),
1442 )
1443 }
1444
1445 pub(crate) fn bind_handler_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1452 let (handle, receiver) = self.open_port();
1453 handle.bind_handler_port();
1454 (handle, receiver)
1455 }
1456
    /// Opens an accumulator port with default streaming reducer options; see
    /// [`Mailbox::open_accum_port_opts`].
    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
    where
        A: Accumulator + Send + Sync + 'static,
        A::Update: Message,
        A::State: Message + Default + Clone,
    {
        self.open_accum_port_opts(accum, StreamingReducerOpts::default())
    }
1467
1468 pub fn open_accum_port_opts<A>(
1476 &self,
1477 accum: A,
1478 streaming_opts: StreamingReducerOpts,
1479 ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1480 where
1481 A: Accumulator + Send + Sync + 'static,
1482 A::Update: Message,
1483 A::State: Message + Default + Clone,
1484 {
1485 let port_index = self.inner.allocate_port();
1486 let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1487 let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1488 let state = Mutex::new(A::State::default());
1489 let reducer_spec = accum.reducer_spec();
1490 let enqueue = move |_, update: A::Update| {
1491 let mut state = state.lock().unwrap();
1492 accum.accumulate(&mut state, update)?;
1493 let _ = sender.send(state.clone());
1494 Ok(())
1495 };
1496 (
1497 PortHandle::new_full(
1498 self.clone(),
1499 port_index,
1500 UnboundedPortSender::Func(Arc::new(enqueue)),
1501 reducer_spec,
1502 streaming_opts,
1503 ),
1504 PortReceiver::new(receiver, port_id, true, self.clone()),
1505 )
1506 }
1507
1508 pub(crate) fn open_enqueue_port<M: Message>(
1512 &self,
1513 enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1514 ) -> PortHandle<M> {
1515 PortHandle::new_full(
1516 self.clone(),
1517 self.inner.allocate_port(),
1518 UnboundedPortSender::Func(Arc::new(enqueue)),
1519 None,
1520 StreamingReducerOpts::default(),
1521 )
1522 }
1523
1524 pub(crate) fn open_handler_enqueue_port<M: Message>(
1527 &self,
1528 enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1529 ) -> PortHandle<M> {
1530 let port_index = self.inner.allocate_port();
1531 let enqueue = Arc::new(enqueue);
1532 let sender = Arc::new(HandlerPortSender::new(
1533 UnboundedPortSender::Func(enqueue),
1534 self.inner.handler_ingress.clone(),
1535 ));
1536 PortHandle::new_full(
1537 self.clone(),
1538 port_index,
1539 UnboundedPortSender::Handler(sender),
1540 None,
1541 StreamingReducerOpts::default(),
1542 )
1543 }
1544
1545 pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1549 let port_index = self.inner.allocate_port();
1550 let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1551 let (sender, receiver) = oneshot::channel::<M>();
1552 (
1553 OncePortHandle {
1554 mailbox: self.clone(),
1555 port_index,
1556 port_id: port_id.clone(),
1557 sender,
1558 reducer_spec: None,
1559 },
1560 OncePortReceiver {
1561 receiver: Some(receiver),
1562 port_id,
1563 mailbox: self.clone(),
1564 },
1565 )
1566 }
1567
1568 pub fn open_reduce_port<A, T>(
1582 &self,
1583 accum: A,
1584 ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
1585 where
1586 A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
1587 T: Message + Default + Clone,
1588 {
1589 let port_index = self.inner.allocate_port();
1590 let (sender, receiver) = oneshot::channel::<T>();
1591 let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1592 let reducer_spec = accum.reducer_spec();
1593 assert!(
1594 reducer_spec.is_some(),
1595 "cannot use a reduce port without a ReducerSpec"
1596 );
1597
1598 (
1599 OncePortHandle {
1600 mailbox: self.clone(),
1601 port_index,
1602 port_id: port_id.clone(),
1603 sender,
1604 reducer_spec,
1605 },
1606 OncePortReceiver {
1607 receiver: Some(receiver),
1608 port_id,
1609 mailbox: self.clone(),
1610 },
1611 )
1612 }
1613
1614 #[allow(dead_code)]
1615 fn error(&self, err: MailboxErrorKind) -> MailboxError {
1616 MailboxError::new(self.inner.actor_id.clone(), err)
1617 }
1618
1619 fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1620 let port_index = M::port();
1621 self.inner.ports.get(&port_index).and_then(|boxed| {
1622 boxed
1623 .as_any()
1624 .downcast_ref::<UnboundedSender<M>>()
1625 .map(|s| {
1626 assert_eq!(
1627 s.port_id,
1628 self.actor_addr().port_addr(Port::from(port_index)),
1629 "port_id mismatch in downcasted UnboundedSender"
1630 );
1631 s.sender.clone()
1632 })
1633 })
1634 }
1635
1636 pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1638 self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1639 .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1640 }
1641
1642 pub(crate) fn allocate_port(&self) -> u64 {
1643 self.inner.allocate_port()
1644 }
1645
1646 fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1647 assert_eq!(
1648 handle.inner.mailbox.actor_addr(),
1649 self.actor_addr(),
1650 "port does not belong to mailbox"
1651 );
1652
1653 let port_ref = self
1656 .actor_addr()
1657 .port_addr(Port::from(handle.inner.port_index));
1658 match self.inner.ports.entry(handle.inner.port_index) {
1659 Entry::Vacant(entry) => {
1660 entry.insert(Arc::new(UnboundedSender::new(
1661 handle.inner.sender.clone(),
1662 port_ref.clone(),
1663 )));
1664 }
1665 Entry::Occupied(_entry) => {}
1666 }
1667
1668 PortRef::attest(port_ref)
1669 }
1670
1671 fn bind_to_handler_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1672 assert_eq!(
1673 handle.inner.mailbox.actor_addr(),
1674 self.actor_addr(),
1675 "port does not belong to mailbox"
1676 );
1677
1678 let port_index = M::port();
1679 let port_ref = self.actor_addr().port_addr(Port::from(port_index));
1680 match self.inner.ports.entry(port_index) {
1681 Entry::Vacant(entry) => {
1682 entry.insert(Arc::new(UnboundedSender::new(
1683 handle.inner.sender.clone(),
1684 port_ref,
1685 )));
1686 }
1687 Entry::Occupied(_entry) => panic!("port {} already bound", port_ref),
1688 }
1689 }
1690
1691 fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1692 let port_id = handle.port_addr().clone();
1693 match self.inner.ports.entry(handle.port_index) {
1694 Entry::Vacant(entry) => {
1695 entry.insert(Arc::new(OnceSender::new(handle.sender, port_id.clone())));
1696 }
1697 Entry::Occupied(_entry) => {}
1698 }
1699 }
1700
1701 pub(crate) fn bind_untyped(&self, port_id: &PortAddr, sender: UntypedUnboundedSender) {
1702 assert_eq!(
1703 port_id.actor_addr(),
1704 *self.actor_addr(),
1705 "port does not belong to mailbox"
1706 );
1707
1708 match self.inner.ports.entry(port_id.index()) {
1709 Entry::Vacant(entry) => {
1710 entry.insert(Arc::new(sender));
1711 }
1712 Entry::Occupied(_entry) => {}
1713 }
1714 }
1715
1716 pub(crate) fn close(&self, status: ActorStatus) {
1717 let mut closed = self.inner.closed.write().unwrap();
1718 if closed.is_some() {
1719 panic!("mailbox with owner {} already closed", self.actor_addr());
1720 }
1721 let _ = closed.insert(status);
1722 }
1723
1724 pub(crate) fn drain(&self) {
1742 self.inner.handler_ingress.drain();
1743 }
1744}
1745
1746impl context::Mailbox for Mailbox {
1747 fn mailbox(&self) -> &Mailbox {
1748 self
1749 }
1750}
1751
1752pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1757 cx.mailbox().open_port()
1758}
1759
1760pub fn open_once_port<M: Message>(
1763 cx: &impl context::Mailbox,
1764) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1765 cx.mailbox().open_once_port()
1766}
1767
1768#[async_trait]
1769impl MailboxSender for Mailbox {
1770 fn post_unchecked(
1773 &self,
1774 envelope: MessageEnvelope,
1775 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1776 ) {
1777 metrics::MAILBOX_POSTS.add(
1778 1,
1779 hyperactor_telemetry::kv_pairs!(
1780 "actor_id" => envelope.sender.to_string(),
1781 "dest_actor_id" => envelope.dest.actor_addr().to_string(),
1782 ),
1783 );
1784 tracing::trace!(
1785 name = "post",
1786 actor_name = envelope.sender.label().map_or("?", |l| l.as_str()),
1787 actor_id = envelope.sender.to_string(),
1788 "posting message to {}",
1789 envelope.dest
1790 );
1791
1792 if envelope.dest().actor_id() != self.inner.actor_id.id() {
1793 let err = DeliveryError::Mailbox(format!(
1794 "mailbox owner {} cannot deliver to {}",
1795 self.inner.actor_id,
1796 envelope.dest().actor_addr()
1797 ));
1798 return envelope.undeliverable(err, return_handle);
1799 }
1800
1801 let port_index = envelope.dest().index();
1802
1803 let port_sender = match self.inner.ports.get(&port_index) {
1813 None => {
1814 let err = unbound_port_delivery_error(port_index, envelope.data());
1815 return envelope.undeliverable(err, return_handle);
1816 }
1817 Some(ref_) => {
1818 let closed = self.inner.closed.read().unwrap();
1819 if let Some(status) = &*closed {
1820 match status {
1821 ActorStatus::Stopped(reason) => {
1822 let err = format!(
1823 "mailbox owner {} is stopped: {}",
1824 self.inner.actor_id, reason
1825 );
1826 return envelope
1827 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1828 }
1829 ActorStatus::Failed(actor_error) => {
1830 let err = format!(
1831 "mailbox owner {} failed: {}",
1832 self.inner.actor_id, actor_error
1833 );
1834 return envelope
1835 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1836 }
1837 _ => {
1838 let err = format!(
1839 "mailbox owner {} closed unexpectedly: {:?}",
1840 self.inner.actor_id, status
1841 );
1842 return envelope
1843 .undeliverable(DeliveryError::Mailbox(err), return_handle);
1844 }
1845 }
1846 }
1847 Arc::clone(&*ref_)
1850 }
1851 };
1852 let (metadata, data) = envelope.open();
1855 let MessageMetadata {
1856 mut headers,
1857 sender,
1858 dest,
1859 errors: metadata_errors,
1860 ttl,
1861 return_undeliverable,
1862 } = metadata;
1863
1864 let to_actor_id = hash_to_u64(&dest);
1865 let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1866 headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1867 if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1870 headers.set(
1871 crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1872 hash_to_u64(&sender),
1873 );
1874 }
1875 headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1876
1877 match port_sender.send_serialized(headers, data) {
1878 Ok(disposition) => {
1879 hyperactor_telemetry::notify_message_status(
1880 hyperactor_telemetry::MessageStatusEvent {
1881 timestamp: std::time::SystemTime::now(),
1882 id: hyperactor_telemetry::generate_status_event_id(message_id),
1883 message_id,
1884 status: "queued".to_string(),
1885 },
1886 );
1887
1888 if disposition == SerializedSendDisposition::DeliveredAndExhausted {
1889 self.inner.ports.remove(&port_index);
1890 }
1891 }
1892 Err(SerializedSendFailure::Dead { data, headers }) => {
1893 self.inner.ports.remove(&port_index);
1894 let err = unbound_port_delivery_error(port_index, &data);
1895
1896 MessageEnvelope::seal(
1897 MessageMetadata {
1898 headers,
1899 sender,
1900 dest,
1901 errors: metadata_errors,
1902 ttl,
1903 return_undeliverable,
1904 },
1905 data,
1906 )
1907 .undeliverable(err, return_handle)
1908 }
1909 Err(SerializedSendFailure::Error(SerializedSendError {
1910 data,
1911 error: sender_error,
1912 headers,
1913 })) => {
1914 let err = DeliveryError::Mailbox(format!("{}", sender_error));
1915
1916 MessageEnvelope::seal(
1917 MessageMetadata {
1918 headers,
1919 sender,
1920 dest,
1921 errors: metadata_errors,
1922 ttl,
1923 return_undeliverable,
1924 },
1925 data,
1926 )
1927 .undeliverable(err, return_handle)
1928 }
1929 }
1930 }
1931}
1932
1933fn unbound_port_delivery_error(port_index: u64, data: &wirevalue::Any) -> DeliveryError {
1934 DeliveryError::Unroutable(format!(
1935 "port not bound in mailbox; port id: {}; message type: {}",
1936 port_index,
1937 data.typename().map_or_else(
1938 || format!("unregistered type hash {}", data.typehash()),
1939 |name| name.to_string(),
1940 )
1941 ))
1942}
1943
/// Shared state behind a [`PortHandle`]; clones of a handle share one
/// `PortHandleInner` through an `Arc`.
struct PortHandleInner<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The port index allocated for this handle.
    port_index: u64,
    /// Where messages sent through the handle are delivered.
    sender: UnboundedPortSender<M>,
    /// `None` until the handle is bound; afterwards the address it was
    /// bound as.
    bound: Arc<RwLock<Option<PortAddr>>>,
    /// Optional reducer spec, passed along when the handle is bound into a
    /// `PortRef`.
    reducer_spec: Option<ReducerSpec>,
    /// Streaming reducer options, passed along when the handle is bound
    /// into a `PortRef`.
    streaming_opts: StreamingReducerOpts,
}
1963
1964impl<M: Message> fmt::Debug for PortHandleInner<M> {
1965 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1966 f.debug_struct("PortHandleInner")
1967 .field("mailbox", &self.mailbox)
1968 .field("port_index", &self.port_index)
1969 .field("sender", &self.sender)
1970 .field("bound", &self.bound)
1971 .field("reducer_spec", &self.reducer_spec)
1972 .field("streaming_opts", &self.streaming_opts)
1973 .finish()
1974 }
1975}
1976
/// A cheaply cloneable, in-process handle for sending messages of type `M`
/// to a port of a [`Mailbox`].
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// Shared state; cloning the handle clones this `Arc`.
    inner: Arc<PortHandleInner<M>>,
}
1988
1989impl<M: Message> PortHandle<M> {
1990 fn new_full(
1991 mailbox: Mailbox,
1992 port_index: u64,
1993 sender: UnboundedPortSender<M>,
1994 reducer_spec: Option<ReducerSpec>,
1995 streaming_opts: StreamingReducerOpts,
1996 ) -> Self {
1997 Self {
1998 inner: Arc::new(PortHandleInner {
1999 mailbox,
2000 port_index,
2001 sender,
2002 bound: Arc::new(RwLock::new(None)),
2003 reducer_spec,
2004 streaming_opts,
2005 }),
2006 }
2007 }
2008
2009 fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
2010 Self::new_full(
2011 mailbox,
2012 port_index,
2013 sender,
2014 None,
2015 StreamingReducerOpts::default(),
2016 )
2017 }
2018
2019 pub(crate) fn location(&self) -> PortLocation {
2020 match self.inner.bound.read().unwrap().as_ref() {
2021 Some(port_id) => PortLocation::Bound(port_id.clone()),
2022 None => PortLocation::new_unbound::<M>(self.inner.mailbox.actor_addr().clone()),
2023 }
2024 }
2025
2026 pub(crate) fn try_send<C>(&self, cx: &C, message: M) -> Result<(), MailboxSenderError>
2027 where
2028 C: context::Actor,
2029 {
2030 let closed = self.inner.mailbox.inner.closed.read().unwrap();
2031
2032 if let Some(status) = &*closed {
2033 let err = MailboxError {
2034 actor_id: self.inner.mailbox.actor_addr().clone(),
2035 kind: MailboxErrorKind::OwnerTerminated(status.clone()),
2036 };
2037 return Err(MailboxSenderError::new_unbound::<M>(
2038 self.inner.mailbox.actor_addr().clone(),
2039 MailboxSenderErrorKind::Mailbox(err),
2040 ));
2041 }
2042 let mut headers = Flattrs::new();
2043
2044 crate::mailbox::headers::set_send_timestamp(&mut headers);
2045 crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
2046 let bound_guard = self.inner.bound.read().unwrap();
2050 if let Some(bound_port) = bound_guard.as_ref() {
2051 let sequencer = cx.instance().sequencer();
2052 let bound_ref: PortAddr = bound_port.clone();
2053 let seq_info = sequencer.assign_seq(&bound_ref);
2054 headers.set(SEQ_INFO, seq_info);
2055 } else {
2056 headers.set(SEQ_INFO, SeqInfo::Direct);
2061 }
2062 self.inner.sender.send(headers, message).map_err(|err| {
2074 MailboxSenderError::new_unbound::<M>(
2075 self.inner.mailbox.actor_addr().clone(),
2076 classify_sender_error(err),
2077 )
2078 })
2079 }
2080}
2081
2082impl<M> Endpoint<M> for &PortHandle<M>
2083where
2084 M: Message,
2085{
2086 fn endpoint_location(&self) -> EndpointLocation {
2087 self.location().into()
2088 }
2089
2090 fn post<C>(self, cx: &C, message: M)
2091 where
2092 C: context::Actor,
2093 {
2094 if let Err(err) = self.try_send(cx, message) {
2095 cx.instance()
2096 .report_lost_message(LostMessage::from_send_error::<M>(
2097 cx.mailbox().actor_addr().clone(),
2098 self.endpoint_location(),
2099 &err,
2100 ));
2101 }
2102 }
2103}
2104
2105impl<M: Message> PortHandle<M> {
2106 pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
2109 where
2110 R: Message,
2111 F: Fn(R) -> M + Send + Sync + 'static,
2112 {
2113 let port_index = self.inner.mailbox.inner.allocate_port();
2114 let sender = self.inner.sender.clone();
2115 PortHandle::new(
2116 self.inner.mailbox.clone(),
2117 port_index,
2118 UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
2119 sender.send(headers, unmap(value))
2120 })),
2121 )
2122 }
2123}
2124
2125impl<M: RemoteMessage> PortHandle<M> {
2126 pub fn bind(&self) -> PortRef<M> {
2128 let port_ref = {
2129 let mut guard = self.inner.bound.write().unwrap();
2130 guard
2131 .get_or_insert_with(|| self.inner.mailbox.bind(self).into_port_addr())
2132 .clone()
2133 };
2134 PortRef::attest_reducible(
2135 port_ref,
2136 self.inner.reducer_spec.clone(),
2137 self.inner.streaming_opts.clone(),
2138 )
2139 }
2140
2141 pub(crate) fn bind_handler_port(&self) {
2147 let port_id = self
2148 .inner
2149 .mailbox
2150 .actor_addr()
2151 .port_addr(Port::from(M::port()));
2152 {
2153 let mut guard = self.inner.bound.write().unwrap();
2154 if guard.is_some() {
2155 panic!(
2156 "could not bind port handle {} as {port_id}: already bound",
2157 self.inner.port_index
2158 );
2159 }
2160 *guard = Some(port_id);
2161 }
2162 self.inner.mailbox.bind_to_handler_port(self);
2163 }
2164}
2165
2166impl<M: Message> Clone for PortHandle<M> {
2167 fn clone(&self) -> Self {
2168 Self {
2169 inner: Arc::clone(&self.inner),
2170 }
2171 }
2172}
2173
2174impl<M: Message> fmt::Display for PortHandle<M> {
2175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2176 fmt::Display::fmt(&self.location(), f)
2177 }
2178}
2179
/// A handle for a one-shot port: it can deliver at most one message and is
/// consumed by sending or binding.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The port index allocated for this handle.
    port_index: u64,
    /// The full address of the port.
    port_id: PortAddr,
    /// The one-shot channel sender that delivers the message.
    sender: oneshot::Sender<M>,
    /// Optional reducer spec, passed along when the handle is bound.
    reducer_spec: Option<ReducerSpec>,
}
2189
2190impl<M: Message> OncePortHandle<M> {
2191 pub fn port_addr(&self) -> &PortAddr {
2194 &self.port_id
2195 }
2196}
2197
2198impl<M> Endpoint<M> for OncePortHandle<M>
2199where
2200 M: Message,
2201{
2202 fn endpoint_location(&self) -> EndpointLocation {
2203 EndpointLocation::Port(self.port_id.clone())
2204 }
2205
2206 fn post<C>(self, _cx: &C, message: M)
2207 where
2208 C: context::Actor,
2209 {
2210 assert!(
2213 !self.port_addr().is_handler_port(),
2214 "OncePortHandle currently does not support handler ports; a \
2215 prerequisite of that support is to assign seq to messages \
2216 if the port is a handler port."
2217 );
2218
2219 let actor_id = self.mailbox.actor_addr().clone();
2220 let endpoint_location = self.endpoint_location();
2221 if let Err(err) = self.sender.send(message).map_err(|_| {
2222 MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
2227 }) {
2228 _cx.instance()
2229 .report_lost_message(LostMessage::from_send_error::<M>(
2230 _cx.mailbox().actor_addr().clone(),
2231 endpoint_location,
2232 &err,
2233 ));
2234 }
2235 }
2236}
2237
2238impl<M: RemoteMessage> OncePortHandle<M> {
2239 pub fn bind(self) -> OncePortRef<M> {
2244 let port_id: PortAddr = self.port_addr().clone();
2245 let reducer_spec = self.reducer_spec.clone();
2246 self.mailbox.clone().bind_once(self);
2247 OncePortRef::attest_reducible(port_id, reducer_spec)
2248 }
2249}
2250
2251impl<M: Message> fmt::Display for OncePortHandle<M> {
2252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2253 fmt::Display::fmt(&self.port_addr(), f)
2254 }
2255}
2256
/// The receiving end of a port; removes its port from the owning mailbox
/// when dropped.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// The channel messages arrive on.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The address of the port this receiver serves.
    port_id: PortAddr,
    /// When `true`, receive operations skip ahead to the most recently
    /// queued message and discard older ones.
    coalesce: bool,
    /// The mailbox the port belongs to; used to remove the port on drop.
    mailbox: Mailbox,
}
2270
2271impl<M> PortReceiver<M> {
2272 fn new(
2273 receiver: mpsc::UnboundedReceiver<M>,
2274 port_id: PortAddr,
2275 coalesce: bool,
2276 mailbox: Mailbox,
2277 ) -> Self {
2278 Self {
2279 receiver,
2280 port_id,
2281 coalesce,
2282 mailbox,
2283 }
2284 }
2285
2286 #[allow(clippy::result_large_err)] pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
2291 let mut next = self.receiver.try_recv();
2292 if self.coalesce
2294 && let Some(latest) = self.drain().pop()
2295 {
2296 next = Ok(latest);
2297 }
2298 match next {
2299 Ok(msg) => Ok(Some(msg)),
2300 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2301 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2302 self.actor_addr().clone(),
2303 MailboxErrorKind::Closed,
2304 )),
2305 }
2306 }
2307
2308 pub async fn recv(&mut self) -> Result<M, MailboxError> {
2311 let mut next = self.receiver.recv().await;
2312 if self.coalesce
2315 && let Some(latest) = self.drain().pop()
2316 {
2317 next = Some(latest);
2318 }
2319 next.ok_or(MailboxError::new(
2320 self.actor_addr().clone(),
2321 MailboxErrorKind::Closed,
2322 ))
2323 }
2324
2325 pub fn drain(&mut self) -> Vec<M> {
2327 let mut drained: Vec<M> = Vec::new();
2328 while let Ok(msg) = self.receiver.try_recv() {
2329 if self.coalesce {
2331 drained.pop();
2332 }
2333 drained.push(msg);
2334 }
2335 drained
2336 }
2337
2338 fn port(&self) -> u64 {
2339 self.port_id.index()
2340 }
2341
2342 fn actor_addr(&self) -> ActorAddr {
2343 self.port_id.actor_addr()
2344 }
2345}
2346
2347impl<M> Drop for PortReceiver<M> {
2348 fn drop(&mut self) {
2349 self.mailbox.inner.ports.remove(&self.port());
2353 }
2354}
2355
2356impl<M> Stream for PortReceiver<M> {
2357 type Item = Result<M, MailboxError>;
2358
2359 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2360 std::pin::pin!(self.recv()).poll(cx).map(Some)
2361 }
2362}
2363
/// The receiving end of a one-shot port; removes its port from the owning
/// mailbox when dropped.
pub struct OncePortReceiver<M> {
    /// The one-shot channel receiver; taken out by `recv`.
    receiver: Option<oneshot::Receiver<M>>,
    /// The address of the port this receiver serves.
    port_id: PortAddr,

    /// The mailbox the port belongs to; used to remove the port on drop.
    mailbox: Mailbox,
}
2373
2374impl<M> OncePortReceiver<M> {
2375 pub async fn recv(mut self) -> Result<M, MailboxError> {
2379 std::mem::take(&mut self.receiver)
2380 .unwrap()
2381 .await
2382 .map_err(|err| {
2383 MailboxError::new(
2384 self.actor_addr().clone(),
2385 MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2386 )
2387 })
2388 }
2389
2390 fn port(&self) -> u64 {
2391 self.port_id.index()
2392 }
2393
2394 fn actor_addr(&self) -> ActorAddr {
2395 self.port_id.actor_addr()
2396 }
2397}
2398
2399impl<M> Drop for OncePortReceiver<M> {
2400 fn drop(&mut self) {
2401 self.mailbox.inner.ports.remove(&self.port());
2405 }
2406}
2407
/// Outcome of successfully handing a serialized message to a port sender.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum SerializedSendDisposition {
    /// The message was delivered; the port stays bound.
    Delivered,
    /// The message was delivered and the sender cannot accept another one;
    /// the mailbox removes the port on this outcome.
    DeliveredAndExhausted,
}
2413
/// A failed serialized send, carrying everything needed to return the
/// message to its sender.
pub(crate) struct SerializedSendError {
    /// The headers of the message that failed to send.
    pub(crate) headers: Flattrs,
    /// The serialized payload that failed to send.
    pub(crate) data: wirevalue::Any,
    /// Why the send failed.
    pub(crate) error: MailboxSenderError,
}
2423
/// The ways a serialized send can fail.
pub(crate) enum SerializedSendFailure {
    /// The port can no longer receive; the mailbox removes the port and
    /// reports the message as addressed to an unbound port.
    Dead {
        /// The headers of the undelivered message.
        headers: Flattrs,
        /// The serialized payload of the undelivered message.
        data: wirevalue::Any,
    },
    /// The send failed with a specific error; the port stays bound.
    Error(SerializedSendError),
}
2431
/// A type-erased port sender that accepts messages in serialized form.
trait SerializedSender: Send + Sync {
    /// Returns `self` as `Any`, so callers can downcast to the concrete
    /// sender type.
    fn as_any(&self) -> &dyn Any;

    /// Delivers a serialized message with its headers.
    ///
    /// On failure the headers and payload are handed back inside the
    /// returned [`SerializedSendFailure`].
    fn send_serialized(
        &self,
        headers: Flattrs,
        serialized: wirevalue::Any,
    ) -> Result<SerializedSendDisposition, SerializedSendFailure>;
}
2456
/// Error returned when a send is attempted after the handler ingress gate
/// has started draining.
#[derive(Debug, thiserror::Error)]
#[error("handler port closed")]
struct HandlerPortClosedError;
2460
2461fn classify_sender_error(err: anyhow::Error) -> MailboxSenderErrorKind {
2462 if err.is::<HandlerPortClosedError>() {
2463 MailboxSenderErrorKind::Closed
2464 } else {
2465 MailboxSenderErrorKind::Other(err)
2466 }
2467}
2468
/// The delivery mechanisms a port can use for typed messages.
enum UnboundedPortSender<M: Message> {
    /// Deliver into an unbounded mpsc channel; headers are not forwarded.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Deliver by calling a function with the headers and the message.
    Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
    /// Deliver through a [`HandlerPortSender`], which consults the handler
    /// ingress gate before forwarding.
    Handler(Arc<HandlerPortSender<M>>),
}
2478
2479impl<M: Message> UnboundedPortSender<M> {
2480 fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2481 match self {
2482 Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2483 Self::Func(func) => func(headers, message),
2484 Self::Handler(sender) => sender.send(headers, message),
2485 }
2486 }
2487}
2488
2489impl<M: Message> Clone for UnboundedPortSender<M> {
2492 fn clone(&self) -> Self {
2493 match self {
2494 Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2495 Self::Func(func) => Self::Func(func.clone()),
2496 Self::Handler(sender) => Self::Handler(sender.clone()),
2497 }
2498 }
2499}
2500
2501impl<M: Message> Debug for UnboundedPortSender<M> {
2502 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2503 match self {
2504 Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2505 Self::Func(_) => f
2506 .debug_tuple("UnboundedPortSender::Func")
2507 .field(&"..")
2508 .finish(),
2509 Self::Handler(_) => f
2510 .debug_tuple("UnboundedPortSender::Handler")
2511 .field(&"..")
2512 .finish(),
2513 }
2514 }
2515}
2516
/// Flag bit (the most significant bit of a `usize`) marking the handler
/// ingress gate as draining.
const HANDLER_INGRESS_DRAINING: usize = 1usize << (usize::BITS as usize - 1);
/// Mask selecting every bit except the draining flag; these bits hold the
/// count of sends currently inside the gate.
const HANDLER_INGRESS_ACTIVE_MASK: usize = !HANDLER_INGRESS_DRAINING;
2519
/// Gate tracking in-flight handler-port sends, so that a drain can refuse
/// new sends and wait for active ones to finish.
struct HandlerIngressGate {
    /// Packed state: the top bit is the draining flag, the remaining bits
    /// count active sends.
    state: AtomicUsize,
    /// Mutex paired with `drained` for waiting.
    wait_lock: Mutex<()>,
    /// Notified when the last active send leaves a draining gate.
    drained: Condvar,
}
2525
/// Guard representing one active send inside a [`HandlerIngressGate`];
/// dropping it decrements the gate's active count.
struct HandlerIngressGuard {
    /// The gate this guard was admitted through.
    gate: Arc<HandlerIngressGate>,
}
2529
2530impl HandlerIngressGate {
2531 fn new() -> Self {
2532 Self {
2533 state: AtomicUsize::new(0),
2534 wait_lock: Mutex::new(()),
2535 drained: Condvar::new(),
2536 }
2537 }
2538
2539 fn try_enter(self: &Arc<Self>) -> Result<HandlerIngressGuard, HandlerPortClosedError> {
2540 let mut state = self.state.load(Ordering::Acquire);
2541 loop {
2542 if state & HANDLER_INGRESS_DRAINING != 0 {
2543 return Err(HandlerPortClosedError);
2544 }
2545
2546 let active = state & HANDLER_INGRESS_ACTIVE_MASK;
2547 assert!(
2548 active < HANDLER_INGRESS_ACTIVE_MASK,
2549 "too many active handler ingress sends"
2550 );
2551
2552 match self.state.compare_exchange_weak(
2553 state,
2554 state + 1,
2555 Ordering::AcqRel,
2556 Ordering::Acquire,
2557 ) {
2558 Ok(_) => {
2559 return Ok(HandlerIngressGuard {
2560 gate: Arc::clone(self),
2561 });
2562 }
2563 Err(next_state) => state = next_state,
2564 }
2565 }
2566 }
2567
2568 fn drain(&self) {
2569 let mut state = self.state.load(Ordering::Acquire);
2570 loop {
2571 if state & HANDLER_INGRESS_DRAINING != 0 {
2572 break;
2573 }
2574 match self.state.compare_exchange_weak(
2575 state,
2576 state | HANDLER_INGRESS_DRAINING,
2577 Ordering::AcqRel,
2578 Ordering::Acquire,
2579 ) {
2580 Ok(_) => break,
2581 Err(next_state) => state = next_state,
2582 }
2583 }
2584
2585 let mut wait_guard = self.wait_lock.lock().unwrap();
2586 while self.state.load(Ordering::Acquire) & HANDLER_INGRESS_ACTIVE_MASK != 0 {
2587 wait_guard = self.drained.wait(wait_guard).unwrap();
2588 }
2589 }
2590}
2591
2592impl Drop for HandlerIngressGuard {
2593 fn drop(&mut self) {
2594 let previous = self.gate.state.fetch_sub(1, Ordering::AcqRel);
2595 assert!(
2596 previous & HANDLER_INGRESS_ACTIVE_MASK != 0,
2597 "handler ingress active count underflow"
2598 );
2599 if previous & HANDLER_INGRESS_DRAINING != 0 && previous & HANDLER_INGRESS_ACTIVE_MASK == 1 {
2600 let _wait_guard = self.gate.wait_lock.lock().unwrap();
2606 self.gate.drained.notify_all();
2607 }
2608 }
2609}
2610
/// A sender that must pass through a [`HandlerIngressGate`] before
/// forwarding to the wrapped sender.
struct HandlerPortSender<M: Message> {
    /// The sender messages are forwarded to.
    sender: UnboundedPortSender<M>,
    /// The gate consulted before each send.
    gate: Arc<HandlerIngressGate>,
}
2615
2616impl<M: Message> HandlerPortSender<M> {
2617 fn new(sender: UnboundedPortSender<M>, gate: Arc<HandlerIngressGate>) -> Self {
2618 Self { sender, gate }
2619 }
2620
2621 fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2622 let _guard = self.gate.try_enter()?;
2623 self.sender.send(headers, message)
2624 }
2625}
2626
/// A typed port sender paired with the address it is bound at.
struct UnboundedSender<M: Message> {
    /// The underlying delivery mechanism.
    sender: UnboundedPortSender<M>,
    /// The address this sender is bound at; used in error reports.
    port_id: PortAddr,
}
2631
2632impl<M: Message> UnboundedSender<M> {
2633 fn new(sender: UnboundedPortSender<M>, port_id: PortAddr) -> Self {
2636 Self { sender, port_id }
2637 }
2638
2639 #[allow(dead_code)]
2640 fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2641 self.sender.send(headers, message).map_err(|err| {
2642 MailboxSenderError::new_bound(self.port_id.clone(), classify_sender_error(err))
2643 })
2644 }
2645}
2646
2647impl<M: Message> Clone for UnboundedSender<M> {
2651 fn clone(&self) -> Self {
2652 Self {
2653 sender: self.sender.clone(),
2654 port_id: self.port_id.clone(),
2655 }
2656 }
2657}
2658
2659impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2660 fn as_any(&self) -> &dyn Any {
2661 self
2662 }
2663
2664 fn send_serialized(
2665 &self,
2666 headers: Flattrs,
2667 serialized: wirevalue::Any,
2668 ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2669 match serialized.deserialized_unchecked() {
2675 Ok(message) => match self.sender.send(headers.clone(), message) {
2676 Ok(()) => Ok(SerializedSendDisposition::Delivered),
2677 Err(_) if matches!(&self.sender, UnboundedPortSender::Mpsc(_)) => {
2678 Err(SerializedSendFailure::Dead {
2679 data: serialized,
2680 headers,
2681 })
2682 }
2683 Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2684 data: serialized,
2685 error: MailboxSenderError::new_bound(
2686 self.port_id.clone(),
2687 classify_sender_error(err),
2688 ),
2689 headers,
2690 })),
2691 },
2692 Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2693 data: serialized,
2694 error: MailboxSenderError::new_bound(
2695 self.port_id.clone(),
2696 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2697 ),
2698 headers,
2699 })),
2700 }
2701 }
2702}
2703
/// A bound one-shot sender: it delivers at most one message.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot channel sender; `None` once it has been used.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The address this sender is bound at; used in error reports.
    port_id: PortAddr,
}
2711
2712impl<M: Message> OnceSender<M> {
2713 fn new(sender: oneshot::Sender<M>, port_id: PortAddr) -> Self {
2716 Self {
2717 sender: Arc::new(Mutex::new(Some(sender))),
2718 port_id,
2719 }
2720 }
2721
2722 fn send_once(&self, message: M) -> Result<SerializedSendDisposition, MailboxSenderError> {
2723 match self.sender.lock().unwrap().take() {
2725 None => Err(MailboxSenderError::new_bound(
2726 self.port_id.clone(),
2727 MailboxSenderErrorKind::Closed,
2728 )),
2729 Some(sender) => {
2730 sender.send(message).map_err(|_| {
2731 MailboxSenderError::new_bound(
2736 self.port_id.clone(),
2737 MailboxSenderErrorKind::Closed,
2738 )
2739 })?;
2740 Ok(SerializedSendDisposition::DeliveredAndExhausted)
2741 }
2742 }
2743 }
2744}
2745
2746impl<M: Message> Clone for OnceSender<M> {
2750 fn clone(&self) -> Self {
2751 Self {
2752 sender: self.sender.clone(),
2753 port_id: self.port_id.clone(),
2754 }
2755 }
2756}
2757
2758impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2759 fn as_any(&self) -> &dyn Any {
2760 self
2761 }
2762
2763 fn send_serialized(
2764 &self,
2765 headers: Flattrs,
2766 serialized: wirevalue::Any,
2767 ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2768 match serialized.deserialized() {
2769 Ok(message) => self
2770 .send_once(message)
2771 .map_err(|_| SerializedSendFailure::Dead {
2772 data: serialized,
2773 headers,
2774 }),
2775 Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2776 data: serialized,
2777 error: MailboxSenderError::new_bound(
2778 self.port_id.clone(),
2779 MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2780 ),
2781 headers,
2782 })),
2783 }
2784 }
2785}
2786
/// A port sender that handles serialized messages directly, without a
/// static message type.
pub(crate) struct UntypedUnboundedSender {
    /// Called with the headers and the serialized payload of each message.
    pub(crate) sender: Box<
        dyn Fn(Flattrs, wirevalue::Any) -> Result<SerializedSendDisposition, SerializedSendFailure>
            + Send
            + Sync,
    >,
}
2795
2796impl SerializedSender for UntypedUnboundedSender {
2797 fn as_any(&self) -> &dyn Any {
2798 self
2799 }
2800
2801 fn send_serialized(
2802 &self,
2803 headers: Flattrs,
2804 serialized: wirevalue::Any,
2805 ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2806 (self.sender)(headers, serialized)
2807 }
2808}
2809
/// The shared state of a mailbox.
struct State {
    /// The address of the actor that owns the mailbox.
    actor_id: ActorAddr,

    /// Bound ports, keyed by port index.
    ports: DashMap<u64, Arc<dyn SerializedSender>>,

    /// The next port index to hand out; starts at `USER_PORT_OFFSET`.
    next_port: AtomicU64,

    /// `None` while the mailbox is open; the owner's final status once it
    /// has been closed.
    closed: RwLock<Option<ActorStatus>>,

    /// Gate shared by the handler ports opened on this mailbox.
    handler_ingress: Arc<HandlerIngressGate>,
}
2830
2831impl State {
2832 fn new(actor_id: ActorAddr) -> Self {
2834 Self {
2835 actor_id,
2836 ports: DashMap::new(),
2837 next_port: AtomicU64::new(USER_PORT_OFFSET),
2840 closed: RwLock::new(None),
2841 handler_ingress: Arc::new(HandlerIngressGate::new()),
2842 }
2843 }
2844
2845 fn allocate_port(&self) -> u64 {
2847 self.next_port.fetch_add(1, Ordering::SeqCst)
2848 }
2849}
2850
2851impl fmt::Debug for State {
2852 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2853 f.debug_struct("State")
2854 .field("actor_id", &self.actor_id)
2855 .field(
2856 "open_ports",
2857 &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2858 )
2859 .field("next_port", &self.next_port)
2860 .finish()
2861 }
2862}
2863
/// Dispatches envelopes to per-actor senders, looked up by the destination
/// actor's id. Clones share the same table.
#[derive(Clone)]
pub struct MailboxMuxer {
    /// Registered senders, keyed by actor id.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2871
2872impl Default for MailboxMuxer {
2873 fn default() -> Self {
2874 Self::new()
2875 }
2876}
2877
2878impl MailboxMuxer {
2879 pub fn new() -> Self {
2881 Self {
2882 mailboxes: Arc::new(DashMap::new()),
2883 }
2884 }
2885
2886 pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2891 match self.mailboxes.entry(actor_id) {
2892 Entry::Occupied(_) => false,
2893 Entry::Vacant(entry) => {
2894 entry.insert(Box::new(sender));
2895 true
2896 }
2897 }
2898 }
2899
2900 pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2902 self.bind(mailbox.actor_addr().id().clone(), mailbox)
2903 }
2904
2905 #[allow(dead_code)]
2909 pub(crate) fn unbind(&self, actor_id: &ActorId) {
2910 self.mailboxes.remove(actor_id);
2911 }
2912}
2913
2914#[async_trait]
2915impl MailboxSender for MailboxMuxer {
2916 fn post_unchecked(
2917 &self,
2918 envelope: MessageEnvelope,
2919 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2920 ) {
2921 let dest_actor_ref = envelope.dest().actor_addr();
2922 match self.mailboxes.get(dest_actor_ref.id()) {
2923 None => {
2924 let err = format!(
2925 "no mailbox for actor {} registered in muxer",
2926 dest_actor_ref
2927 );
2928 envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2929 }
2930 Some(sender) => sender.post(envelope, return_handle),
2931 }
2932 }
2933
2934 async fn flush(&self) -> Result<(), anyhow::Error> {
2935 let keys: Vec<_> = self
2936 .mailboxes
2937 .iter()
2938 .map(|entry| entry.key().clone())
2939 .collect();
2940 for key in keys {
2941 if let Some(sender) = self.mailboxes.get(&key) {
2942 sender.value().flush().await?;
2943 }
2944 }
2945 Ok(())
2946 }
2947}
2948
/// A mailbox sender that forwards messages according to a routing table
/// keyed by [`Addr`].
///
/// Clones share the same underlying table.
#[derive(Clone)]
pub struct MailboxRouter {
    // Ordered so that `sender` can find the closest preceding entry for a
    // destination and test whether it is a prefix of that destination.
    entries: Arc<RwLock<BTreeMap<Addr, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2955
2956impl Default for MailboxRouter {
2957 fn default() -> Self {
2958 Self::new()
2959 }
2960}
2961
2962impl MailboxRouter {
2963 pub fn new() -> Self {
2965 Self {
2966 entries: Arc::new(RwLock::new(BTreeMap::new())),
2967 }
2968 }
2969
2970 pub fn downgrade(&self) -> WeakMailboxRouter {
2972 WeakMailboxRouter(Arc::downgrade(&self.entries))
2973 }
2974
2975 pub fn fallback(&self, default: BoxedMailboxSender) -> BoxedMailboxSender {
2979 FallbackMailboxRouter {
2980 router: self.clone(),
2981 default,
2982 }
2983 .into_boxed()
2984 }
2985
2986 pub fn bind(&self, dest: impl Into<Addr>, sender: impl MailboxSender + 'static) {
2990 let dest = dest.into();
2991 let mut w = self.entries.write().unwrap();
2992 w.insert(dest, Arc::new(sender));
2993 }
2994
2995 pub fn unbind(&self, dest: &Addr) {
2999 let mut w = self.entries.write().unwrap();
3000 w.remove(dest);
3001 }
3002
3003 fn sender(&self, actor_ref: &ActorAddr) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
3004 let reference = Addr::from(actor_ref.clone());
3005 match self
3006 .entries
3007 .read()
3008 .unwrap()
3009 .lower_bound(Excluded(&reference))
3010 .prev()
3011 {
3012 None => None,
3013 Some((key, sender)) if key.is_prefix_of(&reference) => Some(sender.clone()),
3014 Some(_) => None,
3015 }
3016 }
3017}
3018
3019#[async_trait]
3020impl MailboxSender for MailboxRouter {
3021 fn post_unchecked(
3022 &self,
3023 envelope: MessageEnvelope,
3024 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3025 ) {
3026 let dest_actor_ref = envelope.dest().actor_addr();
3027 match self.sender(&dest_actor_ref) {
3028 None => envelope.undeliverable(
3029 DeliveryError::Unroutable(
3030 "no destination found for actor in routing table".to_string(),
3031 ),
3032 return_handle,
3033 ),
3034 Some(sender) => sender.post(envelope, return_handle),
3035 }
3036 }
3037
3038 async fn flush(&self) -> Result<(), anyhow::Error> {
3039 let senders: Vec<_> = self.entries.read().unwrap().values().cloned().collect();
3040 let futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
3041 futures::future::try_join_all(futs).await?;
3042 Ok(())
3043 }
3044}
3045
/// A [`MailboxRouter`] paired with a default sender that receives every
/// message the router has no route for.
#[derive(Clone)]
pub struct FallbackMailboxRouter {
    // Consulted first for a matching route.
    router: MailboxRouter,
    // Used when `router` has no route for the destination.
    default: BoxedMailboxSender,
}
3053
impl FallbackMailboxRouter {
    /// The sender that receives messages for which the router has no route.
    pub fn default_sender(&self) -> &BoxedMailboxSender {
        &self.default
    }
}
3060
3061#[async_trait]
3062impl MailboxSender for FallbackMailboxRouter {
3063 fn post_unchecked(
3064 &self,
3065 envelope: MessageEnvelope,
3066 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3067 ) {
3068 let dest_actor_ref = envelope.dest().actor_addr();
3069 match self.router.sender(&dest_actor_ref) {
3070 Some(sender) => sender.post(envelope, return_handle),
3071 None => self.default.post(envelope, return_handle),
3072 }
3073 }
3074
3075 async fn flush(&self) -> Result<(), anyhow::Error> {
3076 let (r1, r2) = futures::future::join(self.router.flush(), self.default.flush()).await;
3077 r1?;
3078 r2?;
3079 Ok(())
3080 }
3081}
3082
/// A weak handle to the routing table of a [`MailboxRouter`]. It does not
/// keep the table alive; see [`WeakMailboxRouter::upgrade`].
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(Weak<RwLock<BTreeMap<Addr, Arc<dyn MailboxSender + Send + Sync>>>>);
3093
3094impl WeakMailboxRouter {
3095 pub fn upgrade(&self) -> Option<MailboxRouter> {
3097 self.0.upgrade().map(|entries| MailboxRouter { entries })
3098 }
3099}
3100
3101#[async_trait]
3102impl MailboxSender for WeakMailboxRouter {
3103 fn post_unchecked(
3104 &self,
3105 envelope: MessageEnvelope,
3106 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3107 ) {
3108 match self.upgrade() {
3109 Some(router) => router.post(envelope, return_handle),
3110 None => envelope.undeliverable(
3111 DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
3112 return_handle,
3113 ),
3114 }
3115 }
3116
3117 async fn flush(&self) -> Result<(), anyhow::Error> {
3118 match self.upgrade() {
3119 Some(router) => router.flush().await,
3120 None => Ok(()),
3121 }
3122 }
3123}
3124
/// A router that resolves destinations to [`ChannelAddr`]s and forwards
/// messages over dialed channels.
///
/// Clones share the same address book and sender cache.
#[derive(Clone)]
pub struct DialMailboxRouter {
    // Explicit bindings from destination to channel address; `lookup_addr`
    // matches them by prefix.
    address_book: Arc<RwLock<BTreeMap<Addr, ChannelAddr>>>,
    // Dialed clients, keyed by the channel address they were dialed on.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    // Receives messages for which `lookup_addr` yields no address.
    default: BoxedMailboxSender,

    // When set, an actor with no matching binding falls back to its own
    // address only if that address uses a remote transport.
    direct_addressed_remote_only: bool,
}
3151
3152impl Default for DialMailboxRouter {
3153 fn default() -> Self {
3154 Self::new()
3155 }
3156}
3157
3158impl DialMailboxRouter {
3159 pub fn new() -> Self {
3161 Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
3162 }
3163
3164 pub fn new_with_default(default: BoxedMailboxSender) -> Self {
3169 Self {
3170 address_book: Arc::new(RwLock::new(BTreeMap::new())),
3171 sender_cache: Arc::new(DashMap::new()),
3172 default,
3173 direct_addressed_remote_only: false,
3174 }
3175 }
3176
3177 pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
3182 Self {
3183 address_book: Arc::new(RwLock::new(BTreeMap::new())),
3184 sender_cache: Arc::new(DashMap::new()),
3185 default,
3186 direct_addressed_remote_only: true,
3187 }
3188 }
3189
3190 pub fn bind(&self, dest: impl Into<Addr>, addr: ChannelAddr) {
3196 let dest = dest.into();
3197 if let Ok(mut w) = self.address_book.write() {
3198 if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
3199 && old_addr != addr
3200 {
3201 tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
3202 self.sender_cache.remove(&old_addr);
3203 }
3204 } else {
3205 tracing::error!("address book poisoned during bind of {:?}", dest);
3206 }
3207 }
3208
3209 pub fn unbind(&self, dest: &Addr) {
3215 if let Ok(mut w) = self.address_book.write() {
3216 let to_remove: Vec<(Addr, ChannelAddr)> = w
3217 .range(dest..)
3218 .take_while(|(key, _)| dest.is_prefix_of(key))
3219 .map(|(key, addr)| (key.clone(), addr.clone()))
3220 .collect();
3221
3222 for (key, addr) in to_remove {
3223 tracing::info!("unbinding {:?} from {:?}", key, addr);
3224 w.remove(&key);
3225 self.sender_cache.remove(&addr);
3226 }
3227 } else {
3228 tracing::error!("address book poisoned during unbind of {:?}", dest);
3229 }
3230 }
3231
3232 pub fn lookup_addr(&self, actor_ref: &ActorAddr) -> Option<ChannelAddr> {
3234 let address_book = self.address_book.read().unwrap();
3235 let reference = Addr::from(actor_ref.clone());
3236 let found = address_book.lower_bound(Excluded(&reference)).prev();
3237
3238 if let Some((key, addr)) = found
3241 && key.is_prefix_of(&reference)
3242 {
3243 Some(addr.clone())
3244 } else {
3245 let addr = actor_ref.addr().clone();
3246 if self.direct_addressed_remote_only {
3247 addr.transport().is_remote().then_some(addr)
3248 } else {
3249 Some(addr)
3250 }
3251 }
3252 }
3253
3254 pub fn prefixes(&self) -> BTreeSet<Addr> {
3257 let addrs = self.address_book.read().unwrap();
3258 let mut prefixes: BTreeSet<Addr> = BTreeSet::new();
3259 for (reference, _) in addrs.iter() {
3260 match prefixes.lower_bound(Excluded(reference)).peek_prev() {
3261 Some(candidate) if candidate.is_prefix_of(reference) => (),
3262 _ => {
3263 prefixes.insert(reference.clone());
3264 }
3265 }
3266 }
3267
3268 prefixes
3269 }
3270
3271 fn dial(
3272 &self,
3273 addr: &ChannelAddr,
3274 actor_ref: &ActorAddr,
3275 ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
3276 match self.sender_cache.entry(addr.clone()) {
3280 Entry::Occupied(entry) => Ok(entry.get().clone()),
3281 Entry::Vacant(entry) => {
3282 let tx = channel::dial(addr.clone()).map_err(|err| {
3283 MailboxSenderError::new_unbound_type(
3284 actor_ref.clone(),
3285 MailboxSenderErrorKind::Channel(err),
3286 "unknown",
3287 )
3288 })?;
3289 let sender = MailboxClient::new(tx);
3290 Ok(entry.insert(Arc::new(sender)).value().clone())
3291 }
3292 }
3293 }
3294}
3295
3296#[async_trait]
3297impl MailboxSender for DialMailboxRouter {
3298 fn post_unchecked(
3299 &self,
3300 envelope: MessageEnvelope,
3301 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3302 ) {
3303 let dest_actor_ref = envelope.dest().actor_addr();
3304 let Some(addr) = self.lookup_addr(&dest_actor_ref) else {
3305 self.default.post(envelope, return_handle);
3306 return;
3307 };
3308
3309 match self.dial(&addr, &dest_actor_ref) {
3310 Err(err) => envelope.undeliverable(
3311 DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
3312 return_handle,
3313 ),
3314 Ok(sender) => sender.post(envelope, return_handle),
3315 }
3316 }
3317
3318 async fn flush(&self) -> Result<(), anyhow::Error> {
3319 let senders: Vec<_> = self
3320 .sender_cache
3321 .iter()
3322 .map(|entry| entry.value().clone())
3323 .collect();
3324 let mut futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
3325 futs.push(self.default.flush());
3326 futures::future::try_join_all(futs).await?;
3327 Ok(())
3328 }
3329}
3330
/// A mailbox sender that cannot deliver anything: every posted message is
/// returned as undeliverable with an unroutable error.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
3335
3336#[async_trait]
3337impl MailboxSender for UnroutableMailboxSender {
3338 fn post_unchecked(
3339 &self,
3340 envelope: MessageEnvelope,
3341 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3342 ) {
3343 envelope.undeliverable(
3344 DeliveryError::Unroutable("destination not found in routing table".to_string()),
3345 return_handle,
3346 );
3347 }
3348}
3349
3350#[cfg(test)]
3351mod tests {
3352
3353 use std::assert_matches;
3354 use std::mem::drop;
3355 use std::sync::atomic::AtomicUsize;
3356 use std::time::Duration;
3357
3358 use timed_test::async_timed_test;
3359
3360 use super::*;
3361 use crate::Actor;
3362 use crate::ActorHandle;
3363 use crate::Instance;
3364 use crate::accum;
3365 use crate::accum::ReducerMode;
3366 use crate::channel::ChannelTransport;
3367 use crate::context::Mailbox as MailboxContext;
3368 use crate::endpoint::Endpoint as _;
3369 use crate::proc::Proc;
3370 use crate::testing::ids::test_actor_id;
3371 use crate::testing::ids::test_port_id;
3372 use crate::testing::ids::test_proc_id;
3373
/// Build a proc-level [`Addr`] from the test proc id derived from `name`.
fn test_proc_ref(name: &str) -> Addr {
    Addr::Proc(test_proc_id(name))
}
3377
/// Build an actor-level [`Addr`] from the test actor id derived from
/// `proc_name` and `actor_name`.
fn test_actor_ref(proc_name: &str, actor_name: &str) -> Addr {
    Addr::Actor(test_actor_id(proc_name, actor_name))
}
3381
/// A closed-mailbox error renders with the "mailbox closed" reason and an
/// `@` separator in the actor location.
#[test]
fn test_error() {
    use crate::testing::ids::test_actor_id;
    let actor = test_actor_id("myworld_2", "myactor");
    let err_str = MailboxError::new(actor, MailboxErrorKind::Closed).to_string();
    assert!(
        err_str.contains("mailbox closed"),
        "expected error: {}",
        err_str
    );
    assert!(
        err_str.contains("@"),
        "expected ref-style location separator in {err_str}"
    );
}
3401
/// Messages sent to a bound port arrive in send order, both through the
/// typed send path and as a pre-serialized envelope.
#[tokio::test]
async fn test_mailbox_basic() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    let (port, mut receiver) = mbox.open_port::<u64>();
    let port = port.bind();

    // Typed sends: delivered in the order they were sent.
    mbox.serialize_and_send(&port, 123, monitored_return_handle())
        .unwrap();
    mbox.serialize_and_send(&port, 321, monitored_return_handle())
        .unwrap();
    assert_eq!(receiver.recv().await.unwrap(), 123u64);
    assert_eq!(receiver.recv().await.unwrap(), 321u64);

    // Post an envelope built by hand from already-serialized data.
    let serialized = wirevalue::Any::serialize(&999u64).unwrap();
    mbox.post(
        MessageEnvelope::new_unknown(port.port_addr().clone(), serialized),
        monitored_return_handle(),
    );
    assert_eq!(receiver.recv().await.unwrap(), 999u64);
}
3422
/// Posting a message addressed to a different actor into a mailbox returns
/// the message as undeliverable.
#[tokio::test]
async fn test_mailbox_rejects_messages_for_other_actors() {
    let mbox = Mailbox::new(test_actor_id("0", "owner"));
    // The destination belongs to actor "other", not to the mailbox's owner.
    let dest = test_actor_id("0", "other").port_addr(Port::from(1234));
    let envelope =
        MessageEnvelope::serialize(mbox.actor_addr().clone(), dest, &42u64, Flattrs::new())
            .expect("serialize");
    let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();

    mbox.post(envelope, return_handle);

    // Bounded wait so a missing return fails the test instead of hanging it.
    let Undeliverable::Message(undelivered) =
        tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
            .await
            .expect("timed out waiting for undeliverable")
            .expect("return port closed")
    else {
        panic!("expected returned message");
    };
    assert!(
        undelivered
            .error_msg()
            .expect("expected error")
            .contains("cannot deliver to")
    );
}
3449
/// Exercises an accumulating port built from the `Max<i64>` join-semilattice
/// accumulator.
#[tokio::test]
async fn test_mailbox_accum() {
    let proc = Proc::isolated();
    let (client, _) = proc.client("client").unwrap();
    let (port, mut receiver) = client
        .mailbox()
        .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());

    // Increasing inputs: each receive observes the value just posted.
    for i in -3..4 {
        port.post(&client, accum::Max(i));
        let received: accum::Max<i64> = receiver.recv().await.unwrap();
        let msg = received.get();
        assert_eq!(msg, &i);
    }
    // Replaying the same inputs: every receive observes 3.
    for i in -3..4 {
        port.post(&client, accum::Max(i));
        assert_eq!(receiver.recv().await.unwrap().get(), &3);
    }
    // A larger input is observed as the new value.
    port.post(&client, accum::Max(4));
    assert_eq!(receiver.recv().await.unwrap().get(), &4);

    // Several posts with no receive in between: the next receive observes
    // the largest of them.
    for i in 5..10 {
        port.post(&client, accum::Max(i));
    }
    assert_eq!(receiver.recv().await.unwrap().get(), &9);
    // Smaller inputs leave the observed value at 9.
    port.post(&client, accum::Max(1));
    port.post(&client, accum::Max(3));
    port.post(&client, accum::Max(2));
    assert_eq!(receiver.recv().await.unwrap().get(), &9);
}
3483
/// The reducer spec is carried from the port handle to the bound port ref:
/// present for accumulator ports, absent for plain ports.
#[test]
fn test_port_and_reducer() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    {
        // Accumulator port: the spec matches the one reported by an
        // accumulator of the same kind.
        let accumulator = accum::join_semilattice::<accum::Max<u64>>();
        let reducer_spec = accumulator.reducer_spec().unwrap();
        let (port, _) = mbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
        assert_eq!(port.inner.reducer_spec, Some(reducer_spec.clone()));
        let port_ref = port.bind();
        assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
    }
    {
        // Plain port: no reducer spec on the handle or on the ref.
        let (port, _) = mbox.open_port::<u64>();
        assert_eq!(port.inner.reducer_spec, None);
        let port_ref = port.bind();
        assert_eq!(port_ref.reducer_spec(), &None);
    }
}
3504
/// A value posted to a once port is received by its receiver.
// NOTE(review): the reason this test is ignored is not visible here.
#[tokio::test]
#[ignore]
async fn test_mailbox_once() {
    let proc = Proc::isolated();
    let (client, _) = proc.client("client").unwrap();

    let (port, receiver) = client.open_once_port::<u64>();

    port.post(&client, 123u64);
    assert_eq!(receiver.recv().await.unwrap(), 123u64);

}
3529
/// After the receiver is dropped, sending to the port is expected to fail
/// with a `Closed` error located at the bound port.
// NOTE(review): the reason this test is ignored is not visible here.
#[tokio::test]
#[ignore]
async fn test_mailbox_receiver_drop() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    let (port, mut receiver) = mbox.open_port::<u64>();
    let port = port.bind();
    // Delivery works while the receiver is alive.
    mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
        .unwrap();
    assert_eq!(receiver.recv().await.unwrap(), 123u64);
    drop(receiver);
    let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
        panic!();
    };

    assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
    assert_matches!(err.location(), PortLocation::Bound(bound) if *bound == *port.port_addr());
}
3548
/// A message of the wrong type is returned with a deserialization error but
/// leaves the port registered, so a later well-typed message is delivered.
#[tokio::test]
async fn test_mailbox_type_mismatch_does_not_evict_unbounded_port() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    let (port, mut receiver) = mbox.open_port::<u64>();
    let port = port.bind();
    let port_index = port.port_addr().index();
    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();

    // Post a `TestMessage` payload to a port that expects `u64`.
    let wrong_message = wirevalue::Any::serialize(&TestMessage).unwrap();
    mbox.post(
        MessageEnvelope::new_unknown(port.port_addr().clone(), wrong_message),
        return_handle.clone(),
    );

    let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
        .await
        .expect("undeliverable mismatch should arrive")
        .unwrap()
        .into_message()
        .expect("expected returned envelope");
    assert!(
        envelope
            .error_msg()
            .is_some_and(|message| message.contains("deserialization error")),
        "expected deserialization error in {envelope}",
    );
    // The port entry must still be present after the failed delivery.
    assert!(
        mbox.inner.ports.contains_key(&port_index),
        "deserialization mismatch should not evict reusable port",
    );

    // A correctly typed message still gets through.
    mbox.serialize_and_send(&port, 123u64, return_handle)
        .unwrap();
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("valid message should still be delivered")
            .unwrap(),
        123u64
    );
}
3591
/// A port whose receiver is gone is removed on the first failed send, and
/// the resulting error is identical to the one for a port that was never
/// bound.
#[tokio::test]
async fn test_mailbox_closed_unbounded_port_is_removed_after_send_failure() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    let port_index = mbox.allocate_port();
    let port_id = mbox.actor_addr().port_addr(Port::from(port_index));
    let port = crate::PortRef::attest(port_id.clone());
    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();
    let (sender, receiver) = mpsc::unbounded_channel::<u64>();

    // Close the channel before registering its sender, so the registered
    // port is dead from the start.
    drop(receiver);

    mbox.inner.ports.insert(
        port_index,
        Arc::new(UnboundedSender::new(
            UnboundedPortSender::Mpsc(sender),
            port_id,
        )),
    );

    // First send: hits the dead port.
    mbox.serialize_and_send(&port, 123u64, return_handle.clone())
        .unwrap();

    let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
        .await
        .expect("closed port should produce undeliverable")
        .unwrap()
        .into_message()
        .expect("expected returned envelope");
    let first_error = envelope.error_msg().expect("expected delivery error");
    assert!(
        first_error.contains("port not bound in mailbox"),
        "expected unbound-port error in {envelope}",
    );
    assert!(
        !mbox.inner.ports.contains_key(&port_index),
        "dead reusable port should be removed after send failure",
    );

    // Second send: the port entry is gone by now.
    mbox.serialize_and_send(&port, 456u64, return_handle)
        .unwrap();
    let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
        .await
        .expect("removed port should produce unbound undeliverable")
        .unwrap()
        .into_message()
        .expect("expected returned envelope");
    let second_error = envelope.error_msg().expect("expected delivery error");
    assert_eq!(
        first_error, second_error,
        "dead-port undeliverable should match unbound-port undeliverable exactly",
    );
}
3645
/// A wrongly typed message does not consume a once port: the port stays
/// registered until a well-typed message is delivered, and is removed
/// afterwards.
#[tokio::test]
async fn test_mailbox_once_type_mismatch_preserves_sender_until_delivery() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));
    let (port, receiver) = mbox.open_once_port::<u64>();
    let port = port.bind();
    let port_index = port.port_addr().index();
    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();

    // Post a `TestMessage` payload to a once port that expects `u64`.
    let wrong_message = wirevalue::Any::serialize(&TestMessage).unwrap();
    mbox.post(
        MessageEnvelope::new_unknown(port.port_addr().clone(), wrong_message),
        return_handle.clone(),
    );

    let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
        .await
        .expect("once-port mismatch should arrive")
        .unwrap()
        .into_message()
        .expect("expected returned envelope");
    assert!(
        envelope
            .error_msg()
            .is_some_and(|message| message.contains("deserialization error")),
        "expected deserialization error in {envelope}",
    );
    assert!(
        mbox.inner.ports.contains_key(&port_index),
        "once port should survive deserialization mismatch before delivery",
    );

    // The well-typed message is delivered and uses up the port.
    mbox.serialize_and_send_once(port, 123u64, return_handle)
        .unwrap();
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("valid once message should still be delivered")
            .unwrap(),
        123u64
    );
    assert!(
        !mbox.inner.ports.contains_key(&port_index),
        "successful once send should remove the sender entry",
    );
}
3692
/// After every sent message has been received, `drain` returns nothing.
#[tokio::test]
async fn test_drain() {
    let mbox = Mailbox::new(test_actor_id("0", "test"));

    let (port, mut receiver) = mbox.open_port();
    let port = port.bind();

    for i in 0..10 {
        mbox.serialize_and_send(&port, i, monitored_return_handle())
            .unwrap();
    }

    // Messages come back in the order they were sent.
    for i in 0..10 {
        assert_eq!(receiver.recv().await.unwrap(), i);
    }

    assert!(receiver.drain().is_empty());
}
3711
/// A message posted through a proc whose forwarder is a muxer reaches the
/// mailbox bound for the destination actor.
#[tokio::test]
async fn test_mailbox_muxer() {
    let muxer = MailboxMuxer::new();

    let mbox0 = Mailbox::new(test_actor_id("0", "actor1"));
    let mbox1 = Mailbox::new(test_actor_id("0", "actor2"));

    muxer.bind(mbox0.actor_addr().id().clone(), mbox0.clone());
    muxer.bind(mbox1.actor_addr().id().clone(), mbox1.clone());

    let (port, receiver) = mbox0.open_once_port::<u64>();

    // The proc is configured with the muxer as its mailbox sender.
    let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
    let (client, _) = proc.client("client").unwrap();

    port.post(&client, 123u64);
    assert_eq!(receiver.recv().await.unwrap(), 123u64);

}
3740
3741 #[tokio::test]
3742 async fn test_local_client_server() {
3743 let mbox = Mailbox::new(test_actor_id("0", "actor0"));
3744 let (tx, rx) = channel::local::new();
3745 let serve_handle = mbox.clone().serve(rx);
3746 let client = MailboxClient::new(tx);
3747
3748 let (port, receiver) = mbox.open_once_port::<u64>();
3749 let port = port.bind();
3750
3751 client
3752 .serialize_and_send_once(port, 123u64, monitored_return_handle())
3753 .unwrap();
3754 assert_eq!(receiver.recv().await.unwrap(), 123u64);
3755 serve_handle.stop("fromt test");
3756 serve_handle.await.unwrap().unwrap();
3757 }
3758
/// Routes bound at proc and actor granularity deliver to the right mailbox;
/// unrouted messages are returned unless a fallback is configured.
#[tokio::test]
async fn test_mailbox_router() {
    let mbox0 = Mailbox::new(test_actor_id("world0_0", "actor0"));
    let mbox1 = Mailbox::new(test_actor_id("world1_0", "actor0"));
    let mbox2 = Mailbox::new(test_actor_id("world1_1", "actor0"));
    let mbox3 = Mailbox::new(test_actor_id("world1_1", "actor1"));

    // One once-port (and its receiver) per mailbox.
    let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
        [&mbox0, &mbox1, &mbox2, &mbox3]
            .into_iter()
            .map(|mbox| {
                let (port, receiver) = mbox.open_once_port::<u64>();
                (port.bind(), receiver)
            })
            .collect();

    let router = MailboxRouter::new();

    // Three proc-level routes, plus an actor-level route inside "world1_1".
    router.bind(test_proc_ref("world0_0"), mbox0);
    router.bind(test_proc_ref("world1_0"), mbox1);
    router.bind(test_proc_ref("world1_1"), mbox2);
    router.bind(test_actor_ref("world1_1", "actor1"), mbox3);

    for (i, (port, receiver)) in comms.into_iter().enumerate() {
        router
            .serialize_and_send_once(port, i as u64, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), i as u64);
    }

    // This mailbox has no route in the router.
    let mbox4 = Mailbox::new(test_actor_id("fallback_0", "actor"));

    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();
    let (port, _receiver) = mbox4.open_once_port();
    router
        .serialize_and_send_once(port.bind(), 0, return_handle.clone())
        .unwrap();
    // Without a fallback, something arrives on the return port.
    assert!(return_receiver.recv().await.is_ok());

    // With the mailbox as fallback, the same kind of send is delivered.
    let router = router.fallback(mbox4.clone().into_boxed());
    let (port, receiver) = mbox4.open_once_port();
    router
        .serialize_and_send_once(port.bind(), 0, return_handle)
        .unwrap();
    assert_eq!(receiver.recv().await.unwrap(), 0);
}
3808
/// Address lookup in a `DialMailboxRouter` across binds and unbinds.
#[tokio::test]
async fn test_dial_mailbox_router() {
    let router = DialMailboxRouter::new();

    router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
    router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
    router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
    router.bind(
        test_actor_ref("world1_1", "actor1"),
        "unix!@4".parse().unwrap(),
    );
    // An actor whose proc address is itself a channel address, given an
    // explicit binding to a different address.
    let direct_actor_ref: ActorAddr =
        ProcAddr::singleton("unix:@4".parse().unwrap(), "my_proc").actor_addr("my_actor");
    router.bind(
        Addr::Actor(direct_actor_ref.clone()),
        "unix:@5".parse().unwrap(),
    );

    // These lookups only need to resolve to something.
    router
        .lookup_addr(&test_actor_id("world0_0", "actor"))
        .unwrap();
    router
        .lookup_addr(&test_actor_id("world1_0", "actor"))
        .unwrap();

    // The explicit binding is returned while it exists; once it is removed,
    // the lookup yields the other address.
    let actor_id = direct_actor_ref;
    assert_eq!(
        router.lookup_addr(&actor_id).unwrap(),
        "unix!@5".parse().unwrap(),
    );
    router.unbind(&actor_id.clone().into());
    assert_eq!(
        router.lookup_addr(&actor_id).unwrap(),
        "unix!@4".parse().unwrap(),
    );

    // After the proc-level bindings are removed, lookups for actors in
    // those procs are expected to yield `fallback`.
    let fallback = ChannelAddr::any(ChannelTransport::Local);
    router.unbind(&test_proc_ref("world1_0"));
    router.unbind(&test_proc_ref("world1_1"));
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world1_0", "actor1"))
            .unwrap(),
        fallback,
    );
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world1_1", "actor1"))
            .unwrap(),
        fallback,
    );
    router
        .lookup_addr(&test_actor_id("world0_0", "actor"))
        .unwrap();
    router.unbind(&test_proc_ref("world0_0"));
    assert_eq!(
        router
            .lookup_addr(&test_actor_id("world0_0", "actor"))
            .unwrap(),
        fallback,
    );
}
3879
/// Two dial routers that default to a shared root router, which in turn
/// routes back to them; every mailbox should be reachable from any of the
/// three routers.
// NOTE(review): the reason this test is ignored is not visible here.
#[tokio::test]
#[ignore]
async fn test_dial_mailbox_router_default() {
    let mbox0 = Mailbox::new(test_actor_id("world0_0", "actor0"));
    let mbox1 = Mailbox::new(test_actor_id("world1_0", "actor0"));
    let mbox2 = Mailbox::new(test_actor_id("world1_1", "actor0"));
    let mbox3 = Mailbox::new(test_actor_id("world1_1", "actor1"));

    let root = MailboxRouter::new();
    let world0_router = DialMailboxRouter::new_with_default(root.boxed());
    let world1_router = DialMailboxRouter::new_with_default(root.boxed());

    root.bind(test_proc_ref("world0"), world0_router.clone());
    root.bind(test_proc_ref("world1"), world1_router.clone());

    let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];

    // Serve each mailbox on its own local channel and register the address
    // with the dial router chosen by the proc label's prefix.
    let mut handles = Vec::new();
    for mbox in mailboxes.iter() {
        let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
        let handle = (*mbox).clone().serve(rx);
        handles.push(handle);

        eprintln!("{}: {}", mbox.actor_addr(), addr);
        if mbox
            .actor_addr()
            .proc_addr()
            .label()
            .is_some_and(|l| l.as_str().starts_with("world0"))
        {
            world0_router.bind(Addr::from(mbox.actor_addr().clone()), addr);
        } else {
            world1_router.bind(Addr::from(mbox.actor_addr().clone()), addr);
        }
    }

    // Send to every mailbox through each of the three routers.
    for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
        for mbox in mailboxes.iter() {
            let (port, receiver) = mbox.open_once_port::<u64>();
            let port = port.bind();
            router
                .serialize_and_send_once(port, 123u64, monitored_return_handle())
                .unwrap();
            assert_eq!(receiver.recv().await.unwrap(), 123u64);
        }
    }
}
3930
/// Every message posted to an enqueue port is passed to the port's
/// callback; here the callback sums the posted values.
#[tokio::test]
async fn test_enqueue_port() {
    let proc = Proc::isolated();
    let (client, _) = proc.client("client").unwrap();

    let count = Arc::new(AtomicUsize::new(0));
    let count_clone = count.clone();
    let port = client.mailbox().open_enqueue_port(move |_, n| {
        count_clone.fetch_add(n, Ordering::SeqCst);
        Ok(())
    });

    port.post(&client, 10);
    port.post(&client, 5);
    port.post(&client, 1);
    port.post(&client, 0);

    // The total is checked right away, with no await after the posts.
    assert_eq!(count.load(Ordering::SeqCst), 16);
}
3950
/// Unit message type that keeps its derived type name.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
struct TestMessage;

/// Unit message type whose type name is overridden via `#[named]`.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
#[named(name = "some::custom::path")]
struct TestMessage2;
3957
/// The derived `typename` is the type's module path by default, and the
/// `#[named(name = ...)]` value when one is given.
#[test]
fn test_remote_message_macros() {
    assert_eq!(
        TestMessage::typename(),
        "hyperactor::mailbox::tests::TestMessage"
    );
    assert_eq!(TestMessage2::typename(), "some::custom::path");
}
3966
/// The `Display` form of an envelope includes the payload's type name
/// followed by its fields rendered as JSON.
#[test]
fn test_message_envelope_display() {
    #[derive(typeuri::Named, Serialize, Deserialize)]
    struct MyTest {
        a: u64,
        b: String,
    }
    wirevalue::register_type!(MyTest);

    let envelope = MessageEnvelope::serialize(
        test_actor_id("source_0", "actor"),
        test_port_id("dest_1", "actor", 123),
        &MyTest {
            a: 123,
            b: "hello".into(),
        },
        Flattrs::new(),
    )
    .unwrap();

    assert!(format!("{}", envelope).contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
}
3990
/// Minimal actor that overrides nothing in the `Actor` trait.
#[derive(Debug, Default)]
struct Foo;

impl Actor for Foo {}
3995
/// An actor that receives an `Undeliverable` message on its undeliverable
/// port ends up in the `Failed` state, with a status message naming the
/// sender and the destination.
#[tokio::test]
async fn test_actor_delivery_failure() {
    use crate::actor::ActorStatus;
    use crate::testing::proc_supervison::ProcSupervisionCoordinator;

    let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
        BoxedMailboxSender::new(PanickingMailboxSender),
    ));
    let proc_id = test_proc_id("quux_0");
    let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
    let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
    let (client, _) = proc.client("client").unwrap();

    let foo = proc.spawn("foo", Foo).unwrap();
    // Hand `foo` an undeliverable envelope that names `foo` as its sender.
    let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
    let message = MessageEnvelope::new(
        foo.actor_addr().clone(),
        test_port_id("corge_0", "bar", 9999),
        wirevalue::Any::serialize(&1u64).unwrap(),
        Flattrs::new(),
    );
    return_handle.post(&client, Undeliverable::Message(message));

    // Fixed delay before inspecting the status; presumably enough for the
    // actor to process the message — TODO confirm this is not flaky.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    let foo_status = foo.status();
    assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
    let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
        unreachable!()
    };
    let msg_str = msg.to_string();
    assert!(
        msg_str.contains("undeliverable message to"),
        "expected destination-named top line, got:\n{msg_str}"
    );
    assert!(
        !msg_str.contains("undeliverable message error"),
        "retired neutral fallback must not appear, got:\n{msg_str}"
    );
    assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
    assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));

    proc.destroy_and_wait(tokio::time::Duration::from_secs(1), "test cleanup")
        .await
        .unwrap();
}
4050
/// A standalone undeliverable port receives what is posted to it, and a
/// task looping on its receiver finishes once the handle is dropped.
#[tokio::test]
async fn test_detached_return_handle() {
    let (return_handle, mut return_receiver) =
        crate::mailbox::undeliverable::new_undeliverable_port();
    let envelope = MessageEnvelope::new(
        test_actor_id("foo_0", "bar"),
        test_port_id("baz_0", "corge", 9999),
        wirevalue::Any::serialize(&1u64).unwrap(),
        Flattrs::new(),
    );
    let proc = Proc::isolated();
    let (client, _) = proc.client("client").unwrap();
    return_handle.post(&client, Undeliverable::Message(envelope.clone()));
    // The posted message shows up on the receiver within the timeout.
    assert!(
        tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
            .await
            .is_ok()
    );
    // Loop over returned messages until `recv` stops yielding
    // `Ok(Undeliverable::Message(..))`.
    let monitor_handle = tokio::spawn(async move {
        while let Ok(Undeliverable::Message(mut envelope)) = return_receiver.recv().await {
            envelope.set_error(DeliveryError::BrokenLink(
                "returned in unit test".to_string(),
            ));
            UndeliverableMailboxSender
                .post(envelope, monitored_return_handle());
        }
    });
    // With the handle dropped, the monitor task must finish within the
    // timeout.
    drop(return_handle);
    assert!(
        tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
            .await
            .is_ok()
    );
}
4089
4090 async fn verify_receiver(coalesce: bool, drop_sender: bool) {
4091 fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
4092 let dummy_actor_ref: ActorAddr = test_actor_id("world_0", "actor");
4095 let dummy_state = State::new(dummy_actor_ref.clone());
4096 let dummy_port_id = dummy_actor_ref.port_addr(Port::from(0));
4097 let (sender, receiver) = mpsc::unbounded_channel::<M>();
4098 let receiver = PortReceiver {
4099 receiver,
4100 port_id: dummy_port_id,
4101 coalesce,
4102 mailbox: Mailbox {
4103 inner: Arc::new(dummy_state),
4104 },
4105 };
4106 (sender, receiver)
4107 }
4108
4109 {
4111 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4112 assert!(receiver.drain().is_empty());
4113
4114 sender.send(0).unwrap();
4115 sender.send(1).unwrap();
4116 sender.send(2).unwrap();
4117 sender.send(3).unwrap();
4118 sender.send(4).unwrap();
4119 sender.send(5).unwrap();
4120 sender.send(6).unwrap();
4121 sender.send(7).unwrap();
4122
4123 if drop_sender {
4124 drop(sender);
4125 }
4126
4127 if !coalesce {
4128 assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
4129 } else {
4130 assert_eq!(receiver.drain(), vec![7]);
4131 }
4132
4133 assert!(receiver.drain().is_empty());
4134 assert!(receiver.drain().is_empty());
4135 }
4136
4137 {
4139 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4140 assert!(receiver.try_recv().unwrap().is_none());
4141
4142 sender.send(0).unwrap();
4143 sender.send(1).unwrap();
4144 sender.send(2).unwrap();
4145 sender.send(3).unwrap();
4146
4147 if drop_sender {
4148 drop(sender);
4149 }
4150
4151 if !coalesce {
4152 assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
4153 assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
4154 assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
4155 }
4156 assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
4157 if drop_sender {
4158 assert_matches!(
4159 receiver.try_recv().unwrap_err().kind(),
4160 MailboxErrorKind::Closed
4161 );
4162 assert_matches!(
4164 receiver.try_recv().unwrap_err().kind(),
4165 MailboxErrorKind::Closed
4166 );
4167 } else {
4168 assert!(receiver.try_recv().unwrap().is_none());
4169 assert!(receiver.try_recv().unwrap().is_none());
4171 }
4172 }
4173 {
4175 let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4176 assert!(
4177 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
4178 .await
4179 .is_err()
4180 );
4181
4182 sender.send(4).unwrap();
4183 sender.send(5).unwrap();
4184 sender.send(6).unwrap();
4185 sender.send(7).unwrap();
4186
4187 if drop_sender {
4188 drop(sender);
4189 }
4190
4191 if !coalesce {
4192 assert_eq!(receiver.recv().await.unwrap(), 4);
4193 assert_eq!(receiver.recv().await.unwrap(), 5);
4194 assert_eq!(receiver.recv().await.unwrap(), 6);
4195 }
4196 assert_eq!(receiver.recv().await.unwrap(), 7);
4197 if drop_sender {
4198 assert_matches!(
4199 receiver.recv().await.unwrap_err().kind(),
4200 MailboxErrorKind::Closed
4201 );
4202 assert_matches!(
4204 receiver.recv().await.unwrap_err().kind(),
4205 MailboxErrorKind::Closed
4206 );
4207 } else {
4208 assert!(
4209 tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
4210 .await
4211 .is_err()
4212 );
4213 }
4214 }
4215 }
4216
4217 #[tokio::test]
4218 async fn test_receiver_basic_default() {
4219 verify_receiver(false, false).await
4220 }
4221
4222 #[tokio::test]
4223 async fn test_receiver_basic_latest() {
4224 verify_receiver(true, false).await
4225 }
4226
4227 #[tokio::test]
4228 async fn test_receiver_after_sender_drop_default() {
4229 verify_receiver(false, true).await
4230 }
4231
4232 #[tokio::test]
4233 async fn test_receiver_after_sender_drop_latest() {
4234 verify_receiver(true, true).await
4235 }
4236
4237 struct Setup {
4238 receiver: PortReceiver<u64>,
4239 actor0: Instance<()>,
4240 actor1: Instance<()>,
4241 _actor0_handle: ActorHandle<()>,
4242 _actor1_handle: ActorHandle<()>,
4243 port_id: PortAddr,
4244 port_id1: PortAddr,
4245 port_id2: PortAddr,
4246 port_id2_1: PortAddr,
4247 }
4248
4249 async fn setup_split_port_ids(
4250 reducer_spec: Option<ReducerSpec>,
4251 reducer_mode: ReducerMode,
4252 ) -> Setup {
4253 let proc = Proc::isolated();
4254 let (actor0, actor0_handle) = proc.client("actor0").unwrap();
4255 let (actor1, actor1_handle) = proc.client("actor1").unwrap();
4256
4257 let (port_handle, receiver) = actor0.open_port::<u64>();
4259 let port_id = port_handle.bind().port_addr().clone();
4260
4261 let port_id1 = port_id
4263 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
4264 .unwrap();
4265 let port_id2 = port_id
4266 .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
4267 .unwrap();
4268
4269 let port_id2_1 = port_id2
4271 .split(&actor1, reducer_spec, reducer_mode.clone(), true)
4272 .unwrap();
4273
4274 Setup {
4275 receiver,
4276 actor0,
4277 actor1,
4278 _actor0_handle: actor0_handle,
4279 _actor1_handle: actor1_handle,
4280 port_id,
4281 port_id1,
4282 port_id2,
4283 port_id2_1,
4284 }
4285 }
4286
4287 fn post(cx: &impl context::Actor, port_id: PortAddr, msg: u64) {
4288 let serialized = wirevalue::Any::serialize(&msg).unwrap();
4289 port_id.send(cx, serialized);
4290 }
4291
4292 #[async_timed_test(timeout_secs = 30)]
4293 #[cfg_attr(not(fbcode_build), ignore)]
4295 async fn test_split_port_id_no_reducer() {
4296 let Setup {
4297 mut receiver,
4298 actor0,
4299 actor1,
4300 port_id,
4301 port_id1,
4302 port_id2,
4303 port_id2_1,
4304 ..
4305 } = setup_split_port_ids(None, ReducerMode::default()).await;
4306 post(&actor0, port_id.clone(), 1);
4308 assert_eq!(receiver.recv().await.unwrap(), 1);
4309 post(&actor1, port_id1.clone(), 2);
4310 assert_eq!(receiver.recv().await.unwrap(), 2);
4311 post(&actor1, port_id2.clone(), 3);
4312 assert_eq!(receiver.recv().await.unwrap(), 3);
4313 post(&actor1, port_id2_1.clone(), 4);
4314 assert_eq!(receiver.recv().await.unwrap(), 4);
4315
4316 tokio::time::sleep(Duration::from_secs(2)).await;
4318 let msg = receiver.try_recv().unwrap();
4319 assert_eq!(msg, None);
4320 }
4321
4322 async fn wait_for(
4323 receiver: &mut PortReceiver<u64>,
4324 expected_size: usize,
4325 timeout_duration: Duration,
4326 ) -> anyhow::Result<Vec<u64>> {
4327 let mut messeges = vec![];
4328
4329 tokio::time::timeout(timeout_duration, async {
4330 loop {
4331 let msg = receiver.recv().await.unwrap();
4332 messeges.push(msg);
4333 if messeges.len() == expected_size {
4334 break;
4335 }
4336 }
4337 })
4338 .await?;
4339 Ok(messeges)
4340 }
4341
4342 #[async_timed_test(timeout_secs = 30)]
4343 async fn test_split_port_id_sum_reducer() {
4344 let config = hyperactor_config::global::lock();
4345 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
4346
4347 let sum_accumulator = accum::sum::<u64>();
4348 let reducer_spec = sum_accumulator.reducer_spec();
4349 let Setup {
4350 mut receiver,
4351 actor0,
4352 actor1,
4353 port_id,
4354 port_id1,
4355 port_id2,
4356 port_id2_1,
4357 ..
4358 } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
4359 post(&actor0, port_id.clone(), 4);
4360 post(&actor1, port_id1.clone(), 2);
4361 post(&actor1, port_id2.clone(), 3);
4362 post(&actor1, port_id2_1.clone(), 1);
4363 let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
4364 .await
4365 .unwrap();
4366 messages.sort();
4369 assert_eq!(messages, vec![1, 2, 3, 4]);
4370
4371 tokio::time::sleep(Duration::from_secs(2)).await;
4373 let msg = receiver.try_recv().unwrap();
4374 assert_eq!(msg, None);
4375 }
4376
4377 #[async_timed_test(timeout_secs = 30)]
4378 #[cfg_attr(not(fbcode_build), ignore)]
4380 async fn test_split_port_id_every_n_messages() {
4381 let config = hyperactor_config::global::lock();
4382 let _config_guard =
4383 config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
4384 let proc = Proc::isolated();
4385 let (actor, _actor_handle) = proc.client("actor").unwrap();
4386 let (port_handle, mut receiver) = actor.open_port::<u64>();
4387 let port_id = port_handle.bind().port_addr().clone();
4388 let reducer_spec = accum::sum::<u64>().reducer_spec();
4390 let split_port_id = port_id
4391 .split(
4392 &actor,
4393 reducer_spec,
4394 ReducerMode::Streaming(accum::StreamingReducerOpts {
4395 max_update_interval: Some(Duration::from_mins(10)),
4396 initial_update_interval: Some(Duration::from_mins(10)),
4397 }),
4398 true,
4399 )
4400 .unwrap();
4401
4402 for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
4404 post(&actor, split_port_id.clone(), msg);
4405 }
4406 let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
4409 .await
4410 .unwrap();
4411 assert_eq!(messages, vec![15]);
4412
4413 tokio::time::sleep(Duration::from_secs(2)).await;
4416 let msg = receiver.try_recv().unwrap();
4417 assert_eq!(msg, None);
4418 }
4419
4420 #[async_timed_test(timeout_secs = 30)]
4421 async fn test_split_port_timeout_flush() {
4422 let config = hyperactor_config::global::lock();
4423 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
4424
4425 let Setup {
4426 mut receiver,
4427 actor0: _,
4428 actor1,
4429 port_id: _,
4430 port_id1,
4431 port_id2: _,
4432 port_id2_1: _,
4433 ..
4434 } = setup_split_port_ids(
4435 Some(accum::sum::<u64>().reducer_spec().unwrap()),
4436 ReducerMode::Streaming(accum::StreamingReducerOpts {
4437 max_update_interval: Some(Duration::from_millis(50)),
4438 initial_update_interval: Some(Duration::from_millis(50)),
4439 }),
4440 )
4441 .await;
4442
4443 post(&actor1, port_id1.clone(), 10);
4444 post(&actor1, port_id1.clone(), 20);
4445 post(&actor1, port_id1.clone(), 30);
4446
4447 tokio::time::sleep(Duration::from_millis(10)).await;
4449 let msg = receiver.try_recv().unwrap();
4450 assert_eq!(msg, None);
4451
4452 tokio::time::sleep(Duration::from_millis(100)).await;
4454
4455 let msg = receiver.recv().await.unwrap();
4457 assert_eq!(msg, 60); let msg = receiver.try_recv().unwrap();
4461 assert_eq!(msg, None);
4462 }
4463
4464 #[async_timed_test(timeout_secs = 30)]
4465 async fn test_split_port_timeout_and_size_flush() {
4466 let config = hyperactor_config::global::lock();
4467 let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
4468
4469 let Setup {
4470 mut receiver,
4471 actor0: _,
4472 actor1,
4473 port_id: _,
4474 port_id1,
4475 port_id2: _,
4476 port_id2_1: _,
4477 ..
4478 } = setup_split_port_ids(
4479 Some(accum::sum::<u64>().reducer_spec().unwrap()),
4480 ReducerMode::Streaming(accum::StreamingReducerOpts {
4481 max_update_interval: Some(Duration::from_millis(50)),
4482 initial_update_interval: Some(Duration::from_millis(50)),
4483 }),
4484 )
4485 .await;
4486
4487 post(&actor1, port_id1.clone(), 10);
4488 post(&actor1, port_id1.clone(), 20);
4489 post(&actor1, port_id1.clone(), 30);
4490 post(&actor1, port_id1.clone(), 40);
4491
4492 let msg = receiver.recv().await.unwrap();
4494 assert_eq!(msg, 60);
4495
4496 let msg = receiver.recv().await.unwrap();
4498 assert_eq!(msg, 40);
4499
4500 let msg = receiver.try_recv().unwrap();
4502 assert_eq!(msg, None);
4503 }
4504
4505 #[async_timed_test(timeout_secs = 30)]
4506 async fn test_split_port_once_mode_basic() {
4507 let proc = Proc::isolated();
4508 let (actor, _actor_handle) = proc.client("actor").unwrap();
4509 let (port_handle, mut receiver) = actor.open_port::<u64>();
4510 let port_id = port_handle.bind().port_addr().clone();
4511
4512 let reducer_spec = accum::sum::<u64>().reducer_spec();
4514 let split_port_id = port_id
4515 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
4516 .unwrap();
4517
4518 post(&actor, split_port_id.clone(), 10);
4520 post(&actor, split_port_id.clone(), 20);
4521 post(&actor, split_port_id.clone(), 30);
4522
4523 let msg = receiver.recv().await.unwrap();
4525 assert_eq!(msg, 60); tokio::time::sleep(Duration::from_millis(100)).await;
4529 let msg = receiver.try_recv().unwrap();
4530 assert_eq!(msg, None);
4531 }
4532
4533 #[async_timed_test(timeout_secs = 30)]
4534 async fn test_split_port_once_mode_teardown() {
4535 let proc = Proc::isolated();
4536 let (actor, _actor_handle) = proc.client("actor").unwrap();
4537 let (port_handle, mut receiver) = actor.open_port::<u64>();
4538 let port_id = port_handle.bind().port_addr().clone();
4539
4540 let (undeliverable_handle, mut undeliverable_receiver) =
4542 undeliverable::new_undeliverable_port();
4543
4544 let reducer_spec = accum::sum::<u64>().reducer_spec();
4546 let split_port_id = port_id
4547 .split(&actor, reducer_spec, ReducerMode::Once(3), true)
4548 .unwrap();
4549
4550 post(&actor, split_port_id.clone(), 10);
4552 post(&actor, split_port_id.clone(), 20);
4553 post(&actor, split_port_id.clone(), 30);
4554
4555 let msg = receiver.recv().await.unwrap();
4557 assert_eq!(msg, 60); let serialized = wirevalue::Any::serialize(&100u64).unwrap();
4561 let envelope = MessageEnvelope::new(
4562 actor.mailbox().actor_addr().clone(),
4563 split_port_id.clone(),
4564 serialized,
4565 Flattrs::new(),
4566 );
4567 actor.mailbox().post(envelope, undeliverable_handle);
4568
4569 let undeliverable =
4571 tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
4572 .await
4573 .expect("should receive undeliverable message")
4574 .expect("receiver should not be closed");
4575
4576 let split_port_ref: PortAddr = split_port_id;
4578 assert_eq!(
4579 undeliverable
4580 .into_message()
4581 .expect("expected returned envelope")
4582 .dest(),
4583 &split_port_ref
4584 );
4585
4586 let msg = receiver.try_recv().unwrap();
4588 assert_eq!(msg, None);
4589 }
4590
/// A freshly constructed router reports no prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_empty() {
    let router = DialMailboxRouter::new();
    let prefix_count = router.prefixes().len();
    assert_eq!(prefix_count, 0);
}
4595
/// A router with one binding reports exactly that binding as its prefix.
#[test]
fn test_dial_mailbox_router_prefixes_single_entry() {
    let router = DialMailboxRouter::new();
    router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());

    let reported: Vec<Addr> = router.prefixes().into_iter().collect();
    assert_eq!(reported.len(), 1);
    assert_eq!(reported[0], test_proc_ref("world0"));
}
4605
/// Three unrelated proc bindings are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_no_overlap() {
    let bindings = [
        ("world0", "unix!@1"),
        ("world1", "unix!@2"),
        ("world2", "unix!@3"),
    ];

    let router = DialMailboxRouter::new();
    for (proc_name, channel) in bindings {
        router.bind(test_proc_ref(proc_name), channel.parse().unwrap());
    }

    let mut reported: Vec<Addr> = router.prefixes().into_iter().collect();
    reported.sort();

    let mut expected: Vec<Addr> = bindings
        .iter()
        .map(|(proc_name, _)| test_proc_ref(proc_name))
        .collect();
    expected.sort();

    assert_eq!(reported, expected);
}
4625
/// Proc bindings whose names share a textual prefix (`world0`, `world0_0`,
/// ...) are all expected to be reported individually.
#[test]
fn test_dial_mailbox_router_prefixes_with_overlaps() {
    let bindings = [
        ("world0", "unix!@1"),
        ("world0_0", "unix!@2"),
        ("world0_1", "unix!@3"),
        ("world1", "unix!@4"),
        ("world1_0", "unix!@5"),
    ];

    let router = DialMailboxRouter::new();
    for (proc_name, channel) in bindings {
        router.bind(test_proc_ref(proc_name), channel.parse().unwrap());
    }

    let mut reported: Vec<Addr> = router.prefixes().into_iter().collect();
    reported.sort();

    let mut expected: Vec<Addr> = bindings
        .iter()
        .map(|(proc_name, _)| test_proc_ref(proc_name))
        .collect();
    expected.sort();

    assert_eq!(reported, expected);
}
4650
/// Mixes proc-level and actor-level bindings.
///
/// The actor binding under `world0_0` is expected to be absent from
/// `prefixes()` while the `world0_0` proc binding is present. The actor
/// binding under `world2_0`, which has no proc-level binding here, is
/// expected to be reported.
#[test]
fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
    let router = DialMailboxRouter::new();
    let bindings = [
        (test_proc_ref("world0"), "unix!@1"),
        (test_proc_ref("world0_0"), "unix!@2"),
        (test_actor_ref("world0_0", "actor1"), "unix!@3"),
        (test_proc_ref("world1_0"), "unix!@4"),
        (test_proc_ref("world1_1"), "unix!@5"),
        (test_actor_ref("world2_0", "actor0"), "unix!@6"),
    ];
    for (addr, channel) in bindings {
        router.bind(addr, channel.parse().unwrap());
    }

    let mut reported: Vec<Addr> = router.prefixes().into_iter().collect();
    reported.sort();

    let mut expected = vec![
        test_proc_ref("world0"),
        test_proc_ref("world0_0"),
        test_proc_ref("world1_0"),
        test_proc_ref("world1_1"),
        test_actor_ref("world2_0", "actor0"),
    ];
    expected.sort();

    assert_eq!(reported, expected);
}
4688
/// Three sibling proc bindings are all reported as prefixes.
#[test]
fn test_dial_mailbox_router_prefixes_same_level() {
    let bindings = [
        ("world0_0", "unix!@1"),
        ("world0_1", "unix!@2"),
        ("world0_2", "unix!@3"),
    ];

    let router = DialMailboxRouter::new();
    for (proc_name, channel) in bindings {
        router.bind(test_proc_ref(proc_name), channel.parse().unwrap());
    }

    let mut reported: Vec<Addr> = router.prefixes().into_iter().collect();
    reported.sort();

    let mut expected: Vec<Addr> = bindings
        .iter()
        .map(|(proc_name, _)| test_proc_ref(proc_name))
        .collect();
    expected.sort();

    assert_eq!(reported, expected);
}
4709
4710 #[derive(Clone, Debug)]
4714 struct AsyncLoopForwarder;
4715
4716 #[async_trait]
4717 impl MailboxSender for AsyncLoopForwarder {
4718 fn post_unchecked(
4719 &self,
4720 envelope: MessageEnvelope,
4721 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4722 ) {
4723 let me = self.clone();
4724 tokio::spawn(async move {
4725 me.post(envelope, return_handle);
4727 });
4728 }
4729 }
4730
4731 #[tokio::test]
4732 async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4733 let actor_id = test_actor_id("world_0", "ttl_actor");
4734 let (ret_port, mut ret_rx) = undeliverable::new_undeliverable_port();
4735
4736 let remote_actor = test_actor_id("remote_world_1", "remote");
4737 let dest = remote_actor.port_addr(4242.into());
4738
4739 let payload = 1234_u64;
4742 let envelope =
4743 MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4744 .expect("serialize");
4745
4746 AsyncLoopForwarder.post(envelope, ret_port.clone());
4747
4748 let undelivered = tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4750 .await
4751 .expect("timed out waiting for undeliverable")
4752 .expect("channel closed")
4753 .into_message()
4754 .expect("expected returned envelope");
4755
4756 let got: u64 = undelivered.deserialized().expect("deserialize");
4758 assert_eq!(got, payload, "payload preserved");
4759 }
4760
4761 #[tokio::test]
4762 async fn message_ttl_success_local_delivery() {
4763 let actor_id = test_actor_id("world_0", "ttl_actor");
4764 let mailbox = Mailbox::new(actor_id.clone());
4765 let (_undeliverable_tx, mut undeliverable_rx) =
4766 mailbox.bind_handler_port::<Undeliverable<MessageEnvelope>>();
4767
4768 let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4770
4771 let payload = 0xC0FFEE_u64;
4773 let envelope = MessageEnvelope::serialize(
4774 actor_id.clone(),
4775 user_port.bind().port_addr().clone(),
4776 &payload,
4777 Flattrs::new(),
4778 )
4779 .expect("serialize");
4780
4781 let return_handle = mailbox
4784 .bound_return_handle()
4785 .unwrap_or(monitored_return_handle());
4786 mailbox.post(envelope, return_handle);
4787
4788 let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4790 .await
4791 .expect("timed out waiting for local delivery")
4792 .expect("user port closed");
4793 assert_eq!(got, payload);
4794
4795 let no_undeliverable =
4797 tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4798 assert!(
4799 no_undeliverable.is_err(),
4800 "unexpected undeliverable returned on successful local delivery"
4801 );
4802 }
4803
4804 #[tokio::test]
4805 async fn test_port_contramap() {
4806 let proc = Proc::isolated();
4807 let (client, _) = proc.client("client").unwrap();
4808 let (handle, mut rx) = client.open_port();
4809
4810 handle
4811 .contramap(|m| (1, m))
4812 .post(&client, "hello".to_string());
4813 assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4814 }
4815
/// Calling `bind_handler_port` twice on the same handle is expected to
/// panic with a message containing "already bound".
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_handler_port_twice() {
    let mailbox = Mailbox::new(test_actor_id("0", "test"));
    let (handle, _rx) = mailbox.open_port::<String>();
    for _ in 0..2 {
        handle.bind_handler_port();
    }
}
4824
/// After `bind_handler_port`, the handle's location is the port derived from
/// `String::port()`, and later `bind` calls leave that location unchanged.
#[test]
fn test_bind_port_handle_to_handler_port() {
    let mailbox = Mailbox::new(test_actor_id("0", "test"));
    let handler_port = mailbox.actor_addr().port_addr(Port::from(String::port()));
    let (handle, _rx) = mailbox.open_port::<String>();

    // The freshly opened port starts out at a different index.
    assert_ne!(handler_port.index(), handle.inner.port_index);

    handle.bind_handler_port();
    assert_matches!(handle.location(), PortLocation::Bound(port) if port == handler_port);

    // Binding again, twice, must not move the port.
    for _ in 0..2 {
        handle.bind();
    }
    assert_matches!(handle.location(), PortLocation::Bound(port) if port == handler_port);
}
4840
/// A handle already bound via `bind` is expected to panic with a message
/// containing "already bound" when `bind_handler_port` is called on it.
#[test]
#[should_panic(expected = "already bound")]
fn test_bind_port_handle_to_handler_port_when_already_bound() {
    let mailbox = Mailbox::new(test_actor_id("0", "test"));
    let (handle, _rx) = mailbox.open_port::<String>();

    handle.bind();
    assert_matches!(
        handle.location(),
        PortLocation::Bound(port) if port.index() == handle.inner.port_index
    );

    handle.bind_handler_port();
}
4852
4853 #[tokio::test]
4854 async fn test_mailbox_post_fails_when_actor_stopped() {
4855 let actor_id = test_actor_id("0", "stopped_actor");
4856
4857 let mailbox = Mailbox::new(actor_id.clone());
4858
4859 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4860
4861 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4862
4863 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4866
4867 let envelope = MessageEnvelope::serialize(
4868 actor_id.clone(),
4869 user_port.bind().port_addr().clone(),
4870 &42u64,
4871 Flattrs::new(),
4872 )
4873 .expect("serialize");
4874
4875 mailbox.post(envelope, return_handle);
4876
4877 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4878 .await
4879 .expect("timed out waiting for undeliverable")
4880 .expect("return port closed");
4881
4882 let err = undeliverable
4883 .into_message()
4884 .expect("expected returned envelope")
4885 .error_msg()
4886 .expect("expected error");
4887 assert!(
4888 err.contains(&format!("owner {} is stopped", actor_id)),
4889 "error should indicate actor stopped: {}",
4890 err
4891 );
4892 }
4893
4894 #[tokio::test]
4895 async fn test_mailbox_post_fails_when_actor_failed() {
4896 use crate::actor::ActorErrorKind;
4897
4898 let actor_id = test_actor_id("0", "failed_actor");
4899
4900 let mailbox = Mailbox::new(actor_id.clone());
4901
4902 let (user_port, _user_rx) = mailbox.open_port::<u64>();
4903
4904 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4905 "test failure".to_string(),
4906 )));
4907
4908 let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4911
4912 let envelope = MessageEnvelope::serialize(
4913 actor_id.clone(),
4914 user_port.bind().port_addr().clone(),
4915 &42u64,
4916 Flattrs::new(),
4917 )
4918 .expect("serialize");
4919
4920 mailbox.post(envelope, return_handle);
4921
4922 let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4923 .await
4924 .expect("timed out waiting for undeliverable")
4925 .expect("return port closed");
4926
4927 let err = undeliverable
4928 .into_message()
4929 .expect("expected returned envelope")
4930 .error_msg()
4931 .expect("expected error");
4932 assert!(
4933 err.contains(&format!("owner {} failed", actor_id)),
4934 "error should indicate actor failed: {}",
4935 err
4936 );
4937 }
4938
4939 #[tokio::test]
4940 async fn test_port_handle_send_fails_when_actor_stopped() {
4941 let actor_id = test_actor_id("0", "stopped_actor");
4942
4943 let mailbox = Mailbox::new(actor_id.clone());
4944
4945 let (port_handle, _rx) = mailbox.open_port::<u64>();
4946 let proc = Proc::isolated();
4947 let (client, _) = proc.client("client").unwrap();
4948
4949 mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4950
4951 let err = port_handle.try_send(&client, 42u64).unwrap_err();
4952 assert_matches!(
4953 err.kind(),
4954 MailboxSenderErrorKind::Mailbox(mailbox_err)
4955 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4956 );
4957 }
4958
4959 #[tokio::test]
4960 async fn test_port_handle_send_fails_when_actor_failed() {
4961 use crate::actor::ActorErrorKind;
4962
4963 let actor_id = test_actor_id("0", "failed_actor");
4964
4965 let mailbox = Mailbox::new(actor_id.clone());
4966
4967 let (port_handle, _rx) = mailbox.open_port::<u64>();
4968 let proc = Proc::isolated();
4969 let (client, _) = proc.client("client").unwrap();
4970
4971 mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4972 "test failure".to_string(),
4973 )));
4974
4975 let err = port_handle.try_send(&client, 42u64).unwrap_err();
4976 assert_matches!(
4977 err.kind(),
4978 MailboxSenderErrorKind::Mailbox(mailbox_err)
4979 if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4980 );
4981 }
4982
4983 #[async_timed_test(timeout_secs = 30)]
4984 async fn test_open_reduce_port() {
4985 let proc = Proc::isolated();
4986 let (client, _) = proc.client("client").unwrap();
4987
4988 let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4990
4991 let port_ref = port_handle.bind();
4993 assert!(port_ref.reducer_spec().is_some());
4994
4995 port_ref.post(&client, 42);
4997
4998 let result = receiver.recv().await.unwrap();
5000 assert_eq!(result, 42);
5001 }
5002
5003 #[async_timed_test(timeout_secs = 30)]
5004 async fn test_open_reduce_port_reducer_spec_preserved() {
5005 let proc = Proc::isolated();
5006 let (client, _) = proc.client("client").unwrap();
5007
5008 let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
5010 let sum_ref = sum_handle.bind();
5011 let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
5012
5013 let (max_handle, _) = client
5014 .mailbox()
5015 .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
5016 let max_ref = max_handle.bind();
5017 let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
5018
5019 assert_ne!(sum_typehash, max_typehash);
5021 }
5022
5023 #[tokio::test]
5028 async fn test_flush_over_unix_channel() {
5029 let mbox = Mailbox::new(test_actor_id("0", "actor0"));
5030
5031 let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
5033 let serve_handle = mbox.clone().serve(rx);
5034
5035 let client = MailboxClient::dial(addr).unwrap();
5037
5038 let (port, mut receiver) = mbox.open_port::<u64>();
5040 let port = port.bind();
5041
5042 for i in 0..10u64 {
5044 client
5045 .serialize_and_send(&port, i, monitored_return_handle())
5046 .unwrap();
5047 }
5048
5049 client.flush().await.unwrap();
5052
5053 for i in 0..10u64 {
5055 let msg = receiver
5056 .try_recv()
5057 .expect("message should be available after flush")
5058 .expect("receiver should not be empty after flush");
5059 assert_eq!(msg, i);
5060 }
5061
5062 serve_handle.stop("test done");
5063 serve_handle.await.unwrap().unwrap();
5064 }
5065
/// `Mailbox::drain` must not return while a handler enqueue callback is
/// still running, and sends after the drain must be rejected.
///
/// The callback signals entry, then blocks on a condvar gate. The test
/// starts `drain` on another thread, waits until the draining bit is set,
/// checks that `drain` has not returned, and only then opens the gate.
#[test]
fn test_drain_waits_for_active_handler_enqueue() {
    let mailbox = Mailbox::new(test_actor_id("drain", "actor"));
    let (entered_tx, entered_rx) = std::sync::mpsc::channel();
    let gate = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
    let delivered = Arc::new(AtomicUsize::new(0));

    let port = mailbox.open_handler_enqueue_port({
        let gate = Arc::clone(&gate);
        let delivered = Arc::clone(&delivered);
        move |_headers, _message: u64| {
            entered_tx.send(()).unwrap();
            let (lock, cvar) = &*gate;
            // The guard is kept until the callback returns, so the counter
            // is bumped while the gate lock is still held.
            let _open = cvar
                .wait_while(lock.lock().unwrap(), |open| !*open)
                .unwrap();
            delivered.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    });

    // Start a send that will park inside the callback.
    let blocked_sender = port.inner.sender.clone();
    let sender_thread =
        std::thread::spawn(move || blocked_sender.send(Flattrs::new(), 1u64).unwrap());
    entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();

    let (drained_tx, drained_rx) = std::sync::mpsc::channel();
    let draining_mailbox = mailbox.clone();
    let drain_thread = std::thread::spawn(move || {
        draining_mailbox.drain();
        drained_tx.send(()).unwrap();
    });

    // Spin until the drain has visibly started, bounded by a deadline.
    let is_draining = || {
        mailbox.inner.handler_ingress.state.load(Ordering::Acquire) & HANDLER_INGRESS_DRAINING
            != 0
    };
    let deadline = std::time::Instant::now() + Duration::from_secs(1);
    while !is_draining() {
        assert!(std::time::Instant::now() < deadline, "drain did not start");
        std::thread::yield_now();
    }
    // The callback is still parked, so the drain must not have finished.
    assert_matches!(
        drained_rx.try_recv(),
        Err(std::sync::mpsc::TryRecvError::Empty)
    );

    // Open the gate and let everything unwind.
    {
        let (lock, cvar) = &*gate;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    sender_thread.join().unwrap();
    drained_rx.recv_timeout(Duration::from_secs(1)).unwrap();
    drain_thread.join().unwrap();
    assert_eq!(delivered.load(Ordering::SeqCst), 1);

    let late_err = port.inner.sender.send(Flattrs::new(), 2u64).unwrap_err();
    assert!(late_err.is::<HandlerPortClosedError>());
}
5125
5126 fn drive_abandonment_log(payload_sentinel: &str) -> (crate::ActorAddr, crate::PortAddr) {
5132 use hyperactor_config::declare_attrs;
5133
5134 declare_attrs! {
5135 attr UM_TEST_HEADER: u64;
5138 }
5139
5140 let sender = test_actor_id("um_proc", "um_sender");
5141 let dest = test_port_id("um_dest_proc", "um_dest", 42);
5142
5143 let mut headers = Flattrs::new();
5144 headers.set(UM_TEST_HEADER, 0xC0FFEEu64);
5145
5146 let envelope = MessageEnvelope::new(
5147 sender.clone(),
5148 dest.clone(),
5149 wirevalue::Any::serialize(&payload_sentinel.to_string()).unwrap(),
5150 headers,
5151 );
5152
5153 let (return_handle, _rx) = crate::mailbox::undeliverable::new_undeliverable_port();
5154 UndeliverableMailboxSender.post_unchecked(envelope, return_handle);
5155 (sender, dest)
5156 }
5157
5158 #[tracing_test::traced_test]
5162 #[test]
5163 fn test_um1_bounded_fields() {
5164 let payload_sentinel = "um1_payload_sentinel_5b7a9c3d";
5165 let (_sender, _dest) = drive_abandonment_log(payload_sentinel);
5166
5167 let buf = tracing_test::internal::global_buf().lock().unwrap();
5168 let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5169
5170 assert!(
5172 logs.contains("message_type="),
5173 "UM-1: expected message_type field, got:\n{logs}"
5174 );
5175 assert!(
5176 logs.contains("data_len="),
5177 "UM-1: expected data_len field, got:\n{logs}"
5178 );
5179 assert!(
5182 !logs.contains(payload_sentinel),
5183 "UM-1: payload body leaked into the log:\n{logs}"
5184 );
5185 assert!(
5189 !logs.contains(" headers="),
5190 "UM-1: unbounded headers field leaked into the log:\n{logs}"
5191 );
5192 }
5193
5194 #[tracing_test::traced_test]
5198 #[test]
5199 fn test_um2_compat_fields_preserved() {
5200 let (sender, _dest) = drive_abandonment_log("um2_payload");
5201
5202 let buf = tracing_test::internal::global_buf().lock().unwrap();
5203 let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5204
5205 let actor_name = sender.log_name();
5208 let actor_id = sender.to_string();
5209 assert!(
5210 logs.contains("actor_name=") && logs.contains(actor_name),
5211 "UM-2: expected actor_name={actor_name} on the log, got:\n{logs}"
5212 );
5213 assert!(
5214 logs.contains("actor_id=") && logs.contains(&actor_id),
5215 "UM-2: expected actor_id={actor_id} on the log, got:\n{logs}"
5216 );
5217 }
5218
5219 #[tracing_test::traced_test]
5221 #[test]
5222 fn test_um3_destination_format() {
5223 let (_sender, dest) = drive_abandonment_log("um3_payload");
5224
5225 let buf = tracing_test::internal::global_buf().lock().unwrap();
5226 let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5227
5228 assert!(
5229 logs.contains(&format!("message not delivered to {}", dest)),
5230 "UM-3: expected destination-naming format string, got:\n{logs}"
5231 );
5232 }
5233
5234 #[tracing_test::traced_test]
5238 #[test]
5239 fn test_um3b_operation_format_with_operation_context() {
5240 let sender = test_actor_id("um_proc", "um_sender");
5241 let dest = test_port_id("um_dest_proc", "um_dest", 42);
5242
5243 let mut headers = Flattrs::new();
5244 headers.set(
5245 headers::OPERATION_ENDPOINT,
5246 "training.buffer.sample()".to_string(),
5247 );
5248 headers.set(headers::OPERATION_ADVERB, "call_one".to_string());
5249
5250 let envelope = MessageEnvelope::new(
5251 sender,
5252 dest,
5253 wirevalue::Any::serialize(&"um3b_payload".to_string()).unwrap(),
5254 headers,
5255 );
5256
5257 let (return_handle, _rx) = crate::mailbox::undeliverable::new_undeliverable_port();
5258 UndeliverableMailboxSender.post_unchecked(envelope, return_handle);
5259
5260 let buf = tracing_test::internal::global_buf().lock().unwrap();
5261 let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5262
5263 assert!(
5264 logs.contains("abandoned message for training.buffer.sample()"),
5265 "UM-3b: expected operation-naming format string, got:\n{logs}"
5266 );
5267 assert!(
5268 logs.contains("endpoint=") && logs.contains("training.buffer.sample()"),
5269 "UM-3b: expected endpoint field with the caller's operation, got:\n{logs}"
5270 );
5271 assert!(
5272 logs.contains("adverb=") && logs.contains("call_one"),
5273 "UM-3b: expected adverb field, got:\n{logs}"
5274 );
5275 assert!(
5278 !logs.contains("message not delivered to"),
5279 "UM-3b: unexpected destination-naming format string:\n{logs}"
5280 );
5281 }
5282}