hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
21//! # tokio_test::block_on(async {
22//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
23//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
24//! let mbox = Mailbox::new_detached(actor_id);
25//! let (port, mut receiver) = mbox.open_port::<u64>();
26//!
27//! port.send(123).unwrap();
28//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
29//! # })
30//! ```
31//!
32//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
33//! that permits at most one message transmission:
34//!
35//! ```
36//! # use hyperactor::mailbox::Mailbox;
37//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
38//! # tokio_test::block_on(async {
39//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
40//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
41//! let mbox = Mailbox::new_detached(actor_id);
42//!
43//! let (port, receiver) = mbox.open_once_port::<u64>();
44//!
45//! port.send(123u64).unwrap();
46//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
47//! # })
48//! ```
49//!
50//! [`OncePort`]s are correspondingly used for RPC replies in the actor
51//! system.
52//!
53//! ## Remote ports and serialization
54//!
55//! Mailboxes allow delivery of serialized messages to named ports:
56//!
57//! 1) Ports restrict message types to (serializable) [`Message`]s.
58//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
59//! 3) [`Mailbox`] provides interfaces to deliver serialized
60//!    messages to ports named by their [`PortId`].
61//!
62//! While this complicates the interface somewhat, it allows the
63//! implementation to avoid a serialization roundtrip when passing
64//! messages locally.
65
66#![allow(dead_code)] // Allow until this is used outside of tests.
67
68use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future;
74use std::future::Future;
75use std::ops::Bound::Excluded;
76use std::pin::Pin;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::sync::Mutex;
80use std::sync::OnceLock;
81use std::sync::RwLock;
82use std::sync::Weak;
83use std::sync::atomic::AtomicU64;
84use std::sync::atomic::AtomicUsize;
85use std::sync::atomic::Ordering;
86use std::task::Context;
87use std::task::Poll;
88
89use async_trait::async_trait;
90use dashmap::DashMap;
91use dashmap::mapref::entry::Entry;
92use futures::Sink;
93use futures::Stream;
94use serde::Deserialize;
95use serde::Serialize;
96use serde::de::DeserializeOwned;
97use tokio::sync::mpsc;
98use tokio::sync::oneshot;
99use tokio::sync::watch;
100use tokio::task::JoinHandle;
101use tokio_util::sync::CancellationToken;
102
103use crate as hyperactor; // for macros
104use crate::Named;
105use crate::OncePortRef;
106use crate::PortRef;
107use crate::accum::Accumulator;
108use crate::accum::ReducerOpts;
109use crate::accum::ReducerSpec;
110use crate::actor::Signal;
111use crate::actor::remote::USER_PORT_OFFSET;
112use crate::attrs::Attrs;
113use crate::channel;
114use crate::channel::ChannelAddr;
115use crate::channel::ChannelError;
116use crate::channel::SendError;
117use crate::channel::TxStatus;
118use crate::context;
119use crate::data::Serialized;
120use crate::id;
121use crate::metrics;
122use crate::reference::ActorId;
123use crate::reference::PortId;
124use crate::reference::Reference;
125
126mod undeliverable;
127/// For [`Undeliverable`], a message type for delivery failures.
128pub use undeliverable::Undeliverable;
129pub use undeliverable::UndeliverableMessageError;
130pub use undeliverable::custom_monitored_return_handle;
131pub use undeliverable::monitored_return_handle; // TODO: Audit
132pub use undeliverable::supervise_undeliverable_messages;
133pub use undeliverable::supervise_undeliverable_messages_with;
134/// For [`MailboxAdminMessage`], a message type for mailbox administration.
135pub mod mailbox_admin_message;
136pub use mailbox_admin_message::MailboxAdminMessage;
137pub use mailbox_admin_message::MailboxAdminMessageHandler;
138/// For [`DurableMailboxSender`] a sender with a write-ahead log.
139pub mod durable_mailbox_sender;
140pub use durable_mailbox_sender::log;
141use durable_mailbox_sender::log::*;
142/// For message headers and latency tracking.
143pub mod headers;
144
145/// Message collects the necessary requirements for messages that are deposited
146/// into mailboxes.
147pub trait Message: Debug + Send + Sync + 'static {}
148impl<M: Debug + Send + Sync + 'static> Message for M {}
149
150/// RemoteMessage extends [`Message`] by requiring that the messages
151/// also be serializable, and can thus traverse process boundaries.
152/// RemoteMessages must also specify a globally unique type name (a URI).
153pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
154
155impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
156
157/// Type alias for bytestring data used throughout the system.
158pub type Data = Vec<u8>;
159
160/// Delivery errors occur during message posting.
161#[derive(
162    thiserror::Error,
163    Debug,
164    Serialize,
165    Deserialize,
166    Named,
167    Clone,
168    PartialEq,
169    Eq
170)]
171pub enum DeliveryError {
172    /// The destination address is not reachable.
173    #[error("address not routable: {0}")]
174    Unroutable(String),
175
176    /// A broken link indicates that a link in the message
177    /// delivery path has failed.
178    #[error("broken link: {0}")]
179    BrokenLink(String),
180
181    /// A (local) mailbox delivery error.
182    #[error("mailbox error: {0}")]
183    Mailbox(String),
184
185    /// A multicast related delivery error.
186    #[error("multicast error: {0}")]
187    Multicast(String),
188
189    /// The message went through too many hops and has expired.
190    #[error("ttl expired")]
191    TtlExpired,
192}
193
194/// An envelope that carries a message destined to a remote actor.
195/// The envelope contains a serialized message along with its destination
196/// and sender.
197#[derive(Debug, Serialize, Deserialize, Clone, Named)]
198pub struct MessageEnvelope {
199    /// The sender of this message.
200    sender: ActorId,
201
202    /// The destination of the message.
203    dest: PortId,
204
205    /// The serialized message.
206    data: Serialized,
207
208    /// Error contains a delivery error when message delivery failed.
209    errors: Vec<DeliveryError>,
210
211    /// Additional context for this message.
212    headers: Attrs,
213
214    /// Decremented at every `MailboxSender` hop.
215    ttl: u8,
216
217    /// If true, undeliverable messages should be returned to sender. Else, they
218    /// are dropped.
219    return_undeliverable: bool,
220    // TODO: add typename, source, seq, etc.
221}
222
223impl MessageEnvelope {
224    /// Create a new envelope with the provided sender, destination, and message.
225    pub fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self {
226        Self {
227            sender,
228            dest,
229            data,
230            errors: Vec::new(),
231            headers,
232            ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
233            // By default, all undeliverable messages should be returned to the sender.
234            return_undeliverable: true,
235        }
236    }
237
238    /// Create a new envelope whose sender ID is unknown.
239    pub(crate) fn new_unknown(dest: PortId, data: Serialized) -> Self {
240        Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
241    }
242
243    /// Construct a new serialized value by serializing the provided T-typed value.
244    pub fn serialize<T: Serialize + Named>(
245        source: ActorId,
246        dest: PortId,
247        value: &T,
248        headers: Attrs,
249    ) -> Result<Self, crate::data::Error> {
250        Ok(Self {
251            headers,
252            data: Serialized::serialize(value)?,
253            sender: source,
254            dest,
255            errors: Vec::new(),
256            ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
257            // By default, all undeliverable messages should be returned to the sender.
258            return_undeliverable: true,
259        })
260    }
261
262    /// Returns the remaining time-to-live (TTL) for this message.
263    ///
264    /// The TTL is decremented at each `MailboxSender` hop. When it
265    /// reaches 0, the message is considered expired and is returned
266    /// to the sender as undeliverable.
267    pub fn ttl(&self) -> u8 {
268        self.ttl
269    }
270
271    /// Overrides the message’s time-to-live (TTL).
272    ///
273    /// This replaces the current TTL value (normally initialized from
274    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
275    /// updated envelope is returned for chaining.
276    ///
277    /// # Note
278    /// The TTL is decremented at each `MailboxSender` hop, and when
279    /// it reaches 0 the message will be treated as undeliverable.
280    pub fn set_ttl(mut self, ttl: u8) -> Self {
281        self.ttl = ttl;
282        self
283    }
284
285    /// Decrements the message's TTL by one hop.
286    ///
287    /// Returns `Ok(())` if the TTL was greater than zero and
288    /// successfully decremented. If the TTL was already zero, no
289    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
290    /// returned, indicating that the message has expired and should
291    /// be treated as undeliverable.
292    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
293        if self.ttl == 0 {
294            Err(DeliveryError::TtlExpired)
295        } else {
296            self.ttl -= 1;
297            Ok(())
298        }
299    }
300
301    /// Deserialize the message in the envelope to the provided type T.
302    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
303        self.data.deserialized()
304    }
305
306    /// The serialized message.
307    pub fn data(&self) -> &Serialized {
308        &self.data
309    }
310
311    /// The message sender.
312    pub fn sender(&self) -> &ActorId {
313        &self.sender
314    }
315
316    /// The destination of the message.
317    pub fn dest(&self) -> &PortId {
318        &self.dest
319    }
320
321    /// The message headers.
322    pub fn headers(&self) -> &Attrs {
323        &self.headers
324    }
325
326    /// Tells whether this is a signal message.
327    pub fn is_signal(&self) -> bool {
328        self.dest.index() == Signal::port()
329    }
330
331    /// Set a delivery error for the message. If errors are already set, append
332    /// it to the existing errors.
333    pub fn set_error(&mut self, error: DeliveryError) {
334        self.errors.push(error)
335    }
336
337    /// Change the sender on the envelope in case it was set incorrectly. This
338    /// should only be used by CommActor since it is forwarding from another
339    /// sender.
340    pub fn update_sender(&mut self, sender: ActorId) {
341        self.sender = sender;
342    }
343
344    /// Set to true if you want this message to be returned to sender if it cannot
345    /// reach dest. This is the default.
346    /// Set to false if you want the message to be dropped instead.
347    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
348        self.return_undeliverable = return_undeliverable;
349    }
350
351    /// The message has been determined to be undeliverable with the
352    /// provided error. Mark the envelope with the error and return to
353    /// sender.
354    pub fn undeliverable(
355        mut self,
356        error: DeliveryError,
357        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
358    ) {
359        tracing::error!(
360            name = "undelivered_message_attempt",
361            sender = self.sender.to_string(),
362            dest = self.dest.to_string(),
363            error = error.to_string(),
364            return_handle = %return_handle,
365        );
366        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
367            1,
368            hyperactor_telemetry::kv_pairs!(
369                "sender_actor_id" => self.sender.to_string(),
370                "dest_actor_id" => self.dest.to_string(),
371                "message_type" => self.data.typename().unwrap_or("unknown"),
372                "error_type" =>  error.to_string(),
373            ),
374        );
375
376        self.set_error(error);
377        undeliverable::return_undeliverable(return_handle, self);
378    }
379
380    /// Get the errors of why this message was undeliverable. Empty means this
381    /// message was not determined as undeliverable.
382    pub fn errors(&self) -> &Vec<DeliveryError> {
383        &self.errors
384    }
385
386    /// Get the string representation of the errors of this message was
387    /// undeliverable. None means this message was not determined as
388    /// undeliverable.
389    pub fn error_msg(&self) -> Option<String> {
390        if self.errors.is_empty() {
391            None
392        } else {
393            Some(
394                self.errors
395                    .iter()
396                    .map(|e| e.to_string())
397                    .collect::<Vec<_>>()
398                    .join("; "),
399            )
400        }
401    }
402
403    fn open(self) -> (MessageMetadata, Serialized) {
404        let Self {
405            sender,
406            dest,
407            data,
408            errors,
409            headers,
410            ttl,
411            return_undeliverable,
412        } = self;
413
414        (
415            MessageMetadata {
416                sender,
417                dest,
418                errors,
419                headers,
420                ttl,
421                return_undeliverable,
422            },
423            data,
424        )
425    }
426
427    fn seal(metadata: MessageMetadata, data: Serialized) -> Self {
428        let MessageMetadata {
429            sender,
430            dest,
431            errors,
432            headers,
433            ttl,
434            return_undeliverable,
435        } = metadata;
436
437        Self {
438            sender,
439            dest,
440            data,
441            errors,
442            headers,
443            ttl,
444            return_undeliverable,
445        }
446    }
447
448    fn return_undeliverable(&self) -> bool {
449        self.return_undeliverable
450    }
451}
452
453impl fmt::Display for MessageEnvelope {
454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455        match &self.error_msg() {
456            None => write!(
457                f,
458                "{} > {}: {} {{{}}}",
459                self.sender, self.dest, self.data, self.headers
460            ),
461            Some(err) => write!(
462                f,
463                "{} > {}: {} {{{}}}: delivery error: {}",
464                self.sender, self.dest, self.data, self.headers, err
465            ),
466        }
467    }
468}
469
470/// Metadata about a message sent via a MessageEnvelope.
471#[derive(Clone)]
472pub struct MessageMetadata {
473    sender: ActorId,
474    dest: PortId,
475    errors: Vec<DeliveryError>,
476    headers: Attrs,
477    ttl: u8,
478    return_undeliverable: bool,
479}
480
481/// Errors that occur during mailbox operations. Each error is associated
482/// with the mailbox's actor id.
483#[derive(Debug)]
484pub struct MailboxError {
485    actor_id: ActorId,
486    kind: MailboxErrorKind,
487}
488
489/// The kinds of mailbox errors. This enum is marked non-exhaustive to
490/// allow for extensibility.
491#[derive(thiserror::Error, Debug)]
492#[non_exhaustive]
493pub enum MailboxErrorKind {
494    /// An operation was attempted on a closed mailbox.
495    #[error("mailbox closed")]
496    Closed,
497
498    /// The port associated with an operation was invalid.
499    #[error("invalid port: {0}")]
500    InvalidPort(PortId),
501
502    /// There was no sender associated with the port.
503    #[error("no sender for port: {0}")]
504    NoSenderForPort(PortId),
505
506    /// There was no local sender associated with the port.
507    /// Returned by operations that require a local port.
508    #[error("no local sender for port: {0}")]
509    NoLocalSenderForPort(PortId),
510
511    /// The port was closed.
512    #[error("{0}: port closed")]
513    PortClosed(PortId),
514
515    /// An error occured during a send operation.
516    #[error("send {0}: {1}")]
517    Send(PortId, #[source] anyhow::Error),
518
519    /// An error occured during a receive operation.
520    #[error("recv {0}: {1}")]
521    Recv(PortId, #[source] anyhow::Error),
522
523    /// There was a serialization failure.
524    #[error("serialize: {0}")]
525    Serialize(#[source] anyhow::Error),
526
527    /// There was a deserialization failure.
528    #[error("deserialize {0}: {1}")]
529    Deserialize(&'static str, anyhow::Error),
530
531    /// There was an error during a channel operation.
532    #[error(transparent)]
533    Channel(#[from] ChannelError),
534}
535
536impl MailboxError {
537    /// Create a new mailbox error associated with the provided actor
538    /// id and of the given kind.
539    pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
540        Self { actor_id, kind }
541    }
542
543    /// The ID of the mailbox producing this error.
544    pub fn actor_id(&self) -> &ActorId {
545        &self.actor_id
546    }
547
548    /// The error's kind.
549    pub fn kind(&self) -> &MailboxErrorKind {
550        &self.kind
551    }
552}
553
554impl fmt::Display for MailboxError {
555    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556        write!(f, "{}: ", self.actor_id)?;
557        fmt::Display::fmt(&self.kind, f)
558    }
559}
560
561impl std::error::Error for MailboxError {
562    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
563        self.kind.source()
564    }
565}
566
567/// PortLocation describes the location of a port.
568/// This is used in errors to provide a uniform data type
569/// for ports that may or may not be bound.
570#[derive(Debug, Clone)]
571pub enum PortLocation {
572    /// The port was bound: the location is its underlying bound ID.
573    Bound(PortId),
574    /// The port was not bound: we provide the actor ID and the message type.
575    Unbound(ActorId, &'static str),
576}
577
578impl PortLocation {
579    fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
580        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
581    }
582
583    fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
584        PortLocation::Unbound(actor_id, ty)
585    }
586
587    /// The actor id of the location.
588    pub fn actor_id(&self) -> &ActorId {
589        match self {
590            PortLocation::Bound(port_id) => port_id.actor_id(),
591            PortLocation::Unbound(actor_id, _) => actor_id,
592        }
593    }
594}
595
596impl fmt::Display for PortLocation {
597    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598        match self {
599            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
600            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
601        }
602    }
603}
604
605/// Errors that that occur during mailbox sending operations. Each error
606/// is associated with the port ID of the operation.
607#[derive(Debug)]
608pub struct MailboxSenderError {
609    location: Box<PortLocation>,
610    kind: Box<MailboxSenderErrorKind>,
611}
612
613/// The kind of mailbox sending errors.
614#[derive(thiserror::Error, Debug)]
615pub enum MailboxSenderErrorKind {
616    /// Error during serialization.
617    #[error("serialization error: {0}")]
618    Serialize(anyhow::Error),
619
620    /// Error during deserialization.
621    #[error("deserialization error for type {0}: {1}")]
622    Deserialize(&'static str, anyhow::Error),
623
624    /// A send to an invalid port.
625    #[error("invalid port")]
626    Invalid,
627
628    /// A send to a closed port.
629    #[error("port closed")]
630    Closed,
631
632    // The following pass through underlying errors:
633    /// An underlying mailbox error.
634    #[error(transparent)]
635    Mailbox(#[from] MailboxError),
636
637    /// An underlying channel error.
638    #[error(transparent)]
639    Channel(#[from] ChannelError),
640
641    /// An underlying message log error.
642    #[error(transparent)]
643    MessageLog(#[from] MessageLogError),
644
645    /// An other, uncategorized error.
646    #[error("send error: {0}")]
647    Other(#[from] anyhow::Error),
648
649    /// The destination was unreachable.
650    #[error("unreachable: {0}")]
651    Unreachable(anyhow::Error),
652}
653
654impl MailboxSenderError {
655    /// Create a new mailbox sender error to an unbound port.
656    pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
657        Self {
658            location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
659            kind: Box::new(kind),
660        }
661    }
662
663    /// Create a new mailbox sender, manually providing the type.
664    pub fn new_unbound_type(
665        actor_id: ActorId,
666        kind: MailboxSenderErrorKind,
667        ty: &'static str,
668    ) -> Self {
669        Self {
670            location: Box::new(PortLocation::Unbound(actor_id, ty)),
671            kind: Box::new(kind),
672        }
673    }
674
675    /// Create a new mailbox sender error with the provided port ID and kind.
676    pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
677        Self {
678            location: Box::new(PortLocation::Bound(port_id)),
679            kind: Box::new(kind),
680        }
681    }
682
683    /// The location at which the error occured.
684    pub fn location(&self) -> &PortLocation {
685        &self.location
686    }
687
688    /// The kind associated with the error.
689    pub fn kind(&self) -> &MailboxSenderErrorKind {
690        &self.kind
691    }
692}
693
694impl fmt::Display for MailboxSenderError {
695    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696        write!(f, "{}: ", self.location)?;
697        fmt::Display::fmt(&self.kind, f)
698    }
699}
700
701impl std::error::Error for MailboxSenderError {
702    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
703        self.kind.source()
704    }
705}
706
707/// MailboxSenders can send messages through ports to mailboxes. It
708/// provides a unified interface for message delivery in the system.
709pub trait MailboxSender: Send + Sync + Debug + Any {
710    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
711    /// delegate to transport.
712    fn post(
713        &self,
714        mut envelope: MessageEnvelope,
715        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
716    ) {
717        if let Err(err) = envelope.dec_ttl_or_err() {
718            envelope.undeliverable(err, return_handle);
719            return;
720        }
721        self.post_unchecked(envelope, return_handle);
722    }
723
724    /// Raw transport: **no** policy.
725    fn post_unchecked(
726        &self,
727        envelope: MessageEnvelope,
728        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729    );
730}
731
732/// PortSender extends [`MailboxSender`] by providing typed endpoints
733/// for sending messages over ports
734pub trait PortSender: MailboxSender {
735    /// Deliver a message to the provided port.
736    fn serialize_and_send<M: RemoteMessage>(
737        &self,
738        port: &PortRef<M>,
739        message: M,
740        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
741    ) -> Result<(), MailboxSenderError> {
742        // TODO: convert this to a undeliverable error also
743        let serialized = Serialized::serialize(&message).map_err(|err| {
744            MailboxSenderError::new_bound(
745                port.port_id().clone(),
746                MailboxSenderErrorKind::Serialize(err.into()),
747            )
748        })?;
749        self.post(
750            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
751            return_handle,
752        );
753        Ok(())
754    }
755
756    /// Deliver a message to a one-shot port, consuming the provided port,
757    /// which is not reusable.
758    fn serialize_and_send_once<M: RemoteMessage>(
759        &self,
760        once_port: OncePortRef<M>,
761        message: M,
762        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
763    ) -> Result<(), MailboxSenderError> {
764        let serialized = Serialized::serialize(&message).map_err(|err| {
765            MailboxSenderError::new_bound(
766                once_port.port_id().clone(),
767                MailboxSenderErrorKind::Serialize(err.into()),
768            )
769        })?;
770        self.post(
771            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
772            return_handle,
773        );
774        Ok(())
775    }
776}
777
778impl<T: ?Sized + MailboxSender> PortSender for T {}
779
780/// A perpetually closed mailbox sender. Panics if any messages are posted.
781/// Useful for tests, or where there is no meaningful mailbox sender
782/// implementation available.
783#[derive(Debug, Clone)]
784pub struct PanickingMailboxSender;
785
786impl MailboxSender for PanickingMailboxSender {
787    fn post_unchecked(
788        &self,
789        envelope: MessageEnvelope,
790        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
791    ) {
792        panic!("panic! in the mailbox! attempted post: {}", envelope)
793    }
794}
795
796/// A mailbox sender for undeliverable messages. This will simply record
797/// any undelivered messages.
798#[derive(Debug)]
799pub struct UndeliverableMailboxSender;
800
801impl MailboxSender for UndeliverableMailboxSender {
802    fn post_unchecked(
803        &self,
804        envelope: MessageEnvelope,
805        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
806    ) {
807        let sender_name = envelope.sender.name();
808        let mut error_str = "".to_string();
809        if !envelope.errors.is_empty() {
810            error_str = envelope
811                .errors
812                .iter()
813                .map(|e| e.to_string())
814                .collect::<Vec<_>>()
815                .join("; ");
816        }
817        // The undeliverable message was unable to be delivered back to the
818        // sender for some reason
819        tracing::error!(
820            name = "undelivered_message_abandoned",
821            actor_name = sender_name,
822            actor_id = envelope.sender.to_string(),
823            dest = envelope.dest.to_string(),
824            headers = envelope.headers().to_string(), // todo: implement tracing::Value for Attrs
825            data = envelope.data().to_string(),
826            "message not delivered, {}",
827            error_str,
828        );
829    }
830}
831
832#[derive(Debug)]
833struct Buffer<T: Message> {
834    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
835    processed: watch::Receiver<usize>,
836    seq: AtomicUsize,
837}
838
839impl<T: Message> Buffer<T> {
840    fn new<Fut>(
841        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
842    ) -> Self
843    where
844        Fut: Future<Output = ()> + Send + 'static,
845    {
846        let (queue, mut next) = mpsc::unbounded_channel();
847        let (last_processed, processed) = watch::channel(0);
848        crate::init::get_runtime().spawn(async move {
849            let mut seq = 0;
850            while let Some((msg, return_handle)) = next.recv().await {
851                process(msg, return_handle).await;
852                seq += 1;
853                let _ = last_processed.send(seq);
854            }
855        });
856        Self {
857            queue,
858            processed,
859            seq: AtomicUsize::new(0),
860        }
861    }
862
863    #[allow(clippy::result_large_err)]
864    fn send(
865        &self,
866        item: (T, PortHandle<Undeliverable<T>>),
867    ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
868        self.seq.fetch_add(1, Ordering::SeqCst);
869        self.queue.send(item)?;
870        Ok(())
871    }
872
873    async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
874        let seq = self.seq.load(Ordering::SeqCst);
875        while *self.processed.borrow_and_update() < seq {
876            self.processed.changed().await?;
877        }
878        Ok(())
879    }
880}
881
882static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
883    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
884
885/// Convenience boxing implementation for MailboxSender. Most APIs
886/// are parameterized on MailboxSender implementations, and it's thus
887/// difficult to work with dyn values.  BoxedMailboxSender bridges this
888/// gap by providing a concrete MailboxSender which dispatches using an
889/// underlying (boxed) dyn.
890#[derive(Debug, Clone)]
891pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
892
893impl BoxedMailboxSender {
894    /// Create a new boxed sender given the provided sender implementation.
895    pub fn new(sender: impl MailboxSender + 'static) -> Self {
896        Self(Arc::new(sender))
897    }
898
899    /// Attempts to downcast the inner sender to the given concrete
900    /// type.
901    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
902        (&*self.0 as &dyn Any).downcast_ref::<T>()
903    }
904}
905
906/// Extension trait that creates a boxed clone of a MailboxSender.
907pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
908    /// A boxed clone of this MailboxSender.
909    fn boxed(&self) -> BoxedMailboxSender;
910}
911impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
912    fn boxed(&self) -> BoxedMailboxSender {
913        BoxedMailboxSender::new(self.clone())
914    }
915}
916
917/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
918pub trait IntoBoxedMailboxSender: MailboxSender {
919    /// Rehome this MailboxSender into a BoxedMailboxSender.
920    fn into_boxed(self) -> BoxedMailboxSender;
921}
922impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
923    fn into_boxed(self) -> BoxedMailboxSender {
924        BoxedMailboxSender::new(self)
925    }
926}
927
928impl MailboxSender for BoxedMailboxSender {
929    fn post_unchecked(
930        &self,
931        envelope: MessageEnvelope,
932        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
933    ) {
934        self.0.post_unchecked(envelope, return_handle);
935    }
936}
937
938/// Errors that occur during mailbox serving.
939#[derive(thiserror::Error, Debug)]
940pub enum MailboxServerError {
941    /// An underlying channel error.
942    #[error(transparent)]
943    Channel(#[from] ChannelError),
944
945    /// An underlying mailbox sender error.
946    #[error(transparent)]
947    MailboxSender(#[from] MailboxSenderError),
948}
949
950/// Represents a running [`MailboxServer`]. The handle composes a
951/// ['tokio::task::JoinHandle'] and may be joined in the same manner.
952#[derive(Debug)]
953pub struct MailboxServerHandle {
954    join_handle: JoinHandle<Result<(), MailboxServerError>>,
955    stopped_tx: watch::Sender<bool>,
956}
957
958impl MailboxServerHandle {
959    /// Signal the server to stop serving the mailbox. The caller should
960    /// join the handle by awaiting the [`MailboxServerHandle`] future.
961    ///
962    /// Stop should be called at most once.
963    pub fn stop(&self, reason: &str) {
964        tracing::info!("stopping mailbox server; reason: {}", reason);
965        self.stopped_tx.send(true).expect("stop called twice");
966    }
967}
968
969/// Forward future implementation to underlying handle.
970impl Future for MailboxServerHandle {
971    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
972
973    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
974        // SAFETY: This is safe to do because self is pinned.
975        let join_handle_pinned =
976            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
977        join_handle_pinned.poll(cx)
978    }
979}
980
981// A `MailboxServer` (such as a router) can receive a message
982// that couldn't reach its destination. We can use the fact that
983// servers are `MailboxSender`s to attempt to forward them back to
984// their senders.
985fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
986    let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
987
988    tokio::task::spawn(async move {
989        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
990            if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
991            {
992                // A non-returnable undeliverable.
993                UndeliverableMailboxSender.post(e, monitored_return_handle());
994                continue;
995            }
996            envelope.set_error(DeliveryError::BrokenLink(
997                "message was undeliverable".to_owned(),
998            ));
999            server.post(
1000                MessageEnvelope::new(
1001                    envelope.sender().clone(),
1002                    PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1003                        envelope.sender(),
1004                    )
1005                    .port_id()
1006                    .clone(),
1007                    Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1008                    Attrs::new(),
1009                ),
1010                monitored_return_handle(),
1011            );
1012        }
1013    });
1014
1015    return_handle
1016}
1017
1018/// Serve a port on the provided [`channel::Rx`]. This dispatches all
1019/// channel messages directly to the port.
1020pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
1021    /// Serve the provided port on the given channel on this sender on
1022    /// a background task which may be joined with the returned handle.
1023    /// The task fails on any send error.
1024    fn serve(
1025        self,
1026        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
1027    ) -> MailboxServerHandle {
1028        // A `MailboxServer` can receive a message that couldn't
1029        // reach its destination. We can use the fact that servers are
1030        // `MailboxSender`s to attempt to forward them back to their
1031        // senders.
1032        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
1033        let server = self.clone();
1034        tokio::task::spawn(async move {
1035            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
1036                if let Ok(Undeliverable(e)) =
1037                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1038                {
1039                    // A non-returnable undeliverable.
1040                    UndeliverableMailboxSender.post(e, monitored_return_handle());
1041                    continue;
1042                }
1043                envelope.set_error(DeliveryError::BrokenLink(
1044                    "message was undeliverable".to_owned(),
1045                ));
1046                server.post(
1047                    MessageEnvelope::new(
1048                        envelope.sender().clone(),
1049                        PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1050                            envelope.sender(),
1051                        )
1052                        .port_id()
1053                        .clone(),
1054                        Serialized::serialize(&Undeliverable(envelope)).unwrap(),
1055                        Attrs::new(),
1056                    ),
1057                    monitored_return_handle(),
1058                );
1059            }
1060        });
1061
1062        let (stopped_tx, mut stopped_rx) = watch::channel(false);
1063        let join_handle = tokio::spawn(async move {
1064            let mut detached = false;
1065
1066            loop {
1067                if *stopped_rx.borrow_and_update() {
1068                    break Ok(());
1069                }
1070
1071                tokio::select! {
1072                    message = rx.recv() => {
1073                        match message {
1074                            // Relay the message to the port directly.
1075                            Ok(envelope) => self.post(envelope, return_handle.clone()),
1076
1077                            // Closed is a "graceful" error in this case.
1078                            // We simply stop serving.
1079                            Err(ChannelError::Closed) => break Ok(()),
1080                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1081                        }
1082                    }
1083                    result = stopped_rx.changed(), if !detached  => {
1084                        detached = result.is_err();
1085                        if detached {
1086                            tracing::debug!(
1087                                "the mailbox server is detached for Rx {}", rx.addr()
1088                            );
1089                        } else {
1090                            tracing::debug!(
1091                                "the mailbox server is stopped for Rx {}", rx.addr()
1092                            );
1093                        }
1094                    }
1095                }
1096            }
1097        });
1098
1099        MailboxServerHandle {
1100            join_handle,
1101            stopped_tx,
1102        }
1103    }
1104}
1105
1106impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1107
1108/// A mailbox server client that transmits messages on a Tx channel.
1109#[derive(Debug)]
1110pub struct MailboxClient {
1111    // The unbounded sender.
1112    buffer: Buffer<MessageEnvelope>,
1113
1114    // To cancel monitoring tx health.
1115    _tx_monitoring: CancellationToken,
1116}
1117
1118impl MailboxClient {
1119    /// Create a new client that sends messages destined for a
1120    /// [`MailboxServer`] on the provided Tx channel.
1121    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1122        let addr = tx.addr();
1123        let tx = Arc::new(tx);
1124        let tx_status = tx.status().clone();
1125        let tx_monitoring = CancellationToken::new();
1126        let buffer = Buffer::new(move |envelope, return_handle| {
1127            let tx = Arc::clone(&tx);
1128            let (return_channel, return_receiver) =
1129                oneshot::channel::<SendError<MessageEnvelope>>();
1130            // Set up for delivery failure.
1131            let return_handle_0 = return_handle.clone();
1132            tokio::spawn(async move {
1133                let result = return_receiver.await;
1134                if let Ok(SendError(e, message)) = result {
1135                    message.undeliverable(
1136                        DeliveryError::BrokenLink(format!(
1137                            "failed to enqueue in MailboxClient when processing buffer: {e}"
1138                        )),
1139                        return_handle_0,
1140                    );
1141                }
1142            });
1143            // Send the message for transmission.
1144            tx.try_post(envelope, return_channel);
1145            future::ready(())
1146        });
1147        let this = Self {
1148            buffer,
1149            _tx_monitoring: tx_monitoring.clone(),
1150        };
1151        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1152        this
1153    }
1154
1155    /// Convenience constructor, to set up a mailbox client that forwards messages
1156    /// to the provided address.
1157    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1158        Ok(MailboxClient::new(channel::dial(addr)?))
1159    }
1160
1161    // Set up a watch for the tx's health.
1162    fn monitor_tx_health(
1163        mut rx: watch::Receiver<TxStatus>,
1164        cancel_token: CancellationToken,
1165        addr: ChannelAddr,
1166    ) {
1167        crate::init::get_runtime().spawn(async move {
1168            loop {
1169                tokio::select! {
1170                    changed = rx.changed() => {
1171                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1172                            tracing::warn!("connection to {} lost", addr);
1173                            // TODO: Potential for supervision event
1174                            // interaction here.
1175                            break;
1176                        }
1177                    }
1178                    _ = cancel_token.cancelled() => {
1179                        break;
1180                    }
1181                }
1182            }
1183        });
1184    }
1185}
1186
1187impl MailboxSender for MailboxClient {
1188    fn post_unchecked(
1189        &self,
1190        envelope: MessageEnvelope,
1191        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1192    ) {
1193        tracing::event!(target:"messages", tracing::Level::TRACE,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1194        if let Err(mpsc::error::SendError((envelope, return_handle))) =
1195            self.buffer.send((envelope, return_handle))
1196        {
1197            let err = DeliveryError::BrokenLink(
1198                "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1199            );
1200
1201            // Failed to enqueue.
1202            envelope.undeliverable(err, return_handle);
1203        }
1204    }
1205}
1206
1207/// Wrapper to turn `PortRef` into a `Sink`.
1208pub struct PortSink<C: context::Actor, M: RemoteMessage> {
1209    cx: C,
1210    port: PortRef<M>,
1211}
1212
1213impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1214    /// Create new PortSink
1215    pub fn new(cx: C, port: PortRef<M>) -> Self {
1216        Self { cx, port }
1217    }
1218}
1219
1220impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1221    type Error = MailboxSenderError;
1222
1223    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1224        Poll::Ready(Ok(()))
1225    }
1226
1227    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1228        self.port.send(&self.cx, item)
1229    }
1230
1231    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1232        Poll::Ready(Ok(()))
1233    }
1234
1235    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1236        Poll::Ready(Ok(()))
1237    }
1238}
1239
1240/// A mailbox coordinates message delivery to actors through typed
1241/// [`Port`]s associated with the mailbox.
1242#[derive(Clone, Debug)]
1243pub struct Mailbox {
1244    inner: Arc<State>,
1245}
1246
1247impl Mailbox {
1248    /// Create a new mailbox associated with the provided actor ID, using the provided
1249    /// forwarder for external destinations.
1250    pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1251        Self {
1252            inner: Arc::new(State::new(actor_id, forwarder)),
1253        }
1254    }
1255
1256    /// Create a new detached mailbox associated with the provided actor ID.
1257    pub fn new_detached(actor_id: ActorId) -> Self {
1258        Self {
1259            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1260        }
1261    }
1262
1263    /// The actor id associated with this mailbox.
1264    pub fn actor_id(&self) -> &ActorId {
1265        &self.inner.actor_id
1266    }
1267
1268    /// Open a new port that accepts M-typed messages. The returned
1269    /// port may be freely cloned, serialized, and passed around. The
1270    /// returned receiver should only be retained by the actor responsible
1271    /// for processing the delivered messages.
1272    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1273        let port_index = self.inner.allocate_port();
1274        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1275        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1276        tracing::trace!(
1277            name = "open_port",
1278            "opening port for {} at {}",
1279            self.inner.actor_id,
1280            port_id
1281        );
1282        (
1283            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1284            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1285        )
1286    }
1287
1288    /// Bind this message's actor port to this actor's mailbox. This method is
1289    /// normally used:
1290    ///   1. when we need to intercept a message sent to a handler, and re-route
1291    ///      that message to the returned receiver;
1292    ///   2. mock this message's handler when it is not implemented for this actor
1293    ///      type, with the returned receiver.
1294    pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1295        let (handle, receiver) = self.open_port();
1296        handle.bind_actor_port();
1297        (handle, receiver)
1298    }
1299
1300    /// Open a new port with an accumulator with default reduce options.
1301    /// See [`open_accum_port_opts`] for more details.
1302    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1303    where
1304        A: Accumulator + Send + Sync + 'static,
1305        A::Update: Message,
1306        A::State: Message + Default + Clone,
1307    {
1308        self.open_accum_port_opts(accum, None)
1309    }
1310
1311    /// Open a new port with an accumulator. This port accepts A::Update type
1312    /// messages, accumulate them into A::State with the given accumulator.
1313    /// The latest changed state can be received from the returned receiver as
1314    /// a single A::State message. If there is no new update, the receiver will
1315    /// not receive any message.
1316    ///
1317    /// If provided, reducer options are applied to reduce operations.
1318    pub fn open_accum_port_opts<A>(
1319        &self,
1320        accum: A,
1321        reducer_opts: Option<ReducerOpts>,
1322    ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1323    where
1324        A: Accumulator + Send + Sync + 'static,
1325        A::Update: Message,
1326        A::State: Message + Default + Clone,
1327    {
1328        let port_index = self.inner.allocate_port();
1329        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1330        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1331        let state = Mutex::new(A::State::default());
1332        let reducer_spec = accum.reducer_spec();
1333        let enqueue = move |_, update: A::Update| {
1334            let mut state = state.lock().unwrap();
1335            accum.accumulate(&mut state, update)?;
1336            let _ = sender.send(state.clone());
1337            Ok(())
1338        };
1339        (
1340            PortHandle {
1341                mailbox: self.clone(),
1342                port_index,
1343                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1344                bound: Arc::new(OnceLock::new()),
1345                reducer_spec,
1346                reducer_opts,
1347            },
1348            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
1349        )
1350    }
1351
1352    /// Open a port that accepts M-typed messages, using the provided function
1353    /// to enqueue.
1354    // TODO: consider making lifetime bound to Self instead.
1355    pub(crate) fn open_enqueue_port<M: Message>(
1356        &self,
1357        enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1358    ) -> PortHandle<M> {
1359        PortHandle {
1360            mailbox: self.clone(),
1361            port_index: self.inner.allocate_port(),
1362            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1363            bound: Arc::new(OnceLock::new()),
1364            reducer_spec: None,
1365            reducer_opts: None,
1366        }
1367    }
1368
1369    /// Open a new one-shot port that accepts M-typed messages. The
1370    /// returned port may be used to send a single message; ditto the
1371    /// receiver may receive a single message.
1372    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1373        let port_index = self.inner.allocate_port();
1374        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1375        let (sender, receiver) = oneshot::channel::<M>();
1376        (
1377            OncePortHandle {
1378                mailbox: self.clone(),
1379                port_index,
1380                port_id: port_id.clone(),
1381                sender,
1382            },
1383            OncePortReceiver {
1384                receiver: Some(receiver),
1385                port_id,
1386                mailbox: self.clone(),
1387            },
1388        )
1389    }
1390
1391    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1392        MailboxError::new(self.inner.actor_id.clone(), err)
1393    }
1394
1395    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1396        let port_index = M::port();
1397        self.inner.ports.get(&port_index).and_then(|boxed| {
1398            boxed
1399                .as_any()
1400                .downcast_ref::<UnboundedSender<M>>()
1401                .map(|s| {
1402                    assert_eq!(
1403                        s.port_id,
1404                        self.actor_id().port_id(port_index),
1405                        "port_id mismatch in downcasted UnboundedSender"
1406                    );
1407                    s.sender.clone()
1408                })
1409        })
1410    }
1411
1412    /// Retrieve the bound undeliverable message port handle.
1413    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1414        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1415            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1416    }
1417
1418    pub(crate) fn allocate_port(&self) -> u64 {
1419        self.inner.allocate_port()
1420    }
1421
1422    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1423        assert_eq!(
1424            handle.mailbox.actor_id(),
1425            self.actor_id(),
1426            "port does not belong to mailbox"
1427        );
1428
1429        // TODO: don't even allocate a port until the port is bound. Possibly
1430        // have handles explicitly staged (unbound, bound).
1431        let port_id = self.actor_id().port_id(handle.port_index);
1432        match self.inner.ports.entry(handle.port_index) {
1433            Entry::Vacant(entry) => {
1434                entry.insert(Box::new(UnboundedSender::new(
1435                    handle.sender.clone(),
1436                    port_id.clone(),
1437                )));
1438            }
1439            Entry::Occupied(_entry) => {}
1440        }
1441
1442        PortRef::attest(port_id)
1443    }
1444
1445    fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1446        assert_eq!(
1447            handle.mailbox.actor_id(),
1448            self.actor_id(),
1449            "port does not belong to mailbox"
1450        );
1451
1452        let port_index = M::port();
1453        let port_id = self.actor_id().port_id(port_index);
1454        match self.inner.ports.entry(port_index) {
1455            Entry::Vacant(entry) => {
1456                entry.insert(Box::new(UnboundedSender::new(
1457                    handle.sender.clone(),
1458                    port_id,
1459                )));
1460            }
1461            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1462        }
1463    }
1464
1465    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1466        let port_id = handle.port_id().clone();
1467        match self.inner.ports.entry(handle.port_index) {
1468            Entry::Vacant(entry) => {
1469                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1470            }
1471            Entry::Occupied(_entry) => {}
1472        }
1473    }
1474
1475    pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1476        assert_eq!(
1477            port_id.actor_id(),
1478            self.actor_id(),
1479            "port does not belong to mailbox"
1480        );
1481
1482        match self.inner.ports.entry(port_id.index()) {
1483            Entry::Vacant(entry) => {
1484                entry.insert(Box::new(sender));
1485            }
1486            Entry::Occupied(_entry) => {}
1487        }
1488    }
1489}
1490
1491impl context::Mailbox for Mailbox {
1492    fn mailbox(&self) -> &Mailbox {
1493        self
1494    }
1495}
1496
1497// TODO: figure out what to do with these interfaces -- possibly these caps
1498// do not have to be private.
1499
1500/// Open a port given a capability.
1501pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1502    cx.mailbox().open_port()
1503}
1504
1505/// Open a one-shot port given a capability. This is a public method primarily to
1506/// enable macro-generated clients.
1507pub fn open_once_port<M: Message>(
1508    cx: &impl context::Mailbox,
1509) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1510    cx.mailbox().open_once_port()
1511}
1512
1513impl MailboxSender for Mailbox {
1514    /// Deliver a serialized message to the provided port ID. This method fails
1515    /// if the message does not deserialize into the expected type.
1516    fn post_unchecked(
1517        &self,
1518        envelope: MessageEnvelope,
1519        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1520    ) {
1521        metrics::MAILBOX_POSTS.add(
1522            1,
1523            hyperactor_telemetry::kv_pairs!(
1524                "actor_id" => envelope.sender.to_string(),
1525                "dest_actor_id" => envelope.dest.0.to_string(),
1526            ),
1527        );
1528        tracing::trace!(
1529            name = "post",
1530            actor_name = envelope.sender.name(),
1531            actor_id = envelope.sender.to_string(),
1532            "posting message to {}",
1533            envelope.dest
1534        );
1535
1536        if envelope.dest().actor_id() != &self.inner.actor_id {
1537            return self.inner.forwarder.post(envelope, return_handle);
1538        }
1539
1540        match self.inner.ports.entry(envelope.dest().index()) {
1541            Entry::Vacant(_) => {
1542                let err = DeliveryError::Unroutable(format!(
1543                    "port not bound in mailbox; port id: {}; message type: {}",
1544                    envelope.dest().index(),
1545                    envelope.data().typename().map_or_else(
1546                        || format!("unregistered type hash {}", envelope.data().typehash()),
1547                        |s| s.to_string(),
1548                    )
1549                ));
1550
1551                envelope.undeliverable(err, return_handle);
1552            }
1553            Entry::Occupied(entry) => {
1554                let (metadata, data) = envelope.open();
1555                let MessageMetadata {
1556                    headers,
1557                    sender,
1558                    dest,
1559                    errors: metadata_errors,
1560                    ttl,
1561                    return_undeliverable,
1562                } = metadata;
1563
1564                // We use the entry API here so that we can remove the
1565                // entry while holding an (entry) reference. The DashMap
1566                // documentation suggests that deadlocks are possible
1567                // "when holding any sort of reference into the map",
1568                // but surely this applies only to the same thread? This
1569                // would also imply we have to be careful holding any
1570                // sort of reference across .await points.
1571                match entry.get().send_serialized(headers, data) {
1572                    Ok(false) => {
1573                        entry.remove();
1574                    }
1575                    Ok(true) => (),
1576                    Err(SerializedSenderError {
1577                        data,
1578                        error: sender_error,
1579                        headers,
1580                    }) => {
1581                        let err = DeliveryError::Mailbox(format!("{}", sender_error));
1582
1583                        MessageEnvelope::seal(
1584                            MessageMetadata {
1585                                headers,
1586                                sender,
1587                                dest,
1588                                errors: metadata_errors,
1589                                ttl,
1590                                return_undeliverable,
1591                            },
1592                            data,
1593                        )
1594                        .undeliverable(err, return_handle)
1595                    }
1596                }
1597            }
1598        }
1599    }
1600}
1601
1602/// A port to which M-typed messages can be delivered. Ports may be
1603/// serialized to be sent to other actors. However, when a port is
1604/// deserialized, it may no longer be used to send messages directly
1605/// to a mailbox since it is no longer associated with a local mailbox
1606/// ([`Mailbox::send`] will fail). However, the runtime may accept
1607/// remote Ports, and arrange for these messages to be delivered
1608/// indirectly through inter-node message passing.
1609#[derive(Debug)]
1610pub struct PortHandle<M: Message> {
1611    mailbox: Mailbox,
1612    port_index: u64,
1613    sender: UnboundedPortSender<M>,
1614    // We would like this to be a Arc<OnceLock<PortRef<M>>>, but we cannot
1615    // write down the type PortRef<M> (M: Message), even though we cannot
1616    // legally construct such a value without M: RemoteMessage. We could consider
1617    // making PortRef<M> valid for M: Message, but constructible only for
1618    // M: RemoteMessage, but the guarantees offered by the impossibilty of even
1619    // writing down the type are appealing.
1620    bound: Arc<OnceLock<PortId>>,
1621    // Typehash of an optional reducer. When it's defined, we include it in port
1622    /// references to optionally enable incremental accumulation.
1623    reducer_spec: Option<ReducerSpec>,
1624    /// Reduction options. If unspecified, we use `ReducerOpts::default`.
1625    reducer_opts: Option<ReducerOpts>,
1626}
1627
1628impl<M: Message> PortHandle<M> {
1629    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1630        Self {
1631            mailbox,
1632            port_index,
1633            sender,
1634            bound: Arc::new(OnceLock::new()),
1635            reducer_spec: None,
1636            reducer_opts: None,
1637        }
1638    }
1639
1640    fn location(&self) -> PortLocation {
1641        match self.bound.get() {
1642            Some(port_id) => PortLocation::Bound(port_id.clone()),
1643            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1644        }
1645    }
1646
1647    /// Send a message to this port.
1648    pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1649        let mut headers = Attrs::new();
1650
1651        crate::mailbox::headers::set_send_timestamp(&mut headers);
1652        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1653
1654        self.sender.send(headers, message).map_err(|err| {
1655            MailboxSenderError::new_unbound::<M>(
1656                self.mailbox.actor_id().clone(),
1657                MailboxSenderErrorKind::Other(err),
1658            )
1659        })
1660    }
1661
1662    /// A contravariant map: using the provided function to translate
1663    /// `R`-typed messages to `M`-typed ones, delivered on this port.
1664    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1665    where
1666        R: Message,
1667        F: Fn(R) -> M + Send + Sync + 'static,
1668    {
1669        let port_index = self.mailbox.inner.allocate_port();
1670        let sender = self.sender.clone();
1671        PortHandle::new(
1672            self.mailbox.clone(),
1673            port_index,
1674            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1675                sender.send(headers, unmap(value))
1676            })),
1677        )
1678    }
1679}
1680
1681impl<M: RemoteMessage> PortHandle<M> {
1682    /// Bind this port, making it accessible to remote actors.
1683    pub fn bind(&self) -> PortRef<M> {
1684        PortRef::attest_reducible(
1685            self.bound
1686                .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1687                .clone(),
1688            self.reducer_spec.clone(),
1689        )
1690    }
1691
1692    /// Bind to this message's actor port. This method will panic if the handle
1693    /// is already bound.
1694    ///
1695    /// This is used by [`actor::Binder`] implementations to bind actor refs.
1696    /// This is not intended for general use.
1697    pub(crate) fn bind_actor_port(&self) {
1698        let port_id = self.mailbox.actor_id().port_id(M::port());
1699        self.bound
1700            .set(port_id)
1701            .map_err(|p| {
1702                format!(
1703                    "could not bind port handle {} as {p}: already bound",
1704                    self.port_index
1705                )
1706            })
1707            .unwrap();
1708        self.mailbox.bind_to_actor_port(self);
1709    }
1710}
1711
1712impl<M: Message> Clone for PortHandle<M> {
1713    fn clone(&self) -> Self {
1714        Self {
1715            mailbox: self.mailbox.clone(),
1716            port_index: self.port_index,
1717            sender: self.sender.clone(),
1718            bound: self.bound.clone(),
1719            reducer_spec: self.reducer_spec.clone(),
1720            reducer_opts: self.reducer_opts.clone(),
1721        }
1722    }
1723}
1724
1725impl<M: Message> fmt::Display for PortHandle<M> {
1726    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1727        fmt::Display::fmt(&self.location(), f)
1728    }
1729}
1730
1731/// A one-shot port handle to which M-typed messages can be delivered.
1732#[derive(Debug)]
1733pub struct OncePortHandle<M: Message> {
1734    mailbox: Mailbox,
1735    port_index: u64,
1736    port_id: PortId,
1737    sender: oneshot::Sender<M>,
1738}
1739
1740impl<M: Message> OncePortHandle<M> {
1741    /// This port's ID.
1742    // TODO: make value
1743    pub fn port_id(&self) -> &PortId {
1744        &self.port_id
1745    }
1746
1747    /// Send a message to this port. The send operation will consume the
1748    /// port handle, as the port accepts at most one message.
1749    pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1750        let actor_id = self.mailbox.actor_id().clone();
1751        self.sender.send(message).map_err(|_| {
1752            // Here, the value is returned when the port is
1753            // closed.  We should consider having a similar
1754            // API for send_once, though arguably it makes less
1755            // sense in this context.
1756            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1757        })?;
1758        Ok(())
1759    }
1760}
1761
1762impl<M: RemoteMessage> OncePortHandle<M> {
1763    /// Turn this handle into a ref that may be passed to
1764    /// a remote actor. The remote actor can then use the
1765    /// ref to send a message to the port. Creating a ref also
1766    /// binds the port, so that it is remotely writable.
1767    pub fn bind(self) -> OncePortRef<M> {
1768        let port_id = self.port_id().clone();
1769        self.mailbox.clone().bind_once(self);
1770        OncePortRef::attest(port_id)
1771    }
1772}
1773
1774impl<M: Message> fmt::Display for OncePortHandle<M> {
1775    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1776        fmt::Display::fmt(&self.port_id(), f)
1777    }
1778}
1779
1780/// A receiver of M-typed messages, used by actors to receive messages
1781/// on open ports.
1782#[derive(Debug)]
1783pub struct PortReceiver<M> {
1784    receiver: mpsc::UnboundedReceiver<M>,
1785    port_id: PortId,
1786    /// When multiple messages are put in channel, only receive the latest one
1787    /// if coalesce is true. Other messages will be discarded.
1788    coalesce: bool,
1789    /// State is used to remove the port from service when the receiver
1790    /// is dropped.
1791    mailbox: Mailbox,
1792}
1793
1794impl<M> PortReceiver<M> {
1795    fn new(
1796        receiver: mpsc::UnboundedReceiver<M>,
1797        port_id: PortId,
1798        coalesce: bool,
1799        mailbox: Mailbox,
1800    ) -> Self {
1801        Self {
1802            receiver,
1803            port_id,
1804            coalesce,
1805            mailbox,
1806        }
1807    }
1808
1809    /// Tries to receive the next value for this receiver.
1810    /// This function returns `Ok(None)` if the receiver is empty
1811    /// and returns a MailboxError if the receiver is disconnected.
1812    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
1813    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1814        let mut next = self.receiver.try_recv();
1815        // To coalesce, drain the mpsc queue and only keep the last one.
1816        if self.coalesce
1817            && let Some(latest) = self.drain().pop()
1818        {
1819            next = Ok(latest);
1820        }
1821        match next {
1822            Ok(msg) => Ok(Some(msg)),
1823            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1824            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1825                self.actor_id().clone(),
1826                MailboxErrorKind::Closed,
1827            )),
1828        }
1829    }
1830
1831    /// Receive the next message from the port corresponding with this
1832    /// receiver.
1833    pub async fn recv(&mut self) -> Result<M, MailboxError> {
1834        let mut next = self.receiver.recv().await;
1835        // To coalesce, get the last message from the queue if there are
1836        // more on the mspc queue.
1837        if self.coalesce
1838            && let Some(latest) = self.drain().pop()
1839        {
1840            next = Some(latest);
1841        }
1842        next.ok_or(MailboxError::new(
1843            self.actor_id().clone(),
1844            MailboxErrorKind::Closed,
1845        ))
1846    }
1847
1848    /// Drains all available messages from the port.
1849    pub fn drain(&mut self) -> Vec<M> {
1850        let mut drained: Vec<M> = Vec::new();
1851        while let Ok(msg) = self.receiver.try_recv() {
1852            // To coalesce, discard the old message if there is any.
1853            if self.coalesce {
1854                drained.pop();
1855            }
1856            drained.push(msg);
1857        }
1858        drained
1859    }
1860
1861    fn port(&self) -> u64 {
1862        self.port_id.1
1863    }
1864
1865    fn actor_id(&self) -> &ActorId {
1866        &self.port_id.0
1867    }
1868}
1869
1870impl<M> Drop for PortReceiver<M> {
1871    fn drop(&mut self) {
1872        // MARIUS: do we need to tombstone these? or should we
1873        // error out if we have removed the receiver before serializing the port ref?
1874        // ("no longer live")?
1875        self.mailbox.inner.ports.remove(&self.port());
1876    }
1877}
1878
1879impl<M> Stream for PortReceiver<M> {
1880    type Item = Result<M, MailboxError>;
1881
1882    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1883        std::pin::pin!(self.recv()).poll(cx).map(Some)
1884    }
1885}
1886
1887/// A receiver of M-typed messages from [`OncePort`]s.
1888pub struct OncePortReceiver<M> {
1889    receiver: Option<oneshot::Receiver<M>>,
1890    port_id: PortId,
1891
1892    /// Mailbox is used to remove the port from service when the receiver
1893    /// is dropped.
1894    mailbox: Mailbox,
1895}
1896
1897impl<M> OncePortReceiver<M> {
1898    /// Receive message from the one-shot port associated with this
1899    /// receiver.  Recv consumes the receiver: it is no longer valid
1900    /// after this call.
1901    pub async fn recv(mut self) -> Result<M, MailboxError> {
1902        std::mem::take(&mut self.receiver)
1903            .unwrap()
1904            .await
1905            .map_err(|err| {
1906                MailboxError::new(
1907                    self.actor_id().clone(),
1908                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1909                )
1910            })
1911    }
1912
1913    fn port(&self) -> u64 {
1914        self.port_id.1
1915    }
1916
1917    fn actor_id(&self) -> &ActorId {
1918        &self.port_id.0
1919    }
1920}
1921
1922impl<M> Drop for OncePortReceiver<M> {
1923    fn drop(&mut self) {
1924        // MARIUS: do we need to tombstone these? or should we
1925        // error out if we have removed the receiver before serializing the port ref?
1926        // ("no longer live")?
1927        self.mailbox.inner.ports.remove(&self.port());
1928    }
1929}
1930
1931/// Error that that occur during SerializedSender's send operation.
1932pub struct SerializedSenderError {
1933    /// The headers associated with the message.
1934    pub headers: Attrs,
1935    /// The message was tried to send.
1936    pub data: Serialized,
1937    /// The mailbox sender error that occurred.
1938    pub error: MailboxSenderError,
1939}
1940
1941/// SerializedSender encapsulates senders:
1942///   - It performs type erasure (and thus it is object-safe).
1943///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
1944///     validity of the underlying port.
1945trait SerializedSender: Send + Sync {
1946    /// Enables downcasting from `&dyn SerializedSender` to concrete
1947    /// types.
1948    ///
1949    /// Used by `Mailbox::lookup_sender` to downcast to
1950    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
1951    fn as_any(&self) -> &dyn Any;
1952
1953    /// Send a serialized message. SerializedSender will deserialize the
1954    /// message (failing if it fails to deserialize), and then send the
1955    /// resulting message on the underlying port.
1956    ///
1957    /// Send_serialized returns true whenever the port remains valid
1958    /// after the send operation.
1959    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSender`.
1960    fn send_serialized(
1961        &self,
1962        headers: Attrs,
1963        serialized: Serialized,
1964    ) -> Result<bool, SerializedSenderError>;
1965}
1966
1967/// A sender to an M-typed unbounded port.
1968enum UnboundedPortSender<M: Message> {
1969    /// Send directly to the mpsc queue.
1970    Mpsc(mpsc::UnboundedSender<M>),
1971    /// Use the provided function to enqueue the item.
1972    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
1973}
1974
1975impl<M: Message> UnboundedPortSender<M> {
1976    fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1977        match self {
1978            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1979            Self::Func(func) => func(headers, message),
1980        }
1981    }
1982}
1983
1984// We implement Clone manually as derive(Clone) places unnecessarily
1985// strict bounds on the type parameter M.
1986impl<M: Message> Clone for UnboundedPortSender<M> {
1987    fn clone(&self) -> Self {
1988        match self {
1989            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
1990            Self::Func(func) => Self::Func(func.clone()),
1991        }
1992    }
1993}
1994
1995impl<M: Message> Debug for UnboundedPortSender<M> {
1996    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1997        match self {
1998            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
1999            Self::Func(_) => f
2000                .debug_tuple("UnboundedPortSender::Func")
2001                .field(&"..")
2002                .finish(),
2003        }
2004    }
2005}
2006
2007struct UnboundedSender<M: Message> {
2008    sender: UnboundedPortSender<M>,
2009    port_id: PortId,
2010}
2011
2012impl<M: Message> UnboundedSender<M> {
2013    /// Create a new UnboundedSender encapsulating the provided
2014    /// sender.
2015    fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
2016        Self { sender, port_id }
2017    }
2018
2019    fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
2020        self.sender.send(headers, message).map_err(|err| {
2021            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2022        })
2023    }
2024}
2025
2026// Clone is implemented explicitly because the derive macro demands M:
2027// Clone directly. In this case, it isn't needed because Arc<T> can
2028// clone for any T.
2029impl<M: Message> Clone for UnboundedSender<M> {
2030    fn clone(&self) -> Self {
2031        Self {
2032            sender: self.sender.clone(),
2033            port_id: self.port_id.clone(),
2034        }
2035    }
2036}
2037
2038impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2039    fn as_any(&self) -> &dyn Any {
2040        self
2041    }
2042
2043    fn send_serialized(
2044        &self,
2045        headers: Attrs,
2046        serialized: Serialized,
2047    ) -> Result<bool, SerializedSenderError> {
2048        // Here, the stack ensures that this port is only instantiated for M-typed messages.
2049        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
2050        // but it is required as we have some usages that rely on representational equivalence
2051        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
2052        // support port aggregation.
2053        match serialized.deserialized_unchecked() {
2054            Ok(message) => {
2055                self.sender.send(headers.clone(), message).map_err(|err| {
2056                    SerializedSenderError {
2057                        data: serialized,
2058                        error: MailboxSenderError::new_bound(
2059                            self.port_id.clone(),
2060                            MailboxSenderErrorKind::Other(err),
2061                        ),
2062                        headers,
2063                    }
2064                })?;
2065
2066                Ok(true)
2067            }
2068            Err(err) => Err(SerializedSenderError {
2069                data: serialized,
2070                error: MailboxSenderError::new_bound(
2071                    self.port_id.clone(),
2072                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2073                ),
2074                headers,
2075            }),
2076        }
2077    }
2078}
2079
2080/// OnceSender encapsulates an underlying one-shot sender, dynamically
2081/// tracking its validity.
2082#[derive(Debug)]
2083struct OnceSender<M: Message> {
2084    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
2085    port_id: PortId,
2086}
2087
2088impl<M: Message> OnceSender<M> {
2089    /// Create a new OnceSender encapsulating the provided one-shot
2090    /// sender.
2091    fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
2092        Self {
2093            sender: Arc::new(Mutex::new(Some(sender))),
2094            port_id,
2095        }
2096    }
2097
2098    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2099        // TODO: we should replace the sender on error
2100        match self.sender.lock().unwrap().take() {
2101            None => Err(MailboxSenderError::new_bound(
2102                self.port_id.clone(),
2103                MailboxSenderErrorKind::Closed,
2104            )),
2105            Some(sender) => {
2106                sender.send(message).map_err(|_| {
2107                    // Here, the value is returned when the port is
2108                    // closed.  We should consider having a similar
2109                    // API for send_once, though arguably it makes less
2110                    // sense in this context.
2111                    MailboxSenderError::new_bound(
2112                        self.port_id.clone(),
2113                        MailboxSenderErrorKind::Closed,
2114                    )
2115                })?;
2116                Ok(false)
2117            }
2118        }
2119    }
2120}
2121
2122// Clone is implemented explicitly because the derive macro demands M:
2123// Clone directly. In this case, it isn't needed because Arc<T> can
2124// clone for any T.
2125impl<M: Message> Clone for OnceSender<M> {
2126    fn clone(&self) -> Self {
2127        Self {
2128            sender: self.sender.clone(),
2129            port_id: self.port_id.clone(),
2130        }
2131    }
2132}
2133
2134impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2135    fn as_any(&self) -> &dyn Any {
2136        self
2137    }
2138
2139    fn send_serialized(
2140        &self,
2141        headers: Attrs,
2142        serialized: Serialized,
2143    ) -> Result<bool, SerializedSenderError> {
2144        match serialized.deserialized() {
2145            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2146                data: serialized,
2147                error: e,
2148                headers,
2149            }),
2150            Err(err) => Err(SerializedSenderError {
2151                data: serialized,
2152                error: MailboxSenderError::new_bound(
2153                    self.port_id.clone(),
2154                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2155                ),
2156                headers,
2157            }),
2158        }
2159    }
2160}
2161
2162/// Use the provided function to send untyped messages (i.e. Serialized objects).
2163pub(crate) struct UntypedUnboundedSender {
2164    pub(crate) sender:
2165        Box<dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync>,
2166    pub(crate) port_id: PortId,
2167}
2168
2169impl SerializedSender for UntypedUnboundedSender {
2170    fn as_any(&self) -> &dyn Any {
2171        self
2172    }
2173
2174    fn send_serialized(
2175        &self,
2176        headers: Attrs,
2177        serialized: Serialized,
2178    ) -> Result<bool, SerializedSenderError> {
2179        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2180            data,
2181            error: MailboxSenderError::new_bound(
2182                self.port_id.clone(),
2183                MailboxSenderErrorKind::Other(err),
2184            ),
2185            headers,
2186        })?;
2187
2188        Ok(true)
2189    }
2190}
2191
2192/// State is the internal state of the mailbox.
2193struct State {
2194    /// The ID of the mailbox owner.
2195    actor_id: ActorId,
2196
2197    // insert if it's serializable; otherwise don't.
2198    /// The set of active ports in the mailbox. All currently
2199    /// allocated ports are
2200    ports: DashMap<u64, Box<dyn SerializedSender>>,
2201
2202    /// The next port ID to allocate.
2203    next_port: AtomicU64,
2204
2205    /// The forwarder for this mailbox.
2206    forwarder: BoxedMailboxSender,
2207}
2208
2209impl State {
2210    /// Create a new state with the provided owning ActorId.
2211    fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2212        Self {
2213            actor_id,
2214            ports: DashMap::new(),
2215            // The first 1024 ports are allocated to actor handlers.
2216            // Other port IDs are ephemeral.
2217            next_port: AtomicU64::new(USER_PORT_OFFSET),
2218            forwarder,
2219        }
2220    }
2221
2222    /// Allocate a fresh port.
2223    fn allocate_port(&self) -> u64 {
2224        self.next_port.fetch_add(1, Ordering::SeqCst)
2225    }
2226}
2227
2228impl fmt::Debug for State {
2229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2230        f.debug_struct("State")
2231            .field("actor_id", &self.actor_id)
2232            .field(
2233                "open_ports",
2234                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2235            )
2236            .field("next_port", &self.next_port)
2237            .finish()
2238    }
2239}
2240
2241// TODO: mux based on some parameterized type. (mux key).
2242/// An in-memory mailbox muxer. This is used to route messages to
2243/// different underlying senders.
2244#[derive(Debug, Clone)]
2245pub struct MailboxMuxer {
2246    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
2247}
2248
2249impl Default for MailboxMuxer {
2250    fn default() -> Self {
2251        Self::new()
2252    }
2253}
2254
2255impl MailboxMuxer {
2256    /// Create a new, empty, muxer.
2257    pub fn new() -> Self {
2258        Self {
2259            mailboxes: Arc::new(DashMap::new()),
2260        }
2261    }
2262
2263    /// Route messages destined for the provided actor id to the provided
2264    /// sender. Returns false if there is already a sender associated
2265    /// with the actor. In this case, the sender is not replaced, and
2266    /// the caller must [`MailboxMuxer::unbind`] it first.
2267    pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2268        match self.mailboxes.entry(actor_id) {
2269            Entry::Occupied(_) => false,
2270            Entry::Vacant(entry) => {
2271                entry.insert(Box::new(sender));
2272                true
2273            }
2274        }
2275    }
2276
2277    /// Convenience function to bind a mailbox.
2278    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2279        self.bind(mailbox.actor_id().clone(), mailbox)
2280    }
2281
2282    /// Unbind the sender associated with the provided actor ID. After
2283    /// unbinding, the muxer will no longer be able to send messages to
2284    /// that actor.
2285    pub(crate) fn unbind(&self, actor_id: &ActorId) {
2286        self.mailboxes.remove(actor_id);
2287    }
2288
2289    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2290    pub fn bound_actors(&self) -> Vec<ActorId> {
2291        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2292    }
2293}
2294
2295impl MailboxSender for MailboxMuxer {
2296    fn post_unchecked(
2297        &self,
2298        envelope: MessageEnvelope,
2299        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2300    ) {
2301        let dest_actor_id = envelope.dest().actor_id();
2302        match self.mailboxes.get(envelope.dest().actor_id()) {
2303            None => {
2304                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2305                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2306            }
2307            Some(sender) => sender.post(envelope, return_handle),
2308        }
2309    }
2310}
2311
2312/// MailboxRouter routes messages to the sender that is bound to its
2313/// nearest prefix.
2314#[derive(Debug, Clone)]
2315pub struct MailboxRouter {
2316    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2317}
2318
2319impl Default for MailboxRouter {
2320    fn default() -> Self {
2321        Self::new()
2322    }
2323}
2324
2325impl MailboxRouter {
2326    /// Create a new, empty router.
2327    pub fn new() -> Self {
2328        Self {
2329            entries: Arc::new(RwLock::new(BTreeMap::new())),
2330        }
2331    }
2332
2333    /// Downgrade this router to a [`WeakMailboxRouter`].
2334    pub fn downgrade(&self) -> WeakMailboxRouter {
2335        WeakMailboxRouter(Arc::downgrade(&self.entries))
2336    }
2337
2338    /// Returns a new router that will first attempt to find a route for the message
2339    /// in the router's table; otherwise post the message to the provided fallback
2340    /// sender.
2341    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2342        FallbackMailboxRouter {
2343            router: self.clone(),
2344            default,
2345        }
2346    }
2347
2348    /// Bind the provided sender to the given reference. The destination
2349    /// is treated as a prefix to which messages can be routed, and
2350    /// messages are routed to their longest matching prefix.
2351    pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2352        let mut w = self.entries.write().unwrap();
2353        w.insert(dest, Arc::new(sender));
2354    }
2355
2356    fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2357        match self
2358            .entries
2359            .read()
2360            .unwrap()
2361            .lower_bound(Excluded(&actor_id.clone().into()))
2362            .prev()
2363        {
2364            None => None,
2365            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2366                Some(sender.clone())
2367            }
2368            Some(_) => None,
2369        }
2370    }
2371}
2372
2373impl MailboxSender for MailboxRouter {
2374    fn post_unchecked(
2375        &self,
2376        envelope: MessageEnvelope,
2377        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2378    ) {
2379        match self.sender(envelope.dest().actor_id()) {
2380            None => envelope.undeliverable(
2381                DeliveryError::Unroutable(
2382                    "no destination found for actor in routing table".to_string(),
2383                ),
2384                return_handle,
2385            ),
2386            Some(sender) => sender.post(envelope, return_handle),
2387        }
2388    }
2389}
2390
2391#[derive(Debug, Clone)]
2392struct FallbackMailboxRouter {
2393    router: MailboxRouter,
2394    default: BoxedMailboxSender,
2395}
2396
2397impl MailboxSender for FallbackMailboxRouter {
2398    fn post_unchecked(
2399        &self,
2400        envelope: MessageEnvelope,
2401        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2402    ) {
2403        match self.router.sender(envelope.dest().actor_id()) {
2404            Some(sender) => sender.post(envelope, return_handle),
2405            None => self.default.post(envelope, return_handle),
2406        }
2407    }
2408}
2409
2410/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
2411/// state. This allows router references to be circular: an entity holding a reference
2412/// to the router may also contain the router itself.
2413///
2414/// TODO: this currently holds a weak reference to the entire router. This helps
2415/// prevent cycle leaks, but can cause excess memory usage as the cycle is at
2416/// the granularity of each entry. Possibly the router should allow weak references
2417/// on a per-entry basis.
2418#[derive(Debug, Clone)]
2419pub struct WeakMailboxRouter(
2420    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
2421);
2422
2423impl WeakMailboxRouter {
2424    /// Upgrade the weak router to a strong reference router.
2425    pub fn upgrade(&self) -> Option<MailboxRouter> {
2426        self.0.upgrade().map(|entries| MailboxRouter { entries })
2427    }
2428}
2429
2430impl MailboxSender for WeakMailboxRouter {
2431    fn post_unchecked(
2432        &self,
2433        envelope: MessageEnvelope,
2434        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2435    ) {
2436        match self.upgrade() {
2437            Some(router) => router.post(envelope, return_handle),
2438            None => envelope.undeliverable(
2439                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2440                return_handle,
2441            ),
2442        }
2443    }
2444}
2445
2446/// A dynamic mailbox router that supports remote delivery.
2447///
2448/// `DialMailboxRouter` maintains a runtime address book mapping
2449/// references to `ChannelAddr`s. It holds a cache of active
2450/// connections and forwards messages to the appropriate
2451/// `MailboxClient`.
2452///
2453/// If a message destination is not bound, but is a "direct mode" address
2454/// (i.e., its proc id contains the channel address through which the proc
2455/// is reachable), then DialMailboxRouter dials the proc directly.
2456///
2457/// Messages sent to unknown destinations are routed to the `default`
2458/// sender, if present.
2459#[derive(Debug, Clone)]
2460pub struct DialMailboxRouter {
2461    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
2462    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,
2463
2464    // The default sender, to which messages for unknown recipients
2465    // are sent. (This is like a default route in a routing table.)
2466    default: BoxedMailboxSender,
2467
2468    // When true, only dial direct-addressed procs if their transport
2469    // type is remote. Otherwise, fall back to the default sender.
2470    direct_addressed_remote_only: bool,
2471}
2472
2473impl Default for DialMailboxRouter {
2474    fn default() -> Self {
2475        Self::new()
2476    }
2477}
2478
2479impl DialMailboxRouter {
2480    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2481    pub fn new() -> Self {
2482        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2483    }
2484
2485    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2486    /// and a default sender. Any message with an unknown destination is
2487    /// dispatched on this default sender, unless the destination is
2488    /// direct-addressed, in which case it is dialed directly.
2489    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2490        Self {
2491            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2492            sender_cache: Arc::new(DashMap::new()),
2493            default,
2494            direct_addressed_remote_only: false,
2495        }
2496    }
2497
2498    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2499    /// and a default sender. Any message with an unknown destination is
2500    /// dispatched on this default sender, unless the destination is
2501    /// direct-addressed *and* has a remote channel transport type.
2502    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2503        Self {
2504            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2505            sender_cache: Arc::new(DashMap::new()),
2506            default,
2507            direct_addressed_remote_only: true,
2508        }
2509    }
2510
2511    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2512    /// existing binding.
2513    ///
2514    /// If the address changes, the old sender is evicted from the
2515    /// cache to ensure fresh routing on next use.
2516    pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2517        if let Ok(mut w) = self.address_book.write() {
2518            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2519                && old_addr != addr
2520            {
2521                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2522                self.sender_cache.remove(&old_addr);
2523            }
2524        } else {
2525            tracing::error!("address book poisoned during bind of {:?}", dest);
2526        }
2527    }
2528
2529    /// Removes all address mappings with the given prefix from the
2530    /// router.
2531    ///
2532    /// Also evicts any corresponding cached senders to prevent reuse
2533    /// of stale connections.
2534    pub fn unbind(&self, dest: &Reference) {
2535        if let Ok(mut w) = self.address_book.write() {
2536            let to_remove: Vec<(Reference, ChannelAddr)> = w
2537                .range(dest..)
2538                .take_while(|(key, _)| dest.is_prefix_of(key))
2539                .map(|(key, addr)| (key.clone(), addr.clone()))
2540                .collect();
2541
2542            for (key, addr) in to_remove {
2543                tracing::info!("unbinding {:?} from {:?}", key, addr);
2544                w.remove(&key);
2545                self.sender_cache.remove(&addr);
2546            }
2547        } else {
2548            tracing::error!("address book poisoned during unbind of {:?}", dest);
2549        }
2550    }
2551
2552    /// Lookup an actor's channel in the router's address bok.
2553    pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2554        let address_book = self.address_book.read().unwrap();
2555        let found = address_book
2556            .lower_bound(Excluded(&actor_id.clone().into()))
2557            .prev();
2558
2559        // First try to look up the address in our address book; failing that,
2560        // try to resolve direct procs.
2561        if let Some((key, addr)) = found
2562            && key.is_prefix_of(&actor_id.clone().into())
2563        {
2564            Some(addr.clone())
2565        } else if actor_id.proc_id().is_direct() {
2566            let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2567            if self.direct_addressed_remote_only {
2568                addr.transport().is_remote().then_some(addr)
2569            } else {
2570                Some(addr)
2571            }
2572        } else {
2573            None
2574        }
2575    }
2576
2577    /// Return all covering prefixes of this router. That is, all references that are not
2578    /// prefixed by another reference in the routing table
2579    pub fn prefixes(&self) -> BTreeSet<Reference> {
2580        let addrs = self.address_book.read().unwrap();
2581        let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2582        for (reference, _) in addrs.iter() {
2583            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2584                Some(candidate) if candidate.is_prefix_of(reference) => (),
2585                _ => {
2586                    prefixes.insert(reference.clone());
2587                }
2588            }
2589        }
2590
2591        prefixes
2592    }
2593
2594    fn dial(
2595        &self,
2596        addr: &ChannelAddr,
2597        actor_id: &ActorId,
2598    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2599        // Get the sender. Create it if needed. Do not send the
2600        // messages inside this block so we do not hold onto the
2601        // reference of the dashmap entries.
2602        match self.sender_cache.entry(addr.clone()) {
2603            Entry::Occupied(entry) => Ok(entry.get().clone()),
2604            Entry::Vacant(entry) => {
2605                let tx = channel::dial(addr.clone()).map_err(|err| {
2606                    MailboxSenderError::new_unbound_type(
2607                        actor_id.clone(),
2608                        MailboxSenderErrorKind::Channel(err),
2609                        "unknown",
2610                    )
2611                })?;
2612                let sender = MailboxClient::new(tx);
2613                Ok(entry.insert(Arc::new(sender)).value().clone())
2614            }
2615        }
2616    }
2617}
2618
2619impl MailboxSender for DialMailboxRouter {
2620    fn post_unchecked(
2621        &self,
2622        envelope: MessageEnvelope,
2623        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2624    ) {
2625        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2626            self.default.post(envelope, return_handle);
2627            return;
2628        };
2629
2630        match self.dial(&addr, envelope.dest().actor_id()) {
2631            Err(err) => envelope.undeliverable(
2632                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2633                return_handle,
2634            ),
2635            Ok(sender) => sender.post(envelope, return_handle),
2636        }
2637    }
2638}
2639
2640/// A MailboxSender that reports any envelope as undeliverable due to
2641/// routing failure.
2642#[derive(Debug)]
2643pub struct UnroutableMailboxSender;
2644
2645impl MailboxSender for UnroutableMailboxSender {
2646    fn post_unchecked(
2647        &self,
2648        envelope: MessageEnvelope,
2649        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2650    ) {
2651        envelope.undeliverable(
2652            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2653            return_handle,
2654        );
2655    }
2656}
2657
2658#[cfg(test)]
2659mod tests {
2660
2661    use std::assert_matches::assert_matches;
2662    use std::mem::drop;
2663    use std::str::FromStr;
2664    use std::sync::atomic::AtomicUsize;
2665    use std::time::Duration;
2666
2667    use timed_test::async_timed_test;
2668
2669    use super::*;
2670    use crate::Actor;
2671    use crate::ActorHandle;
2672    use crate::Instance;
2673    use crate::PortId;
2674    use crate::accum;
2675    use crate::channel::ChannelTransport;
2676    use crate::channel::dial;
2677    use crate::channel::serve;
2678    use crate::channel::sim::SimAddr;
2679    use crate::clock::Clock;
2680    use crate::clock::RealClock;
2681    use crate::data::Serialized;
2682    use crate::id;
2683    use crate::proc::Proc;
2684    use crate::reference::ProcId;
2685    use crate::reference::WorldId;
2686    use crate::simnet;
2687
2688    #[test]
2689    fn test_error() {
2690        let err = MailboxError::new(
2691            ActorId(
2692                ProcId::Ranked(WorldId("myworld".to_string()), 2),
2693                "myactor".to_string(),
2694                5,
2695            ),
2696            MailboxErrorKind::Closed,
2697        );
2698        assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2699    }
2700
2701    #[tokio::test]
2702    async fn test_mailbox_basic() {
2703        let mbox = Mailbox::new_detached(id!(test[0].test));
2704        let (port, mut receiver) = mbox.open_port::<u64>();
2705        let port = port.bind();
2706
2707        mbox.serialize_and_send(&port, 123, monitored_return_handle())
2708            .unwrap();
2709        mbox.serialize_and_send(&port, 321, monitored_return_handle())
2710            .unwrap();
2711        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2712        assert_eq!(receiver.recv().await.unwrap(), 321u64);
2713
2714        let serialized = Serialized::serialize(&999u64).unwrap();
2715        mbox.post(
2716            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2717            monitored_return_handle(),
2718        );
2719        assert_eq!(receiver.recv().await.unwrap(), 999u64);
2720    }
2721
2722    #[tokio::test]
2723    async fn test_mailbox_accum() {
2724        let mbox = Mailbox::new_detached(id!(test[0].test));
2725        let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2726
2727        for i in -3..4 {
2728            port.send(i).unwrap();
2729            let received: accum::Max<i64> = receiver.recv().await.unwrap();
2730            let msg = received.get();
2731            assert_eq!(msg, &i);
2732        }
2733        // Send a smaller or same value. Should still receive the previous max.
2734        for i in -3..4 {
2735            port.send(i).unwrap();
2736            assert_eq!(receiver.recv().await.unwrap().get(), &3);
2737        }
2738        // send a larger value. Should receive the new max.
2739        port.send(4).unwrap();
2740        assert_eq!(receiver.recv().await.unwrap().get(), &4);
2741
2742        // Send multiple updates. Should only receive the final change.
2743        for i in 5..10 {
2744            port.send(i).unwrap();
2745        }
2746        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2747        port.send(1).unwrap();
2748        port.send(3).unwrap();
2749        port.send(2).unwrap();
2750        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2751    }
2752
2753    #[test]
2754    fn test_port_and_reducer() {
2755        let mbox = Mailbox::new_detached(id!(test[0].test));
2756        // accum port could have reducer typehash
2757        {
2758            let accumulator = accum::max::<u64>();
2759            let reducer_spec = accumulator.reducer_spec().unwrap();
2760            let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2761            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2762            let port_ref = port.bind();
2763            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2764        }
2765        // normal port should not have reducer typehash
2766        {
2767            let (port, _) = mbox.open_port::<u64>();
2768            assert_eq!(port.reducer_spec, None);
2769            let port_ref = port.bind();
2770            assert_eq!(port_ref.reducer_spec(), &None);
2771        }
2772    }
2773
2774    #[tokio::test]
2775    #[ignore] // error behavior changed, but we will bring it back
2776    async fn test_mailbox_once() {
2777        let mbox = Mailbox::new_detached(id!(test[0].test));
2778
2779        let (port, receiver) = mbox.open_once_port::<u64>();
2780
2781        // let port_id = port.port_id().clone();
2782
2783        port.send(123u64).unwrap();
2784        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2785
2786        // // The borrow checker won't let us send again on the port
2787        // // (good!), but we stashed the port-id and so we can try on the
2788        // // serialized interface.
2789        // let Err(err) = mbox
2790        //     .send_serialized(&port_id, &Serialized(Vec::new()))
2791        //     .await
2792        // else {
2793        //     unreachable!()
2794        // };
2795        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2796    }
2797
2798    #[tokio::test]
2799    #[ignore] // changed error behavior
2800    async fn test_mailbox_receiver_drop() {
2801        let mbox = Mailbox::new_detached(id!(test[0].test));
2802        let (port, mut receiver) = mbox.open_port::<u64>();
2803        // Make sure we go through "remote" path.
2804        let port = port.bind();
2805        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2806            .unwrap();
2807        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2808        drop(receiver);
2809        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2810            panic!();
2811        };
2812
2813        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2814        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2815    }
2816
2817    #[tokio::test]
2818    async fn test_drain() {
2819        let mbox = Mailbox::new_detached(id!(test[0].test));
2820
2821        let (port, mut receiver) = mbox.open_port();
2822        let port = port.bind();
2823
2824        for i in 0..10 {
2825            mbox.serialize_and_send(&port, i, monitored_return_handle())
2826                .unwrap();
2827        }
2828
2829        for i in 0..10 {
2830            assert_eq!(receiver.recv().await.unwrap(), i);
2831        }
2832
2833        assert!(receiver.drain().is_empty());
2834    }
2835
2836    #[tokio::test]
2837    async fn test_mailbox_muxer() {
2838        let muxer = MailboxMuxer::new();
2839
2840        let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
2841        let mbox1 = Mailbox::new_detached(id!(test[0].actor2));
2842
2843        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
2844        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
2845
2846        let (port, receiver) = mbox0.open_once_port::<u64>();
2847
2848        port.send(123u64).unwrap();
2849        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2850
2851        /*
2852        let (tx, rx) = channel::local::new::<u64>();
2853        let (port, _) = mbox0.open_port::<u64>();
2854        let handle = muxer.clone().serve_port(port, rx).unwrap();
2855        muxer.unbind(mbox0.actor_id());
2856        tx.send(123u64).await.unwrap();
2857        let Ok(Err(err)) = handle.await else { panic!() };
2858        assert_eq!(err.actor_id(), &actor_id(0));
2859        */
2860    }
2861
2862    #[tokio::test]
2863    async fn test_local_client_server() {
2864        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2865        let (tx, rx) = channel::local::new();
2866        let serve_handle = mbox.clone().serve(rx);
2867        let client = MailboxClient::new(tx);
2868
2869        let (port, receiver) = mbox.open_once_port::<u64>();
2870        let port = port.bind();
2871
2872        client
2873            .serialize_and_send_once(port, 123u64, monitored_return_handle())
2874            .unwrap();
2875        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2876        serve_handle.stop("fromt test");
2877        serve_handle.await.unwrap().unwrap();
2878    }
2879
2880    #[tokio::test]
2881    async fn test_sim_client_server() {
2882        simnet::start();
2883        let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2884        let src_to_dst = ChannelAddr::Sim(
2885            SimAddr::new_with_src(
2886                "local:0".parse::<ChannelAddr>().unwrap(),
2887                dst_addr.addr().clone(),
2888            )
2889            .unwrap(),
2890        );
2891
2892        let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2893        let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2894        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2895        let serve_handle = mbox.clone().serve(rx);
2896        let client = MailboxClient::new(tx);
2897        let (port, receiver) = mbox.open_once_port::<u64>();
2898        let port = port.bind();
2899        let msg: u64 = 123;
2900        client
2901            .serialize_and_send_once(port, msg, monitored_return_handle())
2902            .unwrap();
2903        assert_eq!(receiver.recv().await.unwrap(), msg);
2904        serve_handle.stop("from test");
2905        serve_handle.await.unwrap().unwrap();
2906    }
2907
2908    #[tokio::test]
2909    async fn test_mailbox_router() {
2910        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2911        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2912        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2913        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2914
2915        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2916            [&mbox0, &mbox1, &mbox2, &mbox3]
2917                .into_iter()
2918                .map(|mbox| {
2919                    let (port, receiver) = mbox.open_once_port::<u64>();
2920                    (port.bind(), receiver)
2921                })
2922                .collect();
2923
2924        let router = MailboxRouter::new();
2925
2926        router.bind(id!(world0).into(), mbox0);
2927        router.bind(id!(world1[0]).into(), mbox1);
2928        router.bind(id!(world1[1]).into(), mbox2);
2929        router.bind(id!(world1[1].actor1).into(), mbox3);
2930
2931        for (i, (port, receiver)) in comms.into_iter().enumerate() {
2932            router
2933                .serialize_and_send_once(port, i as u64, monitored_return_handle())
2934                .unwrap();
2935            assert_eq!(receiver.recv().await.unwrap(), i as u64);
2936        }
2937
2938        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
2939
2940        let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2941
2942        let (return_handle, mut return_receiver) =
2943            crate::mailbox::undeliverable::new_undeliverable_port();
2944        let (port, _receiver) = mbox4.open_once_port();
2945        router
2946            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2947            .unwrap();
2948        assert!(return_receiver.recv().await.is_ok());
2949
2950        let router = router.fallback(mbox4.clone().into_boxed());
2951        let (port, receiver) = mbox4.open_once_port();
2952        router
2953            .serialize_and_send_once(port.bind(), 0, return_handle)
2954            .unwrap();
2955        assert_eq!(receiver.recv().await.unwrap(), 0);
2956    }
2957
2958    #[tokio::test]
2959    async fn test_dial_mailbox_router() {
2960        let router = DialMailboxRouter::new();
2961
2962        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2963        router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2964        router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2965        router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2966        // Bind a direct address -- we should use its bound address!
2967        router.bind(
2968            "unix:@4,my_proc,my_actor".parse().unwrap(),
2969            "unix:@5".parse().unwrap(),
2970        );
2971
2972        // We should be able to lookup the ids
2973        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2974        router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2975
2976        let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
2977            .unwrap()
2978            .into_actor()
2979            .unwrap();
2980        assert_eq!(
2981            router.lookup_addr(&actor_id).unwrap(),
2982            "unix!@5".parse().unwrap(),
2983        );
2984        router.unbind(&actor_id.clone().into());
2985        assert_eq!(
2986            router.lookup_addr(&actor_id).unwrap(),
2987            "unix!@4".parse().unwrap(),
2988        );
2989
2990        // Unbind so we cannot find the ids anymore
2991        router.unbind(&id!(world1).into());
2992        assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
2993        assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
2994        assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
2995        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2996        router.unbind(&id!(world0).into());
2997        assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
2998    }
2999
3000    #[tokio::test]
3001    #[ignore] // TODO: there's a leak here, fix it
3002    async fn test_dial_mailbox_router_default() {
3003        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
3004        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
3005        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
3006        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
3007
3008        // We don't need to dial here, since we gain direct access to the
3009        // underlying routers.
3010        let root = MailboxRouter::new();
3011        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3012        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3013
3014        root.bind(id!(world0).into(), world0_router.clone());
3015        root.bind(id!(world1).into(), world1_router.clone());
3016
3017        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3018
3019        let mut handles = Vec::new(); // hold on to handles, or channels get closed
3020        for mbox in mailboxes.iter() {
3021            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3022            let handle = (*mbox).clone().serve(rx);
3023            handles.push(handle);
3024
3025            eprintln!("{}: {}", mbox.actor_id(), addr);
3026            if mbox.actor_id().world_name() == "world0" {
3027                world0_router.bind(mbox.actor_id().clone().into(), addr);
3028            } else {
3029                world1_router.bind(mbox.actor_id().clone().into(), addr);
3030            }
3031        }
3032
3033        // Make sure nodes are fully connected.
3034        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3035            for mbox in mailboxes.iter() {
3036                let (port, receiver) = mbox.open_once_port::<u64>();
3037                let port = port.bind();
3038                router
3039                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
3040                    .unwrap();
3041                assert_eq!(receiver.recv().await.unwrap(), 123u64);
3042            }
3043        }
3044    }
3045
3046    #[tokio::test]
3047    async fn test_enqueue_port() {
3048        let mbox = Mailbox::new_detached(id!(test[0].test));
3049
3050        let count = Arc::new(AtomicUsize::new(0));
3051        let count_clone = count.clone();
3052        let port = mbox.open_enqueue_port(move |_, n| {
3053            count_clone.fetch_add(n, Ordering::SeqCst);
3054            Ok(())
3055        });
3056
3057        port.send(10).unwrap();
3058        port.send(5).unwrap();
3059        port.send(1).unwrap();
3060        port.send(0).unwrap();
3061
3062        assert_eq!(count.load(Ordering::SeqCst), 16);
3063    }
3064
3065    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
3066    struct TestMessage;
3067
3068    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
3069    #[named(name = "some::custom::path")]
3070    struct TestMessage2;
3071
3072    #[test]
3073    fn test_remote_message_macros() {
3074        assert_eq!(
3075            TestMessage::typename(),
3076            "hyperactor::mailbox::tests::TestMessage"
3077        );
3078        assert_eq!(TestMessage2::typename(), "some::custom::path");
3079    }
3080
3081    #[test]
3082    fn test_message_envelope_display() {
3083        #[derive(Named, Serialize, Deserialize)]
3084        struct MyTest {
3085            a: u64,
3086            b: String,
3087        }
3088        crate::register_type!(MyTest);
3089
3090        let envelope = MessageEnvelope::serialize(
3091            id!(source[0].actor),
3092            id!(dest[1].actor[0][123]),
3093            &MyTest {
3094                a: 123,
3095                b: "hello".into(),
3096            },
3097            Attrs::new(),
3098        )
3099        .unwrap();
3100
3101        assert_eq!(
3102            format!("{}", envelope),
3103            r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
3104        );
3105    }
3106
3107    #[derive(Debug, Default, Actor)]
3108    struct Foo;
3109
3110    // Test that a message delivery failure causes the sending actor
3111    // to stop running.
3112    #[tokio::test]
3113    async fn test_actor_delivery_failure() {
3114        // This test involves making an actor fail and so we must set
3115        // a supervision coordinator.
3116        use crate::actor::ActorStatus;
3117        use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
3118
3119        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3120            BOXED_PANICKING_MAILBOX_SENDER.clone(),
3121        ));
3122        let proc_id = id!(quux[0]);
3123        let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
3124        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3125
3126        let foo = proc.spawn::<Foo>("foo", ()).await.unwrap();
3127        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3128        let message = MessageEnvelope::new(
3129            foo.actor_id().clone(),
3130            PortId(id!(corge[0].bar), 9999u64),
3131            Serialized::serialize(&1u64).unwrap(),
3132            Attrs::new(),
3133        );
3134        return_handle.send(Undeliverable(message)).unwrap();
3135
3136        RealClock
3137            .sleep(tokio::time::Duration::from_millis(100))
3138            .await;
3139
3140        let foo_status = foo.status();
3141        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3142        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3143            unreachable!()
3144        };
3145        assert!(msg.to_string().contains(
3146            "a message from \
3147                quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
3148        ));
3149
3150        proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
3151            .await
3152            .unwrap();
3153    }
3154
3155    #[tokio::test]
3156    async fn test_detached_return_handle() {
3157        let (return_handle, mut return_receiver) =
3158            crate::mailbox::undeliverable::new_undeliverable_port();
3159        // Simulate an undelivered message return.
3160        let envelope = MessageEnvelope::new(
3161            id!(foo[0].bar),
3162            PortId(id!(baz[0].corge), 9999u64),
3163            Serialized::serialize(&1u64).unwrap(),
3164            Attrs::new(),
3165        );
3166        return_handle.send(Undeliverable(envelope.clone())).unwrap();
3167        // Check we receive the undelivered message.
3168        assert!(
3169            RealClock
3170                .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3171                .await
3172                .is_ok()
3173        );
3174        // Setup a monitor for the receiver and show that if there are
3175        // no outstanding return handles it terminates.
3176        let monitor_handle = tokio::spawn(async move {
3177            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3178                envelope.set_error(DeliveryError::BrokenLink(
3179                    "returned in unit test".to_string(),
3180                ));
3181                UndeliverableMailboxSender
3182                    .post(envelope, /*unused */ monitored_return_handle());
3183            }
3184        });
3185        drop(return_handle);
3186        assert!(
3187            RealClock
3188                .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3189                .await
3190                .is_ok()
3191        );
3192    }
3193
3194    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3195        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3196            // Create dummy state and port_id to create PortReceiver. They are
3197            // not used in the test.
3198            let dummy_state =
3199                State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
3200            let dummy_port_id = PortId(id!(world[0].actor), 0);
3201            let (sender, receiver) = mpsc::unbounded_channel::<M>();
3202            let receiver = PortReceiver {
3203                receiver,
3204                port_id: dummy_port_id,
3205                coalesce,
3206                mailbox: Mailbox {
3207                    inner: Arc::new(dummy_state),
3208                },
3209            };
3210            (sender, receiver)
3211        }
3212
3213        // verify fn drain
3214        {
3215            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3216            assert!(receiver.drain().is_empty());
3217
3218            sender.send(0).unwrap();
3219            sender.send(1).unwrap();
3220            sender.send(2).unwrap();
3221            sender.send(3).unwrap();
3222            sender.send(4).unwrap();
3223            sender.send(5).unwrap();
3224            sender.send(6).unwrap();
3225            sender.send(7).unwrap();
3226
3227            if drop_sender {
3228                drop(sender);
3229            }
3230
3231            if !coalesce {
3232                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3233            } else {
3234                assert_eq!(receiver.drain(), vec![7]);
3235            }
3236
3237            assert!(receiver.drain().is_empty());
3238            assert!(receiver.drain().is_empty());
3239        }
3240
3241        // verify fn try_recv
3242        {
3243            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3244            assert!(receiver.try_recv().unwrap().is_none());
3245
3246            sender.send(0).unwrap();
3247            sender.send(1).unwrap();
3248            sender.send(2).unwrap();
3249            sender.send(3).unwrap();
3250
3251            if drop_sender {
3252                drop(sender);
3253            }
3254
3255            if !coalesce {
3256                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3257                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3258                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3259            }
3260            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3261            if drop_sender {
3262                assert_matches!(
3263                    receiver.try_recv().unwrap_err().kind(),
3264                    MailboxErrorKind::Closed
3265                );
3266                // Still Closed error
3267                assert_matches!(
3268                    receiver.try_recv().unwrap_err().kind(),
3269                    MailboxErrorKind::Closed
3270                );
3271            } else {
3272                assert!(receiver.try_recv().unwrap().is_none());
3273                // Still empty
3274                assert!(receiver.try_recv().unwrap().is_none());
3275            }
3276        }
3277        // verify fn recv
3278        {
3279            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3280            assert!(
3281                RealClock
3282                    .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3283                    .await
3284                    .is_err()
3285            );
3286
3287            sender.send(4).unwrap();
3288            sender.send(5).unwrap();
3289            sender.send(6).unwrap();
3290            sender.send(7).unwrap();
3291
3292            if drop_sender {
3293                drop(sender);
3294            }
3295
3296            if !coalesce {
3297                assert_eq!(receiver.recv().await.unwrap(), 4);
3298                assert_eq!(receiver.recv().await.unwrap(), 5);
3299                assert_eq!(receiver.recv().await.unwrap(), 6);
3300            }
3301            assert_eq!(receiver.recv().await.unwrap(), 7);
3302            if drop_sender {
3303                assert_matches!(
3304                    receiver.recv().await.unwrap_err().kind(),
3305                    MailboxErrorKind::Closed
3306                );
3307                // Still None
3308                assert_matches!(
3309                    receiver.recv().await.unwrap_err().kind(),
3310                    MailboxErrorKind::Closed
3311                );
3312            } else {
3313                assert!(
3314                    RealClock
3315                        .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3316                        .await
3317                        .is_err()
3318                );
3319            }
3320        }
3321    }
3322
3323    #[tokio::test]
3324    async fn test_receiver_basic_default() {
3325        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3326    }
3327
3328    #[tokio::test]
3329    async fn test_receiver_basic_latest() {
3330        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3331    }
3332
3333    #[tokio::test]
3334    async fn test_receiver_after_sender_drop_default() {
3335        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3336    }
3337
3338    #[tokio::test]
3339    async fn test_receiver_after_sender_drop_latest() {
3340        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3341    }
3342
3343    struct Setup {
3344        receiver: PortReceiver<u64>,
3345        actor0: Instance<()>,
3346        actor1: Instance<()>,
3347        _actor0_handle: ActorHandle<()>,
3348        _actor1_handle: ActorHandle<()>,
3349        port_id: PortId,
3350        port_id1: PortId,
3351        port_id2: PortId,
3352        port_id2_1: PortId,
3353    }
3354
3355    async fn setup_split_port_ids(
3356        reducer_spec: Option<ReducerSpec>,
3357        reducer_opts: Option<ReducerOpts>,
3358    ) -> Setup {
3359        let proc = Proc::local();
3360        let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3361        let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3362
3363        // Open a port on actor0
3364        let (port_handle, receiver) = actor0.open_port::<u64>();
3365        let port_id = port_handle.bind().port_id().clone();
3366
3367        // Split it twice on actor1
3368        let port_id1 = port_id
3369            .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3370            .unwrap();
3371        let port_id2 = port_id
3372            .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3373            .unwrap();
3374
3375        // A split port id can also be split
3376        let port_id2_1 = port_id2
3377            .split(&actor1, reducer_spec, reducer_opts.clone(), true)
3378            .unwrap();
3379
3380        Setup {
3381            receiver,
3382            actor0,
3383            actor1,
3384            _actor0_handle: actor0_handle,
3385            _actor1_handle: actor1_handle,
3386            port_id,
3387            port_id1,
3388            port_id2,
3389            port_id2_1,
3390        }
3391    }
3392
3393    fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3394        let serialized = Serialized::serialize(&msg).unwrap();
3395        port_id.send(cx, serialized);
3396    }
3397
3398    #[async_timed_test(timeout_secs = 30)]
3399    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3400    #[cfg_attr(not(fbcode_build), ignore)]
3401    async fn test_split_port_id_no_reducer() {
3402        let Setup {
3403            mut receiver,
3404            actor0,
3405            actor1,
3406            port_id,
3407            port_id1,
3408            port_id2,
3409            port_id2_1,
3410            ..
3411        } = setup_split_port_ids(None, None).await;
3412        // Can send messages to receiver from all port handles
3413        post(&actor0, port_id.clone(), 1);
3414        assert_eq!(receiver.recv().await.unwrap(), 1);
3415        post(&actor1, port_id1.clone(), 2);
3416        assert_eq!(receiver.recv().await.unwrap(), 2);
3417        post(&actor1, port_id2.clone(), 3);
3418        assert_eq!(receiver.recv().await.unwrap(), 3);
3419        post(&actor1, port_id2_1.clone(), 4);
3420        assert_eq!(receiver.recv().await.unwrap(), 4);
3421
3422        // no more messages
3423        RealClock.sleep(Duration::from_secs(2)).await;
3424        let msg = receiver.try_recv().unwrap();
3425        assert_eq!(msg, None);
3426    }
3427
3428    async fn wait_for(
3429        receiver: &mut PortReceiver<u64>,
3430        expected_size: usize,
3431        timeout_duration: Duration,
3432    ) -> anyhow::Result<Vec<u64>> {
3433        let mut messeges = vec![];
3434
3435        RealClock
3436            .timeout(timeout_duration, async {
3437                loop {
3438                    let msg = receiver.recv().await.unwrap();
3439                    messeges.push(msg);
3440                    if messeges.len() == expected_size {
3441                        break;
3442                    }
3443                }
3444            })
3445            .await?;
3446        Ok(messeges)
3447    }
3448
3449    #[async_timed_test(timeout_secs = 30)]
3450    async fn test_split_port_id_sum_reducer() {
3451        let config = crate::config::global::lock();
3452        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3453
3454        let sum_accumulator = accum::sum::<u64>();
3455        let reducer_spec = sum_accumulator.reducer_spec();
3456        let Setup {
3457            mut receiver,
3458            actor0,
3459            actor1,
3460            port_id,
3461            port_id1,
3462            port_id2,
3463            port_id2_1,
3464            ..
3465        } = setup_split_port_ids(reducer_spec, None).await;
3466        post(&actor0, port_id.clone(), 4);
3467        post(&actor1, port_id1.clone(), 2);
3468        post(&actor1, port_id2.clone(), 3);
3469        post(&actor1, port_id2_1.clone(), 1);
3470        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3471            .await
3472            .unwrap();
3473        // Message might be received out of their sending out. So we sort the
3474        // messages here.
3475        messages.sort();
3476        assert_eq!(messages, vec![1, 2, 3, 4]);
3477
3478        // no more messages
3479        RealClock.sleep(Duration::from_secs(2)).await;
3480        let msg = receiver.try_recv().unwrap();
3481        assert_eq!(msg, None);
3482    }
3483
3484    #[async_timed_test(timeout_secs = 30)]
3485    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3486    #[cfg_attr(not(fbcode_build), ignore)]
3487    async fn test_split_port_id_every_n_messages() {
3488        let config = crate::config::global::lock();
3489        let _config_guard = config.override_key(
3490            crate::config::SPLIT_MAX_BUFFER_AGE,
3491            Duration::from_secs(600),
3492        );
3493        let proc = Proc::local();
3494        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3495        let (port_handle, mut receiver) = actor.open_port::<u64>();
3496        let port_id = port_handle.bind().port_id().clone();
3497        // Split it
3498        let reducer_spec = accum::sum::<u64>().reducer_spec();
3499        let split_port_id = port_id.split(&actor, reducer_spec, None, true).unwrap();
3500
3501        // Send 9 messages.
3502        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3503            post(&actor, split_port_id.clone(), msg);
3504        }
3505        // The first 5 should be batched and reduced once due
3506        // to every_n_msgs = 5.
3507        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3508            .await
3509            .unwrap();
3510        assert_eq!(messages, vec![15]);
3511
3512        // the last message unfortranately will never come because they do not
3513        // reach batch size.
3514        RealClock.sleep(Duration::from_secs(2)).await;
3515        let msg = receiver.try_recv().unwrap();
3516        assert_eq!(msg, None);
3517    }
3518
3519    #[async_timed_test(timeout_secs = 30)]
3520    async fn test_split_port_timeout_flush() {
3521        let config = crate::config::global::lock();
3522        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3523
3524        let Setup {
3525            mut receiver,
3526            actor0: _,
3527            actor1,
3528            port_id: _,
3529            port_id1,
3530            port_id2: _,
3531            port_id2_1: _,
3532            ..
3533        } = setup_split_port_ids(
3534            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3535            Some(ReducerOpts {
3536                max_update_interval: Some(Duration::from_millis(50)),
3537            }),
3538        )
3539        .await;
3540
3541        post(&actor1, port_id1.clone(), 10);
3542        post(&actor1, port_id1.clone(), 20);
3543        post(&actor1, port_id1.clone(), 30);
3544
3545        // Messages should accumulate for 50ms.
3546        RealClock.sleep(Duration::from_millis(10)).await;
3547        let msg = receiver.try_recv().unwrap();
3548        assert_eq!(msg, None);
3549
3550        // Wait until we are flushed.
3551        RealClock.sleep(Duration::from_millis(100)).await;
3552
3553        // Now we are reduced and accumulated:
3554        let msg = receiver.recv().await.unwrap();
3555        assert_eq!(msg, 60); // 10 + 20 + 30
3556
3557        // No further messages:
3558        let msg = receiver.try_recv().unwrap();
3559        assert_eq!(msg, None);
3560    }
3561
3562    #[async_timed_test(timeout_secs = 30)]
3563    async fn test_split_port_timeout_and_size_flush() {
3564        let config = crate::config::global::lock();
3565        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3566
3567        let Setup {
3568            mut receiver,
3569            actor0: _,
3570            actor1,
3571            port_id: _,
3572            port_id1,
3573            port_id2: _,
3574            port_id2_1: _,
3575            ..
3576        } = setup_split_port_ids(
3577            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3578            Some(ReducerOpts {
3579                max_update_interval: Some(Duration::from_millis(50)),
3580            }),
3581        )
3582        .await;
3583
3584        post(&actor1, port_id1.clone(), 10);
3585        post(&actor1, port_id1.clone(), 20);
3586        post(&actor1, port_id1.clone(), 30);
3587        post(&actor1, port_id1.clone(), 40);
3588
3589        // Should have flushed at the third message.
3590        let msg = receiver.recv().await.unwrap();
3591        assert_eq!(msg, 60);
3592
3593        // After 50ms, the next reduce will flush:
3594        let msg = receiver.recv().await.unwrap();
3595        assert_eq!(msg, 40);
3596
3597        // No further messages
3598        let msg = receiver.try_recv().unwrap();
3599        assert_eq!(msg, None);
3600    }
3601
3602    #[test]
3603    fn test_dial_mailbox_router_prefixes_empty() {
3604        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3605    }
3606
3607    #[test]
3608    fn test_dial_mailbox_router_prefixes_single_entry() {
3609        let router = DialMailboxRouter::new();
3610        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3611
3612        let prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3613        assert_eq!(prefixes.len(), 1);
3614        assert_eq!(prefixes[0], id!(world0).into());
3615    }
3616
3617    #[test]
3618    fn test_dial_mailbox_router_prefixes_no_overlap() {
3619        let router = DialMailboxRouter::new();
3620        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3621        router.bind(id!(world1).into(), "unix!@2".parse().unwrap());
3622        router.bind(id!(world2).into(), "unix!@3".parse().unwrap());
3623
3624        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3625        prefixes.sort();
3626
3627        let mut expected = vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
3628        expected.sort();
3629
3630        assert_eq!(prefixes, expected);
3631    }
3632
3633    #[test]
3634    fn test_dial_mailbox_router_prefixes_with_overlaps() {
3635        let router = DialMailboxRouter::new();
3636        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3637        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3638        router.bind(id!(world0[1]).into(), "unix!@3".parse().unwrap());
3639        router.bind(id!(world1).into(), "unix!@4".parse().unwrap());
3640        router.bind(id!(world1[0]).into(), "unix!@5".parse().unwrap());
3641
3642        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3643        prefixes.sort();
3644
3645        // Only world0 and world1 should be covering prefixes since they cover their children
3646        let mut expected = vec![id!(world0).into(), id!(world1).into()];
3647        expected.sort();
3648
3649        assert_eq!(prefixes, expected);
3650    }
3651
3652    #[test]
3653    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
3654        let router = DialMailboxRouter::new();
3655        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3656        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3657        router.bind(id!(world0[0].actor1).into(), "unix!@3".parse().unwrap());
3658        router.bind(id!(world1[0]).into(), "unix!@4".parse().unwrap());
3659        router.bind(id!(world1[1]).into(), "unix!@5".parse().unwrap());
3660        router.bind(id!(world2[0].actor0).into(), "unix!@6".parse().unwrap());
3661
3662        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3663        prefixes.sort();
3664
3665        // Covering prefixes should be:
3666        // - world0 (covers world0[0] and world0[0].actor1)
3667        // - world1[0] (not covered by anything else)
3668        // - world1[1] (not covered by anything else)
3669        // - world2[0].actor0 (not covered by anything else)
3670        let expected = vec![
3671            id!(world0).into(),
3672            id!(world1[0]).into(),
3673            id!(world1[1]).into(),
3674            id!(world2[0].actor0).into(),
3675        ];
3676
3677        assert_eq!(prefixes, expected);
3678    }
3679
3680    #[test]
3681    fn test_dial_mailbox_router_prefixes_same_level() {
3682        let router = DialMailboxRouter::new();
3683        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
3684        router.bind(id!(world0[1]).into(), "unix!@2".parse().unwrap());
3685        router.bind(id!(world0[2]).into(), "unix!@3".parse().unwrap());
3686
3687        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3688        prefixes.sort();
3689
3690        // All should be covering prefixes since none is a prefix of another
3691        let mut expected = vec![
3692            id!(world0[0]).into(),
3693            id!(world0[1]).into(),
3694            id!(world0[2]).into(),
3695        ];
3696        expected.sort();
3697
3698        assert_eq!(prefixes, expected);
3699    }
3700
3701    /// A forwarder that bounces messages back to the **same**
3702    /// mailbox, but does so on a task to avoid recursive stack
3703    /// growth.
3704    #[derive(Clone, Debug)]
3705    struct AsyncLoopForwarder;
3706
3707    impl MailboxSender for AsyncLoopForwarder {
3708        fn post_unchecked(
3709            &self,
3710            envelope: MessageEnvelope,
3711            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3712        ) {
3713            let me = self.clone();
3714            tokio::spawn(async move {
3715                // Call `post` so each hop applies TTL exactly once.
3716                me.post(envelope, return_handle);
3717            });
3718        }
3719    }
3720
3721    #[tokio::test]
3722    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3723        let actor_id = ActorId(
3724            ProcId::Ranked(id!(test_world), 0),
3725            "ttl_actor".to_string(),
3726            0,
3727        );
3728        let mailbox = Mailbox::new(
3729            actor_id.clone(),
3730            BoxedMailboxSender::new(AsyncLoopForwarder),
3731        );
3732        let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3733
3734        // Create a destination not owned by this mailbox to force
3735        // forwarding.
3736        let remote_actor = ActorId(
3737            ProcId::Ranked(id!(remote_world), 1),
3738            "remote".to_string(),
3739            0,
3740        );
3741        let dest = PortId(remote_actor.clone(), /*port index*/ 4242);
3742
3743        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
3744        // `::serialize`).
3745        let payload = 1234_u64;
3746        let envelope =
3747            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3748                .expect("serialize");
3749
3750        // Post it. This will start bouncing between forwarder and
3751        // mailbox until TTL hits 0.
3752        let return_handle = ret_port.clone();
3753        mailbox.post(envelope, return_handle);
3754
3755        // We expect the undeliverable to come back once TTL expires.
3756        #[allow(clippy::disallowed_methods)]
3757        let Undeliverable(undelivered) =
3758            tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3759                .await
3760                .expect("timed out waiting for undeliverable")
3761                .expect("channel closed");
3762
3763        // Sanity: round-trip payload still deserializes.
3764        let got: u64 = undelivered.deserialized().expect("deserialize");
3765        assert_eq!(got, payload, "payload preserved");
3766    }
3767
3768    #[tokio::test]
3769    async fn message_ttl_success_local_delivery() {
3770        let actor_id = ActorId(
3771            ProcId::Ranked(id!(test_world), 0),
3772            "ttl_actor".to_string(),
3773            0,
3774        );
3775        let mailbox = Mailbox::new(
3776            actor_id.clone(),
3777            BoxedMailboxSender::new(PanickingMailboxSender),
3778        );
3779        let (_undeliverable_tx, mut undeliverable_rx) =
3780            mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3781
3782        // Open a local user u64 port.
3783        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3784
3785        // Build an envelope destined for this mailbox's own port.
3786        let payload = 0xC0FFEE_u64;
3787        let envelope = MessageEnvelope::serialize(
3788            actor_id.clone(),
3789            user_port.bind().port_id().clone(),
3790            &payload,
3791            Attrs::new(),
3792        )
3793        .expect("serialize");
3794
3795        // Post the message using the mailbox (local path). TTL will
3796        // not expire.
3797        let return_handle = mailbox
3798            .bound_return_handle()
3799            .unwrap_or(monitored_return_handle());
3800        mailbox.post(envelope, return_handle);
3801
3802        // We should receive the payload locally.
3803        #[allow(clippy::disallowed_methods)]
3804        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3805            .await
3806            .expect("timed out waiting for local delivery")
3807            .expect("user port closed");
3808        assert_eq!(got, payload);
3809
3810        // There should be no undeliverables arriving.
3811        #[allow(clippy::disallowed_methods)]
3812        let no_undeliverable =
3813            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3814        assert!(
3815            no_undeliverable.is_err(),
3816            "unexpected undeliverable returned on successful local delivery"
3817        );
3818    }
3819
3820    #[tokio::test]
3821    async fn test_port_contramap() {
3822        let mbox = Mailbox::new_detached(id!(test[0].test));
3823        let (handle, mut rx) = mbox.open_port();
3824
3825        handle
3826            .contramap(|m| (1, m))
3827            .send("hello".to_string())
3828            .unwrap();
3829        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3830    }
3831
3832    #[test]
3833    #[should_panic(expected = "already bound")]
3834    fn test_bind_port_handle_to_actor_port_twice() {
3835        let mbox = Mailbox::new_detached(id!(test[0].test));
3836        let (handle, _rx) = mbox.open_port::<String>();
3837        handle.bind_actor_port();
3838        handle.bind_actor_port();
3839    }
3840
3841    #[test]
3842    fn test_bind_port_handle_to_actor_port() {
3843        let mbox = Mailbox::new_detached(id!(test[0].test));
3844        let default_port = mbox.actor_id().port_id(String::port());
3845        let (handle, _rx) = mbox.open_port::<String>();
3846        // Handle's port index is allocated by mailbox, not the actor port.
3847        assert_ne!(default_port.index(), handle.port_index);
3848        // Bind the handle to the actor port.
3849        handle.bind_actor_port();
3850        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3851        // bind() can still be used, just it will not change handle's state.
3852        handle.bind();
3853        handle.bind();
3854        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3855    }
3856
3857    #[test]
3858    #[should_panic(expected = "already bound")]
3859    fn test_bind_port_handle_to_actor_port_when_already_bound() {
3860        let mbox = Mailbox::new_detached(id!(test[0].test));
3861        let (handle, _rx) = mbox.open_port::<String>();
3862        // Bound handle to the port allocated by mailbox.
3863        handle.bind();
3864        assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
3865        // Since handle is already bound, call bind_to() on it will cause panic.
3866        handle.bind_actor_port();
3867    }
3868}