hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
21//! # tokio_test::block_on(async {
22//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
23//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
24//! let mbox = Mailbox::new_detached(actor_id);
25//! let (port, mut receiver) = mbox.open_port::<u64>();
26//!
27//! port.send(123).unwrap();
28//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
29//! # })
30//! ```
31//!
32//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
33//! that permits at most one message transmission:
34//!
35//! ```
36//! # use hyperactor::mailbox::Mailbox;
37//! # use hyperactor::reference::{ActorId, ProcId, WorldId};
38//! # tokio_test::block_on(async {
39//! # let proc_id = ProcId::Ranked(WorldId("world".to_string()), 0);
40//! # let actor_id = ActorId(proc_id, "actor".to_string(), 0);
41//! let mbox = Mailbox::new_detached(actor_id);
42//!
43//! let (port, receiver) = mbox.open_once_port::<u64>();
44//!
45//! port.send(123u64).unwrap();
46//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
47//! # })
48//! ```
49//!
50//! [`OncePort`]s are correspondingly used for RPC replies in the actor
51//! system.
52//!
53//! ## Remote ports and serialization
54//!
55//! Mailboxes allow delivery of serialized messages to named ports:
56//!
57//! 1) Ports restrict message types to (serializable) [`Message`]s.
58//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
59//! 3) [`Mailbox`] provides interfaces to deliver serialized
60//!    messages to ports named by their [`PortId`].
61//!
62//! While this complicates the interface somewhat, it allows the
63//! implementation to avoid a serialization roundtrip when passing
64//! messages locally.
65
66#![allow(dead_code)] // Allow until this is used outside of tests.
67
68use std::any::Any;
69use std::collections::BTreeMap;
70use std::collections::BTreeSet;
71use std::fmt;
72use std::fmt::Debug;
73use std::future;
74use std::future::Future;
75use std::ops::Bound::Excluded;
76use std::pin::Pin;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::sync::Mutex;
80use std::sync::OnceLock;
81use std::sync::RwLock;
82use std::sync::Weak;
83use std::sync::atomic::AtomicU64;
84use std::sync::atomic::AtomicUsize;
85use std::sync::atomic::Ordering;
86use std::task::Context;
87use std::task::Poll;
88
89use async_trait::async_trait;
90use dashmap::DashMap;
91use dashmap::mapref::entry::Entry;
92use futures::Sink;
93use futures::Stream;
94use hyperactor_config::attrs::Attrs;
95use serde::Deserialize;
96use serde::Serialize;
97use serde::de::DeserializeOwned;
98use tokio::sync::mpsc;
99use tokio::sync::oneshot;
100use tokio::sync::watch;
101use tokio::task::JoinHandle;
102use tokio_util::sync::CancellationToken;
103use typeuri::Named;
104
105use crate as hyperactor; // for macros
106use crate::OncePortRef;
107use crate::PortRef;
108use crate::accum::Accumulator;
109use crate::accum::ReducerOpts;
110use crate::accum::ReducerSpec;
111use crate::actor::Signal;
112use crate::actor::remote::USER_PORT_OFFSET;
113use crate::channel;
114use crate::channel::ChannelAddr;
115use crate::channel::ChannelError;
116use crate::channel::SendError;
117use crate::channel::TxStatus;
118use crate::context;
119use crate::id;
120use crate::metrics;
121use crate::reference::ActorId;
122use crate::reference::PortId;
123use crate::reference::Reference;
124
125mod undeliverable;
126/// For [`Undeliverable`], a message type for delivery failures.
127pub use undeliverable::Undeliverable;
128pub use undeliverable::UndeliverableMessageError;
129pub use undeliverable::custom_monitored_return_handle;
130pub use undeliverable::monitored_return_handle; // TODO: Audit
131pub use undeliverable::supervise_undeliverable_messages;
132pub use undeliverable::supervise_undeliverable_messages_with;
133/// For [`MailboxAdminMessage`], a message type for mailbox administration.
134pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137/// For [`DurableMailboxSender`] a sender with a write-ahead log.
138pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141/// For message headers and latency tracking.
142pub mod headers;
143
/// Marker trait for values that may be deposited into a mailbox.
///
/// It carries no methods; it only bundles the `Send + Sync + 'static`
/// bounds so that they can be named in one place.
pub trait Message: Send + Sync + 'static {}

// Every type that satisfies the bounds is automatically a `Message`.
impl<M> Message for M where M: Send + Sync + 'static {}
148
149/// RemoteMessage extends [`Message`] by requiring that the messages
150/// also be serializable, and can thus traverse process boundaries.
151/// RemoteMessages must also specify a globally unique type name (a URI).
152pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
/// Type alias for bytestring data used throughout the system: an owned,
/// growable buffer of raw bytes.
pub type Data = Vec<u8>;
158
/// Delivery errors occur during message posting.
///
/// Values of this type are recorded on a [`MessageEnvelope`] (see
/// [`MessageEnvelope::set_error`]) and travel with it, which is why the
/// type is serializable and carries its details as plain strings.
#[derive(
    thiserror::Error,
    Debug,
    Serialize,
    Deserialize,
    typeuri::Named,
    Clone,
    PartialEq,
    Eq
)]
pub enum DeliveryError {
    /// The destination address is not reachable.
    #[error("address not routable: {0}")]
    Unroutable(String),

    /// A broken link indicates that a link in the message
    /// delivery path has failed.
    #[error("broken link: {0}")]
    BrokenLink(String),

    /// A (local) mailbox delivery error.
    #[error("mailbox error: {0}")]
    Mailbox(String),

    /// A multicast related delivery error.
    #[error("multicast error: {0}")]
    Multicast(String),

    /// The message went through too many hops and has expired, i.e. its
    /// TTL was already zero when another hop was attempted.
    #[error("ttl expired")]
    TtlExpired,
}
192
/// An envelope that carries a message destined to a remote actor.
/// The envelope contains a serialized message along with its destination
/// and sender, plus the delivery bookkeeping (errors, headers, TTL and
/// the return-to-sender policy) that accompanies it on its way.
#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
pub struct MessageEnvelope {
    /// The sender of this message.
    sender: ActorId,

    /// The destination of the message.
    dest: PortId,

    /// The serialized message.
    data: wirevalue::Any,

    /// Delivery errors recorded for this message, in the order they were
    /// set. Empty when no delivery failure has been recorded.
    errors: Vec<DeliveryError>,

    /// Additional context for this message.
    headers: Attrs,

    /// Remaining hop budget. Decremented at every `MailboxSender` hop;
    /// a hop attempted at zero marks the message as expired.
    ttl: u8,

    /// If true, undeliverable messages should be returned to sender. Else, they
    /// are dropped.
    return_undeliverable: bool,
    // TODO: add typename, source, seq, etc.
}
// Register the envelope type with `wirevalue`.
wirevalue::register_type!(MessageEnvelope);
222
223impl MessageEnvelope {
224    /// Create a new envelope with the provided sender, destination, and message.
225    pub fn new(sender: ActorId, dest: PortId, data: wirevalue::Any, headers: Attrs) -> Self {
226        Self {
227            sender,
228            dest,
229            data,
230            errors: Vec::new(),
231            headers,
232            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
233            // By default, all undeliverable messages should be returned to the sender.
234            return_undeliverable: true,
235        }
236    }
237
238    /// Create a new envelope whose sender ID is unknown.
239    pub(crate) fn new_unknown(dest: PortId, data: wirevalue::Any) -> Self {
240        Self::new(id!(unknown[0].unknown), dest, data, Attrs::new())
241    }
242
243    /// Construct a new serialized value by serializing the provided T-typed value.
244    pub fn serialize<T: Serialize + Named>(
245        source: ActorId,
246        dest: PortId,
247        value: &T,
248        headers: Attrs,
249    ) -> Result<Self, wirevalue::Error> {
250        Ok(Self {
251            headers,
252            data: wirevalue::Any::serialize(value)?,
253            sender: source,
254            dest,
255            errors: Vec::new(),
256            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
257            // By default, all undeliverable messages should be returned to the sender.
258            return_undeliverable: true,
259        })
260    }
261
262    /// Returns the remaining time-to-live (TTL) for this message.
263    ///
264    /// The TTL is decremented at each `MailboxSender` hop. When it
265    /// reaches 0, the message is considered expired and is returned
266    /// to the sender as undeliverable.
267    pub fn ttl(&self) -> u8 {
268        self.ttl
269    }
270
271    /// Overrides the message’s time-to-live (TTL).
272    ///
273    /// This replaces the current TTL value (normally initialized from
274    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
275    /// updated envelope is returned for chaining.
276    ///
277    /// # Note
278    /// The TTL is decremented at each `MailboxSender` hop, and when
279    /// it reaches 0 the message will be treated as undeliverable.
280    pub fn set_ttl(mut self, ttl: u8) -> Self {
281        self.ttl = ttl;
282        self
283    }
284
285    /// Decrements the message's TTL by one hop.
286    ///
287    /// Returns `Ok(())` if the TTL was greater than zero and
288    /// successfully decremented. If the TTL was already zero, no
289    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
290    /// returned, indicating that the message has expired and should
291    /// be treated as undeliverable.
292    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
293        if self.ttl == 0 {
294            Err(DeliveryError::TtlExpired)
295        } else {
296            self.ttl -= 1;
297            Ok(())
298        }
299    }
300
301    /// Deserialize the message in the envelope to the provided type T.
302    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
303        self.data.deserialized()
304    }
305
306    /// The serialized message.
307    pub fn data(&self) -> &wirevalue::Any {
308        &self.data
309    }
310
311    /// The message sender.
312    pub fn sender(&self) -> &ActorId {
313        &self.sender
314    }
315
316    /// The destination of the message.
317    pub fn dest(&self) -> &PortId {
318        &self.dest
319    }
320
321    /// The message headers.
322    pub fn headers(&self) -> &Attrs {
323        &self.headers
324    }
325
326    /// Tells whether this is a signal message.
327    pub fn is_signal(&self) -> bool {
328        self.dest.index() == Signal::port()
329    }
330
331    /// Set a delivery error for the message. If errors are already set, append
332    /// it to the existing errors.
333    pub fn set_error(&mut self, error: DeliveryError) {
334        self.errors.push(error)
335    }
336
337    /// Change the sender on the envelope in case it was set incorrectly. This
338    /// should only be used by CommActor since it is forwarding from another
339    /// sender.
340    pub fn update_sender(&mut self, sender: ActorId) {
341        self.sender = sender;
342    }
343
344    /// Set to true if you want this message to be returned to sender if it cannot
345    /// reach dest. This is the default.
346    /// Set to false if you want the message to be dropped instead.
347    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
348        self.return_undeliverable = return_undeliverable;
349    }
350
351    /// The message has been determined to be undeliverable with the
352    /// provided error. Mark the envelope with the error and return to
353    /// sender.
354    pub fn undeliverable(
355        mut self,
356        error: DeliveryError,
357        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
358    ) {
359        tracing::error!(
360            name = "undelivered_message_attempt",
361            sender = self.sender.to_string(),
362            dest = self.dest.to_string(),
363            error = error.to_string(),
364            return_handle = %return_handle,
365        );
366        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
367            1,
368            hyperactor_telemetry::kv_pairs!(
369                "sender_actor_id" => self.sender.to_string(),
370                "dest_actor_id" => self.dest.to_string(),
371                "message_type" => self.data.typename().unwrap_or("unknown"),
372                "error_type" =>  error.to_string(),
373            ),
374        );
375
376        self.set_error(error);
377        undeliverable::return_undeliverable(return_handle, self);
378    }
379
380    /// Get the errors of why this message was undeliverable. Empty means this
381    /// message was not determined as undeliverable.
382    pub fn errors(&self) -> &Vec<DeliveryError> {
383        &self.errors
384    }
385
386    /// Get the string representation of the errors of this message was
387    /// undeliverable. None means this message was not determined as
388    /// undeliverable.
389    pub fn error_msg(&self) -> Option<String> {
390        if self.errors.is_empty() {
391            None
392        } else {
393            Some(
394                self.errors
395                    .iter()
396                    .map(|e| e.to_string())
397                    .collect::<Vec<_>>()
398                    .join("; "),
399            )
400        }
401    }
402
403    fn open(self) -> (MessageMetadata, wirevalue::Any) {
404        let Self {
405            sender,
406            dest,
407            data,
408            errors,
409            headers,
410            ttl,
411            return_undeliverable,
412        } = self;
413
414        (
415            MessageMetadata {
416                sender,
417                dest,
418                errors,
419                headers,
420                ttl,
421                return_undeliverable,
422            },
423            data,
424        )
425    }
426
427    fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
428        let MessageMetadata {
429            sender,
430            dest,
431            errors,
432            headers,
433            ttl,
434            return_undeliverable,
435        } = metadata;
436
437        Self {
438            sender,
439            dest,
440            data,
441            errors,
442            headers,
443            ttl,
444            return_undeliverable,
445        }
446    }
447
448    fn return_undeliverable(&self) -> bool {
449        self.return_undeliverable
450    }
451}
452
453impl fmt::Display for MessageEnvelope {
454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455        match &self.error_msg() {
456            None => write!(
457                f,
458                "{} > {}: {} {{{}}}",
459                self.sender, self.dest, self.data, self.headers
460            ),
461            Some(err) => write!(
462                f,
463                "{} > {}: {} {{{}}}: delivery error: {}",
464                self.sender, self.dest, self.data, self.headers, err
465            ),
466        }
467    }
468}
469
/// Metadata about a message sent via a MessageEnvelope: every envelope
/// field except the serialized payload. Produced by
/// `MessageEnvelope::open` and consumed by `MessageEnvelope::seal`.
#[derive(Clone)]
pub struct MessageMetadata {
    // The sender of the message.
    sender: ActorId,
    // The destination port of the message.
    dest: PortId,
    // Delivery errors recorded on the message so far.
    errors: Vec<DeliveryError>,
    // Additional context carried with the message.
    headers: Attrs,
    // Remaining hop budget.
    ttl: u8,
    // Whether an undeliverable message is returned to the sender.
    return_undeliverable: bool,
}
480
/// Errors that occur during mailbox operations. Each error is associated
/// with the mailbox's actor id.
///
/// Displayed as `<actor_id>: <kind>`.
#[derive(Debug)]
pub struct MailboxError {
    // The id of the actor whose mailbox produced the error.
    actor_id: ActorId,
    // What went wrong.
    kind: MailboxErrorKind,
}
488
/// The kinds of mailbox errors. This enum is marked non-exhaustive to
/// allow for extensibility.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum MailboxErrorKind {
    /// An operation was attempted on a closed mailbox.
    #[error("mailbox closed")]
    Closed,

    /// The port associated with an operation was invalid.
    #[error("invalid port: {0}")]
    InvalidPort(PortId),

    /// There was no sender associated with the port.
    #[error("no sender for port: {0}")]
    NoSenderForPort(PortId),

    /// There was no local sender associated with the port.
    /// Returned by operations that require a local port.
    #[error("no local sender for port: {0}")]
    NoLocalSenderForPort(PortId),

    /// The port was closed.
    #[error("{0}: port closed")]
    PortClosed(PortId),

    /// An error occurred during a send operation.
    #[error("send {0}: {1}")]
    Send(PortId, #[source] anyhow::Error),

    /// An error occurred during a receive operation.
    #[error("recv {0}: {1}")]
    Recv(PortId, #[source] anyhow::Error),

    /// There was a serialization failure.
    #[error("serialize: {0}")]
    Serialize(#[source] anyhow::Error),

    /// There was a deserialization failure. The first field is the name
    /// of the type being deserialized.
    // NOTE(review): unlike `Send`, `Recv` and `Serialize`, the error here
    // is not marked `#[source]` -- confirm whether that is intentional.
    #[error("deserialize {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// There was an error during a channel operation.
    #[error(transparent)]
    Channel(#[from] ChannelError),
}
535
536impl MailboxError {
537    /// Create a new mailbox error associated with the provided actor
538    /// id and of the given kind.
539    pub fn new(actor_id: ActorId, kind: MailboxErrorKind) -> Self {
540        Self { actor_id, kind }
541    }
542
543    /// The ID of the mailbox producing this error.
544    pub fn actor_id(&self) -> &ActorId {
545        &self.actor_id
546    }
547
548    /// The error's kind.
549    pub fn kind(&self) -> &MailboxErrorKind {
550        &self.kind
551    }
552}
553
554impl fmt::Display for MailboxError {
555    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556        write!(f, "{}: ", self.actor_id)?;
557        fmt::Display::fmt(&self.kind, f)
558    }
559}
560
561impl std::error::Error for MailboxError {
562    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
563        self.kind.source()
564    }
565}
566
/// PortLocation describes the location of a port.
/// This is used in errors to provide a uniform data type
/// for ports that may or may not be bound.
///
/// Displayed as the port id when bound, and as `<actor_id><<type>>`
/// when unbound.
#[derive(Debug, Clone)]
pub enum PortLocation {
    /// The port was bound: the location is its underlying bound ID.
    Bound(PortId),
    /// The port was not bound: we provide the actor ID and the message type
    /// (as a type-name string).
    Unbound(ActorId, &'static str),
}
577
578impl PortLocation {
579    fn new_unbound<M: Message>(actor_id: ActorId) -> Self {
580        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
581    }
582
583    fn new_unbound_type(actor_id: ActorId, ty: &'static str) -> Self {
584        PortLocation::Unbound(actor_id, ty)
585    }
586
587    /// The actor id of the location.
588    pub fn actor_id(&self) -> &ActorId {
589        match self {
590            PortLocation::Bound(port_id) => port_id.actor_id(),
591            PortLocation::Unbound(actor_id, _) => actor_id,
592        }
593    }
594}
595
596impl fmt::Display for PortLocation {
597    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598        match self {
599            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
600            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
601        }
602    }
603}
604
/// Errors that occur during mailbox sending operations. Each error
/// is associated with the location of the port involved in the
/// operation, which may be bound or unbound.
///
/// Displayed as `<location>: <kind>`.
#[derive(Debug)]
pub struct MailboxSenderError {
    // Where the error occurred.
    location: Box<PortLocation>,
    // What went wrong.
    kind: Box<MailboxSenderErrorKind>,
}
612
/// The kind of mailbox sending errors.
#[derive(thiserror::Error, Debug)]
pub enum MailboxSenderErrorKind {
    /// Error during serialization.
    #[error("serialization error: {0}")]
    Serialize(anyhow::Error),

    /// Error during deserialization. The first field is the name of the
    /// type being deserialized.
    #[error("deserialization error for type {0}: {1}")]
    Deserialize(&'static str, anyhow::Error),

    /// A send to an invalid port.
    #[error("invalid port")]
    Invalid,

    /// A send to a closed port.
    #[error("port closed")]
    Closed,

    // The following pass through underlying errors:
    /// An underlying mailbox error.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying channel error.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying message log error.
    #[error(transparent)]
    MessageLog(#[from] MessageLogError),

    /// Another, uncategorized error.
    #[error("send error: {0}")]
    Other(#[from] anyhow::Error),

    /// The destination was unreachable.
    #[error("unreachable: {0}")]
    Unreachable(anyhow::Error),
}
653
654impl MailboxSenderError {
655    /// Create a new mailbox sender error to an unbound port.
656    pub fn new_unbound<M>(actor_id: ActorId, kind: MailboxSenderErrorKind) -> Self {
657        Self {
658            location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
659            kind: Box::new(kind),
660        }
661    }
662
663    /// Create a new mailbox sender, manually providing the type.
664    pub fn new_unbound_type(
665        actor_id: ActorId,
666        kind: MailboxSenderErrorKind,
667        ty: &'static str,
668    ) -> Self {
669        Self {
670            location: Box::new(PortLocation::Unbound(actor_id, ty)),
671            kind: Box::new(kind),
672        }
673    }
674
675    /// Create a new mailbox sender error with the provided port ID and kind.
676    pub fn new_bound(port_id: PortId, kind: MailboxSenderErrorKind) -> Self {
677        Self {
678            location: Box::new(PortLocation::Bound(port_id)),
679            kind: Box::new(kind),
680        }
681    }
682
683    /// The location at which the error occured.
684    pub fn location(&self) -> &PortLocation {
685        &self.location
686    }
687
688    /// The kind associated with the error.
689    pub fn kind(&self) -> &MailboxSenderErrorKind {
690        &self.kind
691    }
692}
693
694impl fmt::Display for MailboxSenderError {
695    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696        write!(f, "{}: ", self.location)?;
697        fmt::Display::fmt(&self.kind, f)
698    }
699}
700
701impl std::error::Error for MailboxSenderError {
702    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
703        self.kind.source()
704    }
705}
706
707/// MailboxSenders can send messages through ports to mailboxes. It
708/// provides a unified interface for message delivery in the system.
709pub trait MailboxSender: Send + Sync + Any {
710    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
711    /// delegate to transport.
712    fn post(
713        &self,
714        mut envelope: MessageEnvelope,
715        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
716    ) {
717        if let Err(err) = envelope.dec_ttl_or_err() {
718            envelope.undeliverable(err, return_handle);
719            return;
720        }
721        self.post_unchecked(envelope, return_handle);
722    }
723
724    /// Raw transport: **no** policy.
725    fn post_unchecked(
726        &self,
727        envelope: MessageEnvelope,
728        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
729    );
730}
731
732/// PortSender extends [`MailboxSender`] by providing typed endpoints
733/// for sending messages over ports
734pub trait PortSender: MailboxSender {
735    /// Deliver a message to the provided port.
736    fn serialize_and_send<M: RemoteMessage>(
737        &self,
738        port: &PortRef<M>,
739        message: M,
740        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
741    ) -> Result<(), MailboxSenderError> {
742        // TODO: convert this to a undeliverable error also
743        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
744            MailboxSenderError::new_bound(
745                port.port_id().clone(),
746                MailboxSenderErrorKind::Serialize(err.into()),
747            )
748        })?;
749        self.post(
750            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
751            return_handle,
752        );
753        Ok(())
754    }
755
756    /// Deliver a message to a one-shot port, consuming the provided port,
757    /// which is not reusable.
758    fn serialize_and_send_once<M: RemoteMessage>(
759        &self,
760        once_port: OncePortRef<M>,
761        message: M,
762        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
763    ) -> Result<(), MailboxSenderError> {
764        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
765            MailboxSenderError::new_bound(
766                once_port.port_id().clone(),
767                MailboxSenderErrorKind::Serialize(err.into()),
768            )
769        })?;
770        self.post(
771            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
772            return_handle,
773        );
774        Ok(())
775    }
776}
777
778impl<T: ?Sized + MailboxSender> PortSender for T {}
779
780/// A perpetually closed mailbox sender. Panics if any messages are posted.
781/// Useful for tests, or where there is no meaningful mailbox sender
782/// implementation available.
783#[derive(Debug, Clone)]
784pub struct PanickingMailboxSender;
785
786impl MailboxSender for PanickingMailboxSender {
787    fn post_unchecked(
788        &self,
789        envelope: MessageEnvelope,
790        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
791    ) {
792        panic!("panic! in the mailbox! attempted post: {}", envelope)
793    }
794}
795
796/// A mailbox sender for undeliverable messages. This will simply record
797/// any undelivered messages.
798#[derive(Debug)]
799pub struct UndeliverableMailboxSender;
800
801impl MailboxSender for UndeliverableMailboxSender {
802    fn post_unchecked(
803        &self,
804        envelope: MessageEnvelope,
805        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
806    ) {
807        let sender_name = envelope.sender.name();
808        let error_str = envelope.error_msg().unwrap_or("".to_string());
809        // The undeliverable message was unable to be delivered back to the
810        // sender for some reason
811        tracing::error!(
812            name = "undelivered_message_abandoned",
813            actor_name = sender_name,
814            actor_id = envelope.sender.to_string(),
815            dest = envelope.dest.to_string(),
816            headers = envelope.headers().to_string(), // todo: implement tracing::Value for Attrs
817            data = envelope.data().to_string(),
818            return_handle = %return_handle,
819            "message not delivered, {}",
820            error_str,
821        );
822    }
823}
824
// A queue of messages that are handed, one at a time, to a processing
// function running on a spawned task (see `Buffer::new`).
struct Buffer<T: Message> {
    // Producer side of the queue feeding the processing task.
    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
    // Number of items the processing task has finished so far.
    processed: watch::Receiver<usize>,
    // Number of `send` calls made so far.
    seq: AtomicUsize,
}
830
831impl<T: Message> Buffer<T> {
832    fn new<Fut>(
833        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
834    ) -> Self
835    where
836        Fut: Future<Output = ()> + Send + 'static,
837    {
838        let (queue, mut next) = mpsc::unbounded_channel();
839        let (last_processed, processed) = watch::channel(0);
840        crate::init::get_runtime().spawn(async move {
841            let mut seq = 0;
842            while let Some((msg, return_handle)) = next.recv().await {
843                process(msg, return_handle).await;
844                seq += 1;
845                let _ = last_processed.send(seq);
846            }
847        });
848        Self {
849            queue,
850            processed,
851            seq: AtomicUsize::new(0),
852        }
853    }
854
855    #[allow(clippy::result_large_err)]
856    fn send(
857        &self,
858        item: (T, PortHandle<Undeliverable<T>>),
859    ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
860        self.seq.fetch_add(1, Ordering::SeqCst);
861        self.queue.send(item)?;
862        Ok(())
863    }
864
865    async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
866        let seq = self.seq.load(Ordering::SeqCst);
867        while *self.processed.borrow_and_update() < seq {
868            self.processed.changed().await?;
869        }
870        Ok(())
871    }
872}
873
// A lazily-initialized, shared `BoxedMailboxSender` wrapping a
// `PanickingMailboxSender`; it is built on first access.
static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
876
/// Convenience boxing implementation for MailboxSender. Most APIs
/// are parameterized on MailboxSender implementations, and it's thus
/// difficult to work with dyn values.  BoxedMailboxSender bridges this
/// gap by providing a concrete MailboxSender which dispatches using an
/// underlying (boxed) dyn.
///
/// The underlying sender is held in an [`Arc`], so clones share the same
/// sender.
#[derive(Clone)]
pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
884
885impl fmt::Debug for BoxedMailboxSender {
886    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
887        f.debug_struct("BoxedMailboxSender")
888            .field("sender", &"<dyn MailboxSender>")
889            .finish()
890    }
891}
892
893impl BoxedMailboxSender {
894    /// Create a new boxed sender given the provided sender implementation.
895    pub fn new(sender: impl MailboxSender + 'static) -> Self {
896        Self(Arc::new(sender))
897    }
898
899    /// Attempts to downcast the inner sender to the given concrete
900    /// type.
901    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
902        (&*self.0 as &dyn Any).downcast_ref::<T>()
903    }
904}
905
906/// Extension trait that creates a boxed clone of a MailboxSender.
907pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
908    /// A boxed clone of this MailboxSender.
909    fn boxed(&self) -> BoxedMailboxSender;
910}
911impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
912    fn boxed(&self) -> BoxedMailboxSender {
913        BoxedMailboxSender::new(self.clone())
914    }
915}
916
917/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
918pub trait IntoBoxedMailboxSender: MailboxSender {
919    /// Rehome this MailboxSender into a BoxedMailboxSender.
920    fn into_boxed(self) -> BoxedMailboxSender;
921}
922impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
923    fn into_boxed(self) -> BoxedMailboxSender {
924        BoxedMailboxSender::new(self)
925    }
926}
927
928impl MailboxSender for BoxedMailboxSender {
929    fn post_unchecked(
930        &self,
931        envelope: MessageEnvelope,
932        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
933    ) {
934        self.0.post_unchecked(envelope, return_handle);
935    }
936}
937
/// Errors that occur during mailbox serving. Both variants pass the
/// underlying error through unchanged and can be created from it with
/// `?`.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
949
/// Represents a running [`MailboxServer`]. The handle composes a
/// [`tokio::task::JoinHandle`] and may be joined in the same manner.
#[derive(Debug)]
pub struct MailboxServerHandle {
    // Join handle of the task that serves the mailbox.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    // Watch channel used to ask the serving task to stop; `true` means
    // "stop requested".
    stopped_tx: watch::Sender<bool>,
}
957
958impl MailboxServerHandle {
959    /// Signal the server to stop serving the mailbox. The caller should
960    /// join the handle by awaiting the [`MailboxServerHandle`] future.
961    ///
962    /// Stop should be called at most once.
963    pub fn stop(&self, reason: &str) {
964        tracing::info!("stopping mailbox server; reason: {}", reason);
965        self.stopped_tx.send(true).expect("stop called twice");
966    }
967}
968
969/// Forward future implementation to underlying handle.
970impl Future for MailboxServerHandle {
971    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
972
973    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
974        // SAFETY: This is safe to do because self is pinned.
975        let join_handle_pinned =
976            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
977        join_handle_pinned.poll(cx)
978    }
979}
980
981// A `MailboxServer` (such as a router) can receive a message
982// that couldn't reach its destination. We can use the fact that
983// servers are `MailboxSender`s to attempt to forward them back to
984// their senders.
985fn server_return_handle<T: MailboxServer>(server: T) -> PortHandle<Undeliverable<MessageEnvelope>> {
986    let (return_handle, mut rx) = undeliverable::new_undeliverable_port();
987
988    tokio::task::spawn(async move {
989        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
990            if let Ok(Undeliverable(e)) = envelope.deserialized::<Undeliverable<MessageEnvelope>>()
991            {
992                // A non-returnable undeliverable.
993                UndeliverableMailboxSender.post(e, monitored_return_handle());
994                continue;
995            }
996            envelope.set_error(DeliveryError::BrokenLink(
997                "message was undeliverable".to_owned(),
998            ));
999            server.post(
1000                MessageEnvelope::new(
1001                    envelope.sender().clone(),
1002                    PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1003                        envelope.sender(),
1004                    )
1005                    .port_id()
1006                    .clone(),
1007                    wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
1008                    Attrs::new(),
1009                ),
1010                monitored_return_handle(),
1011            );
1012        }
1013    });
1014
1015    return_handle
1016}
1017
1018/// Serve a port on the provided [`channel::Rx`]. This dispatches all
1019/// channel messages directly to the port.
1020pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
1021    /// Serve the provided port on the given channel on this sender on
1022    /// a background task which may be joined with the returned handle.
1023    /// The task fails on any send error.
1024    fn serve(
1025        self,
1026        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
1027    ) -> MailboxServerHandle {
1028        // A `MailboxServer` can receive a message that couldn't
1029        // reach its destination. We can use the fact that servers are
1030        // `MailboxSender`s to attempt to forward them back to their
1031        // senders.
1032        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
1033        let server = self.clone();
1034        tokio::task::spawn(async move {
1035            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
1036                if let Ok(Undeliverable(e)) =
1037                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
1038                {
1039                    // A non-returnable undeliverable.
1040                    UndeliverableMailboxSender.post(e, monitored_return_handle());
1041                    continue;
1042                }
1043                envelope.set_error(DeliveryError::BrokenLink(
1044                    "message was undeliverable".to_owned(),
1045                ));
1046                server.post(
1047                    MessageEnvelope::new(
1048                        envelope.sender().clone(),
1049                        PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
1050                            envelope.sender(),
1051                        )
1052                        .port_id()
1053                        .clone(),
1054                        wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
1055                        Attrs::new(),
1056                    ),
1057                    monitored_return_handle(),
1058                );
1059            }
1060        });
1061
1062        let (stopped_tx, mut stopped_rx) = watch::channel(false);
1063        let join_handle = tokio::spawn(async move {
1064            let mut detached = false;
1065
1066            loop {
1067                if *stopped_rx.borrow_and_update() {
1068                    break Ok(());
1069                }
1070
1071                tokio::select! {
1072                    message = rx.recv() => {
1073                        match message {
1074                            // Relay the message to the port directly.
1075                            Ok(envelope) => self.post(envelope, return_handle.clone()),
1076
1077                            // Closed is a "graceful" error in this case.
1078                            // We simply stop serving.
1079                            Err(ChannelError::Closed) => break Ok(()),
1080                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
1081                        }
1082                    }
1083                    result = stopped_rx.changed(), if !detached  => {
1084                        detached = result.is_err();
1085                        if detached {
1086                            tracing::debug!(
1087                                "the mailbox server is detached for Rx {}", rx.addr()
1088                            );
1089                        } else {
1090                            tracing::debug!(
1091                                "the mailbox server is stopped for Rx {}", rx.addr()
1092                            );
1093                        }
1094                    }
1095                }
1096            }
1097        });
1098
1099        MailboxServerHandle {
1100            join_handle,
1101            stopped_tx,
1102        }
1103    }
1104}
1105
1106impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1107
/// A mailbox server client that transmits messages on a Tx channel.
pub struct MailboxClient {
    // The unbounded sender. Posted envelopes are queued here together
    // with their return handles.
    buffer: Buffer<MessageEnvelope>,

    // To cancel monitoring tx health. A clone of this token is handed
    // to the health-monitoring task, which exits when it is cancelled.
    _tx_monitoring: CancellationToken,
}
1116
1117impl fmt::Debug for MailboxClient {
1118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1119        f.debug_struct("MailboxClient")
1120            .field("buffer", &"<Buffer>")
1121            .finish()
1122    }
1123}
1124
1125impl MailboxClient {
1126    /// Create a new client that sends messages destined for a
1127    /// [`MailboxServer`] on the provided Tx channel.
1128    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1129        let addr = tx.addr();
1130        let tx = Arc::new(tx);
1131        let tx_status = tx.status().clone();
1132        let tx_monitoring = CancellationToken::new();
1133        let buffer = Buffer::new(move |envelope, return_handle| {
1134            let tx = Arc::clone(&tx);
1135            let (return_channel, return_receiver) =
1136                oneshot::channel::<SendError<MessageEnvelope>>();
1137            // Set up for delivery failure.
1138            let return_handle_0 = return_handle.clone();
1139            tokio::spawn(async move {
1140                if let Ok(SendError {
1141                    error,
1142                    message,
1143                    reason,
1144                }) = return_receiver.await
1145                {
1146                    message.undeliverable(
1147                        DeliveryError::BrokenLink(format!(
1148                            "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
1149                        )),
1150                        return_handle_0,
1151                    );
1152                }
1153            });
1154            // Send the message for transmission.
1155            tx.try_post(envelope, return_channel);
1156            future::ready(())
1157        });
1158        let this = Self {
1159            buffer,
1160            _tx_monitoring: tx_monitoring.clone(),
1161        };
1162        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1163        this
1164    }
1165
1166    /// Convenience constructor, to set up a mailbox client that forwards messages
1167    /// to the provided address.
1168    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1169        Ok(MailboxClient::new(channel::dial(addr)?))
1170    }
1171
1172    // Set up a watch for the tx's health.
1173    fn monitor_tx_health(
1174        mut rx: watch::Receiver<TxStatus>,
1175        cancel_token: CancellationToken,
1176        addr: ChannelAddr,
1177    ) {
1178        crate::init::get_runtime().spawn(async move {
1179            loop {
1180                tokio::select! {
1181                    changed = rx.changed() => {
1182                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1183                            tracing::warn!("connection to {} lost", addr);
1184                            // TODO: Potential for supervision event
1185                            // interaction here.
1186                            break;
1187                        }
1188                    }
1189                    _ = cancel_token.cancelled() => {
1190                        break;
1191                    }
1192                }
1193            }
1194        });
1195    }
1196}
1197
1198impl MailboxSender for MailboxClient {
1199    #[hyperactor::instrument_infallible]
1200    fn post_unchecked(
1201        &self,
1202        envelope: MessageEnvelope,
1203        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1204    ) {
1205        tracing::event!(target:"messages", tracing::Level::TRACE,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1206        if let Err(mpsc::error::SendError((envelope, return_handle))) =
1207            self.buffer.send((envelope, return_handle))
1208        {
1209            let err = DeliveryError::BrokenLink(
1210                "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1211            );
1212
1213            // Failed to enqueue.
1214            envelope.undeliverable(err, return_handle);
1215        }
1216    }
1217}
1218
/// Wrapper to turn `PortRef` into a `Sink`.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    // Context passed along with every send on `port`.
    cx: C,
    // The port reference that items are sent to.
    port: PortRef<M>,
}
1224
1225impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1226    /// Create new PortSink
1227    pub fn new(cx: C, port: PortRef<M>) -> Self {
1228        Self { cx, port }
1229    }
1230}
1231
1232impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1233    type Error = MailboxSenderError;
1234
1235    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1236        Poll::Ready(Ok(()))
1237    }
1238
1239    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1240        self.port.send(&self.cx, item)
1241    }
1242
1243    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1244        Poll::Ready(Ok(()))
1245    }
1246
1247    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1248        Poll::Ready(Ok(()))
1249    }
1250}
1251
/// A mailbox coordinates message delivery to actors through typed
/// [`Port`]s associated with the mailbox.
///
/// Cloning a mailbox is cheap: clones share the same underlying state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    // Shared mailbox state (actor id, forwarder, bound ports).
    inner: Arc<State>,
}
1258
1259impl Mailbox {
1260    /// Create a new mailbox associated with the provided actor ID, using the provided
1261    /// forwarder for external destinations.
1262    pub fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
1263        Self {
1264            inner: Arc::new(State::new(actor_id, forwarder)),
1265        }
1266    }
1267
1268    /// Create a new detached mailbox associated with the provided actor ID.
1269    pub fn new_detached(actor_id: ActorId) -> Self {
1270        Self {
1271            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1272        }
1273    }
1274
1275    /// The actor id associated with this mailbox.
1276    pub fn actor_id(&self) -> &ActorId {
1277        &self.inner.actor_id
1278    }
1279
1280    /// Open a new port that accepts M-typed messages. The returned
1281    /// port may be freely cloned, serialized, and passed around. The
1282    /// returned receiver should only be retained by the actor responsible
1283    /// for processing the delivered messages.
1284    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1285        let port_index = self.inner.allocate_port();
1286        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1287        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1288        tracing::trace!(
1289            name = "open_port",
1290            "opening port for {} at {}",
1291            self.inner.actor_id,
1292            port_id
1293        );
1294        (
1295            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1296            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1297        )
1298    }
1299
1300    /// Bind this message's actor port to this actor's mailbox. This method is
1301    /// normally used:
1302    ///   1. when we need to intercept a message sent to a handler, and re-route
1303    ///      that message to the returned receiver;
1304    ///   2. mock this message's handler when it is not implemented for this actor
1305    ///      type, with the returned receiver.
1306    pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1307        let (handle, receiver) = self.open_port();
1308        handle.bind_actor_port();
1309        (handle, receiver)
1310    }
1311
1312    /// Open a new port with an accumulator with default reduce options.
1313    /// See [`open_accum_port_opts`] for more details.
1314    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1315    where
1316        A: Accumulator + Send + Sync + 'static,
1317        A::Update: Message,
1318        A::State: Message + Default + Clone,
1319    {
1320        self.open_accum_port_opts(accum, None)
1321    }
1322
    /// Open a new port with an accumulator. This port accepts A::Update type
    /// messages, accumulate them into A::State with the given accumulator.
    /// The latest changed state can be received from the returned receiver as
    /// a single A::State message. If there is no new update, the receiver will
    /// not receive any message.
    ///
    /// If provided, reducer options are applied to reduce operations.
    pub fn open_accum_port_opts<A>(
        &self,
        accum: A,
        reducer_opts: Option<ReducerOpts>,
    ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
    where
        A: Accumulator + Send + Sync + 'static,
        A::Update: Message,
        A::State: Message + Default + Clone,
    {
        let port_index = self.inner.allocate_port();
        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
        let port_id = PortId(self.inner.actor_id.clone(), port_index);
        // The accumulated state starts at its default and is owned by
        // the enqueue closure below.
        let state = Mutex::new(A::State::default());
        // Read the reducer spec before `accum` is moved into the closure.
        let reducer_spec = accum.reducer_spec();
        let enqueue = move |_, update: A::Update| {
            let mut state = state.lock().unwrap();
            accum.accumulate(&mut state, update)?;
            // Publish a snapshot of the state after every update; a send
            // failure is deliberately ignored.
            let _ = sender.send(state.clone());
            Ok(())
        };
        (
            PortHandle {
                mailbox: self.clone(),
                port_index,
                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
                bound: Arc::new(OnceLock::new()),
                reducer_spec,
                reducer_opts,
            },
            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
        )
    }
1363
1364    /// Open a port that accepts M-typed messages, using the provided function
1365    /// to enqueue.
1366    // TODO: consider making lifetime bound to Self instead.
1367    pub(crate) fn open_enqueue_port<M: Message>(
1368        &self,
1369        enqueue: impl Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1370    ) -> PortHandle<M> {
1371        PortHandle {
1372            mailbox: self.clone(),
1373            port_index: self.inner.allocate_port(),
1374            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1375            bound: Arc::new(OnceLock::new()),
1376            reducer_spec: None,
1377            reducer_opts: None,
1378        }
1379    }
1380
1381    /// Open a new one-shot port that accepts M-typed messages. The
1382    /// returned port may be used to send a single message; ditto the
1383    /// receiver may receive a single message.
1384    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1385        let port_index = self.inner.allocate_port();
1386        let port_id = PortId(self.inner.actor_id.clone(), port_index);
1387        let (sender, receiver) = oneshot::channel::<M>();
1388        (
1389            OncePortHandle {
1390                mailbox: self.clone(),
1391                port_index,
1392                port_id: port_id.clone(),
1393                sender,
1394            },
1395            OncePortReceiver {
1396                receiver: Some(receiver),
1397                port_id,
1398                mailbox: self.clone(),
1399            },
1400        )
1401    }
1402
1403    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1404        MailboxError::new(self.inner.actor_id.clone(), err)
1405    }
1406
1407    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1408        let port_index = M::port();
1409        self.inner.ports.get(&port_index).and_then(|boxed| {
1410            boxed
1411                .as_any()
1412                .downcast_ref::<UnboundedSender<M>>()
1413                .map(|s| {
1414                    assert_eq!(
1415                        s.port_id,
1416                        self.actor_id().port_id(port_index),
1417                        "port_id mismatch in downcasted UnboundedSender"
1418                    );
1419                    s.sender.clone()
1420                })
1421        })
1422    }
1423
1424    /// Retrieve the bound undeliverable message port handle.
1425    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1426        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1427            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1428    }
1429
1430    pub(crate) fn allocate_port(&self) -> u64 {
1431        self.inner.allocate_port()
1432    }
1433
1434    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1435        assert_eq!(
1436            handle.mailbox.actor_id(),
1437            self.actor_id(),
1438            "port does not belong to mailbox"
1439        );
1440
1441        // TODO: don't even allocate a port until the port is bound. Possibly
1442        // have handles explicitly staged (unbound, bound).
1443        let port_id = self.actor_id().port_id(handle.port_index);
1444        match self.inner.ports.entry(handle.port_index) {
1445            Entry::Vacant(entry) => {
1446                entry.insert(Box::new(UnboundedSender::new(
1447                    handle.sender.clone(),
1448                    port_id.clone(),
1449                )));
1450            }
1451            Entry::Occupied(_entry) => {}
1452        }
1453
1454        PortRef::attest(port_id)
1455    }
1456
1457    fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1458        assert_eq!(
1459            handle.mailbox.actor_id(),
1460            self.actor_id(),
1461            "port does not belong to mailbox"
1462        );
1463
1464        let port_index = M::port();
1465        let port_id = self.actor_id().port_id(port_index);
1466        match self.inner.ports.entry(port_index) {
1467            Entry::Vacant(entry) => {
1468                entry.insert(Box::new(UnboundedSender::new(
1469                    handle.sender.clone(),
1470                    port_id,
1471                )));
1472            }
1473            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1474        }
1475    }
1476
1477    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1478        let port_id = handle.port_id().clone();
1479        match self.inner.ports.entry(handle.port_index) {
1480            Entry::Vacant(entry) => {
1481                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1482            }
1483            Entry::Occupied(_entry) => {}
1484        }
1485    }
1486
1487    pub(crate) fn bind_untyped(&self, port_id: &PortId, sender: UntypedUnboundedSender) {
1488        assert_eq!(
1489            port_id.actor_id(),
1490            self.actor_id(),
1491            "port does not belong to mailbox"
1492        );
1493
1494        match self.inner.ports.entry(port_id.index()) {
1495            Entry::Vacant(entry) => {
1496                entry.insert(Box::new(sender));
1497            }
1498            Entry::Occupied(_entry) => {}
1499        }
1500    }
1501}
1502
/// A mailbox is its own mailbox capability.
impl context::Mailbox for Mailbox {
    /// Returns this mailbox itself.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1508
1509// TODO: figure out what to do with these interfaces -- possibly these caps
1510// do not have to be private.
1511
1512/// Open a port given a capability.
1513pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1514    cx.mailbox().open_port()
1515}
1516
1517/// Open a one-shot port given a capability. This is a public method primarily to
1518/// enable macro-generated clients.
1519pub fn open_once_port<M: Message>(
1520    cx: &impl context::Mailbox,
1521) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1522    cx.mailbox().open_once_port()
1523}
1524
impl MailboxSender for Mailbox {
    /// Deliver a serialized message to the port named by the envelope's
    /// destination.
    ///
    /// Envelopes addressed to a different actor are handed to the
    /// mailbox's forwarder. Delivery failures (no port bound at the
    /// destination index, or an error from the bound port's serialized
    /// sender) are not returned to the caller; the envelope is instead
    /// reported as undeliverable through `return_handle`.
    fn post_unchecked(
        &self,
        envelope: MessageEnvelope,
        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
    ) {
        metrics::MAILBOX_POSTS.add(
            1,
            hyperactor_telemetry::kv_pairs!(
                "actor_id" => envelope.sender.to_string(),
                "dest_actor_id" => envelope.dest.0.to_string(),
            ),
        );
        tracing::trace!(
            name = "post",
            actor_name = envelope.sender.name(),
            actor_id = envelope.sender.to_string(),
            "posting message to {}",
            envelope.dest
        );

        // Not addressed to this mailbox's actor: let the forwarder
        // route it.
        if envelope.dest().actor_id() != &self.inner.actor_id {
            return self.inner.forwarder.post(envelope, return_handle);
        }

        match self.inner.ports.entry(envelope.dest().index()) {
            Entry::Vacant(_) => {
                let err = DeliveryError::Unroutable(format!(
                    "port not bound in mailbox; port id: {}; message type: {}",
                    envelope.dest().index(),
                    envelope.data().typename().map_or_else(
                        || format!("unregistered type hash {}", envelope.data().typehash()),
                        |s| s.to_string(),
                    )
                ));

                envelope.undeliverable(err, return_handle);
            }
            Entry::Occupied(entry) => {
                // Split the envelope so that the headers and data can be
                // handed to the port, while the remaining metadata is
                // kept for re-sealing the envelope on failure.
                let (metadata, data) = envelope.open();
                let MessageMetadata {
                    headers,
                    sender,
                    dest,
                    errors: metadata_errors,
                    ttl,
                    return_undeliverable,
                } = metadata;

                // We use the entry API here so that we can remove the
                // entry while holding an (entry) reference. The DashMap
                // documentation suggests that deadlocks are possible
                // "when holding any sort of reference into the map",
                // but surely this applies only to the same thread? This
                // would also imply we have to be careful holding any
                // sort of reference across .await points.
                match entry.get().send_serialized(headers, data) {
                    // `Ok(false)`: drop the port's entry from the map.
                    // NOTE(review): presumably signals that the port
                    // accepts no further messages (e.g. a one-shot
                    // port) — confirm against `send_serialized`.
                    Ok(false) => {
                        entry.remove();
                    }
                    Ok(true) => (),
                    Err(SerializedSenderError {
                        data,
                        error: sender_error,
                        headers,
                    }) => {
                        let err = DeliveryError::Mailbox(format!("{}", sender_error));

                        // Rebuild the envelope from the returned headers
                        // and data plus the saved metadata, then report
                        // it as undeliverable.
                        MessageEnvelope::seal(
                            MessageMetadata {
                                headers,
                                sender,
                                dest,
                                errors: metadata_errors,
                                ttl,
                                return_undeliverable,
                            },
                            data,
                        )
                        .undeliverable(err, return_handle)
                    }
                }
            }
        }
    }
}
1613
/// A port to which M-typed messages can be delivered. Ports may be
/// serialized to be sent to other actors. However, when a port is
/// deserialized, it may no longer be used to send messages directly
/// to a mailbox since it is no longer associated with a local mailbox
/// ([`Mailbox::send`] will fail). However, the runtime may accept
/// remote Ports, and arrange for these messages to be delivered
/// indirectly through inter-node message passing.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    // The mailbox this handle is associated with.
    mailbox: Mailbox,
    // This handle's port index within the mailbox.
    port_index: u64,
    // The sender used to enqueue messages sent through this handle.
    sender: UnboundedPortSender<M>,
    // We would like this to be a Arc<OnceLock<PortRef<M>>>, but we cannot
    // write down the type PortRef<M> (M: Message), even though we cannot
    // legally construct such a value without M: RemoteMessage. We could consider
    // making PortRef<M> valid for M: Message, but constructible only for
    // M: RemoteMessage, but the guarantees offered by the impossibility of even
    // writing down the type are appealing.
    bound: Arc<OnceLock<PortId>>,
    /// Spec of an optional reducer. When it's defined, we include it in port
    /// references to optionally enable incremental accumulation.
    reducer_spec: Option<ReducerSpec>,
    /// Reduction options. If unspecified, we use `ReducerOpts::default`.
    reducer_opts: Option<ReducerOpts>,
}
1639
1640impl<M: Message> PortHandle<M> {
1641    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1642        Self {
1643            mailbox,
1644            port_index,
1645            sender,
1646            bound: Arc::new(OnceLock::new()),
1647            reducer_spec: None,
1648            reducer_opts: None,
1649        }
1650    }
1651
1652    fn location(&self) -> PortLocation {
1653        match self.bound.get() {
1654            Some(port_id) => PortLocation::Bound(port_id.clone()),
1655            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1656        }
1657    }
1658
1659    /// Send a message to this port.
1660    pub fn send(&self, message: M) -> Result<(), MailboxSenderError> {
1661        let mut headers = Attrs::new();
1662
1663        crate::mailbox::headers::set_send_timestamp(&mut headers);
1664        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1665
1666        self.sender.send(headers, message).map_err(|err| {
1667            MailboxSenderError::new_unbound::<M>(
1668                self.mailbox.actor_id().clone(),
1669                MailboxSenderErrorKind::Other(err),
1670            )
1671        })
1672    }
1673
1674    /// A contravariant map: using the provided function to translate
1675    /// `R`-typed messages to `M`-typed ones, delivered on this port.
1676    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1677    where
1678        R: Message,
1679        F: Fn(R) -> M + Send + Sync + 'static,
1680    {
1681        let port_index = self.mailbox.inner.allocate_port();
1682        let sender = self.sender.clone();
1683        PortHandle::new(
1684            self.mailbox.clone(),
1685            port_index,
1686            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1687                sender.send(headers, unmap(value))
1688            })),
1689        )
1690    }
1691}
1692
1693impl<M: RemoteMessage> PortHandle<M> {
1694    /// Bind this port, making it accessible to remote actors.
1695    pub fn bind(&self) -> PortRef<M> {
1696        PortRef::attest_reducible(
1697            self.bound
1698                .get_or_init(|| self.mailbox.bind(self).port_id().clone())
1699                .clone(),
1700            self.reducer_spec.clone(),
1701            self.reducer_opts.clone(),
1702        )
1703    }
1704
1705    /// Bind to this message's actor port. This method will panic if the handle
1706    /// is already bound.
1707    ///
1708    /// This is used by [`actor::Binder`] implementations to bind actor refs.
1709    /// This is not intended for general use.
1710    pub(crate) fn bind_actor_port(&self) {
1711        let port_id = self.mailbox.actor_id().port_id(M::port());
1712        self.bound
1713            .set(port_id)
1714            .map_err(|p| {
1715                format!(
1716                    "could not bind port handle {} as {p}: already bound",
1717                    self.port_index
1718                )
1719            })
1720            .unwrap();
1721        self.mailbox.bind_to_actor_port(self);
1722    }
1723}
1724
1725impl<M: Message> Clone for PortHandle<M> {
1726    fn clone(&self) -> Self {
1727        Self {
1728            mailbox: self.mailbox.clone(),
1729            port_index: self.port_index,
1730            sender: self.sender.clone(),
1731            bound: self.bound.clone(),
1732            reducer_spec: self.reducer_spec.clone(),
1733            reducer_opts: self.reducer_opts.clone(),
1734        }
1735    }
1736}
1737
1738impl<M: Message> fmt::Display for PortHandle<M> {
1739    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1740        fmt::Display::fmt(&self.location(), f)
1741    }
1742}
1743
/// A one-shot port handle to which M-typed messages can be delivered.
///
/// Sending consumes the handle, so at most one message can ever be
/// delivered through it.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port is bound in by `bind`; its actor ID is also
    /// reported in send errors.
    mailbox: Mailbox,
    /// The index of this port (not read by the methods defined directly on
    /// this handle).
    port_index: u64,
    /// The ID of this port, as returned by `port_id`.
    port_id: PortId,
    /// The one-shot sender that `send` consumes to deliver the message.
    sender: oneshot::Sender<M>,
}
1752
1753impl<M: Message> OncePortHandle<M> {
1754    /// This port's ID.
1755    // TODO: make value
1756    pub fn port_id(&self) -> &PortId {
1757        &self.port_id
1758    }
1759
1760    /// Send a message to this port. The send operation will consume the
1761    /// port handle, as the port accepts at most one message.
1762    pub fn send(self, message: M) -> Result<(), MailboxSenderError> {
1763        let actor_id = self.mailbox.actor_id().clone();
1764        self.sender.send(message).map_err(|_| {
1765            // Here, the value is returned when the port is
1766            // closed.  We should consider having a similar
1767            // API for send_once, though arguably it makes less
1768            // sense in this context.
1769            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1770        })?;
1771        Ok(())
1772    }
1773}
1774
1775impl<M: RemoteMessage> OncePortHandle<M> {
1776    /// Turn this handle into a ref that may be passed to
1777    /// a remote actor. The remote actor can then use the
1778    /// ref to send a message to the port. Creating a ref also
1779    /// binds the port, so that it is remotely writable.
1780    pub fn bind(self) -> OncePortRef<M> {
1781        let port_id = self.port_id().clone();
1782        self.mailbox.clone().bind_once(self);
1783        OncePortRef::attest(port_id)
1784    }
1785}
1786
1787impl<M: Message> fmt::Display for OncePortHandle<M> {
1788    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1789        fmt::Display::fmt(&self.port_id(), f)
1790    }
1791}
1792
/// A receiver of M-typed messages, used by actors to receive messages
/// on open ports.
///
/// Dropping the receiver removes its port from the owning mailbox.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// The queue from which messages delivered to the port are read.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The ID of the port served by this receiver.
    port_id: PortId,
    /// When multiple messages are put in channel, only receive the latest one
    /// if coalesce is true. Other messages will be discarded.
    coalesce: bool,
    /// The mailbox is used to remove the port from service when the
    /// receiver is dropped.
    mailbox: Mailbox,
}
1806
1807impl<M> PortReceiver<M> {
1808    fn new(
1809        receiver: mpsc::UnboundedReceiver<M>,
1810        port_id: PortId,
1811        coalesce: bool,
1812        mailbox: Mailbox,
1813    ) -> Self {
1814        Self {
1815            receiver,
1816            port_id,
1817            coalesce,
1818            mailbox,
1819        }
1820    }
1821
1822    /// Tries to receive the next value for this receiver.
1823    /// This function returns `Ok(None)` if the receiver is empty
1824    /// and returns a MailboxError if the receiver is disconnected.
1825    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
1826    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1827        let mut next = self.receiver.try_recv();
1828        // To coalesce, drain the mpsc queue and only keep the last one.
1829        if self.coalesce
1830            && let Some(latest) = self.drain().pop()
1831        {
1832            next = Ok(latest);
1833        }
1834        match next {
1835            Ok(msg) => Ok(Some(msg)),
1836            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
1837            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
1838                self.actor_id().clone(),
1839                MailboxErrorKind::Closed,
1840            )),
1841        }
1842    }
1843
1844    /// Receive the next message from the port corresponding with this
1845    /// receiver.
1846    pub async fn recv(&mut self) -> Result<M, MailboxError> {
1847        let mut next = self.receiver.recv().await;
1848        // To coalesce, get the last message from the queue if there are
1849        // more on the mspc queue.
1850        if self.coalesce
1851            && let Some(latest) = self.drain().pop()
1852        {
1853            next = Some(latest);
1854        }
1855        next.ok_or(MailboxError::new(
1856            self.actor_id().clone(),
1857            MailboxErrorKind::Closed,
1858        ))
1859    }
1860
1861    /// Drains all available messages from the port.
1862    pub fn drain(&mut self) -> Vec<M> {
1863        let mut drained: Vec<M> = Vec::new();
1864        while let Ok(msg) = self.receiver.try_recv() {
1865            // To coalesce, discard the old message if there is any.
1866            if self.coalesce {
1867                drained.pop();
1868            }
1869            drained.push(msg);
1870        }
1871        drained
1872    }
1873
1874    fn port(&self) -> u64 {
1875        self.port_id.1
1876    }
1877
1878    fn actor_id(&self) -> &ActorId {
1879        &self.port_id.0
1880    }
1881}
1882
1883impl<M> Drop for PortReceiver<M> {
1884    fn drop(&mut self) {
1885        // MARIUS: do we need to tombstone these? or should we
1886        // error out if we have removed the receiver before serializing the port ref?
1887        // ("no longer live")?
1888        self.mailbox.inner.ports.remove(&self.port());
1889    }
1890}
1891
1892impl<M> Stream for PortReceiver<M> {
1893    type Item = Result<M, MailboxError>;
1894
1895    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1896        std::pin::pin!(self.recv()).poll(cx).map(Some)
1897    }
1898}
1899
/// A receiver of M-typed messages from [`OncePort`]s.
///
/// Dropping the receiver removes its port from the owning mailbox.
pub struct OncePortReceiver<M> {
    /// The one-shot receiver; `recv` takes it out of the option before
    /// awaiting it.
    receiver: Option<oneshot::Receiver<M>>,
    /// The ID of the port served by this receiver.
    port_id: PortId,

    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
1909
1910impl<M> OncePortReceiver<M> {
1911    /// Receive message from the one-shot port associated with this
1912    /// receiver.  Recv consumes the receiver: it is no longer valid
1913    /// after this call.
1914    pub async fn recv(mut self) -> Result<M, MailboxError> {
1915        std::mem::take(&mut self.receiver)
1916            .unwrap()
1917            .await
1918            .map_err(|err| {
1919                MailboxError::new(
1920                    self.actor_id().clone(),
1921                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
1922                )
1923            })
1924    }
1925
1926    fn port(&self) -> u64 {
1927        self.port_id.1
1928    }
1929
1930    fn actor_id(&self) -> &ActorId {
1931        &self.port_id.0
1932    }
1933}
1934
1935impl<M> Drop for OncePortReceiver<M> {
1936    fn drop(&mut self) {
1937        // MARIUS: do we need to tombstone these? or should we
1938        // error out if we have removed the receiver before serializing the port ref?
1939        // ("no longer live")?
1940        self.mailbox.inner.ports.remove(&self.port());
1941    }
1942}
1943
/// Error that can occur during a `SerializedSender`'s send operation.
///
/// The headers and serialized payload are handed back alongside the error,
/// so the failed message is not lost.
pub struct SerializedSenderError {
    /// The headers associated with the message.
    pub headers: Attrs,
    /// The serialized message that could not be sent.
    pub data: wirevalue::Any,
    /// The mailbox sender error that occurred.
    pub error: MailboxSenderError,
}
1953
/// SerializedSender encapsulates senders:
///   - It performs type erasure (and thus it is object-safe).
///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
///     validity of the underlying port.
trait SerializedSender: Send + Sync {
    /// Enables downcasting from `&dyn SerializedSender` to concrete
    /// types.
    ///
    /// Used by `Mailbox::lookup_sender` to downcast to
    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Send a serialized message. SerializedSender will deserialize the
    /// message (failing if it fails to deserialize), and then send the
    /// resulting message on the underlying port.
    ///
    /// `send_serialized` returns `Ok(true)` whenever the port remains valid
    /// after the send operation, and `Ok(false)` when it does not.
    ///
    /// On failure, the headers and serialized payload are returned to the
    /// caller inside the [`SerializedSenderError`].
    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSenderError`.
    fn send_serialized(
        &self,
        headers: Attrs,
        serialized: wirevalue::Any,
    ) -> Result<bool, SerializedSenderError>;
}
1979
/// A sender to an M-typed unbounded port.
///
/// Messages are either pushed straight onto an mpsc queue or handed to a
/// user-supplied function (as done by `PortHandle::contramap`).
enum UnboundedPortSender<M: Message> {
    /// Send directly to the mpsc queue. Headers are not forwarded in this
    /// case.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Use the provided function to enqueue the item. The function receives
    /// both the headers and the message.
    Func(Arc<dyn Fn(Attrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
1987
1988impl<M: Message> UnboundedPortSender<M> {
1989    fn send(&self, headers: Attrs, message: M) -> Result<(), anyhow::Error> {
1990        match self {
1991            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
1992            Self::Func(func) => func(headers, message),
1993        }
1994    }
1995}
1996
1997// We implement Clone manually as derive(Clone) places unnecessarily
1998// strict bounds on the type parameter M.
1999impl<M: Message> Clone for UnboundedPortSender<M> {
2000    fn clone(&self) -> Self {
2001        match self {
2002            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2003            Self::Func(func) => Self::Func(func.clone()),
2004        }
2005    }
2006}
2007
2008impl<M: Message> Debug for UnboundedPortSender<M> {
2009    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2010        match self {
2011            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2012            Self::Func(_) => f
2013                .debug_tuple("UnboundedPortSender::Func")
2014                .field(&"..")
2015                .finish(),
2016        }
2017    }
2018}
2019
/// A sender for an M-typed unbounded port, paired with the ID of the port
/// it delivers to so that send errors can identify that port.
struct UnboundedSender<M: Message> {
    /// The underlying port sender.
    sender: UnboundedPortSender<M>,
    /// The ID of the port, reported in send errors.
    port_id: PortId,
}
2024
2025impl<M: Message> UnboundedSender<M> {
2026    /// Create a new UnboundedSender encapsulating the provided
2027    /// sender.
2028    fn new(sender: UnboundedPortSender<M>, port_id: PortId) -> Self {
2029        Self { sender, port_id }
2030    }
2031
2032    fn send(&self, headers: Attrs, message: M) -> Result<(), MailboxSenderError> {
2033        self.sender.send(headers, message).map_err(|err| {
2034            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2035        })
2036    }
2037}
2038
2039// Clone is implemented explicitly because the derive macro demands M:
2040// Clone directly. In this case, it isn't needed because Arc<T> can
2041// clone for any T.
2042impl<M: Message> Clone for UnboundedSender<M> {
2043    fn clone(&self) -> Self {
2044        Self {
2045            sender: self.sender.clone(),
2046            port_id: self.port_id.clone(),
2047        }
2048    }
2049}
2050
2051impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2052    fn as_any(&self) -> &dyn Any {
2053        self
2054    }
2055
2056    fn send_serialized(
2057        &self,
2058        headers: Attrs,
2059        serialized: wirevalue::Any,
2060    ) -> Result<bool, SerializedSenderError> {
2061        // Here, the stack ensures that this port is only instantiated for M-typed messages.
2062        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
2063        // but it is required as we have some usages that rely on representational equivalence
2064        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
2065        // support port aggregation.
2066        match serialized.deserialized_unchecked() {
2067            Ok(message) => {
2068                self.sender.send(headers.clone(), message).map_err(|err| {
2069                    SerializedSenderError {
2070                        data: serialized,
2071                        error: MailboxSenderError::new_bound(
2072                            self.port_id.clone(),
2073                            MailboxSenderErrorKind::Other(err),
2074                        ),
2075                        headers,
2076                    }
2077                })?;
2078
2079                Ok(true)
2080            }
2081            Err(err) => Err(SerializedSenderError {
2082                data: serialized,
2083                error: MailboxSenderError::new_bound(
2084                    self.port_id.clone(),
2085                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2086                ),
2087                headers,
2088            }),
2089        }
2090    }
2091}
2092
/// OnceSender encapsulates an underlying one-shot sender, dynamically
/// tracking its validity.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot sender, shared between clones. It is taken out (leaving
    /// `None`) by the first `send_once` call.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The ID of the port, reported in send errors.
    port_id: PortId,
}
2100
2101impl<M: Message> OnceSender<M> {
2102    /// Create a new OnceSender encapsulating the provided one-shot
2103    /// sender.
2104    fn new(sender: oneshot::Sender<M>, port_id: PortId) -> Self {
2105        Self {
2106            sender: Arc::new(Mutex::new(Some(sender))),
2107            port_id,
2108        }
2109    }
2110
2111    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2112        // TODO: we should replace the sender on error
2113        match self.sender.lock().unwrap().take() {
2114            None => Err(MailboxSenderError::new_bound(
2115                self.port_id.clone(),
2116                MailboxSenderErrorKind::Closed,
2117            )),
2118            Some(sender) => {
2119                sender.send(message).map_err(|_| {
2120                    // Here, the value is returned when the port is
2121                    // closed.  We should consider having a similar
2122                    // API for send_once, though arguably it makes less
2123                    // sense in this context.
2124                    MailboxSenderError::new_bound(
2125                        self.port_id.clone(),
2126                        MailboxSenderErrorKind::Closed,
2127                    )
2128                })?;
2129                Ok(false)
2130            }
2131        }
2132    }
2133}
2134
2135// Clone is implemented explicitly because the derive macro demands M:
2136// Clone directly. In this case, it isn't needed because Arc<T> can
2137// clone for any T.
2138impl<M: Message> Clone for OnceSender<M> {
2139    fn clone(&self) -> Self {
2140        Self {
2141            sender: self.sender.clone(),
2142            port_id: self.port_id.clone(),
2143        }
2144    }
2145}
2146
2147impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2148    fn as_any(&self) -> &dyn Any {
2149        self
2150    }
2151
2152    fn send_serialized(
2153        &self,
2154        headers: Attrs,
2155        serialized: wirevalue::Any,
2156    ) -> Result<bool, SerializedSenderError> {
2157        match serialized.deserialized() {
2158            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2159                data: serialized,
2160                error: e,
2161                headers,
2162            }),
2163            Err(err) => Err(SerializedSenderError {
2164                data: serialized,
2165                error: MailboxSenderError::new_bound(
2166                    self.port_id.clone(),
2167                    MailboxSenderErrorKind::Deserialize(M::typename(), err),
2168                ),
2169                headers,
2170            }),
2171        }
2172    }
2173}
2174
/// Use the provided function to send untyped messages (i.e. Any objects).
pub(crate) struct UntypedUnboundedSender {
    /// The function invoked with each serialized message. On failure it
    /// returns the message together with the error.
    pub(crate) sender:
        Box<dyn Fn(wirevalue::Any) -> Result<(), (wirevalue::Any, anyhow::Error)> + Send + Sync>,
    /// The ID of the port, reported in send errors.
    pub(crate) port_id: PortId,
}
2181
2182impl SerializedSender for UntypedUnboundedSender {
2183    fn as_any(&self) -> &dyn Any {
2184        self
2185    }
2186
2187    fn send_serialized(
2188        &self,
2189        headers: Attrs,
2190        serialized: wirevalue::Any,
2191    ) -> Result<bool, SerializedSenderError> {
2192        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2193            data,
2194            error: MailboxSenderError::new_bound(
2195                self.port_id.clone(),
2196                MailboxSenderErrorKind::Other(err),
2197            ),
2198            headers,
2199        })?;
2200
2201        Ok(true)
2202    }
2203}
2204
/// State is the internal state of the mailbox.
struct State {
    /// The ID of the mailbox owner.
    actor_id: ActorId,

    // insert if it's serializable; otherwise don't.
    /// The set of active ports in the mailbox, keyed by port index.
    /// An entry is removed when the receiver for its port is dropped.
    ports: DashMap<u64, Box<dyn SerializedSender>>,

    /// The next port ID to allocate.
    next_port: AtomicU64,

    /// The forwarder for this mailbox.
    forwarder: BoxedMailboxSender,
}
2221
2222impl State {
2223    /// Create a new state with the provided owning ActorId.
2224    fn new(actor_id: ActorId, forwarder: BoxedMailboxSender) -> Self {
2225        Self {
2226            actor_id,
2227            ports: DashMap::new(),
2228            // The first 1024 ports are allocated to actor handlers.
2229            // Other port IDs are ephemeral.
2230            next_port: AtomicU64::new(USER_PORT_OFFSET),
2231            forwarder,
2232        }
2233    }
2234
2235    /// Allocate a fresh port.
2236    fn allocate_port(&self) -> u64 {
2237        self.next_port.fetch_add(1, Ordering::SeqCst)
2238    }
2239}
2240
2241impl fmt::Debug for State {
2242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2243        f.debug_struct("State")
2244            .field("actor_id", &self.actor_id)
2245            .field(
2246                "open_ports",
2247                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2248            )
2249            .field("next_port", &self.next_port)
2250            .finish()
2251    }
2252}
2253
// TODO: mux based on some parameterized type. (mux key).
/// An in-memory mailbox muxer. This is used to route messages to
/// different underlying senders.
///
/// Messages are routed by the actor ID of their destination. Clones share
/// the same routing table.
#[derive(Clone)]
pub struct MailboxMuxer {
    /// The routing table: the sender bound to each actor ID.
    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2261
2262impl Default for MailboxMuxer {
2263    fn default() -> Self {
2264        Self::new()
2265    }
2266}
2267
2268impl MailboxMuxer {
2269    /// Create a new, empty, muxer.
2270    pub fn new() -> Self {
2271        Self {
2272            mailboxes: Arc::new(DashMap::new()),
2273        }
2274    }
2275
2276    /// Route messages destined for the provided actor id to the provided
2277    /// sender. Returns false if there is already a sender associated
2278    /// with the actor. In this case, the sender is not replaced, and
2279    /// the caller must [`MailboxMuxer::unbind`] it first.
2280    pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2281        match self.mailboxes.entry(actor_id) {
2282            Entry::Occupied(_) => false,
2283            Entry::Vacant(entry) => {
2284                entry.insert(Box::new(sender));
2285                true
2286            }
2287        }
2288    }
2289
2290    /// Convenience function to bind a mailbox.
2291    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2292        self.bind(mailbox.actor_id().clone(), mailbox)
2293    }
2294
2295    /// Unbind the sender associated with the provided actor ID. After
2296    /// unbinding, the muxer will no longer be able to send messages to
2297    /// that actor.
2298    pub(crate) fn unbind(&self, actor_id: &ActorId) {
2299        self.mailboxes.remove(actor_id);
2300    }
2301
2302    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2303    pub fn bound_actors(&self) -> Vec<ActorId> {
2304        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2305    }
2306}
2307
2308impl MailboxSender for MailboxMuxer {
2309    fn post_unchecked(
2310        &self,
2311        envelope: MessageEnvelope,
2312        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2313    ) {
2314        let dest_actor_id = envelope.dest().actor_id();
2315        match self.mailboxes.get(envelope.dest().actor_id()) {
2316            None => {
2317                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2318                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2319            }
2320            Some(sender) => sender.post(envelope, return_handle),
2321        }
2322    }
2323}
2324
/// MailboxRouter routes messages to the sender that is bound to its
/// nearest prefix.
///
/// Clones share the same routing table.
#[derive(Clone)]
pub struct MailboxRouter {
    /// The routing table: senders keyed by the reference (prefix) they are
    /// bound to.
    entries: Arc<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2331
2332impl Default for MailboxRouter {
2333    fn default() -> Self {
2334        Self::new()
2335    }
2336}
2337
2338impl MailboxRouter {
2339    /// Create a new, empty router.
2340    pub fn new() -> Self {
2341        Self {
2342            entries: Arc::new(RwLock::new(BTreeMap::new())),
2343        }
2344    }
2345
2346    /// Downgrade this router to a [`WeakMailboxRouter`].
2347    pub fn downgrade(&self) -> WeakMailboxRouter {
2348        WeakMailboxRouter(Arc::downgrade(&self.entries))
2349    }
2350
2351    /// Returns a new router that will first attempt to find a route for the message
2352    /// in the router's table; otherwise post the message to the provided fallback
2353    /// sender.
2354    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2355        FallbackMailboxRouter {
2356            router: self.clone(),
2357            default,
2358        }
2359    }
2360
2361    /// Bind the provided sender to the given reference. The destination
2362    /// is treated as a prefix to which messages can be routed, and
2363    /// messages are routed to their longest matching prefix.
2364    pub fn bind(&self, dest: Reference, sender: impl MailboxSender + 'static) {
2365        let mut w = self.entries.write().unwrap();
2366        w.insert(dest, Arc::new(sender));
2367    }
2368
2369    fn sender(&self, actor_id: &ActorId) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2370        match self
2371            .entries
2372            .read()
2373            .unwrap()
2374            .lower_bound(Excluded(&actor_id.clone().into()))
2375            .prev()
2376        {
2377            None => None,
2378            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2379                Some(sender.clone())
2380            }
2381            Some(_) => None,
2382        }
2383    }
2384}
2385
2386impl MailboxSender for MailboxRouter {
2387    fn post_unchecked(
2388        &self,
2389        envelope: MessageEnvelope,
2390        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2391    ) {
2392        match self.sender(envelope.dest().actor_id()) {
2393            None => envelope.undeliverable(
2394                DeliveryError::Unroutable(
2395                    "no destination found for actor in routing table".to_string(),
2396                ),
2397                return_handle,
2398            ),
2399            Some(sender) => sender.post(envelope, return_handle),
2400        }
2401    }
2402}
2403
/// A sender that consults a [`MailboxRouter`] first and posts to a default
/// sender when the router has no route for the destination. Created by
/// `MailboxRouter::fallback`.
#[derive(Clone)]
struct FallbackMailboxRouter {
    /// The router whose table is consulted first.
    router: MailboxRouter,
    /// The sender used when the router has no matching route.
    default: BoxedMailboxSender,
}
2409
2410impl MailboxSender for FallbackMailboxRouter {
2411    fn post_unchecked(
2412        &self,
2413        envelope: MessageEnvelope,
2414        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2415    ) {
2416        match self.router.sender(envelope.dest().actor_id()) {
2417            Some(sender) => sender.post(envelope, return_handle),
2418            None => self.default.post(envelope, return_handle),
2419        }
2420    }
2421}
2422
/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
/// state. This allows router references to be circular: an entity holding a reference
/// to the router may also contain the router itself.
///
/// The weak router must be upgraded to a [`MailboxRouter`] before it can
/// route; posting through it fails with a broken-link error once the
/// underlying state is gone.
///
/// TODO: this currently holds a weak reference to the entire router. This helps
/// prevent cycle leaks, but can cause excess memory usage as the cycle is at
/// the granularity of each entry. Possibly the router should allow weak references
/// on a per-entry basis.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    // Weak handle to the routing table of the originating `MailboxRouter`.
    Weak<RwLock<BTreeMap<Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2435
2436impl WeakMailboxRouter {
2437    /// Upgrade the weak router to a strong reference router.
2438    pub fn upgrade(&self) -> Option<MailboxRouter> {
2439        self.0.upgrade().map(|entries| MailboxRouter { entries })
2440    }
2441}
2442
2443impl MailboxSender for WeakMailboxRouter {
2444    fn post_unchecked(
2445        &self,
2446        envelope: MessageEnvelope,
2447        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2448    ) {
2449        match self.upgrade() {
2450            Some(router) => router.post(envelope, return_handle),
2451            None => envelope.undeliverable(
2452                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2453                return_handle,
2454            ),
2455        }
2456    }
2457}
2458
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// If a message destination is not bound, but is a "direct mode" address
/// (i.e., its proc id contains the channel address through which the proc
/// is reachable), then DialMailboxRouter dials the proc directly.
///
/// Messages sent to unknown destinations are routed to the `default`
/// sender, if present.
///
/// Clones share the same address book and sender cache.
#[derive(Clone)]
pub struct DialMailboxRouter {
    // The address book: the channel address bound to each reference
    // (prefix). Updated by `bind` and `unbind`.
    address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
    // Cache of dialed clients, keyed by channel address. Entries are
    // evicted when an address is rebound or unbound.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    // The default sender, to which messages for unknown recipients
    // are sent. (This is like a default route in a routing table.)
    default: BoxedMailboxSender,

    // When true, only dial direct-addressed procs if their transport
    // type is remote. Otherwise, fall back to the default sender.
    direct_addressed_remote_only: bool,
}
2485
2486impl Default for DialMailboxRouter {
2487    fn default() -> Self {
2488        Self::new()
2489    }
2490}
2491
2492impl DialMailboxRouter {
2493    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2494    pub fn new() -> Self {
2495        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2496    }
2497
2498    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2499    /// and a default sender. Any message with an unknown destination is
2500    /// dispatched on this default sender, unless the destination is
2501    /// direct-addressed, in which case it is dialed directly.
2502    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2503        Self {
2504            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2505            sender_cache: Arc::new(DashMap::new()),
2506            default,
2507            direct_addressed_remote_only: false,
2508        }
2509    }
2510
2511    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2512    /// and a default sender. Any message with an unknown destination is
2513    /// dispatched on this default sender, unless the destination is
2514    /// direct-addressed *and* has a remote channel transport type.
2515    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2516        Self {
2517            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2518            sender_cache: Arc::new(DashMap::new()),
2519            default,
2520            direct_addressed_remote_only: true,
2521        }
2522    }
2523
2524    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2525    /// existing binding.
2526    ///
2527    /// If the address changes, the old sender is evicted from the
2528    /// cache to ensure fresh routing on next use.
2529    pub fn bind(&self, dest: Reference, addr: ChannelAddr) {
2530        if let Ok(mut w) = self.address_book.write() {
2531            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2532                && old_addr != addr
2533            {
2534                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2535                self.sender_cache.remove(&old_addr);
2536            }
2537        } else {
2538            tracing::error!("address book poisoned during bind of {:?}", dest);
2539        }
2540    }
2541
2542    /// Removes all address mappings with the given prefix from the
2543    /// router.
2544    ///
2545    /// Also evicts any corresponding cached senders to prevent reuse
2546    /// of stale connections.
2547    pub fn unbind(&self, dest: &Reference) {
2548        if let Ok(mut w) = self.address_book.write() {
2549            let to_remove: Vec<(Reference, ChannelAddr)> = w
2550                .range(dest..)
2551                .take_while(|(key, _)| dest.is_prefix_of(key))
2552                .map(|(key, addr)| (key.clone(), addr.clone()))
2553                .collect();
2554
2555            for (key, addr) in to_remove {
2556                tracing::info!("unbinding {:?} from {:?}", key, addr);
2557                w.remove(&key);
2558                self.sender_cache.remove(&addr);
2559            }
2560        } else {
2561            tracing::error!("address book poisoned during unbind of {:?}", dest);
2562        }
2563    }
2564
2565    /// Lookup an actor's channel in the router's address bok.
2566    pub fn lookup_addr(&self, actor_id: &ActorId) -> Option<ChannelAddr> {
2567        let address_book = self.address_book.read().unwrap();
2568        let found = address_book
2569            .lower_bound(Excluded(&actor_id.clone().into()))
2570            .prev();
2571
2572        // First try to look up the address in our address book; failing that,
2573        // try to resolve direct procs.
2574        if let Some((key, addr)) = found
2575            && key.is_prefix_of(&actor_id.clone().into())
2576        {
2577            Some(addr.clone())
2578        } else if actor_id.proc_id().is_direct() {
2579            let (addr, _name) = actor_id.proc_id().clone().into_direct().unwrap();
2580            if self.direct_addressed_remote_only {
2581                addr.transport().is_remote().then_some(addr)
2582            } else {
2583                Some(addr)
2584            }
2585        } else {
2586            None
2587        }
2588    }
2589
2590    /// Return all covering prefixes of this router. That is, all references that are not
2591    /// prefixed by another reference in the routing table
2592    pub fn prefixes(&self) -> BTreeSet<Reference> {
2593        let addrs = self.address_book.read().unwrap();
2594        let mut prefixes: BTreeSet<Reference> = BTreeSet::new();
2595        for (reference, _) in addrs.iter() {
2596            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2597                Some(candidate) if candidate.is_prefix_of(reference) => (),
2598                _ => {
2599                    prefixes.insert(reference.clone());
2600                }
2601            }
2602        }
2603
2604        prefixes
2605    }
2606
2607    fn dial(
2608        &self,
2609        addr: &ChannelAddr,
2610        actor_id: &ActorId,
2611    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2612        // Get the sender. Create it if needed. Do not send the
2613        // messages inside this block so we do not hold onto the
2614        // reference of the dashmap entries.
2615        match self.sender_cache.entry(addr.clone()) {
2616            Entry::Occupied(entry) => Ok(entry.get().clone()),
2617            Entry::Vacant(entry) => {
2618                let tx = channel::dial(addr.clone()).map_err(|err| {
2619                    MailboxSenderError::new_unbound_type(
2620                        actor_id.clone(),
2621                        MailboxSenderErrorKind::Channel(err),
2622                        "unknown",
2623                    )
2624                })?;
2625                let sender = MailboxClient::new(tx);
2626                Ok(entry.insert(Arc::new(sender)).value().clone())
2627            }
2628        }
2629    }
2630}
2631
2632impl MailboxSender for DialMailboxRouter {
2633    fn post_unchecked(
2634        &self,
2635        envelope: MessageEnvelope,
2636        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2637    ) {
2638        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2639            self.default.post(envelope, return_handle);
2640            return;
2641        };
2642
2643        match self.dial(&addr, envelope.dest().actor_id()) {
2644            Err(err) => envelope.undeliverable(
2645                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2646                return_handle,
2647            ),
2648            Ok(sender) => sender.post(envelope, return_handle),
2649        }
2650    }
2651}
2652
2653/// A MailboxSender that reports any envelope as undeliverable due to
2654/// routing failure.
2655#[derive(Debug)]
2656pub struct UnroutableMailboxSender;
2657
2658impl MailboxSender for UnroutableMailboxSender {
2659    fn post_unchecked(
2660        &self,
2661        envelope: MessageEnvelope,
2662        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2663    ) {
2664        envelope.undeliverable(
2665            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2666            return_handle,
2667        );
2668    }
2669}
2670
2671#[cfg(test)]
2672mod tests {
2673
2674    use std::assert_matches::assert_matches;
2675    use std::mem::drop;
2676    use std::str::FromStr;
2677    use std::sync::atomic::AtomicUsize;
2678    use std::time::Duration;
2679
2680    use timed_test::async_timed_test;
2681
2682    use super::*;
2683    use crate::Actor;
2684    use crate::ActorHandle;
2685    use crate::Instance;
2686    use crate::PortId;
2687    use crate::accum;
2688    use crate::channel::ChannelTransport;
2689    use crate::channel::dial;
2690    use crate::channel::serve;
2691    use crate::channel::sim::SimAddr;
2692    use crate::clock::Clock;
2693    use crate::clock::RealClock;
2694    use crate::id;
2695    use crate::proc::Proc;
2696    use crate::reference::ProcId;
2697    use crate::reference::WorldId;
2698    use crate::simnet;
2699
2700    #[test]
2701    fn test_error() {
2702        let err = MailboxError::new(
2703            ActorId(
2704                ProcId::Ranked(WorldId("myworld".to_string()), 2),
2705                "myactor".to_string(),
2706                5,
2707            ),
2708            MailboxErrorKind::Closed,
2709        );
2710        assert_eq!(format!("{}", err), "myworld[2].myactor[5]: mailbox closed");
2711    }
2712
2713    #[tokio::test]
2714    async fn test_mailbox_basic() {
2715        let mbox = Mailbox::new_detached(id!(test[0].test));
2716        let (port, mut receiver) = mbox.open_port::<u64>();
2717        let port = port.bind();
2718
2719        mbox.serialize_and_send(&port, 123, monitored_return_handle())
2720            .unwrap();
2721        mbox.serialize_and_send(&port, 321, monitored_return_handle())
2722            .unwrap();
2723        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2724        assert_eq!(receiver.recv().await.unwrap(), 321u64);
2725
2726        let serialized = wirevalue::Any::serialize(&999u64).unwrap();
2727        mbox.post(
2728            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2729            monitored_return_handle(),
2730        );
2731        assert_eq!(receiver.recv().await.unwrap(), 999u64);
2732    }
2733
2734    #[tokio::test]
2735    async fn test_mailbox_accum() {
2736        let mbox = Mailbox::new_detached(id!(test[0].test));
2737        let (port, mut receiver) = mbox.open_accum_port(accum::max::<i64>());
2738
2739        for i in -3..4 {
2740            port.send(i).unwrap();
2741            let received: accum::Max<i64> = receiver.recv().await.unwrap();
2742            let msg = received.get();
2743            assert_eq!(msg, &i);
2744        }
2745        // Send a smaller or same value. Should still receive the previous max.
2746        for i in -3..4 {
2747            port.send(i).unwrap();
2748            assert_eq!(receiver.recv().await.unwrap().get(), &3);
2749        }
2750        // send a larger value. Should receive the new max.
2751        port.send(4).unwrap();
2752        assert_eq!(receiver.recv().await.unwrap().get(), &4);
2753
2754        // Send multiple updates. Should only receive the final change.
2755        for i in 5..10 {
2756            port.send(i).unwrap();
2757        }
2758        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2759        port.send(1).unwrap();
2760        port.send(3).unwrap();
2761        port.send(2).unwrap();
2762        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2763    }
2764
2765    #[test]
2766    fn test_port_and_reducer() {
2767        let mbox = Mailbox::new_detached(id!(test[0].test));
2768        // accum port could have reducer typehash
2769        {
2770            let accumulator = accum::max::<u64>();
2771            let reducer_spec = accumulator.reducer_spec().unwrap();
2772            let (port, _) = mbox.open_accum_port(accum::max::<u64>());
2773            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2774            let port_ref = port.bind();
2775            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2776        }
2777        // normal port should not have reducer typehash
2778        {
2779            let (port, _) = mbox.open_port::<u64>();
2780            assert_eq!(port.reducer_spec, None);
2781            let port_ref = port.bind();
2782            assert_eq!(port_ref.reducer_spec(), &None);
2783        }
2784    }
2785
    // Sends a single message through a once-port and receives it.
    //
    // The test is currently ignored. The commented-out tail below is the
    // part that checked a second (serialized) send is rejected as
    // `Closed`; it is kept as a record of the behavior to restore.
    #[tokio::test]
    #[ignore] // error behavior changed, but we will bring it back
    async fn test_mailbox_once() {
        let mbox = Mailbox::new_detached(id!(test[0].test));

        let (port, receiver) = mbox.open_once_port::<u64>();

        // let port_id = port.port_id().clone();

        port.send(123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

        // // The borrow checker won't let us send again on the port
        // // (good!), but we stashed the port-id and so we can try on the
        // // serialized interface.
        // let Err(err) = mbox
        //     .send_serialized(&port_id, &wirevalue::Any(Vec::new()))
        //     .await
        // else {
        //     unreachable!()
        // };
        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
    }
2809
2810    #[tokio::test]
2811    #[ignore] // changed error behavior
2812    async fn test_mailbox_receiver_drop() {
2813        let mbox = Mailbox::new_detached(id!(test[0].test));
2814        let (port, mut receiver) = mbox.open_port::<u64>();
2815        // Make sure we go through "remote" path.
2816        let port = port.bind();
2817        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2818            .unwrap();
2819        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2820        drop(receiver);
2821        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
2822            panic!();
2823        };
2824
2825        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2826        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
2827    }
2828
2829    #[tokio::test]
2830    async fn test_drain() {
2831        let mbox = Mailbox::new_detached(id!(test[0].test));
2832
2833        let (port, mut receiver) = mbox.open_port();
2834        let port = port.bind();
2835
2836        for i in 0..10 {
2837            mbox.serialize_and_send(&port, i, monitored_return_handle())
2838                .unwrap();
2839        }
2840
2841        for i in 0..10 {
2842            assert_eq!(receiver.recv().await.unwrap(), i);
2843        }
2844
2845        assert!(receiver.drain().is_empty());
2846    }
2847
    // Binds two detached mailboxes to a muxer and checks that a
    // once-port opened on one of them still delivers a message.
    //
    // NOTE(review): the block comment at the end is disabled code that
    // exercised `serve_port` / `unbind`; it is not compiled.
    #[tokio::test]
    async fn test_mailbox_muxer() {
        let muxer = MailboxMuxer::new();

        let mbox0 = Mailbox::new_detached(id!(test[0].actor1));
        let mbox1 = Mailbox::new_detached(id!(test[0].actor2));

        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());

        let (port, receiver) = mbox0.open_once_port::<u64>();

        port.send(123u64).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), 123u64);

        /*
        let (tx, rx) = channel::local::new::<u64>();
        let (port, _) = mbox0.open_port::<u64>();
        let handle = muxer.clone().serve_port(port, rx).unwrap();
        muxer.unbind(mbox0.actor_id());
        tx.send(123u64).await.unwrap();
        let Ok(Err(err)) = handle.await else { panic!() };
        assert_eq!(err.actor_id(), &actor_id(0));
        */
    }
2873
2874    #[tokio::test]
2875    async fn test_local_client_server() {
2876        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2877        let (tx, rx) = channel::local::new();
2878        let serve_handle = mbox.clone().serve(rx);
2879        let client = MailboxClient::new(tx);
2880
2881        let (port, receiver) = mbox.open_once_port::<u64>();
2882        let port = port.bind();
2883
2884        client
2885            .serialize_and_send_once(port, 123u64, monitored_return_handle())
2886            .unwrap();
2887        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2888        serve_handle.stop("fromt test");
2889        serve_handle.await.unwrap().unwrap();
2890    }
2891
2892    #[tokio::test]
2893    async fn test_sim_client_server() {
2894        simnet::start();
2895        let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
2896        let src_to_dst = ChannelAddr::Sim(
2897            SimAddr::new_with_src(
2898                "local:0".parse::<ChannelAddr>().unwrap(),
2899                dst_addr.addr().clone(),
2900            )
2901            .unwrap(),
2902        );
2903
2904        let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2905        let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
2906        let mbox = Mailbox::new_detached(id!(test[0].actor0));
2907        let serve_handle = mbox.clone().serve(rx);
2908        let client = MailboxClient::new(tx);
2909        let (port, receiver) = mbox.open_once_port::<u64>();
2910        let port = port.bind();
2911        let msg: u64 = 123;
2912        client
2913            .serialize_and_send_once(port, msg, monitored_return_handle())
2914            .unwrap();
2915        assert_eq!(receiver.recv().await.unwrap(), msg);
2916        serve_handle.stop("from test");
2917        serve_handle.await.unwrap().unwrap();
2918    }
2919
2920    #[tokio::test]
2921    async fn test_mailbox_router() {
2922        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
2923        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
2924        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
2925        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
2926
2927        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
2928            [&mbox0, &mbox1, &mbox2, &mbox3]
2929                .into_iter()
2930                .map(|mbox| {
2931                    let (port, receiver) = mbox.open_once_port::<u64>();
2932                    (port.bind(), receiver)
2933                })
2934                .collect();
2935
2936        let router = MailboxRouter::new();
2937
2938        router.bind(id!(world0).into(), mbox0);
2939        router.bind(id!(world1[0]).into(), mbox1);
2940        router.bind(id!(world1[1]).into(), mbox2);
2941        router.bind(id!(world1[1].actor1).into(), mbox3);
2942
2943        for (i, (port, receiver)) in comms.into_iter().enumerate() {
2944            router
2945                .serialize_and_send_once(port, i as u64, monitored_return_handle())
2946                .unwrap();
2947            assert_eq!(receiver.recv().await.unwrap(), i as u64);
2948        }
2949
2950        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
2951
2952        let mbox4 = Mailbox::new_detached(id!(fallback[0].actor));
2953
2954        let (return_handle, mut return_receiver) =
2955            crate::mailbox::undeliverable::new_undeliverable_port();
2956        let (port, _receiver) = mbox4.open_once_port();
2957        router
2958            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
2959            .unwrap();
2960        assert!(return_receiver.recv().await.is_ok());
2961
2962        let router = router.fallback(mbox4.clone().into_boxed());
2963        let (port, receiver) = mbox4.open_once_port();
2964        router
2965            .serialize_and_send_once(port.bind(), 0, return_handle)
2966            .unwrap();
2967        assert_eq!(receiver.recv().await.unwrap(), 0);
2968    }
2969
2970    #[tokio::test]
2971    async fn test_dial_mailbox_router() {
2972        let router = DialMailboxRouter::new();
2973
2974        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
2975        router.bind(id!(world1[0]).into(), "unix!@2".parse().unwrap());
2976        router.bind(id!(world1[1]).into(), "unix!@3".parse().unwrap());
2977        router.bind(id!(world1[1].actor1).into(), "unix!@4".parse().unwrap());
2978        // Bind a direct address -- we should use its bound address!
2979        router.bind(
2980            "unix:@4,my_proc,my_actor".parse().unwrap(),
2981            "unix:@5".parse().unwrap(),
2982        );
2983
2984        // We should be able to lookup the ids
2985        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
2986        router.lookup_addr(&id!(world1[0].actor[0])).unwrap();
2987
2988        let actor_id = Reference::from_str("unix:@4,my_proc,my_actor")
2989            .unwrap()
2990            .into_actor()
2991            .unwrap();
2992        assert_eq!(
2993            router.lookup_addr(&actor_id).unwrap(),
2994            "unix!@5".parse().unwrap(),
2995        );
2996        router.unbind(&actor_id.clone().into());
2997        assert_eq!(
2998            router.lookup_addr(&actor_id).unwrap(),
2999            "unix!@4".parse().unwrap(),
3000        );
3001
3002        // Unbind so we cannot find the ids anymore
3003        router.unbind(&id!(world1).into());
3004        assert!(router.lookup_addr(&id!(world1[0].actor1[0])).is_none());
3005        assert!(router.lookup_addr(&id!(world1[1].actor1[0])).is_none());
3006        assert!(router.lookup_addr(&id!(world1[2].actor1[0])).is_none());
3007        router.lookup_addr(&id!(world0[0].actor[0])).unwrap();
3008        router.unbind(&id!(world0).into());
3009        assert!(router.lookup_addr(&id!(world0[0].actor[0])).is_none());
3010    }
3011
3012    #[tokio::test]
3013    #[ignore] // TODO: there's a leak here, fix it
3014    async fn test_dial_mailbox_router_default() {
3015        let mbox0 = Mailbox::new_detached(id!(world0[0].actor0));
3016        let mbox1 = Mailbox::new_detached(id!(world1[0].actor0));
3017        let mbox2 = Mailbox::new_detached(id!(world1[1].actor0));
3018        let mbox3 = Mailbox::new_detached(id!(world1[1].actor1));
3019
3020        // We don't need to dial here, since we gain direct access to the
3021        // underlying routers.
3022        let root = MailboxRouter::new();
3023        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3024        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3025
3026        root.bind(id!(world0).into(), world0_router.clone());
3027        root.bind(id!(world1).into(), world1_router.clone());
3028
3029        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3030
3031        let mut handles = Vec::new(); // hold on to handles, or channels get closed
3032        for mbox in mailboxes.iter() {
3033            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3034            let handle = (*mbox).clone().serve(rx);
3035            handles.push(handle);
3036
3037            eprintln!("{}: {}", mbox.actor_id(), addr);
3038            if mbox.actor_id().world_name() == "world0" {
3039                world0_router.bind(mbox.actor_id().clone().into(), addr);
3040            } else {
3041                world1_router.bind(mbox.actor_id().clone().into(), addr);
3042            }
3043        }
3044
3045        // Make sure nodes are fully connected.
3046        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3047            for mbox in mailboxes.iter() {
3048                let (port, receiver) = mbox.open_once_port::<u64>();
3049                let port = port.bind();
3050                router
3051                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
3052                    .unwrap();
3053                assert_eq!(receiver.recv().await.unwrap(), 123u64);
3054            }
3055        }
3056    }
3057
3058    #[tokio::test]
3059    async fn test_enqueue_port() {
3060        let mbox = Mailbox::new_detached(id!(test[0].test));
3061
3062        let count = Arc::new(AtomicUsize::new(0));
3063        let count_clone = count.clone();
3064        let port = mbox.open_enqueue_port(move |_, n| {
3065            count_clone.fetch_add(n, Ordering::SeqCst);
3066            Ok(())
3067        });
3068
3069        port.send(10).unwrap();
3070        port.send(5).unwrap();
3071        port.send(1).unwrap();
3072        port.send(0).unwrap();
3073
3074        assert_eq!(count.load(Ordering::SeqCst), 16);
3075    }
3076
    // Unit message type without a `#[named]` override; its type name is
    // checked in `test_remote_message_macros`.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    struct TestMessage;
3079
    // Unit message type whose type name is overridden through
    // `#[named(name = ...)]`; checked in `test_remote_message_macros`.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    #[named(name = "some::custom::path")]
    struct TestMessage2;
3083
3084    #[test]
3085    fn test_remote_message_macros() {
3086        assert_eq!(
3087            TestMessage::typename(),
3088            "hyperactor::mailbox::tests::TestMessage"
3089        );
3090        assert_eq!(TestMessage2::typename(), "some::custom::path");
3091    }
3092
3093    #[test]
3094    fn test_message_envelope_display() {
3095        #[derive(typeuri::Named, Serialize, Deserialize)]
3096        struct MyTest {
3097            a: u64,
3098            b: String,
3099        }
3100        wirevalue::register_type!(MyTest);
3101
3102        let envelope = MessageEnvelope::serialize(
3103            id!(source[0].actor),
3104            id!(dest[1].actor[0][123]),
3105            &MyTest {
3106                a: 123,
3107                b: "hello".into(),
3108            },
3109            Attrs::new(),
3110        )
3111        .unwrap();
3112
3113        assert_eq!(
3114            format!("{}", envelope),
3115            r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
3116        );
3117    }
3118
    // Actor with an empty `Actor` impl, spawned by
    // `test_actor_delivery_failure`.
    #[derive(Debug, Default)]
    struct Foo;

    impl Actor for Foo {}
3123
    // Test that a message delivery failure causes the sending actor
    // to stop running.
    //
    // The failure is simulated by sending an `Undeliverable` envelope
    // directly to the actor's undeliverable-message port.
    #[tokio::test]
    async fn test_actor_delivery_failure() {
        // This test involves making an actor fail and so we must set
        // a supervision coordinator.
        use crate::actor::ActorStatus;
        use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;

        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
            BOXED_PANICKING_MAILBOX_SENDER.clone(),
        ));
        let proc_id = id!(quux[0]);
        let mut proc = Proc::new(proc_id.clone(), proc_forwarder);
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let foo = proc.spawn("foo", Foo).unwrap();
        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
        // An envelope from `foo` to a port on another proc, handed back
        // to `foo` as if it had been returned undelivered.
        let message = MessageEnvelope::new(
            foo.actor_id().clone(),
            PortId(id!(corge[0].bar), 9999u64),
            wirevalue::Any::serialize(&1u64).unwrap(),
            Attrs::new(),
        );
        return_handle.send(Undeliverable(message)).unwrap();

        // Give the actor time to process the returned message.
        RealClock
            .sleep(tokio::time::Duration::from_millis(100))
            .await;

        // The actor must now be failed, with a message naming the
        // undeliverable envelope's sender and destination.
        let foo_status = foo.status();
        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
            unreachable!()
        };
        assert!(msg.to_string().contains(
            "a message from \
                quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
        ));

        proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None)
            .await
            .unwrap();
    }
3168
3169    #[tokio::test]
3170    async fn test_detached_return_handle() {
3171        let (return_handle, mut return_receiver) =
3172            crate::mailbox::undeliverable::new_undeliverable_port();
3173        // Simulate an undelivered message return.
3174        let envelope = MessageEnvelope::new(
3175            id!(foo[0].bar),
3176            PortId(id!(baz[0].corge), 9999u64),
3177            wirevalue::Any::serialize(&1u64).unwrap(),
3178            Attrs::new(),
3179        );
3180        return_handle.send(Undeliverable(envelope.clone())).unwrap();
3181        // Check we receive the undelivered message.
3182        assert!(
3183            RealClock
3184                .timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3185                .await
3186                .is_ok()
3187        );
3188        // Setup a monitor for the receiver and show that if there are
3189        // no outstanding return handles it terminates.
3190        let monitor_handle = tokio::spawn(async move {
3191            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3192                envelope.set_error(DeliveryError::BrokenLink(
3193                    "returned in unit test".to_string(),
3194                ));
3195                UndeliverableMailboxSender
3196                    .post(envelope, /*unused */ monitored_return_handle());
3197            }
3198        });
3199        drop(return_handle);
3200        assert!(
3201            RealClock
3202                .timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3203                .await
3204                .is_ok()
3205        );
3206    }
3207
    // Exercises `drain`, `try_recv` and `recv` on a `PortReceiver` built
    // directly on top of an unbounded channel.
    //
    // `coalesce` is the receiver's coalesce flag: when set, the
    // assertions below expect only the latest queued value to be
    // observed. `drop_sender` drops the sending half after the values
    // are queued, in which case the receiver is expected to report
    // `Closed` once the queued values have been consumed.
    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
            // Create dummy state and port_id to create PortReceiver. They are
            // not used in the test.
            let dummy_state =
                State::new(id!(world[0].actor), BOXED_PANICKING_MAILBOX_SENDER.clone());
            let dummy_port_id = PortId(id!(world[0].actor), 0);
            let (sender, receiver) = mpsc::unbounded_channel::<M>();
            let receiver = PortReceiver {
                receiver,
                port_id: dummy_port_id,
                coalesce,
                mailbox: Mailbox {
                    inner: Arc::new(dummy_state),
                },
            };
            (sender, receiver)
        }

        // verify fn drain
        {
            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
            // Nothing has been sent yet.
            assert!(receiver.drain().is_empty());

            sender.send(0).unwrap();
            sender.send(1).unwrap();
            sender.send(2).unwrap();
            sender.send(3).unwrap();
            sender.send(4).unwrap();
            sender.send(5).unwrap();
            sender.send(6).unwrap();
            sender.send(7).unwrap();

            if drop_sender {
                drop(sender);
            }

            // Without coalescing every value is drained in order; with
            // coalescing only the last one is.
            if !coalesce {
                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
            } else {
                assert_eq!(receiver.drain(), vec![7]);
            }

            // Draining again yields nothing, whether or not the sender
            // was dropped.
            assert!(receiver.drain().is_empty());
            assert!(receiver.drain().is_empty());
        }

        // verify fn try_recv
        {
            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
            assert!(receiver.try_recv().unwrap().is_none());

            sender.send(0).unwrap();
            sender.send(1).unwrap();
            sender.send(2).unwrap();
            sender.send(3).unwrap();

            if drop_sender {
                drop(sender);
            }

            // When coalescing, the earlier values are skipped and only
            // the last one (3) is observed.
            if !coalesce {
                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
            }
            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
            if drop_sender {
                assert_matches!(
                    receiver.try_recv().unwrap_err().kind(),
                    MailboxErrorKind::Closed
                );
                // Still Closed error
                assert_matches!(
                    receiver.try_recv().unwrap_err().kind(),
                    MailboxErrorKind::Closed
                );
            } else {
                assert!(receiver.try_recv().unwrap().is_none());
                // Still empty
                assert!(receiver.try_recv().unwrap().is_none());
            }
        }
        // verify fn recv
        {
            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
            // With nothing queued, recv does not complete within the
            // timeout.
            assert!(
                RealClock
                    .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
                    .await
                    .is_err()
            );

            sender.send(4).unwrap();
            sender.send(5).unwrap();
            sender.send(6).unwrap();
            sender.send(7).unwrap();

            if drop_sender {
                drop(sender);
            }

            // When coalescing, only the last value (7) is observed.
            if !coalesce {
                assert_eq!(receiver.recv().await.unwrap(), 4);
                assert_eq!(receiver.recv().await.unwrap(), 5);
                assert_eq!(receiver.recv().await.unwrap(), 6);
            }
            assert_eq!(receiver.recv().await.unwrap(), 7);
            if drop_sender {
                assert_matches!(
                    receiver.recv().await.unwrap_err().kind(),
                    MailboxErrorKind::Closed
                );
                // Still a Closed error
                assert_matches!(
                    receiver.recv().await.unwrap_err().kind(),
                    MailboxErrorKind::Closed
                );
            } else {
                // The sender is still alive and nothing is queued, so
                // recv times out again.
                assert!(
                    RealClock
                        .timeout(tokio::time::Duration::from_secs(1), receiver.recv())
                        .await
                        .is_err()
                );
            }
        }
    }
3336
3337    #[tokio::test]
3338    async fn test_receiver_basic_default() {
3339        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3340    }
3341
3342    #[tokio::test]
3343    async fn test_receiver_basic_latest() {
3344        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3345    }
3346
3347    #[tokio::test]
3348    async fn test_receiver_after_sender_drop_default() {
3349        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3350    }
3351
3352    #[tokio::test]
3353    async fn test_receiver_after_sender_drop_latest() {
3354        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3355    }
3356
    // Fixture produced by `setup_split_port_ids` for the split-port
    // tests.
    struct Setup {
        // Receiver of the port opened on `actor0`.
        receiver: PortReceiver<u64>,
        actor0: Instance<()>,
        actor1: Instance<()>,
        // Handles are held only so that they live as long as the fixture.
        _actor0_handle: ActorHandle<()>,
        _actor1_handle: ActorHandle<()>,
        // Id of the port opened on `actor0`.
        port_id: PortId,
        // First and second split of `port_id`, both made on `actor1`.
        port_id1: PortId,
        port_id2: PortId,
        // Split of `port_id2`, also made on `actor1`.
        port_id2_1: PortId,
    }
3368
3369    async fn setup_split_port_ids(
3370        reducer_spec: Option<ReducerSpec>,
3371        reducer_opts: Option<ReducerOpts>,
3372    ) -> Setup {
3373        let proc = Proc::local();
3374        let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3375        let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3376
3377        // Open a port on actor0
3378        let (port_handle, receiver) = actor0.open_port::<u64>();
3379        let port_id = port_handle.bind().port_id().clone();
3380
3381        // Split it twice on actor1
3382        let port_id1 = port_id
3383            .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3384            .unwrap();
3385        let port_id2 = port_id
3386            .split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
3387            .unwrap();
3388
3389        // A split port id can also be split
3390        let port_id2_1 = port_id2
3391            .split(&actor1, reducer_spec, reducer_opts.clone(), true)
3392            .unwrap();
3393
3394        Setup {
3395            receiver,
3396            actor0,
3397            actor1,
3398            _actor0_handle: actor0_handle,
3399            _actor1_handle: actor1_handle,
3400            port_id,
3401            port_id1,
3402            port_id2,
3403            port_id2_1,
3404        }
3405    }
3406
3407    fn post(cx: &impl context::Actor, port_id: PortId, msg: u64) {
3408        let serialized = wirevalue::Any::serialize(&msg).unwrap();
3409        port_id.send(cx, serialized);
3410    }
3411
3412    #[async_timed_test(timeout_secs = 30)]
3413    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3414    #[cfg_attr(not(fbcode_build), ignore)]
3415    async fn test_split_port_id_no_reducer() {
3416        let Setup {
3417            mut receiver,
3418            actor0,
3419            actor1,
3420            port_id,
3421            port_id1,
3422            port_id2,
3423            port_id2_1,
3424            ..
3425        } = setup_split_port_ids(None, None).await;
3426        // Can send messages to receiver from all port handles
3427        post(&actor0, port_id.clone(), 1);
3428        assert_eq!(receiver.recv().await.unwrap(), 1);
3429        post(&actor1, port_id1.clone(), 2);
3430        assert_eq!(receiver.recv().await.unwrap(), 2);
3431        post(&actor1, port_id2.clone(), 3);
3432        assert_eq!(receiver.recv().await.unwrap(), 3);
3433        post(&actor1, port_id2_1.clone(), 4);
3434        assert_eq!(receiver.recv().await.unwrap(), 4);
3435
3436        // no more messages
3437        RealClock.sleep(Duration::from_secs(2)).await;
3438        let msg = receiver.try_recv().unwrap();
3439        assert_eq!(msg, None);
3440    }
3441
3442    async fn wait_for(
3443        receiver: &mut PortReceiver<u64>,
3444        expected_size: usize,
3445        timeout_duration: Duration,
3446    ) -> anyhow::Result<Vec<u64>> {
3447        let mut messeges = vec![];
3448
3449        RealClock
3450            .timeout(timeout_duration, async {
3451                loop {
3452                    let msg = receiver.recv().await.unwrap();
3453                    messeges.push(msg);
3454                    if messeges.len() == expected_size {
3455                        break;
3456                    }
3457                }
3458            })
3459            .await?;
3460        Ok(messeges)
3461    }
3462
3463    #[async_timed_test(timeout_secs = 30)]
3464    async fn test_split_port_id_sum_reducer() {
3465        let config = hyperactor_config::global::lock();
3466        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3467
3468        let sum_accumulator = accum::sum::<u64>();
3469        let reducer_spec = sum_accumulator.reducer_spec();
3470        let Setup {
3471            mut receiver,
3472            actor0,
3473            actor1,
3474            port_id,
3475            port_id1,
3476            port_id2,
3477            port_id2_1,
3478            ..
3479        } = setup_split_port_ids(reducer_spec, None).await;
3480        post(&actor0, port_id.clone(), 4);
3481        post(&actor1, port_id1.clone(), 2);
3482        post(&actor1, port_id2.clone(), 3);
3483        post(&actor1, port_id2_1.clone(), 1);
3484        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3485            .await
3486            .unwrap();
3487        // Message might be received out of their sending out. So we sort the
3488        // messages here.
3489        messages.sort();
3490        assert_eq!(messages, vec![1, 2, 3, 4]);
3491
3492        // no more messages
3493        RealClock.sleep(Duration::from_secs(2)).await;
3494        let msg = receiver.try_recv().unwrap();
3495        assert_eq!(msg, None);
3496    }
3497
3498    #[async_timed_test(timeout_secs = 30)]
3499    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3500    #[cfg_attr(not(fbcode_build), ignore)]
3501    async fn test_split_port_id_every_n_messages() {
3502        let config = hyperactor_config::global::lock();
3503        let _config_guard =
3504            config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3505        let proc = Proc::local();
3506        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3507        let (port_handle, mut receiver) = actor.open_port::<u64>();
3508        let port_id = port_handle.bind().port_id().clone();
3509        // Split it
3510        let reducer_spec = accum::sum::<u64>().reducer_spec();
3511        let split_port_id = port_id
3512            .split(
3513                &actor,
3514                reducer_spec,
3515                Some(ReducerOpts {
3516                    max_update_interval: Some(Duration::from_mins(10)),
3517                    initial_update_interval: Some(Duration::from_mins(10)),
3518                }),
3519                true,
3520            )
3521            .unwrap();
3522
3523        // Send 9 messages.
3524        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3525            post(&actor, split_port_id.clone(), msg);
3526        }
3527        // The first 5 should be batched and reduced once due
3528        // to every_n_msgs = 5.
3529        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3530            .await
3531            .unwrap();
3532        assert_eq!(messages, vec![15]);
3533
3534        // the last message unfortranately will never come because they do not
3535        // reach batch size.
3536        RealClock.sleep(Duration::from_secs(2)).await;
3537        let msg = receiver.try_recv().unwrap();
3538        assert_eq!(msg, None);
3539    }
3540
3541    #[async_timed_test(timeout_secs = 30)]
3542    async fn test_split_port_timeout_flush() {
3543        let config = hyperactor_config::global::lock();
3544        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3545
3546        let Setup {
3547            mut receiver,
3548            actor0: _,
3549            actor1,
3550            port_id: _,
3551            port_id1,
3552            port_id2: _,
3553            port_id2_1: _,
3554            ..
3555        } = setup_split_port_ids(
3556            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3557            Some(ReducerOpts {
3558                max_update_interval: Some(Duration::from_millis(50)),
3559                initial_update_interval: Some(Duration::from_millis(50)),
3560            }),
3561        )
3562        .await;
3563
3564        post(&actor1, port_id1.clone(), 10);
3565        post(&actor1, port_id1.clone(), 20);
3566        post(&actor1, port_id1.clone(), 30);
3567
3568        // Messages should accumulate for 50ms.
3569        RealClock.sleep(Duration::from_millis(10)).await;
3570        let msg = receiver.try_recv().unwrap();
3571        assert_eq!(msg, None);
3572
3573        // Wait until we are flushed.
3574        RealClock.sleep(Duration::from_millis(100)).await;
3575
3576        // Now we are reduced and accumulated:
3577        let msg = receiver.recv().await.unwrap();
3578        assert_eq!(msg, 60); // 10 + 20 + 30
3579
3580        // No further messages:
3581        let msg = receiver.try_recv().unwrap();
3582        assert_eq!(msg, None);
3583    }
3584
3585    #[async_timed_test(timeout_secs = 30)]
3586    async fn test_split_port_timeout_and_size_flush() {
3587        let config = hyperactor_config::global::lock();
3588        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3589
3590        let Setup {
3591            mut receiver,
3592            actor0: _,
3593            actor1,
3594            port_id: _,
3595            port_id1,
3596            port_id2: _,
3597            port_id2_1: _,
3598            ..
3599        } = setup_split_port_ids(
3600            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3601            Some(ReducerOpts {
3602                max_update_interval: Some(Duration::from_millis(50)),
3603                initial_update_interval: Some(Duration::from_millis(50)),
3604            }),
3605        )
3606        .await;
3607
3608        post(&actor1, port_id1.clone(), 10);
3609        post(&actor1, port_id1.clone(), 20);
3610        post(&actor1, port_id1.clone(), 30);
3611        post(&actor1, port_id1.clone(), 40);
3612
3613        // Should have flushed at the third message.
3614        let msg = receiver.recv().await.unwrap();
3615        assert_eq!(msg, 60);
3616
3617        // After 50ms, the next reduce will flush:
3618        let msg = receiver.recv().await.unwrap();
3619        assert_eq!(msg, 40);
3620
3621        // No further messages
3622        let msg = receiver.try_recv().unwrap();
3623        assert_eq!(msg, None);
3624    }
3625
3626    #[test]
3627    fn test_dial_mailbox_router_prefixes_empty() {
3628        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3629    }
3630
3631    #[test]
3632    fn test_dial_mailbox_router_prefixes_single_entry() {
3633        let router = DialMailboxRouter::new();
3634        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3635
3636        let prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3637        assert_eq!(prefixes.len(), 1);
3638        assert_eq!(prefixes[0], id!(world0).into());
3639    }
3640
3641    #[test]
3642    fn test_dial_mailbox_router_prefixes_no_overlap() {
3643        let router = DialMailboxRouter::new();
3644        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3645        router.bind(id!(world1).into(), "unix!@2".parse().unwrap());
3646        router.bind(id!(world2).into(), "unix!@3".parse().unwrap());
3647
3648        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3649        prefixes.sort();
3650
3651        let mut expected = vec![id!(world0).into(), id!(world1).into(), id!(world2).into()];
3652        expected.sort();
3653
3654        assert_eq!(prefixes, expected);
3655    }
3656
3657    #[test]
3658    fn test_dial_mailbox_router_prefixes_with_overlaps() {
3659        let router = DialMailboxRouter::new();
3660        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3661        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3662        router.bind(id!(world0[1]).into(), "unix!@3".parse().unwrap());
3663        router.bind(id!(world1).into(), "unix!@4".parse().unwrap());
3664        router.bind(id!(world1[0]).into(), "unix!@5".parse().unwrap());
3665
3666        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3667        prefixes.sort();
3668
3669        // Only world0 and world1 should be covering prefixes since they cover their children
3670        let mut expected = vec![id!(world0).into(), id!(world1).into()];
3671        expected.sort();
3672
3673        assert_eq!(prefixes, expected);
3674    }
3675
3676    #[test]
3677    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
3678        let router = DialMailboxRouter::new();
3679        router.bind(id!(world0).into(), "unix!@1".parse().unwrap());
3680        router.bind(id!(world0[0]).into(), "unix!@2".parse().unwrap());
3681        router.bind(id!(world0[0].actor1).into(), "unix!@3".parse().unwrap());
3682        router.bind(id!(world1[0]).into(), "unix!@4".parse().unwrap());
3683        router.bind(id!(world1[1]).into(), "unix!@5".parse().unwrap());
3684        router.bind(id!(world2[0].actor0).into(), "unix!@6".parse().unwrap());
3685
3686        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3687        prefixes.sort();
3688
3689        // Covering prefixes should be:
3690        // - world0 (covers world0[0] and world0[0].actor1)
3691        // - world1[0] (not covered by anything else)
3692        // - world1[1] (not covered by anything else)
3693        // - world2[0].actor0 (not covered by anything else)
3694        let expected = vec![
3695            id!(world0).into(),
3696            id!(world1[0]).into(),
3697            id!(world1[1]).into(),
3698            id!(world2[0].actor0).into(),
3699        ];
3700
3701        assert_eq!(prefixes, expected);
3702    }
3703
3704    #[test]
3705    fn test_dial_mailbox_router_prefixes_same_level() {
3706        let router = DialMailboxRouter::new();
3707        router.bind(id!(world0[0]).into(), "unix!@1".parse().unwrap());
3708        router.bind(id!(world0[1]).into(), "unix!@2".parse().unwrap());
3709        router.bind(id!(world0[2]).into(), "unix!@3".parse().unwrap());
3710
3711        let mut prefixes: Vec<Reference> = router.prefixes().into_iter().collect();
3712        prefixes.sort();
3713
3714        // All should be covering prefixes since none is a prefix of another
3715        let mut expected = vec![
3716            id!(world0[0]).into(),
3717            id!(world0[1]).into(),
3718            id!(world0[2]).into(),
3719        ];
3720        expected.sort();
3721
3722        assert_eq!(prefixes, expected);
3723    }
3724
3725    /// A forwarder that bounces messages back to the **same**
3726    /// mailbox, but does so on a task to avoid recursive stack
3727    /// growth.
3728    #[derive(Clone, Debug)]
3729    struct AsyncLoopForwarder;
3730
3731    impl MailboxSender for AsyncLoopForwarder {
3732        fn post_unchecked(
3733            &self,
3734            envelope: MessageEnvelope,
3735            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3736        ) {
3737            let me = self.clone();
3738            tokio::spawn(async move {
3739                // Call `post` so each hop applies TTL exactly once.
3740                me.post(envelope, return_handle);
3741            });
3742        }
3743    }
3744
3745    #[tokio::test]
3746    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
3747        let actor_id = ActorId(
3748            ProcId::Ranked(id!(test_world), 0),
3749            "ttl_actor".to_string(),
3750            0,
3751        );
3752        let mailbox = Mailbox::new(
3753            actor_id.clone(),
3754            BoxedMailboxSender::new(AsyncLoopForwarder),
3755        );
3756        let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3757
3758        // Create a destination not owned by this mailbox to force
3759        // forwarding.
3760        let remote_actor = ActorId(
3761            ProcId::Ranked(id!(remote_world), 1),
3762            "remote".to_string(),
3763            0,
3764        );
3765        let dest = PortId(remote_actor.clone(), /*port index*/ 4242);
3766
3767        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
3768        // `::serialize`).
3769        let payload = 1234_u64;
3770        let envelope =
3771            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Attrs::new())
3772                .expect("serialize");
3773
3774        // Post it. This will start bouncing between forwarder and
3775        // mailbox until TTL hits 0.
3776        let return_handle = ret_port.clone();
3777        mailbox.post(envelope, return_handle);
3778
3779        // We expect the undeliverable to come back once TTL expires.
3780        #[allow(clippy::disallowed_methods)]
3781        let Undeliverable(undelivered) =
3782            tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
3783                .await
3784                .expect("timed out waiting for undeliverable")
3785                .expect("channel closed");
3786
3787        // Sanity: round-trip payload still deserializes.
3788        let got: u64 = undelivered.deserialized().expect("deserialize");
3789        assert_eq!(got, payload, "payload preserved");
3790    }
3791
3792    #[tokio::test]
3793    async fn message_ttl_success_local_delivery() {
3794        let actor_id = ActorId(
3795            ProcId::Ranked(id!(test_world), 0),
3796            "ttl_actor".to_string(),
3797            0,
3798        );
3799        let mailbox = Mailbox::new(
3800            actor_id.clone(),
3801            BoxedMailboxSender::new(PanickingMailboxSender),
3802        );
3803        let (_undeliverable_tx, mut undeliverable_rx) =
3804            mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
3805
3806        // Open a local user u64 port.
3807        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
3808
3809        // Build an envelope destined for this mailbox's own port.
3810        let payload = 0xC0FFEE_u64;
3811        let envelope = MessageEnvelope::serialize(
3812            actor_id.clone(),
3813            user_port.bind().port_id().clone(),
3814            &payload,
3815            Attrs::new(),
3816        )
3817        .expect("serialize");
3818
3819        // Post the message using the mailbox (local path). TTL will
3820        // not expire.
3821        let return_handle = mailbox
3822            .bound_return_handle()
3823            .unwrap_or(monitored_return_handle());
3824        mailbox.post(envelope, return_handle);
3825
3826        // We should receive the payload locally.
3827        #[allow(clippy::disallowed_methods)]
3828        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
3829            .await
3830            .expect("timed out waiting for local delivery")
3831            .expect("user port closed");
3832        assert_eq!(got, payload);
3833
3834        // There should be no undeliverables arriving.
3835        #[allow(clippy::disallowed_methods)]
3836        let no_undeliverable =
3837            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
3838        assert!(
3839            no_undeliverable.is_err(),
3840            "unexpected undeliverable returned on successful local delivery"
3841        );
3842    }
3843
3844    #[tokio::test]
3845    async fn test_port_contramap() {
3846        let mbox = Mailbox::new_detached(id!(test[0].test));
3847        let (handle, mut rx) = mbox.open_port();
3848
3849        handle
3850            .contramap(|m| (1, m))
3851            .send("hello".to_string())
3852            .unwrap();
3853        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
3854    }
3855
3856    #[test]
3857    #[should_panic(expected = "already bound")]
3858    fn test_bind_port_handle_to_actor_port_twice() {
3859        let mbox = Mailbox::new_detached(id!(test[0].test));
3860        let (handle, _rx) = mbox.open_port::<String>();
3861        handle.bind_actor_port();
3862        handle.bind_actor_port();
3863    }
3864
3865    #[test]
3866    fn test_bind_port_handle_to_actor_port() {
3867        let mbox = Mailbox::new_detached(id!(test[0].test));
3868        let default_port = mbox.actor_id().port_id(String::port());
3869        let (handle, _rx) = mbox.open_port::<String>();
3870        // Handle's port index is allocated by mailbox, not the actor port.
3871        assert_ne!(default_port.index(), handle.port_index);
3872        // Bind the handle to the actor port.
3873        handle.bind_actor_port();
3874        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3875        // bind() can still be used, just it will not change handle's state.
3876        handle.bind();
3877        handle.bind();
3878        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
3879    }
3880
3881    #[test]
3882    #[should_panic(expected = "already bound")]
3883    fn test_bind_port_handle_to_actor_port_when_already_bound() {
3884        let mbox = Mailbox::new_detached(id!(test[0].test));
3885        let (handle, _rx) = mbox.open_port::<String>();
3886        // Bound handle to the port allocated by mailbox.
3887        handle.bind();
3888        assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
3889        // Since handle is already bound, call bind_to() on it will cause panic.
3890        handle.bind_actor_port();
3891    }
3892}