hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
21//! # tokio_test::block_on(async {
22//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
23//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
24//! let mbox = Mailbox::new_detached(actor_id);
25//! let (port, mut receiver) = mbox.open_port::<u64>();
26//!
27//! port.send(123).unwrap();
28//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
29//! # })
30//! ```
31//!
32//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
33//! that permits at most one message transmission:
34//!
35//! ```
36//! # use hyperactor::mailbox::Mailbox;
37//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
38//! # tokio_test::block_on(async {
39//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
40//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
41//! let mbox = Mailbox::new_detached(actor_id);
42//!
43//! let (port, receiver) = mbox.open_once_port::<u64>();
44//!
45//! port.send(123u64).unwrap();
46//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
47//! # })
48//! ```
49//!
50//! [`OncePort`]s are correspondingly used for RPC replies in the actor
51//! system.
52//!
53//! ## Remote ports and serialization
54//!
55//! Mailboxes allow delivery of serialized messages to named ports:
56//!
57//! 1) Ports restrict message types to (serializable) [`Message`]s.
58//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
59//! 3) [`Mailbox`] provides interfaces to deliver serialized
60//!    messages to ports named by their [`PortId`].
61//!
62//! While this complicates the interface somewhat, it allows the
63//! implementation to avoid a serialization roundtrip when passing
64//! messages locally.
65
66#![allow(dead_code)] // Allow until this is used outside of tests.
67
68use std::any::Any;
69use std::collections::BTreeMap;
70use std::fmt;
71use std::fmt::Debug;
72use std::future::Future;
73use std::ops::Bound::Excluded;
74use std::pin::Pin;
75use std::sync::Arc;
76use std::sync::LazyLock;
77use std::sync::Mutex;
78use std::sync::OnceLock;
79use std::sync::RwLock;
80use std::sync::Weak;
81use std::sync::atomic::AtomicU64;
82use std::sync::atomic::AtomicUsize;
83use std::sync::atomic::Ordering;
84use std::task::Context;
85use std::task::Poll;
86
87use async_trait::async_trait;
88use dashmap::DashMap;
89use dashmap::DashSet;
90use dashmap::mapref::entry::Entry;
91use futures::Sink;
92use futures::Stream;
93use serde::Deserialize;
94use serde::Serialize;
95use serde::de::DeserializeOwned;
96use tokio::sync::mpsc;
97use tokio::sync::oneshot;
98use tokio::sync::watch;
99use tokio::task::JoinHandle;
100use tokio_util::sync::CancellationToken;
101
102use crate as hyperactor; // for macros
103use crate::Named;
104use crate::OncePortRef;
105use crate::PortRef;
106use crate::accum;
107use crate::accum::Accumulator;
108use crate::accum::ReducerSpec;
109use crate::actor::Signal;
110use crate::actor::remote::USER_PORT_OFFSET;
111use crate::attrs::Attrs;
112use crate::cap;
113use crate::cap::CanSend;
114use crate::channel;
115use crate::channel::ChannelAddr;
116use crate::channel::ChannelError;
117use crate::channel::SendError;
118use crate::channel::TxStatus;
119use crate::data::Serialized;
120use crate::id;
121use crate::metrics;
122use crate::reference::ActorId;
123use crate::reference::PortId;
124use crate::reference::Reference;
125
126mod undeliverable;
127/// For [`Undeliverable`], a message type for delivery failures.
128pub use undeliverable::Undeliverable;
129pub use undeliverable::UndeliverableMessageError;
130pub use undeliverable::custom_monitored_return_handle;
131pub use undeliverable::monitored_return_handle; // TODO: Audit
132pub use undeliverable::supervise_undeliverable_messages;
133/// For [`MailboxAdminMessage`], a message type for mailbox administration.
134pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137/// For [`DurableMailboxSender`] a sender with a write-ahead log.
138pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141
/// Marker trait for anything that can be deposited into a mailbox.
///
/// There is nothing to implement: every type that is
/// `Debug + Send + Sync + 'static` is a [`Message`] through the blanket
/// implementation that follows.
pub trait Message: Debug + Send + Sync + 'static {}
impl<M> Message for M where M: Debug + Send + Sync + 'static {}
146
147/// RemoteMessage extends [`Message`] by requiring that the messages
148/// also be serializable, and can thus traverse process boundaries.
149/// RemoteMessages must also specify a globally unique type name (a URI).
150pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
151
152impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
153
/// Raw, untyped bytes: the system-wide representation of a bytestring.
pub type Data = std::vec::Vec<u8>;
156
157/// Delivery errors occur during message posting.
158#[derive(
159    thiserror::Error,
160    Debug,
161    Serialize,
162    Deserialize,
163    Named,
164    Clone,
165    PartialEq
166)]
167pub enum DeliveryError {
168    /// The destination address is not reachable.
169    #[error("address not routable: {0}")]
170    Unroutable(String),
171
172    /// A broken link indicates that a link in the message
173    /// delivery path has failed.
174    #[error("broken link: {0}")]
175    BrokenLink(String),
176
177    /// A (local) mailbox delivery error.
178    #[error("mailbox error: {0}")]
179    Mailbox(String),
180}
181
182/// An envelope that carries a message destined to a remote actor.
183/// The envelope contains a serialized message along with its destination
184/// and sender.
185#[derive(Debug, Serialize, Deserialize, Clone, Named)]
186pub struct MessageEnvelope {
187    /// The sender of this message.
188    sender: ActorId,
189
190    /// The destination of the message.
191    dest: PortId,
192
193    /// The serialized message.
194    data: Serialized,
195
196    /// Error contains a delivery error when message delivery failed.
197    error: Option<DeliveryError>,
198
199    /// Additional context for this message.
200    headers: Attrs,
201    // TODO: add typename, source, seq, TTL, etc.
202}
203
204impl MessageEnvelope {
205    /// Create a new envelope with the provided sender, destination, and message.
206    pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
207        Self {
208            sender,
209            dest,
210            data,
211            error: None,
212            headers,
213        }
214    }
215
216    /// Create a new envelope whose sender ID is unknown.
217    pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
218        Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
219    }
220
221    /// Construct a new serialized value by serializing the provided T-typed value.
222    pub fn serialize<T: Serialize + Named>(
223        source: ActorId,
224        dest: PortId,
225        value: &T,
226        headers: Attrs,
227    ) -> Result<Self, bincode::Error> {
228        Ok(Self {
229            headers,
230            data: Serialized::serialize(value)?,
231            sender: source,
232            dest,
233            error: None,
234        })
235    }
236
237    /// Deserialize the message in the envelope to the provided type T.
238    pub fn deserialized<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
239        self.data.deserialized()
240    }
241
242    /// The serialized message.
243    pub fn data(&self) -> &Serialized {
244        &self.data
245    }
246
247    /// The message sender.
248    pub fn sender(&self) -> &ActorId {
249        &self.sender
250    }
251
252    /// The destination of the message.
253    pub fn dest(&self) -> &PortId {
254        &self.dest
255    }
256
257    /// The message headers.
258    pub fn headers(&self) -> &Attrs {
259        &self.headers
260    }
261
262    /// Tells whether this is a signal message.
263    pub fn is_signal(&self) -> bool {
264        self.dest.index() == Signal::port()
265    }
266
267    /// Tries to sets a delivery error for the message. If the error is already
268    /// set, nothing is updated.
269    pub fn try_set_error(&mut self, error: DeliveryError) {
270        if self.error.is_none() {
271            self.error = Some(error);
272        }
273    }
274
275    /// The message has been determined to be undeliverable with the
276    /// provided error. Mark the envelope with the error and return to
277    /// sender.
278    pub fn undeliverable(
279        mut self,
280        error: DeliveryError,
281        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
282    ) {
283        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
284            1,
285            hyperactor_telemetry::kv_pairs!(
286                "actor_id" => self.sender.to_string(),
287                "dest_actor_id" => self.dest.0.to_string(),
288                "message_type" => self.data.typename().unwrap_or("unknown"),
289                "error_type" =>  error.to_string(),
290            ),
291        );
292
293        self.try_set_error(error);
294        undeliverable::return_undeliverable(return_handle, self);
295    }
296
297    /// Get the error of why this message was undeliverable. None means this
298    /// message was not determined as undeliverable.
299    pub fn error(&self) -> Option<&DeliveryError> {
300        self.error.as_ref()
301    }
302
303    fn open(self) -> (MessageMetadata, Serialized) {
304        let Self {
305            sender,
306            dest,
307            data,
308            error,
309            headers,
310        } = self;
311
312        (
313            MessageMetadata {
314                sender,
315                dest,
316                error,
317                headers,
318            },
319            data,
320        )
321    }
322
323    fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
324        let MessageMetadata {
325            sender,
326            dest,
327            error,
328            headers,
329        } = metadata;
330
331        Self {
332            sender,
333            dest,
334            data,
335            error,
336            headers,
337        }
338    }
339}
340
341impl fmt::Display for MessageEnvelope {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        match &self.error {
344            None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
345            Some(err) => write!(
346                f,
347                "{} > {}: {}: delivery error: {}",
348                self.sender, self.dest, self.data, err
349            ),
350        }
351    }
352}
353
354/// Metadata about a message sent via a MessageEnvelope.
355#[derive(Clone)]
356pub struct MessageMetadata {
357    sender: ActorId,
358    dest: PortId,
359    error: Option<DeliveryError>,
360    headers: Attrs,
361}
362
363/// Errors that occur during mailbox operations. Each error is associated
364/// with the mailbox's actor id.
365#[derive(Debug)]
366pub struct MailboxError {
367    actor_id: ActorId,
368    kind: MailboxErrorKind,
369}
370
371/// The kinds of mailbox errors. This enum is marked non-exhaustive to
372/// allow for extensibility.
373#[derive(thiserror::Error, Debug)]
374#[non_exhaustive]
375pub enum MailboxErrorKind {
376    /// An operation was attempted on a closed mailbox.
377    #[error("mailbox closed")]
378    Closed,
379
380    /// The port associated with an operation was invalid.
381    #[error("invalid port: {0}")]
382    InvalidPort(PortId),
383
384    /// There was no sender associated with the port.
385    #[error("no sender for port: {0}")]
386    NoSenderForPort(PortId),
387
388    /// There was no local sender associated with the port.
389    /// Returned by operations that require a local port.
390    #[error("no local sender for port: {0}")]
391    NoLocalSenderForPort(PortId),
392
393    /// The port was closed.
394    #[error("{0}: port closed")]
395    PortClosed(PortId),
396
397    /// An error occured during a send operation.
398    #[error("send {0}: {1}")]
399    Send(PortId, #[source] anyhow::Error),
400
401    /// An error occured during a receive operation.
402    #[error("recv {0}: {1}")]
403    Recv(PortId, #[source] anyhow::Error),
404
405    /// There was a serialization failure.
406    #[error("serialize: {0}")]
407    Serialize(#[source] anyhow::Error),
408
409    /// There was a deserialization failure.
410    #[error("deserialize {0}: {1}")]
411    Deserialize(&'static str, anyhow::Error),
412
413    /// There was an error during a channel operation.
414    #[error(transparent)]
415    Channel(#[from] ChannelError),
416}
417
418impl MailboxError {
419    /// Create a new mailbox error associated with the provided actor
420    /// id and of the given kind.
421    pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
422        Self { actor_id, kind }
423    }
424
425    /// The ID of the mailbox producing this error.
426    pub fn actor_id(&self) -> &ActorId {
427        &self.actor_id
428    }
429
430    /// The error's kind.
431    pub fn kind(&self) -> &MailboxErrorKind {
432        &self.kind
433    }
434}
435
436impl fmt::Display for MailboxError {
437    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438        write!(f, "{}: ", self.actor_id)?;
439        fmt::Display::fmt(&self.kind, f)
440    }
441}
442
443impl std::error::Error for MailboxError {
444    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
445        self.kind.source()
446    }
447}
448
449/// PortLocation describes the location of a port.
450/// This is used in errors to provide a uniform data type
451/// for ports that may or may not be bound.
452#[derive(Debug, Clone)]
453pub enum PortLocation {
454    /// The port was bound: the location is its underlying bound ID.
455    Bound(PortId),
456    /// The port was not bound: we provide the actor ID and the message type.
457    Unbound(ActorId, &'static str),
458}
459
460impl PortLocation {
461    fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
462        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
463    }
464
465    fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
466        PortLocation::Unbound(actor_id, ty)
467    }
468
469    /// The actor id of the location.
470    pub fn actor_id(&self) -> &ActorId {
471        match self {
472            PortLocation::Bound(port_id) => port_id.actor_id(),
473            PortLocation::Unbound(actor_id, _) => actor_id,
474        }
475    }
476}
477
478impl fmt::Display for PortLocation {
479    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
480        match self {
481            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
482            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
483        }
484    }
485}
486
487/// Errors that that occur during mailbox sending operations. Each error
488/// is associated with the port ID of the operation.
489#[derive(Debug)]
490pub struct MailboxSenderError {
491    location: PortLocation,
492    kind: MailboxSenderErrorKind,
493}
494
495/// The kind of mailbox sending errors.
496#[derive(thiserror::Error, Debug)]
497pub enum MailboxSenderErrorKind {
498    /// Error during serialization.
499    #[error("serialization error: {0}")]
500    Serialize(anyhow::Error),
501
502    /// Error during deserialization.
503    #[error("deserialization error for type {0}: {1}")]
504    Deserialize(&'static str, anyhow::Error),
505
506    /// A send to an invalid port.
507    #[error("invalid port")]
508    Invalid,
509
510    /// A send to a closed port.
511    #[error("port closed")]
512    Closed,
513
514    // The following pass through underlying errors:
515    /// An underlying mailbox error.
516    #[error(transparent)]
517    Mailbox(#[from] MailboxError),
518
519    /// An underlying channel error.
520    #[error(transparent)]
521    Channel(#[from] ChannelError),
522
523    /// An underlying message log error.
524    #[error(transparent)]
525    MessageLog(#[from] MessageLogError),
526
527    /// An other, uncategorized error.
528    #[error("send error: {0}")]
529    Other(#[from] anyhow::Error),
530
531    /// The destination was unreachable.
532    #[error("unreachable: {0}")]
533    Unreachable(anyhow::Error),
534}
535
536impl MailboxSenderError {
537    /// Create a new mailbox sender error to an unbound port.
538    pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
539        Self {
540            location: PortLocation::Unbound(actor_id, std::any::type_name::<M>()),
541            kind,
542        }
543    }
544
545    /// Create a new mailbox sender, manually providing the type.
546    pub fn new_unbound_type(
547        actor_id: ActorId,
548        kind: MailboxSenderErrorKind,
549        ty: &'static str,
550    ) -> Self {
551        Self {
552            location: PortLocation::Unbound(actor_id, ty),
553            kind,
554        }
555    }
556
557    /// Create a new mailbox sender error with the provided port ID and kind.
558    pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
559        Self {
560            location: PortLocation::Bound(port_id),
561            kind,
562        }
563    }
564
565    /// The location at which the error occured.
566    pub fn location(&self) -> &PortLocation {
567        &self.location
568    }
569
570    /// The kind associated with the error.
571    pub fn kind(&self) -> &MailboxSenderErrorKind {
572        &self.kind
573    }
574}
575
576impl fmt::Display for MailboxSenderError {
577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578        write!(f, "{}: ", self.location)?;
579        fmt::Display::fmt(&self.kind, f)
580    }
581}
582
583impl std::error::Error for MailboxSenderError {
584    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
585        self.kind.source()
586    }
587}
588
589/// MailboxSenders can send messages through ports to mailboxes. It
590/// provides a unified interface for message delivery in the system.
591pub trait MailboxSender: Send + Sync + Debug + Any {
592    /// TODO: consider making this publicly inaccessible. While the trait itself
593    /// needs to be public, its only purpose for the end-user API is to provide
594    /// the typed messaging APIs from (Once)PortRef and ActorRef.
595    fn post(
596        &self,
597        envelope: MessageEnvelope,
598        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
599    );
600}
601
602// PortSender is an extension trait so that we can include generics,
603// making the API end-to-end typesafe.
604
605/// PortSender extends [`MailboxSender`] by providing typed endpoints
606/// for sending messages over ports.
607pub trait PortSender: MailboxSender {
608    /// Deliver a message to the provided port.
609    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
610    fn serialize_and_send<M: RemoteMessage>(
611        &self,
612        port: &PortRef<M>,
613        message: M,
614        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
615    ) -> Result<(), MailboxSenderError> {
616        // TODO: convert this to a undeliverable error also
617        let serialized = Serialized::serialize(&message).map_err(|err| {
618            MailboxSenderError::new_bound(
619                port.port_id().clone(),
620                MailboxSenderErrorKind::Serialize(err.into()),
621            )
622        })?;
623        self.post(
624            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
625            return_handle,
626        );
627        Ok(())
628    }
629
630    /// Deliver a message to a one-shot port, consuming the provided port,
631    /// which is not reusable.
632    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
633    fn serialize_and_send_once<M: RemoteMessage>(
634        &self,
635        once_port: OncePortRef<M>,
636        message: M,
637        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
638    ) -> Result<(), MailboxSenderError> {
639        let serialized = Serialized::serialize(&message).map_err(|err| {
640            MailboxSenderError::new_bound(
641                once_port.port_id().clone(),
642                MailboxSenderErrorKind::Serialize(err.into()),
643            )
644        })?;
645        self.post(
646            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
647            return_handle,
648        );
649        Ok(())
650    }
651}
652
653impl<T: ?Sized + MailboxSender> PortSender for T {}
654
655/// A perpetually closed mailbox sender. Panics if any messages are posted.
656/// Useful for tests, or where there is no meaningful mailbox sender
657/// implementation available.
658#[derive(Debug, Clone)]
659pub struct PanickingMailboxSender;
660
661impl MailboxSender for PanickingMailboxSender {
662    fn post(
663        &self,
664        envelope: MessageEnvelope,
665        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
666    ) {
667        panic!("panic! in the mailbox! attempted post: {}", envelope)
668    }
669}
670
671/// A mailbox sender for undeliverable messages. This will simply record
672/// any undelivered messages.
673#[derive(Debug)]
674pub struct UndeliverableMailboxSender;
675
676impl MailboxSender for UndeliverableMailboxSender {
677    fn post(
678        &self,
679        envelope: MessageEnvelope,
680        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
681    ) {
682        tracing::error!("message not delivered: {}", envelope);
683    }
684}
685
686#[derive(Debug)]
687struct Buffer<T: Message> {
688    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
689    processed: watch::Receiver<usize>,
690    seq: AtomicUsize,
691}
692
693impl<T: Message> Buffer<T> {
694    fn new<Fut>(
695        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
696    ) -> Self
697    where
698        Fut: Future<Output = ()> + Send + 'static,
699    {
700        let (queue, mut next) = mpsc::unbounded_channel();
701        let (last_processed, processed) = watch::channel(0);
702        crate::init::get_runtime().spawn(async move {
703            let mut seq = 0;
704            while let Some((msg, return_handle)) = next.recv().await {
705                process(msg, return_handle).await;
706                seq += 1;
707                let _ = last_processed.send(seq);
708            }
709        });
710        Self {
711            queue,
712            processed,
713            seq: AtomicUsize::new(0),
714        }
715    }
716
717    fn send(
718        &self,
719        item: (T, PortHandle<Undeliverable<T>>),
720    ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
721        self.seq.fetch_add(1, Ordering::SeqCst);
722        self.queue.send(item)?;
723        Ok(())
724    }
725
726    async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
727        let seq = self.seq.load(Ordering::SeqCst);
728        while *self.processed.borrow_and_update() < seq {
729            self.processed.changed().await?;
730        }
731        Ok(())
732    }
733}
734
735static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
736    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
737
738/// Convenience boxing implementation for MailboxSender. Most APIs
739/// are parameterized on MailboxSender implementations, and it's thus
740/// difficult to work with dyn values.  BoxedMailboxSender bridges this
741/// gap by providing a concrete MailboxSender which dispatches using an
742/// underlying (boxed) dyn.
743#[derive(Debug, Clone)]
744pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
745
746impl BoxedMailboxSender {
747    /// Create a new boxed sender given the provided sender implementation.
748    pub fn new(sender: impl MailboxSender + 'static) -> Self {
749        Self(Arc::new(sender))
750    }
751
752    /// Attempts to downcast the inner sender to the given concrete
753    /// type.
754    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
755        (&*self.0 as &dyn Any).downcast_ref::<T>()
756    }
757}
758
759/// Extension trait that creates a boxed clone of a MailboxSender.
760pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
761    /// A boxed clone of this MailboxSender.
762    fn boxed(&self) -> BoxedMailboxSender;
763}
764impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
765    fn boxed(&self) -> BoxedMailboxSender {
766        BoxedMailboxSender::new(self.clone())
767    }
768}
769
770/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
771pub trait IntoBoxedMailboxSender: MailboxSender {
772    /// Rehome this MailboxSender into a BoxedMailboxSender.
773    fn into_boxed(self) -> BoxedMailboxSender;
774}
775impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
776    fn into_boxed(self) -> BoxedMailboxSender {
777        BoxedMailboxSender::new(self)
778    }
779}
780
781impl MailboxSender for BoxedMailboxSender {
782    fn post(
783        &self,
784        envelope: MessageEnvelope,
785        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
786    ) {
787        metrics::MAILBOX_POSTS.add(
788            1,
789            hyperactor_telemetry::kv_pairs!(
790                "actor_id" => envelope.sender.to_string(),
791                "dest_actor_id" => envelope.dest.0.to_string(),
792            ),
793        );
794        self.0.post(envelope, return_handle);
795    }
796}
797
798/// Errors that occur during mailbox serving.
799#[derive(thiserror::Error, Debug)]
800pub enum MailboxServerError {
801    /// An underlying channel error.
802    #[error(transparent)]
803    Channel(#[from] ChannelError),
804
805    /// An underlying mailbox sender error.
806    #[error(transparent)]
807    MailboxSender(#[from] MailboxSenderError),
808}
809
810/// Represents a running [`MailboxServer`]. The handle composes a
811/// ['tokio::task::JoinHandle'] and may be joined in the same manner.
812#[derive(Debug)]
813pub struct MailboxServerHandle {
814    join_handle: JoinHandle<Result<(), MailboxServerError>>,
815    stopped_tx: watch::Sender<bool>,
816}
817
818impl MailboxServerHandle {
819    /// Signal the server to stop serving the mailbox. The caller should
820    /// join the handle by awaiting the [`MailboxServerHandle`] future.
821    ///
822    /// Stop should be called at most once.
823    pub fn stop(&self, reason: &str) {
824        tracing::info!("stopping mailbox server; reason: {}", reason);
825        self.stopped_tx.send(true).expect("stop called twice");
826    }
827}
828
829/// Forward future implementation to underlying handle.
830impl Future for MailboxServerHandle {
831    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
832
833    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
834        // SAFETY: This is safe to do because self is pinned.
835        let join_handle_pinned =
836            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
837        join_handle_pinned.poll(cx)
838    }
839}
840
841// A `MailboxServer` (such as a router) can can receive a message
842// that couldn't reach its destination. We can use the fact that
843// servers are `MailboxSender`s to attempt to forward them back to
844// their senders.
845fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
846    let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
847
848    tokio::task::spawn(async move {
849        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
850            if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
851            {
852                // A non-returnable undeliverable.
853                UndeliverableMailboxSender.post(e, monitored_return_handle());
854                continue;
855            }
856            envelope.try_set_error(DeliveryError::BrokenLink(
857                "message was undeliverable".to_owned(),
858            ));
859            server.post(
860                MessageEnvelope::new(
861                    envelope.sender().clone(),
862                    PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
863                        envelope.sender(),
864                    )
865                    .port_id()
866                    .clone(),
867                    Serialized::serialize(&Undeliverable(envelope)).unwrap(),
868                    Attrs::new(),
869                ),
870                monitored_return_handle(),
871            );
872        }
873    });
874
875    return_handle
876}
877
878/// Serve a port on the provided [`channel::Rx`]. This dispatches all
879/// channel messages directly to the port.
880pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
881    /// Serve the provided port on the given channel on this sender on
882    /// a background task which may be joined with the returned handle.
883    /// The task fails on any send error.
884    fn serve(
885        self,
886        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
887    ) -> MailboxServerHandle {
888        // A `MailboxServer` can can receive a message that couldn't
889        // reach its destination. We can use the fact that servers are
890        // `MailboxSender`s to attempt to forward them back to their
891        // senders.
892        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
893        let server = self.clone();
894        tokio::task::spawn(async move {
895            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
896                if let Ok(Undeliverable(e)) =
897                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
898                {
899                    // A non-returnable undeliverable.
900                    UndeliverableMailboxSender.post(e, monitored_return_handle());
901                    continue;
902                }
903                envelope.try_set_error(DeliveryError::BrokenLink(
904                    "message was undeliverable".to_owned(),
905                ));
906                server.post(
907                    MessageEnvelope::new(
908                        envelope.sender().clone(),
909                        PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
910                            envelope.sender(),
911                        )
912                        .port_id()
913                        .clone(),
914                        Serialized::serialize(&Undeliverable(envelope)).unwrap(),
915                        Attrs::new(),
916                    ),
917                    monitored_return_handle(),
918                );
919            }
920        });
921
922        let (stopped_tx, mut stopped_rx) = watch::channel(false);
923        let join_handle = tokio::spawn(async move {
924            let mut detached = false;
925
926            loop {
927                if *stopped_rx.borrow_and_update() {
928                    break Ok(());
929                }
930
931                tokio::select! {
932                    message = rx.recv() => {
933                        match message {
934                            // Relay the message to the port directly.
935                            Ok(envelope) => self.post(envelope, return_handle.clone()),
936
937                            // Closed is a "graceful" error in this case.
938                            // We simply stop serving.
939                            Err(ChannelError::Closed) => break Ok(()),
940                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
941                        }
942                    }
943                    result = stopped_rx.changed(), if !detached  => {
944                        tracing::debug!(
945                            "the mailbox server is stopped"
946                        );
947                        detached = result.is_err();
948                    }
949                }
950            }
951        });
952
953        MailboxServerHandle {
954            join_handle,
955            stopped_tx,
956        }
957    }
958}
959
960impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
961
962/// A mailbox server client that transmits messages on a Tx channel.
963#[derive(Debug)]
964pub struct MailboxClient {
965    // The unbounded sender.
966    buffer: Buffer<MessageEnvelope>,
967
968    // To cancel monitoring tx health.
969    _tx_monitoring: CancellationToken,
970}
971
972impl MailboxClient {
973    /// Create a new client that sends messages destined for a
974    /// [`MailboxServer`] on the provided Tx channel.
975    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
976        let addr = tx.addr();
977        let tx = Arc::new(tx);
978        let tx_status = tx.status().clone();
979        let tx_monitoring = CancellationToken::new();
980        let buffer = Buffer::new(move |envelope, return_handle| {
981            let tx = Arc::clone(&tx);
982            let (return_channel, return_receiver) = oneshot::channel();
983            // Set up for delivery failure.
984            let return_handle_0 = return_handle.clone();
985            tokio::spawn(async move {
986                let result = return_receiver.await;
987                if let Ok(message) = result {
988                    let _ = return_handle_0.send(Undeliverable(message));
989                } else {
990                    // Sender dropped, this task can end.
991                }
992            });
993            // Send the message for transmission.
994            let return_handle_1 = return_handle.clone();
995            async move {
996                if let Err(SendError(_, envelope)) = tx.try_post(envelope, return_channel) {
997                    // Failed to enqueue.
998                    envelope.undeliverable(
999                        DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string()),
1000                        return_handle_1.clone(),
1001                    );
1002                }
1003            }
1004        });
1005        let this = Self {
1006            buffer,
1007            _tx_monitoring: tx_monitoring.clone(),
1008        };
1009        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1010        this
1011    }
1012
1013    // Set up a watch for the tx's health.
1014    fn monitor_tx_health(
1015        mut rx: watch::Receiver<TxStatus>,
1016        cancel_token: CancellationToken,
1017        addr: ChannelAddr,
1018    ) {
1019        crate::init::get_runtime().spawn(async move {
1020            loop {
1021                tokio::select! {
1022                    changed = rx.changed() => {
1023                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1024                            tracing::warn!("connection to {} lost", addr);
1025                            // TODO: Potential for supervision event
1026                            // interaction here.
1027                            break;
1028                        }
1029                    }
1030                    _ = cancel_token.cancelled() => {
1031                        break;
1032                    }
1033                }
1034            }
1035        });
1036    }
1037}
1038
1039impl MailboxSender for MailboxClient {
1040    fn post(
1041        &self,
1042        envelope: MessageEnvelope,
1043        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1044    ) {
1045        // tracing::trace!(name = "post", "posting message to {}", envelope.dest);
1046        tracing::event!(target:"messages", tracing::Level::DEBUG, "crc"=envelope.data.crc(), "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1047
1048        if let Err(mpsc::error::SendError((envelope, return_handle))) =
1049            self.buffer.send((envelope, return_handle))
1050        {
1051            let err = DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string());
1052            metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
1053                1,
1054                hyperactor_telemetry::kv_pairs!(
1055                    "actor_id" => envelope.sender.to_string(),
1056                    "dest_actor_id" => envelope.dest.0.to_string(),
1057                    "message_type" => envelope.data.typename().unwrap_or("unknown"),
1058                    "reason" => err.to_string(),
1059                ),
1060            );
1061
1062            // Failed to enqueue.
1063            envelope.undeliverable(err, return_handle);
1064        }
1065    }
1066}
1067
1068/// Wrapper to turn `PortRef` into a `Sink`.
1069pub struct PortSink<C: CanSend, M: RemoteMessage> {
1070    caps: C,
1071    port: PortRef<M>,
1072}
1073
1074impl<C: CanSend, M: RemoteMessage> PortSink<C, M> {
1075    /// Create new PortSink
1076    pub fn new(caps: C, port: PortRef<M>) -> Self {
1077        Self { caps, port }
1078    }
1079}
1080
1081impl<C: CanSend, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1082    type Error = MailboxSenderError;
1083
1084    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1085        Poll::Ready(Ok(()))
1086    }
1087
1088    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1089        self.port.send(&self.caps, item)
1090    }
1091
1092    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1093        Poll::Ready(Ok(()))
1094    }
1095
1096    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1097        Poll::Ready(Ok(()))
1098    }
1099}
1100
1101/// A mailbox coordinates message delivery to actors through typed
1102/// [`Port`]s associated with the mailbox.
1103#[derive(Clone, Debug)]
1104pub struct Mailbox {
1105    inner: Arc<State>,
1106}
1107
1108impl Mailbox {
1109    /// Create a new mailbox associated with the provided actor ID, using the provided
1110    /// forwarder for external destinations.
1111    pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1112        Self {
1113            inner: Arc::new(State::new(actor_id, forwarder)),
1114        }
1115    }
1116
1117    /// Create a new detached mailbox associated with the provided actor ID.
1118    pub fn new_detached(actor_id: ActorId) -> Self {
1119        Self {
1120            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1121        }
1122    }
1123
1124    /// The actor id associated with this mailbox.
1125    pub fn actor_id(&self) -> &ActorId {
1126        &self.inner.actor_id
1127    }
1128
1129    /// Open a new port that accepts M-typed messages. The returned
1130    /// port may be freely cloned, serialized, and passed around. The
1131    /// returned receiver should only be retained by the actor responsible
1132    /// for processing the delivered messages.
1133    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1134        let port_index = self.inner.allocate_port();
1135        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1136        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1137        tracing::trace!(
1138            name = "open_port",
1139            "opening port for {} at {}",
1140            self.inner.actor_id,
1141            port_id
1142        );
1143        (
1144            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1145            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1146        )
1147    }
1148
1149    /// Open a new port with an accumulator. This port accepts A::Update type
1150    /// messages, accumulate them into A::State with the given accumulator.
1151    /// The latest changed state can be received from the returned receiver as
1152    /// a single A::State message. If there is no new update, the receiver will
1153    /// not receive any message.
1154    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1155    where
1156        A: Accumulator + Send + Sync + 'static,
1157        A::Update: Message,
1158        A::State: Message + Default + Clone,
1159    {
1160        let port_index = self.inner.allocate_port();
1161        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1162        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1163        let state = Mutex::new(A::State::default());
1164        let reducer_spec = accum.reducer_spec();
1165        let enqueue = move |_, update: A::Update| {
1166            let mut state = state.lock().unwrap();
1167            accum.accumulate(&mut state, update)?;
1168            let _ = sender.send(state.clone());
1169            Ok(())
1170        };
1171        (
1172            PortHandle {
1173                mailbox: self.clone(),
1174                port_index,
1175                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1176                bound: Arc::new(OnceLock::new()),
1177                reducer_spec,
1178            },
1179            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
1180        )
1181    }
1182
1183    /// Open a port that accepts M-typed messages, using the provided function
1184    /// to enqueue.
1185    // TODO: consider making lifetime bound to Self instead.
1186    pub(crate) fn open_enqueue_port<M: Message>(
1187        &self,
1188        enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1189    ) -> PortHandle<M> {
1190        PortHandle {
1191            mailbox: self.clone(),
1192            port_index: self.inner.allocate_port(),
1193            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1194            bound: Arc::new(OnceLock::new()),
1195            reducer_spec: None,
1196        }
1197    }
1198
1199    /// Open a new one-shot port that accepts M-typed messages. The
1200    /// returned port may be used to send a single message; ditto the
1201    /// receiver may receive a single message.
1202    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1203        let port_index = self.inner.allocate_port();
1204        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1205        let (sender, receiver) = oneshot::channel::<M>();
1206        (
1207            OncePortHandle {
1208                mailbox: self.clone(),
1209                port_index,
1210                port_id: port_id.clone(),
1211                sender,
1212            },
1213            OncePortReceiver {
1214                receiver: Some(receiver),
1215                port_id,
1216                mailbox: self.clone(),
1217            },
1218        )
1219    }
1220
1221    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1222        MailboxError::new(self.inner.actor_id.clone(), err)
1223    }
1224
1225    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1226        let port_index = M::port();
1227        self.inner.ports.get(&port_index).and_then(|boxed| {
1228            boxed
1229                .as_any()
1230                .downcast_ref::<UnboundedSender<M>>()
1231                .map(|s| {
1232                    assert_eq!(
1233                        s.port_id,
1234                        self.actor_id().port_id(port_index),
1235                        "port_id mismatch in downcasted UnboundedSender"
1236                    );
1237                    s.sender.clone()
1238                })
1239        })
1240    }
1241
1242    /// Retrieve the bound undeliverable message port handle.
1243    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1244        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1245            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1246    }
1247
1248    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1249        assert_eq!(
1250            handle.mailbox.actor_id(),
1251            self.actor_id(),
1252            "port does not belong to mailbox"
1253        );
1254
1255        // TODO: don't even allocate a port until the port is bound. Possibly
1256        // have handles explicitly staged (unbound, bound).
1257        let port_id = self.actor_id().port_id(handle.port_index);
1258        match self.inner.ports.entry(handle.port_index) {
1259            Entry::Vacant(entry) => {
1260                entry.insert(Box::new(UnboundedSender::new(
1261                    handle.sender.clone(),
1262                    port_id.clone(),
1263                )));
1264            }
1265            Entry::Occupied(_entry) => {}
1266        }
1267
1268        PortRef::attest(port_id)
1269    }
1270
1271    fn bind_to<M: RemoteMessage>(&self, handle: &PortHandle<M>, port_index: u64) {
1272        assert_eq!(
1273            handle.mailbox.actor_id(),
1274            self.actor_id(),
1275            "port does not belong to mailbox"
1276        );
1277
1278        let port_id = self.actor_id().port_id(port_index);
1279        match self.inner.ports.entry(port_index) {
1280            Entry::Vacant(entry) => {
1281                entry.insert(Box::new(UnboundedSender::new(
1282                    handle.sender.clone(),
1283                    port_id,
1284                )));
1285            }
1286            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1287        }
1288    }
1289
1290    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1291        let port_id = handle.port_id().clone();
1292        match self.inner.ports.entry(handle.port_index) {
1293            Entry::Vacant(entry) => {
1294                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1295            }
1296            Entry::Occupied(_entry) => {}
1297        }
1298    }
1299
1300    fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1301        assert_eq!(
1302            port_id.actor_id(),
1303            self.actor_id(),
1304            "port does not belong to mailbox"
1305        );
1306
1307        match self.inner.ports.entry(port_id.index()) {
1308            Entry::Vacant(entry) => {
1309                entry.insert(Box::new(sender));
1310            }
1311            Entry::Occupied(_entry) => {}
1312        }
1313    }
1314}
1315
1316// TODO: figure out what to do with these interfaces -- possibly these caps
1317// do not have to be private.
1318
1319/// Open a port given a capability.
1320pub fn open_port<M: Message>(caps: &impl cap::CanOpenPort) -> (PortHandle<M>, PortReceiver<M>) {
1321    caps.mailbox().open_port()
1322}
1323
1324/// Open a one-shot port given a capability. This is a public method primarily to
1325/// enable macro-generated clients.
1326pub fn open_once_port<M: Message>(
1327    caps: &impl cap::CanOpenPort,
1328) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1329    caps.mailbox().open_once_port()
1330}
1331
impl MailboxSender for Mailbox {
    /// Deliver a serialized message to the provided port ID.
    ///
    /// Messages addressed to another actor are handed to this mailbox's
    /// forwarder. Messages for this actor are passed to the sender bound at
    /// the destination port index. The envelope is counted in the
    /// undeliverable-message metric and returned through `return_handle` in
    /// two cases:
    /// - the port is not bound;
    /// - the bound sender reports an error (presumably including the
    ///   message not deserializing into the port's type).
    ///
    /// A bound sender that reports it is finished (`Ok(false)`) is removed
    /// from the mailbox.
    fn post(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        tracing::trace!(name = "post", "posting message to {}", envelope.dest);

        // Not for us: route through the forwarder.
        if envelope.dest().actor_id() != &self.inner.actor_id {
            return self.inner.forwarder.post(envelope, return_handle);
        }

        match self.inner.ports.entry(envelope.dest().index()) {
            Entry::Vacant(_) => {
                let err = DeliveryError::Unroutable("port not bound in mailbox".to_string());
                metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
                    1,
                    hyperactor_telemetry::kv_pairs!(
                        "actor_id" => envelope.sender.to_string(),
                        "dest_actor_id" => envelope.dest.0.to_string(),
                        "message_type" => envelope.data.typename().unwrap_or("unknown"),
                        "reason" => err.to_string(),
                    ),
                );

                envelope.undeliverable(err, return_handle);
            }
            Entry::Occupied(entry) => {
                // Split the envelope so the data can be handed to the
                // sender; the metadata is kept so the envelope can be
                // resealed if delivery fails.
                let (metadata, data) = envelope.open();
                let MessageMetadata {
                    headers,
                    sender,
                    dest,
                    error: metadata_error,
                } = metadata;

                // We use the entry API here so that we can remove the
                // entry while holding an (entry) reference. DashMap's
                // deadlock warning about "holding any sort of reference
                // into the map" is stated relative to the calling thread:
                // while `entry` is held, this thread must not lock the
                // same shard again.
                // NOTE(review): `send_serialized` can run an arbitrary
                // enqueue function. For example, a split port's enqueue
                // posts through a `Mailbox`. If such a function re-enters
                // this mailbox's `ports` map on this thread, it could
                // deadlock — TODO confirm that cannot happen. The same
                // caution applies to holding any reference into the map
                // across `.await` points.
                match entry.get().send_serialized(headers, data) {
                    // The sender reports it is done (presumably a one-shot
                    // port that has now delivered); unbind it.
                    Ok(false) => {
                        entry.remove();
                    }
                    Ok(true) => (),
                    // Delivery failed: the sender hands back the headers and
                    // data, so reseal the original envelope and return it.
                    Err(SerializedSenderError {
                        data,
                        error: sender_error,
                        headers,
                    }) => {
                        let err = DeliveryError::Mailbox(format!("{}", sender_error));
                        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
                            1,
                            hyperactor_telemetry::kv_pairs!(
                                "actor_id" => sender.to_string(),
                                "dest_actor_id" => dest.0.to_string(),
                                "message_type" => data.typename().unwrap_or("unknown"),
                                "reason" => err.to_string(),
                            ),
                        );

                        MessageEnvelope::seal(
                            MessageMetadata {
                                headers,
                                sender,
                                dest,
                                error: metadata_error,
                            },
                            data,
                        )
                        .undeliverable(err, return_handle)
                    }
                }
            }
        }
    }
}
1414
1415// Tracks mailboxes that have emitted a `CanSend::post` warning due to
1416// missing an `Undeliverable<MessageEnvelope>` binding. In this
1417// context, mailboxes are few and long-lived; unbounded growth is not
1418// a realistic concern.
1419static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
1420
1421impl cap::sealed::CanSend for Mailbox {
1422    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1423        let return_handle = self.bound_return_handle().unwrap_or_else(|| {
1424            let actor_id = self.actor_id();
1425            if CAN_SEND_WARNED_MAILBOXES
1426                .get_or_init(DashSet::new)
1427                .insert(actor_id.clone())
1428            {
1429                let bt = std::backtrace::Backtrace::force_capture();
1430                tracing::warn!(
1431                    actor_id = ?actor_id,
1432                    backtrace = ?bt,
1433                    "mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
1434                );
1435            }
1436            monitored_return_handle()
1437        });
1438
1439        let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data, headers);
1440        MailboxSender::post(self, envelope, return_handle);
1441    }
1442    fn actor_id(&self) -> &ActorId {
1443        self.actor_id()
1444    }
1445}
1446impl cap::sealed::CanSend for &Mailbox {
1447    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1448        cap::sealed::CanSend::post(*self, dest, headers, data)
1449    }
1450    fn actor_id(&self) -> &ActorId {
1451        (**self).actor_id()
1452    }
1453}
1454
1455impl cap::sealed::CanOpenPort for &Mailbox {
1456    fn mailbox(&self) -> &Mailbox {
1457        self
1458    }
1459}
1460
1461impl cap::sealed::CanOpenPort for Mailbox {
1462    fn mailbox(&self) -> &Mailbox {
1463        self
1464    }
1465}
1466
1467#[derive(Default)]
1468struct SplitPortBuffer(Vec<Serialized>);
1469
1470impl SplitPortBuffer {
1471    /// Push a new item to the buffer, and optionally return any items that should
1472    /// be flushed.
1473    fn push(&mut self, serialized: Serialized) -> Option<Vec<Serialized>> {
1474        let limit = crate::config::global::get(crate::config::SPLIT_MAX_BUFFER_SIZE);
1475
1476        self.0.push(serialized);
1477        if self.0.len() >= limit {
1478            Some(std::mem::take(&mut self.0))
1479        } else {
1480            None
1481        }
1482    }
1483}
1484
impl cap::sealed::CanSplitPort for Mailbox {
    /// Create a "split" port on this mailbox that forwards to `port_id`.
    ///
    /// A fresh port index is allocated and bound on this mailbox. Every
    /// message delivered to it is re-posted, as this mailbox's actor, to
    /// `port_id`. When `reducer_spec` resolves to a reducer, incoming updates
    /// are buffered (see `SplitPortBuffer`), and each full buffer is reduced
    /// into a single message before being posted.
    ///
    /// Returns the ID of the new split port, or an error if the reducer
    /// cannot be resolved.
    fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
        // Post `msg` to `port_id` as this mailbox's actor, with empty headers.
        fn post(mailbox: &Mailbox, port_id: PortId, msg: Serialized) {
            mailbox.post(
                MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new()),
                // TODO(pzhang) figure out how to use upstream's return handle,
                // instead of getting a new one like this.
                // This is okay for now because upstream is currently also using
                // the same handle singleton, but that could change in the future.
                monitored_return_handle(),
            );
        }

        let port_index = self.inner.allocate_port();
        let split_port = self.actor_id().port_id(port_index);
        let mailbox = self.clone();
        // Resolve the optional reducer; a resolution error aborts the split.
        let reducer = reducer_spec
            .map(
                |ReducerSpec {
                     typehash,
                     builder_params,
                 }| { accum::resolve_reducer(typehash, builder_params) },
            )
            .transpose()?
            .flatten();
        let enqueue: Box<
            dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
        > = match reducer {
            // No reducer: forward every message as-is.
            None => Box::new(move |serialized: Serialized| {
                post(&mailbox, port_id.clone(), serialized);
                Ok(())
            }),
            Some(r) => {
                let buffer = Mutex::new(SplitPortBuffer::default());
                Box::new(move |serialized: Serialized| {
                    // Hold the lock until messages are sent. This is to avoid another
                    // invocation of this method trying to send message concurrently and
                    // cause messages delivered out of order.
                    let mut buf = buffer.lock().unwrap();
                    // Only a full buffer is flushed. NOTE(review): nothing in
                    // this function flushes a partially filled buffer (e.g. on
                    // a timer); confirm trailing updates cannot be stranded.
                    if let Some(buffered) = buf.push(serialized) {
                        // On a reduce error, only the last update of the batch
                        // is handed back to the caller. NOTE(review): the rest
                        // of the batch is dropped; confirm this is intended.
                        let reduced = r.reduce_updates(buffered).map_err(|(e, mut b)| {
                            (
                                b.pop()
                                    .expect("there should be at least one update from buffer"),
                                e,
                            )
                        })?;
                        post(&mailbox, port_id.clone(), reduced);
                    }
                    Ok(())
                })
            }
        };
        self.bind_untyped(
            &split_port,
            UntypedUnboundedSender {
                sender: enqueue,
                port_id: split_port.clone(),
            },
        );
        Ok(split_port)
    }
}
1548
1549/// A port to which M-typed messages can be delivered. Ports may be
1550/// serialized to be sent to other actors. However, when a port is
1551/// deserialized, it may no longer be used to send messages directly
1552/// to a mailbox since it is no longer associated with a local mailbox
1553/// ([`Mailbox::send`] will fail). However, the runtime may accept
1554/// remote Ports, and arrange for these messages to be delivered
1555/// indirectly through inter-node message passing.
1556#[derive(Debug)]
1557pub struct PortHandle<M: Message> {
1558    mailbox: Mailbox,
1559    port_index: u64,
1560    sender: UnboundedPortSender<M>,
1561    // We would like this to be a Arc<OnceLock<PortRef<M>>>, but we cannot
1562    // write down the type PortRef<M> (M: Message), even though we cannot
1563    // legally construct such a value without M: RemoteMessage. We could consider
1564    // making PortRef<M> valid for M: Message, but constructible only for
1565    // M: RemoteMessage, but the guarantees offered by the impossibilty of even
1566    // writing down the type are appealing.
1567    bound: Arc<OnceLock<PortId>>,
1568    // Typehash of an optional reducer. When it's defined, we include it in port
1569    /// references to optionally enable incremental accumulation.
1570    reducer_spec: Option<ReducerSpec>,
1571}
1572
1573impl<M: Message> PortHandle<M> {
1574    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1575        Self {
1576            mailbox,
1577            port_index,
1578            sender,
1579            bound: Arc::new(OnceLock::new()),
1580            reducer_spec: None,
1581        }
1582    }
1583
1584    fn location(&self) -> PortLocation {
1585        match self.bound.get() {
1586            Some(port_id) => PortLocation::Bound(port_id.clone()),
1587            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1588        }
1589    }
1590
1591    /// Send a message to this port.
1592    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
1593    pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1594        self.sender.send(Attrs::new(), message).map_err(|err| {
1595            MailboxSenderError::new_unbound::<M>(
1596                self.mailbox.actor_id().clone(),
1597                MailboxSenderErrorKind::Other(err),
1598            )
1599        })
1600    }
1601}
1602
1603impl<M: RemoteMessage> PortHandle<M> {
1604    /// Bind this port, making it accessible to remote actors.
1605    pub fn bind(&self) -> PortRef<M> {
1606        PortRef::attest_reducible(
1607            self.bound
1608                .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1609                .clone(),
1610            self.reducer_spec.clone(),
1611        )
1612    }
1613
1614    /// Bind to a specific port index. This is used by [`actor::Binder`] implementations to
1615    /// bind actor refs. This is not intended for general use.
1616    pub fn bind_to(&self, port_index: u64) {
1617        self.mailbox.bind_to(self, port_index);
1618    }
1619}
1620
1621impl<M: Message> Clone for PortHandle<M> {
1622    fn clone(&self) -> Self {
1623        Self {
1624            mailbox: self.mailbox.clone(),
1625            port_index: self.port_index,
1626            sender: self.sender.clone(),
1627            bound: self.bound.clone(),
1628            reducer_spec: self.reducer_spec.clone(),
1629        }
1630    }
1631}
1632
1633impl<M: Message> fmt::Display for PortHandle<M> {
1634    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1635        fmt::Display::fmt(&self.location(), f)
1636    }
1637}
1638
1639/// A one-shot port handle to which M-typed messages can be delivered.
1640#[derive(Debug)]
1641pub struct OncePortHandle<M: Message> {
1642    mailbox: Mailbox,
1643    port_index: u64,
1644    port_id: PortId,
1645    sender: oneshot::Sender<M>,
1646}
1647
1648impl<M: Message> OncePortHandle<M> {
1649    /// This port's ID.
1650    // TODO: make value
1651    pub fn port_id(&self) -> &PortId {
1652        &self.port_id
1653    }
1654
1655    /// Send a message to this port. The send operation will consume the
1656    /// port handle, as the port accepts at most one message.
1657    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
1658    pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1659        let actor_id = self.mailbox.actor_id().clone();
1660        self.sender.send(message).map_err(|_| {
1661            // Here, the value is returned when the port is
1662            // closed.  We should consider having a similar
1663            // API for send_once, though arguably it makes less
1664            // sense in this context.
1665            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1666        })?;
1667        Ok(())
1668    }
1669}
1670
1671impl<M: RemoteMessage> OncePortHandle<M> {
1672    /// Turn this handle into a ref that may be passed to
1673    /// a remote actor. The remote actor can then use the
1674    /// ref to send a message to the port. Creating a ref also
1675    /// binds the port, so that it is remotely writable.
1676    pub fn bind(self) -> OncePortRef<M> {
1677        let port_id = self.port_id().clone();
1678        self.mailbox.clone().bind_once(self);
1679        OncePortRef::attest(port_id)
1680    }
1681}
1682
1683impl<M: Message> fmt::Display for OncePortHandle<M> {
1684    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1685        fmt::Display::fmt(&self.port_id(), f)
1686    }
1687}
1688
/// A receiver of M-typed messages, used by actors to receive messages
/// on open ports (for example those created by `Mailbox::open_port` and
/// `Mailbox::open_accum_port`).
#[derive(Debug)]
pub struct PortReceiver<M> {
    // Local queue the port's sender pushes into.
    receiver: mpsc::UnboundedReceiver<M>,
    // ID of the port this receiver serves.
    port_id: PortId,
    /// When multiple messages are queued, only the latest one is received
    /// if coalesce is true; the older queued messages are discarded.
    coalesce: bool,
    /// The owning mailbox, kept so that the port can be removed from
    /// service when the receiver is dropped.
    mailbox: Mailbox,
}
1702
1703impl<M> PortReceiver<M> {
1704    fn new(
1705        receiver: mpsc::UnboundedReceiver<M>,
1706        port_id: PortId,
1707        coalesce: bool,
1708        mailbox: Mailbox,
1709    ) -> Self {
1710        Self {
1711            receiver,
1712            port_id,
1713            coalesce,
1714            mailbox,
1715        }
1716    }
1717
1718    /// Tries to receive the next value for this receiver.
1719    /// This function returns `Ok(None)` if the receiver is empty
1720    /// and returns a MailboxError if the receiver is disconnected.
1721    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
1722    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1723        let mut next = self.receiver.try_recv();
1724        // To coalesce, drain the mpsc queue and only keep the last one.
1725        if self.coalesce {
1726            if let Some(latest) = self.drain().pop() {
1727                next = Ok(latest);
1728            }
1729        }
1730        match next {
1731            Ok(msg) => Ok(Some(msg)),
1732            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1733            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1734                self.actor_id().clone(),
1735                MailboxErrorKind::Closed,
1736            )),
1737        }
1738    }
1739
1740    /// Receive the next message from the port corresponding with this
1741    /// receiver.
1742    pub async fn recv(&mut self) -> Result<M, MailboxError> {
1743        let mut next = self.receiver.recv().await;
1744        // To coalesce, get the last message from the queue if there are
1745        // more on the mspc queue.
1746        if self.coalesce {
1747            if let Some(latest) = self.drain().pop() {
1748                next = Some(latest);
1749            }
1750        }
1751        next.ok_or(MailboxError::new(
1752            self.actor_id().clone(),
1753            MailboxErrorKind::Closed,
1754        ))
1755    }
1756
1757    /// Drains all available messages from the port.
1758    pub fn drain(&mut self) -> Vec<M> {
1759        let mut drained: Vec<M> = Vec::new();
1760        while let Ok(msg) = self.receiver.try_recv() {
1761            // To coalesce, discard the old message if there is any.
1762            if self.coalesce {
1763                drained.pop();
1764            }
1765            drained.push(msg);
1766        }
1767        drained
1768    }
1769
1770    fn port(&self) -> u64 {
1771        self.port_id.1
1772    }
1773
1774    fn actor_id(&self) -> &ActorId {
1775        &self.port_id.0
1776    }
1777}
1778
1779impl<M> Drop for PortReceiver<M> {
1780    fn drop(&mut self) {
1781        // MARIUS: do we need to tombstone these? or should we
1782        // error out if we have removed the receiver before serializing the port ref?
1783        // ("no longer live")?
1784        self.mailbox.inner.ports.remove(&self.port());
1785    }
1786}
1787
1788impl<M> Stream for PortReceiver<M> {
1789    type Item = Result<M, MailboxError>;
1790
1791    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1792        std::pin::pin!(self.recv()).poll(cx).map(Some)
1793    }
1794}
1795
1796/// A receiver of M-typed messages from [`OncePort`]s.
1797pub struct OncePortReceiver<M> {
1798    receiver: Option<oneshot::Receiver<M>>,
1799    port_id: PortId,
1800
1801    /// Mailbox is used to remove the port from service when the receiver
1802    /// is dropped.
1803    mailbox: Mailbox,
1804}
1805
1806impl<M> OncePortReceiver<M> {
1807    /// Receive message from the one-shot port associated with this
1808    /// receiver.  Recv consumes the receiver: it is no longer valid
1809    /// after this call.
1810    pub async fn recv(mut self) -> Result<M, MailboxError> {
1811        std::mem::take(&mut self.receiver)
1812            .unwrap()
1813            .await
1814            .map_err(|err| {
1815                MailboxError::new(
1816                    self.actor_id().clone(),
1817                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1818                )
1819            })
1820    }
1821
1822    fn port(&self) -> u64 {
1823        self.port_id.1
1824    }
1825
1826    fn actor_id(&self) -> &ActorId {
1827        &self.port_id.0
1828    }
1829}
1830
1831impl<M> Drop for OncePortReceiver<M> {
1832    fn drop(&mut self) {
1833        // MARIUS: do we need to tombstone these? or should we
1834        // error out if we have removed the receiver before serializing the port ref?
1835        // ("no longer live")?
1836        self.mailbox.inner.ports.remove(&self.port());
1837    }
1838}
1839
1840/// Error that that occur during SerializedSender's send operation.
1841pub struct SerializedSenderError {
1842    /// The headers associated with the message.
1843    pub headers: Attrs,
1844    /// The message was tried to send.
1845    pub data: Serialized,
1846    /// The mailbox sender error that occurred.
1847    pub error: MailboxSenderError,
1848}
1849
1850/// SerializedSender encapsulates senders:
1851///   - It performs type erasure (and thus it is object-safe).
1852///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
1853///     validity of the underlying port.
1854trait SerializedSender: Send + Sync {
1855    /// Enables downcasting from `&dyn SerializedSender` to concrete
1856    /// types.
1857    ///
1858    /// Used by `Mailbox::lookup_sender` to downcast to
1859    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
1860    fn as_any(&self) -> &dyn Any;
1861
1862    /// Send a serialized message. SerializedSender will deserialize the
1863    /// message (failing if it fails to deserialize), and then send the
1864    /// resulting message on the underlying port.
1865    ///
1866    /// Send_serialized returns true whenever the port remains valid
1867    /// after the send operation.
1868    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSender`.
1869    fn send_serialized(
1870        &self,
1871        headers: Attrs,
1872        serialized: Serialized,
1873    ) -> Result<bool, SerializedSenderError>;
1874}
1875
1876/// A sender to an M-typed unbounded port.
1877enum UnboundedPortSender<M: Message> {
1878    /// Send directly to the mpsc queue.
1879    Mpsc(mpsc::UnboundedSender<M>),
1880    /// Use the provided function to enqueue the item.
1881    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
1882}
1883
1884impl<M: Message> UnboundedPortSender<M> {
1885    fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1886        match self {
1887            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1888            Self::Func(func) => func(headers, message),
1889        }
1890    }
1891}
1892
1893// We implement Clone manually as derive(Clone) places unnecessarily
1894// strict bounds on the type parameter M.
1895impl<M: Message> Clone for UnboundedPortSender<M> {
1896    fn clone(&self) -> Self {
1897        match self {
1898            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1899            Self::Func(func) => Self::Func(func.clone()),
1900        }
1901    }
1902}
1903
1904impl<M: Message> Debug for UnboundedPortSender<M> {
1905    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1906        match self {
1907            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1908            Self::Func(_) => f
1909                .debug_tuple("UnboundedPortSender::Func")
1910                .field(&"..")
1911                .finish(),
1912        }
1913    }
1914}
1915
1916struct UnboundedSender<M: Message> {
1917    sender: UnboundedPortSender<M>,
1918    port_id: PortId,
1919}
1920
1921impl<M: Message> UnboundedSender<M> {
1922    /// Create a new UnboundedSender encapsulating the provided
1923    /// sender.
1924    fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
1925        Self { sender, port_id }
1926    }
1927
1928    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
1929    fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
1930        self.sender.send(headers, message).map_err(|err| {
1931            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
1932        })
1933    }
1934}
1935
1936// Clone is implemented explicitly because the derive macro demands M:
1937// Clone directly. In this case, it isn't needed because Arc<T> can
1938// clone for any T.
1939impl<M: Message> Clone for UnboundedSender<M> {
1940    fn clone(&self) -> Self {
1941        Self {
1942            sender: self.sender.clone(),
1943            port_id: self.port_id.clone(),
1944        }
1945    }
1946}
1947
1948impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1949    fn as_any(&self) -> &dyn Any {
1950        self
1951    }
1952
1953    fn send_serialized(
1954        &self,
1955        headers: Attrs,
1956        serialized: Serialized,
1957    ) -> Result<bool, SerializedSenderError> {
1958        match serialized.deserialized() {
1959            Ok(message) => {
1960                self.sender.send(headers.clone(), message).map_err(|err| {
1961                    SerializedSenderError {
1962                        data: serialized,
1963                        error: MailboxSenderError::new_bound(
1964                            self.port_id.clone(),
1965                            MailboxSenderErrorKind::Other(err),
1966                        ),
1967                        headers,
1968                    }
1969                })?;
1970
1971                Ok(true)
1972            }
1973            Err(err) => Err(SerializedSenderError {
1974                data: serialized,
1975                error: MailboxSenderError::new_bound(
1976                    self.port_id.clone(),
1977                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
1978                ),
1979                headers,
1980            }),
1981        }
1982    }
1983}
1984
1985/// OnceSender encapsulates an underlying one-shot sender, dynamically
1986/// tracking its validity.
1987#[derive(Debug)]
1988struct OnceSender<M: Message> {
1989    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
1990    port_id: PortId,
1991}
1992
1993impl<M: Message> OnceSender<M> {
1994    /// Create a new OnceSender encapsulating the provided one-shot
1995    /// sender.
1996    fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
1997        Self {
1998            sender: Arc::new(Mutex::new(Some(sender))),
1999            port_id,
2000        }
2001    }
2002
2003    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
2004    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2005        // TODO: we should replace the sender on error
2006        match self.sender.lock().unwrap().take() {
2007            None => Err(MailboxSenderError::new_bound(
2008                self.port_id.clone(),
2009                MailboxSenderErrorKind::Closed,
2010            )),
2011            Some(sender) => {
2012                sender.send(message).map_err(|_| {
2013                    // Here, the value is returned when the port is
2014                    // closed.  We should consider having a similar
2015                    // API for send_once, though arguably it makes less
2016                    // sense in this context.
2017                    MailboxSenderError::new_bound(
2018                        self.port_id.clone(),
2019                        MailboxSenderErrorKind::Closed,
2020                    )
2021                })?;
2022                Ok(false)
2023            }
2024        }
2025    }
2026}
2027
2028// Clone is implemented explicitly because the derive macro demands M:
2029// Clone directly. In this case, it isn't needed because Arc<T> can
2030// clone for any T.
2031impl<M: Message> Clone for OnceSender<M> {
2032    fn clone(&self) -> Self {
2033        Self {
2034            sender: self.sender.clone(),
2035            port_id: self.port_id.clone(),
2036        }
2037    }
2038}
2039
2040impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2041    fn as_any(&self) -> &dyn Any {
2042        self
2043    }
2044
2045    fn send_serialized(
2046        &self,
2047        headers: Attrs,
2048        serialized: Serialized,
2049    ) -> Result<bool, SerializedSenderError> {
2050        match serialized.deserialized() {
2051            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2052                data: serialized,
2053                error: e,
2054                headers,
2055            }),
2056            Err(err) => Err(SerializedSenderError {
2057                data: serialized,
2058                error: MailboxSenderError::new_bound(
2059                    self.port_id.clone(),
2060                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2061                ),
2062                headers,
2063            }),
2064        }
2065    }
2066}
2067
2068/// Use the provided function to send untyped messages (i.e. Serialized objects).
2069struct UntypedUnboundedSender {
2070    sender: Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
2071    port_id: PortId,
2072}
2073
2074impl SerializedSender for UntypedUnboundedSender {
2075    fn as_any(&self) -> &dyn Any {
2076        self
2077    }
2078
2079    fn send_serialized(
2080        &self,
2081        headers: Attrs,
2082        serialized: Serialized,
2083    ) -> Result<bool, SerializedSenderError> {
2084        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2085            data,
2086            error: MailboxSenderError::new_bound(
2087                self.port_id.clone(),
2088                MailboxSenderErrorKind::Other(err),
2089            ),
2090            headers,
2091        })?;
2092
2093        Ok(true)
2094    }
2095}
2096
2097/// State is the internal state of the mailbox.
2098struct State {
2099    /// The ID of the mailbox owner.
2100    actor_id: ActorId,
2101
2102    // insert if it's serializable; otherwise don't.
2103    /// The set of active ports in the mailbox. All currently
2104    /// allocated ports are
2105    ports: DashMap<u64, Box<dyn SerializedSender>>,
2106
2107    /// The next port ID to allocate.
2108    next_port: AtomicU64,
2109
2110    /// The forwarder for this mailbox.
2111    forwarder: BoxedMailboxSender,
2112}
2113
2114impl State {
2115    /// Create a new state with the provided owning ActorId.
2116    fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2117        Self {
2118            actor_id,
2119            ports: DashMap::new(),
2120            // The first 1024 ports are allocated to actor handlers.
2121            // Other port IDs are ephemeral.
2122            next_port: AtomicU64::new(USER_PORT_OFFSET),
2123            forwarder,
2124        }
2125    }
2126
2127    /// Allocate a fresh port.
2128    fn allocate_port(&self) -> u64 {
2129        self.next_port.fetch_add(1, Ordering::SeqCst)
2130    }
2131}
2132
2133impl fmt::Debug for State {
2134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2135        f.debug_struct("State")
2136            .field("actor_id", &self.actor_id)
2137            .field(
2138                "open_ports",
2139                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2140            )
2141            .field("next_port", &self.next_port)
2142            .finish()
2143    }
2144}
2145
2146// TODO: mux based on some parameterized type. (mux key).
2147/// An in-memory mailbox muxer. This is used to route messages to
2148/// different underlying senders.
2149#[derive(Debug, Clone)]
2150pub struct MailboxMuxer {
2151    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
2152}
2153
2154impl MailboxMuxer {
2155    /// Create a new, empty, muxer.
2156    pub fn new() -> Self {
2157        Self {
2158            mailboxes: Arc::new(DashMap::new()),
2159        }
2160    }
2161
2162    /// Route messages destined for the provided actor id to the provided
2163    /// sender. Returns false if there is already a sender associated
2164    /// with the actor. In this case, the sender is not replaced, and
2165    /// the caller must [`MailboxMuxer::unbind`] it first.
2166    pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2167        match self.mailboxes.entry(actor_id) {
2168            Entry::Occupied(_) => false,
2169            Entry::Vacant(entry) => {
2170                entry.insert(Box::new(sender));
2171                true
2172            }
2173        }
2174    }
2175
2176    /// Convenience function to bind a mailbox.
2177    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2178        self.bind(mailbox.actor_id().clone(), mailbox)
2179    }
2180
2181    /// Unbind the sender associated with the provided actor ID. After
2182    /// unbinding, the muxer will no longer be able to send messages to
2183    /// that actor.
2184    pub(crate) fn unbind(&self, actor_id: &ActorId) {
2185        self.mailboxes.remove(actor_id);
2186    }
2187
2188    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2189    pub fn bound_actors(&self) -> Vec<ActorId> {
2190        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2191    }
2192}
2193
2194impl MailboxSender for MailboxMuxer {
2195    fn post(
2196        &self,
2197        envelope: MessageEnvelope,
2198        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2199    ) {
2200        let dest_actor_id = envelope.dest().actor_id();
2201        match self.mailboxes.get(envelope.dest().actor_id()) {
2202            None => {
2203                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2204                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2205            }
2206            Some(sender) => sender.post(envelope, return_handle),
2207        }
2208    }
2209}
2210
2211/// MailboxRouter routes messages to the sender that is bound to its
2212/// nearest prefix.
2213#[derive(Debug, Clone)]
2214pub struct MailboxRouter {
2215    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2216}
2217
2218impl MailboxRouter {
2219    /// Create a new, empty router.
2220    pub fn new() -> Self {
2221        Self {
2222            entries: Arc::new(RwLock::new(BTreeMap::new())),
2223        }
2224    }
2225
2226    /// Downgrade this router to a [`WeakMailboxRouter`].
2227    pub fn downgrade(&self) -> WeakMailboxRouter {
2228        WeakMailboxRouter(Arc::downgrade(&self.entries))
2229    }
2230
2231    /// Returns a new router that will first attempt to find a route for the message
2232    /// in the router's table; otherwise post the message to the provided fallback
2233    /// sender.
2234    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2235        FallbackMailboxRouter {
2236            router: self.clone(),
2237            default,
2238        }
2239    }
2240
2241    /// Bind the provided sender to the given reference. The destination
2242    /// is treated as a prefix to which messages can be routed, and
2243    /// messages are routed to their longest matching prefix.
2244    pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2245        let mut w = self.entries.write().unwrap();
2246        w.insert(dest, Arc::new(sender));
2247    }
2248
2249    fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2250        match self
2251            .entries
2252            .read()
2253            .unwrap()
2254            .lower_bound(Excluded(&actor_id.clone().into()))
2255            .prev()
2256        {
2257            None => None,
2258            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2259                Some(sender.clone())
2260            }
2261            Some(_) => None,
2262        }
2263    }
2264}
2265
2266impl MailboxSender for MailboxRouter {
2267    fn post(
2268        &self,
2269        envelope: MessageEnvelope,
2270        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2271    ) {
2272        match self.sender(envelope.dest().actor_id()) {
2273            None => envelope.undeliverable(
2274                DeliveryError::Unroutable(
2275                    "no destination found for actor in routing table".to_string(),
2276                ),
2277                return_handle,
2278            ),
2279            Some(sender) => sender.post(envelope, return_handle),
2280        }
2281    }
2282}
2283
2284#[derive(Debug, Clone)]
2285struct FallbackMailboxRouter {
2286    router: MailboxRouter,
2287    default: BoxedMailboxSender,
2288}
2289
2290impl MailboxSender for FallbackMailboxRouter {
2291    fn post(
2292        &self,
2293        envelope: MessageEnvelope,
2294        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2295    ) {
2296        match self.router.sender(envelope.dest().actor_id()) {
2297            Some(sender) => sender.post(envelope, return_handle),
2298            None => self.default.post(envelope, return_handle),
2299        }
2300    }
2301}
2302
2303/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
2304/// state. This allows router references to be circular: an entity holding a reference
2305/// to the router may also contain the router itself.
2306///
2307/// TODO: this currently holds a weak reference to the entire router. This helps
2308/// prevent cycle leaks, but can cause excess memory usage as the cycle is at
2309/// the granularity of each entry. Possibly the router should allow weak references
2310/// on a per-entry basis.
2311#[derive(Debug, Clone)]
2312pub struct WeakMailboxRouter(
2313    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2314);
2315
2316impl WeakMailboxRouter {
2317    /// Upgrade the weak router to a strong reference router.
2318    pub fn upgrade(&self) -> Option<MailboxRouter> {
2319        self.0.upgrade().map(|entries| MailboxRouter { entries })
2320    }
2321}
2322
2323impl MailboxSender for WeakMailboxRouter {
2324    fn post(
2325        &self,
2326        envelope: MessageEnvelope,
2327        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2328    ) {
2329        match self.upgrade() {
2330            Some(router) => router.post(envelope, return_handle),
2331            None => envelope.undeliverable(
2332                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2333                return_handle,
2334            ),
2335        }
2336    }
2337}
2338
2339/// A dynamic mailbox router that supports remote delivery.
2340///
2341/// `DialMailboxRouter` maintains a runtime address book mapping
2342/// references to `ChannelAddr`s. It holds a cache of active
2343/// connections and forwards messages to the appropriate
2344/// `MailboxClient`.
2345///
2346/// Messages sent to unknown destinations are routed to the `default`
2347/// sender, if present.
2348#[derive(Debug, Clone)]
2349pub struct DialMailboxRouter {
2350    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
2351    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,
2352
2353    // The default sender, to which messages for unknown recipients
2354    // are sent. (This is like a default route in a routing table.)
2355    default: BoxedMailboxSender,
2356}
2357
2358impl DialMailboxRouter {
2359    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2360    pub fn new() -> Self {
2361        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2362    }
2363
2364    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2365    /// and a default sender. Any message with an unknown destination is
2366    /// dispatched on this default sender.
2367    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2368        Self {
2369            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2370            sender_cache: Arc::new(DashMap::new()),
2371            default,
2372        }
2373    }
2374
2375    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2376    /// existing binding.
2377    ///
2378    /// If the address changes, the old sender is evicted from the
2379    /// cache to ensure fresh routing on next use.
2380    pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2381        if let Ok(mut w) = self.address_book.write() {
2382            if let Some(old_addr) = w.insert(dest.clone(), addr.clone()) {
2383                if old_addr != addr {
2384                    tracing::info!("Rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2385                    self.sender_cache.remove(&old_addr);
2386                }
2387            }
2388        } else {
2389            tracing::error!("Address book poisoned during bind of {:?}", dest);
2390        }
2391    }
2392
2393    /// Removes all address mappings with the given prefix from the
2394    /// router.
2395    ///
2396    /// Also evicts any corresponding cached senders to prevent reuse
2397    /// of stale connections.
2398    pub fn unbind(&self, dest: &Reference) {
2399        if let Ok(mut w) = self.address_book.write() {
2400            let to_remove: Vec<(Reference, ChannelAddr)> = w
2401                .range(dest..)
2402                .take_while(|(key, _)| dest.is_prefix_of(key))
2403                .map(|(key, addr)| (key.clone(), addr.clone()))
2404                .collect();
2405
2406            for (key, addr) in to_remove {
2407                tracing::info!("Unbinding {:?} from {:?}", key, addr);
2408                w.remove(&key);
2409                self.sender_cache.remove(&addr);
2410            }
2411        } else {
2412            tracing::error!("Address book poisoned during unbind of {:?}", dest);
2413        }
2414    }
2415
2416    /// Lookup an actor's channel in the router's address bok.
2417    pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2418        let address_book = self.address_book.read().unwrap();
2419        address_book
2420            .lower_bound(Excluded(&actor_id.clone().into()))
2421            .prev()
2422            .and_then(|(key, addr)| {
2423                if key.is_prefix_of(&actor_id.clone().into()) {
2424                    Some(addr.clone())
2425                } else {
2426                    None
2427                }
2428            })
2429    }
2430
2431    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
2432    fn dial(
2433        &self,
2434        addr: &ChannelAddr,
2435        actor_id: &ActorId,
2436    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2437        // Get the sender. Create it if needed. Do not send the
2438        // messages inside this block so we do not hold onto the
2439        // reference of the dashmap entries.
2440        match self.sender_cache.entry(addr.clone()) {
2441            Entry::Occupied(entry) => Ok(entry.get().clone()),
2442            Entry::Vacant(entry) => {
2443                let tx = channel::dial(addr.clone()).map_err(|err| {
2444                    MailboxSenderError::new_unbound_type(
2445                        actor_id.clone(),
2446                        MailboxSenderErrorKind::Channel(err),
2447                        "unknown",
2448                    )
2449                })?;
2450                let sender = MailboxClient::new(tx);
2451                Ok(entry.insert(Arc::new(sender)).value().clone())
2452            }
2453        }
2454    }
2455}
2456
2457impl MailboxSender for DialMailboxRouter {
2458    fn post(
2459        &self,
2460        envelope: MessageEnvelope,
2461        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2462    ) {
2463        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2464            self.default.post(envelope, return_handle);
2465            return;
2466        };
2467
2468        match self.dial(&addr, envelope.dest().actor_id()) {
2469            Err(err) => envelope.undeliverable(
2470                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2471                return_handle,
2472            ),
2473            Ok(sender) => sender.post(envelope, return_handle),
2474        }
2475    }
2476}
2477
/// A `MailboxSender` that reports every envelope as undeliverable because
/// it cannot be routed.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2482
2483impl MailboxSender for UnroutableMailboxSender {
2484    fn post(
2485        &self,
2486        envelope: MessageEnvelope,
2487        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2488    ) {
2489        envelope.undeliverable(
2490            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2491            return_handle,
2492        );
2493    }
2494}
2495
2496#[cfg(test)]
2497mod tests {
2498
2499    use std::assert_matches::assert_matches;
2500    use std::mem::drop;
2501    use std::sync::atomic::AtomicUsize;
2502    use std::time::Duration;
2503
2504    use timed_test::async_timed_test;
2505
2506    use super::*;
2507    use crate::Actor;
2508    use crate::PortId;
2509    use crate::accum;
2510    use crate::channel::ChannelTransport;
2511    use crate::channel::dial;
2512    use crate::channel::serve;
2513    use crate::channel::sim::SimAddr;
2514    use crate::clock::Clock;
2515    use crate::clock::RealClock;
2516    use crate::data::Serialized;
2517    use crate::id;
2518    use crate::proc::Proc;
2519    use crate::reference::ProcId;
2520    use crate::reference::WorldId;
2521    use crate::simnet;
2522
2523    #[test]
2524    fn test_error() {
2525        let err = MailboxError::new(
2526            ActorId(
2527                ProcId::Ranked(WorldId("myworld".to_string()), 2),
2528                "myactor".to_string(),
2529                5,
2530            ),
2531            MailboxErrorKind::Closed,
2532        );
2533        assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2534    }
2535
2536    #[tokio::test]
2537    async fn test_mailbox_basic() {
2538        let mbox = Mailbox::new_detached(id!(test[0].test));
2539        let (port, mut receiver) = mbox.open_port::<u64>();
2540        let port = port.bind();
2541
2542        mbox.serialize_and_send(&port, 123, monitored_return_handle())
2543            .unwrap();
2544        mbox.serialize_and_send(&port, 321, monitored_return_handle())
2545            .unwrap();
2546        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2547        assert_eq!(receiver.recv().await.unwrap(), 321u64);
2548
2549        let serialized = Serialized::serialize(&999u64).unwrap();
2550        mbox.post(
2551            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2552            monitored_return_handle(),
2553        );
2554        assert_eq!(receiver.recv().await.unwrap(), 999u64);
2555    }
2556
2557    #[tokio::test]
2558    async fn test_mailbox_accum() {
2559        let mbox = Mailbox::new_detached(id!(test[0].test));
2560        let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2561
2562        for i in -3..4 {
2563            port.send(i).unwrap();
2564            let received: accum::Max<i64> = receiver.recv().await.unwrap();
2565            let msg = received.get();
2566            assert_eq!(msg, &i);
2567        }
2568        // Send a smaller or same value. Should still receive the previous max.
2569        for i in -3..4 {
2570            port.send(i).unwrap();
2571            assert_eq!(receiver.recv().await.unwrap().get(), &3);
2572        }
2573        // send a larger value. Should receive the new max.
2574        port.send(4).unwrap();
2575        assert_eq!(receiver.recv().await.unwrap().get(), &4);
2576
2577        // Send multiple updates. Should only receive the final change.
2578        for i in 5..10 {
2579            port.send(i).unwrap();
2580        }
2581        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2582        port.send(1).unwrap();
2583        port.send(3).unwrap();
2584        port.send(2).unwrap();
2585        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2586    }
2587
2588    #[test]
2589    fn test_port_and_reducer() {
2590        let mbox = Mailbox::new_detached(id!(test[0].test));
2591        // accum port could have reducer typehash
2592        {
2593            let accumulator = accum::max::<u64>();
2594            let reducer_spec = accumulator.reducer_spec().unwrap();
2595            let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2596            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2597            let port_ref = port.bind();
2598            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2599        }
2600        // normal port should not have reducer typehash
2601        {
2602            let (port, _) = mbox.open_port::<u64>();
2603            assert_eq!(port.reducer_spec, None);
2604            let port_ref = port.bind();
2605            assert_eq!(port_ref.reducer_spec(), &None);
2606        }
2607    }
2608
2609    #[tokio::test]
2610    #[ignore] // error behavior changed, but we will bring it back
2611    async fn test_mailbox_once() {
2612        let mbox = Mailbox::new_detached(id!(test[0].test));
2613
2614        let (port, receiver) = mbox.open_once_port::<u64>();
2615
2616        // let port_id = port.port_id().clone();
2617
2618        port.send(123u64).unwrap();
2619        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2620
2621        // // The borrow checker won't let us send again on the port
2622        // // (good!), but we stashed the port-id and so we can try on the
2623        // // serialized interface.
2624        // let Err(err) = mbox
2625        //     .send_serialized(&port_id, &Serialized(Vec::new()))
2626        //     .await
2627        // else {
2628        //     unreachable!()
2629        // };
2630        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2631    }
2632
2633    #[tokio::test]
2634    #[ignore] // changed error behavior
2635    async fn test_mailbox_receiver_drop() {
2636        let mbox = Mailbox::new_detached(id!(test[0].test));
2637        let (port, mut receiver) = mbox.open_port::<u64>();
2638        // Make sure we go through "remote" path.
2639        let port = port.bind();
2640        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2641            .unwrap();
2642        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2643        drop(receiver);
2644        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2645            panic!();
2646        };
2647
2648        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2649        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2650    }
2651
2652    #[tokio::test]
2653    async fn test_drain() {
2654        let mbox = Mailbox::new_detached(id!(test[0].test));
2655
2656        let (port, mut receiver) = mbox.open_port();
2657        let port = port.bind();
2658
2659        for i in 0..10 {
2660            mbox.serialize_and_send(&port, i, monitored_return_handle())
2661                .unwrap();
2662        }
2663
2664        for i in 0..10 {
2665            assert_eq!(receiver.recv().await.unwrap(), i);
2666        }
2667
2668        assert!(receiver.drain().is_empty());
2669    }
2670
2671    #[tokio::test]
2672    async fn test_mailbox_muxer() {
2673        let muxer = MailboxMuxer::new();
2674
2675        let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2676        let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2677
2678        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2679        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2680
2681        let (port, receiver) = mbox0.open_once_port::<u64>();
2682
2683        port.send(123u64).unwrap();
2684        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2685
2686        /*
2687        let (tx, rx) = channel::local::new::<u64>();
2688        let (port, _) = mbox0.open_port::<u64>();
2689        let handle = muxer.clone().serve_port(port, rx).unwrap();
2690        muxer.unbind(mbox0.actor_id());
2691        tx.send(123u64).await.unwrap();
2692        let Ok(Err(err)) = handle.await else { panic!() };
2693        assert_eq!(err.actor_id(), &actor_id(0));
2694        */
2695    }
2696
2697    #[tokio::test]
2698    async fn test_local_client_server() {
2699        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2700        let (tx, rx) = channel::local::new();
2701        let serve_handle = mbox.clone().serve(rx);
2702        let client = MailboxClient::new(tx);
2703
2704        let (port, receiver) = mbox.open_once_port::<u64>();
2705        let port = port.bind();
2706
2707        client
2708            .serialize_and_send_once(port, 123u64, monitored_return_handle())
2709            .unwrap();
2710        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2711        serve_handle.stop("fromt test");
2712        serve_handle.await.unwrap().unwrap();
2713    }
2714
2715    #[tokio::test]
2716    async fn test_sim_client_server() {
2717        simnet::start();
2718        let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2719        let src_to_dst = ChannelAddr::Sim(
2720            SimAddr::new_with_src(
2721                "local:0".parse::<ChannelAddr>().unwrap(),
2722                dst_addr.addr().clone(),
2723            )
2724            .unwrap(),
2725        );
2726
2727        let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone()))
2728            .await
2729            .unwrap();
2730        let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2731        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2732        let serve_handle = mbox.clone().serve(rx);
2733        let client = MailboxClient::new(tx);
2734        let (port, receiver) = mbox.open_once_port::<u64>();
2735        let port = port.bind();
2736        let msg: u64 = 123;
2737        client
2738            .serialize_and_send_once(port, msg, monitored_return_handle())
2739            .unwrap();
2740        assert_eq!(receiver.recv().await.unwrap(), msg);
2741        serve_handle.stop("from test");
2742        serve_handle.await.unwrap().unwrap();
2743    }
2744
2745    #[tokio::test]
2746    async fn test_mailbox_router() {
2747        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2748        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2749        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2750        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2751
2752        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2753            [&mbox0, &mbox1, &mbox2, &mbox3]
2754                .into_iter()
2755                .map(|mbox| {
2756                    let (port, receiver) = mbox.open_once_port::<u64>();
2757                    (port.bind(), receiver)
2758                })
2759                .collect();
2760
2761        let router = MailboxRouter::new();
2762
2763        router.bind(id!(world0).into(), mbox0);
2764        router.bind(id!(world1[0]).into(), mbox1);
2765        router.bind(id!(world1[1]).into(), mbox2);
2766        router.bind(id!(world1[1].actor1).into(), mbox3);
2767
2768        for (i, (port, receiver)) in comms.into_iter().enumerate() {
2769            router
2770                .serialize_and_send_once(port, i as u64, monitored_return_handle())
2771                .unwrap();
2772            assert_eq!(receiver.recv().await.unwrap(), i as u64);
2773        }
2774
2775        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
2776
2777        let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2778
2779        let (return_handle, mut return_receiver) =
2780            crate::mailbox::undeliverable::new_undeliverable_port();
2781        let (port, _receiver) = mbox4.open_once_port();
2782        router
2783            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2784            .unwrap();
2785        assert!(return_receiver.recv().await.is_ok());
2786
2787        let router = router.fallback(mbox4.clone().into_boxed());
2788        let (port, receiver) = mbox4.open_once_port();
2789        router
2790            .serialize_and_send_once(port.bind(), 0, return_handle)
2791            .unwrap();
2792        assert_eq!(receiver.recv().await.unwrap(), 0);
2793    }
2794
2795    #[tokio::test]
2796    async fn test_dial_mailbox_router() {
2797        let router = DialMailboxRouter::new();
2798
2799        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2800        router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2801        router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2802        router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2803
2804        // We should be able to lookup the ids
2805        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2806        router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2807
2808        // Unbind so we cannot find the ids anymore
2809        router.unbind(&id!(world1).into());
2810        assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
2811        assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
2812        assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
2813        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2814        router.unbind(&id!(world0).into());
2815        assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
2816    }
2817
2818    #[tokio::test]
2819    #[ignore] // TODO: there's a leak here, fix it
2820    async fn test_dial_mailbox_router_default() {
2821        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2822        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2823        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2824        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2825
2826        // We don't need to dial here, since we gain direct access to the
2827        // underlying routers.
2828        let root = MailboxRouter::new();
2829        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
2830        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
2831
2832        root.bind(id!(world0).into(), world0_router.clone());
2833        root.bind(id!(world1).into(), world1_router.clone());
2834
2835        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
2836
2837        let mut handles = Vec::new(); // hold on to handles, or channels get closed
2838        for mbox in mailboxes.iter() {
2839            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local))
2840                .await
2841                .unwrap();
2842            let handle = (*mbox).clone().serve(rx);
2843            handles.push(handle);
2844
2845            eprintln!("{}: {}", mbox.actor_id(), addr);
2846            if mbox.actor_id().world_name() == "world0" {
2847                world0_router.bind(mbox.actor_id().clone().into(), addr);
2848            } else {
2849                world1_router.bind(mbox.actor_id().clone().into(), addr);
2850            }
2851        }
2852
2853        // Make sure nodes are fully connected.
2854        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
2855            for mbox in mailboxes.iter() {
2856                let (port, receiver) = mbox.open_once_port::<u64>();
2857                let port = port.bind();
2858                router
2859                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
2860                    .unwrap();
2861                assert_eq!(receiver.recv().await.unwrap(), 123u64);
2862            }
2863        }
2864    }
2865
2866    #[tokio::test]
2867    async fn test_enqueue_port() {
2868        let mbox = Mailbox::new_detached(id!(test[0].test));
2869
2870        let count = Arc::new(AtomicUsize::new(0));
2871        let count_clone = count.clone();
2872        let port = mbox.open_enqueue_port(move |_, n| {
2873            count_clone.fetch_add(n, Ordering::SeqCst);
2874            Ok(())
2875        });
2876
2877        port.send(10).unwrap();
2878        port.send(5).unwrap();
2879        port.send(1).unwrap();
2880        port.send(0).unwrap();
2881
2882        assert_eq!(count.load(Ordering::SeqCst), 16);
2883    }
2884
    /// Message type with no `#[named]` override; `test_remote_message_macros`
    /// checks that its typename is its fully-qualified Rust path.
    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
    struct TestMessage;
2887
    /// Message type whose typename is overridden through `#[named(name = ...)]`;
    /// `test_remote_message_macros` checks the override is used verbatim.
    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
    #[named(name = "some::custom::path")]
    struct TestMessage2;
2891
2892    #[test]
2893    fn test_remote_message_macros() {
2894        assert_eq!(
2895            TestMessage::typename(),
2896            "hyperactor::mailbox::tests::TestMessage"
2897        );
2898        assert_eq!(TestMessage2::typename(), "some::custom::path");
2899    }
2900
2901    #[test]
2902    fn test_message_envelope_display() {
2903        #[derive(Named, Serialize, Deserialize)]
2904        struct MyTest {
2905            a: u64,
2906            b: String,
2907        }
2908        crate::register_type!(MyTest);
2909
2910        let envelope = MessageEnvelope::serialize(
2911            id!(source[0].actor),
2912            id!(dest[1].actor[0][123]),
2913            &MyTest {
2914                a: 123,
2915                b: "hello".into(),
2916            },
2917            Attrs::new(),
2918        )
2919        .unwrap();
2920
2921        assert_eq!(
2922            format!("{}", envelope),
2923            r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
2924        );
2925    }
2926
    /// Stateless actor spawned by `test_actor_delivery_failure` as the
    /// sender whose returned (undeliverable) message should make it fail.
    #[derive(Debug, Default, Actor)]
    struct Foo;
2929
2930    // Test that a message delivery failure causes the sending actor
2931    // to stop running.
2932    #[tokio::test]
2933    async fn test_actor_delivery_failure() {
2934        // This test involves making an actor fail and so we must set
2935        // a supervision coordinator.
2936        use crate::actor::ActorStatus;
2937        use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2938
2939        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
2940            BOXED_PANICKING_MAILBOX_SENDER.clone(),
2941        ));
2942        let proc_id = id!(quux[0]);
2943        let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
2944        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2945
2946        let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
2947        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
2948        let message = MessageEnvelope::new(
2949            foo.actor_id().clone(),
2950            PortId(id!(corge[0].bar), 9999u64),
2951            Serialized::serialize(&1u64).unwrap(),
2952            Attrs::new(),
2953        );
2954        return_handle.send(Undeliverable(message)).unwrap();
2955
2956        RealClock
2957            .sleep(tokio::time::Duration::from_millis(100))
2958            .await;
2959
2960        let foo_status = foo.status();
2961        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
2962        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
2963            unreachable!()
2964        };
2965        assert!(msg.as_str().contains(
2966            "serving quux[0].foo[0]: processing error: a message from \
2967                quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
2968        ));
2969
2970        proc.destroy_and_wait(tokio::time::Duration::from_secs(1), None)
2971            .await
2972            .unwrap();
2973    }
2974
2975    #[tokio::test]
2976    async fn test_detached_return_handle() {
2977        let (return_handle, mut return_receiver) =
2978            crate::mailbox::undeliverable::new_undeliverable_port();
2979        // Simulate an undelivered message return.
2980        let envelope = MessageEnvelope::new(
2981            id!(foo[0].bar),
2982            PortId(id!(baz[0].corge), 9999u64),
2983            Serialized::serialize(&1u64).unwrap(),
2984            Attrs::new(),
2985        );
2986        return_handle.send(Undeliverable(envelope.clone())).unwrap();
2987        // Check we receive the undelivered message.
2988        assert!(
2989            RealClock
2990                .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
2991                .await
2992                .is_ok()
2993        );
2994        // Setup a monitor for the receiver and show that if there are
2995        // no outstanding return handles it terminates.
2996        let monitor_handle = tokio::spawn(async move {
2997            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
2998                envelope.try_set_error(DeliveryError::BrokenLink(
2999                    "returned in unit test".to_string(),
3000                ));
3001                UndeliverableMailboxSender
3002                    .post(envelope, /*unused */ monitored_return_handle());
3003            }
3004        });
3005        drop(return_handle);
3006        assert!(
3007            RealClock
3008                .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3009                .await
3010                .is_ok()
3011        );
3012    }
3013
3014    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3015        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3016            // Create dummy state and port_id to create PortReceiver. They are
3017            // not used in the test.
3018            let dummy_state =
3019                State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3020            let dummy_port_id = PortId(id!(world[0].actor), 0);
3021            let (sender, receiver) = mpsc::unbounded_channel::<M>();
3022            let receiver = PortReceiver {
3023                receiver,
3024                port_id: dummy_port_id,
3025                coalesce,
3026                mailbox: Mailbox {
3027                    inner: Arc::new(dummy_state),
3028                },
3029            };
3030            (sender, receiver)
3031        }
3032
3033        // verify fn drain
3034        {
3035            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3036            assert!(receiver.drain().is_empty());
3037
3038            sender.send(0).unwrap();
3039            sender.send(1).unwrap();
3040            sender.send(2).unwrap();
3041            sender.send(3).unwrap();
3042            sender.send(4).unwrap();
3043            sender.send(5).unwrap();
3044            sender.send(6).unwrap();
3045            sender.send(7).unwrap();
3046
3047            if drop_sender {
3048                drop(sender);
3049            }
3050
3051            if !coalesce {
3052                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3053            } else {
3054                assert_eq!(receiver.drain(), vec![7]);
3055            }
3056
3057            assert!(receiver.drain().is_empty());
3058            assert!(receiver.drain().is_empty());
3059        }
3060
3061        // verify fn try_recv
3062        {
3063            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3064            assert!(receiver.try_recv().unwrap().is_none());
3065
3066            sender.send(0).unwrap();
3067            sender.send(1).unwrap();
3068            sender.send(2).unwrap();
3069            sender.send(3).unwrap();
3070
3071            if drop_sender {
3072                drop(sender);
3073            }
3074
3075            if !coalesce {
3076                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3077                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3078                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3079            }
3080            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3081            if drop_sender {
3082                assert_matches!(
3083                    receiver.try_recv().unwrap_err().kind(),
3084                    MailboxErrorKind::Closed
3085                );
3086                // Still Closed error
3087                assert_matches!(
3088                    receiver.try_recv().unwrap_err().kind(),
3089                    MailboxErrorKind::Closed
3090                );
3091            } else {
3092                assert!(receiver.try_recv().unwrap().is_none());
3093                // Still empty
3094                assert!(receiver.try_recv().unwrap().is_none());
3095            }
3096        }
3097        // verify fn recv
3098        {
3099            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3100            assert!(
3101                RealClock
3102                    .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3103                    .await
3104                    .is_err()
3105            );
3106
3107            sender.send(4).unwrap();
3108            sender.send(5).unwrap();
3109            sender.send(6).unwrap();
3110            sender.send(7).unwrap();
3111
3112            if drop_sender {
3113                drop(sender);
3114            }
3115
3116            if !coalesce {
3117                assert_eq!(receiver.recv().await.unwrap(), 4);
3118                assert_eq!(receiver.recv().await.unwrap(), 5);
3119                assert_eq!(receiver.recv().await.unwrap(), 6);
3120            }
3121            assert_eq!(receiver.recv().await.unwrap(), 7);
3122            if drop_sender {
3123                assert_matches!(
3124                    receiver.recv().await.unwrap_err().kind(),
3125                    MailboxErrorKind::Closed
3126                );
3127                // Still None
3128                assert_matches!(
3129                    receiver.recv().await.unwrap_err().kind(),
3130                    MailboxErrorKind::Closed
3131                );
3132            } else {
3133                assert!(
3134                    RealClock
3135                        .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3136                        .await
3137                        .is_err()
3138                );
3139            }
3140        }
3141    }
3142
3143    #[tokio::test]
3144    async fn test_receiver_basic_default() {
3145        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3146    }
3147
3148    #[tokio::test]
3149    async fn test_receiver_basic_latest() {
3150        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3151    }
3152
3153    #[tokio::test]
3154    async fn test_receiver_after_sender_drop_default() {
3155        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3156    }
3157
3158    #[tokio::test]
3159    async fn test_receiver_after_sender_drop_latest() {
3160        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3161    }
3162
    /// Fixture returned by `setup_split_port_ids`: a port opened on `actor0`
    /// together with several split port ids created through `actor1`.
    struct Setup {
        /// Receiver of the port originally opened on `actor0`.
        receiver: PortReceiver<u64>,
        /// Mailbox that opened the original port.
        actor0: Mailbox,
        /// Mailbox through which the splits were created.
        actor1: Mailbox,
        /// Id of the original port on `actor0`.
        port_id: PortId,
        /// First split of `port_id`.
        port_id1: PortId,
        /// Second split of `port_id`.
        port_id2: PortId,
        /// Split of the already-split `port_id2`.
        port_id2_1: PortId,
    }
3172
3173    async fn setup_split_port_ids(reducer_spec: Option<ReducerSpec>) -> Setup {
3174        let muxer = MailboxMuxer::new();
3175        let actor0 = Mailbox::new(id!(test[0].actor), BoxedMailboxSender::new(muxer.clone()));
3176        let actor1 = Mailbox::new(id!(test[1].actor1), BoxedMailboxSender::new(muxer.clone()));
3177        muxer.bind_mailbox(actor0.clone());
3178        muxer.bind_mailbox(actor1.clone());
3179
3180        // Open a port on actor0
3181        let (port_handle, receiver) = actor0.open_port::<u64>();
3182        let port_id = port_handle.bind().port_id().clone();
3183
3184        // Split it twice on actor1
3185        let port_id1 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
3186        let port_id2 = port_id.split(&actor1, reducer_spec.clone()).unwrap();
3187
3188        // A split port id can also be split
3189        let port_id2_1 = port_id2.split(&actor1, reducer_spec).unwrap();
3190
3191        Setup {
3192            receiver,
3193            actor0,
3194            actor1,
3195            port_id,
3196            port_id1,
3197            port_id2,
3198            port_id2_1,
3199        }
3200    }
3201
3202    fn post(mailbox: &Mailbox, port_id: PortId, msg: u64) {
3203        mailbox.post(
3204            MessageEnvelope::new_unknown(port_id.clone(), Serialized::serialize(&msg).unwrap()),
3205            monitored_return_handle(),
3206        );
3207    }
3208
3209    #[async_timed_test(timeout_secs = 30)]
3210    async fn test_split_port_id_no_reducer() {
3211        let Setup {
3212            mut receiver,
3213            actor0,
3214            actor1,
3215            port_id,
3216            port_id1,
3217            port_id2,
3218            port_id2_1,
3219        } = setup_split_port_ids(None).await;
3220        // Can send messages to receiver from all port handles
3221        post(&actor0, port_id.clone(), 1);
3222        assert_eq!(receiver.recv().await.unwrap(), 1);
3223        post(&actor1, port_id1.clone(), 2);
3224        assert_eq!(receiver.recv().await.unwrap(), 2);
3225        post(&actor1, port_id2.clone(), 3);
3226        assert_eq!(receiver.recv().await.unwrap(), 3);
3227        post(&actor1, port_id2_1.clone(), 4);
3228        assert_eq!(receiver.recv().await.unwrap(), 4);
3229
3230        // no more messages
3231        RealClock.sleep(Duration::from_secs(2)).await;
3232        let msg = receiver.try_recv().unwrap();
3233        assert_eq!(msg, None);
3234    }
3235
3236    async fn wait_for(
3237        receiver: &mut PortReceiver<u64>,
3238        expected_size: usize,
3239        timeout_duration: Duration,
3240    ) -> anyhow::Result<Vec<u64>> {
3241        let mut messeges = vec![];
3242
3243        RealClock
3244            .timeout(timeout_duration, async {
3245                loop {
3246                    let msg = receiver.recv().await.unwrap();
3247                    messeges.push(msg);
3248                    if messeges.len() == expected_size {
3249                        break;
3250                    }
3251                }
3252            })
3253            .await?;
3254        Ok(messeges)
3255    }
3256
3257    #[async_timed_test(timeout_secs = 30)]
3258    async fn test_split_port_id_sum_reducer() {
3259        let config = crate::config::global::lock();
3260        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3261
3262        let sum_accumulator = accum::sum::<u64>();
3263        let reducer_spec = sum_accumulator.reducer_spec();
3264        let Setup {
3265            mut receiver,
3266            actor0,
3267            actor1,
3268            port_id,
3269            port_id1,
3270            port_id2,
3271            port_id2_1,
3272        } = setup_split_port_ids(reducer_spec).await;
3273        post(&actor0, port_id.clone(), 4);
3274        post(&actor1, port_id1.clone(), 2);
3275        post(&actor1, port_id2.clone(), 3);
3276        post(&actor1, port_id2_1.clone(), 1);
3277        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3278            .await
3279            .unwrap();
3280        // Message might be received out of their sending out. So we sort the
3281        // messages here.
3282        messages.sort();
3283        assert_eq!(messages, vec![1, 2, 3, 4]);
3284
3285        // no more messages
3286        RealClock.sleep(Duration::from_secs(2)).await;
3287        let msg = receiver.try_recv().unwrap();
3288        assert_eq!(msg, None);
3289    }
3290
3291    #[async_timed_test(timeout_secs = 30)]
3292    async fn test_split_port_id_every_n_messages() {
3293        let actor = Mailbox::new(
3294            id!(test[0].actor),
3295            BoxedMailboxSender::new(PanickingMailboxSender),
3296        );
3297        let (port_handle, mut receiver) = actor.open_port::<u64>();
3298        let port_id = port_handle.bind().port_id().clone();
3299        // Split it
3300        let reducer_spec = accum::sum::<u64>().reducer_spec();
3301        let split_port_id = port_id.split(&actor, reducer_spec).unwrap();
3302
3303        // Send 9 messages.
3304        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3305            post(&actor, split_port_id.clone(), msg);
3306        }
3307        // The first 5 should be batched and reduced once due
3308        // to every_n_msgs = 5.
3309        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3310            .await
3311            .unwrap();
3312        assert_eq!(messages, vec![15]);
3313
3314        // the last message unfortranately will never come because they do not
3315        // reach batch size.
3316        RealClock.sleep(Duration::from_secs(2)).await;
3317        let msg = receiver.try_recv().unwrap();
3318        assert_eq!(msg, None);
3319    }
3320}