hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
21//! # tokio_test::block_on(async {
22//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
23//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
24//! let mbox = Mailbox::new_detached(actor_id);
25//! let (port, mut receiver) = mbox.open_port::<u64>();
26//!
27//! port.send(123).unwrap();
28//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
29//! # })
30//! ```
31//!
32//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
33//! that permits at most one message transmission:
34//!
35//! ```
36//! # use hyperactor::mailbox::Mailbox;
37//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
38//! # tokio_test::block_on(async {
39//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
40//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
41//! let mbox = Mailbox::new_detached(actor_id);
42//!
43//! let (port, receiver) = mbox.open_once_port::<u64>();
44//!
45//! port.send(123u64).unwrap();
46//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
47//! # })
48//! ```
49//!
50//! [`OncePort`]s are correspondingly used for RPC replies in the actor
51//! system.
52//!
53//! ## Remote ports and serialization
54//!
55//! Mailboxes allow delivery of serialized messages to named ports:
56//!
57//! 1) Ports restrict message types to (serializable) [`Message`]s.
58//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
59//! 3) [`Mailbox`] provides interfaces to deliver serialized
60//!    messages to ports named by their [`PortId`].
61//!
62//! While this complicates the interface somewhat, it allows the
63//! implementation to avoid a serialization roundtrip when passing
64//! messages locally.
65
66#![allow(dead_code)] // Allow until this is used outside of tests.
67
68use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future::Future;
74use std::ops::Bound::Excluded;
75use std::pin::Pin;
76use std::sync::Arc;
77use std::sync::LazyLock;
78use std::sync::Mutex;
79use std::sync::OnceLock;
80use std::sync::RwLock;
81use std::sync::Weak;
82use std::sync::atomic::AtomicU64;
83use std::sync::atomic::AtomicUsize;
84use std::sync::atomic::Ordering;
85use std::task::Context;
86use std::task::Poll;
87
88use async_trait::async_trait;
89use dashmap::DashMap;
90use dashmap::mapref::entry::Entry;
91use futures::Sink;
92use futures::Stream;
93use serde::Deserialize;
94use serde::Serialize;
95use serde::de::DeserializeOwned;
96use tokio::sync::mpsc;
97use tokio::sync::oneshot;
98use tokio::sync::watch;
99use tokio::task::JoinHandle;
100use tokio_util::sync::CancellationToken;
101
102use crate as hyperactor; // for macros
103use crate::Named;
104use crate::OncePortRef;
105use crate::PortRef;
106use crate::accum::Accumulator;
107use crate::accum::ReducerOpts;
108use crate::accum::ReducerSpec;
109use crate::actor::Signal;
110use crate::actor::remote::USER_PORT_OFFSET;
111use crate::attrs::Attrs;
112use crate::channel;
113use crate::channel::ChannelAddr;
114use crate::channel::ChannelError;
115use crate::channel::SendError;
116use crate::channel::TxStatus;
117use crate::context;
118use crate::data::Serialized;
119use crate::id;
120use crate::metrics;
121use crate::reference::ActorId;
122use crate::reference::PortId;
123use crate::reference::Reference;
124
125mod undeliverable;
126/// For [`Undeliverable`], a message type for delivery failures.
127pub use undeliverable::Undeliverable;
128pub use undeliverable::UndeliverableMessageError;
129pub use undeliverable::custom_monitored_return_handle;
130pub use undeliverable::monitored_return_handle; // TODO: Audit
131pub use undeliverable::supervise_undeliverable_messages;
132pub use undeliverable::supervise_undeliverable_messages_with;
133/// For [`MailboxAdminMessage`], a message type for mailbox administration.
134pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137/// For [`DurableMailboxSender`] a sender with a write-ahead log.
138pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141/// For message headers and latency tracking.
142pub mod headers;
143
/// Message collects the necessary requirements for messages that are deposited
/// into mailboxes.
///
/// The trait declares no methods; it is a marker that is implemented
/// automatically (see the blanket impl below) for every type that is
/// `Debug + Send + Sync + 'static`.
pub trait Message: Debug + Send + Sync + 'static {}

impl<M> Message for M where M: Debug + Send + Sync + 'static {}
148
149/// RemoteMessage extends [`Message`] by requiring that the messages
150/// also be serializable, and can thus traverse process boundaries.
151/// RemoteMessages must also specify a globally unique type name (a URI).
152pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
/// Type alias for bytestring data used throughout the system: an owned,
/// growable buffer of raw bytes.
pub type Data = Vec<u8>;
158
/// Delivery errors occur during message posting.
///
/// Each variant that carries a `String` uses it as a free-form
/// description, which is appended to the variant's `Display` prefix.
/// The type is serializable so that it can be recorded in a
/// [`MessageEnvelope`] and travel with it.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    Named,
    Clone,
    PartialEq
)]
pub enum DeliveryError {
    /// The destination address is not reachable.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A broken link indicates that a link in the message
    /// delivery path has failed.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// A (local) mailbox delivery error.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast related delivery error.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The message went through too many hops and has expired.
    /// Produced when a hop is attempted with a TTL of zero.
    #[error("ttl expired")]
    TtlExpired,
}
191
/// An envelope that carries a message destined to a remote actor.
/// The envelope contains a serialized message along with its destination
/// and sender.
///
/// In addition to the payload, the envelope records the delivery
/// errors accumulated so far, a set of headers, and a hop budget
/// (`ttl`).
#[derive(Debug, Serialize, Deserialize, Clone, Named)]
pub struct MessageEnvelope {
    /// The sender of this message.
    sender: ActorId,

    /// The destination of the message.
    dest: PortId,

    /// The serialized message.
    data: Serialized,

    /// Delivery errors recorded against this message, in the order in
    /// which they were added. Empty when no delivery failure has been
    /// recorded.
    errors: Vec<DeliveryError>,

    /// Additional context for this message.
    headers: Attrs,

    /// Remaining hop budget. Decremented at every `MailboxSender` hop.
    ttl: u8,
    // TODO: add typename, source, seq, etc.
}
216
217impl MessageEnvelope {
218    /// Create a new envelope with the provided sender, destination, and message.
219    pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
220        Self {
221            sender,
222            dest,
223            data,
224            errors: Vec::new(),
225            headers,
226            ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
227        }
228    }
229
230    /// Create a new envelope whose sender ID is unknown.
231    pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
232        Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
233    }
234
235    /// Construct a new serialized value by serializing the provided T-typed value.
236    pub fn serialize<T: Serialize + Named>(
237        source: ActorId,
238        dest: PortId,
239        value: &T,
240        headers: Attrs,
241    ) -> Result<Self, crate::data::Error> {
242        Ok(Self {
243            headers,
244            data: Serialized::serialize(value)?,
245            sender: source,
246            dest,
247            errors: Vec::new(),
248            ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
249        })
250    }
251
252    /// Returns the remaining time-to-live (TTL) for this message.
253    ///
254    /// The TTL is decremented at each `MailboxSender` hop. When it
255    /// reaches 0, the message is considered expired and is returned
256    /// to the sender as undeliverable.
257    pub fn ttl(&self) -> u8 {
258        self.ttl
259    }
260
261    /// Overrides the message’s time-to-live (TTL).
262    ///
263    /// This replaces the current TTL value (normally initialized from
264    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
265    /// updated envelope is returned for chaining.
266    ///
267    /// # Note
268    /// The TTL is decremented at each `MailboxSender` hop, and when
269    /// it reaches 0 the message will be treated as undeliverable.
270    pub fn set_ttl(mut self, ttl: u8) -> Self {
271        self.ttl = ttl;
272        self
273    }
274
275    /// Decrements the message's TTL by one hop.
276    ///
277    /// Returns `Ok(())` if the TTL was greater than zero and
278    /// successfully decremented. If the TTL was already zero, no
279    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
280    /// returned, indicating that the message has expired and should
281    /// be treated as undeliverable.
282    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
283        if self.ttl == 0 {
284            Err(DeliveryError::TtlExpired)
285        } else {
286            self.ttl -= 1;
287            Ok(())
288        }
289    }
290
291    /// Deserialize the message in the envelope to the provided type T.
292    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
293        self.data.deserialized()
294    }
295
296    /// The serialized message.
297    pub fn data(&self) -> &Serialized {
298        &self.data
299    }
300
301    /// The message sender.
302    pub fn sender(&self) -> &ActorId {
303        &self.sender
304    }
305
306    /// The destination of the message.
307    pub fn dest(&self) -> &PortId {
308        &self.dest
309    }
310
311    /// The message headers.
312    pub fn headers(&self) -> &Attrs {
313        &self.headers
314    }
315
316    /// Tells whether this is a signal message.
317    pub fn is_signal(&self) -> bool {
318        self.dest.index() == Signal::port()
319    }
320
321    /// Set a delivery error for the message. If errors are already set, append
322    /// it to the existing errors.
323    pub fn set_error(&mut self, error: DeliveryError) {
324        self.errors.push(error)
325    }
326
327    /// The message has been determined to be undeliverable with the
328    /// provided error. Mark the envelope with the error and return to
329    /// sender.
330    pub fn undeliverable(
331        mut self,
332        error: DeliveryError,
333        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
334    ) {
335        tracing::error!(
336            name = "undelivered_message_attempt",
337            sender = self.sender.to_string(),
338            dest = self.dest.to_string(),
339            error = error.to_string(),
340            return_handle = %return_handle,
341        );
342        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
343            1,
344            hyperactor_telemetry::kv_pairs!(
345                "sender_actor_id" => self.sender.to_string(),
346                "dest_actor_id" => self.dest.to_string(),
347                "message_type" => self.data.typename().unwrap_or("unknown"),
348                "error_type" =>  error.to_string(),
349            ),
350        );
351
352        self.set_error(error);
353        undeliverable::return_undeliverable(return_handle, self);
354    }
355
356    /// Get the errors of why this message was undeliverable. Empty means this
357    /// message was not determined as undeliverable.
358    pub fn errors(&self) -> &Vec<DeliveryError> {
359        &self.errors
360    }
361
362    /// Get the string representation of the errors of this message was
363    /// undeliverable. None means this message was not determined as
364    /// undeliverable.
365    pub fn error_msg(&self) -> Option<String> {
366        if self.errors.is_empty() {
367            None
368        } else {
369            Some(
370                self.errors
371                    .iter()
372                    .map(|e| e.to_string())
373                    .collect::<Vec<_>>()
374                    .join("; "),
375            )
376        }
377    }
378
379    fn open(self) -> (MessageMetadata, Serialized) {
380        let Self {
381            sender,
382            dest,
383            data,
384            errors,
385            headers,
386            ttl,
387        } = self;
388
389        (
390            MessageMetadata {
391                sender,
392                dest,
393                errors,
394                headers,
395                ttl,
396            },
397            data,
398        )
399    }
400
401    fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
402        let MessageMetadata {
403            sender,
404            dest,
405            errors,
406            headers,
407            ttl,
408        } = metadata;
409
410        Self {
411            sender,
412            dest,
413            data,
414            errors,
415            headers,
416            ttl,
417        }
418    }
419}
420
421impl fmt::Display for MessageEnvelope {
422    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423        match &self.error_msg() {
424            None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
425            Some(err) => write!(
426                f,
427                "{} > {}: {}: delivery error: {}",
428                self.sender, self.dest, self.data, err
429            ),
430        }
431    }
432}
433
/// Metadata about a message sent via a MessageEnvelope.
///
/// This holds every field of a [`MessageEnvelope`] except the
/// serialized payload. It is produced by `MessageEnvelope::open` and
/// consumed by `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    // The sender of the message.
    sender: ActorId,
    // The destination port of the message.
    dest: PortId,
    // Delivery errors recorded on the message so far.
    errors: Vec<DeliveryError>,
    // The headers attached to the message.
    headers: Attrs,
    // The remaining hop budget of the message.
    ttl: u8,
}
443
/// Errors that occur during mailbox operations. Each error is associated
/// with the mailbox's actor id.
///
/// `Display` renders the actor id followed by the kind; the error
/// source is delegated to the kind.
#[derive(Debug)]
pub struct MailboxError {
    // The actor whose mailbox produced the error.
    actor_id: ActorId,
    // What went wrong.
    kind: MailboxErrorKind,
}
451
/// The kinds of mailbox errors. This enum is marked non-exhaustive to
/// allow for extensibility.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum MailboxErrorKind {
    /// An operation was attempted on a closed mailbox.
    #[error("mailbox closed")]
    Closed,

    /// The port associated with an operation was invalid.
    #[error("invalid port: {0}")]
    InvalidPort(PortId),

    /// There was no sender associated with the port.
    #[error("no sender for port: {0}")]
    NoSenderForPort(PortId),

    /// There was no local sender associated with the port.
    /// Returned by operations that require a local port.
    #[error("no local sender for port: {0}")]
    NoLocalSenderForPort(PortId),

    /// The port was closed.
    #[error("{0}: port closed")]
    PortClosed(PortId),

    /// An error occurred during a send operation.
    #[error("send {0}: {1}")]
    Send(PortId, #[source] anyhow::Error),

    /// An error occurred during a receive operation.
    #[error("recv {0}: {1}")]
    Recv(PortId, #[source] anyhow::Error),

    /// There was a serialization failure.
    #[error("serialize: {0}")]
    Serialize(#[source] anyhow::Error),

    /// There was a deserialization failure. The string names the
    /// type involved, as rendered in the error message.
    // NOTE(review): unlike the variants above, the underlying error
    // here is not marked `#[source]` -- confirm whether that is
    // intentional.
    #[error("deserialize {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// There was an error during a channel operation.
    #[error(transparent)]
    Channel(#[from] ChannelError),
}
498
499impl MailboxError {
500    /// Create a new mailbox error associated with the provided actor
501    /// id and of the given kind.
502    pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
503        Self { actor_id, kind }
504    }
505
506    /// The ID of the mailbox producing this error.
507    pub fn actor_id(&self) -> &ActorId {
508        &self.actor_id
509    }
510
511    /// The error's kind.
512    pub fn kind(&self) -> &MailboxErrorKind {
513        &self.kind
514    }
515}
516
517impl fmt::Display for MailboxError {
518    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519        write!(f, "{}: ", self.actor_id)?;
520        fmt::Display::fmt(&self.kind, f)
521    }
522}
523
524impl std::error::Error for MailboxError {
525    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
526        self.kind.source()
527    }
528}
529
/// PortLocation describes the location of a port.
/// This is used in errors to provide a uniform data type
/// for ports that may or may not be bound.
///
/// `Display` renders a bound location as its port id, and an unbound
/// one as `<actor id><<type name>>`.
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// The port was bound: the location is its underlying bound ID.
    Bound(PortId),
    /// The port was not bound: we provide the actor ID and the message type.
    /// The message type is given by name, as a static string.
    Unbound(ActorId, &'static str),
}
540
541impl PortLocation {
542    fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
543        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
544    }
545
546    fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
547        PortLocation::Unbound(actor_id, ty)
548    }
549
550    /// The actor id of the location.
551    pub fn actor_id(&self) -> &ActorId {
552        match self {
553            PortLocation::Bound(port_id) => port_id.actor_id(),
554            PortLocation::Unbound(actor_id, _) => actor_id,
555        }
556    }
557}
558
559impl fmt::Display for PortLocation {
560    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561        match self {
562            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
563            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
564        }
565    }
566}
567
/// Errors that occur during mailbox sending operations. Each error
/// is associated with the location (bound port ID, or actor ID and
/// message type for unbound ports) of the operation.
#[derive(Debug)]
pub struct MailboxSenderError {
    // Where the error occurred.
    // NOTE(review): both fields are boxed, presumably to keep the
    // error value small -- confirm.
    location: Box<PortLocation>,
    // What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
575
/// The kind of mailbox sending errors.
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// Error during serialization.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// Error during deserialization. The string names the type
    /// involved, as rendered in the error message.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// A send to an invalid port.
    #[error("invalid port")]
    Invalid,

    /// A send to a closed port.
    #[error("port closed")]
    Closed,

    // The following pass through underlying errors:
    /// An underlying mailbox error.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying channel error.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying message log error.
    #[error(transparent)]
    MessageLog(#[from] MessageLogError),

    /// Another, uncategorized error.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination was unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
616
617impl MailboxSenderError {
618    /// Create a new mailbox sender error to an unbound port.
619    pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
620        Self {
621            location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
622            kind: Box::new(kind),
623        }
624    }
625
626    /// Create a new mailbox sender, manually providing the type.
627    pub fn new_unbound_type(
628        actor_id: ActorId,
629        kind: MailboxSenderErrorKind,
630        ty: &'static str,
631    ) -> Self {
632        Self {
633            location: Box::new(PortLocation::Unbound(actor_id, ty)),
634            kind: Box::new(kind),
635        }
636    }
637
638    /// Create a new mailbox sender error with the provided port ID and kind.
639    pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
640        Self {
641            location: Box::new(PortLocation::Bound(port_id)),
642            kind: Box::new(kind),
643        }
644    }
645
646    /// The location at which the error occured.
647    pub fn location(&self) -> &PortLocation {
648        &self.location
649    }
650
651    /// The kind associated with the error.
652    pub fn kind(&self) -> &MailboxSenderErrorKind {
653        &self.kind
654    }
655}
656
657impl fmt::Display for MailboxSenderError {
658    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
659        write!(f, "{}: ", self.location)?;
660        fmt::Display::fmt(&self.kind, f)
661    }
662}
663
664impl std::error::Error for MailboxSenderError {
665    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
666        self.kind.source()
667    }
668}
669
670/// MailboxSenders can send messages through ports to mailboxes. It
671/// provides a unified interface for message delivery in the system.
672pub trait MailboxSender: Send + Sync + Debug + Any {
673    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
674    /// delegate to transport.
675    fn post(
676        &self,
677        mut envelope: MessageEnvelope,
678        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
679    ) {
680        if let Err(err) = envelope.dec_ttl_or_err() {
681            envelope.undeliverable(err, return_handle);
682            return;
683        }
684        self.post_unchecked(envelope, return_handle);
685    }
686
687    /// Raw transport: **no** policy.
688    fn post_unchecked(
689        &self,
690        envelope: MessageEnvelope,
691        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
692    );
693}
694
695/// PortSender extends [`MailboxSender`] by providing typed endpoints
696/// for sending messages over ports
697pub trait PortSender: MailboxSender {
698    /// Deliver a message to the provided port.
699    fn serialize_and_send<M: RemoteMessage>(
700        &self,
701        port: &PortRef<M>,
702        message: M,
703        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
704    ) -> Result<(), MailboxSenderError> {
705        // TODO: convert this to a undeliverable error also
706        let serialized = Serialized::serialize(&message).map_err(|err| {
707            MailboxSenderError::new_bound(
708                port.port_id().clone(),
709                MailboxSenderErrorKind::Serialize(err.into()),
710            )
711        })?;
712        self.post(
713            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
714            return_handle,
715        );
716        Ok(())
717    }
718
719    /// Deliver a message to a one-shot port, consuming the provided port,
720    /// which is not reusable.
721    fn serialize_and_send_once<M: RemoteMessage>(
722        &self,
723        once_port: OncePortRef<M>,
724        message: M,
725        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
726    ) -> Result<(), MailboxSenderError> {
727        let serialized = Serialized::serialize(&message).map_err(|err| {
728            MailboxSenderError::new_bound(
729                once_port.port_id().clone(),
730                MailboxSenderErrorKind::Serialize(err.into()),
731            )
732        })?;
733        self.post(
734            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
735            return_handle,
736        );
737        Ok(())
738    }
739}
740
741impl<T: ?Sized + MailboxSender> PortSender for T {}
742
/// A perpetually closed mailbox sender. Panics if any messages are posted.
/// Useful for tests, or where there is no meaningful mailbox sender
/// implementation available.
#[derive(Debug, Clone)]
pub struct PanickingMailboxSender;

impl MailboxSender for PanickingMailboxSender {
    /// Always panics, including the rendered envelope in the panic
    /// message. The return handle is never used.
    ///
    /// Only the raw transport is overridden, so the default
    /// `MailboxSender::post` still applies its TTL check first: an
    /// envelope whose TTL is already zero is returned as
    /// undeliverable and never reaches this panic.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        panic!("panic! in the mailbox! attempted post: {}", envelope)
    }
}
758
759/// A mailbox sender for undeliverable messages. This will simply record
760/// any undelivered messages.
761#[derive(Debug)]
762pub struct UndeliverableMailboxSender;
763
764impl MailboxSender for UndeliverableMailboxSender {
765    fn post_unchecked(
766        &self,
767        envelope: MessageEnvelope,
768        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
769    ) {
770        let sender_name = envelope.sender.name();
771        let mut error_str = "".to_string();
772        if !envelope.errors.is_empty() {
773            error_str = envelope
774                .errors
775                .iter()
776                .map(|e| e.to_string())
777                .collect::<Vec<_>>()
778                .join("; ");
779        }
780        // The undeliverable message was unable to be delivered back to the
781        // sender for some reason
782        tracing::error!(
783            name = "undelivered_message_abandoned",
784            actor_name = sender_name,
785            actor_id = envelope.sender.to_string(),
786            dest = envelope.dest.to_string(),
787            headers = envelope.headers().to_string(), // todo: implement tracing::Value for Attrs
788            data = envelope.data().to_string(),
789            "message not delivered, {}",
790            error_str,
791        );
792    }
793}
794
795#[derive(Debug)]
796struct Buffer<T: Message> {
797    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
798    processed: watch::Receiver<usize>,
799    seq: AtomicUsize,
800}
801
802impl<T: Message> Buffer<T> {
803    fn new<Fut>(
804        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
805    ) -> Self
806    where
807        Fut: Future<Output = ()> + Send + 'static,
808    {
809        let (queue, mut next) = mpsc::unbounded_channel();
810        let (last_processed, processed) = watch::channel(0);
811        crate::init::get_runtime().spawn(async move {
812            let mut seq = 0;
813            while let Some((msg, return_handle)) = next.recv().await {
814                process(msg, return_handle).await;
815                seq += 1;
816                let _ = last_processed.send(seq);
817            }
818        });
819        Self {
820            queue,
821            processed,
822            seq: AtomicUsize::new(0),
823        }
824    }
825
826    #[allow(clippy::result_large_err)]
827    fn send(
828        &self,
829        item: (T, PortHandle<Undeliverable<T>>),
830    ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
831        self.seq.fetch_add(1, Ordering::SeqCst);
832        self.queue.send(item)?;
833        Ok(())
834    }
835
836    async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
837        let seq = self.seq.load(Ordering::SeqCst);
838        while *self.processed.borrow_and_update() < seq {
839            self.processed.changed().await?;
840        }
841        Ok(())
842    }
843}
844
// A lazily initialized, shared `BoxedMailboxSender` that wraps a
// `PanickingMailboxSender`.
static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
847
848/// Convenience boxing implementation for MailboxSender. Most APIs
849/// are parameterized on MailboxSender implementations, and it's thus
850/// difficult to work with dyn values.  BoxedMailboxSender bridges this
851/// gap by providing a concrete MailboxSender which dispatches using an
852/// underlying (boxed) dyn.
853#[derive(Debug, Clone)]
854pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
855
856impl BoxedMailboxSender {
857    /// Create a new boxed sender given the provided sender implementation.
858    pub fn new(sender: impl MailboxSender + 'static) -> Self {
859        Self(Arc::new(sender))
860    }
861
862    /// Attempts to downcast the inner sender to the given concrete
863    /// type.
864    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
865        (&*self.0 as &dyn Any).downcast_ref::<T>()
866    }
867}
868
869/// Extension trait that creates a boxed clone of a MailboxSender.
870pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
871    /// A boxed clone of this MailboxSender.
872    fn boxed(&self) -> BoxedMailboxSender;
873}
874impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
875    fn boxed(&self) -> BoxedMailboxSender {
876        BoxedMailboxSender::new(self.clone())
877    }
878}
879
880/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
881pub trait IntoBoxedMailboxSender: MailboxSender {
882    /// Rehome this MailboxSender into a BoxedMailboxSender.
883    fn into_boxed(self) -> BoxedMailboxSender;
884}
885impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
886    fn into_boxed(self) -> BoxedMailboxSender {
887        BoxedMailboxSender::new(self)
888    }
889}
890
impl MailboxSender for BoxedMailboxSender {
    /// Forward the envelope to the wrapped sender's raw transport.
    ///
    /// Only `post_unchecked` is overridden and it forwards to the
    /// inner `post_unchecked`, so a `post` on the boxed sender applies
    /// the TTL policy exactly once rather than once per wrapper.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        self.0.post_unchecked(envelope, return_handle);
    }
}
900
/// Errors that occur during mailbox serving.
///
/// Both variants are transparent wrappers: display and source are
/// those of the wrapped error, and `From` conversions are provided
/// for each.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
912
913/// Represents a running [`MailboxServer`]. The handle composes a
914/// ['tokio::task::JoinHandle'] and may be joined in the same manner.
915#[derive(Debug)]
916pub struct MailboxServerHandle {
917    join_handle: JoinHandle<Result<(), MailboxServerError>>,
918    stopped_tx: watch::Sender<bool>,
919}
920
921impl MailboxServerHandle {
922    /// Signal the server to stop serving the mailbox. The caller should
923    /// join the handle by awaiting the [`MailboxServerHandle`] future.
924    ///
925    /// Stop should be called at most once.
926    pub fn stop(&self, reason: &str) {
927        tracing::info!("stopping mailbox server; reason: {}", reason);
928        self.stopped_tx.send(true).expect("stop called twice");
929    }
930}
931
932/// Forward future implementation to underlying handle.
933impl Future for MailboxServerHandle {
934    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
935
936    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
937        // SAFETY: This is safe to do because self is pinned.
938        let join_handle_pinned =
939            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
940        join_handle_pinned.poll(cx)
941    }
942}
943
944// A `MailboxServer` (such as a router) can receive a message
945// that couldn't reach its destination. We can use the fact that
946// servers are `MailboxSender`s to attempt to forward them back to
947// their senders.
948fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
949    let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
950
951    tokio::task::spawn(async move {
952        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
953            if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
954            {
955                // A non-returnable undeliverable.
956                UndeliverableMailboxSender.post(e, monitored_return_handle());
957                continue;
958            }
959            envelope.set_error(DeliveryError::BrokenLink(
960                "message was undeliverable".to_owned(),
961            ));
962            server.post(
963                MessageEnvelope::new(
964                    envelope.sender().clone(),
965                    PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
966                        envelope.sender(),
967                    )
968                    .port_id()
969                    .clone(),
970                    Serialized::serialize(&Undeliverable(envelope)).unwrap(),
971                    Attrs::new(),
972                ),
973                monitored_return_handle(),
974            );
975        }
976    });
977
978    return_handle
979}
980
981/// Serve a port on the provided [`channel::Rx`]. This dispatches all
982/// channel messages directly to the port.
983pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
984    /// Serve the provided port on the given channel on this sender on
985    /// a background task which may be joined with the returned handle.
986    /// The task fails on any send error.
987    fn serve(
988        self,
989        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
990    ) -> MailboxServerHandle {
991        // A `MailboxServer` can receive a message that couldn't
992        // reach its destination. We can use the fact that servers are
993        // `MailboxSender`s to attempt to forward them back to their
994        // senders.
995        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
996        let server = self.clone();
997        tokio::task::spawn(async move {
998            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
999                if let Ok(Undeliverable(e)) =
1000                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1001                {
1002                    // A non-returnable undeliverable.
1003                    UndeliverableMailboxSender.post(e, monitored_return_handle());
1004                    continue;
1005                }
1006                envelope.set_error(DeliveryError::BrokenLink(
1007                    "message was undeliverable".to_owned(),
1008                ));
1009                server.post(
1010                    MessageEnvelope::new(
1011                        envelope.sender().clone(),
1012                        PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1013                            envelope.sender(),
1014                        )
1015                        .port_id()
1016                        .clone(),
1017                        Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1018                        Attrs::new(),
1019                    ),
1020                    monitored_return_handle(),
1021                );
1022            }
1023        });
1024
1025        let (stopped_tx, mut stopped_rx) = watch::channel(false);
1026        let join_handle = tokio::spawn(async move {
1027            let mut detached = false;
1028
1029            loop {
1030                if *stopped_rx.borrow_and_update() {
1031                    break Ok(());
1032                }
1033
1034                tokio::select! {
1035                    message = rx.recv() => {
1036                        match message {
1037                            // Relay the message to the port directly.
1038                            Ok(envelope) => self.post(envelope, return_handle.clone()),
1039
1040                            // Closed is a "graceful" error in this case.
1041                            // We simply stop serving.
1042                            Err(ChannelError::Closed) => break Ok(()),
1043                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1044                        }
1045                    }
1046                    result = stopped_rx.changed(), if !detached  => {
1047                        tracing::debug!(
1048                            "the mailbox server is stopped"
1049                        );
1050                        detached = result.is_err();
1051                    }
1052                }
1053            }
1054        });
1055
1056        MailboxServerHandle {
1057            join_handle,
1058            stopped_tx,
1059        }
1060    }
1061}
1062
1063impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1064
/// A mailbox server client that transmits messages on a Tx channel.
#[derive(Debug)]
pub struct MailboxClient {
    // The unbounded sender. Posted `(envelope, return_handle)` pairs are
    // queued here and handed to the processing closure installed by
    // `MailboxClient::new`, which posts them on the underlying Tx.
    buffer: Buffer<MessageEnvelope>,

    // To cancel monitoring tx health. This is a clone of the token given
    // to `monitor_tx_health`.
    // NOTE(review): no call to `cancel()` is visible in this part of the
    // file -- confirm where (or whether) the token is cancelled.
    _tx_monitoring: CancellationToken,
}
1074
1075impl MailboxClient {
1076    /// Create a new client that sends messages destined for a
1077    /// [`MailboxServer`] on the provided Tx channel.
1078    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1079        let addr = tx.addr();
1080        let tx = Arc::new(tx);
1081        let tx_status = tx.status().clone();
1082        let tx_monitoring = CancellationToken::new();
1083        let buffer = Buffer::new(move |envelope, return_handle| {
1084            let tx = Arc::clone(&tx);
1085            let (return_channel, return_receiver) = oneshot::channel();
1086            // Set up for delivery failure.
1087            let return_handle_0 = return_handle.clone();
1088            tokio::spawn(async move {
1089                let result = return_receiver.await;
1090                if let Ok(message) = result {
1091                    let _ = return_handle_0.send(Undeliverable(message));
1092                } else {
1093                    // Sender dropped, this task can end.
1094                }
1095            });
1096            // Send the message for transmission.
1097            let return_handle_1 = return_handle.clone();
1098            async move {
1099                if let Err(SendError(_, envelope)) = tx.try_post(envelope, return_channel) {
1100                    // Failed to enqueue.
1101                    envelope.undeliverable(
1102                        DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string()),
1103                        return_handle_1.clone(),
1104                    );
1105                }
1106            }
1107        });
1108        let this = Self {
1109            buffer,
1110            _tx_monitoring: tx_monitoring.clone(),
1111        };
1112        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1113        this
1114    }
1115
1116    /// Convenience constructor, to set up a mailbox client that forwards messages
1117    /// to the provided address.
1118    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1119        Ok(MailboxClient::new(channel::dial(addr)?))
1120    }
1121
1122    // Set up a watch for the tx's health.
1123    fn monitor_tx_health(
1124        mut rx: watch::Receiver<TxStatus>,
1125        cancel_token: CancellationToken,
1126        addr: ChannelAddr,
1127    ) {
1128        crate::init::get_runtime().spawn(async move {
1129            loop {
1130                tokio::select! {
1131                    changed = rx.changed() => {
1132                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1133                            tracing::warn!("connection to {} lost", addr);
1134                            // TODO: Potential for supervision event
1135                            // interaction here.
1136                            break;
1137                        }
1138                    }
1139                    _ = cancel_token.cancelled() => {
1140                        break;
1141                    }
1142                }
1143            }
1144        });
1145    }
1146}
1147
1148impl MailboxSender for MailboxClient {
1149    fn post_unchecked(
1150        &self,
1151        envelope: MessageEnvelope,
1152        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1153    ) {
1154        tracing::event!(target:"messages", tracing::Level::DEBUG,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1155        if let Err(mpsc::error::SendError((envelope, return_handle))) =
1156            self.buffer.send((envelope, return_handle))
1157        {
1158            let err = DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string());
1159
1160            // Failed to enqueue.
1161            envelope.undeliverable(err, return_handle);
1162        }
1163    }
1164}
1165
/// Wrapper to turn `PortRef` into a `Sink`.
///
/// Each item pushed into the sink is sent on `port` using `cx` as the
/// sending context.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    // Context passed to `PortRef::send` for every item.
    cx: C,
    // Destination port for items pushed into the sink.
    port: PortRef<M>,
}
1171
1172impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1173    /// Create new PortSink
1174    pub fn new(cx: C, port: PortRef<M>) -> Self {
1175        Self { cx, port }
1176    }
1177}
1178
1179impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1180    type Error = MailboxSenderError;
1181
1182    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1183        Poll::Ready(Ok(()))
1184    }
1185
1186    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1187        self.port.send(&self.cx, item)
1188    }
1189
1190    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1191        Poll::Ready(Ok(()))
1192    }
1193
1194    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1195        Poll::Ready(Ok(()))
1196    }
1197}
1198
/// A mailbox coordinates message delivery to actors through typed
/// [`Port`]s associated with the mailbox.
///
/// Cloning a `Mailbox` is cheap: clones share the same underlying state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    // Shared mailbox state (actor id, bound ports, forwarder).
    inner: Arc<State>,
}
1205
1206impl Mailbox {
1207    /// Create a new mailbox associated with the provided actor ID, using the provided
1208    /// forwarder for external destinations.
1209    pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1210        Self {
1211            inner: Arc::new(State::new(actor_id, forwarder)),
1212        }
1213    }
1214
1215    /// Create a new detached mailbox associated with the provided actor ID.
1216    pub fn new_detached(actor_id: ActorId) -> Self {
1217        Self {
1218            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1219        }
1220    }
1221
1222    /// The actor id associated with this mailbox.
1223    pub fn actor_id(&self) -> &ActorId {
1224        &self.inner.actor_id
1225    }
1226
1227    /// Open a new port that accepts M-typed messages. The returned
1228    /// port may be freely cloned, serialized, and passed around. The
1229    /// returned receiver should only be retained by the actor responsible
1230    /// for processing the delivered messages.
1231    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1232        let port_index = self.inner.allocate_port();
1233        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1234        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1235        tracing::trace!(
1236            name = "open_port",
1237            "opening port for {} at {}",
1238            self.inner.actor_id,
1239            port_id
1240        );
1241        (
1242            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1243            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1244        )
1245    }
1246
1247    /// Open a new port with an accumulator. This port accepts A::Update type
1248    /// messages, accumulate them into A::State with the given accumulator.
1249    /// The latest changed state can be received from the returned receiver as
1250    /// a single A::State message. If there is no new update, the receiver will
1251    /// not receive any message.
1252    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1253    where
1254        A: Accumulator + Send + Sync + 'static,
1255        A::Update: Message,
1256        A::State: Message + Default + Clone,
1257    {
1258        let port_index = self.inner.allocate_port();
1259        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1260        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1261        let state = Mutex::new(A::State::default());
1262        let reducer_spec = accum.reducer_spec();
1263        let enqueue = move |_, update: A::Update| {
1264            let mut state = state.lock().unwrap();
1265            accum.accumulate(&mut state, update)?;
1266            let _ = sender.send(state.clone());
1267            Ok(())
1268        };
1269        (
1270            PortHandle {
1271                mailbox: self.clone(),
1272                port_index,
1273                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1274                bound: Arc::new(OnceLock::new()),
1275                reducer_spec,
1276                reducer_opts: None, // TODO: provide open_accum_port_opts
1277            },
1278            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
1279        )
1280    }
1281
1282    /// Open a port that accepts M-typed messages, using the provided function
1283    /// to enqueue.
1284    // TODO: consider making lifetime bound to Self instead.
1285    pub(crate) fn open_enqueue_port<M: Message>(
1286        &self,
1287        enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1288    ) -> PortHandle<M> {
1289        PortHandle {
1290            mailbox: self.clone(),
1291            port_index: self.inner.allocate_port(),
1292            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1293            bound: Arc::new(OnceLock::new()),
1294            reducer_spec: None,
1295            reducer_opts: None,
1296        }
1297    }
1298
1299    /// Open a new one-shot port that accepts M-typed messages. The
1300    /// returned port may be used to send a single message; ditto the
1301    /// receiver may receive a single message.
1302    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1303        let port_index = self.inner.allocate_port();
1304        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1305        let (sender, receiver) = oneshot::channel::<M>();
1306        (
1307            OncePortHandle {
1308                mailbox: self.clone(),
1309                port_index,
1310                port_id: port_id.clone(),
1311                sender,
1312            },
1313            OncePortReceiver {
1314                receiver: Some(receiver),
1315                port_id,
1316                mailbox: self.clone(),
1317            },
1318        )
1319    }
1320
1321    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1322        MailboxError::new(self.inner.actor_id.clone(), err)
1323    }
1324
1325    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1326        let port_index = M::port();
1327        self.inner.ports.get(&port_index).and_then(|boxed| {
1328            boxed
1329                .as_any()
1330                .downcast_ref::<UnboundedSender<M>>()
1331                .map(|s| {
1332                    assert_eq!(
1333                        s.port_id,
1334                        self.actor_id().port_id(port_index),
1335                        "port_id mismatch in downcasted UnboundedSender"
1336                    );
1337                    s.sender.clone()
1338                })
1339        })
1340    }
1341
1342    /// Retrieve the bound undeliverable message port handle.
1343    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1344        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1345            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1346    }
1347
1348    pub(crate) fn allocate_port(&self) -> u64 {
1349        self.inner.allocate_port()
1350    }
1351
1352    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1353        assert_eq!(
1354            handle.mailbox.actor_id(),
1355            self.actor_id(),
1356            "port does not belong to mailbox"
1357        );
1358
1359        // TODO: don't even allocate a port until the port is bound. Possibly
1360        // have handles explicitly staged (unbound, bound).
1361        let port_id = self.actor_id().port_id(handle.port_index);
1362        match self.inner.ports.entry(handle.port_index) {
1363            Entry::Vacant(entry) => {
1364                entry.insert(Box::new(UnboundedSender::new(
1365                    handle.sender.clone(),
1366                    port_id.clone(),
1367                )));
1368            }
1369            Entry::Occupied(_entry) => {}
1370        }
1371
1372        PortRef::attest(port_id)
1373    }
1374
1375    fn bind_to<M: RemoteMessage>(&self, handle: &PortHandle<M>, port_index: u64) {
1376        assert_eq!(
1377            handle.mailbox.actor_id(),
1378            self.actor_id(),
1379            "port does not belong to mailbox"
1380        );
1381
1382        let port_id = self.actor_id().port_id(port_index);
1383        match self.inner.ports.entry(port_index) {
1384            Entry::Vacant(entry) => {
1385                entry.insert(Box::new(UnboundedSender::new(
1386                    handle.sender.clone(),
1387                    port_id,
1388                )));
1389            }
1390            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1391        }
1392    }
1393
1394    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1395        let port_id = handle.port_id().clone();
1396        match self.inner.ports.entry(handle.port_index) {
1397            Entry::Vacant(entry) => {
1398                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1399            }
1400            Entry::Occupied(_entry) => {}
1401        }
1402    }
1403
1404    pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1405        assert_eq!(
1406            port_id.actor_id(),
1407            self.actor_id(),
1408            "port does not belong to mailbox"
1409        );
1410
1411        match self.inner.ports.entry(port_id.index()) {
1412            Entry::Vacant(entry) => {
1413                entry.insert(Box::new(sender));
1414            }
1415            Entry::Occupied(_entry) => {}
1416        }
1417    }
1418}
1419
/// A `Mailbox` is trivially its own mailbox context.
impl context::Mailbox for Mailbox {
    /// Returns `self`.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1425
1426// TODO: figure out what to do with these interfaces -- possibly these caps
1427// do not have to be private.
1428
1429/// Open a port given a capability.
1430pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1431    cx.mailbox().open_port()
1432}
1433
1434/// Open a one-shot port given a capability. This is a public method primarily to
1435/// enable macro-generated clients.
1436pub fn open_once_port<M: Message>(
1437    cx: &impl context::Mailbox,
1438) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1439    cx.mailbox().open_once_port()
1440}
1441
1442impl MailboxSender for Mailbox {
1443    /// Deliver a serialized message to the provided port ID. This method fails
1444    /// if the message does not deserialize into the expected type.
1445    fn post_unchecked(
1446        &self,
1447        envelope: MessageEnvelope,
1448        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1449    ) {
1450        metrics::MAILBOX_POSTS.add(
1451            1,
1452            hyperactor_telemetry::kv_pairs!(
1453                "actor_id" => envelope.sender.to_string(),
1454                "dest_actor_id" => envelope.dest.0.to_string(),
1455            ),
1456        );
1457        tracing::trace!(
1458            name = "post",
1459            actor_name = envelope.sender.name(),
1460            actor_id = envelope.sender.to_string(),
1461            "posting message to {}",
1462            envelope.dest
1463        );
1464
1465        if envelope.dest().actor_id() != &self.inner.actor_id {
1466            return self.inner.forwarder.post(envelope, return_handle);
1467        }
1468
1469        match self.inner.ports.entry(envelope.dest().index()) {
1470            Entry::Vacant(_) => {
1471                let err = DeliveryError::Unroutable("port not bound in mailbox".to_string());
1472
1473                envelope.undeliverable(err, return_handle);
1474            }
1475            Entry::Occupied(entry) => {
1476                let (metadata, data) = envelope.open();
1477                let MessageMetadata {
1478                    headers,
1479                    sender,
1480                    dest,
1481                    errors: metadata_errors,
1482                    ttl,
1483                } = metadata;
1484
1485                // We use the entry API here so that we can remove the
1486                // entry while holding an (entry) reference. The DashMap
1487                // documentation suggests that deadlocks are possible
1488                // "when holding any sort of reference into the map",
1489                // but surely this applies only to the same thread? This
1490                // would also imply we have to be careful holding any
1491                // sort of reference across .await points.
1492                match entry.get().send_serialized(headers, data) {
1493                    Ok(false) => {
1494                        entry.remove();
1495                    }
1496                    Ok(true) => (),
1497                    Err(SerializedSenderError {
1498                        data,
1499                        error: sender_error,
1500                        headers,
1501                    }) => {
1502                        let err = DeliveryError::Mailbox(format!("{}", sender_error));
1503
1504                        MessageEnvelope::seal(
1505                            MessageMetadata {
1506                                headers,
1507                                sender,
1508                                dest,
1509                                errors: metadata_errors,
1510                                ttl,
1511                            },
1512                            data,
1513                        )
1514                        .undeliverable(err, return_handle)
1515                    }
1516                }
1517            }
1518        }
1519    }
1520}
1521
/// A port to which M-typed messages can be delivered. Ports may be
/// serialized to be sent to other actors. However, when a port is
/// deserialized, it may no longer be used to send messages directly
/// to a mailbox since it is no longer associated with a local mailbox
/// ([`Mailbox::send`] will fail). However, the runtime may accept
/// remote Ports, and arrange for these messages to be delivered
/// indirectly through inter-node message passing.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    // The mailbox this port was opened in.
    mailbox: Mailbox,
    // Index of this port within the mailbox.
    port_index: u64,
    // Local delivery path for messages sent on this handle.
    sender: UnboundedPortSender<M>,
    // We would like this to be a Arc<OnceLock<PortRef<M>>>, but we cannot
    // write down the type PortRef<M> (M: Message), even though we cannot
    // legally construct such a value without M: RemoteMessage. We could consider
    // making PortRef<M> valid for M: Message, but constructible only for
    // M: RemoteMessage, but the guarantees offered by the impossibility of even
    // writing down the type are appealing.
    bound: Arc<OnceLock<PortId>>,
    /// Typehash of an optional reducer. When it's defined, we include it in port
    /// references to optionally enable incremental accumulation.
    reducer_spec: Option<ReducerSpec>,
    /// Reduction options. If unspecified, we use `ReducerOpts::default`.
    reducer_opts: Option<ReducerOpts>,
}
1547
1548impl<M: Message> PortHandle<M> {
1549    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1550        Self {
1551            mailbox,
1552            port_index,
1553            sender,
1554            bound: Arc::new(OnceLock::new()),
1555            reducer_spec: None,
1556            reducer_opts: None,
1557        }
1558    }
1559
1560    fn location(&self) -> PortLocation {
1561        match self.bound.get() {
1562            Some(port_id) => PortLocation::Bound(port_id.clone()),
1563            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1564        }
1565    }
1566
1567    /// Send a message to this port.
1568    pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1569        let mut headers = Attrs::new();
1570
1571        crate::mailbox::headers::set_send_timestamp(&mut headers);
1572
1573        self.sender.send(headers, message).map_err(|err| {
1574            MailboxSenderError::new_unbound::<M>(
1575                self.mailbox.actor_id().clone(),
1576                MailboxSenderErrorKind::Other(err),
1577            )
1578        })
1579    }
1580
1581    /// A contravariant map: using the provided function to translate
1582    /// `R`-typed messages to `M`-typed ones, delivered on this port.
1583    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1584    where
1585        R: Message,
1586        F: Fn(R) -> M + Send + Sync + 'static,
1587    {
1588        let port_index = self.mailbox.inner.allocate_port();
1589        let sender = self.sender.clone();
1590        PortHandle::new(
1591            self.mailbox.clone(),
1592            port_index,
1593            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1594                sender.send(headers, unmap(value))
1595            })),
1596        )
1597    }
1598}
1599
1600impl<M: RemoteMessage> PortHandle<M> {
1601    /// Bind this port, making it accessible to remote actors.
1602    pub fn bind(&self) -> PortRef<M> {
1603        PortRef::attest_reducible(
1604            self.bound
1605                .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1606                .clone(),
1607            self.reducer_spec.clone(),
1608        )
1609    }
1610
1611    /// Bind to a specific port index. This is used by [`actor::Binder`] implementations to
1612    /// bind actor refs. This is not intended for general use.
1613    pub fn bind_to(&self, port_index: u64) {
1614        self.mailbox.bind_to(self, port_index);
1615    }
1616}
1617
1618impl<M: Message> Clone for PortHandle<M> {
1619    fn clone(&self) -> Self {
1620        Self {
1621            mailbox: self.mailbox.clone(),
1622            port_index: self.port_index,
1623            sender: self.sender.clone(),
1624            bound: self.bound.clone(),
1625            reducer_spec: self.reducer_spec.clone(),
1626            reducer_opts: self.reducer_opts.clone(),
1627        }
1628    }
1629}
1630
1631impl<M: Message> fmt::Display for PortHandle<M> {
1632    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1633        fmt::Display::fmt(&self.location(), f)
1634    }
1635}
1636
/// A one-shot port handle to which M-typed messages can be delivered.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    // The mailbox this port was opened in.
    mailbox: Mailbox,
    // Index of this port within the mailbox.
    port_index: u64,
    // Full id of this port (owning actor plus port index).
    port_id: PortId,
    // One-shot channel used for local delivery; consumed by `send` or
    // when the port is bound.
    sender: oneshot::Sender<M>,
}
1645
1646impl<M: Message> OncePortHandle<M> {
1647    /// This port's ID.
1648    // TODO: make value
1649    pub fn port_id(&self) -> &PortId {
1650        &self.port_id
1651    }
1652
1653    /// Send a message to this port. The send operation will consume the
1654    /// port handle, as the port accepts at most one message.
1655    pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1656        let actor_id = self.mailbox.actor_id().clone();
1657        self.sender.send(message).map_err(|_| {
1658            // Here, the value is returned when the port is
1659            // closed.  We should consider having a similar
1660            // API for send_once, though arguably it makes less
1661            // sense in this context.
1662            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1663        })?;
1664        Ok(())
1665    }
1666}
1667
1668impl<M: RemoteMessage> OncePortHandle<M> {
1669    /// Turn this handle into a ref that may be passed to
1670    /// a remote actor. The remote actor can then use the
1671    /// ref to send a message to the port. Creating a ref also
1672    /// binds the port, so that it is remotely writable.
1673    pub fn bind(self) -> OncePortRef<M> {
1674        let port_id = self.port_id().clone();
1675        self.mailbox.clone().bind_once(self);
1676        OncePortRef::attest(port_id)
1677    }
1678}
1679
1680impl<M: Message> fmt::Display for OncePortHandle<M> {
1681    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1682        fmt::Display::fmt(&self.port_id(), f)
1683    }
1684}
1685
/// A receiver of M-typed messages, used by actors to receive messages
/// on open ports.
///
/// Dropping the receiver removes its port from the owning mailbox.
#[derive(Debug)]
pub struct PortReceiver<M> {
    // Receiving half of the channel backing the port.
    receiver: mpsc::UnboundedReceiver<M>,
    // Id of the port this receiver serves (owning actor plus port index).
    port_id: PortId,
    /// When multiple messages are put in channel, only receive the latest one
    /// if coalesce is true. Other messages will be discarded.
    coalesce: bool,
    /// State is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
1699
1700impl<M> PortReceiver<M> {
1701    fn new(
1702        receiver: mpsc::UnboundedReceiver<M>,
1703        port_id: PortId,
1704        coalesce: bool,
1705        mailbox: Mailbox,
1706    ) -> Self {
1707        Self {
1708            receiver,
1709            port_id,
1710            coalesce,
1711            mailbox,
1712        }
1713    }
1714
1715    /// Tries to receive the next value for this receiver.
1716    /// This function returns `Ok(None)` if the receiver is empty
1717    /// and returns a MailboxError if the receiver is disconnected.
1718    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
1719    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1720        let mut next = self.receiver.try_recv();
1721        // To coalesce, drain the mpsc queue and only keep the last one.
1722        if self.coalesce
1723            && let Some(latest) = self.drain().pop()
1724        {
1725            next = Ok(latest);
1726        }
1727        match next {
1728            Ok(msg) => Ok(Some(msg)),
1729            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1730            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1731                self.actor_id().clone(),
1732                MailboxErrorKind::Closed,
1733            )),
1734        }
1735    }
1736
1737    /// Receive the next message from the port corresponding with this
1738    /// receiver.
1739    pub async fn recv(&mut self) -> Result<M, MailboxError> {
1740        let mut next = self.receiver.recv().await;
1741        // To coalesce, get the last message from the queue if there are
1742        // more on the mspc queue.
1743        if self.coalesce
1744            && let Some(latest) = self.drain().pop()
1745        {
1746            next = Some(latest);
1747        }
1748        next.ok_or(MailboxError::new(
1749            self.actor_id().clone(),
1750            MailboxErrorKind::Closed,
1751        ))
1752    }
1753
1754    /// Drains all available messages from the port.
1755    pub fn drain(&mut self) -> Vec<M> {
1756        let mut drained: Vec<M> = Vec::new();
1757        while let Ok(msg) = self.receiver.try_recv() {
1758            // To coalesce, discard the old message if there is any.
1759            if self.coalesce {
1760                drained.pop();
1761            }
1762            drained.push(msg);
1763        }
1764        drained
1765    }
1766
1767    fn port(&self) -> u64 {
1768        self.port_id.1
1769    }
1770
1771    fn actor_id(&self) -> &ActorId {
1772        &self.port_id.0
1773    }
1774}
1775
1776impl<M> Drop for PortReceiver<M> {
1777    fn drop(&mut self) {
1778        // MARIUS: do we need to tombstone these? or should we
1779        // error out if we have removed the receiver before serializing the port ref?
1780        // ("no longer live")?
1781        self.mailbox.inner.ports.remove(&self.port());
1782    }
1783}
1784
1785impl<M> Stream for PortReceiver<M> {
1786    type Item = Result<M, MailboxError>;
1787
1788    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1789        std::pin::pin!(self.recv()).poll(cx).map(Some)
1790    }
1791}
1792
/// A receiver of M-typed messages from [`OncePort`]s.
pub struct OncePortReceiver<M> {
    /// The underlying one-shot receiver. It is taken out of the option
    /// when the message is received.
    receiver: Option<oneshot::Receiver<M>>,
    /// The ID of the port served by this receiver.
    port_id: PortId,

    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
1802
1803impl<M> OncePortReceiver<M> {
1804    /// Receive message from the one-shot port associated with this
1805    /// receiver.  Recv consumes the receiver: it is no longer valid
1806    /// after this call.
1807    pub async fn recv(mut self) -> Result<M, MailboxError> {
1808        std::mem::take(&mut self.receiver)
1809            .unwrap()
1810            .await
1811            .map_err(|err| {
1812                MailboxError::new(
1813                    self.actor_id().clone(),
1814                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1815                )
1816            })
1817    }
1818
1819    fn port(&self) -> u64 {
1820        self.port_id.1
1821    }
1822
1823    fn actor_id(&self) -> &ActorId {
1824        &self.port_id.0
1825    }
1826}
1827
1828impl<M> Drop for OncePortReceiver<M> {
1829    fn drop(&mut self) {
1830        // MARIUS: do we need to tombstone these? or should we
1831        // error out if we have removed the receiver before serializing the port ref?
1832        // ("no longer live")?
1833        self.mailbox.inner.ports.remove(&self.port());
1834    }
1835}
1836
/// Error that can occur during a SerializedSender's send operation.
/// It hands the headers and the serialized message back to the caller
/// alongside the underlying error.
pub struct SerializedSenderError {
    /// The headers associated with the message.
    pub headers: Attrs,
    /// The message that could not be sent.
    pub data: Serialized,
    /// The mailbox sender error that occurred.
    pub error: MailboxSenderError,
}
1846
/// SerializedSender encapsulates senders:
///   - It performs type erasure (and thus it is object-safe).
///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
///     validity of the underlying port.
trait SerializedSender: Send + Sync {
    /// Enables downcasting from `&dyn SerializedSender` to concrete
    /// types.
    ///
    /// Used by `Mailbox::lookup_sender` to downcast to
    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Send a serialized message. SerializedSender will deserialize the
    /// message (failing if it fails to deserialize), and then send the
    /// resulting message on the underlying port.
    ///
    /// `send_serialized` returns `Ok(true)` whenever the port remains
    /// valid after the send operation, and `Ok(false)` when it does not.
    /// On failure, the headers and the serialized message are handed
    /// back inside the returned [`SerializedSenderError`].
    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSenderError`.
    fn send_serialized(
        &self,
        headers: Attrs,
        serialized: Serialized,
    ) -> Result<bool, SerializedSenderError>;
}
1872
/// A sender to an M-typed unbounded port.
enum UnboundedPortSender<M: Message> {
    /// Send directly to the mpsc queue. The message headers are not
    /// forwarded on this path.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Use the provided function to enqueue the item. The function
    /// receives both the message headers and the message.
    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
1880
1881impl<M: Message> UnboundedPortSender<M> {
1882    fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1883        match self {
1884            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1885            Self::Func(func) => func(headers, message),
1886        }
1887    }
1888}
1889
1890// We implement Clone manually as derive(Clone) places unnecessarily
1891// strict bounds on the type parameter M.
1892impl<M: Message> Clone for UnboundedPortSender<M> {
1893    fn clone(&self) -> Self {
1894        match self {
1895            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1896            Self::Func(func) => Self::Func(func.clone()),
1897        }
1898    }
1899}
1900
1901impl<M: Message> Debug for UnboundedPortSender<M> {
1902    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1903        match self {
1904            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1905            Self::Func(_) => f
1906                .debug_tuple("UnboundedPortSender::Func")
1907                .field(&"..")
1908                .finish(),
1909        }
1910    }
1911}
1912
/// An unbounded port sender paired with the ID of the port it sends
/// to. The port ID is attached to any error produced while sending.
struct UnboundedSender<M: Message> {
    /// The underlying sender used to deliver messages.
    sender: UnboundedPortSender<M>,
    /// The ID of the port that this sender delivers to.
    port_id: PortId,
}
1917
1918impl<M: Message> UnboundedSender<M> {
1919    /// Create a new UnboundedSender encapsulating the provided
1920    /// sender.
1921    fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
1922        Self { sender, port_id }
1923    }
1924
1925    fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
1926        self.sender.send(headers, message).map_err(|err| {
1927            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
1928        })
1929    }
1930}
1931
1932// Clone is implemented explicitly because the derive macro demands M:
1933// Clone directly. In this case, it isn't needed because Arc<T> can
1934// clone for any T.
1935impl<M: Message> Clone for UnboundedSender<M> {
1936    fn clone(&self) -> Self {
1937        Self {
1938            sender: self.sender.clone(),
1939            port_id: self.port_id.clone(),
1940        }
1941    }
1942}
1943
1944impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
1945    fn as_any(&self) -> &dyn Any {
1946        self
1947    }
1948
1949    fn send_serialized(
1950        &self,
1951        headers: Attrs,
1952        serialized: Serialized,
1953    ) -> Result<bool, SerializedSenderError> {
1954        // Here, the stack ensures that this port is only instantiated for M-typed messages.
1955        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
1956        // but it is required as we have some usages that rely on representational equivalence
1957        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
1958        // support port aggregation.
1959        match serialized.deserialized_unchecked() {
1960            Ok(message) => {
1961                self.sender.send(headers.clone(), message).map_err(|err| {
1962                    SerializedSenderError {
1963                        data: serialized,
1964                        error: MailboxSenderError::new_bound(
1965                            self.port_id.clone(),
1966                            MailboxSenderErrorKind::Other(err),
1967                        ),
1968                        headers,
1969                    }
1970                })?;
1971
1972                Ok(true)
1973            }
1974            Err(err) => Err(SerializedSenderError {
1975                data: serialized,
1976                error: MailboxSenderError::new_bound(
1977                    self.port_id.clone(),
1978                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
1979                ),
1980                headers,
1981            }),
1982        }
1983    }
1984}
1985
/// OnceSender encapsulates an underlying one-shot sender, dynamically
/// tracking its validity.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot sender, shared between clones. It is `None` once a
    /// send has been attempted.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The ID of the port that this sender delivers to.
    port_id: PortId,
}
1993
1994impl<M: Message> OnceSender<M> {
1995    /// Create a new OnceSender encapsulating the provided one-shot
1996    /// sender.
1997    fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
1998        Self {
1999            sender: Arc::new(Mutex::new(Some(sender))),
2000            port_id,
2001        }
2002    }
2003
2004    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2005        // TODO: we should replace the sender on error
2006        match self.sender.lock().unwrap().take() {
2007            None => Err(MailboxSenderError::new_bound(
2008                self.port_id.clone(),
2009                MailboxSenderErrorKind::Closed,
2010            )),
2011            Some(sender) => {
2012                sender.send(message).map_err(|_| {
2013                    // Here, the value is returned when the port is
2014                    // closed.  We should consider having a similar
2015                    // API for send_once, though arguably it makes less
2016                    // sense in this context.
2017                    MailboxSenderError::new_bound(
2018                        self.port_id.clone(),
2019                        MailboxSenderErrorKind::Closed,
2020                    )
2021                })?;
2022                Ok(false)
2023            }
2024        }
2025    }
2026}
2027
2028// Clone is implemented explicitly because the derive macro demands M:
2029// Clone directly. In this case, it isn't needed because Arc<T> can
2030// clone for any T.
2031impl<M: Message> Clone for OnceSender<M> {
2032    fn clone(&self) -> Self {
2033        Self {
2034            sender: self.sender.clone(),
2035            port_id: self.port_id.clone(),
2036        }
2037    }
2038}
2039
2040impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2041    fn as_any(&self) -> &dyn Any {
2042        self
2043    }
2044
2045    fn send_serialized(
2046        &self,
2047        headers: Attrs,
2048        serialized: Serialized,
2049    ) -> Result<bool, SerializedSenderError> {
2050        match serialized.deserialized() {
2051            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2052                data: serialized,
2053                error: e,
2054                headers,
2055            }),
2056            Err(err) => Err(SerializedSenderError {
2057                data: serialized,
2058                error: MailboxSenderError::new_bound(
2059                    self.port_id.clone(),
2060                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2061                ),
2062                headers,
2063            }),
2064        }
2065    }
2066}
2067
/// Use the provided function to send untyped messages (i.e. Serialized objects).
pub(crate) struct UntypedUnboundedSender {
    /// The function used to send a serialized message. On failure it
    /// returns the message together with the error.
    pub(crate) sender:
        Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
    /// The ID of the port that this sender delivers to.
    pub(crate) port_id: PortId,
}
2074
2075impl SerializedSender for UntypedUnboundedSender {
2076    fn as_any(&self) -> &dyn Any {
2077        self
2078    }
2079
2080    fn send_serialized(
2081        &self,
2082        headers: Attrs,
2083        serialized: Serialized,
2084    ) -> Result<bool, SerializedSenderError> {
2085        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2086            data,
2087            error: MailboxSenderError::new_bound(
2088                self.port_id.clone(),
2089                MailboxSenderErrorKind::Other(err),
2090            ),
2091            headers,
2092        })?;
2093
2094        Ok(true)
2095    }
2096}
2097
/// State is the internal state of the mailbox.
struct State {
    /// The ID of the mailbox owner.
    actor_id: ActorId,

    // Insert a port here if it's serializable; otherwise don't.
    /// The set of active ports in the mailbox, keyed by port index.
    /// Each entry holds the type-erased sender used to deliver
    /// serialized messages to that port.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port ID to allocate.
    next_port: AtomicU64,

    /// The forwarder for this mailbox.
    forwarder: BoxedMailboxSender,
}
2114
2115impl State {
2116    /// Create a new state with the provided owning ActorId.
2117    fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2118        Self {
2119            actor_id,
2120            ports: DashMap::new(),
2121            // The first 1024 ports are allocated to actor handlers.
2122            // Other port IDs are ephemeral.
2123            next_port: AtomicU64::new(USER_PORT_OFFSET),
2124            forwarder,
2125        }
2126    }
2127
2128    /// Allocate a fresh port.
2129    fn allocate_port(&self) -> u64 {
2130        self.next_port.fetch_add(1, Ordering::SeqCst)
2131    }
2132}
2133
2134impl fmt::Debug for State {
2135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2136        f.debug_struct("State")
2137            .field("actor_id", &self.actor_id)
2138            .field(
2139                "open_ports",
2140                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2141            )
2142            .field("next_port", &self.next_port)
2143            .finish()
2144    }
2145}
2146
// TODO: mux based on some parameterized type. (mux key).
/// An in-memory mailbox muxer. This is used to route messages to
/// different underlying senders.
#[derive(Debug, Clone)]
pub struct MailboxMuxer {
    /// The senders bound to this muxer, keyed by the ID of the actor
    /// they deliver to. The map is shared between clones of the muxer.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2154
2155impl Default for MailboxMuxer {
2156    fn default() -> Self {
2157        Self::new()
2158    }
2159}
2160
2161impl MailboxMuxer {
2162    /// Create a new, empty, muxer.
2163    pub fn new() -> Self {
2164        Self {
2165            mailboxes: Arc::new(DashMap::new()),
2166        }
2167    }
2168
2169    /// Route messages destined for the provided actor id to the provided
2170    /// sender. Returns false if there is already a sender associated
2171    /// with the actor. In this case, the sender is not replaced, and
2172    /// the caller must [`MailboxMuxer::unbind`] it first.
2173    pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2174        match self.mailboxes.entry(actor_id) {
2175            Entry::Occupied(_) => false,
2176            Entry::Vacant(entry) => {
2177                entry.insert(Box::new(sender));
2178                true
2179            }
2180        }
2181    }
2182
2183    /// Convenience function to bind a mailbox.
2184    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2185        self.bind(mailbox.actor_id().clone(), mailbox)
2186    }
2187
2188    /// Unbind the sender associated with the provided actor ID. After
2189    /// unbinding, the muxer will no longer be able to send messages to
2190    /// that actor.
2191    pub(crate) fn unbind(&self, actor_id: &ActorId) {
2192        self.mailboxes.remove(actor_id);
2193    }
2194
2195    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2196    pub fn bound_actors(&self) -> Vec<ActorId> {
2197        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2198    }
2199}
2200
2201impl MailboxSender for MailboxMuxer {
2202    fn post_unchecked(
2203        &self,
2204        envelope: MessageEnvelope,
2205        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2206    ) {
2207        let dest_actor_id = envelope.dest().actor_id();
2208        match self.mailboxes.get(envelope.dest().actor_id()) {
2209            None => {
2210                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2211                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2212            }
2213            Some(sender) => sender.post(envelope, return_handle),
2214        }
2215    }
2216}
2217
/// MailboxRouter routes messages to the sender that is bound to its
/// nearest prefix.
#[derive(Debug, Clone)]
pub struct MailboxRouter {
    /// The routing table: senders keyed by the reference (prefix) they
    /// are bound to. The table is shared between clones of the router.
    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2224
2225impl Default for MailboxRouter {
2226    fn default() -> Self {
2227        Self::new()
2228    }
2229}
2230
2231impl MailboxRouter {
2232    /// Create a new, empty router.
2233    pub fn new() -> Self {
2234        Self {
2235            entries: Arc::new(RwLock::new(BTreeMap::new())),
2236        }
2237    }
2238
2239    /// Downgrade this router to a [`WeakMailboxRouter`].
2240    pub fn downgrade(&self) -> WeakMailboxRouter {
2241        WeakMailboxRouter(Arc::downgrade(&self.entries))
2242    }
2243
2244    /// Returns a new router that will first attempt to find a route for the message
2245    /// in the router's table; otherwise post the message to the provided fallback
2246    /// sender.
2247    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2248        FallbackMailboxRouter {
2249            router: self.clone(),
2250            default,
2251        }
2252    }
2253
2254    /// Bind the provided sender to the given reference. The destination
2255    /// is treated as a prefix to which messages can be routed, and
2256    /// messages are routed to their longest matching prefix.
2257    pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2258        let mut w = self.entries.write().unwrap();
2259        w.insert(dest, Arc::new(sender));
2260    }
2261
2262    fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2263        match self
2264            .entries
2265            .read()
2266            .unwrap()
2267            .lower_bound(Excluded(&actor_id.clone().into()))
2268            .prev()
2269        {
2270            None => None,
2271            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2272                Some(sender.clone())
2273            }
2274            Some(_) => None,
2275        }
2276    }
2277}
2278
2279impl MailboxSender for MailboxRouter {
2280    fn post_unchecked(
2281        &self,
2282        envelope: MessageEnvelope,
2283        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2284    ) {
2285        match self.sender(envelope.dest().actor_id()) {
2286            None => envelope.undeliverable(
2287                DeliveryError::Unroutable(
2288                    "no destination found for actor in routing table".to_string(),
2289                ),
2290                return_handle,
2291            ),
2292            Some(sender) => sender.post(envelope, return_handle),
2293        }
2294    }
2295}
2296
/// A router that consults a [`MailboxRouter`] first and posts to a
/// default sender when the router has no route for the destination.
#[derive(Debug, Clone)]
struct FallbackMailboxRouter {
    /// The router whose table is consulted first.
    router: MailboxRouter,
    /// The sender used when the router has no matching route.
    default: BoxedMailboxSender,
}
2302
2303impl MailboxSender for FallbackMailboxRouter {
2304    fn post_unchecked(
2305        &self,
2306        envelope: MessageEnvelope,
2307        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2308    ) {
2309        match self.router.sender(envelope.dest().actor_id()) {
2310            Some(sender) => sender.post(envelope, return_handle),
2311            None => self.default.post(envelope, return_handle),
2312        }
2313    }
2314}
2315
/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
/// state. This allows router references to be circular: an entity holding a reference
/// to the router may also be contained in the router itself.
///
/// TODO: this currently holds a weak reference to the entire router. This helps
/// prevent cycle leaks, but can cause excess memory usage as the cycle is at
/// the granularity of each entry. Possibly the router should allow weak references
/// on a per-entry basis.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    // Weak handle to the routing table of the originating `MailboxRouter`.
    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2328
2329impl WeakMailboxRouter {
2330    /// Upgrade the weak router to a strong reference router.
2331    pub fn upgrade(&self) -> Option<MailboxRouter> {
2332        self.0.upgrade().map(|entries| MailboxRouter { entries })
2333    }
2334}
2335
2336impl MailboxSender for WeakMailboxRouter {
2337    fn post_unchecked(
2338        &self,
2339        envelope: MessageEnvelope,
2340        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2341    ) {
2342        match self.upgrade() {
2343            Some(router) => router.post(envelope, return_handle),
2344            None => envelope.undeliverable(
2345                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2346                return_handle,
2347            ),
2348        }
2349    }
2350}
2351
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// If a message destination is not bound, but is a "direct mode" address
/// (i.e., its proc id contains the channel address through which the proc
/// is reachable), then DialMailboxRouter dials the proc directly.
///
/// Messages sent to unknown destinations are routed to the `default`
/// sender, if present.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
    // The address book: the channel address bound to each reference.
    // Shared between clones of the router.
    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
    // Cache of dialed mailbox clients, keyed by the channel address
    // they were dialed on. Shared between clones of the router.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    // The default sender, to which messages for unknown recipients
    // are sent. (This is like a default route in a routing table.)
    default: BoxedMailboxSender,

    // When true, only dial direct-addressed procs if their transport
    // type is remote. Otherwise, fall back to the default sender.
    direct_addressed_remote_only: bool,
}
2378
2379impl Default for DialMailboxRouter {
2380    fn default() -> Self {
2381        Self::new()
2382    }
2383}
2384
2385impl DialMailboxRouter {
2386    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2387    pub fn new() -> Self {
2388        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2389    }
2390
2391    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2392    /// and a default sender. Any message with an unknown destination is
2393    /// dispatched on this default sender, unless the destination is
2394    /// direct-addressed, in which case it is dialed directly.
2395    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2396        Self {
2397            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2398            sender_cache: Arc::new(DashMap::new()),
2399            default,
2400            direct_addressed_remote_only: false,
2401        }
2402    }
2403
2404    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2405    /// and a default sender. Any message with an unknown destination is
2406    /// dispatched on this default sender, unless the destination is
2407    /// direct-addressed *and* has a remote channel transport type.
2408    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2409        Self {
2410            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2411            sender_cache: Arc::new(DashMap::new()),
2412            default,
2413            direct_addressed_remote_only: true,
2414        }
2415    }
2416
2417    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2418    /// existing binding.
2419    ///
2420    /// If the address changes, the old sender is evicted from the
2421    /// cache to ensure fresh routing on next use.
2422    pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2423        if let Ok(mut w) = self.address_book.write() {
2424            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2425                && old_addr != addr
2426            {
2427                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2428                self.sender_cache.remove(&old_addr);
2429            }
2430        } else {
2431            tracing::error!("address book poisoned during bind of {:?}", dest);
2432        }
2433    }
2434
2435    /// Removes all address mappings with the given prefix from the
2436    /// router.
2437    ///
2438    /// Also evicts any corresponding cached senders to prevent reuse
2439    /// of stale connections.
2440    pub fn unbind(&self, dest: &Reference) {
2441        if let Ok(mut w) = self.address_book.write() {
2442            let to_remove: Vec<(Reference, ChannelAddr)> = w
2443                .range(dest..)
2444                .take_while(|(key, _)| dest.is_prefix_of(key))
2445                .map(|(key, addr)| (key.clone(), addr.clone()))
2446                .collect();
2447
2448            for (key, addr) in to_remove {
2449                tracing::info!("unbinding {:?} from {:?}", key, addr);
2450                w.remove(&key);
2451                self.sender_cache.remove(&addr);
2452            }
2453        } else {
2454            tracing::error!("address book poisoned during unbind of {:?}", dest);
2455        }
2456    }
2457
2458    /// Lookup an actor's channel in the router's address bok.
2459    pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2460        let address_book = self.address_book.read().unwrap();
2461        let found = address_book
2462            .lower_bound(Excluded(&actor_id.clone().into()))
2463            .prev();
2464
2465        // First try to look up the address in our address book; failing that,
2466        // try to resolve direct procs.
2467        if let Some((key, addr)) = found
2468            && key.is_prefix_of(&actor_id.clone().into())
2469        {
2470            Some(addr.clone())
2471        } else if actor_id.proc_id().is_direct() {
2472            let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2473            if self.direct_addressed_remote_only {
2474                addr.transport().is_remote().then_some(addr)
2475            } else {
2476                Some(addr)
2477            }
2478        } else {
2479            None
2480        }
2481    }
2482
2483    /// Return all covering prefixes of this router. That is, all references that are not
2484    /// prefixed by another reference in the routing table
2485    pub fn prefixes(&self) -> BTreeSet<Reference> {
2486        let addrs = self.address_book.read().unwrap();
2487        let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2488        for (reference, _) in addrs.iter() {
2489            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2490                Some(candidate) if candidate.is_prefix_of(reference) => (),
2491                _ => {
2492                    prefixes.insert(reference.clone());
2493                }
2494            }
2495        }
2496
2497        prefixes
2498    }
2499
2500    fn dial(
2501        &self,
2502        addr: &ChannelAddr,
2503        actor_id: &ActorId,
2504    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2505        // Get the sender. Create it if needed. Do not send the
2506        // messages inside this block so we do not hold onto the
2507        // reference of the dashmap entries.
2508        match self.sender_cache.entry(addr.clone()) {
2509            Entry::Occupied(entry) => Ok(entry.get().clone()),
2510            Entry::Vacant(entry) => {
2511                let tx = channel::dial(addr.clone()).map_err(|err| {
2512                    MailboxSenderError::new_unbound_type(
2513                        actor_id.clone(),
2514                        MailboxSenderErrorKind::Channel(err),
2515                        "unknown",
2516                    )
2517                })?;
2518                let sender = MailboxClient::new(tx);
2519                Ok(entry.insert(Arc::new(sender)).value().clone())
2520            }
2521        }
2522    }
2523}
2524
2525impl MailboxSender for DialMailboxRouter {
2526    fn post_unchecked(
2527        &self,
2528        envelope: MessageEnvelope,
2529        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2530    ) {
2531        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2532            self.default.post(envelope, return_handle);
2533            return;
2534        };
2535
2536        match self.dial(&addr, envelope.dest().actor_id()) {
2537            Err(err) => envelope.undeliverable(
2538                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2539                return_handle,
2540            ),
2541            Ok(sender) => sender.post(envelope, return_handle),
2542        }
2543    }
2544}
2545
/// A MailboxSender that reports any envelope as undeliverable due to
/// routing failure. It carries no state.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2550
2551impl MailboxSender for UnroutableMailboxSender {
2552    fn post_unchecked(
2553        &self,
2554        envelope: MessageEnvelope,
2555        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2556    ) {
2557        envelope.undeliverable(
2558            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2559            return_handle,
2560        );
2561    }
2562}
2563
2564#[cfg(test)]
2565mod tests {
2566
2567    use std::assert_matches::assert_matches;
2568    use std::mem::drop;
2569    use std::str::FromStr;
2570    use std::sync::atomic::AtomicUsize;
2571    use std::time::Duration;
2572
2573    use timed_test::async_timed_test;
2574
2575    use super::*;
2576    use crate::Actor;
2577    use crate::ActorHandle;
2578    use crate::Instance;
2579    use crate::PortId;
2580    use crate::accum;
2581    use crate::channel::ChannelTransport;
2582    use crate::channel::dial;
2583    use crate::channel::serve;
2584    use crate::channel::sim::SimAddr;
2585    use crate::clock::Clock;
2586    use crate::clock::RealClock;
2587    use crate::data::Serialized;
2588    use crate::id;
2589    use crate::proc::Proc;
2590    use crate::reference::ProcId;
2591    use crate::reference::WorldId;
2592    use crate::simnet;
2593
2594    #[test]
2595    fn test_error() {
2596        let err = MailboxError::new(
2597            ActorId(
2598                ProcId::Ranked(WorldId("myworld".to_string()), 2),
2599                "myactor".to_string(),
2600                5,
2601            ),
2602            MailboxErrorKind::Closed,
2603        );
2604        assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2605    }
2606
2607    #[tokio::test]
2608    async fn test_mailbox_basic() {
2609        let mbox = Mailbox::new_detached(id!(test[0].test));
2610        let (port, mut receiver) = mbox.open_port::<u64>();
2611        let port = port.bind();
2612
2613        mbox.serialize_and_send(&port, 123, monitored_return_handle())
2614            .unwrap();
2615        mbox.serialize_and_send(&port, 321, monitored_return_handle())
2616            .unwrap();
2617        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2618        assert_eq!(receiver.recv().await.unwrap(), 321u64);
2619
2620        let serialized = Serialized::serialize(&999u64).unwrap();
2621        mbox.post(
2622            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2623            monitored_return_handle(),
2624        );
2625        assert_eq!(receiver.recv().await.unwrap(), 999u64);
2626    }
2627
2628    #[tokio::test]
2629    async fn test_mailbox_accum() {
2630        let mbox = Mailbox::new_detached(id!(test[0].test));
2631        let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2632
2633        for i in -3..4 {
2634            port.send(i).unwrap();
2635            let received: accum::Max<i64> = receiver.recv().await.unwrap();
2636            let msg = received.get();
2637            assert_eq!(msg, &i);
2638        }
2639        // Send a smaller or same value. Should still receive the previous max.
2640        for i in -3..4 {
2641            port.send(i).unwrap();
2642            assert_eq!(receiver.recv().await.unwrap().get(), &3);
2643        }
2644        // send a larger value. Should receive the new max.
2645        port.send(4).unwrap();
2646        assert_eq!(receiver.recv().await.unwrap().get(), &4);
2647
2648        // Send multiple updates. Should only receive the final change.
2649        for i in 5..10 {
2650            port.send(i).unwrap();
2651        }
2652        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2653        port.send(1).unwrap();
2654        port.send(3).unwrap();
2655        port.send(2).unwrap();
2656        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2657    }
2658
2659    #[test]
2660    fn test_port_and_reducer() {
2661        let mbox = Mailbox::new_detached(id!(test[0].test));
2662        // accum port could have reducer typehash
2663        {
2664            let accumulator = accum::max::<u64>();
2665            let reducer_spec = accumulator.reducer_spec().unwrap();
2666            let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2667            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2668            let port_ref = port.bind();
2669            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2670        }
2671        // normal port should not have reducer typehash
2672        {
2673            let (port, _) = mbox.open_port::<u64>();
2674            assert_eq!(port.reducer_spec, None);
2675            let port_ref = port.bind();
2676            assert_eq!(port_ref.reducer_spec(), &None);
2677        }
2678    }
2679
2680    #[tokio::test]
2681    #[ignore] // error behavior changed, but we will bring it back
2682    async fn test_mailbox_once() {
2683        let mbox = Mailbox::new_detached(id!(test[0].test));
2684
2685        let (port, receiver) = mbox.open_once_port::<u64>();
2686
2687        // let port_id = port.port_id().clone();
2688
2689        port.send(123u64).unwrap();
2690        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2691
2692        // // The borrow checker won't let us send again on the port
2693        // // (good!), but we stashed the port-id and so we can try on the
2694        // // serialized interface.
2695        // let Err(err) = mbox
2696        //     .send_serialized(&port_id, &Serialized(Vec::new()))
2697        //     .await
2698        // else {
2699        //     unreachable!()
2700        // };
2701        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2702    }
2703
2704    #[tokio::test]
2705    #[ignore] // changed error behavior
2706    async fn test_mailbox_receiver_drop() {
2707        let mbox = Mailbox::new_detached(id!(test[0].test));
2708        let (port, mut receiver) = mbox.open_port::<u64>();
2709        // Make sure we go through "remote" path.
2710        let port = port.bind();
2711        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2712            .unwrap();
2713        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2714        drop(receiver);
2715        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2716            panic!();
2717        };
2718
2719        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2720        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2721    }
2722
2723    #[tokio::test]
2724    async fn test_drain() {
2725        let mbox = Mailbox::new_detached(id!(test[0].test));
2726
2727        let (port, mut receiver) = mbox.open_port();
2728        let port = port.bind();
2729
2730        for i in 0..10 {
2731            mbox.serialize_and_send(&port, i, monitored_return_handle())
2732                .unwrap();
2733        }
2734
2735        for i in 0..10 {
2736            assert_eq!(receiver.recv().await.unwrap(), i);
2737        }
2738
2739        assert!(receiver.drain().is_empty());
2740    }
2741
2742    #[tokio::test]
2743    async fn test_mailbox_muxer() {
2744        let muxer = MailboxMuxer::new();
2745
2746        let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2747        let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2748
2749        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2750        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2751
2752        let (port, receiver) = mbox0.open_once_port::<u64>();
2753
2754        port.send(123u64).unwrap();
2755        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2756
2757        /*
2758        let (tx, rx) = channel::local::new::<u64>();
2759        let (port, _) = mbox0.open_port::<u64>();
2760        let handle = muxer.clone().serve_port(port, rx).unwrap();
2761        muxer.unbind(mbox0.actor_id());
2762        tx.send(123u64).await.unwrap();
2763        let Ok(Err(err)) = handle.await else { panic!() };
2764        assert_eq!(err.actor_id(), &actor_id(0));
2765        */
2766    }
2767
2768    #[tokio::test]
2769    async fn test_local_client_server() {
2770        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2771        let (tx, rx) = channel::local::new();
2772        let serve_handle = mbox.clone().serve(rx);
2773        let client = MailboxClient::new(tx);
2774
2775        let (port, receiver) = mbox.open_once_port::<u64>();
2776        let port = port.bind();
2777
2778        client
2779            .serialize_and_send_once(port, 123u64, monitored_return_handle())
2780            .unwrap();
2781        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2782        serve_handle.stop("fromt test");
2783        serve_handle.await.unwrap().unwrap();
2784    }
2785
2786    #[tokio::test]
2787    async fn test_sim_client_server() {
2788        simnet::start();
2789        let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2790        let src_to_dst = ChannelAddr::Sim(
2791            SimAddr::new_with_src(
2792                "local:0".parse::<ChannelAddr>().unwrap(),
2793                dst_addr.addr().clone(),
2794            )
2795            .unwrap(),
2796        );
2797
2798        let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2799        let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2800        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2801        let serve_handle = mbox.clone().serve(rx);
2802        let client = MailboxClient::new(tx);
2803        let (port, receiver) = mbox.open_once_port::<u64>();
2804        let port = port.bind();
2805        let msg: u64 = 123;
2806        client
2807            .serialize_and_send_once(port, msg, monitored_return_handle())
2808            .unwrap();
2809        assert_eq!(receiver.recv().await.unwrap(), msg);
2810        serve_handle.stop("from test");
2811        serve_handle.await.unwrap().unwrap();
2812    }
2813
2814    #[tokio::test]
2815    async fn test_mailbox_router() {
2816        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2817        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2818        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2819        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2820
2821        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2822            [&mbox0, &mbox1, &mbox2, &mbox3]
2823                .into_iter()
2824                .map(|mbox| {
2825                    let (port, receiver) = mbox.open_once_port::<u64>();
2826                    (port.bind(), receiver)
2827                })
2828                .collect();
2829
2830        let router = MailboxRouter::new();
2831
2832        router.bind(id!(world0).into(), mbox0);
2833        router.bind(id!(world1[0]).into(), mbox1);
2834        router.bind(id!(world1[1]).into(), mbox2);
2835        router.bind(id!(world1[1].actor1).into(), mbox3);
2836
2837        for (i, (port, receiver)) in comms.into_iter().enumerate() {
2838            router
2839                .serialize_and_send_once(port, i as u64, monitored_return_handle())
2840                .unwrap();
2841            assert_eq!(receiver.recv().await.unwrap(), i as u64);
2842        }
2843
2844        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
2845
2846        let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2847
2848        let (return_handle, mut return_receiver) =
2849            crate::mailbox::undeliverable::new_undeliverable_port();
2850        let (port, _receiver) = mbox4.open_once_port();
2851        router
2852            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2853            .unwrap();
2854        assert!(return_receiver.recv().await.is_ok());
2855
2856        let router = router.fallback(mbox4.clone().into_boxed());
2857        let (port, receiver) = mbox4.open_once_port();
2858        router
2859            .serialize_and_send_once(port.bind(), 0, return_handle)
2860            .unwrap();
2861        assert_eq!(receiver.recv().await.unwrap(), 0);
2862    }
2863
    // Exercises the address table of `DialMailboxRouter`: `bind` entries
    // at proc, actor and direct-address granularity, resolve actor ids
    // with `lookup_addr`, then `unbind` and check which lookups still
    // succeed. The steps are order dependent; no channel is dialed.
    #[tokio::test]
    async fn test_dial_mailbox_router() {
        let router = DialMailboxRouter::new();

        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
        router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
        router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
        router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
        // Bind a direct address -- we should use its bound address!
        router.bind(
            "unix:@4,my_proc,my_actor".parse().unwrap(),
            "unix:@5".parse().unwrap(),
        );

        // We should be able to lookup the ids
        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
        router.lookup_addr(&id!(world1[0].actor[0])).unwrap();

        // While the direct id is bound, lookup returns the bound address (@5).
        let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
            .unwrap()
            .into_actor()
            .unwrap();
        assert_eq!(
            router.lookup_addr(&actor_id).unwrap(),
            "unix!@5".parse().unwrap(),
        );
        // After unbinding it, lookup still succeeds but yields @4.
        // NOTE(review): presumably this is the address embedded in the
        // direct id itself rather than a table entry -- confirm against
        // `lookup_addr`.
        router.unbind(&actor_id.clone().into());
        assert_eq!(
            router.lookup_addr(&actor_id).unwrap(),
            "unix!@4".parse().unwrap(),
        );

        // Unbind so we cannot find the ids anymore
        router.unbind(&id!(world1).into());
        assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
        assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
        assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
        // world0 is untouched by the world1 unbind until it is unbound too.
        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
        router.unbind(&id!(world0).into());
        assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
    }
2905
2906    #[tokio::test]
2907    #[ignore] // TODO: there's a leak here, fix it
2908    async fn test_dial_mailbox_router_default() {
2909        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2910        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2911        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2912        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2913
2914        // We don't need to dial here, since we gain direct access to the
2915        // underlying routers.
2916        let root = MailboxRouter::new();
2917        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
2918        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
2919
2920        root.bind(id!(world0).into(), world0_router.clone());
2921        root.bind(id!(world1).into(), world1_router.clone());
2922
2923        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
2924
2925        let mut handles = Vec::new(); // hold on to handles, or channels get closed
2926        for mbox in mailboxes.iter() {
2927            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
2928            let handle = (*mbox).clone().serve(rx);
2929            handles.push(handle);
2930
2931            eprintln!("{}: {}", mbox.actor_id(), addr);
2932            if mbox.actor_id().world_name() == "world0" {
2933                world0_router.bind(mbox.actor_id().clone().into(), addr);
2934            } else {
2935                world1_router.bind(mbox.actor_id().clone().into(), addr);
2936            }
2937        }
2938
2939        // Make sure nodes are fully connected.
2940        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
2941            for mbox in mailboxes.iter() {
2942                let (port, receiver) = mbox.open_once_port::<u64>();
2943                let port = port.bind();
2944                router
2945                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
2946                    .unwrap();
2947                assert_eq!(receiver.recv().await.unwrap(), 123u64);
2948            }
2949        }
2950    }
2951
2952    #[tokio::test]
2953    async fn test_enqueue_port() {
2954        let mbox = Mailbox::new_detached(id!(test[0].test));
2955
2956        let count = Arc::new(AtomicUsize::new(0));
2957        let count_clone = count.clone();
2958        let port = mbox.open_enqueue_port(move |_, n| {
2959            count_clone.fetch_add(n, Ordering::SeqCst);
2960            Ok(())
2961        });
2962
2963        port.send(10).unwrap();
2964        port.send(5).unwrap();
2965        port.send(1).unwrap();
2966        port.send(0).unwrap();
2967
2968        assert_eq!(count.load(Ordering::SeqCst), 16);
2969    }
2970
2971    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2972    struct TestMessage;
2973
2974    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
2975    #[named(name = "some::custom::path")]
2976    struct TestMessage2;
2977
2978    #[test]
2979    fn test_remote_message_macros() {
2980        assert_eq!(
2981            TestMessage::typename(),
2982            "hyperactor::mailbox::tests::TestMessage"
2983        );
2984        assert_eq!(TestMessage2::typename(), "some::custom::path");
2985    }
2986
2987    #[test]
2988    fn test_message_envelope_display() {
2989        #[derive(Named, Serialize, Deserialize)]
2990        struct MyTest {
2991            a: u64,
2992            b: String,
2993        }
2994        crate::register_type!(MyTest);
2995
2996        let envelope = MessageEnvelope::serialize(
2997            id!(source[0].actor),
2998            id!(dest[1].actor[0][123]),
2999            &MyTest {
3000                a: 123,
3001                b: "hello".into(),
3002            },
3003            Attrs::new(),
3004        )
3005        .unwrap();
3006
3007        assert_eq!(
3008            format!("{}", envelope),
3009            r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
3010        );
3011    }
3012
3013    #[derive(Debug, Default, Actor)]
3014    struct Foo;
3015
3016    // Test that a message delivery failure causes the sending actor
3017    // to stop running.
3018    #[tokio::test]
3019    async fn test_actor_delivery_failure() {
3020        // This test involves making an actor fail and so we must set
3021        // a supervision coordinator.
3022        use crate::actor::ActorStatus;
3023        use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
3024
3025        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3026            BOXED_PANICKING_MAILBOX_SENDER.clone(),
3027        ));
3028        let proc_id = id!(quux[0]);
3029        let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
3030        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3031
3032        let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
3033        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3034        let message = MessageEnvelope::new(
3035            foo.actor_id().clone(),
3036            PortId(id!(corge[0].bar), 9999u64),
3037            Serialized::serialize(&1u64).unwrap(),
3038            Attrs::new(),
3039        );
3040        return_handle.send(Undeliverable(message)).unwrap();
3041
3042        RealClock
3043            .sleep(tokio::time::Duration::from_millis(100))
3044            .await;
3045
3046        let foo_status = foo.status();
3047        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3048        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3049            unreachable!()
3050        };
3051        assert!(msg.as_str().contains(
3052            "serving quux[0].foo[0]: processing error: a message from \
3053                quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
3054        ));
3055
3056        proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
3057            .await
3058            .unwrap();
3059    }
3060
3061    #[tokio::test]
3062    async fn test_detached_return_handle() {
3063        let (return_handle, mut return_receiver) =
3064            crate::mailbox::undeliverable::new_undeliverable_port();
3065        // Simulate an undelivered message return.
3066        let envelope = MessageEnvelope::new(
3067            id!(foo[0].bar),
3068            PortId(id!(baz[0].corge), 9999u64),
3069            Serialized::serialize(&1u64).unwrap(),
3070            Attrs::new(),
3071        );
3072        return_handle.send(Undeliverable(envelope.clone())).unwrap();
3073        // Check we receive the undelivered message.
3074        assert!(
3075            RealClock
3076                .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3077                .await
3078                .is_ok()
3079        );
3080        // Setup a monitor for the receiver and show that if there are
3081        // no outstanding return handles it terminates.
3082        let monitor_handle = tokio::spawn(async move {
3083            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3084                envelope.set_error(DeliveryError::BrokenLink(
3085                    "returned in unit test".to_string(),
3086                ));
3087                UndeliverableMailboxSender
3088                    .post(envelope, /*unused */ monitored_return_handle());
3089            }
3090        });
3091        drop(return_handle);
3092        assert!(
3093            RealClock
3094                .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3095                .await
3096                .is_ok()
3097        );
3098    }
3099
3100    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3101        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3102            // Create dummy state and port_id to create PortReceiver. They are
3103            // not used in the test.
3104            let dummy_state =
3105                State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3106            let dummy_port_id = PortId(id!(world[0].actor), 0);
3107            let (sender, receiver) = mpsc::unbounded_channel::<M>();
3108            let receiver = PortReceiver {
3109                receiver,
3110                port_id: dummy_port_id,
3111                coalesce,
3112                mailbox: Mailbox {
3113                    inner: Arc::new(dummy_state),
3114                },
3115            };
3116            (sender, receiver)
3117        }
3118
3119        // verify fn drain
3120        {
3121            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3122            assert!(receiver.drain().is_empty());
3123
3124            sender.send(0).unwrap();
3125            sender.send(1).unwrap();
3126            sender.send(2).unwrap();
3127            sender.send(3).unwrap();
3128            sender.send(4).unwrap();
3129            sender.send(5).unwrap();
3130            sender.send(6).unwrap();
3131            sender.send(7).unwrap();
3132
3133            if drop_sender {
3134                drop(sender);
3135            }
3136
3137            if !coalesce {
3138                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3139            } else {
3140                assert_eq!(receiver.drain(), vec![7]);
3141            }
3142
3143            assert!(receiver.drain().is_empty());
3144            assert!(receiver.drain().is_empty());
3145        }
3146
3147        // verify fn try_recv
3148        {
3149            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3150            assert!(receiver.try_recv().unwrap().is_none());
3151
3152            sender.send(0).unwrap();
3153            sender.send(1).unwrap();
3154            sender.send(2).unwrap();
3155            sender.send(3).unwrap();
3156
3157            if drop_sender {
3158                drop(sender);
3159            }
3160
3161            if !coalesce {
3162                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3163                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3164                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3165            }
3166            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3167            if drop_sender {
3168                assert_matches!(
3169                    receiver.try_recv().unwrap_err().kind(),
3170                    MailboxErrorKind::Closed
3171                );
3172                // Still Closed error
3173                assert_matches!(
3174                    receiver.try_recv().unwrap_err().kind(),
3175                    MailboxErrorKind::Closed
3176                );
3177            } else {
3178                assert!(receiver.try_recv().unwrap().is_none());
3179                // Still empty
3180                assert!(receiver.try_recv().unwrap().is_none());
3181            }
3182        }
3183        // verify fn recv
3184        {
3185            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3186            assert!(
3187                RealClock
3188                    .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3189                    .await
3190                    .is_err()
3191            );
3192
3193            sender.send(4).unwrap();
3194            sender.send(5).unwrap();
3195            sender.send(6).unwrap();
3196            sender.send(7).unwrap();
3197
3198            if drop_sender {
3199                drop(sender);
3200            }
3201
3202            if !coalesce {
3203                assert_eq!(receiver.recv().await.unwrap(), 4);
3204                assert_eq!(receiver.recv().await.unwrap(), 5);
3205                assert_eq!(receiver.recv().await.unwrap(), 6);
3206            }
3207            assert_eq!(receiver.recv().await.unwrap(), 7);
3208            if drop_sender {
3209                assert_matches!(
3210                    receiver.recv().await.unwrap_err().kind(),
3211                    MailboxErrorKind::Closed
3212                );
3213                // Still None
3214                assert_matches!(
3215                    receiver.recv().await.unwrap_err().kind(),
3216                    MailboxErrorKind::Closed
3217                );
3218            } else {
3219                assert!(
3220                    RealClock
3221                        .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3222                        .await
3223                        .is_err()
3224                );
3225            }
3226        }
3227    }
3228
3229    #[tokio::test]
3230    async fn test_receiver_basic_default() {
3231        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3232    }
3233
3234    #[tokio::test]
3235    async fn test_receiver_basic_latest() {
3236        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3237    }
3238
3239    #[tokio::test]
3240    async fn test_receiver_after_sender_drop_default() {
3241        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3242    }
3243
3244    #[tokio::test]
3245    async fn test_receiver_after_sender_drop_latest() {
3246        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3247    }
3248
3249    struct Setup {
3250        receiver: PortReceiver<u64>,
3251        actor0: Instance<()>,
3252        actor1: Instance<()>,
3253        _actor0_handle: ActorHandle<()>,
3254        _actor1_handle: ActorHandle<()>,
3255        port_id: PortId,
3256        port_id1: PortId,
3257        port_id2: PortId,
3258        port_id2_1: PortId,
3259    }
3260
3261    async fn setup_split_port_ids(
3262        reducer_spec: Option<ReducerSpec>,
3263        reducer_opts: Option<ReducerOpts>,
3264    ) -> Setup {
3265        let proc = Proc::local();
3266        let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3267        let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3268
3269        // Open a port on actor0
3270        let (port_handle, receiver) = actor0.open_port::<u64>();
3271        let port_id = port_handle.bind().port_id().clone();
3272
3273        // Split it twice on actor1
3274        let port_id1 = port_id
3275            .split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3276            .unwrap();
3277        let port_id2 = port_id
3278            .split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3279            .unwrap();
3280
3281        // A split port id can also be split
3282        let port_id2_1 = port_id2
3283            .split(&actor1, reducer_spec, reducer_opts.clone())
3284            .unwrap();
3285
3286        Setup {
3287            receiver,
3288            actor0,
3289            actor1,
3290            _actor0_handle: actor0_handle,
3291            _actor1_handle: actor1_handle,
3292            port_id,
3293            port_id1,
3294            port_id2,
3295            port_id2_1,
3296        }
3297    }
3298
3299    fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3300        let serialized = Serialized::serialize(&msg).unwrap();
3301        port_id.send(cx, serialized);
3302    }
3303
3304    #[async_timed_test(timeout_secs = 30)]
3305    async fn test_split_port_id_no_reducer() {
3306        let Setup {
3307            mut receiver,
3308            actor0,
3309            actor1,
3310            port_id,
3311            port_id1,
3312            port_id2,
3313            port_id2_1,
3314            ..
3315        } = setup_split_port_ids(None, None).await;
3316        // Can send messages to receiver from all port handles
3317        post(&actor0, port_id.clone(), 1);
3318        assert_eq!(receiver.recv().await.unwrap(), 1);
3319        post(&actor1, port_id1.clone(), 2);
3320        assert_eq!(receiver.recv().await.unwrap(), 2);
3321        post(&actor1, port_id2.clone(), 3);
3322        assert_eq!(receiver.recv().await.unwrap(), 3);
3323        post(&actor1, port_id2_1.clone(), 4);
3324        assert_eq!(receiver.recv().await.unwrap(), 4);
3325
3326        // no more messages
3327        RealClock.sleep(Duration::from_secs(2)).await;
3328        let msg = receiver.try_recv().unwrap();
3329        assert_eq!(msg, None);
3330    }
3331
3332    async fn wait_for(
3333        receiver: &mut PortReceiver<u64>,
3334        expected_size: usize,
3335        timeout_duration: Duration,
3336    ) -> anyhow::Result<Vec<u64>> {
3337        let mut messeges = vec![];
3338
3339        RealClock
3340            .timeout(timeout_duration, async {
3341                loop {
3342                    let msg = receiver.recv().await.unwrap();
3343                    messeges.push(msg);
3344                    if messeges.len() == expected_size {
3345                        break;
3346                    }
3347                }
3348            })
3349            .await?;
3350        Ok(messeges)
3351    }
3352
3353    #[async_timed_test(timeout_secs = 30)]
3354    async fn test_split_port_id_sum_reducer() {
3355        let config = crate::config::global::lock();
3356        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3357
3358        let sum_accumulator = accum::sum::<u64>();
3359        let reducer_spec = sum_accumulator.reducer_spec();
3360        let Setup {
3361            mut receiver,
3362            actor0,
3363            actor1,
3364            port_id,
3365            port_id1,
3366            port_id2,
3367            port_id2_1,
3368            ..
3369        } = setup_split_port_ids(reducer_spec, None).await;
3370        post(&actor0, port_id.clone(), 4);
3371        post(&actor1, port_id1.clone(), 2);
3372        post(&actor1, port_id2.clone(), 3);
3373        post(&actor1, port_id2_1.clone(), 1);
3374        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3375            .await
3376            .unwrap();
3377        // Message might be received out of their sending out. So we sort the
3378        // messages here.
3379        messages.sort();
3380        assert_eq!(messages, vec![1, 2, 3, 4]);
3381
3382        // no more messages
3383        RealClock.sleep(Duration::from_secs(2)).await;
3384        let msg = receiver.try_recv().unwrap();
3385        assert_eq!(msg, None);
3386    }
3387
3388    #[async_timed_test(timeout_secs = 30)]
3389    async fn test_split_port_id_every_n_messages() {
3390        let config = crate::config::global::lock();
3391        let _config_guard = config.override_key(
3392            crate::config::SPLIT_MAX_BUFFER_AGE,
3393            Duration::from_secs(600),
3394        );
3395        let proc = Proc::local();
3396        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3397        let (port_handle, mut receiver) = actor.open_port::<u64>();
3398        let port_id = port_handle.bind().port_id().clone();
3399        // Split it
3400        let reducer_spec = accum::sum::<u64>().reducer_spec();
3401        let split_port_id = port_id.split(&actor, reducer_spec, None).unwrap();
3402
3403        // Send 9 messages.
3404        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3405            post(&actor, split_port_id.clone(), msg);
3406        }
3407        // The first 5 should be batched and reduced once due
3408        // to every_n_msgs = 5.
3409        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3410            .await
3411            .unwrap();
3412        assert_eq!(messages, vec![15]);
3413
3414        // the last message unfortranately will never come because they do not
3415        // reach batch size.
3416        RealClock.sleep(Duration::from_secs(2)).await;
3417        let msg = receiver.try_recv().unwrap();
3418        assert_eq!(msg, None);
3419    }
3420
3421    #[async_timed_test(timeout_secs = 30)]
3422    async fn test_split_port_timeout_flush() {
3423        let config = crate::config::global::lock();
3424        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3425
3426        let Setup {
3427            mut receiver,
3428            actor0: _,
3429            actor1,
3430            port_id: _,
3431            port_id1,
3432            port_id2: _,
3433            port_id2_1: _,
3434            ..
3435        } = setup_split_port_ids(
3436            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3437            Some(ReducerOpts {
3438                max_update_interval: Some(Duration::from_millis(50)),
3439            }),
3440        )
3441        .await;
3442
3443        post(&actor1, port_id1.clone(), 10);
3444        post(&actor1, port_id1.clone(), 20);
3445        post(&actor1, port_id1.clone(), 30);
3446
3447        // Messages should accumulate for 50ms.
3448        RealClock.sleep(Duration::from_millis(10)).await;
3449        let msg = receiver.try_recv().unwrap();
3450        assert_eq!(msg, None);
3451
3452        // Wait until we are flushed.
3453        RealClock.sleep(Duration::from_millis(100)).await;
3454
3455        // Now we are reduced and accumulated:
3456        let msg = receiver.recv().await.unwrap();
3457        assert_eq!(msg, 60); // 10 + 20 + 30
3458
3459        // No further messages:
3460        let msg = receiver.try_recv().unwrap();
3461        assert_eq!(msg, None);
3462    }
3463
3464    #[async_timed_test(timeout_secs = 30)]
3465    async fn test_split_port_timeout_and_size_flush() {
3466        let config = crate::config::global::lock();
3467        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3468
3469        let Setup {
3470            mut receiver,
3471            actor0: _,
3472            actor1,
3473            port_id: _,
3474            port_id1,
3475            port_id2: _,
3476            port_id2_1: _,
3477            ..
3478        } = setup_split_port_ids(
3479            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3480            Some(ReducerOpts {
3481                max_update_interval: Some(Duration::from_millis(50)),
3482            }),
3483        )
3484        .await;
3485
3486        post(&actor1, port_id1.clone(), 10);
3487        post(&actor1, port_id1.clone(), 20);
3488        post(&actor1, port_id1.clone(), 30);
3489        post(&actor1, port_id1.clone(), 40);
3490
3491        // Should have flushed at the third message.
3492        let msg = receiver.recv().await.unwrap();
3493        assert_eq!(msg, 60);
3494
3495        // After 50ms, the next reduce will flush:
3496        let msg = receiver.recv().await.unwrap();
3497        assert_eq!(msg, 40);
3498
3499        // No further messages
3500        let msg = receiver.try_recv().unwrap();
3501        assert_eq!(msg, None);
3502    }
3503
3504    #[test]
3505    fn test_dial_mailbox_router_prefixes_empty() {
3506        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3507    }
3508
3509    #[test]
3510    fn test_dial_mailbox_router_prefixes_single_entry() {
3511        let router = DialMailboxRouter::new();
3512        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3513
3514        let prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3515        assert_eq!(prefixes.len(), 1);
3516        assert_eq!(prefixes[0], id!(world0).into());
3517    }
3518
3519    #[test]
3520    fn test_dial_mailbox_router_prefixes_no_overlap() {
3521        let router = DialMailboxRouter::new();
3522        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3523        router.bind(id!(world1).into(), "unix!@2".parse().unwrap());
3524        router.bind(id!(world2).into(), "unix!@3".parse().unwrap());
3525
3526        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3527        prefixes.sort();
3528
3529        let mut expected = vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
3530        expected.sort();
3531
3532        assert_eq!(prefixes, expected);
3533    }
3534
3535    #[test]
3536    fn test_dial_mailbox_router_prefixes_with_overlaps() {
3537        let router = DialMailboxRouter::new();
3538        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3539        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3540        router.bind(id!(world0[1]).into(), "unix!@3".parse().unwrap());
3541        router.bind(id!(world1).into(), "unix!@4".parse().unwrap());
3542        router.bind(id!(world1[0]).into(), "unix!@5".parse().unwrap());
3543
3544        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3545        prefixes.sort();
3546
3547        // Only world0 and world1 should be covering prefixes since they cover their children
3548        let mut expected = vec![id!(world0).into(), id!(world1).into()];
3549        expected.sort();
3550
3551        assert_eq!(prefixes, expected);
3552    }
3553
3554    #[test]
3555    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
3556        let router = DialMailboxRouter::new();
3557        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3558        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3559        router.bind(id!(world0[0].actor1).into(), "unix!@3".parse().unwrap());
3560        router.bind(id!(world1[0]).into(), "unix!@4".parse().unwrap());
3561        router.bind(id!(world1[1]).into(), "unix!@5".parse().unwrap());
3562        router.bind(id!(world2[0].actor0).into(), "unix!@6".parse().unwrap());
3563
3564        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3565        prefixes.sort();
3566
3567        // Covering prefixes should be:
3568        // - world0 (covers world0[0] and world0[0].actor1)
3569        // - world1[0] (not covered by anything else)
3570        // - world1[1] (not covered by anything else)
3571        // - world2[0].actor0 (not covered by anything else)
3572        let expected = vec![
3573            id!(world0).into(),
3574            id!(world1[0]).into(),
3575            id!(world1[1]).into(),
3576            id!(world2[0].actor0).into(),
3577        ];
3578
3579        assert_eq!(prefixes, expected);
3580    }
3581
3582    #[test]
3583    fn test_dial_mailbox_router_prefixes_same_level() {
3584        let router = DialMailboxRouter::new();
3585        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
3586        router.bind(id!(world0[1]).into(), "unix!@2".parse().unwrap());
3587        router.bind(id!(world0[2]).into(), "unix!@3".parse().unwrap());
3588
3589        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3590        prefixes.sort();
3591
3592        // All should be covering prefixes since none is a prefix of another
3593        let mut expected = vec![
3594            id!(world0[0]).into(),
3595            id!(world0[1]).into(),
3596            id!(world0[2]).into(),
3597        ];
3598        expected.sort();
3599
3600        assert_eq!(prefixes, expected);
3601    }
3602
3603    /// A forwarder that bounces messages back to the **same**
3604    /// mailbox, but does so on a task to avoid recursive stack
3605    /// growth.
3606    #[derive(Clone, Debug)]
3607    struct AsyncLoopForwarder;
3608
3609    impl MailboxSender for AsyncLoopForwarder {
3610        fn post_unchecked(
3611            &self,
3612            envelope: MessageEnvelope,
3613            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3614        ) {
3615            let me = self.clone();
3616            tokio::spawn(async move {
3617                // Call `post` so each hop applies TTL exactly once.
3618                me.post(envelope, return_handle);
3619            });
3620        }
3621    }
3622
3623    #[tokio::test]
3624    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3625        let actor_id = ActorId(
3626            ProcId::Ranked(id!(test_world), 0),
3627            "ttl_actor".to_string(),
3628            0,
3629        );
3630        let mailbox = Mailbox::new(
3631            actor_id.clone(),
3632            BoxedMailboxSender::new(AsyncLoopForwarder),
3633        );
3634        let (ret_port, mut ret_rx) = mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3635        ret_port.bind_to(Undeliverable::<MessageEnvelope>::port());
3636
3637        // Create a destination not owned by this mailbox to force
3638        // forwarding.
3639        let remote_actor = ActorId(
3640            ProcId::Ranked(id!(remote_world), 1),
3641            "remote".to_string(),
3642            0,
3643        );
3644        let dest = PortId(remote_actor.clone(), /*port index*/ 4242);
3645
3646        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
3647        // `::serialize`).
3648        let payload = 1234_u64;
3649        let envelope =
3650            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3651                .expect("serialize");
3652
3653        // Post it. This will start bouncing between forwarder and
3654        // mailbox until TTL hits 0.
3655        let return_handle = ret_port.clone();
3656        mailbox.post(envelope, return_handle);
3657
3658        // We expect the undeliverable to come back once TTL expires.
3659        #[allow(clippy::disallowed_methods)]
3660        let Undeliverable(undelivered) =
3661            tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3662                .await
3663                .expect("timed out waiting for undeliverable")
3664                .expect("channel closed");
3665
3666        // Sanity: round-trip payload still deserializes.
3667        let got: u64 = undelivered.deserialized().expect("deserialize");
3668        assert_eq!(got, payload, "payload preserved");
3669    }
3670
3671    #[tokio::test]
3672    async fn message_ttl_success_local_delivery() {
3673        let actor_id = ActorId(
3674            ProcId::Ranked(id!(test_world), 0),
3675            "ttl_actor".to_string(),
3676            0,
3677        );
3678        let mailbox = Mailbox::new(
3679            actor_id.clone(),
3680            BoxedMailboxSender::new(PanickingMailboxSender),
3681        );
3682        let (undeliverable_tx, mut undeliverable_rx) =
3683            mailbox.open_port::<Undeliverable<MessageEnvelope>>();
3684        undeliverable_tx.bind_to(Undeliverable::<MessageEnvelope>::port());
3685
3686        // Open a local user u64 port.
3687        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3688
3689        // Build an envelope destined for this mailbox's own port.
3690        let payload = 0xC0FFEE_u64;
3691        let envelope = MessageEnvelope::serialize(
3692            actor_id.clone(),
3693            user_port.bind().port_id().clone(),
3694            &payload,
3695            Attrs::new(),
3696        )
3697        .expect("serialize");
3698
3699        // Post the message using the mailbox (local path). TTL will
3700        // not expire.
3701        let return_handle = mailbox
3702            .bound_return_handle()
3703            .unwrap_or(monitored_return_handle());
3704        mailbox.post(envelope, return_handle);
3705
3706        // We should receive the payload locally.
3707        #[allow(clippy::disallowed_methods)]
3708        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3709            .await
3710            .expect("timed out waiting for local delivery")
3711            .expect("user port closed");
3712        assert_eq!(got, payload);
3713
3714        // There should be no undeliverables arriving.
3715        #[allow(clippy::disallowed_methods)]
3716        let no_undeliverable =
3717            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3718        assert!(
3719            no_undeliverable.is_err(),
3720            "unexpected undeliverable returned on successful local delivery"
3721        );
3722    }
3723
3724    #[tokio::test]
3725    async fn test_port_contramap() {
3726        let mbox = Mailbox::new_detached(id!(test[0].test));
3727        let (handle, mut rx) = mbox.open_port();
3728
3729        handle
3730            .contramap(|m| (1, m))
3731            .send("hello".to_string())
3732            .unwrap();
3733        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3734    }
3735}