hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::Proc;
21//! # use hyperactor::reference::{ActorId, ProcId};
22//! # tokio_test::block_on(async {
23//! # let proc = Proc::local();
24//! # let (client, _) = proc.instance("client").unwrap();
25//! # let actor_id = proc.proc_id().actor_id("actor", 0);
26//! let mbox = Mailbox::new_detached(actor_id);
27//! let (port, mut receiver) = mbox.open_port::<u64>();
28//!
29//! port.send(&client, 123).unwrap();
30//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
31//! # })
32//! ```
33//!
34//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
35//! that permits at most one message transmission:
36//!
37//! ```
38//! # use hyperactor::mailbox::Mailbox;
39//! # use hyperactor::Proc;
40//! # use hyperactor::reference::{ActorId, ProcId};
41//! # tokio_test::block_on(async {
42//! # let proc = Proc::local();
43//! # let (client, _) = proc.instance("client").unwrap();
44//! # let actor_id = proc.proc_id().actor_id("actor", 0);
45//! let mbox = Mailbox::new_detached(actor_id);
46//!
47//! let (port, receiver) = mbox.open_once_port::<u64>();
48//!
49//! port.send(&client, 123u64).unwrap();
50//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
51//! # })
52//! ```
53//!
54//! [`OncePort`]s are correspondingly used for RPC replies in the actor
55//! system.
56//!
57//! ## Remote ports and serialization
58//!
59//! Mailboxes allow delivery of serialized messages to named ports:
60//!
61//! 1) Ports restrict message types to (serializable) [`Message`]s.
62//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
63//! 3) [`Mailbox`] provides interfaces to deliver serialized
64//!    messages to ports named by their [`PortId`].
65//!
66//! While this complicates the interface somewhat, it allows the
67//! implementation to avoid a serialization roundtrip when passing
68//! messages locally.
69
70use std::any::Any;
71use std::collections::BTreeMap;
72use std::collections::BTreeSet;
73use std::fmt;
74use std::fmt::Debug;
75use std::future;
76use std::future::Future;
77use std::ops::Bound::Excluded;
78use std::pin::Pin;
79use std::sync::Arc;
80use std::sync::LazyLock;
81use std::sync::Mutex;
82use std::sync::RwLock;
83use std::sync::Weak;
84use std::sync::atomic::AtomicU64;
85use std::sync::atomic::AtomicUsize;
86use std::sync::atomic::Ordering;
87use std::task::Context;
88use std::task::Poll;
89
90use async_trait::async_trait;
91use dashmap::DashMap;
92use dashmap::mapref::entry::Entry;
93use futures::Sink;
94use futures::Stream;
95use hyperactor_config::Flattrs;
96use hyperactor_telemetry::hash_to_u64;
97use serde::Deserialize;
98use serde::Serialize;
99use serde::de::DeserializeOwned;
100use tokio::sync::mpsc;
101use tokio::sync::oneshot;
102use tokio::sync::watch;
103use tokio::task::JoinHandle;
104use tokio_util::sync::CancellationToken;
105use typeuri::Named;
106
107// for macros
108use crate::Proc;
109use crate::accum::Accumulator;
110use crate::accum::ReducerSpec;
111use crate::accum::StreamingReducerOpts;
112use crate::actor::ActorStatus;
113use crate::actor::Signal;
114use crate::actor::remote::USER_PORT_OFFSET;
115use crate::channel;
116use crate::channel::ChannelAddr;
117use crate::channel::ChannelError;
118use crate::channel::ChannelTransport;
119use crate::channel::SendError;
120use crate::channel::TxStatus;
121use crate::context;
122use crate::metrics;
123use crate::ordering::SEQ_INFO;
124use crate::ordering::SeqInfo;
125use crate::reference;
126
127mod undeliverable;
128/// For [`Undeliverable`], a message type for delivery failures.
129pub use undeliverable::Undeliverable;
130pub use undeliverable::UndeliverableMessageError;
131pub use undeliverable::custom_monitored_return_handle;
132pub use undeliverable::monitored_return_handle; // TODO: Audit
133/// For [`MailboxAdminMessage`], a message type for mailbox administration.
134pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137/// For message headers and latency tracking.
138pub mod headers;
139
/// Marker trait for any value that can be deposited into a mailbox.
///
/// A message must be movable across threads (`Send`), shareable by
/// reference across threads (`Sync`), and own all of its data
/// (`'static`). Every type meeting those bounds is a `Message` through
/// the blanket implementation that follows.
pub trait Message: Send + Sync + 'static {}

impl<T> Message for T where T: Send + Sync + 'static {}
144
145/// RemoteMessage extends [`Message`] by requiring that the messages
146/// also be serializable, and can thus traverse process boundaries.
147/// RemoteMessages must also specify a globally unique type name (a URI).
148pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
149
150impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
151
/// Raw byte-string payloads, as used throughout the system.
pub type Data = std::vec::Vec<u8>;
154
155/// Delivery errors occur during message posting.
156#[derive(
157    thiserror::Error,
158    Debug,
159    Serialize,
160    Deserialize,
161    typeuri::Named,
162    Clone,
163    PartialEq,
164    Eq
165)]
166pub enum DeliveryError {
167    /// The destination address is not reachable.
168    #[error("address not routable: {0}")]
169    Unroutable(String),
170
171    /// A broken link indicates that a link in the message
172    /// delivery path has failed.
173    #[error("broken link: {0}")]
174    BrokenLink(String),
175
176    /// A (local) mailbox delivery error.
177    #[error("mailbox error: {0}")]
178    Mailbox(String),
179
180    /// A multicast related delivery error.
181    #[error("multicast error: {0}")]
182    Multicast(String),
183
184    /// The message went through too many hops and has expired.
185    #[error("ttl expired")]
186    TtlExpired,
187}
188
189/// An envelope that carries a message destined to a remote actor.
190/// The envelope contains a serialized message along with its destination
191/// and sender.
192#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
193pub struct MessageEnvelope {
194    /// The sender of this message.
195    sender: reference::ActorId,
196
197    /// The destination of the message.
198    dest: reference::PortId,
199
200    /// The serialized message.
201    data: wirevalue::Any,
202
203    /// Error contains a delivery error when message delivery failed.
204    errors: Vec<DeliveryError>,
205
206    /// Additional context for this message.
207    headers: Flattrs,
208
209    /// Decremented at every `MailboxSender` hop.
210    ttl: u8,
211
212    /// If true, undeliverable messages should be returned to sender. Else, they
213    /// are dropped.
214    return_undeliverable: bool,
215    // TODO: add typename, source, seq, etc.
216}
217wirevalue::register_type!(MessageEnvelope);
218
219impl MessageEnvelope {
220    /// Create a new envelope with the provided sender, destination, and message.
221    pub fn new(
222        sender: reference::ActorId,
223        dest: reference::PortId,
224        data: wirevalue::Any,
225        headers: Flattrs,
226    ) -> Self {
227        Self {
228            sender,
229            dest,
230            data,
231            errors: Vec::new(),
232            headers,
233            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
234            // By default, all undeliverable messages should be returned to the sender.
235            return_undeliverable: true,
236        }
237    }
238
239    /// Create a new envelope whose sender ID is unknown.
240    pub(crate) fn new_unknown(dest: reference::PortId, data: wirevalue::Any) -> Self {
241        // Create a synthetic "unknown" actor ID for messages with no known sender
242        let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
243        let unknown_proc_id = crate::reference::ProcId::unique(unknown_addr, "unknown");
244        let unknown_actor_id =
245            crate::reference::ActorId::root(unknown_proc_id, "unknown".to_string());
246        Self::new(unknown_actor_id, dest, data, Flattrs::new())
247    }
248
249    /// Construct a new serialized value by serializing the provided T-typed value.
250    pub fn serialize<T: Serialize + Named>(
251        source: reference::ActorId,
252        dest: reference::PortId,
253        value: &T,
254        headers: Flattrs,
255    ) -> Result<Self, wirevalue::Error> {
256        Ok(Self {
257            headers,
258            data: wirevalue::Any::serialize(value)?,
259            sender: source,
260            dest,
261            errors: Vec::new(),
262            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
263            // By default, all undeliverable messages should be returned to the sender.
264            return_undeliverable: true,
265        })
266    }
267
268    /// Returns the remaining time-to-live (TTL) for this message.
269    ///
270    /// The TTL is decremented at each `MailboxSender` hop. When it
271    /// reaches 0, the message is considered expired and is returned
272    /// to the sender as undeliverable.
273    pub fn ttl(&self) -> u8 {
274        self.ttl
275    }
276
277    /// Overrides the message’s time-to-live (TTL).
278    ///
279    /// This replaces the current TTL value (normally initialized from
280    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
281    /// updated envelope is returned for chaining.
282    ///
283    /// # Note
284    /// The TTL is decremented at each `MailboxSender` hop, and when
285    /// it reaches 0 the message will be treated as undeliverable.
286    pub fn set_ttl(mut self, ttl: u8) -> Self {
287        self.ttl = ttl;
288        self
289    }
290
291    /// Decrements the message's TTL by one hop.
292    ///
293    /// Returns `Ok(())` if the TTL was greater than zero and
294    /// successfully decremented. If the TTL was already zero, no
295    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
296    /// returned, indicating that the message has expired and should
297    /// be treated as undeliverable.
298    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
299        if self.ttl == 0 {
300            Err(DeliveryError::TtlExpired)
301        } else {
302            self.ttl -= 1;
303            Ok(())
304        }
305    }
306
307    /// Deserialize the message in the envelope to the provided type T.
308    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
309        Ok(self.data.deserialized()?)
310    }
311
312    /// The serialized message.
313    pub fn data(&self) -> &wirevalue::Any {
314        &self.data
315    }
316
317    /// The message sender.
318    pub fn sender(&self) -> &reference::ActorId {
319        &self.sender
320    }
321
322    /// The destination of the message.
323    pub fn dest(&self) -> &reference::PortId {
324        &self.dest
325    }
326
327    /// The message headers.
328    pub fn headers(&self) -> &Flattrs {
329        &self.headers
330    }
331
332    /// Tells whether this is a signal message.
333    pub fn is_signal(&self) -> bool {
334        self.dest.index() == Signal::port()
335    }
336
337    /// Set a delivery error for the message. If errors are already set, append
338    /// it to the existing errors.
339    pub fn set_error(&mut self, error: DeliveryError) {
340        self.errors.push(error)
341    }
342
343    /// Change the sender on the envelope in case it was set incorrectly. This
344    /// should only be used by CommActor since it is forwarding from another
345    /// sender.
346    pub fn update_sender(&mut self, sender: reference::ActorId) {
347        self.sender = sender;
348    }
349
350    /// Set to true if you want this message to be returned to sender if it cannot
351    /// reach dest. This is the default.
352    /// Set to false if you want the message to be dropped instead.
353    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
354        self.return_undeliverable = return_undeliverable;
355    }
356
357    /// The message has been determined to be undeliverable with the
358    /// provided error. Mark the envelope with the error and return to
359    /// sender.
360    pub fn undeliverable(
361        mut self,
362        error: DeliveryError,
363        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
364    ) {
365        tracing::debug!(
366            name = "undelivered_message_attempt",
367            sender = self.sender.to_string(),
368            dest = self.dest.to_string(),
369            error = error.to_string(),
370            return_handle = %return_handle,
371        );
372        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
373            1,
374            hyperactor_telemetry::kv_pairs!(
375                "sender_actor_id" => self.sender.to_string(),
376                "dest_actor_id" => self.dest.to_string(),
377                "message_type" => self.data.typename().unwrap_or("unknown"),
378                "error_type" =>  error.to_string(),
379            ),
380        );
381
382        self.set_error(error);
383        undeliverable::return_undeliverable(return_handle, self);
384    }
385
386    /// Get the errors of why this message was undeliverable. Empty means this
387    /// message was not determined as undeliverable.
388    pub fn errors(&self) -> &Vec<DeliveryError> {
389        &self.errors
390    }
391
392    /// Get the string representation of the errors of this message was
393    /// undeliverable. None means this message was not determined as
394    /// undeliverable.
395    pub fn error_msg(&self) -> Option<String> {
396        if self.errors.is_empty() {
397            None
398        } else {
399            Some(
400                self.errors
401                    .iter()
402                    .map(|e| e.to_string())
403                    .collect::<Vec<_>>()
404                    .join("; "),
405            )
406        }
407    }
408
409    fn open(self) -> (MessageMetadata, wirevalue::Any) {
410        let Self {
411            sender,
412            dest,
413            data,
414            errors,
415            headers,
416            ttl,
417            return_undeliverable,
418        } = self;
419
420        (
421            MessageMetadata {
422                sender,
423                dest,
424                errors,
425                headers,
426                ttl,
427                return_undeliverable,
428            },
429            data,
430        )
431    }
432
433    fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
434        let MessageMetadata {
435            sender,
436            dest,
437            errors,
438            headers,
439            ttl,
440            return_undeliverable,
441        } = metadata;
442
443        Self {
444            sender,
445            dest,
446            data,
447            errors,
448            headers,
449            ttl,
450            return_undeliverable,
451        }
452    }
453
454    fn return_undeliverable(&self) -> bool {
455        self.return_undeliverable
456    }
457
458    /// Set a header value on this envelope.
459    pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
460        self.headers.set(key, value);
461    }
462}
463
464impl fmt::Display for MessageEnvelope {
465    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466        match &self.error_msg() {
467            None => write!(
468                f,
469                "{} > {}: {} {{{}}}",
470                self.sender, self.dest, self.data, self.headers
471            ),
472            Some(err) => write!(
473                f,
474                "{} > {}: {} {{{}}}: delivery error: {}",
475                self.sender, self.dest, self.data, self.headers, err
476            ),
477        }
478    }
479}
480
481/// Metadata about a message sent via a MessageEnvelope.
482#[derive(Clone)]
483pub struct MessageMetadata {
484    sender: reference::ActorId,
485    dest: reference::PortId,
486    errors: Vec<DeliveryError>,
487    headers: Flattrs,
488    ttl: u8,
489    return_undeliverable: bool,
490}
491
492/// Errors that occur during mailbox operations. Each error is associated
493/// with the mailbox's actor id.
494#[derive(Debug)]
495pub struct MailboxError {
496    actor_id: reference::ActorId,
497    kind: MailboxErrorKind,
498}
499
500/// The kinds of mailbox errors. This enum is marked non-exhaustive to
501/// allow for extensibility.
502#[derive(thiserror::Error, Debug)]
503#[non_exhaustive]
504pub enum MailboxErrorKind {
505    /// An operation was attempted on a closed mailbox.
506    #[error("mailbox closed")]
507    Closed,
508
509    /// The port associated with an operation was invalid.
510    #[error("invalid port: {0}")]
511    InvalidPort(reference::PortId),
512
513    /// There was no sender associated with the port.
514    #[error("no sender for port: {0}")]
515    NoSenderForPort(reference::PortId),
516
517    /// There was no local sender associated with the port.
518    /// Returned by operations that require a local port.
519    #[error("no local sender for port: {0}")]
520    NoLocalSenderForPort(reference::PortId),
521
522    /// The port was closed.
523    #[error("{0}: port closed")]
524    PortClosed(reference::PortId),
525
526    /// An error occured during a send operation.
527    #[error("send {0}: {1}")]
528    Send(reference::PortId, #[source] anyhow::Error),
529
530    /// An error occured during a receive operation.
531    #[error("recv {0}: {1}")]
532    Recv(reference::PortId, #[source] anyhow::Error),
533
534    /// There was a serialization failure.
535    #[error("serialize: {0}")]
536    Serialize(#[source] anyhow::Error),
537
538    /// There was a deserialization failure.
539    #[error("deserialize {0}: {1}")]
540    Deserialize(&'static str, anyhow::Error),
541
542    /// There was an error during a channel operation.
543    #[error(transparent)]
544    Channel(#[from] ChannelError),
545
546    /// The owning actor terminated (either stopped or failed).
547    #[error("owner terminated: {0}")]
548    OwnerTerminated(ActorStatus),
549}
550
551impl MailboxError {
552    /// Create a new mailbox error associated with the provided actor
553    /// id and of the given kind.
554    pub fn new(actor_id: reference::ActorId, kind: MailboxErrorKind) -> Self {
555        Self { actor_id, kind }
556    }
557
558    /// The ID of the mailbox producing this error.
559    pub fn actor_id(&self) -> &reference::ActorId {
560        &self.actor_id
561    }
562
563    /// The error's kind.
564    pub fn kind(&self) -> &MailboxErrorKind {
565        &self.kind
566    }
567}
568
569impl fmt::Display for MailboxError {
570    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571        write!(f, "{}: ", self.actor_id)?;
572        fmt::Display::fmt(&self.kind, f)
573    }
574}
575
576impl std::error::Error for MailboxError {
577    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
578        self.kind.source()
579    }
580}
581
582/// PortLocation describes the location of a port.
583/// This is used in errors to provide a uniform data type
584/// for ports that may or may not be bound.
585#[derive(Debug, Clone)]
586pub enum PortLocation {
587    /// The port was bound: the location is its underlying bound ID.
588    Bound(reference::PortId),
589    /// The port was not bound: we provide the actor ID and the message type.
590    Unbound(reference::ActorId, &'static str),
591}
592
593impl PortLocation {
594    fn new_unbound<M: Message>(actor_id: reference::ActorId) -> Self {
595        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
596    }
597
598    #[allow(dead_code)]
599    fn new_unbound_type(actor_id: reference::ActorId, ty: &'static str) -> Self {
600        PortLocation::Unbound(actor_id, ty)
601    }
602
603    /// The actor id of the location.
604    pub fn actor_id(&self) -> &reference::ActorId {
605        match self {
606            PortLocation::Bound(port_id) => port_id.actor_id(),
607            PortLocation::Unbound(actor_id, _) => actor_id,
608        }
609    }
610}
611
612impl fmt::Display for PortLocation {
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        match self {
615            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
616            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
617        }
618    }
619}
620
621/// Errors that that occur during mailbox sending operations. Each error
622/// is associated with the port ID of the operation.
623#[derive(Debug)]
624pub struct MailboxSenderError {
625    location: Box<PortLocation>,
626    kind: Box<MailboxSenderErrorKind>,
627}
628
629/// The kind of mailbox sending errors.
630#[derive(thiserror::Error, Debug)]
631pub enum MailboxSenderErrorKind {
632    /// Error during serialization.
633    #[error("serialization error: {0}")]
634    Serialize(anyhow::Error),
635
636    /// Error during deserialization.
637    #[error("deserialization error for type {0}: {1}")]
638    Deserialize(&'static str, anyhow::Error),
639
640    /// A send to an invalid port.
641    #[error("invalid port")]
642    Invalid,
643
644    /// A send to a closed port.
645    #[error("port closed")]
646    Closed,
647
648    // The following pass through underlying errors:
649    /// An underlying mailbox error.
650    #[error(transparent)]
651    Mailbox(#[from] MailboxError),
652
653    /// An underlying channel error.
654    #[error(transparent)]
655    Channel(#[from] ChannelError),
656
657    /// An other, uncategorized error.
658    #[error("send error: {0}")]
659    Other(#[from] anyhow::Error),
660
661    /// The destination was unreachable.
662    #[error("unreachable: {0}")]
663    Unreachable(anyhow::Error),
664}
665
666impl MailboxSenderError {
667    /// Create a new mailbox sender error to an unbound port.
668    pub fn new_unbound<M>(actor_id: reference::ActorId, kind: MailboxSenderErrorKind) -> Self {
669        Self {
670            location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
671            kind: Box::new(kind),
672        }
673    }
674
675    /// Create a new mailbox sender, manually providing the type.
676    pub fn new_unbound_type(
677        actor_id: reference::ActorId,
678        kind: MailboxSenderErrorKind,
679        ty: &'static str,
680    ) -> Self {
681        Self {
682            location: Box::new(PortLocation::Unbound(actor_id, ty)),
683            kind: Box::new(kind),
684        }
685    }
686
687    /// Create a new mailbox sender error with the provided port ID and kind.
688    pub fn new_bound(port_id: reference::PortId, kind: MailboxSenderErrorKind) -> Self {
689        Self {
690            location: Box::new(PortLocation::Bound(port_id)),
691            kind: Box::new(kind),
692        }
693    }
694
695    /// The location at which the error occured.
696    pub fn location(&self) -> &PortLocation {
697        &self.location
698    }
699
700    /// The kind associated with the error.
701    pub fn kind(&self) -> &MailboxSenderErrorKind {
702        &self.kind
703    }
704}
705
706impl fmt::Display for MailboxSenderError {
707    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708        write!(f, "{}: ", self.location)?;
709        fmt::Display::fmt(&self.kind, f)
710    }
711}
712
713impl std::error::Error for MailboxSenderError {
714    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
715        self.kind.source()
716    }
717}
718
719/// MailboxSenders can send messages through ports to mailboxes. It
720/// provides a unified interface for message delivery in the system.
721#[async_trait]
722pub trait MailboxSender: Send + Sync + Any {
723    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
724    /// delegate to transport.
725    fn post(
726        &self,
727        mut envelope: MessageEnvelope,
728        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729    ) {
730        if let Err(err) = envelope.dec_ttl_or_err() {
731            envelope.undeliverable(err, return_handle);
732            return;
733        }
734        self.post_unchecked(envelope, return_handle);
735    }
736
737    /// Raw transport: **no** policy.
738    fn post_unchecked(
739        &self,
740        envelope: MessageEnvelope,
741        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
742    );
743
744    /// Wait until all messages previously posted through this sender
745    /// have been delivered (wire-acked) or confirmed undeliverable.
746    /// The default implementation is a no-op, appropriate for senders
747    /// whose `post` is synchronous (e.g. local in-process delivery).
748    async fn flush(&self) -> Result<(), anyhow::Error> {
749        Ok(())
750    }
751}
752
753/// PortSender extends [`MailboxSender`] by providing typed endpoints
754/// for sending messages over ports
755pub trait PortSender: MailboxSender {
756    /// Deliver a message to the provided port.
757    fn serialize_and_send<M: RemoteMessage>(
758        &self,
759        port: &reference::PortRef<M>,
760        message: M,
761        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
762    ) -> Result<(), MailboxSenderError> {
763        // TODO: convert this to a undeliverable error also
764        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
765            MailboxSenderError::new_bound(
766                port.port_id().clone(),
767                MailboxSenderErrorKind::Serialize(err.into()),
768            )
769        })?;
770        self.post(
771            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
772            return_handle,
773        );
774        Ok(())
775    }
776
777    /// Deliver a message to a one-shot port, consuming the provided port,
778    /// which is not reusable.
779    fn serialize_and_send_once<M: RemoteMessage>(
780        &self,
781        once_port: reference::OncePortRef<M>,
782        message: M,
783        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
784    ) -> Result<(), MailboxSenderError> {
785        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
786            MailboxSenderError::new_bound(
787                once_port.port_id().clone(),
788                MailboxSenderErrorKind::Serialize(err.into()),
789            )
790        })?;
791        self.post(
792            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
793            return_handle,
794        );
795        Ok(())
796    }
797}
798
799impl<T: ?Sized + MailboxSender> PortSender for T {}
800
/// A [`MailboxSender`] that is permanently closed: any envelope that
/// reaches its transport causes a panic. Handy in tests, or wherever a
/// sender is required but no meaningful implementation exists.
#[derive(Debug, Clone)]
pub struct PanickingMailboxSender;
806
807#[async_trait]
808impl MailboxSender for PanickingMailboxSender {
809    fn post_unchecked(
810        &self,
811        envelope: MessageEnvelope,
812        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
813    ) {
814        panic!("panic! in the mailbox! attempted post: {}", envelope)
815    }
816}
817
/// A sender of last resort for undeliverable messages. Each envelope posted
/// to it is logged at error level and then dropped.
#[derive(Debug)]
pub struct UndeliverableMailboxSender;
822
823#[async_trait]
824impl MailboxSender for UndeliverableMailboxSender {
825    fn post_unchecked(
826        &self,
827        envelope: MessageEnvelope,
828        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
829    ) {
830        let sender_name = envelope.sender.name();
831        let error_str = envelope.error_msg().unwrap_or("".to_string());
832        // The undeliverable message was unable to be delivered back to the
833        // sender for some reason
834        tracing::error!(
835            name = "undelivered_message_abandoned",
836            actor_name = sender_name,
837            actor_id = envelope.sender.to_string(),
838            dest = envelope.dest.to_string(),
839            headers = envelope.headers().to_string(), // todo: implement tracing::Value for Flattrs
840            data = envelope.data().to_string(),
841            return_handle = %return_handle,
842            "message not delivered, {}",
843            error_str,
844        );
845    }
846}
847
848static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
849    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
850
851/// Convenience boxing implementation for MailboxSender. Most APIs
852/// are parameterized on MailboxSender implementations, and it's thus
853/// difficult to work with dyn values.  BoxedMailboxSender bridges this
854/// gap by providing a concrete MailboxSender which dispatches using an
855/// underlying (boxed) dyn.
856#[derive(Clone)]
857pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
858
859impl fmt::Debug for BoxedMailboxSender {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        f.debug_struct("BoxedMailboxSender")
862            .field("sender", &"<dyn MailboxSender>")
863            .finish()
864    }
865}
866
867impl BoxedMailboxSender {
868    /// Create a new boxed sender given the provided sender implementation.
869    pub fn new(sender: impl MailboxSender + 'static) -> Self {
870        Self(Arc::new(sender))
871    }
872
873    /// Attempts to downcast the inner sender to the given concrete
874    /// type.
875    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
876        (&*self.0 as &dyn Any).downcast_ref::<T>()
877    }
878}
879
880/// Extension trait that creates a boxed clone of a MailboxSender.
881pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
882    /// A boxed clone of this MailboxSender.
883    fn boxed(&self) -> BoxedMailboxSender;
884}
885impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
886    fn boxed(&self) -> BoxedMailboxSender {
887        BoxedMailboxSender::new(self.clone())
888    }
889}
890
891/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
892pub trait IntoBoxedMailboxSender: MailboxSender {
893    /// Rehome this MailboxSender into a BoxedMailboxSender.
894    fn into_boxed(self) -> BoxedMailboxSender;
895}
896impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
897    fn into_boxed(self) -> BoxedMailboxSender {
898        BoxedMailboxSender::new(self)
899    }
900}
901
902#[async_trait]
903impl MailboxSender for BoxedMailboxSender {
904    fn post_unchecked(
905        &self,
906        envelope: MessageEnvelope,
907        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
908    ) {
909        self.0.post_unchecked(envelope, return_handle);
910    }
911
912    async fn flush(&self) -> Result<(), anyhow::Error> {
913        self.0.flush().await
914    }
915}
916
/// Errors that occur during mailbox serving; a [`MailboxServer`]'s serving
/// task resolves to `Result<(), MailboxServerError>`.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error. In `MailboxServer::serve` this is any
    /// receive error other than `ChannelError::Closed`, which is treated as a
    /// graceful shutdown instead.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
928
/// Represents a running [`MailboxServer`]. The handle composes a
/// [`tokio::task::JoinHandle`] and may be joined in the same manner.
#[derive(Debug)]
pub struct MailboxServerHandle {
    // Handle of the serving task; resolves to the task's exit status.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    // Set to `true` by `stop` to ask the serving task to exit.
    stopped_tx: watch::Sender<bool>,
}
936
937impl MailboxServerHandle {
938    /// Signal the server to stop serving the mailbox. The caller should
939    /// join the handle by awaiting the [`MailboxServerHandle`] future.
940    ///
941    /// Stop should be called at most once.
942    pub fn stop(&self, reason: &str) {
943        tracing::info!("stopping mailbox server; reason: {}", reason);
944        self.stopped_tx.send(true).expect("stop called twice");
945    }
946}
947
948/// Forward future implementation to underlying handle.
949impl Future for MailboxServerHandle {
950    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
951
952    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
953        // SAFETY: This is safe to do because self is pinned.
954        let join_handle_pinned =
955            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
956        join_handle_pinned.poll(cx)
957    }
958}
959
/// Serve a [`MailboxSender`] on the provided [`channel::Rx`]. This dispatches
/// all channel messages directly to the sender.
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
    /// Serve this sender on the given channel from a background task, which
    /// may be joined (and stopped) through the returned handle.
    ///
    /// Each envelope received from `rx` is passed to `self.post`. The serving
    /// task completes with:
    /// - `Ok(())` when `rx` reports [`ChannelError::Closed`], or once
    ///   [`MailboxServerHandle::stop`] has been called;
    /// - `Err(MailboxServerError::Channel(_))` on any other receive error.
    ///
    /// If the returned handle is dropped without calling `stop`, the server
    /// keeps serving until the channel closes or fails. Before the task
    /// completes, `rx.join()` is awaited.
    ///
    /// Envelopes that come back as undeliverable are handled by a second,
    /// detached task that tries to return them to their original senders.
    fn serve(
        self,
        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
    ) -> MailboxServerHandle {
        // A `MailboxServer` can receive a message that couldn't
        // reach its destination. We can use the fact that servers are
        // `MailboxSender`s to attempt to forward them back to their
        // senders.
        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
        let server = self.clone();
        // Detached task (its join handle is discarded) that returns
        // undeliverable envelopes to their senders.
        tokio::task::spawn(async move {
            // Create a client for this task: a uniquely named local proc and
            // an actor instance in it, used as the sending context below.
            static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
            let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
            let addr = ChannelAddr::any(ChannelTransport::Local);
            let proc_id = reference::ProcId::unique(addr, format!("mailbox_server_{}", rank));
            // Use this mailbox server as the forwarder, so we can use it to
            // return message back to the sender.
            let proc = Proc::configured(proc_id, BoxedMailboxSender::new(server));
            let (client, _) = proc.instance("undeliverable_supervisor").unwrap();
            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
                if let Ok(Undeliverable(e)) =
                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
                {
                    // A non-returnable undeliverable: the failed message was
                    // itself a returned undeliverable, so it is handed to
                    // `UndeliverableMailboxSender` rather than returned again
                    // (presumably to avoid bouncing returns back and forth).
                    UndeliverableMailboxSender.post(e, monitored_return_handle());
                    continue;
                }
                envelope.set_error(DeliveryError::BrokenLink(
                    "message was undeliverable".to_owned(),
                ));
                // Address the sender's undeliverable-message port.
                let return_port =
                    reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
                        envelope.sender(),
                    );
                // NOTE(review): the `unwrap` panics this task if the
                // envelope fails to serialize.
                return_port.send_serialized(
                    &client,
                    Flattrs::new(),
                    wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
                );
            }
        });

        let (stopped_tx, mut stopped_rx) = watch::channel(false);
        let join_handle = tokio::spawn(async move {
            // Set once the watch sender (owned by the returned handle) has
            // been dropped; from then on stop requests can no longer arrive,
            // so that `select!` branch is disabled.
            let mut detached = false;

            let result = loop {
                // Stop requests are observed here, at the top of each
                // iteration.
                if *stopped_rx.borrow_and_update() {
                    break Ok(());
                }

                tokio::select! {
                    message = rx.recv() => {
                        match message {
                            // Relay the message to the port directly.
                            Ok(envelope) => self.post(envelope, return_handle.clone()),

                            // Closed is a "graceful" error in this case.
                            // We simply stop serving.
                            Err(ChannelError::Closed) => break Ok(()),
                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
                        }
                    }
                    // On a successful change, the new value is checked at the
                    // top of the next iteration. An error means the sender was
                    // dropped, and the server keeps serving detached.
                    result = stopped_rx.changed(), if !detached  => {
                        detached = result.is_err();
                        if detached {
                            tracing::debug!(
                                "the mailbox server is detached for Rx {}", rx.addr()
                            );
                        } else {
                            tracing::debug!(
                                "the mailbox server is stopped for Rx {}", rx.addr()
                            );
                        }
                    }
                }
            };

            // Join the channel receiver to ensure pending acks are
            // sent before the underlying channel server is torn down.
            rx.join().await;

            result
        });

        MailboxServerHandle {
            join_handle,
            stopped_tx,
        }
    }
}
1058
1059impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1060
/// An unbounded FIFO in front of an async processing callback: items passed
/// to [`Buffer::send`] are handed, one at a time and in order, to the
/// `process` callback given to [`Buffer::new`], which runs on a task spawned
/// on the crate runtime.
struct Buffer<T: Message> {
    // Producer side of the queue drained by the processing task.
    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
    // Running count of items the processing task has finished.
    // NOTE(review): not read anywhere visible, hence `dead_code`.
    #[allow(dead_code)]
    processed: watch::Receiver<usize>,
    // Number of `send` calls. It is incremented before the enqueue attempt,
    // so it also counts attempts that failed.
    seq: AtomicUsize,
}
1067
1068impl<T: Message> Buffer<T> {
1069    fn new<Fut>(
1070        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
1071    ) -> Self
1072    where
1073        Fut: Future<Output = ()> + Send + 'static,
1074    {
1075        let (queue, mut next) = mpsc::unbounded_channel();
1076        let (last_processed, processed) = watch::channel(0);
1077        crate::init::get_runtime().spawn(async move {
1078            let mut seq = 0;
1079            while let Some((msg, return_handle)) = next.recv().await {
1080                process(msg, return_handle).await;
1081                seq += 1;
1082                let _ = last_processed.send(seq);
1083            }
1084        });
1085        Self {
1086            queue,
1087            processed,
1088            seq: AtomicUsize::new(0),
1089        }
1090    }
1091
1092    fn send(
1093        &self,
1094        item: (T, PortHandle<Undeliverable<T>>),
1095    ) -> Result<(), Box<mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>>> {
1096        self.seq.fetch_add(1, Ordering::SeqCst);
1097        self.queue.send(item).map_err(Box::new)?;
1098        Ok(())
1099    }
1100}
1101
/// A mailbox server client that transmits messages on a Tx channel.
///
/// Posted envelopes are queued in an unbounded buffer and forwarded to the
/// channel by the buffer's processing task; `flush` waits on the
/// submitted/completed counters below.
pub struct MailboxClient {
    // The unbounded sender.
    buffer: Buffer<MessageEnvelope>,

    // To cancel monitoring tx health.
    // NOTE(review): dropping a `CancellationToken` does not cancel it. Unless
    // `cancel()` is called somewhere, dropping the client does not by itself
    // stop the health-monitoring task; confirm this is intended.
    _tx_monitoring: CancellationToken,

    // Flush tracking: counts messages successfully submitted to the buffer.
    submitted: Arc<AtomicUsize>,
    // Flush tracking: counts messages whose delivery oneshot has resolved
    // (acked or failed).
    completed: Arc<AtomicUsize>,
    // Notifies flush waiters when `completed` changes.
    completed_notify: Arc<tokio::sync::Notify>,
}
1118
1119impl fmt::Debug for MailboxClient {
1120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1121        f.debug_struct("MailboxClient")
1122            .field("buffer", &"<Buffer>")
1123            .finish()
1124    }
1125}
1126
impl MailboxClient {
    /// Create a new client that sends messages destined for a
    /// [`MailboxServer`] on the provided Tx channel.
    ///
    /// Posted envelopes are queued in an unbounded buffer whose task calls
    /// `tx.try_post` for each one, in order. For every envelope, a small task
    /// waits on the delivery oneshot:
    /// - if the channel hands back a `SendError`, the envelope is returned as
    ///   undeliverable via its return handle;
    /// - if the oneshot sender is dropped instead, the message is considered
    ///   acked.
    ///
    /// Either way, the `completed` counter is bumped and flush waiters are
    /// notified. A separate task monitors the channel's status (see
    /// `monitor_tx_health`).
    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
        let addr = tx.addr();
        let tx = Arc::new(tx);
        let tx_status = tx.status().clone();
        let tx_monitoring = CancellationToken::new();
        let completed = Arc::new(AtomicUsize::new(0));
        let completed_notify = Arc::new(tokio::sync::Notify::new());
        let buffer = {
            let completed = completed.clone();
            let completed_notify = completed_notify.clone();
            // Called by the buffer task once per envelope, in FIFO order.
            Buffer::new(move |envelope, return_handle| {
                let tx = Arc::clone(&tx);
                let (return_channel, return_receiver) =
                    oneshot::channel::<SendError<MessageEnvelope>>();
                // Set up for delivery failure.
                let return_handle_0 = return_handle.clone();
                let completed = completed.clone();
                let completed_notify = completed_notify.clone();
                // Track the outcome of this delivery. It resolves when the
                // channel either reports a failure on `return_channel` or
                // drops it.
                tokio::spawn(async move {
                    match return_receiver.await {
                        Ok(SendError {
                            error,
                            message,
                            reason,
                        }) => {
                            message.undeliverable(
                                DeliveryError::BrokenLink(format!(
                                    "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
                                )),
                                return_handle_0,
                            );
                        }
                        Err(_) => {
                            // Oneshot sender was dropped — message was acked.
                        }
                    }
                    completed.fetch_add(1, Ordering::SeqCst);
                    completed_notify.notify_waiters();
                });
                // Send the message for transmission.
                tx.try_post(envelope, return_channel);
                // The buffer task does not wait for delivery; completion is
                // tracked by the task spawned above.
                future::ready(())
            })
        };
        let this = Self {
            buffer,
            _tx_monitoring: tx_monitoring.clone(),
            submitted: Arc::new(AtomicUsize::new(0)),
            completed,
            completed_notify,
        };
        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
        this
    }

    /// Convenience constructor, to set up a mailbox client that forwards messages
    /// to the provided address.
    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
        Ok(MailboxClient::new(channel::dial(addr)?))
    }

    // Set up a watch for the tx's health: spawn a task on the crate runtime
    // that behaves as follows.
    // - It logs a warning and exits once the status watch errors (its sender
    //   is gone) or reports the connection as closed.
    // - It exits quietly when `cancel_token` is cancelled.
    fn monitor_tx_health(
        mut rx: watch::Receiver<TxStatus>,
        cancel_token: CancellationToken,
        addr: ChannelAddr,
    ) {
        crate::init::get_runtime().spawn(async move {
            loop {
                tokio::select! {
                    changed = rx.changed() => {
                        if changed.is_err() || rx.borrow().is_closed() {
                            let reason = rx.borrow().as_closed().map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string());
                            tracing::warn!("connection to {} lost: {}", addr, reason);
                            // TODO: Potential for supervision event
                            // interaction here.
                            break;
                        }
                    }
                    _ = cancel_token.cancelled() => {
                        break;
                    }
                }
            }
        });
    }
}
1217
1218#[async_trait]
1219impl MailboxSender for MailboxClient {
1220    #[tracing::instrument(level = "debug", skip_all)]
1221    fn post_unchecked(
1222        &self,
1223        envelope: MessageEnvelope,
1224        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1225    ) {
1226        tracing::event!(target:"messages", tracing::Level::TRACE,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1227        if let Err(err) = self.buffer.send((envelope, return_handle)) {
1228            let mpsc::error::SendError((envelope, return_handle)) = *err;
1229            let err = DeliveryError::BrokenLink(
1230                "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1231            );
1232
1233            // Failed to enqueue.
1234            envelope.undeliverable(err, return_handle);
1235        } else {
1236            self.submitted.fetch_add(1, Ordering::SeqCst);
1237        }
1238    }
1239
1240    async fn flush(&self) -> Result<(), anyhow::Error> {
1241        let target = self.submitted.load(Ordering::SeqCst);
1242        loop {
1243            if self.completed.load(Ordering::SeqCst) >= target {
1244                return Ok(());
1245            }
1246            self.completed_notify.notified().await;
1247        }
1248    }
1249}
1250
/// Wrapper to turn `PortRef` into a `Sink`.
///
/// Each item passed to `start_send` is sent right away with `PortRef::send`,
/// using the wrapped actor context as the sender.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    // Actor context used as the sending capability.
    cx: C,
    // Destination port for every item.
    port: reference::PortRef<M>,
}
1256
1257impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1258    /// Create new PortSink
1259    pub fn new(cx: C, port: reference::PortRef<M>) -> Self {
1260        Self { cx, port }
1261    }
1262}
1263
1264impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1265    type Error = MailboxSenderError;
1266
1267    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1268        Poll::Ready(Ok(()))
1269    }
1270
1271    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1272        self.port.send(&self.cx, item)
1273    }
1274
1275    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1276        Poll::Ready(Ok(()))
1277    }
1278
1279    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1280        Poll::Ready(Ok(()))
1281    }
1282}
1283
/// A mailbox coordinates message delivery to actors through typed
/// [`Port`]s associated with the mailbox.
///
/// `Mailbox` derives `Clone`; clones share the same state through an `Arc`.
#[derive(Clone, Debug)]
pub struct Mailbox {
    // Shared state: owning actor id, bound ports, forwarder, closed status.
    inner: Arc<State>,
}
1290
impl Mailbox {
    /// Create a new mailbox associated with the provided actor ID, using the provided
    /// forwarder for external destinations.
    pub fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
        Self {
            inner: Arc::new(State::new(actor_id, forwarder)),
        }
    }

    /// Create a new detached mailbox associated with the provided actor ID.
    ///
    /// The forwarder used for envelopes addressed to other actors is
    /// `BOXED_PANICKING_MAILBOX_SENDER`, so such posts presumably panic (per
    /// that sender's name). NOTE(review): confirm.
    pub fn new_detached(actor_id: reference::ActorId) -> Self {
        Self {
            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
        }
    }

    /// The actor id associated with this mailbox.
    pub fn actor_id(&self) -> &reference::ActorId {
        &self.inner.actor_id
    }

    /// Open a new port that accepts M-typed messages. The returned
    /// port may be freely cloned, serialized, and passed around. The
    /// returned receiver should only be retained by the actor responsible
    /// for processing the delivered messages.
    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
        let port_index = self.inner.allocate_port();
        let (sender, receiver) = mpsc::unbounded_channel::<M>();
        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
        tracing::trace!(
            name = "open_port",
            "opening port for {} at {}",
            self.inner.actor_id,
            port_id
        );
        (
            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
        )
    }

    /// Bind this message's actor port to this actor's mailbox. This method is
    /// normally used:
    ///   1. when we need to intercept a message sent to a handler, and re-route
    ///      that message to the returned receiver;
    ///   2. to mock this message's handler, using the returned receiver, when
    ///      the handler is not implemented for this actor type.
    pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
        let (handle, receiver) = self.open_port();
        handle.bind_actor_port();
        (handle, receiver)
    }

    /// Open a new port with an accumulator with default reduce options.
    /// See [`Mailbox::open_accum_port_opts`] for more details.
    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
    where
        A: Accumulator + Send + Sync + 'static,
        A::Update: Message,
        A::State: Message + Default + Clone,
    {
        self.open_accum_port_opts(accum, StreamingReducerOpts::default())
    }

    /// Open a new port with an accumulator. The port accepts `A::Update`
    /// messages and accumulates them into an `A::State` (starting from
    /// `A::State::default()`) using the given accumulator. After each
    /// successful update, a clone of the accumulated state is sent to the
    /// returned receiver. The receiver coalesces these, so the latest changed
    /// state is received as a single `A::State` message. If there is no new
    /// update, the receiver will not receive any message.
    ///
    /// `streaming_opts` is stored on the returned handle and controls its
    /// streaming reduce behavior.
    pub fn open_accum_port_opts<A>(
        &self,
        accum: A,
        streaming_opts: StreamingReducerOpts,
    ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
    where
        A: Accumulator + Send + Sync + 'static,
        A::Update: Message,
        A::State: Message + Default + Clone,
    {
        let port_index = self.inner.allocate_port();
        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
        let state = Mutex::new(A::State::default());
        let reducer_spec = accum.reducer_spec();
        // Fold each update into the shared state under the lock. An
        // accumulation error is returned before any snapshot is sent.
        let enqueue = move |_, update: A::Update| {
            let mut state = state.lock().unwrap();
            accum.accumulate(&mut state, update)?;
            // The receiver may already be gone; send failures are ignored.
            let _ = sender.send(state.clone());
            Ok(())
        };
        (
            PortHandle {
                mailbox: self.clone(),
                port_index,
                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
                bound: Arc::new(RwLock::new(None)),
                reducer_spec,
                streaming_opts,
            },
            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
        )
    }

    /// Open a port that accepts M-typed messages, using the provided function
    /// to enqueue. The returned handle has a freshly allocated port index,
    /// no reducer spec and default streaming options.
    // TODO: consider making lifetime bound to Self instead.
    pub(crate) fn open_enqueue_port<M: Message>(
        &self,
        enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
    ) -> PortHandle<M> {
        PortHandle {
            mailbox: self.clone(),
            port_index: self.inner.allocate_port(),
            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
            bound: Arc::new(RwLock::new(None)),
            reducer_spec: None,
            streaming_opts: StreamingReducerOpts::default(),
        }
    }

    /// Open a new one-shot port that accepts M-typed messages. The
    /// returned port may be used to send a single message; ditto the
    /// receiver may receive a single message.
    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
        let port_index = self.inner.allocate_port();
        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
        let (sender, receiver) = oneshot::channel::<M>();
        (
            OncePortHandle {
                mailbox: self.clone(),
                port_index,
                port_id: port_id.clone(),
                sender,
                reducer_spec: None,
            },
            OncePortReceiver {
                receiver: Some(receiver),
                port_id,
                mailbox: self.clone(),
            },
        )
    }

    /// Open a new one-shot port with a reducer. This port is designed
    /// to be used with casting, where the port is split across multiple
    /// destinations and responses are accumulated using the reducer.
    /// The accumulator type must have a ReducerSpec.
    ///
    /// The returned handle can be bound and embedded in cast messages.
    /// When the message is split by CommActor, each destination receives a
    /// split port. Responses to split ports are accumulated using the
    /// accumulator's reducer, and the final accumulated result is delivered
    /// to the returned receiver.
    ///
    /// Note: For accumulators used with casting, `Update` and `State` types
    /// must be the same (e.g., `sum<u64>` where both are `u64`).
    ///
    /// # Panics
    ///
    /// Panics if `accum.reducer_spec()` returns `None`.
    pub fn open_reduce_port<A, T>(
        &self,
        accum: A,
    ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
    where
        A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
        T: Message + Default + Clone,
    {
        let port_index = self.inner.allocate_port();
        let (sender, receiver) = oneshot::channel::<T>();
        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
        let reducer_spec = accum.reducer_spec();
        assert!(
            reducer_spec.is_some(),
            "cannot use a reduce port without a ReducerSpec"
        );

        (
            OncePortHandle {
                mailbox: self.clone(),
                port_index,
                port_id: port_id.clone(),
                sender,
                reducer_spec,
            },
            OncePortReceiver {
                receiver: Some(receiver),
                port_id,
                mailbox: self.clone(),
            },
        )
    }

    /// Build a [`MailboxError`] attributed to this mailbox's owner.
    // Not called anywhere visible, hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn error(&self, err: MailboxErrorKind) -> MailboxError {
        MailboxError::new(self.inner.actor_id.clone(), err)
    }

    /// Look up the sender currently bound at `M`'s actor port index
    /// (`M::port()`). Returns `None` if nothing is bound there, or if the
    /// bound sender is not an `UnboundedSender<M>`.
    ///
    /// Panics if the bound sender's recorded port id does not match this
    /// mailbox's port id for that index.
    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
        let port_index = M::port();
        self.inner.ports.get(&port_index).and_then(|boxed| {
            boxed
                .as_any()
                .downcast_ref::<UnboundedSender<M>>()
                .map(|s| {
                    assert_eq!(
                        s.port_id,
                        self.actor_id().port_id(port_index),
                        "port_id mismatch in downcasted UnboundedSender"
                    );
                    s.sender.clone()
                })
        })
    }

    /// Retrieve the bound undeliverable message port handle.
    ///
    /// Returns `None` if no `Undeliverable<MessageEnvelope>` actor port is
    /// bound. NOTE(review): the returned handle is given a freshly allocated
    /// port index rather than the bound port's index; confirm this is
    /// intended.
    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
    }

    /// Allocate a port index from this mailbox's shared state.
    pub(crate) fn allocate_port(&self) -> u64 {
        self.inner.allocate_port()
    }

    /// Register `handle`'s sender at the handle's port index and return a
    /// `PortRef` for that port. If the index is already bound, the existing
    /// binding is kept and the reference is still returned.
    ///
    /// Panics if `handle` belongs to a different mailbox.
    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> reference::PortRef<M> {
        assert_eq!(
            handle.mailbox.actor_id(),
            self.actor_id(),
            "port does not belong to mailbox"
        );

        // TODO: don't even allocate a port until the port is bound. Possibly
        // have handles explicitly staged (unbound, bound).
        let port_id = self.actor_id().port_id(handle.port_index);
        match self.inner.ports.entry(handle.port_index) {
            Entry::Vacant(entry) => {
                entry.insert(Box::new(UnboundedSender::new(
                    handle.sender.clone(),
                    port_id.clone(),
                )));
            }
            Entry::Occupied(_entry) => {}
        }

        reference::PortRef::attest(port_id)
    }

    /// Register `handle`'s sender at `M`'s type-determined actor port index
    /// (`M::port()`) rather than at the handle's own index.
    ///
    /// Panics if `handle` belongs to a different mailbox, or if the actor
    /// port for `M` is already bound.
    fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
        assert_eq!(
            handle.mailbox.actor_id(),
            self.actor_id(),
            "port does not belong to mailbox"
        );

        let port_index = M::port();
        let port_id = self.actor_id().port_id(port_index);
        match self.inner.ports.entry(port_index) {
            Entry::Vacant(entry) => {
                entry.insert(Box::new(UnboundedSender::new(
                    handle.sender.clone(),
                    port_id,
                )));
            }
            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
        }
    }

    /// Register a one-shot port's sender at the handle's port index,
    /// consuming the handle. If the index is already bound, the existing
    /// binding is kept and the handle is dropped.
    ///
    /// NOTE(review): unlike `bind` and `bind_untyped`, this does not assert
    /// that the handle belongs to this mailbox.
    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
        let port_id = handle.port_id().clone();
        match self.inner.ports.entry(handle.port_index) {
            Entry::Vacant(entry) => {
                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
            }
            Entry::Occupied(_entry) => {}
        }
    }

    /// Register an untyped sender at `port_id`'s index. An existing binding
    /// at that index is kept.
    ///
    /// Panics if `port_id` does not belong to this mailbox.
    pub(crate) fn bind_untyped(&self, port_id: &reference::PortId, sender: UntypedUnboundedSender) {
        assert_eq!(
            port_id.actor_id(),
            self.actor_id(),
            "port does not belong to mailbox"
        );

        match self.inner.ports.entry(port_id.index()) {
            Entry::Vacant(entry) => {
                entry.insert(Box::new(sender));
            }
            Entry::Occupied(_entry) => {}
        }
    }

    /// Mark the mailbox as closed with the owner's final `status`. Later
    /// posts to bound ports of this mailbox are returned as undeliverable.
    ///
    /// Panics if the mailbox has already been closed.
    pub(crate) fn close(&self, status: ActorStatus) {
        let mut closed = self.inner.closed.write().unwrap();
        if closed.is_some() {
            panic!("mailbox with owner {} already closed", self.actor_id());
        }
        let _ = closed.insert(status);
    }
}
1590
/// A `Mailbox` is its own mailbox capability.
impl context::Mailbox for Mailbox {
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1596
1597// TODO: figure out what to do with these interfaces -- possibly these caps
1598// do not have to be private.
1599
1600/// Open a port given a capability.
1601pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1602    cx.mailbox().open_port()
1603}
1604
1605/// Open a one-shot port given a capability. This is a public method primarily to
1606/// enable macro-generated clients.
1607pub fn open_once_port<M: Message>(
1608    cx: &impl context::Mailbox,
1609) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1610    cx.mailbox().open_once_port()
1611}
1612
1613#[async_trait]
1614impl MailboxSender for Mailbox {
1615    /// Deliver a serialized message to the provided port ID. This method fails
1616    /// if the message does not deserialize into the expected type.
1617    fn post_unchecked(
1618        &self,
1619        envelope: MessageEnvelope,
1620        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1621    ) {
1622        metrics::MAILBOX_POSTS.add(
1623            1,
1624            hyperactor_telemetry::kv_pairs!(
1625                "actor_id" => envelope.sender.to_string(),
1626                "dest_actor_id" => envelope.dest.actor_id().to_string(),
1627            ),
1628        );
1629        tracing::trace!(
1630            name = "post",
1631            actor_name = envelope.sender.name(),
1632            actor_id = envelope.sender.to_string(),
1633            "posting message to {}",
1634            envelope.dest
1635        );
1636
1637        if envelope.dest().actor_id() != &self.inner.actor_id {
1638            return self.inner.forwarder.post(envelope, return_handle);
1639        }
1640
1641        match self.inner.ports.entry(envelope.dest().index()) {
1642            Entry::Vacant(_) => {
1643                let err = DeliveryError::Unroutable(format!(
1644                    "port not bound in mailbox; port id: {}; message type: {}",
1645                    envelope.dest().index(),
1646                    envelope.data().typename().map_or_else(
1647                        || format!("unregistered type hash {}", envelope.data().typehash()),
1648                        |s| s.to_string(),
1649                    )
1650                ));
1651
1652                envelope.undeliverable(err, return_handle);
1653            }
1654            Entry::Occupied(entry) => {
1655                let closed = self.inner.closed.read().unwrap();
1656                if let Some(status) = &*closed {
1657                    match status {
1658                        ActorStatus::Stopped(reason) => {
1659                            let err = format!(
1660                                "mailbox owner {} is stopped: {}",
1661                                self.inner.actor_id, reason
1662                            );
1663                            return envelope
1664                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1665                        }
1666                        ActorStatus::Failed(actor_error) => {
1667                            let err = format!(
1668                                "mailbox owner {} failed: {}",
1669                                self.inner.actor_id, actor_error
1670                            );
1671                            return envelope
1672                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1673                        }
1674                        _ => {
1675                            let err = format!(
1676                                "mailbox owner {} closed unexpectedly: {:?}",
1677                                self.inner.actor_id, status
1678                            );
1679                            return envelope
1680                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1681                        }
1682                    }
1683                }
1684
1685                let (metadata, data) = envelope.open();
1686                let MessageMetadata {
1687                    mut headers,
1688                    sender,
1689                    dest,
1690                    errors: metadata_errors,
1691                    ttl,
1692                    return_undeliverable,
1693                } = metadata;
1694
1695                let to_actor_id = hash_to_u64(&dest);
1696                let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1697                headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1698                // Only set sender hash if not already present (cast path
1699                // pre-sets it with the originating actor).
1700                if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1701                    headers.set(
1702                        crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1703                        hash_to_u64(&sender),
1704                    );
1705                }
1706                headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1707
1708                // We use the entry API here so that we can remove the
1709                // entry while holding an (entry) reference. The DashMap
1710                // documentation suggests that deadlocks are possible
1711                // "when holding any sort of reference into the map",
1712                // but surely this applies only to the same thread? This
1713                // would also imply we have to be careful holding any
1714                // sort of reference across .await points.
1715                match entry.get().send_serialized(headers, data) {
1716                    Ok(false) => {
1717                        hyperactor_telemetry::notify_message_status(
1718                            hyperactor_telemetry::MessageStatusEvent {
1719                                timestamp: std::time::SystemTime::now(),
1720                                id: hyperactor_telemetry::generate_status_event_id(message_id),
1721                                message_id,
1722                                status: "queued".to_string(),
1723                            },
1724                        );
1725                        entry.remove();
1726                    }
1727                    Ok(true) => {
1728                        hyperactor_telemetry::notify_message_status(
1729                            hyperactor_telemetry::MessageStatusEvent {
1730                                timestamp: std::time::SystemTime::now(),
1731                                id: hyperactor_telemetry::generate_status_event_id(message_id),
1732                                message_id,
1733                                status: "queued".to_string(),
1734                            },
1735                        );
1736                    }
1737                    Err(SerializedSenderError {
1738                        data,
1739                        error: sender_error,
1740                        headers,
1741                    }) => {
1742                        entry.remove();
1743                        let err = DeliveryError::Mailbox(format!("{}", sender_error));
1744
1745                        MessageEnvelope::seal(
1746                            MessageMetadata {
1747                                headers,
1748                                sender,
1749                                dest,
1750                                errors: metadata_errors,
1751                                ttl,
1752                                return_undeliverable,
1753                            },
1754                            data,
1755                        )
1756                        .undeliverable(err, return_handle)
1757                    }
1758                }
1759            }
1760        }
1761    }
1762}
1763
/// A port to which M-typed messages can be delivered. Ports may be
/// serialized to be sent to other actors. However, when a port is
/// deserialized, it may no longer be used to send messages directly
/// to a mailbox since it is no longer associated with a local mailbox
/// ([`Mailbox::send`] will fail). However, the runtime may accept
/// remote Ports, and arrange for these messages to be delivered
/// indirectly through inter-node message passing.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// The mailbox that owns this port.
    mailbox: Mailbox,
    /// Index of this port within `mailbox`.
    port_index: u64,
    /// Local sender used to enqueue messages for this port.
    sender: UnboundedPortSender<M>,
    /// The port ID this handle was bound under, or `None` while unbound.
    /// The cell is shared (via `Arc`) by all clones of the handle, so
    /// binding one clone binds them all.
    // We would like this to be an Arc<RwLock<Option<PortRef<M>>>>, but we cannot
    // write down the type PortRef<M> (M: Message), even though we cannot
    // legally construct such a value without M: RemoteMessage. We could consider
    // making PortRef<M> valid for M: Message, but constructible only for
    // M: RemoteMessage, but the guarantees offered by the impossibility of even
    // writing down the type are appealing.
    bound: Arc<RwLock<Option<reference::PortId>>>,
    /// Typehash of an optional reducer. When it's defined, we include it in port
    /// references to optionally enable incremental accumulation.
    reducer_spec: Option<ReducerSpec>,
    /// Streaming reducer options.
    streaming_opts: StreamingReducerOpts,
}
1789
1790impl<M: Message> PortHandle<M> {
1791    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1792        Self {
1793            mailbox,
1794            port_index,
1795            sender,
1796            bound: Arc::new(RwLock::new(None)),
1797            reducer_spec: None,
1798            streaming_opts: StreamingReducerOpts::default(),
1799        }
1800    }
1801
1802    pub(crate) fn location(&self) -> PortLocation {
1803        match self.bound.read().unwrap().as_ref() {
1804            Some(port_id) => PortLocation::Bound(port_id.clone()),
1805            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1806        }
1807    }
1808
1809    /// Send a message to this port.
1810    pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1811        let closed = self.mailbox.inner.closed.read().unwrap();
1812
1813        if let Some(status) = &*closed {
1814            let err = MailboxError {
1815                actor_id: self.mailbox.actor_id().clone(),
1816                kind: MailboxErrorKind::OwnerTerminated(status.clone()),
1817            };
1818            return Err(MailboxSenderError::new_unbound::<M>(
1819                self.mailbox.actor_id().clone(),
1820                MailboxSenderErrorKind::Mailbox(err),
1821            ));
1822        }
1823
1824        let mut headers = Flattrs::new();
1825
1826        crate::mailbox::headers::set_send_timestamp(&mut headers);
1827        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1828        // Hold read lock while checking and sending to prevent race with bind().
1829        // Message sent from handle is delivered immediately. It could race with
1830        // messages from refs. So we need to assign seq if the handle is bound.
1831        let bound_guard = self.bound.read().unwrap();
1832        if let Some(bound_port) = bound_guard.as_ref() {
1833            let sequencer = cx.instance().sequencer();
1834            let seq_info = sequencer.assign_seq(bound_port);
1835            headers.set(SEQ_INFO, seq_info);
1836        } else {
1837            // Because the port is not bound, messages can only be sent through
1838            // this port handle's underlying tokio channel directly. As a result,
1839            // we do not need to assign seq to the message. We do not need to
1840            // worry about race condition due to bound_guard.
1841            headers.set(SEQ_INFO, SeqInfo::Direct);
1842        }
1843        // Encountering error means the port is closed. So we do not need to
1844        // rollback the seq, because no message can be delivered to it, and
1845        // subsequently do not need to worry about out-of-sequence for messages
1846        // after this seq.
1847        //
1848        // Theoretically, we could have deadlock if
1849        //   1. `sender.send` attempts to hold read lock of this PortHandle's
1850        //      `bound` field, and in the meantime,
1851        //   2.  another thread is trying to bind and thus waiting for write lock.
1852        // But we do not expect `sender.send` to use the same PortHandle, so this
1853        // deadlock scenario should not happen.
1854        self.sender.send(headers, message).map_err(|err| {
1855            MailboxSenderError::new_unbound::<M>(
1856                self.mailbox.actor_id().clone(),
1857                MailboxSenderErrorKind::Other(err),
1858            )
1859        })
1860    }
1861
1862    /// A contravariant map: using the provided function to translate
1863    /// `R`-typed messages to `M`-typed ones, delivered on this port.
1864    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1865    where
1866        R: Message,
1867        F: Fn(R) -> M + Send + Sync + 'static,
1868    {
1869        let port_index = self.mailbox.inner.allocate_port();
1870        let sender = self.sender.clone();
1871        PortHandle::new(
1872            self.mailbox.clone(),
1873            port_index,
1874            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1875                sender.send(headers, unmap(value))
1876            })),
1877        )
1878    }
1879}
1880
1881impl<M: RemoteMessage> PortHandle<M> {
1882    /// Bind this port, making it accessible to remote actors.
1883    pub fn bind(&self) -> reference::PortRef<M> {
1884        let port_id = {
1885            let mut guard = self.bound.write().unwrap();
1886            guard
1887                .get_or_insert_with(|| self.mailbox.bind(self).port_id().clone())
1888                .clone()
1889        };
1890        reference::PortRef::attest_reducible(
1891            port_id,
1892            self.reducer_spec.clone(),
1893            self.streaming_opts.clone(),
1894        )
1895    }
1896
1897    /// Bind to this message's actor port. This method will panic if the handle
1898    /// is already bound.
1899    ///
1900    /// This is used by [`actor::Binder`] implementations to bind actor refs.
1901    /// This is not intended for general use.
1902    pub(crate) fn bind_actor_port(&self) {
1903        let port_id = self.mailbox.actor_id().port_id(M::port());
1904        {
1905            let mut guard = self.bound.write().unwrap();
1906            if guard.is_some() {
1907                panic!(
1908                    "could not bind port handle {} as {port_id}: already bound",
1909                    self.port_index
1910                );
1911            }
1912            *guard = Some(port_id);
1913        }
1914        self.mailbox.bind_to_actor_port(self);
1915    }
1916}
1917
1918impl<M: Message> Clone for PortHandle<M> {
1919    fn clone(&self) -> Self {
1920        Self {
1921            mailbox: self.mailbox.clone(),
1922            port_index: self.port_index,
1923            sender: self.sender.clone(),
1924            bound: self.bound.clone(),
1925            reducer_spec: self.reducer_spec.clone(),
1926            streaming_opts: self.streaming_opts.clone(),
1927        }
1928    }
1929}
1930
1931impl<M: Message> fmt::Display for PortHandle<M> {
1932    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1933        fmt::Display::fmt(&self.location(), f)
1934    }
1935}
1936
/// A one-shot port handle to which M-typed messages can be delivered.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox that owns this port.
    mailbox: Mailbox,
    /// Port index in the owning mailbox (presumably equal to
    /// `port_id.index()` — TODO confirm at the construction site).
    port_index: u64,
    /// Full ID of this port.
    port_id: reference::PortId,
    /// One-shot sender that delivers the single message. It is consumed by
    /// `send` and moved into the mailbox by `bind`.
    sender: oneshot::Sender<M>,
    /// Optional reducer, carried into the ref produced by `bind`.
    reducer_spec: Option<ReducerSpec>,
}
1946
1947impl<M: Message> OncePortHandle<M> {
1948    /// This port's ID.
1949    // TODO: make value
1950    pub fn port_id(&self) -> &reference::PortId {
1951        &self.port_id
1952    }
1953
1954    /// Send a message to this port. The send operation will consume the
1955    /// port handle, as the port accepts at most one message.
1956    pub fn send(self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1957        // TODO: Assign seq to the message if the port is bound to an actor port
1958        // in the future.
1959        assert!(
1960            !self.port_id().is_actor_port(),
1961            "OncePortHandle currently does not support actor ports; a \
1962            prerequisite of that support is to assign seq to messages \
1963            if the port is actor port."
1964        );
1965
1966        let actor_id = self.mailbox.actor_id().clone();
1967        self.sender.send(message).map_err(|_| {
1968            // Here, the value is returned when the port is
1969            // closed.  We should consider having a similar
1970            // API for send_once, though arguably it makes less
1971            // sense in this context.
1972            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1973        })?;
1974        Ok(())
1975    }
1976}
1977
1978impl<M: RemoteMessage> OncePortHandle<M> {
1979    /// Turn this handle into a ref that may be passed to
1980    /// a remote actor. The remote actor can then use the
1981    /// ref to send a message to the port. Creating a ref also
1982    /// binds the port, so that it is remotely writable.
1983    pub fn bind(self) -> reference::OncePortRef<M> {
1984        let port_id = self.port_id().clone();
1985        let reducer_spec = self.reducer_spec.clone();
1986        self.mailbox.clone().bind_once(self);
1987        reference::OncePortRef::attest_reducible(port_id, reducer_spec)
1988    }
1989}
1990
1991impl<M: Message> fmt::Display for OncePortHandle<M> {
1992    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1993        fmt::Display::fmt(&self.port_id(), f)
1994    }
1995}
1996
/// A receiver of M-typed messages, used by actors to receive messages
/// on open ports.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// Channel end from which messages for this port are read.
    receiver: mpsc::UnboundedReceiver<M>,
    /// ID of the port this receiver reads from.
    port_id: reference::PortId,
    /// When multiple messages are put in channel, only receive the latest one
    /// if coalesce is true. Other messages will be discarded.
    coalesce: bool,
    /// The owning mailbox. It is used to remove the port from service when
    /// the receiver is dropped.
    mailbox: Mailbox,
}
2010
2011impl<M> PortReceiver<M> {
2012    fn new(
2013        receiver: mpsc::UnboundedReceiver<M>,
2014        port_id: reference::PortId,
2015        coalesce: bool,
2016        mailbox: Mailbox,
2017    ) -> Self {
2018        Self {
2019            receiver,
2020            port_id,
2021            coalesce,
2022            mailbox,
2023        }
2024    }
2025
2026    /// Tries to receive the next value for this receiver.
2027    /// This function returns `Ok(None)` if the receiver is empty
2028    /// and returns a MailboxError if the receiver is disconnected.
2029    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
2030    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
2031        let mut next = self.receiver.try_recv();
2032        // To coalesce, drain the mpsc queue and only keep the last one.
2033        if self.coalesce
2034            && let Some(latest) = self.drain().pop()
2035        {
2036            next = Ok(latest);
2037        }
2038        match next {
2039            Ok(msg) => Ok(Some(msg)),
2040            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2041            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2042                self.actor_id().clone(),
2043                MailboxErrorKind::Closed,
2044            )),
2045        }
2046    }
2047
2048    /// Receive the next message from the port corresponding with this
2049    /// receiver.
2050    pub async fn recv(&mut self) -> Result<M, MailboxError> {
2051        let mut next = self.receiver.recv().await;
2052        // To coalesce, get the last message from the queue if there are
2053        // more on the mspc queue.
2054        if self.coalesce
2055            && let Some(latest) = self.drain().pop()
2056        {
2057            next = Some(latest);
2058        }
2059        next.ok_or(MailboxError::new(
2060            self.actor_id().clone(),
2061            MailboxErrorKind::Closed,
2062        ))
2063    }
2064
2065    /// Drains all available messages from the port.
2066    pub fn drain(&mut self) -> Vec<M> {
2067        let mut drained: Vec<M> = Vec::new();
2068        while let Ok(msg) = self.receiver.try_recv() {
2069            // To coalesce, discard the old message if there is any.
2070            if self.coalesce {
2071                drained.pop();
2072            }
2073            drained.push(msg);
2074        }
2075        drained
2076    }
2077
2078    fn port(&self) -> u64 {
2079        self.port_id.index()
2080    }
2081
2082    fn actor_id(&self) -> &reference::ActorId {
2083        self.port_id.actor_id()
2084    }
2085}
2086
2087impl<M> Drop for PortReceiver<M> {
2088    fn drop(&mut self) {
2089        // MARIUS: do we need to tombstone these? or should we
2090        // error out if we have removed the receiver before serializing the port ref?
2091        // ("no longer live")?
2092        self.mailbox.inner.ports.remove(&self.port());
2093    }
2094}
2095
2096impl<M> Stream for PortReceiver<M> {
2097    type Item = Result<M, MailboxError>;
2098
2099    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2100        std::pin::pin!(self.recv()).poll(cx).map(Some)
2101    }
2102}
2103
/// A receiver of M-typed messages from [`OncePort`]s.
pub struct OncePortReceiver<M> {
    /// The one-shot channel end. `recv` takes and unwraps it, so it is
    /// expected to be `Some` until then. Presumably it is wrapped in an
    /// `Option` because this type implements `Drop`, which forbids moving
    /// fields out of `self`.
    receiver: Option<oneshot::Receiver<M>>,
    /// ID of the port this receiver reads from.
    port_id: reference::PortId,

    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
2113
2114impl<M> OncePortReceiver<M> {
2115    /// Receive message from the one-shot port associated with this
2116    /// receiver.  Recv consumes the receiver: it is no longer valid
2117    /// after this call.
2118    pub async fn recv(mut self) -> Result<M, MailboxError> {
2119        std::mem::take(&mut self.receiver)
2120            .unwrap()
2121            .await
2122            .map_err(|err| {
2123                MailboxError::new(
2124                    self.actor_id().clone(),
2125                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2126                )
2127            })
2128    }
2129
2130    fn port(&self) -> u64 {
2131        self.port_id.index()
2132    }
2133
2134    fn actor_id(&self) -> &reference::ActorId {
2135        self.port_id.actor_id()
2136    }
2137}
2138
2139impl<M> Drop for OncePortReceiver<M> {
2140    fn drop(&mut self) {
2141        // MARIUS: do we need to tombstone these? or should we
2142        // error out if we have removed the receiver before serializing the port ref?
2143        // ("no longer live")?
2144        self.mailbox.inner.ports.remove(&self.port());
2145    }
2146}
2147
/// Error that can occur during a `SerializedSender`'s send operation.
///
/// The undelivered headers and payload are handed back alongside the error,
/// so the caller can reconstruct the envelope (for example, to return it as
/// undeliverable).
pub struct SerializedSenderError {
    /// The headers associated with the message.
    pub headers: Flattrs,
    /// The serialized message that could not be sent.
    pub data: wirevalue::Any,
    /// The mailbox sender error that occurred.
    pub error: MailboxSenderError,
}
2157
/// SerializedSender encapsulates senders:
///   - It performs type erasure (and thus it is object-safe).
///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
///     validity of the underlying port.
trait SerializedSender: Send + Sync {
    /// Enables downcasting from `&dyn SerializedSender` to concrete
    /// types.
    ///
    /// Used by `Mailbox::lookup_sender` to downcast to
    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Send a serialized message. SerializedSender will deserialize the
    /// message (failing if it fails to deserialize), and then send the
    /// resulting message on the underlying port.
    ///
    /// Returns `Ok(true)` whenever the port remains valid after the send
    /// operation. Returns `Ok(false)` when it does not, e.g. a one-shot port
    /// that has now been used; callers then unregister the port. On failure,
    /// the headers and payload are returned inside the error.
    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSenderError`.
    fn send_serialized(
        &self,
        headers: Flattrs,
        serialized: wirevalue::Any,
    ) -> Result<bool, SerializedSenderError>;
}
2183
/// A sender to an M-typed unbounded port.
enum UnboundedPortSender<M: Message> {
    /// Send directly to the mpsc queue. Message headers are not forwarded.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Use the provided function to enqueue the item. The function receives
    /// the message headers along with the message.
    Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
2191
2192impl<M: Message> UnboundedPortSender<M> {
2193    fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2194        match self {
2195            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2196            Self::Func(func) => func(headers, message),
2197        }
2198    }
2199}
2200
2201// We implement Clone manually as derive(Clone) places unnecessarily
2202// strict bounds on the type parameter M.
2203impl<M: Message> Clone for UnboundedPortSender<M> {
2204    fn clone(&self) -> Self {
2205        match self {
2206            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2207            Self::Func(func) => Self::Func(func.clone()),
2208        }
2209    }
2210}
2211
2212impl<M: Message> Debug for UnboundedPortSender<M> {
2213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2214        match self {
2215            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2216            Self::Func(_) => f
2217                .debug_tuple("UnboundedPortSender::Func")
2218                .field(&"..")
2219                .finish(),
2220        }
2221    }
2222}
2223
/// An `UnboundedPortSender` paired with the ID of the port it serves, so
/// that send failures can be attributed to that port.
struct UnboundedSender<M: Message> {
    /// The underlying sender.
    sender: UnboundedPortSender<M>,
    /// ID of the port this sender delivers to.
    port_id: reference::PortId,
}
2228
2229impl<M: Message> UnboundedSender<M> {
2230    /// Create a new UnboundedSender encapsulating the provided
2231    /// sender.
2232    fn new(sender: UnboundedPortSender<M>, port_id: reference::PortId) -> Self {
2233        Self { sender, port_id }
2234    }
2235
2236    #[allow(dead_code)]
2237    fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2238        self.sender.send(headers, message).map_err(|err| {
2239            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2240        })
2241    }
2242}
2243
2244// Clone is implemented explicitly because the derive macro demands M:
2245// Clone directly. In this case, it isn't needed because Arc<T> can
2246// clone for any T.
2247impl<M: Message> Clone for UnboundedSender<M> {
2248    fn clone(&self) -> Self {
2249        Self {
2250            sender: self.sender.clone(),
2251            port_id: self.port_id.clone(),
2252        }
2253    }
2254}
2255
2256impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2257    fn as_any(&self) -> &dyn Any {
2258        self
2259    }
2260
2261    fn send_serialized(
2262        &self,
2263        headers: Flattrs,
2264        serialized: wirevalue::Any,
2265    ) -> Result<bool, SerializedSenderError> {
2266        // Here, the stack ensures that this port is only instantiated for M-typed messages.
2267        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
2268        // but it is required as we have some usages that rely on representational equivalence
2269        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
2270        // support port aggregation.
2271        match serialized.deserialized_unchecked() {
2272            Ok(message) => {
2273                self.sender.send(headers.clone(), message).map_err(|err| {
2274                    SerializedSenderError {
2275                        data: serialized,
2276                        error: MailboxSenderError::new_bound(
2277                            self.port_id.clone(),
2278                            MailboxSenderErrorKind::Other(err),
2279                        ),
2280                        headers,
2281                    }
2282                })?;
2283
2284                Ok(true)
2285            }
2286            Err(err) => Err(SerializedSenderError {
2287                data: serialized,
2288                error: MailboxSenderError::new_bound(
2289                    self.port_id.clone(),
2290                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2291                ),
2292                headers,
2293            }),
2294        }
2295    }
2296}
2297
/// OnceSender encapsulates an underlying one-shot sender, dynamically
/// tracking its validity.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot sender. The first send attempt takes it, leaving `None`.
    /// It is shared via `Arc`, so clones observe the same state.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// ID of the port this sender delivers to.
    port_id: reference::PortId,
}
2305
2306impl<M: Message> OnceSender<M> {
2307    /// Create a new OnceSender encapsulating the provided one-shot
2308    /// sender.
2309    fn new(sender: oneshot::Sender<M>, port_id: reference::PortId) -> Self {
2310        Self {
2311            sender: Arc::new(Mutex::new(Some(sender))),
2312            port_id,
2313        }
2314    }
2315
2316    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2317        // TODO: we should replace the sender on error
2318        match self.sender.lock().unwrap().take() {
2319            None => Err(MailboxSenderError::new_bound(
2320                self.port_id.clone(),
2321                MailboxSenderErrorKind::Closed,
2322            )),
2323            Some(sender) => {
2324                sender.send(message).map_err(|_| {
2325                    // Here, the value is returned when the port is
2326                    // closed.  We should consider having a similar
2327                    // API for send_once, though arguably it makes less
2328                    // sense in this context.
2329                    MailboxSenderError::new_bound(
2330                        self.port_id.clone(),
2331                        MailboxSenderErrorKind::Closed,
2332                    )
2333                })?;
2334                Ok(false)
2335            }
2336        }
2337    }
2338}
2339
2340// Clone is implemented explicitly because the derive macro demands M:
2341// Clone directly. In this case, it isn't needed because Arc<T> can
2342// clone for any T.
2343impl<M: Message> Clone for OnceSender<M> {
2344    fn clone(&self) -> Self {
2345        Self {
2346            sender: self.sender.clone(),
2347            port_id: self.port_id.clone(),
2348        }
2349    }
2350}
2351
2352impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2353    fn as_any(&self) -> &dyn Any {
2354        self
2355    }
2356
2357    fn send_serialized(
2358        &self,
2359        headers: Flattrs,
2360        serialized: wirevalue::Any,
2361    ) -> Result<bool, SerializedSenderError> {
2362        match serialized.deserialized() {
2363            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2364                data: serialized,
2365                error: e,
2366                headers,
2367            }),
2368            Err(err) => Err(SerializedSenderError {
2369                data: serialized,
2370                error: MailboxSenderError::new_bound(
2371                    self.port_id.clone(),
2372                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2373                ),
2374                headers,
2375            }),
2376        }
2377    }
2378}
2379
/// Use the provided function to send untyped messages (i.e. Any objects).
pub(crate) struct UntypedUnboundedSender {
    /// Delivery function. On success it returns whether the port remains
    /// valid. On failure it hands back the undelivered payload together with
    /// the cause.
    pub(crate) sender:
        Box<dyn Fn(wirevalue::Any) -> Result<bool, (wirevalue::Any, anyhow::Error)> + Send + Sync>,
    /// ID of the port this sender delivers to.
    pub(crate) port_id: reference::PortId,
}
2386
2387impl SerializedSender for UntypedUnboundedSender {
2388    fn as_any(&self) -> &dyn Any {
2389        self
2390    }
2391
2392    fn send_serialized(
2393        &self,
2394        headers: Flattrs,
2395        serialized: wirevalue::Any,
2396    ) -> Result<bool, SerializedSenderError> {
2397        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2398            data,
2399            error: MailboxSenderError::new_bound(
2400                self.port_id.clone(),
2401                MailboxSenderErrorKind::Other(err),
2402            ),
2403            headers,
2404        })
2405    }
2406}
2407
/// State is the internal state of the mailbox.
struct State {
    /// The ID of the mailbox owner.
    actor_id: reference::ActorId,

    /// The set of active ports in the mailbox, keyed by port index. Each
    /// entry holds the type-erased sender used to deliver serialized
    /// messages to that port.
    // NOTE(review): an earlier note here read "insert if it's serializable;
    // otherwise don't" — presumably only ports with serializable message
    // types are registered; confirm.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port ID to allocate.
    next_port: AtomicU64,

    /// The forwarder for this mailbox.
    forwarder: BoxedMailboxSender,

    /// If a value is present, the mailbox has been closed with the provided
    /// status. Any subsequent `Mailbox::post_unchecked` calls will fail, as
    /// will `PortHandle::send`.
    closed: RwLock<Option<ActorStatus>>,
}
2428
2429impl State {
2430    /// Create a new state with the provided owning ActorId.
2431    fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
2432        Self {
2433            actor_id,
2434            ports: DashMap::new(),
2435            // The first 1024 ports are allocated to actor handlers.
2436            // Other port IDs are ephemeral.
2437            next_port: AtomicU64::new(USER_PORT_OFFSET),
2438            forwarder,
2439            closed: RwLock::new(None),
2440        }
2441    }
2442
2443    /// Allocate a fresh port.
2444    fn allocate_port(&self) -> u64 {
2445        self.next_port.fetch_add(1, Ordering::SeqCst)
2446    }
2447}
2448
2449impl fmt::Debug for State {
2450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2451        f.debug_struct("State")
2452            .field("actor_id", &self.actor_id)
2453            .field(
2454                "open_ports",
2455                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2456            )
2457            .field("next_port", &self.next_port)
2458            .finish()
2459    }
2460}
2461
// TODO: mux based on some parameterized type. (mux key).
/// An in-memory mailbox muxer. This is used to route messages to
/// different underlying senders, selected by destination actor ID.
#[derive(Clone)]
pub struct MailboxMuxer {
    /// Registered senders, keyed by actor ID. The map is shared (via `Arc`)
    /// among clones of the muxer.
    mailboxes: Arc<DashMap<reference::ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2469
2470impl Default for MailboxMuxer {
2471    fn default() -> Self {
2472        Self::new()
2473    }
2474}
2475
2476impl MailboxMuxer {
2477    /// Create a new, empty, muxer.
2478    pub fn new() -> Self {
2479        Self {
2480            mailboxes: Arc::new(DashMap::new()),
2481        }
2482    }
2483
2484    /// Route messages destined for the provided actor id to the provided
2485    /// sender. Returns false if there is already a sender associated
2486    /// with the actor. In this case, the sender is not replaced, and
2487    /// the caller must [`MailboxMuxer::unbind`] it first.
2488    pub fn bind(&self, actor_id: reference::ActorId, sender: impl MailboxSender + 'static) -> bool {
2489        match self.mailboxes.entry(actor_id) {
2490            Entry::Occupied(_) => false,
2491            Entry::Vacant(entry) => {
2492                entry.insert(Box::new(sender));
2493                true
2494            }
2495        }
2496    }
2497
2498    /// Convenience function to bind a mailbox.
2499    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2500        self.bind(mailbox.actor_id().clone(), mailbox)
2501    }
2502
2503    /// Unbind the sender associated with the provided actor ID. After
2504    /// unbinding, the muxer will no longer be able to send messages to
2505    /// that actor.
2506    #[allow(dead_code)]
2507    pub(crate) fn unbind(&self, actor_id: &reference::ActorId) {
2508        self.mailboxes.remove(actor_id);
2509    }
2510
2511    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2512    pub fn bound_actors(&self) -> Vec<reference::ActorId> {
2513        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2514    }
2515}
2516
2517#[async_trait]
2518impl MailboxSender for MailboxMuxer {
2519    fn post_unchecked(
2520        &self,
2521        envelope: MessageEnvelope,
2522        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2523    ) {
2524        let dest_actor_id = envelope.dest().actor_id();
2525        match self.mailboxes.get(dest_actor_id) {
2526            None => {
2527                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2528                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2529            }
2530            Some(sender) => sender.post(envelope, return_handle),
2531        }
2532    }
2533
2534    async fn flush(&self) -> Result<(), anyhow::Error> {
2535        let keys: Vec<_> = self
2536            .mailboxes
2537            .iter()
2538            .map(|entry| entry.key().clone())
2539            .collect();
2540        for key in keys {
2541            if let Some(sender) = self.mailboxes.get(&key) {
2542                sender.value().flush().await?;
2543            }
2544        }
2545        Ok(())
2546    }
2547}
2548
/// MailboxRouter routes messages to the sender that is bound to its
/// nearest prefix.
///
/// The routing table is shared (`Arc`) between clones of the router and
/// with any [`WeakMailboxRouter`] obtained via `downgrade`, so bindings
/// made through one handle are visible through all of them.
#[derive(Clone)]
pub struct MailboxRouter {
    // Routing table: reference prefix -> sender. A `BTreeMap` is used so
    // that lookups can use ordered (cursor) searches over the keys.
    entries: Arc<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2555
2556impl Default for MailboxRouter {
2557    fn default() -> Self {
2558        Self::new()
2559    }
2560}
2561
2562impl MailboxRouter {
2563    /// Create a new, empty router.
2564    pub fn new() -> Self {
2565        Self {
2566            entries: Arc::new(RwLock::new(BTreeMap::new())),
2567        }
2568    }
2569
2570    /// Downgrade this router to a [`WeakMailboxRouter`].
2571    pub fn downgrade(&self) -> WeakMailboxRouter {
2572        WeakMailboxRouter(Arc::downgrade(&self.entries))
2573    }
2574
2575    /// Returns a new router that will first attempt to find a route for the message
2576    /// in the router's table; otherwise post the message to the provided fallback
2577    /// sender.
2578    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2579        FallbackMailboxRouter {
2580            router: self.clone(),
2581            default,
2582        }
2583    }
2584
2585    /// Bind the provided sender to the given reference. The destination
2586    /// is treated as a prefix to which messages can be routed, and
2587    /// messages are routed to their longest matching prefix.
2588    pub fn bind(&self, dest: reference::Reference, sender: impl MailboxSender + 'static) {
2589        let mut w = self.entries.write().unwrap();
2590        w.insert(dest, Arc::new(sender));
2591    }
2592
2593    fn sender(
2594        &self,
2595        actor_id: &reference::ActorId,
2596    ) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2597        match self
2598            .entries
2599            .read()
2600            .unwrap()
2601            .lower_bound(Excluded(&actor_id.clone().into()))
2602            .prev()
2603        {
2604            None => None,
2605            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2606                Some(sender.clone())
2607            }
2608            Some(_) => None,
2609        }
2610    }
2611}
2612
2613#[async_trait]
2614impl MailboxSender for MailboxRouter {
2615    fn post_unchecked(
2616        &self,
2617        envelope: MessageEnvelope,
2618        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2619    ) {
2620        match self.sender(envelope.dest().actor_id()) {
2621            None => envelope.undeliverable(
2622                DeliveryError::Unroutable(
2623                    "no destination found for actor in routing table".to_string(),
2624                ),
2625                return_handle,
2626            ),
2627            Some(sender) => sender.post(envelope, return_handle),
2628        }
2629    }
2630
2631    async fn flush(&self) -> Result<(), anyhow::Error> {
2632        let senders: Vec<_> = self.entries.read().unwrap().values().cloned().collect();
2633        let futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
2634        futures::future::try_join_all(futs).await?;
2635        Ok(())
2636    }
2637}
2638
/// A [`MailboxRouter`] paired with a fallback sender. Created by
/// `MailboxRouter::fallback`; messages with no route in `router` are
/// posted to `default` instead of being reported undeliverable.
#[derive(Clone)]
struct FallbackMailboxRouter {
    // The primary routing table, consulted first.
    router: MailboxRouter,
    // Receives every message for which `router` has no matching prefix.
    default: BoxedMailboxSender,
}
2644
2645#[async_trait]
2646impl MailboxSender for FallbackMailboxRouter {
2647    fn post_unchecked(
2648        &self,
2649        envelope: MessageEnvelope,
2650        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2651    ) {
2652        match self.router.sender(envelope.dest().actor_id()) {
2653            Some(sender) => sender.post(envelope, return_handle),
2654            None => self.default.post(envelope, return_handle),
2655        }
2656    }
2657
2658    async fn flush(&self) -> Result<(), anyhow::Error> {
2659        let (r1, r2) = futures::future::join(self.router.flush(), self.default.flush()).await;
2660        r1?;
2661        r2?;
2662        Ok(())
2663    }
2664}
2665
/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
/// state. This allows router references to be circular: an entity holding a reference
/// to the router may also contain the router itself.
///
/// Posting through a `WeakMailboxRouter` whose router has been dropped reports the
/// message undeliverable (`DeliveryError::BrokenLink`); flushing it is a no-op.
///
/// TODO: this currently holds a weak reference to the entire routing table rather
/// than to individual entries. This helps prevent cycle leaks, but can cause excess
/// memory usage. Possibly the router should allow weak references on a per-entry
/// basis.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    Weak<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2678
2679impl WeakMailboxRouter {
2680    /// Upgrade the weak router to a strong reference router.
2681    pub fn upgrade(&self) -> Option<MailboxRouter> {
2682        self.0.upgrade().map(|entries| MailboxRouter { entries })
2683    }
2684}
2685
2686#[async_trait]
2687impl MailboxSender for WeakMailboxRouter {
2688    fn post_unchecked(
2689        &self,
2690        envelope: MessageEnvelope,
2691        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2692    ) {
2693        match self.upgrade() {
2694            Some(router) => router.post(envelope, return_handle),
2695            None => envelope.undeliverable(
2696                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2697                return_handle,
2698            ),
2699        }
2700    }
2701
2702    async fn flush(&self) -> Result<(), anyhow::Error> {
2703        match self.upgrade() {
2704            Some(router) => router.flush().await,
2705            None => Ok(()),
2706        }
2707    }
2708}
2709
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// If a message destination is not bound, but is a "direct mode" address
/// (i.e., its proc id contains the channel address through which the proc
/// is reachable), then DialMailboxRouter dials the proc directly.
///
/// Messages whose address cannot be determined are routed to the `default`
/// sender (an [`UnroutableMailboxSender`] when built with `new`). Since the
/// proc address is used as a fallback, this only happens when the router
/// only dials remote direct addresses and the proc's transport is not remote.
#[derive(Clone)]
pub struct DialMailboxRouter {
    // Reference prefix -> channel address. Ordered so that lookups and
    // prefix unbinding can use range/cursor searches.
    address_book: Arc<RwLock<BTreeMap<reference::Reference, ChannelAddr>>>,
    // One dialed client per channel address, created lazily on first use.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    // The default sender, to which messages for unknown recipients
    // are sent. (This is like a default route in a routing table.)
    default: BoxedMailboxSender,

    // When true, only dial direct-addressed procs if their transport
    // type is remote. Otherwise, fall back to the default sender.
    direct_addressed_remote_only: bool,
}
2736
2737impl Default for DialMailboxRouter {
2738    fn default() -> Self {
2739        Self::new()
2740    }
2741}
2742
2743impl DialMailboxRouter {
2744    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2745    pub fn new() -> Self {
2746        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2747    }
2748
2749    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2750    /// and a default sender. Any message with an unknown destination is
2751    /// dispatched on this default sender, unless the destination is
2752    /// direct-addressed, in which case it is dialed directly.
2753    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2754        Self {
2755            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2756            sender_cache: Arc::new(DashMap::new()),
2757            default,
2758            direct_addressed_remote_only: false,
2759        }
2760    }
2761
2762    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2763    /// and a default sender. Any message with an unknown destination is
2764    /// dispatched on this default sender, unless the destination is
2765    /// direct-addressed *and* has a remote channel transport type.
2766    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2767        Self {
2768            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2769            sender_cache: Arc::new(DashMap::new()),
2770            default,
2771            direct_addressed_remote_only: true,
2772        }
2773    }
2774
2775    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2776    /// existing binding.
2777    ///
2778    /// If the address changes, the old sender is evicted from the
2779    /// cache to ensure fresh routing on next use.
2780    pub fn bind(&self, dest: reference::Reference, addr: ChannelAddr) {
2781        if let Ok(mut w) = self.address_book.write() {
2782            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2783                && old_addr != addr
2784            {
2785                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2786                self.sender_cache.remove(&old_addr);
2787            }
2788        } else {
2789            tracing::error!("address book poisoned during bind of {:?}", dest);
2790        }
2791    }
2792
2793    /// Removes all address mappings with the given prefix from the
2794    /// router.
2795    ///
2796    /// Also evicts any corresponding cached senders to prevent reuse
2797    /// of stale connections.
2798    pub fn unbind(&self, dest: &reference::Reference) {
2799        if let Ok(mut w) = self.address_book.write() {
2800            let to_remove: Vec<(reference::Reference, ChannelAddr)> = w
2801                .range(dest..)
2802                .take_while(|(key, _)| dest.is_prefix_of(key))
2803                .map(|(key, addr)| (key.clone(), addr.clone()))
2804                .collect();
2805
2806            for (key, addr) in to_remove {
2807                tracing::info!("unbinding {:?} from {:?}", key, addr);
2808                w.remove(&key);
2809                self.sender_cache.remove(&addr);
2810            }
2811        } else {
2812            tracing::error!("address book poisoned during unbind of {:?}", dest);
2813        }
2814    }
2815
2816    /// Lookup an actor's channel in the router's address bok.
2817    pub fn lookup_addr(&self, actor_id: &reference::ActorId) -> Option<ChannelAddr> {
2818        let address_book = self.address_book.read().unwrap();
2819        let found = address_book
2820            .lower_bound(Excluded(&actor_id.clone().into()))
2821            .prev();
2822
2823        // First try to look up the address in our address book; failing that,
2824        // extract the address from the ProcId (all procs are direct-addressed now).
2825        if let Some((key, addr)) = found
2826            && key.is_prefix_of(&actor_id.clone().into())
2827        {
2828            Some(addr.clone())
2829        } else {
2830            let addr = actor_id.proc_id().addr().clone();
2831            if self.direct_addressed_remote_only {
2832                addr.transport().is_remote().then_some(addr)
2833            } else {
2834                Some(addr)
2835            }
2836        }
2837    }
2838
2839    /// Return all covering prefixes of this router. That is, all references that are not
2840    /// prefixed by another reference in the routing table
2841    pub fn prefixes(&self) -> BTreeSet<reference::Reference> {
2842        let addrs = self.address_book.read().unwrap();
2843        let mut prefixes: BTreeSet<reference::Reference> = BTreeSet::new();
2844        for (reference, _) in addrs.iter() {
2845            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2846                Some(candidate) if candidate.is_prefix_of(reference) => (),
2847                _ => {
2848                    prefixes.insert(reference.clone());
2849                }
2850            }
2851        }
2852
2853        prefixes
2854    }
2855
2856    fn dial(
2857        &self,
2858        addr: &ChannelAddr,
2859        actor_id: &reference::ActorId,
2860    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2861        // Get the sender. Create it if needed. Do not send the
2862        // messages inside this block so we do not hold onto the
2863        // reference of the dashmap entries.
2864        match self.sender_cache.entry(addr.clone()) {
2865            Entry::Occupied(entry) => Ok(entry.get().clone()),
2866            Entry::Vacant(entry) => {
2867                let tx = channel::dial(addr.clone()).map_err(|err| {
2868                    MailboxSenderError::new_unbound_type(
2869                        actor_id.clone(),
2870                        MailboxSenderErrorKind::Channel(err),
2871                        "unknown",
2872                    )
2873                })?;
2874                let sender = MailboxClient::new(tx);
2875                Ok(entry.insert(Arc::new(sender)).value().clone())
2876            }
2877        }
2878    }
2879}
2880
2881#[async_trait]
2882impl MailboxSender for DialMailboxRouter {
2883    fn post_unchecked(
2884        &self,
2885        envelope: MessageEnvelope,
2886        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2887    ) {
2888        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2889            self.default.post(envelope, return_handle);
2890            return;
2891        };
2892
2893        match self.dial(&addr, envelope.dest().actor_id()) {
2894            Err(err) => envelope.undeliverable(
2895                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2896                return_handle,
2897            ),
2898            Ok(sender) => sender.post(envelope, return_handle),
2899        }
2900    }
2901
2902    async fn flush(&self) -> Result<(), anyhow::Error> {
2903        let senders: Vec<_> = self
2904            .sender_cache
2905            .iter()
2906            .map(|entry| entry.value().clone())
2907            .collect();
2908        let mut futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
2909        futs.push(self.default.flush());
2910        futures::future::try_join_all(futs).await?;
2911        Ok(())
2912    }
2913}
2914
/// A MailboxSender that reports any envelope as undeliverable due to
/// routing failure (`DeliveryError::Unroutable`). It is the default
/// sender of a [`DialMailboxRouter`] created with `DialMailboxRouter::new`.
#[derive(Debug)]
pub struct UnroutableMailboxSender;
2919
2920#[async_trait]
2921impl MailboxSender for UnroutableMailboxSender {
2922    fn post_unchecked(
2923        &self,
2924        envelope: MessageEnvelope,
2925        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2926    ) {
2927        envelope.undeliverable(
2928            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2929            return_handle,
2930        );
2931    }
2932}
2933
2934#[cfg(test)]
2935mod tests {
2936
2937    use std::assert_matches::assert_matches;
2938    use std::mem::drop;
2939    use std::str::FromStr;
2940    use std::sync::atomic::AtomicUsize;
2941    use std::time::Duration;
2942
2943    use timed_test::async_timed_test;
2944
2945    use super::*;
2946    use crate::Actor;
2947    use crate::ActorHandle;
2948    use crate::Instance;
2949    use crate::accum;
2950    use crate::accum::ReducerMode;
2951    use crate::channel::ChannelTransport;
2952    use crate::context::Mailbox as MailboxContext;
2953    use crate::proc::Proc;
2954    use crate::testing::ids::test_actor_id;
2955    use crate::testing::ids::test_port_id;
2956    use crate::testing::ids::test_proc_id;
2957
2958    fn test_proc_ref(name: &str) -> reference::Reference {
2959        reference::Reference::Proc(test_proc_id(name))
2960    }
2961
2962    fn test_actor_ref(proc_name: &str, actor_name: &str) -> reference::Reference {
2963        reference::Reference::Actor(test_actor_id(proc_name, actor_name))
2964    }
2965
2966    #[test]
2967    fn test_error() {
2968        use crate::testing::ids::test_actor_id_with_pid;
2969        let err = MailboxError::new(
2970            test_actor_id_with_pid("myworld_2", "myactor", 5),
2971            MailboxErrorKind::Closed,
2972        );
2973        // The format is: "{proc_id},{actor_name}[{pid}]: {error}"
2974        // proc_id = "{addr},{proc_name}" so overall: "{addr},{proc_name},{actor_name}[{pid}]"
2975        assert!(format!("{}", err).ends_with(",test_myworld_2,myactor[5]: mailbox closed"));
2976    }
2977
    /// Sends typed messages through a bound port, then a raw serialized
    /// envelope through `Mailbox::post`, and checks they arrive in order.
    #[tokio::test]
    async fn test_mailbox_basic() {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
        let (port, mut receiver) = mbox.open_port::<u64>();
        let port = port.bind();

        // Two sends through the serialized path; received in send order.
        mbox.serialize_and_send(&port, 123, monitored_return_handle())
            .unwrap();
        mbox.serialize_and_send(&port, 321, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);
        assert_eq!(receiver.recv().await.unwrap(), 321u64);

        // Post a hand-built envelope addressed to the same port id.
        let serialized = wirevalue::Any::serialize(&999u64).unwrap();
        mbox.post(
            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
            monitored_return_handle(),
        );
        assert_eq!(receiver.recv().await.unwrap(), 999u64);
    }
2998
    /// Exercises an accumulating port built from a `Max` join-semilattice:
    /// the receiver observes the running maximum rather than raw values.
    #[tokio::test]
    async fn test_mailbox_accum() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (port, mut receiver) = client
            .mailbox()
            .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());

        // Strictly increasing inputs: each becomes the new max and is
        // received unchanged.
        for i in -3..4 {
            port.send(&client, accum::Max(i)).unwrap();
            let received: accum::Max<i64> = receiver.recv().await.unwrap();
            let msg = received.get();
            assert_eq!(msg, &i);
        }
        // Send a smaller or same value. Should still receive the previous max.
        for i in -3..4 {
            port.send(&client, accum::Max(i)).unwrap();
            assert_eq!(receiver.recv().await.unwrap().get(), &3);
        }
        // Send a larger value. Should receive the new max.
        port.send(&client, accum::Max(4)).unwrap();
        assert_eq!(receiver.recv().await.unwrap().get(), &4);

        // Send multiple updates. Should only receive the final change.
        for i in 5..10 {
            port.send(&client, accum::Max(i)).unwrap();
        }
        assert_eq!(receiver.recv().await.unwrap().get(), &9);
        // Values below the current max leave the accumulated value at 9.
        port.send(&client, accum::Max(1)).unwrap();
        port.send(&client, accum::Max(3)).unwrap();
        port.send(&client, accum::Max(2)).unwrap();
        assert_eq!(receiver.recv().await.unwrap().get(), &9);
    }
3032
    /// Checks that accumulating ports (and their bound refs) carry the
    /// accumulator's reducer spec, while plain ports carry none.
    #[test]
    fn test_port_and_reducer() {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
        // accum port could have reducer typehash
        {
            // A separate accumulator instance is used only to obtain the
            // expected reducer spec.
            let accumulator = accum::join_semilattice::<accum::Max<u64>>();
            let reducer_spec = accumulator.reducer_spec().unwrap();
            let (port, _) = mbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
            let port_ref = port.bind();
            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
        }
        // normal port should not have reducer typehash
        {
            let (port, _) = mbox.open_port::<u64>();
            assert_eq!(port.reducer_spec, None);
            let port_ref = port.bind();
            assert_eq!(port_ref.reducer_spec(), &None);
        }
    }
3053
    /// Sends a single value on a once port. The (commented-out) second
    /// half checked that a second send fails with `Closed`; it is disabled
    /// until that error behavior is restored.
    #[tokio::test]
    #[ignore] // error behavior changed, but we will bring it back
    async fn test_mailbox_once() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();

        let (port, receiver) = client.open_once_port::<u64>();

        // let port_id = port.port_id().clone();

        port.send(&client, 123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

        // // The borrow checker won't let us send again on the port
        // // (good!), but we stashed the port-id and so we can try on the
        // // serialized interface.
        // let Err(err) = mbox
        //     .send_serialized(&port_id, &wirevalue::Any(Vec::new()))
        //     .await
        // else {
        //     unreachable!()
        // };
        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
    }
3078
    /// Expects a send to fail with `Closed` (located at the bound port)
    /// once the port's receiver has been dropped. Currently ignored.
    #[tokio::test]
    #[ignore] // changed error behavior
    async fn test_mailbox_receiver_drop() {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
        let (port, mut receiver) = mbox.open_port::<u64>();
        // Make sure we go through "remote" path.
        let port = port.bind();
        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);
        drop(receiver);
        // With no receiver left, the next send is expected to error.
        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
            panic!();
        };

        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
    }
3097
    /// After every sent message has been received, `drain` returns nothing.
    #[tokio::test]
    async fn test_drain() {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));

        let (port, mut receiver) = mbox.open_port();
        let port = port.bind();

        for i in 0..10 {
            mbox.serialize_and_send(&port, i, monitored_return_handle())
                .unwrap();
        }

        // Messages arrive in send order.
        for i in 0..10 {
            assert_eq!(receiver.recv().await.unwrap(), i);
        }

        assert!(receiver.drain().is_empty());
    }
3116
    /// Routes a once-port message through a `MailboxMuxer` used as the
    /// forwarder of a configured proc.
    #[tokio::test]
    async fn test_mailbox_muxer() {
        let muxer = MailboxMuxer::new();

        let mbox0 = Mailbox::new_detached(test_actor_id("0", "actor1"));
        let mbox1 = Mailbox::new_detached(test_actor_id("0", "actor2"));

        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());

        let (port, receiver) = mbox0.open_once_port::<u64>();

        // The client's proc forwards through the muxer, which delivers to mbox0.
        let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
        let (client, _) = proc.instance("client").unwrap();

        port.send(&client, 123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

        // Disabled: unbind-then-send scenario (references APIs not used here).
        /*
        let (tx, rx) = channel::local::new::<u64>();
        let (port, _) = mbox0.open_port::<u64>();
        let handle = muxer.clone().serve_port(port, rx).unwrap();
        muxer.unbind(mbox0.actor_id());
        tx.send(123u64).await.unwrap();
        let Ok(Err(err)) = handle.await else { panic!() };
        assert_eq!(err.actor_id(), &actor_id(0));
        */
    }
3145
3146    #[tokio::test]
3147    async fn test_local_client_server() {
3148        let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
3149        let (tx, rx) = channel::local::new();
3150        let serve_handle = mbox.clone().serve(rx);
3151        let client = MailboxClient::new(tx);
3152
3153        let (port, receiver) = mbox.open_once_port::<u64>();
3154        let port = port.bind();
3155
3156        client
3157            .serialize_and_send_once(port, 123u64, monitored_return_handle())
3158            .unwrap();
3159        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3160        serve_handle.stop("fromt test");
3161        serve_handle.await.unwrap().unwrap();
3162    }
3163
    /// Checks `MailboxRouter` prefix routing (proc- and actor-level
    /// bindings), undeliverable handling for unrouted destinations, and
    /// delivery through a fallback sender.
    #[tokio::test]
    async fn test_mailbox_router() {
        let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
        let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
        let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
        let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));

        let comms: Vec<(reference::OncePortRef<u64>, OncePortReceiver<u64>)> =
            [&mbox0, &mbox1, &mbox2, &mbox3]
                .into_iter()
                .map(|mbox| {
                    let (port, receiver) = mbox.open_once_port::<u64>();
                    (port.bind(), receiver)
                })
                .collect();

        let router = MailboxRouter::new();

        // Three proc-level bindings plus one more specific actor-level
        // binding under proc world1_1.
        router.bind(test_proc_id("world0_0").into(), mbox0);
        router.bind(test_proc_id("world1_0").into(), mbox1);
        router.bind(test_proc_id("world1_1").into(), mbox2);
        router.bind(test_actor_id("world1_1", "actor1").into(), mbox3);

        for (i, (port, receiver)) in comms.into_iter().enumerate() {
            router
                .serialize_and_send_once(port, i as u64, monitored_return_handle())
                .unwrap();
            assert_eq!(receiver.recv().await.unwrap(), i as u64);
        }

        // Test undeliverable messages, and that it is delivered with the appropriate fallback.

        let mbox4 = Mailbox::new_detached(test_actor_id("fallback_0", "actor"));

        // With no binding for fallback_0, the message comes back on the
        // return handle.
        let (return_handle, mut return_receiver) =
            crate::mailbox::undeliverable::new_undeliverable_port();
        let (port, _receiver) = mbox4.open_once_port();
        router
            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
            .unwrap();
        assert!(return_receiver.recv().await.is_ok());

        // Through a fallback router, the same destination is delivered to mbox4.
        let router = router.fallback(mbox4.clone().into_boxed());
        let (port, receiver) = mbox4.open_once_port();
        router
            .serialize_and_send_once(port.bind(), 0, return_handle)
            .unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 0);
    }
3213
    /// Checks `DialMailboxRouter` address lookup: bound prefixes win over
    /// the proc's direct address, and unbinding reverts lookups to the
    /// direct address.
    #[tokio::test]
    async fn test_dial_mailbox_router() {
        let router = DialMailboxRouter::new();

        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
        router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
        router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
        router.bind(
            test_actor_ref("world1_1", "actor1"),
            "unix!@4".parse().unwrap(),
        );
        // Bind a direct address -- we should use its bound address!
        router.bind(
            "unix:@4,my_proc,my_actor".parse().unwrap(),
            "unix:@5".parse().unwrap(),
        );

        // We should be able to lookup the ids
        router
            .lookup_addr(&test_actor_id("world0_0", "actor"))
            .unwrap();
        router
            .lookup_addr(&test_actor_id("world1_0", "actor"))
            .unwrap();

        let actor_id = reference::Reference::from_str("unix:@4,my_proc,my_actor")
            .unwrap()
            .into_actor()
            .unwrap();
        assert_eq!(
            router.lookup_addr(&actor_id).unwrap(),
            "unix!@5".parse().unwrap(),
        );
        // Once unbound, the lookup falls back to the proc's embedded address.
        router.unbind(&actor_id.clone().into());
        assert_eq!(
            router.lookup_addr(&actor_id).unwrap(),
            "unix!@4".parse().unwrap(),
        );

        // Unbind procs so lookups fall back to the proc's direct address
        // (all procs are direct-addressed now, so lookup_addr always returns
        // Some; we verify the bound address is gone by checking the returned
        // address is the local fallback, not the originally bound one).
        let fallback = ChannelAddr::any(ChannelTransport::Local);
        router.unbind(&test_proc_ref("world1_0"));
        router.unbind(&test_proc_ref("world1_1"));
        assert_eq!(
            router
                .lookup_addr(&test_actor_id("world1_0", "actor1"))
                .unwrap(),
            fallback,
        );
        assert_eq!(
            router
                .lookup_addr(&test_actor_id("world1_1", "actor1"))
                .unwrap(),
            fallback,
        );
        router
            .lookup_addr(&test_actor_id("world0_0", "actor"))
            .unwrap();
        router.unbind(&test_proc_ref("world0_0"));
        assert_eq!(
            router
                .lookup_addr(&test_actor_id("world0_0", "actor"))
                .unwrap(),
            fallback,
        );
    }
3283
    /// Builds a two-level topology (a root `MailboxRouter` whose entries
    /// are per-world `DialMailboxRouter`s that default back to the root)
    /// and checks every mailbox is reachable from every router. Ignored
    /// because of a known leak.
    #[tokio::test]
    #[ignore] // TODO: there's a leak here, fix it
    async fn test_dial_mailbox_router_default() {
        let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
        let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
        let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
        let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));

        // We don't need to dial here, since we gain direct access to the
        // underlying routers.
        let root = MailboxRouter::new();
        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
        let world1_router = DialMailboxRouter::new_with_default(root.boxed());

        root.bind(test_proc_ref("world0"), world0_router.clone());
        root.bind(test_proc_ref("world1"), world1_router.clone());

        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];

        let mut handles = Vec::new(); // hold on to handles, or channels get closed
        for mbox in mailboxes.iter() {
            // Serve each mailbox on its own local channel and bind its
            // address in the router for its world.
            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
            let handle = (*mbox).clone().serve(rx);
            handles.push(handle);

            eprintln!("{}: {}", mbox.actor_id(), addr);
            if mbox.actor_id().proc_id().name().starts_with("world0") {
                world0_router.bind(mbox.actor_id().clone().into(), addr);
            } else {
                world1_router.bind(mbox.actor_id().clone().into(), addr);
            }
        }

        // Make sure nodes are fully connected.
        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
            for mbox in mailboxes.iter() {
                let (port, receiver) = mbox.open_once_port::<u64>();
                let port = port.bind();
                router
                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
                    .unwrap();
                assert_eq!(receiver.recv().await.unwrap(), 123u64);
            }
        }
    }
3329
3330    #[tokio::test]
3331    async fn test_enqueue_port() {
3332        let proc = Proc::local();
3333        let (client, _) = proc.instance("client").unwrap();
3334
3335        let count = Arc::new(AtomicUsize::new(0));
3336        let count_clone = count.clone();
3337        let port = client.mailbox().open_enqueue_port(move |_, n| {
3338            count_clone.fetch_add(n, Ordering::SeqCst);
3339            Ok(())
3340        });
3341
3342        port.send(&client, 10).unwrap();
3343        port.send(&client, 5).unwrap();
3344        port.send(&client, 1).unwrap();
3345        port.send(&client, 0).unwrap();
3346
3347        assert_eq!(count.load(Ordering::SeqCst), 16);
3348    }
3349
    // Unit message type whose `Named` type name is derived without an override;
    // `test_remote_message_macros` expects the module-qualified path.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    struct TestMessage;
3352
    // Unit message type that overrides its `Named` type name through the
    // `#[named(name = ...)]` attribute.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    #[named(name = "some::custom::path")]
    struct TestMessage2;
3356
3357    #[test]
3358    fn test_remote_message_macros() {
3359        assert_eq!(
3360            TestMessage::typename(),
3361            "hyperactor::mailbox::tests::TestMessage"
3362        );
3363        assert_eq!(TestMessage2::typename(), "some::custom::path");
3364    }
3365
3366    #[test]
3367    fn test_message_envelope_display() {
3368        #[derive(typeuri::Named, Serialize, Deserialize)]
3369        struct MyTest {
3370            a: u64,
3371            b: String,
3372        }
3373        wirevalue::register_type!(MyTest);
3374
3375        let envelope = MessageEnvelope::serialize(
3376            test_actor_id("source_0", "actor"),
3377            test_port_id("dest_1", "actor", 123),
3378            &MyTest {
3379                a: 123,
3380                b: "hello".into(),
3381            },
3382            Flattrs::new(),
3383        )
3384        .unwrap();
3385
3386        // Note: display format changed from "source[0].actor" to direct format
3387        assert!(format!("{}", envelope).contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
3388    }
3389
    // Minimal actor with no custom behavior: `impl Actor for Foo` is empty, so
    // it relies entirely on the trait's default implementations. It is spawned
    // by `test_actor_delivery_failure`.
    #[derive(Debug, Default)]
    struct Foo;

    impl Actor for Foo {}
3394
3395    // Test that a message delivery failure causes the sending actor
3396    // to stop running.
3397    #[tokio::test]
3398    async fn test_actor_delivery_failure() {
3399        // This test involves making an actor fail and so we must set
3400        // a supervision coordinator.
3401        use crate::actor::ActorStatus;
3402        use crate::testing::proc_supervison::ProcSupervisionCoordinator;
3403
3404        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3405            BOXED_PANICKING_MAILBOX_SENDER.clone(),
3406        ));
3407        let proc_id = test_proc_id("quux_0");
3408        let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
3409        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3410        let (client, _) = proc.instance("client").unwrap();
3411
3412        let foo = proc.spawn("foo", Foo).unwrap();
3413        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3414        let message = MessageEnvelope::new(
3415            foo.actor_id().clone(),
3416            test_port_id("corge_0", "bar", 9999),
3417            wirevalue::Any::serialize(&1u64).unwrap(),
3418            Flattrs::new(),
3419        );
3420        return_handle.send(&client, Undeliverable(message)).unwrap();
3421
3422        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3423
3424        let foo_status = foo.status();
3425        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3426        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3427            unreachable!()
3428        };
3429        let msg_str = msg.to_string();
3430        assert!(msg_str.contains("undeliverable message error"));
3431        // Updated assertions for direct format
3432        assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
3433        assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));
3434
3435        proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None, "test cleanup")
3436            .await
3437            .unwrap();
3438    }
3439
3440    #[tokio::test]
3441    async fn test_detached_return_handle() {
3442        let (return_handle, mut return_receiver) =
3443            crate::mailbox::undeliverable::new_undeliverable_port();
3444        // Simulate an undelivered message return.
3445        let envelope = MessageEnvelope::new(
3446            test_actor_id("foo_0", "bar"),
3447            test_port_id("baz_0", "corge", 9999),
3448            wirevalue::Any::serialize(&1u64).unwrap(),
3449            Flattrs::new(),
3450        );
3451        let proc = Proc::local();
3452        let (client, _) = proc.instance("client").unwrap();
3453        return_handle
3454            .send(&client, Undeliverable(envelope.clone()))
3455            .unwrap();
3456        // Check we receive the undelivered message.
3457        assert!(
3458            tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3459                .await
3460                .is_ok()
3461        );
3462        // Setup a monitor for the receiver and show that if there are
3463        // no outstanding return handles it terminates.
3464        let monitor_handle = tokio::spawn(async move {
3465            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3466                envelope.set_error(DeliveryError::BrokenLink(
3467                    "returned in unit test".to_string(),
3468                ));
3469                UndeliverableMailboxSender
3470                    .post(envelope, /*unused */ monitored_return_handle());
3471            }
3472        });
3473        drop(return_handle);
3474        assert!(
3475            tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3476                .await
3477                .is_ok()
3478        );
3479    }
3480
3481    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3482        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3483            // Create dummy state and port_id to create PortReceiver. They are
3484            // not used in the test.
3485            let dummy_actor_id = test_actor_id("world_0", "actor");
3486            let dummy_state = State::new(
3487                dummy_actor_id.clone(),
3488                BOXED_PANICKING_MAILBOX_SENDER.clone(),
3489            );
3490            let dummy_port_id = reference::PortId::new(dummy_actor_id, 0);
3491            let (sender, receiver) = mpsc::unbounded_channel::<M>();
3492            let receiver = PortReceiver {
3493                receiver,
3494                port_id: dummy_port_id,
3495                coalesce,
3496                mailbox: Mailbox {
3497                    inner: Arc::new(dummy_state),
3498                },
3499            };
3500            (sender, receiver)
3501        }
3502
3503        // verify fn drain
3504        {
3505            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3506            assert!(receiver.drain().is_empty());
3507
3508            sender.send(0).unwrap();
3509            sender.send(1).unwrap();
3510            sender.send(2).unwrap();
3511            sender.send(3).unwrap();
3512            sender.send(4).unwrap();
3513            sender.send(5).unwrap();
3514            sender.send(6).unwrap();
3515            sender.send(7).unwrap();
3516
3517            if drop_sender {
3518                drop(sender);
3519            }
3520
3521            if !coalesce {
3522                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3523            } else {
3524                assert_eq!(receiver.drain(), vec![7]);
3525            }
3526
3527            assert!(receiver.drain().is_empty());
3528            assert!(receiver.drain().is_empty());
3529        }
3530
3531        // verify fn try_recv
3532        {
3533            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3534            assert!(receiver.try_recv().unwrap().is_none());
3535
3536            sender.send(0).unwrap();
3537            sender.send(1).unwrap();
3538            sender.send(2).unwrap();
3539            sender.send(3).unwrap();
3540
3541            if drop_sender {
3542                drop(sender);
3543            }
3544
3545            if !coalesce {
3546                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3547                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3548                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3549            }
3550            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3551            if drop_sender {
3552                assert_matches!(
3553                    receiver.try_recv().unwrap_err().kind(),
3554                    MailboxErrorKind::Closed
3555                );
3556                // Still Closed error
3557                assert_matches!(
3558                    receiver.try_recv().unwrap_err().kind(),
3559                    MailboxErrorKind::Closed
3560                );
3561            } else {
3562                assert!(receiver.try_recv().unwrap().is_none());
3563                // Still empty
3564                assert!(receiver.try_recv().unwrap().is_none());
3565            }
3566        }
3567        // verify fn recv
3568        {
3569            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3570            assert!(
3571                tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3572                    .await
3573                    .is_err()
3574            );
3575
3576            sender.send(4).unwrap();
3577            sender.send(5).unwrap();
3578            sender.send(6).unwrap();
3579            sender.send(7).unwrap();
3580
3581            if drop_sender {
3582                drop(sender);
3583            }
3584
3585            if !coalesce {
3586                assert_eq!(receiver.recv().await.unwrap(), 4);
3587                assert_eq!(receiver.recv().await.unwrap(), 5);
3588                assert_eq!(receiver.recv().await.unwrap(), 6);
3589            }
3590            assert_eq!(receiver.recv().await.unwrap(), 7);
3591            if drop_sender {
3592                assert_matches!(
3593                    receiver.recv().await.unwrap_err().kind(),
3594                    MailboxErrorKind::Closed
3595                );
3596                // Still None
3597                assert_matches!(
3598                    receiver.recv().await.unwrap_err().kind(),
3599                    MailboxErrorKind::Closed
3600                );
3601            } else {
3602                assert!(
3603                    tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3604                        .await
3605                        .is_err()
3606                );
3607            }
3608        }
3609    }
3610
3611    #[tokio::test]
3612    async fn test_receiver_basic_default() {
3613        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3614    }
3615
3616    #[tokio::test]
3617    async fn test_receiver_basic_latest() {
3618        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3619    }
3620
3621    #[tokio::test]
3622    async fn test_receiver_after_sender_drop_default() {
3623        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3624    }
3625
3626    #[tokio::test]
3627    async fn test_receiver_after_sender_drop_latest() {
3628        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3629    }
3630
    /// Fixture returned by `setup_split_port_ids`: a `u64` port opened on
    /// `actor0`, its receiver, and split port ids derived from it on `actor1`.
    struct Setup {
        /// Receiver for the port opened on `actor0`.
        receiver: PortReceiver<u64>,
        actor0: Instance<()>,
        actor1: Instance<()>,
        /// Handles returned alongside the instances; not used by the tests.
        _actor0_handle: ActorHandle<()>,
        _actor1_handle: ActorHandle<()>,
        /// The original (unsplit) port id on `actor0`.
        port_id: reference::PortId,
        /// First split of `port_id`, made on `actor1`.
        port_id1: reference::PortId,
        /// Second split of `port_id`, made on `actor1`.
        port_id2: reference::PortId,
        /// A split of `port_id2` (a split of a split).
        port_id2_1: reference::PortId,
    }
3642
3643    async fn setup_split_port_ids(
3644        reducer_spec: Option<ReducerSpec>,
3645        reducer_mode: ReducerMode,
3646    ) -> Setup {
3647        let proc = Proc::local();
3648        let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3649        let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3650
3651        // Open a port on actor0
3652        let (port_handle, receiver) = actor0.open_port::<u64>();
3653        let port_id = port_handle.bind().port_id().clone();
3654
3655        // Split it twice on actor1
3656        let port_id1 = port_id
3657            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3658            .unwrap();
3659        let port_id2 = port_id
3660            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3661            .unwrap();
3662
3663        // A split port id can also be split
3664        let port_id2_1 = port_id2
3665            .split(&actor1, reducer_spec, reducer_mode.clone(), true)
3666            .unwrap();
3667
3668        Setup {
3669            receiver,
3670            actor0,
3671            actor1,
3672            _actor0_handle: actor0_handle,
3673            _actor1_handle: actor1_handle,
3674            port_id,
3675            port_id1,
3676            port_id2,
3677            port_id2_1,
3678        }
3679    }
3680
3681    fn post(cx: &impl context::Actor, port_id: reference::PortId, msg: u64) {
3682        let serialized = wirevalue::Any::serialize(&msg).unwrap();
3683        port_id.send(cx, serialized);
3684    }
3685
3686    #[async_timed_test(timeout_secs = 30)]
3687    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3688    #[cfg_attr(not(fbcode_build), ignore)]
3689    async fn test_split_port_id_no_reducer() {
3690        let Setup {
3691            mut receiver,
3692            actor0,
3693            actor1,
3694            port_id,
3695            port_id1,
3696            port_id2,
3697            port_id2_1,
3698            ..
3699        } = setup_split_port_ids(None, ReducerMode::default()).await;
3700        // Can send messages to receiver from all port handles
3701        post(&actor0, port_id.clone(), 1);
3702        assert_eq!(receiver.recv().await.unwrap(), 1);
3703        post(&actor1, port_id1.clone(), 2);
3704        assert_eq!(receiver.recv().await.unwrap(), 2);
3705        post(&actor1, port_id2.clone(), 3);
3706        assert_eq!(receiver.recv().await.unwrap(), 3);
3707        post(&actor1, port_id2_1.clone(), 4);
3708        assert_eq!(receiver.recv().await.unwrap(), 4);
3709
3710        // no more messages
3711        tokio::time::sleep(Duration::from_secs(2)).await;
3712        let msg = receiver.try_recv().unwrap();
3713        assert_eq!(msg, None);
3714    }
3715
3716    async fn wait_for(
3717        receiver: &mut PortReceiver<u64>,
3718        expected_size: usize,
3719        timeout_duration: Duration,
3720    ) -> anyhow::Result<Vec<u64>> {
3721        let mut messeges = vec![];
3722
3723        tokio::time::timeout(timeout_duration, async {
3724            loop {
3725                let msg = receiver.recv().await.unwrap();
3726                messeges.push(msg);
3727                if messeges.len() == expected_size {
3728                    break;
3729                }
3730            }
3731        })
3732        .await?;
3733        Ok(messeges)
3734    }
3735
3736    #[async_timed_test(timeout_secs = 30)]
3737    async fn test_split_port_id_sum_reducer() {
3738        let config = hyperactor_config::global::lock();
3739        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3740
3741        let sum_accumulator = accum::sum::<u64>();
3742        let reducer_spec = sum_accumulator.reducer_spec();
3743        let Setup {
3744            mut receiver,
3745            actor0,
3746            actor1,
3747            port_id,
3748            port_id1,
3749            port_id2,
3750            port_id2_1,
3751            ..
3752        } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
3753        post(&actor0, port_id.clone(), 4);
3754        post(&actor1, port_id1.clone(), 2);
3755        post(&actor1, port_id2.clone(), 3);
3756        post(&actor1, port_id2_1.clone(), 1);
3757        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3758            .await
3759            .unwrap();
3760        // Message might be received out of their sending out. So we sort the
3761        // messages here.
3762        messages.sort();
3763        assert_eq!(messages, vec![1, 2, 3, 4]);
3764
3765        // no more messages
3766        tokio::time::sleep(Duration::from_secs(2)).await;
3767        let msg = receiver.try_recv().unwrap();
3768        assert_eq!(msg, None);
3769    }
3770
3771    #[async_timed_test(timeout_secs = 30)]
3772    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3773    #[cfg_attr(not(fbcode_build), ignore)]
3774    async fn test_split_port_id_every_n_messages() {
3775        let config = hyperactor_config::global::lock();
3776        let _config_guard =
3777            config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3778        let proc = Proc::local();
3779        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3780        let (port_handle, mut receiver) = actor.open_port::<u64>();
3781        let port_id = port_handle.bind().port_id().clone();
3782        // Split it
3783        let reducer_spec = accum::sum::<u64>().reducer_spec();
3784        let split_port_id = port_id
3785            .split(
3786                &actor,
3787                reducer_spec,
3788                ReducerMode::Streaming(accum::StreamingReducerOpts {
3789                    max_update_interval: Some(Duration::from_mins(10)),
3790                    initial_update_interval: Some(Duration::from_mins(10)),
3791                }),
3792                true,
3793            )
3794            .unwrap();
3795
3796        // Send 9 messages.
3797        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3798            post(&actor, split_port_id.clone(), msg);
3799        }
3800        // The first 5 should be batched and reduced once due
3801        // to every_n_msgs = 5.
3802        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3803            .await
3804            .unwrap();
3805        assert_eq!(messages, vec![15]);
3806
3807        // the last message unfortranately will never come because they do not
3808        // reach batch size.
3809        tokio::time::sleep(Duration::from_secs(2)).await;
3810        let msg = receiver.try_recv().unwrap();
3811        assert_eq!(msg, None);
3812    }
3813
3814    #[async_timed_test(timeout_secs = 30)]
3815    async fn test_split_port_timeout_flush() {
3816        let config = hyperactor_config::global::lock();
3817        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3818
3819        let Setup {
3820            mut receiver,
3821            actor0: _,
3822            actor1,
3823            port_id: _,
3824            port_id1,
3825            port_id2: _,
3826            port_id2_1: _,
3827            ..
3828        } = setup_split_port_ids(
3829            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3830            ReducerMode::Streaming(accum::StreamingReducerOpts {
3831                max_update_interval: Some(Duration::from_millis(50)),
3832                initial_update_interval: Some(Duration::from_millis(50)),
3833            }),
3834        )
3835        .await;
3836
3837        post(&actor1, port_id1.clone(), 10);
3838        post(&actor1, port_id1.clone(), 20);
3839        post(&actor1, port_id1.clone(), 30);
3840
3841        // Messages should accumulate for 50ms.
3842        tokio::time::sleep(Duration::from_millis(10)).await;
3843        let msg = receiver.try_recv().unwrap();
3844        assert_eq!(msg, None);
3845
3846        // Wait until we are flushed.
3847        tokio::time::sleep(Duration::from_millis(100)).await;
3848
3849        // Now we are reduced and accumulated:
3850        let msg = receiver.recv().await.unwrap();
3851        assert_eq!(msg, 60); // 10 + 20 + 30
3852
3853        // No further messages:
3854        let msg = receiver.try_recv().unwrap();
3855        assert_eq!(msg, None);
3856    }
3857
3858    #[async_timed_test(timeout_secs = 30)]
3859    async fn test_split_port_timeout_and_size_flush() {
3860        let config = hyperactor_config::global::lock();
3861        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3862
3863        let Setup {
3864            mut receiver,
3865            actor0: _,
3866            actor1,
3867            port_id: _,
3868            port_id1,
3869            port_id2: _,
3870            port_id2_1: _,
3871            ..
3872        } = setup_split_port_ids(
3873            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3874            ReducerMode::Streaming(accum::StreamingReducerOpts {
3875                max_update_interval: Some(Duration::from_millis(50)),
3876                initial_update_interval: Some(Duration::from_millis(50)),
3877            }),
3878        )
3879        .await;
3880
3881        post(&actor1, port_id1.clone(), 10);
3882        post(&actor1, port_id1.clone(), 20);
3883        post(&actor1, port_id1.clone(), 30);
3884        post(&actor1, port_id1.clone(), 40);
3885
3886        // Should have flushed at the third message.
3887        let msg = receiver.recv().await.unwrap();
3888        assert_eq!(msg, 60);
3889
3890        // After 50ms, the next reduce will flush:
3891        let msg = receiver.recv().await.unwrap();
3892        assert_eq!(msg, 40);
3893
3894        // No further messages
3895        let msg = receiver.try_recv().unwrap();
3896        assert_eq!(msg, None);
3897    }
3898
3899    #[async_timed_test(timeout_secs = 30)]
3900    async fn test_split_port_once_mode_basic() {
3901        let proc = Proc::local();
3902        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3903        let (port_handle, mut receiver) = actor.open_port::<u64>();
3904        let port_id = port_handle.bind().port_id().clone();
3905
3906        // Split with Once(3) mode - accumulate 3 values then emit
3907        let reducer_spec = accum::sum::<u64>().reducer_spec();
3908        let split_port_id = port_id
3909            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3910            .unwrap();
3911
3912        // Send 3 messages
3913        post(&actor, split_port_id.clone(), 10);
3914        post(&actor, split_port_id.clone(), 20);
3915        post(&actor, split_port_id.clone(), 30);
3916
3917        // Should receive a single reduced message
3918        let msg = receiver.recv().await.unwrap();
3919        assert_eq!(msg, 60); // 10 + 20 + 30
3920
3921        // No further messages
3922        tokio::time::sleep(Duration::from_millis(100)).await;
3923        let msg = receiver.try_recv().unwrap();
3924        assert_eq!(msg, None);
3925    }
3926
3927    #[async_timed_test(timeout_secs = 30)]
3928    async fn test_split_port_once_mode_teardown() {
3929        let proc = Proc::local();
3930        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3931        let (port_handle, mut receiver) = actor.open_port::<u64>();
3932        let port_id = port_handle.bind().port_id().clone();
3933
3934        // Set up an undeliverable receiver to capture messages sent to torn-down ports
3935        let (undeliverable_handle, mut undeliverable_receiver) =
3936            undeliverable::new_undeliverable_port();
3937
3938        // Split with Once(3) mode - accumulate 3 values then emit and tear down
3939        let reducer_spec = accum::sum::<u64>().reducer_spec();
3940        let split_port_id = port_id
3941            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3942            .unwrap();
3943
3944        // Send 3 messages to trigger reduction
3945        post(&actor, split_port_id.clone(), 10);
3946        post(&actor, split_port_id.clone(), 20);
3947        post(&actor, split_port_id.clone(), 30);
3948
3949        // Should receive a single reduced message
3950        let msg = receiver.recv().await.unwrap();
3951        assert_eq!(msg, 60); // 10 + 20 + 30
3952
3953        // Now send another message - it should fail because the port is torn down
3954        let serialized = wirevalue::Any::serialize(&100u64).unwrap();
3955        let envelope = MessageEnvelope::new(
3956            actor.mailbox().actor_id().clone(),
3957            split_port_id.clone(),
3958            serialized,
3959            Flattrs::new(),
3960        );
3961        actor.mailbox().post(envelope, undeliverable_handle);
3962
3963        // Verify the message was returned as undeliverable
3964        let undeliverable =
3965            tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
3966                .await
3967                .expect("should receive undeliverable message")
3968                .expect("receiver should not be closed");
3969
3970        // Verify the undeliverable message has the correct destination
3971        assert_eq!(undeliverable.0.dest(), &split_port_id);
3972
3973        // Verify no additional messages arrived at the original receiver
3974        let msg = receiver.try_recv().unwrap();
3975        assert_eq!(msg, None);
3976    }
3977
3978    #[test]
3979    fn test_dial_mailbox_router_prefixes_empty() {
3980        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3981    }
3982
3983    #[test]
3984    fn test_dial_mailbox_router_prefixes_single_entry() {
3985        let router = DialMailboxRouter::new();
3986        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3987
3988        let prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3989        assert_eq!(prefixes.len(), 1);
3990        assert_eq!(prefixes[0], test_proc_ref("world0"));
3991    }
3992
3993    #[test]
3994    fn test_dial_mailbox_router_prefixes_no_overlap() {
3995        let router = DialMailboxRouter::new();
3996        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3997        router.bind(test_proc_ref("world1"), "unix!@2".parse().unwrap());
3998        router.bind(test_proc_ref("world2"), "unix!@3".parse().unwrap());
3999
4000        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4001        prefixes.sort();
4002
4003        let mut expected = vec![
4004            test_proc_ref("world0"),
4005            test_proc_ref("world1"),
4006            test_proc_ref("world2"),
4007        ];
4008        expected.sort();
4009
4010        assert_eq!(prefixes, expected);
4011    }
4012
4013    #[test]
4014    fn test_dial_mailbox_router_prefixes_with_overlaps() {
4015        let router = DialMailboxRouter::new();
4016        // Proc refs are all independent since they have different names.
4017        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4018        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4019        router.bind(test_proc_ref("world0_1"), "unix!@3".parse().unwrap());
4020        router.bind(test_proc_ref("world1"), "unix!@4".parse().unwrap());
4021        router.bind(test_proc_ref("world1_0"), "unix!@5".parse().unwrap());
4022
4023        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4024        prefixes.sort();
4025
4026        let mut expected = vec![
4027            test_proc_ref("world0"),
4028            test_proc_ref("world0_0"),
4029            test_proc_ref("world0_1"),
4030            test_proc_ref("world1"),
4031            test_proc_ref("world1_0"),
4032        ];
4033        expected.sort();
4034
4035        assert_eq!(prefixes, expected);
4036    }
4037
4038    #[test]
4039    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
4040        let router = DialMailboxRouter::new();
4041        // Proc refs still cover their own actors (same proc_id).
4042        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4043        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4044        router.bind(
4045            test_actor_ref("world0_0", "actor1"),
4046            "unix!@3".parse().unwrap(),
4047        );
4048        router.bind(test_proc_ref("world1_0"), "unix!@4".parse().unwrap());
4049        router.bind(test_proc_ref("world1_1"), "unix!@5".parse().unwrap());
4050        router.bind(
4051            test_actor_ref("world2_0", "actor0"),
4052            "unix!@6".parse().unwrap(),
4053        );
4054
4055        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4056        prefixes.sort();
4057
4058        // Covering prefixes:
4059        // - world0 (independent proc ref)
4060        // - world0_0 (covers world0_0.actor1 since same proc_id)
4061        // - world1_0 (not covered by anything else)
4062        // - world1_1 (not covered by anything else)
4063        // - world2_0.actor0 (not covered by anything else)
4064        let mut expected = vec![
4065            test_proc_ref("world0"),
4066            test_proc_ref("world0_0"),
4067            test_proc_ref("world1_0"),
4068            test_proc_ref("world1_1"),
4069            test_actor_ref("world2_0", "actor0"),
4070        ];
4071        expected.sort();
4072
4073        assert_eq!(prefixes, expected);
4074    }
4075
4076    #[test]
4077    fn test_dial_mailbox_router_prefixes_same_level() {
4078        let router = DialMailboxRouter::new();
4079        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
4080        router.bind(test_proc_ref("world0_1"), "unix!@2".parse().unwrap());
4081        router.bind(test_proc_ref("world0_2"), "unix!@3".parse().unwrap());
4082
4083        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
4084        prefixes.sort();
4085
4086        // All should be covering prefixes since none is a prefix of another
4087        let mut expected = vec![
4088            test_proc_ref("world0_0"),
4089            test_proc_ref("world0_1"),
4090            test_proc_ref("world0_2"),
4091        ];
4092        expected.sort();
4093
4094        assert_eq!(prefixes, expected);
4095    }
4096
4097    /// A forwarder that bounces messages back to the **same**
4098    /// mailbox, but does so on a task to avoid recursive stack
4099    /// growth.
4100    #[derive(Clone, Debug)]
4101    struct AsyncLoopForwarder;
4102
4103    #[async_trait]
4104    impl MailboxSender for AsyncLoopForwarder {
4105        fn post_unchecked(
4106            &self,
4107            envelope: MessageEnvelope,
4108            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4109        ) {
4110            let me = self.clone();
4111            tokio::spawn(async move {
4112                // Call `post` so each hop applies TTL exactly once.
4113                me.post(envelope, return_handle);
4114            });
4115        }
4116    }
4117
4118    #[tokio::test]
4119    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4120        let actor_id = test_actor_id("world_0", "ttl_actor");
4121        let mailbox = Mailbox::new(
4122            actor_id.clone(),
4123            BoxedMailboxSender::new(AsyncLoopForwarder),
4124        );
4125        let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4126
4127        // Create a destination not owned by this mailbox to force
4128        // forwarding.
4129        let remote_actor = test_actor_id("remote_world_1", "remote");
4130        let dest = reference::PortId::new(remote_actor.clone(), /*port index*/ 4242);
4131
4132        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
4133        // `::serialize`).
4134        let payload = 1234_u64;
4135        let envelope =
4136            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4137                .expect("serialize");
4138
4139        // Post it. This will start bouncing between forwarder and
4140        // mailbox until TTL hits 0.
4141        let return_handle = ret_port.clone();
4142        mailbox.post(envelope, return_handle);
4143
4144        // We expect the undeliverable to come back once TTL expires.
4145        let Undeliverable(undelivered) =
4146            tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4147                .await
4148                .expect("timed out waiting for undeliverable")
4149                .expect("channel closed");
4150
4151        // Sanity: round-trip payload still deserializes.
4152        let got: u64 = undelivered.deserialized().expect("deserialize");
4153        assert_eq!(got, payload, "payload preserved");
4154    }
4155
4156    #[tokio::test]
4157    async fn message_ttl_success_local_delivery() {
4158        let actor_id = test_actor_id("world_0", "ttl_actor");
4159        let mailbox = Mailbox::new(
4160            actor_id.clone(),
4161            BoxedMailboxSender::new(PanickingMailboxSender),
4162        );
4163        let (_undeliverable_tx, mut undeliverable_rx) =
4164            mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4165
4166        // Open a local user u64 port.
4167        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4168
4169        // Build an envelope destined for this mailbox's own port.
4170        let payload = 0xC0FFEE_u64;
4171        let envelope = MessageEnvelope::serialize(
4172            actor_id.clone(),
4173            user_port.bind().port_id().clone(),
4174            &payload,
4175            Flattrs::new(),
4176        )
4177        .expect("serialize");
4178
4179        // Post the message using the mailbox (local path). TTL will
4180        // not expire.
4181        let return_handle = mailbox
4182            .bound_return_handle()
4183            .unwrap_or(monitored_return_handle());
4184        mailbox.post(envelope, return_handle);
4185
4186        // We should receive the payload locally.
4187        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4188            .await
4189            .expect("timed out waiting for local delivery")
4190            .expect("user port closed");
4191        assert_eq!(got, payload);
4192
4193        // There should be no undeliverables arriving.
4194        let no_undeliverable =
4195            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4196        assert!(
4197            no_undeliverable.is_err(),
4198            "unexpected undeliverable returned on successful local delivery"
4199        );
4200    }
4201
4202    #[tokio::test]
4203    async fn test_port_contramap() {
4204        let proc = Proc::local();
4205        let (client, _) = proc.instance("client").unwrap();
4206        let (handle, mut rx) = client.open_port();
4207
4208        handle
4209            .contramap(|m| (1, m))
4210            .send(&client, "hello".to_string())
4211            .unwrap();
4212        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4213    }
4214
4215    #[test]
4216    #[should_panic(expected = "already bound")]
4217    fn test_bind_port_handle_to_actor_port_twice() {
4218        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4219        let (handle, _rx) = mbox.open_port::<String>();
4220        handle.bind_actor_port();
4221        handle.bind_actor_port();
4222    }
4223
4224    #[test]
4225    fn test_bind_port_handle_to_actor_port() {
4226        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4227        let default_port = mbox.actor_id().port_id(String::port());
4228        let (handle, _rx) = mbox.open_port::<String>();
4229        // Handle's port index is allocated by mailbox, not the actor port.
4230        assert_ne!(default_port.index(), handle.port_index);
4231        // Bind the handle to the actor port.
4232        handle.bind_actor_port();
4233        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4234        // bind() can still be used, just it will not change handle's state.
4235        handle.bind();
4236        handle.bind();
4237        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4238    }
4239
4240    #[test]
4241    #[should_panic(expected = "already bound")]
4242    fn test_bind_port_handle_to_actor_port_when_already_bound() {
4243        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4244        let (handle, _rx) = mbox.open_port::<String>();
4245        // Bound handle to the port allocated by mailbox.
4246        handle.bind();
4247        assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
4248        // Since handle is already bound, call bind_to() on it will cause panic.
4249        handle.bind_actor_port();
4250    }
4251
4252    #[tokio::test]
4253    async fn test_mailbox_post_fails_when_actor_stopped() {
4254        let actor_id = test_actor_id("0", "stopped_actor");
4255
4256        let mailbox = Mailbox::new(
4257            actor_id.clone(),
4258            BoxedMailboxSender::new(PanickingMailboxSender),
4259        );
4260
4261        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4262
4263        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4264
4265        // Use a separate detached mailbox for the return handle since
4266        // the main mailbox is stopped and won't accept messages.
4267        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4268
4269        let envelope = MessageEnvelope::serialize(
4270            actor_id.clone(),
4271            user_port.bind().port_id().clone(),
4272            &42u64,
4273            Flattrs::new(),
4274        )
4275        .expect("serialize");
4276
4277        mailbox.post(envelope, return_handle);
4278
4279        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4280            .await
4281            .expect("timed out waiting for undeliverable")
4282            .expect("return port closed");
4283
4284        let err = undeliverable.0.error_msg().expect("expected error");
4285        assert!(
4286            err.contains(&format!("owner {} is stopped", actor_id)),
4287            "error should indicate actor stopped: {}",
4288            err
4289        );
4290    }
4291
4292    #[tokio::test]
4293    async fn test_mailbox_post_fails_when_actor_failed() {
4294        use crate::actor::ActorErrorKind;
4295
4296        let actor_id = test_actor_id("0", "failed_actor");
4297
4298        let mailbox = Mailbox::new(
4299            actor_id.clone(),
4300            BoxedMailboxSender::new(PanickingMailboxSender),
4301        );
4302
4303        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4304
4305        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4306            "test failure".to_string(),
4307        )));
4308
4309        // Use a separate detached mailbox for the return handle since
4310        // the main mailbox is failed and won't accept messages.
4311        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4312
4313        let envelope = MessageEnvelope::serialize(
4314            actor_id.clone(),
4315            user_port.bind().port_id().clone(),
4316            &42u64,
4317            Flattrs::new(),
4318        )
4319        .expect("serialize");
4320
4321        mailbox.post(envelope, return_handle);
4322
4323        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4324            .await
4325            .expect("timed out waiting for undeliverable")
4326            .expect("return port closed");
4327
4328        let err = undeliverable.0.error_msg().expect("expected error");
4329        assert!(
4330            err.contains(&format!("owner {} failed", actor_id)),
4331            "error should indicate actor failed: {}",
4332            err
4333        );
4334    }
4335
4336    #[tokio::test]
4337    async fn test_port_handle_send_fails_when_actor_stopped() {
4338        let actor_id = test_actor_id("0", "stopped_actor");
4339
4340        let mailbox = Mailbox::new(
4341            actor_id.clone(),
4342            BoxedMailboxSender::new(PanickingMailboxSender),
4343        );
4344
4345        let (port_handle, _rx) = mailbox.open_port::<u64>();
4346        let proc = Proc::local();
4347        let (client, _) = proc.instance("client").unwrap();
4348
4349        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4350
4351        let result = port_handle.send(&client, 42u64);
4352
4353        assert!(result.is_err(), "send should fail when actor is stopped");
4354        let err = result.unwrap_err();
4355        assert_matches!(
4356            err.kind(),
4357            MailboxSenderErrorKind::Mailbox(mailbox_err)
4358                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4359        );
4360    }
4361
4362    #[tokio::test]
4363    async fn test_port_handle_send_fails_when_actor_failed() {
4364        use crate::actor::ActorErrorKind;
4365
4366        let actor_id = test_actor_id("0", "failed_actor");
4367
4368        let mailbox = Mailbox::new(
4369            actor_id.clone(),
4370            BoxedMailboxSender::new(PanickingMailboxSender),
4371        );
4372
4373        let (port_handle, _rx) = mailbox.open_port::<u64>();
4374        let proc = Proc::local();
4375        let (client, _) = proc.instance("client").unwrap();
4376
4377        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4378            "test failure".to_string(),
4379        )));
4380
4381        let result = port_handle.send(&client, 42u64);
4382
4383        assert!(result.is_err(), "send should fail when actor is failed");
4384        let err = result.unwrap_err();
4385        assert_matches!(
4386            err.kind(),
4387            MailboxSenderErrorKind::Mailbox(mailbox_err)
4388                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4389        );
4390    }
4391
4392    #[async_timed_test(timeout_secs = 30)]
4393    async fn test_open_reduce_port() {
4394        let proc = Proc::local();
4395        let (client, _) = proc.instance("client").unwrap();
4396
4397        // Open an accumulator port with sum reducer
4398        let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4399
4400        // Verify the reducer_spec is set
4401        let port_ref = port_handle.bind();
4402        assert!(port_ref.reducer_spec().is_some());
4403
4404        // Send a single value via the bound port
4405        port_ref.send(&client, 42).unwrap();
4406
4407        // Should receive the value
4408        let result = receiver.recv().await.unwrap();
4409        assert_eq!(result, 42);
4410    }
4411
4412    #[async_timed_test(timeout_secs = 30)]
4413    async fn test_open_reduce_port_reducer_spec_preserved() {
4414        let proc = Proc::local();
4415        let (client, _) = proc.instance("client").unwrap();
4416
4417        // Test that different accumulators produce different reducer_specs
4418        let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4419        let sum_ref = sum_handle.bind();
4420        let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
4421
4422        let (max_handle, _) = client
4423            .mailbox()
4424            .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
4425        let max_ref = max_handle.bind();
4426        let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
4427
4428        // Different accumulators should have different reducer typehashes
4429        assert_ne!(sum_typehash, max_typehash);
4430    }
4431
4432    /// Test that `MailboxClient::flush()` waits until messages are wire-acked
4433    /// over a unix domain socket channel. We send messages, flush, and then
4434    /// confirm that the messages have already been delivered to the receiving
4435    /// mailbox.
4436    #[tokio::test]
4437    async fn test_flush_over_unix_channel() {
4438        let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
4439
4440        // Serve the mailbox on a unix domain socket channel.
4441        let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
4442        let serve_handle = mbox.clone().serve(rx);
4443
4444        // Dial the unix address to get a MailboxClient.
4445        let client = MailboxClient::dial(addr).unwrap();
4446
4447        // Open a streaming port so we can receive multiple messages.
4448        let (port, mut receiver) = mbox.open_port::<u64>();
4449        let port = port.bind();
4450
4451        // Send several messages without awaiting delivery.
4452        for i in 0..10u64 {
4453            client
4454                .serialize_and_send(&port, i, monitored_return_handle())
4455                .unwrap();
4456        }
4457
4458        // Flush: this should block until all 10 messages are wire-acked,
4459        // meaning they've been enqueued into the receiving mailbox.
4460        client.flush().await.unwrap();
4461
4462        // After flush, all messages should already be available.
4463        for i in 0..10u64 {
4464            let msg = receiver
4465                .try_recv()
4466                .expect("message should be available after flush")
4467                .expect("receiver should not be empty after flush");
4468            assert_eq!(msg, i);
4469        }
4470
4471        serve_handle.stop("test done");
4472        serve_handle.await.unwrap().unwrap();
4473    }
4474}