hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorId`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::Proc;
21//! # use hyperactor::reference::{ActorId, ProcId};
22//! # tokio_test::block_on(async {
23//! # let proc = Proc::local();
24//! # let (client, _) = proc.instance("client").unwrap();
25//! # let actor_id = proc.proc_id().actor_id("actor", 0);
26//! let mbox = Mailbox::new_detached(actor_id);
27//! let (port, mut receiver) = mbox.open_port::<u64>();
28//!
29//! port.send(&client, 123).unwrap();
30//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
31//! # })
32//! ```
33//!
34//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
35//! that permits at most one message transmission:
36//!
37//! ```
38//! # use hyperactor::mailbox::Mailbox;
39//! # use hyperactor::Proc;
40//! # use hyperactor::reference::{ActorId, ProcId};
41//! # tokio_test::block_on(async {
42//! # let proc = Proc::local();
43//! # let (client, _) = proc.instance("client").unwrap();
44//! # let actor_id = proc.proc_id().actor_id("actor", 0);
45//! let mbox = Mailbox::new_detached(actor_id);
46//!
47//! let (port, receiver) = mbox.open_once_port::<u64>();
48//!
49//! port.send(&client, 123u64).unwrap();
50//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
51//! # })
52//! ```
53//!
54//! [`OncePort`]s are correspondingly used for RPC replies in the actor
55//! system.
56//!
57//! ## Remote ports and serialization
58//!
59//! Mailboxes allow delivery of serialized messages to named ports:
60//!
61//! 1) Ports restrict message types to (serializable) [`Message`]s.
62//! 2) Each [`Port`] is associated with a [`PortId`] which globally names the port.
63//! 3) [`Mailbox`] provides interfaces to deliver serialized
64//!    messages to ports named by their [`PortId`].
65//!
66//! While this complicates the interface somewhat, it allows the
67//! implementation to avoid a serialization roundtrip when passing
68//! messages locally.
69
70use std::any::Any;
71use std::collections::BTreeMap;
72use std::collections::BTreeSet;
73use std::fmt;
74use std::fmt::Debug;
75use std::future;
76use std::future::Future;
77use std::ops::Bound::Excluded;
78use std::pin::Pin;
79use std::sync::Arc;
80use std::sync::LazyLock;
81use std::sync::Mutex;
82use std::sync::RwLock;
83use std::sync::Weak;
84use std::sync::atomic::AtomicU64;
85use std::sync::atomic::AtomicUsize;
86use std::sync::atomic::Ordering;
87use std::task::Context;
88use std::task::Poll;
89
90use async_trait::async_trait;
91use dashmap::DashMap;
92use dashmap::mapref::entry::Entry;
93use futures::Sink;
94use futures::Stream;
95use hyperactor_config::Flattrs;
96use hyperactor_telemetry::hash_to_u64;
97use serde::Deserialize;
98use serde::Serialize;
99use serde::de::DeserializeOwned;
100use tokio::sync::mpsc;
101use tokio::sync::oneshot;
102use tokio::sync::watch;
103use tokio::task::JoinHandle;
104use tokio_util::sync::CancellationToken;
105use typeuri::Named;
106
107// for macros
108use crate::Proc;
109use crate::accum::Accumulator;
110use crate::accum::ReducerSpec;
111use crate::accum::StreamingReducerOpts;
112use crate::actor::ActorStatus;
113use crate::actor::Signal;
114use crate::actor::remote::USER_PORT_OFFSET;
115use crate::channel;
116use crate::channel::ChannelAddr;
117use crate::channel::ChannelError;
118use crate::channel::ChannelTransport;
119use crate::channel::SendError;
120use crate::channel::TxStatus;
121use crate::context;
122use crate::metrics;
123use crate::ordering::SEQ_INFO;
124use crate::ordering::SeqInfo;
125use crate::reference;
126
127mod undeliverable;
128/// For [`Undeliverable`], a message type for delivery failures.
129pub use undeliverable::Undeliverable;
130pub use undeliverable::UndeliverableMessageError;
131pub use undeliverable::custom_monitored_return_handle;
132pub use undeliverable::monitored_return_handle; // TODO: Audit
133/// For [`MailboxAdminMessage`], a message type for mailbox administration.
134pub mod mailbox_admin_message;
135pub use mailbox_admin_message::MailboxAdminMessage;
136pub use mailbox_admin_message::MailboxAdminMessageHandler;
137/// For [`DurableMailboxSender`] a sender with a write-ahead log.
138pub mod durable_mailbox_sender;
139pub use durable_mailbox_sender::log;
140use durable_mailbox_sender::log::*;
141/// For message headers and latency tracking.
142pub mod headers;
143
144/// Message collects the necessary requirements for messages that are deposited
145/// into mailboxes.
146pub trait Message: Send + Sync + 'static {}
147impl<M: Send + Sync + 'static> Message for M {}
148
149/// RemoteMessage extends [`Message`] by requiring that the messages
150/// also be serializable, and can thus traverse process boundaries.
151/// RemoteMessages must also specify a globally unique type name (a URI).
152pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
153
154impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
155
156/// Type alias for bytestring data used throughout the system.
157pub type Data = Vec<u8>;
158
159/// Delivery errors occur during message posting.
160#[derive(
161    thiserror::Error,
162    Debug,
163    Serialize,
164    Deserialize,
165    typeuri::Named,
166    Clone,
167    PartialEq,
168    Eq
169)]
170pub enum DeliveryError {
171    /// The destination address is not reachable.
172    #[error("address not routable: {0}")]
173    Unroutable(String),
174
175    /// A broken link indicates that a link in the message
176    /// delivery path has failed.
177    #[error("broken link: {0}")]
178    BrokenLink(String),
179
180    /// A (local) mailbox delivery error.
181    #[error("mailbox error: {0}")]
182    Mailbox(String),
183
184    /// A multicast related delivery error.
185    #[error("multicast error: {0}")]
186    Multicast(String),
187
188    /// The message went through too many hops and has expired.
189    #[error("ttl expired")]
190    TtlExpired,
191}
192
193/// An envelope that carries a message destined to a remote actor.
194/// The envelope contains a serialized message along with its destination
195/// and sender.
196#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
197pub struct MessageEnvelope {
198    /// The sender of this message.
199    sender: reference::ActorId,
200
201    /// The destination of the message.
202    dest: reference::PortId,
203
204    /// The serialized message.
205    data: wirevalue::Any,
206
207    /// Error contains a delivery error when message delivery failed.
208    errors: Vec<DeliveryError>,
209
210    /// Additional context for this message.
211    headers: Flattrs,
212
213    /// Decremented at every `MailboxSender` hop.
214    ttl: u8,
215
216    /// If true, undeliverable messages should be returned to sender. Else, they
217    /// are dropped.
218    return_undeliverable: bool,
219    // TODO: add typename, source, seq, etc.
220}
221wirevalue::register_type!(MessageEnvelope);
222
223impl MessageEnvelope {
224    /// Create a new envelope with the provided sender, destination, and message.
225    pub fn new(
226        sender: reference::ActorId,
227        dest: reference::PortId,
228        data: wirevalue::Any,
229        headers: Flattrs,
230    ) -> Self {
231        Self {
232            sender,
233            dest,
234            data,
235            errors: Vec::new(),
236            headers,
237            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
238            // By default, all undeliverable messages should be returned to the sender.
239            return_undeliverable: true,
240        }
241    }
242
243    /// Create a new envelope whose sender ID is unknown.
244    pub(crate) fn new_unknown(dest: reference::PortId, data: wirevalue::Any) -> Self {
245        // Create a synthetic "unknown" actor ID for messages with no known sender
246        let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
247        let unknown_proc_id = crate::reference::ProcId::unique(unknown_addr, "unknown");
248        let unknown_actor_id =
249            crate::reference::ActorId::root(unknown_proc_id, "unknown".to_string());
250        Self::new(unknown_actor_id, dest, data, Flattrs::new())
251    }
252
253    /// Construct a new serialized value by serializing the provided T-typed value.
254    pub fn serialize<T: Serialize + Named>(
255        source: reference::ActorId,
256        dest: reference::PortId,
257        value: &T,
258        headers: Flattrs,
259    ) -> Result<Self, wirevalue::Error> {
260        Ok(Self {
261            headers,
262            data: wirevalue::Any::serialize(value)?,
263            sender: source,
264            dest,
265            errors: Vec::new(),
266            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
267            // By default, all undeliverable messages should be returned to the sender.
268            return_undeliverable: true,
269        })
270    }
271
272    /// Returns the remaining time-to-live (TTL) for this message.
273    ///
274    /// The TTL is decremented at each `MailboxSender` hop. When it
275    /// reaches 0, the message is considered expired and is returned
276    /// to the sender as undeliverable.
277    pub fn ttl(&self) -> u8 {
278        self.ttl
279    }
280
281    /// Overrides the message’s time-to-live (TTL).
282    ///
283    /// This replaces the current TTL value (normally initialized from
284    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
285    /// updated envelope is returned for chaining.
286    ///
287    /// # Note
288    /// The TTL is decremented at each `MailboxSender` hop, and when
289    /// it reaches 0 the message will be treated as undeliverable.
290    pub fn set_ttl(mut self, ttl: u8) -> Self {
291        self.ttl = ttl;
292        self
293    }
294
295    /// Decrements the message's TTL by one hop.
296    ///
297    /// Returns `Ok(())` if the TTL was greater than zero and
298    /// successfully decremented. If the TTL was already zero, no
299    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
300    /// returned, indicating that the message has expired and should
301    /// be treated as undeliverable.
302    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
303        if self.ttl == 0 {
304            Err(DeliveryError::TtlExpired)
305        } else {
306            self.ttl -= 1;
307            Ok(())
308        }
309    }
310
311    /// Deserialize the message in the envelope to the provided type T.
312    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
313        Ok(self.data.deserialized()?)
314    }
315
316    /// The serialized message.
317    pub fn data(&self) -> &wirevalue::Any {
318        &self.data
319    }
320
321    /// The message sender.
322    pub fn sender(&self) -> &reference::ActorId {
323        &self.sender
324    }
325
326    /// The destination of the message.
327    pub fn dest(&self) -> &reference::PortId {
328        &self.dest
329    }
330
331    /// The message headers.
332    pub fn headers(&self) -> &Flattrs {
333        &self.headers
334    }
335
336    /// Tells whether this is a signal message.
337    pub fn is_signal(&self) -> bool {
338        self.dest.index() == Signal::port()
339    }
340
341    /// Set a delivery error for the message. If errors are already set, append
342    /// it to the existing errors.
343    pub fn set_error(&mut self, error: DeliveryError) {
344        self.errors.push(error)
345    }
346
347    /// Change the sender on the envelope in case it was set incorrectly. This
348    /// should only be used by CommActor since it is forwarding from another
349    /// sender.
350    pub fn update_sender(&mut self, sender: reference::ActorId) {
351        self.sender = sender;
352    }
353
354    /// Set to true if you want this message to be returned to sender if it cannot
355    /// reach dest. This is the default.
356    /// Set to false if you want the message to be dropped instead.
357    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
358        self.return_undeliverable = return_undeliverable;
359    }
360
361    /// The message has been determined to be undeliverable with the
362    /// provided error. Mark the envelope with the error and return to
363    /// sender.
364    pub fn undeliverable(
365        mut self,
366        error: DeliveryError,
367        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
368    ) {
369        tracing::debug!(
370            name = "undelivered_message_attempt",
371            sender = self.sender.to_string(),
372            dest = self.dest.to_string(),
373            error = error.to_string(),
374            return_handle = %return_handle,
375        );
376        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
377            1,
378            hyperactor_telemetry::kv_pairs!(
379                "sender_actor_id" => self.sender.to_string(),
380                "dest_actor_id" => self.dest.to_string(),
381                "message_type" => self.data.typename().unwrap_or("unknown"),
382                "error_type" =>  error.to_string(),
383            ),
384        );
385
386        self.set_error(error);
387        undeliverable::return_undeliverable(return_handle, self);
388    }
389
390    /// Get the errors of why this message was undeliverable. Empty means this
391    /// message was not determined as undeliverable.
392    pub fn errors(&self) -> &Vec<DeliveryError> {
393        &self.errors
394    }
395
396    /// Get the string representation of the errors of this message was
397    /// undeliverable. None means this message was not determined as
398    /// undeliverable.
399    pub fn error_msg(&self) -> Option<String> {
400        if self.errors.is_empty() {
401            None
402        } else {
403            Some(
404                self.errors
405                    .iter()
406                    .map(|e| e.to_string())
407                    .collect::<Vec<_>>()
408                    .join("; "),
409            )
410        }
411    }
412
413    fn open(self) -> (MessageMetadata, wirevalue::Any) {
414        let Self {
415            sender,
416            dest,
417            data,
418            errors,
419            headers,
420            ttl,
421            return_undeliverable,
422        } = self;
423
424        (
425            MessageMetadata {
426                sender,
427                dest,
428                errors,
429                headers,
430                ttl,
431                return_undeliverable,
432            },
433            data,
434        )
435    }
436
437    fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
438        let MessageMetadata {
439            sender,
440            dest,
441            errors,
442            headers,
443            ttl,
444            return_undeliverable,
445        } = metadata;
446
447        Self {
448            sender,
449            dest,
450            data,
451            errors,
452            headers,
453            ttl,
454            return_undeliverable,
455        }
456    }
457
458    fn return_undeliverable(&self) -> bool {
459        self.return_undeliverable
460    }
461
462    /// Set a header value on this envelope.
463    pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
464        self.headers.set(key, value);
465    }
466}
467
468impl fmt::Display for MessageEnvelope {
469    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470        match &self.error_msg() {
471            None => write!(
472                f,
473                "{} > {}: {} {{{}}}",
474                self.sender, self.dest, self.data, self.headers
475            ),
476            Some(err) => write!(
477                f,
478                "{} > {}: {} {{{}}}: delivery error: {}",
479                self.sender, self.dest, self.data, self.headers, err
480            ),
481        }
482    }
483}
484
485/// Metadata about a message sent via a MessageEnvelope.
486#[derive(Clone)]
487pub struct MessageMetadata {
488    sender: reference::ActorId,
489    dest: reference::PortId,
490    errors: Vec<DeliveryError>,
491    headers: Flattrs,
492    ttl: u8,
493    return_undeliverable: bool,
494}
495
496/// Errors that occur during mailbox operations. Each error is associated
497/// with the mailbox's actor id.
498#[derive(Debug)]
499pub struct MailboxError {
500    actor_id: reference::ActorId,
501    kind: MailboxErrorKind,
502}
503
504/// The kinds of mailbox errors. This enum is marked non-exhaustive to
505/// allow for extensibility.
506#[derive(thiserror::Error, Debug)]
507#[non_exhaustive]
508pub enum MailboxErrorKind {
509    /// An operation was attempted on a closed mailbox.
510    #[error("mailbox closed")]
511    Closed,
512
513    /// The port associated with an operation was invalid.
514    #[error("invalid port: {0}")]
515    InvalidPort(reference::PortId),
516
517    /// There was no sender associated with the port.
518    #[error("no sender for port: {0}")]
519    NoSenderForPort(reference::PortId),
520
521    /// There was no local sender associated with the port.
522    /// Returned by operations that require a local port.
523    #[error("no local sender for port: {0}")]
524    NoLocalSenderForPort(reference::PortId),
525
526    /// The port was closed.
527    #[error("{0}: port closed")]
528    PortClosed(reference::PortId),
529
530    /// An error occured during a send operation.
531    #[error("send {0}: {1}")]
532    Send(reference::PortId, #[source] anyhow::Error),
533
534    /// An error occured during a receive operation.
535    #[error("recv {0}: {1}")]
536    Recv(reference::PortId, #[source] anyhow::Error),
537
538    /// There was a serialization failure.
539    #[error("serialize: {0}")]
540    Serialize(#[source] anyhow::Error),
541
542    /// There was a deserialization failure.
543    #[error("deserialize {0}: {1}")]
544    Deserialize(&'static str, anyhow::Error),
545
546    /// There was an error during a channel operation.
547    #[error(transparent)]
548    Channel(#[from] ChannelError),
549
550    /// The owning actor terminated (either stopped or failed).
551    #[error("owner terminated: {0}")]
552    OwnerTerminated(ActorStatus),
553}
554
555impl MailboxError {
556    /// Create a new mailbox error associated with the provided actor
557    /// id and of the given kind.
558    pub fn new(actor_id: reference::ActorId, kind: MailboxErrorKind) -> Self {
559        Self { actor_id, kind }
560    }
561
562    /// The ID of the mailbox producing this error.
563    pub fn actor_id(&self) -> &reference::ActorId {
564        &self.actor_id
565    }
566
567    /// The error's kind.
568    pub fn kind(&self) -> &MailboxErrorKind {
569        &self.kind
570    }
571}
572
573impl fmt::Display for MailboxError {
574    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575        write!(f, "{}: ", self.actor_id)?;
576        fmt::Display::fmt(&self.kind, f)
577    }
578}
579
580impl std::error::Error for MailboxError {
581    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
582        self.kind.source()
583    }
584}
585
586/// PortLocation describes the location of a port.
587/// This is used in errors to provide a uniform data type
588/// for ports that may or may not be bound.
589#[derive(Debug, Clone)]
590pub enum PortLocation {
591    /// The port was bound: the location is its underlying bound ID.
592    Bound(reference::PortId),
593    /// The port was not bound: we provide the actor ID and the message type.
594    Unbound(reference::ActorId, &'static str),
595}
596
597impl PortLocation {
598    fn new_unbound<M: Message>(actor_id: reference::ActorId) -> Self {
599        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
600    }
601
602    #[allow(dead_code)]
603    fn new_unbound_type(actor_id: reference::ActorId, ty: &'static str) -> Self {
604        PortLocation::Unbound(actor_id, ty)
605    }
606
607    /// The actor id of the location.
608    pub fn actor_id(&self) -> &reference::ActorId {
609        match self {
610            PortLocation::Bound(port_id) => port_id.actor_id(),
611            PortLocation::Unbound(actor_id, _) => actor_id,
612        }
613    }
614}
615
616impl fmt::Display for PortLocation {
617    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618        match self {
619            PortLocation::Bound(port_id) => write!(f, "{}", port_id),
620            PortLocation::Unbound(actor_id, name) => write!(f, "{}<{}>", actor_id, name),
621        }
622    }
623}
624
625/// Errors that that occur during mailbox sending operations. Each error
626/// is associated with the port ID of the operation.
627#[derive(Debug)]
628pub struct MailboxSenderError {
629    location: Box<PortLocation>,
630    kind: Box<MailboxSenderErrorKind>,
631}
632
633/// The kind of mailbox sending errors.
634#[derive(thiserror::Error, Debug)]
635pub enum MailboxSenderErrorKind {
636    /// Error during serialization.
637    #[error("serialization error: {0}")]
638    Serialize(anyhow::Error),
639
640    /// Error during deserialization.
641    #[error("deserialization error for type {0}: {1}")]
642    Deserialize(&'static str, anyhow::Error),
643
644    /// A send to an invalid port.
645    #[error("invalid port")]
646    Invalid,
647
648    /// A send to a closed port.
649    #[error("port closed")]
650    Closed,
651
652    // The following pass through underlying errors:
653    /// An underlying mailbox error.
654    #[error(transparent)]
655    Mailbox(#[from] MailboxError),
656
657    /// An underlying channel error.
658    #[error(transparent)]
659    Channel(#[from] ChannelError),
660
661    /// An underlying message log error.
662    #[error(transparent)]
663    MessageLog(#[from] MessageLogError),
664
665    /// An other, uncategorized error.
666    #[error("send error: {0}")]
667    Other(#[from] anyhow::Error),
668
669    /// The destination was unreachable.
670    #[error("unreachable: {0}")]
671    Unreachable(anyhow::Error),
672}
673
674impl MailboxSenderError {
675    /// Create a new mailbox sender error to an unbound port.
676    pub fn new_unbound<M>(actor_id: reference::ActorId, kind: MailboxSenderErrorKind) -> Self {
677        Self {
678            location: Box::new(PortLocation::Unbound(actor_id, std::any::type_name::<M>())),
679            kind: Box::new(kind),
680        }
681    }
682
683    /// Create a new mailbox sender, manually providing the type.
684    pub fn new_unbound_type(
685        actor_id: reference::ActorId,
686        kind: MailboxSenderErrorKind,
687        ty: &'static str,
688    ) -> Self {
689        Self {
690            location: Box::new(PortLocation::Unbound(actor_id, ty)),
691            kind: Box::new(kind),
692        }
693    }
694
695    /// Create a new mailbox sender error with the provided port ID and kind.
696    pub fn new_bound(port_id: reference::PortId, kind: MailboxSenderErrorKind) -> Self {
697        Self {
698            location: Box::new(PortLocation::Bound(port_id)),
699            kind: Box::new(kind),
700        }
701    }
702
703    /// The location at which the error occured.
704    pub fn location(&self) -> &PortLocation {
705        &self.location
706    }
707
708    /// The kind associated with the error.
709    pub fn kind(&self) -> &MailboxSenderErrorKind {
710        &self.kind
711    }
712}
713
714impl fmt::Display for MailboxSenderError {
715    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716        write!(f, "{}: ", self.location)?;
717        fmt::Display::fmt(&self.kind, f)
718    }
719}
720
721impl std::error::Error for MailboxSenderError {
722    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
723        self.kind.source()
724    }
725}
726
727/// MailboxSenders can send messages through ports to mailboxes. It
728/// provides a unified interface for message delivery in the system.
729pub trait MailboxSender: Send + Sync + Any {
730    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
731    /// delegate to transport.
732    fn post(
733        &self,
734        mut envelope: MessageEnvelope,
735        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
736    ) {
737        if let Err(err) = envelope.dec_ttl_or_err() {
738            envelope.undeliverable(err, return_handle);
739            return;
740        }
741        self.post_unchecked(envelope, return_handle);
742    }
743
744    /// Raw transport: **no** policy.
745    fn post_unchecked(
746        &self,
747        envelope: MessageEnvelope,
748        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
749    );
750}
751
752/// PortSender extends [`MailboxSender`] by providing typed endpoints
753/// for sending messages over ports
754pub trait PortSender: MailboxSender {
755    /// Deliver a message to the provided port.
756    fn serialize_and_send<M: RemoteMessage>(
757        &self,
758        port: &reference::PortRef<M>,
759        message: M,
760        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
761    ) -> Result<(), MailboxSenderError> {
762        // TODO: convert this to a undeliverable error also
763        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
764            MailboxSenderError::new_bound(
765                port.port_id().clone(),
766                MailboxSenderErrorKind::Serialize(err.into()),
767            )
768        })?;
769        self.post(
770            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
771            return_handle,
772        );
773        Ok(())
774    }
775
776    /// Deliver a message to a one-shot port, consuming the provided port,
777    /// which is not reusable.
778    fn serialize_and_send_once<M: RemoteMessage>(
779        &self,
780        once_port: reference::OncePortRef<M>,
781        message: M,
782        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
783    ) -> Result<(), MailboxSenderError> {
784        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
785            MailboxSenderError::new_bound(
786                once_port.port_id().clone(),
787                MailboxSenderErrorKind::Serialize(err.into()),
788            )
789        })?;
790        self.post(
791            MessageEnvelope::new_unknown(once_port.port_id().clone(), serialized),
792            return_handle,
793        );
794        Ok(())
795    }
796}
797
798impl<T: ?Sized + MailboxSender> PortSender for T {}
799
800/// A perpetually closed mailbox sender. Panics if any messages are posted.
801/// Useful for tests, or where there is no meaningful mailbox sender
802/// implementation available.
803#[derive(Debug, Clone)]
804pub struct PanickingMailboxSender;
805
806impl MailboxSender for PanickingMailboxSender {
807    fn post_unchecked(
808        &self,
809        envelope: MessageEnvelope,
810        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
811    ) {
812        panic!("panic! in the mailbox! attempted post: {}", envelope)
813    }
814}
815
816/// A mailbox sender for undeliverable messages. This will simply record
817/// any undelivered messages.
818#[derive(Debug)]
819pub struct UndeliverableMailboxSender;
820
821impl MailboxSender for UndeliverableMailboxSender {
822    fn post_unchecked(
823        &self,
824        envelope: MessageEnvelope,
825        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
826    ) {
827        let sender_name = envelope.sender.name();
828        let error_str = envelope.error_msg().unwrap_or("".to_string());
829        // The undeliverable message was unable to be delivered back to the
830        // sender for some reason
831        tracing::error!(
832            name = "undelivered_message_abandoned",
833            actor_name = sender_name,
834            actor_id = envelope.sender.to_string(),
835            dest = envelope.dest.to_string(),
836            headers = envelope.headers().to_string(), // todo: implement tracing::Value for Flattrs
837            data = envelope.data().to_string(),
838            return_handle = %return_handle,
839            "message not delivered, {}",
840            error_str,
841        );
842    }
843}
844
845struct Buffer<T: Message> {
846    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
847    #[allow(dead_code)]
848    processed: watch::Receiver<usize>,
849    seq: AtomicUsize,
850}
851
852impl<T: Message> Buffer<T> {
853    fn new<Fut>(
854        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
855    ) -> Self
856    where
857        Fut: Future<Output = ()> + Send + 'static,
858    {
859        let (queue, mut next) = mpsc::unbounded_channel();
860        let (last_processed, processed) = watch::channel(0);
861        crate::init::get_runtime().spawn(async move {
862            let mut seq = 0;
863            while let Some((msg, return_handle)) = next.recv().await {
864                process(msg, return_handle).await;
865                seq += 1;
866                let _ = last_processed.send(seq);
867            }
868        });
869        Self {
870            queue,
871            processed,
872            seq: AtomicUsize::new(0),
873        }
874    }
875
876    #[allow(clippy::result_large_err)]
877    fn send(
878        &self,
879        item: (T, PortHandle<Undeliverable<T>>),
880    ) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
881        self.seq.fetch_add(1, Ordering::SeqCst);
882        self.queue.send(item)?;
883        Ok(())
884    }
885
886    #[allow(dead_code)]
887    async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
888        let seq = self.seq.load(Ordering::SeqCst);
889        while *self.processed.borrow_and_update() < seq {
890            self.processed.changed().await?;
891        }
892        Ok(())
893    }
894}
895
896static BOXED_PANICKING_MAILBOX_SENDER: LazyLock<BoxedMailboxSender> =
897    LazyLock::new(|| BoxedMailboxSender::new(PanickingMailboxSender));
898
899/// Convenience boxing implementation for MailboxSender. Most APIs
900/// are parameterized on MailboxSender implementations, and it's thus
901/// difficult to work with dyn values.  BoxedMailboxSender bridges this
902/// gap by providing a concrete MailboxSender which dispatches using an
903/// underlying (boxed) dyn.
904#[derive(Clone)]
905pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
906
907impl fmt::Debug for BoxedMailboxSender {
908    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
909        f.debug_struct("BoxedMailboxSender")
910            .field("sender", &"<dyn MailboxSender>")
911            .finish()
912    }
913}
914
915impl BoxedMailboxSender {
916    /// Create a new boxed sender given the provided sender implementation.
917    pub fn new(sender: impl MailboxSender + 'static) -> Self {
918        Self(Arc::new(sender))
919    }
920
921    /// Attempts to downcast the inner sender to the given concrete
922    /// type.
923    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
924        (&*self.0 as &dyn Any).downcast_ref::<T>()
925    }
926}
927
928/// Extension trait that creates a boxed clone of a MailboxSender.
929pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
930    /// A boxed clone of this MailboxSender.
931    fn boxed(&self) -> BoxedMailboxSender;
932}
933impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
934    fn boxed(&self) -> BoxedMailboxSender {
935        BoxedMailboxSender::new(self.clone())
936    }
937}
938
939/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
940pub trait IntoBoxedMailboxSender: MailboxSender {
941    /// Rehome this MailboxSender into a BoxedMailboxSender.
942    fn into_boxed(self) -> BoxedMailboxSender;
943}
944impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
945    fn into_boxed(self) -> BoxedMailboxSender {
946        BoxedMailboxSender::new(self)
947    }
948}
949
950impl MailboxSender for BoxedMailboxSender {
951    fn post_unchecked(
952        &self,
953        envelope: MessageEnvelope,
954        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
955    ) {
956        self.0.post_unchecked(envelope, return_handle);
957    }
958}
959
/// Errors that occur during mailbox serving.
///
/// This is the error type of the result produced by the task that
/// [`MailboxServer::serve`] spawns.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error.
    ///
    /// `serve` produces this when the served `channel::Rx` fails with any
    /// error other than `ChannelError::Closed`. A `Closed` error instead
    /// ends serving successfully.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error.
    ///
    /// NOTE(review): `serve` does not construct this variant itself;
    /// presumably it is produced elsewhere. Confirm before relying on it.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
971
/// Represents a running [`MailboxServer`]. The handle composes a
/// [`tokio::task::JoinHandle`] and may be joined in the same manner.
#[derive(Debug)]
pub struct MailboxServerHandle {
    // Join handle of the serving task. It resolves to the server's result.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    // Stop signal watched by the serving task; sending `true` requests a stop.
    // If this sender is dropped without a stop, the serving task treats itself
    // as "detached" and keeps serving.
    stopped_tx: watch::Sender<bool>,
}
979
980impl MailboxServerHandle {
981    /// Signal the server to stop serving the mailbox. The caller should
982    /// join the handle by awaiting the [`MailboxServerHandle`] future.
983    ///
984    /// Stop should be called at most once.
985    pub fn stop(&self, reason: &str) {
986        tracing::info!("stopping mailbox server; reason: {}", reason);
987        self.stopped_tx.send(true).expect("stop called twice");
988    }
989}
990
991/// Forward future implementation to underlying handle.
992impl Future for MailboxServerHandle {
993    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
994
995    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
996        // SAFETY: This is safe to do because self is pinned.
997        let join_handle_pinned =
998            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
999        join_handle_pinned.poll(cx)
1000    }
1001}
1002
/// Serve a port on the provided [`channel::Rx`]. This dispatches all
/// channel messages directly to the port.
///
/// Every envelope received on the channel is posted through this sender
/// (`self`).
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
    /// Serve envelopes received on `rx` by posting each one through this
    /// sender. Serving runs on a background task that may be joined with
    /// the returned handle.
    ///
    /// The serving task completes with:
    /// * `Ok(())` when a stop is requested via [`MailboxServerHandle::stop`],
    ///   or when `rx` reports [`ChannelError::Closed`];
    /// * `Err(MailboxServerError::Channel(_))` on any other receive error.
    ///
    /// In every case `rx.join()` is awaited before the task completes. If
    /// the returned handle is dropped without calling `stop`, the server
    /// keeps running ("detached") until the channel closes or fails.
    ///
    /// Posts that turn out to be undeliverable are routed to a dedicated
    /// undeliverable port. A second background task bounces them back to
    /// their original sender.
    fn serve(
        self,
        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
    ) -> MailboxServerHandle {
        // A `MailboxServer` can receive a message that couldn't
        // reach its destination. We can use the fact that servers are
        // `MailboxSender`s to attempt to forward them back to their
        // senders.
        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
        // `server` is moved into the bounce task below. `self` itself is
        // moved into the serving task.
        let server = self.clone();
        tokio::task::spawn(async move {
            // Create a client for this task.
            // Each bounce task gets its own uniquely named local proc whose
            // forwarder is this server, plus a client instance on that proc
            // used as the sender of bounced messages.
            static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
            let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
            let addr = ChannelAddr::any(ChannelTransport::Local);
            let proc_id = reference::ProcId::unique(addr, format!("mailbox_server_{}", rank));
            // Use this mailbox server as the forwarder, so we can use it to
            // return message back to the sender.
            let proc = Proc::configured(proc_id, BoxedMailboxSender::new(server));
            let (client, _) = proc.instance("undeliverable_supervisor").unwrap();
            // Runs until the undeliverable receiver yields an error
            // (presumably once every clone of `return_handle` is dropped).
            while let Ok(Undeliverable(mut envelope)) = undeliverable_rx.recv().await {
                if let Ok(Undeliverable(e)) =
                    envelope.deserialized::<Undeliverable<MessageEnvelope>>()
                {
                    // A non-returnable undeliverable.
                    // The payload is itself an undeliverable envelope. Instead
                    // of bouncing it again, the inner envelope is handed to
                    // `UndeliverableMailboxSender`.
                    UndeliverableMailboxSender.post(e, monitored_return_handle());
                    continue;
                }
                // Record why the message is coming back, then send it to the
                // original sender's `Undeliverable<MessageEnvelope>` port.
                envelope.set_error(DeliveryError::BrokenLink(
                    "message was undeliverable".to_owned(),
                ));
                let return_port =
                    reference::PortRef::<Undeliverable<MessageEnvelope>>::attest_message_port(
                        envelope.sender(),
                    );
                return_port.send_serialized(
                    &client,
                    Flattrs::new(),
                    wirevalue::Any::serialize(&Undeliverable(envelope)).unwrap(),
                );
            }
        });

        let (stopped_tx, mut stopped_rx) = watch::channel(false);
        let join_handle = tokio::spawn(async move {
            // Becomes true once `stopped_tx` is dropped, meaning the handle was
            // dropped without `stop`. The stop branch is then disabled;
            // presumably this keeps `select!` from repeatedly completing on the
            // closed watch.
            let mut detached = false;

            let result = loop {
                // Checked on every iteration, so a stop requested before
                // serving starts, or one observed by the `changed()` branch
                // below, ends the loop.
                if *stopped_rx.borrow_and_update() {
                    break Ok(());
                }

                tokio::select! {
                    message = rx.recv() => {
                        match message {
                            // Relay the message to the port directly.
                            Ok(envelope) => self.post(envelope, return_handle.clone()),

                            // Closed is a "graceful" error in this case.
                            // We simply stop serving.
                            Err(ChannelError::Closed) => break Ok(()),
                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
                        }
                    }
                    result = stopped_rx.changed(), if !detached  => {
                        detached = result.is_err();
                        if detached {
                            tracing::debug!(
                                "the mailbox server is detached for Rx {}", rx.addr()
                            );
                        } else {
                            tracing::debug!(
                                "the mailbox server is stopped for Rx {}", rx.addr()
                            );
                        }
                    }
                }
            };

            // Join the channel receiver to ensure pending acks are
            // sent before the underlying channel server is torn down.
            rx.join().await;

            result
        });

        MailboxServerHandle {
            join_handle,
            stopped_tx,
        }
    }
}
1101
1102impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1103
1104/// A mailbox server client that transmits messages on a Tx channel.
1105pub struct MailboxClient {
1106    // The unbounded sender.
1107    buffer: Buffer<MessageEnvelope>,
1108
1109    // To cancel monitoring tx health.
1110    _tx_monitoring: CancellationToken,
1111}
1112
1113impl fmt::Debug for MailboxClient {
1114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1115        f.debug_struct("MailboxClient")
1116            .field("buffer", &"<Buffer>")
1117            .finish()
1118    }
1119}
1120
1121impl MailboxClient {
1122    /// Create a new client that sends messages destined for a
1123    /// [`MailboxServer`] on the provided Tx channel.
1124    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1125        let addr = tx.addr();
1126        let tx = Arc::new(tx);
1127        let tx_status = tx.status().clone();
1128        let tx_monitoring = CancellationToken::new();
1129        let buffer = Buffer::new(move |envelope, return_handle| {
1130            let tx = Arc::clone(&tx);
1131            let (return_channel, return_receiver) =
1132                oneshot::channel::<SendError<MessageEnvelope>>();
1133            // Set up for delivery failure.
1134            let return_handle_0 = return_handle.clone();
1135            tokio::spawn(async move {
1136                if let Ok(SendError {
1137                    error,
1138                    message,
1139                    reason,
1140                }) = return_receiver.await
1141                {
1142                    message.undeliverable(
1143                        DeliveryError::BrokenLink(format!(
1144                            "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
1145                        )),
1146                        return_handle_0,
1147                    );
1148                }
1149            });
1150            // Send the message for transmission.
1151            tx.try_post(envelope, return_channel);
1152            future::ready(())
1153        });
1154        let this = Self {
1155            buffer,
1156            _tx_monitoring: tx_monitoring.clone(),
1157        };
1158        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1159        this
1160    }
1161
1162    /// Convenience constructor, to set up a mailbox client that forwards messages
1163    /// to the provided address.
1164    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1165        Ok(MailboxClient::new(channel::dial(addr)?))
1166    }
1167
1168    // Set up a watch for the tx's health.
1169    fn monitor_tx_health(
1170        mut rx: watch::Receiver<TxStatus>,
1171        cancel_token: CancellationToken,
1172        addr: ChannelAddr,
1173    ) {
1174        crate::init::get_runtime().spawn(async move {
1175            loop {
1176                tokio::select! {
1177                    changed = rx.changed() => {
1178                        if changed.is_err() || *rx.borrow() == TxStatus::Closed {
1179                            tracing::warn!("connection to {} lost", addr);
1180                            // TODO: Potential for supervision event
1181                            // interaction here.
1182                            break;
1183                        }
1184                    }
1185                    _ = cancel_token.cancelled() => {
1186                        break;
1187                    }
1188                }
1189            }
1190        });
1191    }
1192}
1193
1194impl MailboxSender for MailboxClient {
1195    #[tracing::instrument(level = "debug", skip_all)]
1196    fn post_unchecked(
1197        &self,
1198        envelope: MessageEnvelope,
1199        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1200    ) {
1201        tracing::event!(target:"messages", tracing::Level::TRACE,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1202        if let Err(mpsc::error::SendError((envelope, return_handle))) =
1203            self.buffer.send((envelope, return_handle))
1204        {
1205            let err = DeliveryError::BrokenLink(
1206                "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1207            );
1208
1209            // Failed to enqueue.
1210            envelope.undeliverable(err, return_handle);
1211        }
1212    }
1213}
1214
/// Wrapper to turn `PortRef` into a `Sink`.
///
/// Each item given to the sink is sent immediately on `port`, using `cx` as
/// the sending context. The sink itself never buffers items.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    // Sending context used for every send.
    cx: C,
    // Destination port for all items.
    port: reference::PortRef<M>,
}
1220
1221impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1222    /// Create new PortSink
1223    pub fn new(cx: C, port: reference::PortRef<M>) -> Self {
1224        Self { cx, port }
1225    }
1226}
1227
1228impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1229    type Error = MailboxSenderError;
1230
1231    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1232        Poll::Ready(Ok(()))
1233    }
1234
1235    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1236        self.port.send(&self.cx, item)
1237    }
1238
1239    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1240        Poll::Ready(Ok(()))
1241    }
1242
1243    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1244        Poll::Ready(Ok(()))
1245    }
1246}
1247
/// A mailbox coordinates message delivery to actors through typed
/// [`Port`]s associated with the mailbox.
///
/// `Mailbox` is a cheap, cloneable handle, and all clones share the same
/// underlying state.
#[derive(Clone, Debug)]
pub struct Mailbox {
    // Shared state: actor id, bound ports, forwarder for other actors, and
    // the closed status.
    inner: Arc<State>,
}
1254
1255impl Mailbox {
1256    /// Create a new mailbox associated with the provided actor ID, using the provided
1257    /// forwarder for external destinations.
1258    pub fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
1259        Self {
1260            inner: Arc::new(State::new(actor_id, forwarder)),
1261        }
1262    }
1263
1264    /// Create a new detached mailbox associated with the provided actor ID.
1265    pub fn new_detached(actor_id: reference::ActorId) -> Self {
1266        Self {
1267            inner: Arc::new(State::new(actor_id, BOXED_PANICKING_MAILBOX_SENDER.clone())),
1268        }
1269    }
1270
1271    /// The actor id associated with this mailbox.
1272    pub fn actor_id(&self) -> &reference::ActorId {
1273        &self.inner.actor_id
1274    }
1275
1276    /// Open a new port that accepts M-typed messages. The returned
1277    /// port may be freely cloned, serialized, and passed around. The
1278    /// returned receiver should only be retained by the actor responsible
1279    /// for processing the delivered messages.
1280    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1281        let port_index = self.inner.allocate_port();
1282        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1283        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1284        tracing::trace!(
1285            name = "open_port",
1286            "opening port for {} at {}",
1287            self.inner.actor_id,
1288            port_id
1289        );
1290        (
1291            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1292            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1293        )
1294    }
1295
1296    /// Bind this message's actor port to this actor's mailbox. This method is
1297    /// normally used:
1298    ///   1. when we need to intercept a message sent to a handler, and re-route
1299    ///      that message to the returned receiver;
1300    ///   2. mock this message's handler when it is not implemented for this actor
1301    ///      type, with the returned receiver.
1302    pub(crate) fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1303        let (handle, receiver) = self.open_port();
1304        handle.bind_actor_port();
1305        (handle, receiver)
1306    }
1307
1308    /// Open a new port with an accumulator with default reduce options.
1309    /// See [`open_accum_port_opts`] for more details.
1310    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1311    where
1312        A: Accumulator + Send + Sync + 'static,
1313        A::Update: Message,
1314        A::State: Message + Default + Clone,
1315    {
1316        self.open_accum_port_opts(accum, StreamingReducerOpts::default())
1317    }
1318
1319    /// Open a new port with an accumulator. This port accepts A::Update type
1320    /// messages, accumulate them into A::State with the given accumulator.
1321    /// The latest changed state can be received from the returned receiver as
1322    /// a single A::State message. If there is no new update, the receiver will
1323    /// not receive any message.
1324    ///
1325    /// If provided, reducer mode controls reduce operations.
1326    pub fn open_accum_port_opts<A>(
1327        &self,
1328        accum: A,
1329        streaming_opts: StreamingReducerOpts,
1330    ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1331    where
1332        A: Accumulator + Send + Sync + 'static,
1333        A::Update: Message,
1334        A::State: Message + Default + Clone,
1335    {
1336        let port_index = self.inner.allocate_port();
1337        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1338        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1339        let state = Mutex::new(A::State::default());
1340        let reducer_spec = accum.reducer_spec();
1341        let enqueue = move |_, update: A::Update| {
1342            let mut state = state.lock().unwrap();
1343            accum.accumulate(&mut state, update)?;
1344            let _ = sender.send(state.clone());
1345            Ok(())
1346        };
1347        (
1348            PortHandle {
1349                mailbox: self.clone(),
1350                port_index,
1351                sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1352                bound: Arc::new(RwLock::new(None)),
1353                reducer_spec,
1354                streaming_opts,
1355            },
1356            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
1357        )
1358    }
1359
1360    /// Open a port that accepts M-typed messages, using the provided function
1361    /// to enqueue.
1362    // TODO: consider making lifetime bound to Self instead.
1363    pub(crate) fn open_enqueue_port<M: Message>(
1364        &self,
1365        enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1366    ) -> PortHandle<M> {
1367        PortHandle {
1368            mailbox: self.clone(),
1369            port_index: self.inner.allocate_port(),
1370            sender: UnboundedPortSender::Func(Arc::new(enqueue)),
1371            bound: Arc::new(RwLock::new(None)),
1372            reducer_spec: None,
1373            streaming_opts: StreamingReducerOpts::default(),
1374        }
1375    }
1376
1377    /// Open a new one-shot port that accepts M-typed messages. The
1378    /// returned port may be used to send a single message; ditto the
1379    /// receiver may receive a single message.
1380    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1381        let port_index = self.inner.allocate_port();
1382        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1383        let (sender, receiver) = oneshot::channel::<M>();
1384        (
1385            OncePortHandle {
1386                mailbox: self.clone(),
1387                port_index,
1388                port_id: port_id.clone(),
1389                sender,
1390                reducer_spec: None,
1391            },
1392            OncePortReceiver {
1393                receiver: Some(receiver),
1394                port_id,
1395                mailbox: self.clone(),
1396            },
1397        )
1398    }
1399
1400    /// Open a new one-shot port with a reducer. This port is designed
1401    /// to be used with casting, where the port is split across multiple
1402    /// destinations and responses are accumulated using the reducer.
1403    /// The accumulator type must have a ReducerSpec.
1404    ///
1405    /// The returned handle can be bound and embedded in cast messages.
1406    /// When the message is split by CommActor, each destination receives a
1407    /// split port. Responses to split ports are accumulated using the
1408    /// accumulator's reducer, and the final accumulated result is delivered
1409    /// to the returned receiver.
1410    ///
1411    /// Note: For accumulators used with casting, `Update` and `State` types
1412    /// must be the same (e.g., `sum<u64>` where both are `u64`).
1413    pub fn open_reduce_port<A, T>(
1414        &self,
1415        accum: A,
1416    ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
1417    where
1418        A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
1419        T: Message + Default + Clone,
1420    {
1421        let port_index = self.inner.allocate_port();
1422        let (sender, receiver) = oneshot::channel::<T>();
1423        let port_id = reference::PortId::new(self.inner.actor_id.clone(), port_index);
1424        let reducer_spec = accum.reducer_spec();
1425        assert!(
1426            reducer_spec.is_some(),
1427            "cannot use a reduce port without a ReducerSpec"
1428        );
1429
1430        (
1431            OncePortHandle {
1432                mailbox: self.clone(),
1433                port_index,
1434                port_id: port_id.clone(),
1435                sender,
1436                reducer_spec,
1437            },
1438            OncePortReceiver {
1439                receiver: Some(receiver),
1440                port_id,
1441                mailbox: self.clone(),
1442            },
1443        )
1444    }
1445
1446    #[allow(dead_code)]
1447    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1448        MailboxError::new(self.inner.actor_id.clone(), err)
1449    }
1450
1451    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1452        let port_index = M::port();
1453        self.inner.ports.get(&port_index).and_then(|boxed| {
1454            boxed
1455                .as_any()
1456                .downcast_ref::<UnboundedSender<M>>()
1457                .map(|s| {
1458                    assert_eq!(
1459                        s.port_id,
1460                        self.actor_id().port_id(port_index),
1461                        "port_id mismatch in downcasted UnboundedSender"
1462                    );
1463                    s.sender.clone()
1464                })
1465        })
1466    }
1467
1468    /// Retrieve the bound undeliverable message port handle.
1469    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1470        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1471            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1472    }
1473
1474    pub(crate) fn allocate_port(&self) -> u64 {
1475        self.inner.allocate_port()
1476    }
1477
1478    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> reference::PortRef<M> {
1479        assert_eq!(
1480            handle.mailbox.actor_id(),
1481            self.actor_id(),
1482            "port does not belong to mailbox"
1483        );
1484
1485        // TODO: don't even allocate a port until the port is bound. Possibly
1486        // have handles explicitly staged (unbound, bound).
1487        let port_id = self.actor_id().port_id(handle.port_index);
1488        match self.inner.ports.entry(handle.port_index) {
1489            Entry::Vacant(entry) => {
1490                entry.insert(Box::new(UnboundedSender::new(
1491                    handle.sender.clone(),
1492                    port_id.clone(),
1493                )));
1494            }
1495            Entry::Occupied(_entry) => {}
1496        }
1497
1498        reference::PortRef::attest(port_id)
1499    }
1500
1501    fn bind_to_actor_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1502        assert_eq!(
1503            handle.mailbox.actor_id(),
1504            self.actor_id(),
1505            "port does not belong to mailbox"
1506        );
1507
1508        let port_index = M::port();
1509        let port_id = self.actor_id().port_id(port_index);
1510        match self.inner.ports.entry(port_index) {
1511            Entry::Vacant(entry) => {
1512                entry.insert(Box::new(UnboundedSender::new(
1513                    handle.sender.clone(),
1514                    port_id,
1515                )));
1516            }
1517            Entry::Occupied(_entry) => panic!("port {} already bound", port_id),
1518        }
1519    }
1520
1521    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1522        let port_id = handle.port_id().clone();
1523        match self.inner.ports.entry(handle.port_index) {
1524            Entry::Vacant(entry) => {
1525                entry.insert(Box::new(OnceSender::new(handle.sender, port_id.clone())));
1526            }
1527            Entry::Occupied(_entry) => {}
1528        }
1529    }
1530
1531    pub(crate) fn bind_untyped(&self, port_id: &reference::PortId, sender: UntypedUnboundedSender) {
1532        assert_eq!(
1533            port_id.actor_id(),
1534            self.actor_id(),
1535            "port does not belong to mailbox"
1536        );
1537
1538        match self.inner.ports.entry(port_id.index()) {
1539            Entry::Vacant(entry) => {
1540                entry.insert(Box::new(sender));
1541            }
1542            Entry::Occupied(_entry) => {}
1543        }
1544    }
1545
1546    pub(crate) fn close(&self, status: ActorStatus) {
1547        let mut closed = self.inner.closed.write().unwrap();
1548        if closed.is_some() {
1549            panic!("mailbox with owner {} already closed", self.actor_id());
1550        }
1551        let _ = closed.insert(status);
1552    }
1553}
1554
1555impl context::Mailbox for Mailbox {
1556    fn mailbox(&self) -> &Mailbox {
1557        self
1558    }
1559}
1560
1561// TODO: figure out what to do with these interfaces -- possibly these caps
1562// do not have to be private.
1563
1564/// Open a port given a capability.
1565pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1566    cx.mailbox().open_port()
1567}
1568
1569/// Open a one-shot port given a capability. This is a public method primarily to
1570/// enable macro-generated clients.
1571pub fn open_once_port<M: Message>(
1572    cx: &impl context::Mailbox,
1573) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1574    cx.mailbox().open_once_port()
1575}
1576
1577impl MailboxSender for Mailbox {
1578    /// Deliver a serialized message to the provided port ID. This method fails
1579    /// if the message does not deserialize into the expected type.
1580    fn post_unchecked(
1581        &self,
1582        envelope: MessageEnvelope,
1583        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1584    ) {
1585        metrics::MAILBOX_POSTS.add(
1586            1,
1587            hyperactor_telemetry::kv_pairs!(
1588                "actor_id" => envelope.sender.to_string(),
1589                "dest_actor_id" => envelope.dest.actor_id().to_string(),
1590            ),
1591        );
1592        tracing::trace!(
1593            name = "post",
1594            actor_name = envelope.sender.name(),
1595            actor_id = envelope.sender.to_string(),
1596            "posting message to {}",
1597            envelope.dest
1598        );
1599
1600        if envelope.dest().actor_id() != &self.inner.actor_id {
1601            return self.inner.forwarder.post(envelope, return_handle);
1602        }
1603
1604        match self.inner.ports.entry(envelope.dest().index()) {
1605            Entry::Vacant(_) => {
1606                let err = DeliveryError::Unroutable(format!(
1607                    "port not bound in mailbox; port id: {}; message type: {}",
1608                    envelope.dest().index(),
1609                    envelope.data().typename().map_or_else(
1610                        || format!("unregistered type hash {}", envelope.data().typehash()),
1611                        |s| s.to_string(),
1612                    )
1613                ));
1614
1615                envelope.undeliverable(err, return_handle);
1616            }
1617            Entry::Occupied(entry) => {
1618                let closed = self.inner.closed.read().unwrap();
1619                if let Some(status) = &*closed {
1620                    match status {
1621                        ActorStatus::Stopped(reason) => {
1622                            let err = format!(
1623                                "mailbox owner {} is stopped: {}",
1624                                self.inner.actor_id, reason
1625                            );
1626                            return envelope
1627                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1628                        }
1629                        ActorStatus::Failed(actor_error) => {
1630                            let err = format!(
1631                                "mailbox owner {} failed: {}",
1632                                self.inner.actor_id, actor_error
1633                            );
1634                            return envelope
1635                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1636                        }
1637                        _ => {
1638                            let err = format!(
1639                                "mailbox owner {} closed unexpectedly: {:?}",
1640                                self.inner.actor_id, status
1641                            );
1642                            return envelope
1643                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1644                        }
1645                    }
1646                }
1647
1648                let (metadata, data) = envelope.open();
1649                let MessageMetadata {
1650                    mut headers,
1651                    sender,
1652                    dest,
1653                    errors: metadata_errors,
1654                    ttl,
1655                    return_undeliverable,
1656                } = metadata;
1657
1658                let to_actor_id = hash_to_u64(&dest);
1659                let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1660                headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1661                // Only set sender hash if not already present (cast path
1662                // pre-sets it with the originating actor).
1663                if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1664                    headers.set(
1665                        crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1666                        hash_to_u64(&sender),
1667                    );
1668                }
1669                headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1670
1671                // We use the entry API here so that we can remove the
1672                // entry while holding an (entry) reference. The DashMap
1673                // documentation suggests that deadlocks are possible
1674                // "when holding any sort of reference into the map",
1675                // but surely this applies only to the same thread? This
1676                // would also imply we have to be careful holding any
1677                // sort of reference across .await points.
1678                match entry.get().send_serialized(headers, data) {
1679                    Ok(false) => {
1680                        hyperactor_telemetry::notify_message_status(
1681                            hyperactor_telemetry::MessageStatusEvent {
1682                                timestamp: std::time::SystemTime::now(),
1683                                id: hyperactor_telemetry::generate_status_event_id(message_id),
1684                                message_id,
1685                                status: "queued".to_string(),
1686                            },
1687                        );
1688                        entry.remove();
1689                    }
1690                    Ok(true) => {
1691                        hyperactor_telemetry::notify_message_status(
1692                            hyperactor_telemetry::MessageStatusEvent {
1693                                timestamp: std::time::SystemTime::now(),
1694                                id: hyperactor_telemetry::generate_status_event_id(message_id),
1695                                message_id,
1696                                status: "queued".to_string(),
1697                            },
1698                        );
1699                    }
1700                    Err(SerializedSenderError {
1701                        data,
1702                        error: sender_error,
1703                        headers,
1704                    }) => {
1705                        entry.remove();
1706                        let err = DeliveryError::Mailbox(format!("{}", sender_error));
1707
1708                        MessageEnvelope::seal(
1709                            MessageMetadata {
1710                                headers,
1711                                sender,
1712                                dest,
1713                                errors: metadata_errors,
1714                                ttl,
1715                                return_undeliverable,
1716                            },
1717                            data,
1718                        )
1719                        .undeliverable(err, return_handle)
1720                    }
1721                }
1722            }
1723        }
1724    }
1725}
1726
1727/// A port to which M-typed messages can be delivered. Ports may be
1728/// serialized to be sent to other actors. However, when a port is
1729/// deserialized, it may no longer be used to send messages directly
1730/// to a mailbox since it is no longer associated with a local mailbox
1731/// ([`Mailbox::send`] will fail). However, the runtime may accept
1732/// remote Ports, and arrange for these messages to be delivered
1733/// indirectly through inter-node message passing.
1734#[derive(Debug)]
1735pub struct PortHandle<M: Message> {
1736    mailbox: Mailbox,
1737    port_index: u64,
1738    sender: UnboundedPortSender<M>,
1739    // We would like this to be a Arc<RwLock<Option<PortRef<M>>>>, but we cannot
1740    // write down the type PortRef<M> (M: Message), even though we cannot
1741    // legally construct such a value without M: RemoteMessage. We could consider
1742    // making PortRef<M> valid for M: Message, but constructible only for
1743    // M: RemoteMessage, but the guarantees offered by the impossibilty of even
1744    // writing down the type are appealing.
1745    bound: Arc<RwLock<Option<reference::PortId>>>,
1746    // Typehash of an optional reducer. When it's defined, we include it in port
1747    /// references to optionally enable incremental accumulation.
1748    reducer_spec: Option<ReducerSpec>,
1749    /// Streaming reducer options.
1750    streaming_opts: StreamingReducerOpts,
1751}
1752
1753impl<M: Message> PortHandle<M> {
1754    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
1755        Self {
1756            mailbox,
1757            port_index,
1758            sender,
1759            bound: Arc::new(RwLock::new(None)),
1760            reducer_spec: None,
1761            streaming_opts: StreamingReducerOpts::default(),
1762        }
1763    }
1764
1765    pub(crate) fn location(&self) -> PortLocation {
1766        match self.bound.read().unwrap().as_ref() {
1767            Some(port_id) => PortLocation::Bound(port_id.clone()),
1768            None => PortLocation::new_unbound::<M>(self.mailbox.actor_id().clone()),
1769        }
1770    }
1771
1772    /// Send a message to this port.
1773    pub fn send(&self, cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1774        let closed = self.mailbox.inner.closed.read().unwrap();
1775
1776        if let Some(status) = &*closed {
1777            let err = MailboxError {
1778                actor_id: self.mailbox.actor_id().clone(),
1779                kind: MailboxErrorKind::OwnerTerminated(status.clone()),
1780            };
1781            return Err(MailboxSenderError::new_unbound::<M>(
1782                self.mailbox.actor_id().clone(),
1783                MailboxSenderErrorKind::Mailbox(err),
1784            ));
1785        }
1786
1787        let mut headers = Flattrs::new();
1788
1789        crate::mailbox::headers::set_send_timestamp(&mut headers);
1790        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1791        // Hold read lock while checking and sending to prevent race with bind().
1792        // Message sent from handle is delivered immediately. It could race with
1793        // messages from refs. So we need to assign seq if the handle is bound.
1794        let bound_guard = self.bound.read().unwrap();
1795        if let Some(bound_port) = bound_guard.as_ref() {
1796            let sequencer = cx.instance().sequencer();
1797            let seq_info = sequencer.assign_seq(bound_port);
1798            headers.set(SEQ_INFO, seq_info);
1799        } else {
1800            // Because the port is not bound, messages can only be sent through
1801            // this port handle's underlying tokio channel directly. As a result,
1802            // we do not need to assign seq to the message. We do not need to
1803            // worry about race condition due to bound_guard.
1804            headers.set(SEQ_INFO, SeqInfo::Direct);
1805        }
1806        // Encountering error means the port is closed. So we do not need to
1807        // rollback the seq, because no message can be delivered to it, and
1808        // subsequently do not need to worry about out-of-sequence for messages
1809        // after this seq.
1810        //
1811        // Theoretically, we could have deadlock if
1812        //   1. `sender.send` attempts to hold read lock of this PortHandle's
1813        //      `bound` field, and in the meantime,
1814        //   2.  another thread is trying to bind and thus waiting for write lock.
1815        // But we do not expect `sender.send` to use the same PortHandle, so this
1816        // deadlock scenario should not happen.
1817        self.sender.send(headers, message).map_err(|err| {
1818            MailboxSenderError::new_unbound::<M>(
1819                self.mailbox.actor_id().clone(),
1820                MailboxSenderErrorKind::Other(err),
1821            )
1822        })
1823    }
1824
1825    /// A contravariant map: using the provided function to translate
1826    /// `R`-typed messages to `M`-typed ones, delivered on this port.
1827    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
1828    where
1829        R: Message,
1830        F: Fn(R) -> M + Send + Sync + 'static,
1831    {
1832        let port_index = self.mailbox.inner.allocate_port();
1833        let sender = self.sender.clone();
1834        PortHandle::new(
1835            self.mailbox.clone(),
1836            port_index,
1837            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
1838                sender.send(headers, unmap(value))
1839            })),
1840        )
1841    }
1842}
1843
1844impl<M: RemoteMessage> PortHandle<M> {
1845    /// Bind this port, making it accessible to remote actors.
1846    pub fn bind(&self) -> reference::PortRef<M> {
1847        let port_id = {
1848            let mut guard = self.bound.write().unwrap();
1849            guard
1850                .get_or_insert_with(|| self.mailbox.bind(self).port_id().clone())
1851                .clone()
1852        };
1853        reference::PortRef::attest_reducible(
1854            port_id,
1855            self.reducer_spec.clone(),
1856            self.streaming_opts.clone(),
1857        )
1858    }
1859
1860    /// Bind to this message's actor port. This method will panic if the handle
1861    /// is already bound.
1862    ///
1863    /// This is used by [`actor::Binder`] implementations to bind actor refs.
1864    /// This is not intended for general use.
1865    pub(crate) fn bind_actor_port(&self) {
1866        let port_id = self.mailbox.actor_id().port_id(M::port());
1867        {
1868            let mut guard = self.bound.write().unwrap();
1869            if guard.is_some() {
1870                panic!(
1871                    "could not bind port handle {} as {port_id}: already bound",
1872                    self.port_index
1873                );
1874            }
1875            *guard = Some(port_id);
1876        }
1877        self.mailbox.bind_to_actor_port(self);
1878    }
1879}
1880
1881impl<M: Message> Clone for PortHandle<M> {
1882    fn clone(&self) -> Self {
1883        Self {
1884            mailbox: self.mailbox.clone(),
1885            port_index: self.port_index,
1886            sender: self.sender.clone(),
1887            bound: self.bound.clone(),
1888            reducer_spec: self.reducer_spec.clone(),
1889            streaming_opts: self.streaming_opts.clone(),
1890        }
1891    }
1892}
1893
1894impl<M: Message> fmt::Display for PortHandle<M> {
1895    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1896        fmt::Display::fmt(&self.location(), f)
1897    }
1898}
1899
/// A one-shot port handle to which M-typed messages can be delivered.
///
/// Sending consumes the handle, since the port accepts at most one message.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox that owns this port.
    mailbox: Mailbox,
    /// The numeric index of this port.
    port_index: u64,
    /// The full ID of this port.
    port_id: reference::PortId,
    /// The one-shot channel sender; consumed by `OncePortHandle::send`.
    sender: oneshot::Sender<M>,
    /// Optional reducer spec, carried into the reference produced by
    /// `OncePortHandle::bind`.
    reducer_spec: Option<ReducerSpec>,
}
1909
1910impl<M: Message> OncePortHandle<M> {
1911    /// This port's ID.
1912    // TODO: make value
1913    pub fn port_id(&self) -> &reference::PortId {
1914        &self.port_id
1915    }
1916
1917    /// Send a message to this port. The send operation will consume the
1918    /// port handle, as the port accepts at most one message.
1919    pub fn send(self, _cx: &impl context::Actor, message: M) -> Result<(), MailboxSenderError> {
1920        // TODO: Assign seq to the message if the port is bound to an actor port
1921        // in the future.
1922        assert!(
1923            !self.port_id().is_actor_port(),
1924            "OncePortHandle currently does not support actor ports; a \
1925            prerequisite of that support is to assign seq to messages \
1926            if the port is actor port."
1927        );
1928
1929        let actor_id = self.mailbox.actor_id().clone();
1930        self.sender.send(message).map_err(|_| {
1931            // Here, the value is returned when the port is
1932            // closed.  We should consider having a similar
1933            // API for send_once, though arguably it makes less
1934            // sense in this context.
1935            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
1936        })?;
1937        Ok(())
1938    }
1939}
1940
1941impl<M: RemoteMessage> OncePortHandle<M> {
1942    /// Turn this handle into a ref that may be passed to
1943    /// a remote actor. The remote actor can then use the
1944    /// ref to send a message to the port. Creating a ref also
1945    /// binds the port, so that it is remotely writable.
1946    pub fn bind(self) -> reference::OncePortRef<M> {
1947        let port_id = self.port_id().clone();
1948        let reducer_spec = self.reducer_spec.clone();
1949        self.mailbox.clone().bind_once(self);
1950        reference::OncePortRef::attest_reducible(port_id, reducer_spec)
1951    }
1952}
1953
1954impl<M: Message> fmt::Display for OncePortHandle<M> {
1955    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1956        fmt::Display::fmt(&self.port_id(), f)
1957    }
1958}
1959
1960/// A receiver of M-typed messages, used by actors to receive messages
1961/// on open ports.
1962#[derive(Debug)]
1963pub struct PortReceiver<M> {
1964    receiver: mpsc::UnboundedReceiver<M>,
1965    port_id: reference::PortId,
1966    /// When multiple messages are put in channel, only receive the latest one
1967    /// if coalesce is true. Other messages will be discarded.
1968    coalesce: bool,
1969    /// State is used to remove the port from service when the receiver
1970    /// is dropped.
1971    mailbox: Mailbox,
1972}
1973
1974impl<M> PortReceiver<M> {
1975    fn new(
1976        receiver: mpsc::UnboundedReceiver<M>,
1977        port_id: reference::PortId,
1978        coalesce: bool,
1979        mailbox: Mailbox,
1980    ) -> Self {
1981        Self {
1982            receiver,
1983            port_id,
1984            coalesce,
1985            mailbox,
1986        }
1987    }
1988
1989    /// Tries to receive the next value for this receiver.
1990    /// This function returns `Ok(None)` if the receiver is empty
1991    /// and returns a MailboxError if the receiver is disconnected.
1992    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
1993    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
1994        let mut next = self.receiver.try_recv();
1995        // To coalesce, drain the mpsc queue and only keep the last one.
1996        if self.coalesce
1997            && let Some(latest) = self.drain().pop()
1998        {
1999            next = Ok(latest);
2000        }
2001        match next {
2002            Ok(msg) => Ok(Some(msg)),
2003            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2004            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2005                self.actor_id().clone(),
2006                MailboxErrorKind::Closed,
2007            )),
2008        }
2009    }
2010
2011    /// Receive the next message from the port corresponding with this
2012    /// receiver.
2013    pub async fn recv(&mut self) -> Result<M, MailboxError> {
2014        let mut next = self.receiver.recv().await;
2015        // To coalesce, get the last message from the queue if there are
2016        // more on the mspc queue.
2017        if self.coalesce
2018            && let Some(latest) = self.drain().pop()
2019        {
2020            next = Some(latest);
2021        }
2022        next.ok_or(MailboxError::new(
2023            self.actor_id().clone(),
2024            MailboxErrorKind::Closed,
2025        ))
2026    }
2027
2028    /// Drains all available messages from the port.
2029    pub fn drain(&mut self) -> Vec<M> {
2030        let mut drained: Vec<M> = Vec::new();
2031        while let Ok(msg) = self.receiver.try_recv() {
2032            // To coalesce, discard the old message if there is any.
2033            if self.coalesce {
2034                drained.pop();
2035            }
2036            drained.push(msg);
2037        }
2038        drained
2039    }
2040
2041    fn port(&self) -> u64 {
2042        self.port_id.index()
2043    }
2044
2045    fn actor_id(&self) -> &reference::ActorId {
2046        self.port_id.actor_id()
2047    }
2048}
2049
2050impl<M> Drop for PortReceiver<M> {
2051    fn drop(&mut self) {
2052        // MARIUS: do we need to tombstone these? or should we
2053        // error out if we have removed the receiver before serializing the port ref?
2054        // ("no longer live")?
2055        self.mailbox.inner.ports.remove(&self.port());
2056    }
2057}
2058
2059impl<M> Stream for PortReceiver<M> {
2060    type Item = Result<M, MailboxError>;
2061
2062    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2063        std::pin::pin!(self.recv()).poll(cx).map(Some)
2064    }
2065}
2066
/// A receiver of M-typed messages from [`OncePort`]s.
///
/// Dropping the receiver removes its port from the owning mailbox.
pub struct OncePortReceiver<M> {
    /// The one-shot receiver. It is taken (leaving `None`) by `recv`; it is
    /// kept in an `Option` because this type implements `Drop`, which
    /// forbids moving fields out of `self`.
    receiver: Option<oneshot::Receiver<M>>,
    /// The ID of the port this receiver serves.
    port_id: reference::PortId,

    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
2076
2077impl<M> OncePortReceiver<M> {
2078    /// Receive message from the one-shot port associated with this
2079    /// receiver.  Recv consumes the receiver: it is no longer valid
2080    /// after this call.
2081    pub async fn recv(mut self) -> Result<M, MailboxError> {
2082        std::mem::take(&mut self.receiver)
2083            .unwrap()
2084            .await
2085            .map_err(|err| {
2086                MailboxError::new(
2087                    self.actor_id().clone(),
2088                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2089                )
2090            })
2091    }
2092
2093    fn port(&self) -> u64 {
2094        self.port_id.index()
2095    }
2096
2097    fn actor_id(&self) -> &reference::ActorId {
2098        self.port_id.actor_id()
2099    }
2100}
2101
2102impl<M> Drop for OncePortReceiver<M> {
2103    fn drop(&mut self) {
2104        // MARIUS: do we need to tombstone these? or should we
2105        // error out if we have removed the receiver before serializing the port ref?
2106        // ("no longer live")?
2107        self.mailbox.inner.ports.remove(&self.port());
2108    }
2109}
2110
2111/// Error that that occur during SerializedSender's send operation.
2112pub struct SerializedSenderError {
2113    /// The headers associated with the message.
2114    pub headers: Flattrs,
2115    /// The message was tried to send.
2116    pub data: wirevalue::Any,
2117    /// The mailbox sender error that occurred.
2118    pub error: MailboxSenderError,
2119}
2120
2121/// SerializedSender encapsulates senders:
2122///   - It performs type erasure (and thus it is object-safe).
2123///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
2124///     validity of the underlying port.
2125trait SerializedSender: Send + Sync {
2126    /// Enables downcasting from `&dyn SerializedSender` to concrete
2127    /// types.
2128    ///
2129    /// Used by `Mailbox::lookup_sender` to downcast to
2130    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
2131    fn as_any(&self) -> &dyn Any;
2132
2133    /// Send a serialized message. SerializedSender will deserialize the
2134    /// message (failing if it fails to deserialize), and then send the
2135    /// resulting message on the underlying port.
2136    ///
2137    /// Send_serialized returns true whenever the port remains valid
2138    /// after the send operation.
2139    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SerializedSender`.
2140    fn send_serialized(
2141        &self,
2142        headers: Flattrs,
2143        serialized: wirevalue::Any,
2144    ) -> Result<bool, SerializedSenderError>;
2145}
2146
/// A sender to an M-typed unbounded port.
enum UnboundedPortSender<M: Message> {
    /// Send directly to the mpsc queue. Message headers are not forwarded.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Use the provided function to enqueue the item. The function receives
    /// the message headers along with the message.
    Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
}
2154
2155impl<M: Message> UnboundedPortSender<M> {
2156    fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2157        match self {
2158            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2159            Self::Func(func) => func(headers, message),
2160        }
2161    }
2162}
2163
2164// We implement Clone manually as derive(Clone) places unnecessarily
2165// strict bounds on the type parameter M.
2166impl<M: Message> Clone for UnboundedPortSender<M> {
2167    fn clone(&self) -> Self {
2168        match self {
2169            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2170            Self::Func(func) => Self::Func(func.clone()),
2171        }
2172    }
2173}
2174
2175impl<M: Message> Debug for UnboundedPortSender<M> {
2176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2177        match self {
2178            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2179            Self::Func(_) => f
2180                .debug_tuple("UnboundedPortSender::Func")
2181                .field(&"..")
2182                .finish(),
2183        }
2184    }
2185}
2186
/// Pairs an `UnboundedPortSender` with the ID of the port it delivers to,
/// so that send errors can be reported against that (bound) port.
struct UnboundedSender<M: Message> {
    /// The underlying sender.
    sender: UnboundedPortSender<M>,
    /// The port this sender delivers to.
    port_id: reference::PortId,
}
2191
2192impl<M: Message> UnboundedSender<M> {
2193    /// Create a new UnboundedSender encapsulating the provided
2194    /// sender.
2195    fn new(sender: UnboundedPortSender<M>, port_id: reference::PortId) -> Self {
2196        Self { sender, port_id }
2197    }
2198
2199    #[allow(dead_code)]
2200    fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2201        self.sender.send(headers, message).map_err(|err| {
2202            MailboxSenderError::new_bound(self.port_id.clone(), MailboxSenderErrorKind::Other(err))
2203        })
2204    }
2205}
2206
2207// Clone is implemented explicitly because the derive macro demands M:
2208// Clone directly. In this case, it isn't needed because Arc<T> can
2209// clone for any T.
2210impl<M: Message> Clone for UnboundedSender<M> {
2211    fn clone(&self) -> Self {
2212        Self {
2213            sender: self.sender.clone(),
2214            port_id: self.port_id.clone(),
2215        }
2216    }
2217}
2218
2219impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2220    fn as_any(&self) -> &dyn Any {
2221        self
2222    }
2223
2224    fn send_serialized(
2225        &self,
2226        headers: Flattrs,
2227        serialized: wirevalue::Any,
2228    ) -> Result<bool, SerializedSenderError> {
2229        // Here, the stack ensures that this port is only instantiated for M-typed messages.
2230        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
2231        // but it is required as we have some usages that rely on representational equivalence
2232        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
2233        // support port aggregation.
2234        match serialized.deserialized_unchecked() {
2235            Ok(message) => {
2236                self.sender.send(headers.clone(), message).map_err(|err| {
2237                    SerializedSenderError {
2238                        data: serialized,
2239                        error: MailboxSenderError::new_bound(
2240                            self.port_id.clone(),
2241                            MailboxSenderErrorKind::Other(err),
2242                        ),
2243                        headers,
2244                    }
2245                })?;
2246
2247                Ok(true)
2248            }
2249            Err(err) => Err(SerializedSenderError {
2250                data: serialized,
2251                error: MailboxSenderError::new_bound(
2252                    self.port_id.clone(),
2253                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2254                ),
2255                headers,
2256            }),
2257        }
2258    }
2259}
2260
/// OnceSender encapsulates an underlying one-shot sender, dynamically
/// tracking its validity.
#[derive(Debug)]
struct OnceSender<M: Message> {
    /// The one-shot sender. It is taken (leaving `None`) by the first send
    /// attempt. The slot is shared between clones, so at most one clone can
    /// ever send.
    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
    /// The port this sender delivers to.
    port_id: reference::PortId,
}
2268
2269impl<M: Message> OnceSender<M> {
2270    /// Create a new OnceSender encapsulating the provided one-shot
2271    /// sender.
2272    fn new(sender: oneshot::Sender<M>, port_id: reference::PortId) -> Self {
2273        Self {
2274            sender: Arc::new(Mutex::new(Some(sender))),
2275            port_id,
2276        }
2277    }
2278
2279    fn send_once(&self, message: M) -> Result<bool, MailboxSenderError> {
2280        // TODO: we should replace the sender on error
2281        match self.sender.lock().unwrap().take() {
2282            None => Err(MailboxSenderError::new_bound(
2283                self.port_id.clone(),
2284                MailboxSenderErrorKind::Closed,
2285            )),
2286            Some(sender) => {
2287                sender.send(message).map_err(|_| {
2288                    // Here, the value is returned when the port is
2289                    // closed.  We should consider having a similar
2290                    // API for send_once, though arguably it makes less
2291                    // sense in this context.
2292                    MailboxSenderError::new_bound(
2293                        self.port_id.clone(),
2294                        MailboxSenderErrorKind::Closed,
2295                    )
2296                })?;
2297                Ok(false)
2298            }
2299        }
2300    }
2301}
2302
2303// Clone is implemented explicitly because the derive macro demands M:
2304// Clone directly. In this case, it isn't needed because Arc<T> can
2305// clone for any T.
2306impl<M: Message> Clone for OnceSender<M> {
2307    fn clone(&self) -> Self {
2308        Self {
2309            sender: self.sender.clone(),
2310            port_id: self.port_id.clone(),
2311        }
2312    }
2313}
2314
2315impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2316    fn as_any(&self) -> &dyn Any {
2317        self
2318    }
2319
2320    fn send_serialized(
2321        &self,
2322        headers: Flattrs,
2323        serialized: wirevalue::Any,
2324    ) -> Result<bool, SerializedSenderError> {
2325        match serialized.deserialized() {
2326            Ok(message) => self.send_once(message).map_err(|e| SerializedSenderError {
2327                data: serialized,
2328                error: e,
2329                headers,
2330            }),
2331            Err(err) => Err(SerializedSenderError {
2332                data: serialized,
2333                error: MailboxSenderError::new_bound(
2334                    self.port_id.clone(),
2335                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2336                ),
2337                headers,
2338            }),
2339        }
2340    }
2341}
2342
/// Use the provided function to send untyped messages (i.e. Any objects).
pub(crate) struct UntypedUnboundedSender {
    /// Delivers a serialized message. On success it returns the validity
    /// flag reported by `send_serialized`; on failure it returns the message
    /// together with the error.
    pub(crate) sender:
        Box<dyn Fn(wirevalue::Any) -> Result<bool, (wirevalue::Any, anyhow::Error)> + Send + Sync>,
    /// The port this sender delivers to.
    pub(crate) port_id: reference::PortId,
}
2349
2350impl SerializedSender for UntypedUnboundedSender {
2351    fn as_any(&self) -> &dyn Any {
2352        self
2353    }
2354
2355    fn send_serialized(
2356        &self,
2357        headers: Flattrs,
2358        serialized: wirevalue::Any,
2359    ) -> Result<bool, SerializedSenderError> {
2360        (self.sender)(serialized).map_err(|(data, err)| SerializedSenderError {
2361            data,
2362            error: MailboxSenderError::new_bound(
2363                self.port_id.clone(),
2364                MailboxSenderErrorKind::Other(err),
2365            ),
2366            headers,
2367        })
2368    }
2369}
2370
2371/// State is the internal state of the mailbox.
2372struct State {
2373    /// The ID of the mailbox owner.
2374    actor_id: reference::ActorId,
2375
2376    // insert if it's serializable; otherwise don't.
2377    /// The set of active ports in the mailbox. All currently
2378    /// allocated ports are
2379    ports: DashMap<u64, Box<dyn SerializedSender>>,
2380
2381    /// The next port ID to allocate.
2382    next_port: AtomicU64,
2383
2384    /// The forwarder for this mailbox.
2385    forwarder: BoxedMailboxSender,
2386
2387    /// If a value is present, the mailbox has been closed with the provided
2388    /// status, and any subsequent `Mailbox::post_unchecked` calls will fail.
2389    closed: RwLock<Option<ActorStatus>>,
2390}
2391
2392impl State {
2393    /// Create a new state with the provided owning ActorId.
2394    fn new(actor_id: reference::ActorId, forwarder: BoxedMailboxSender) -> Self {
2395        Self {
2396            actor_id,
2397            ports: DashMap::new(),
2398            // The first 1024 ports are allocated to actor handlers.
2399            // Other port IDs are ephemeral.
2400            next_port: AtomicU64::new(USER_PORT_OFFSET),
2401            forwarder,
2402            closed: RwLock::new(None),
2403        }
2404    }
2405
2406    /// Allocate a fresh port.
2407    fn allocate_port(&self) -> u64 {
2408        self.next_port.fetch_add(1, Ordering::SeqCst)
2409    }
2410}
2411
2412impl fmt::Debug for State {
2413    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2414        f.debug_struct("State")
2415            .field("actor_id", &self.actor_id)
2416            .field(
2417                "open_ports",
2418                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2419            )
2420            .field("next_port", &self.next_port)
2421            .finish()
2422    }
2423}
2424
// TODO: mux based on some parameterized type. (mux key).
/// An in-memory mailbox muxer. This is used to route messages to
/// different underlying senders.
///
/// Messages are routed by exact match on the destination actor ID. Clones
/// share the same routing table.
#[derive(Clone)]
pub struct MailboxMuxer {
    /// Senders keyed by the actor ID they serve.
    mailboxes: Arc<DashMap<reference::ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
2432
2433impl Default for MailboxMuxer {
2434    fn default() -> Self {
2435        Self::new()
2436    }
2437}
2438
2439impl MailboxMuxer {
2440    /// Create a new, empty, muxer.
2441    pub fn new() -> Self {
2442        Self {
2443            mailboxes: Arc::new(DashMap::new()),
2444        }
2445    }
2446
2447    /// Route messages destined for the provided actor id to the provided
2448    /// sender. Returns false if there is already a sender associated
2449    /// with the actor. In this case, the sender is not replaced, and
2450    /// the caller must [`MailboxMuxer::unbind`] it first.
2451    pub fn bind(&self, actor_id: reference::ActorId, sender: impl MailboxSender + 'static) -> bool {
2452        match self.mailboxes.entry(actor_id) {
2453            Entry::Occupied(_) => false,
2454            Entry::Vacant(entry) => {
2455                entry.insert(Box::new(sender));
2456                true
2457            }
2458        }
2459    }
2460
2461    /// Convenience function to bind a mailbox.
2462    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2463        self.bind(mailbox.actor_id().clone(), mailbox)
2464    }
2465
2466    /// Unbind the sender associated with the provided actor ID. After
2467    /// unbinding, the muxer will no longer be able to send messages to
2468    /// that actor.
2469    #[allow(dead_code)]
2470    pub(crate) fn unbind(&self, actor_id: &reference::ActorId) {
2471        self.mailboxes.remove(actor_id);
2472    }
2473
2474    /// Returns a list of all actors bound to this muxer. Useful in debugging.
2475    pub fn bound_actors(&self) -> Vec<reference::ActorId> {
2476        self.mailboxes.iter().map(|e| e.key().clone()).collect()
2477    }
2478}
2479
2480impl MailboxSender for MailboxMuxer {
2481    fn post_unchecked(
2482        &self,
2483        envelope: MessageEnvelope,
2484        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2485    ) {
2486        let dest_actor_id = envelope.dest().actor_id();
2487        match self.mailboxes.get(dest_actor_id) {
2488            None => {
2489                let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
2490                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2491            }
2492            Some(sender) => sender.post(envelope, return_handle),
2493        }
2494    }
2495}
2496
/// MailboxRouter routes messages to the sender that is bound to its
/// nearest prefix.
///
/// Clones share the same routing table.
#[derive(Clone)]
pub struct MailboxRouter {
    /// Senders keyed by the reference (prefix) they are bound to. An ordered
    /// map is used so that prefix lookups can search by key order.
    entries: Arc<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
}
2503
2504impl Default for MailboxRouter {
2505    fn default() -> Self {
2506        Self::new()
2507    }
2508}
2509
2510impl MailboxRouter {
2511    /// Create a new, empty router.
2512    pub fn new() -> Self {
2513        Self {
2514            entries: Arc::new(RwLock::new(BTreeMap::new())),
2515        }
2516    }
2517
2518    /// Downgrade this router to a [`WeakMailboxRouter`].
2519    pub fn downgrade(&self) -> WeakMailboxRouter {
2520        WeakMailboxRouter(Arc::downgrade(&self.entries))
2521    }
2522
2523    /// Returns a new router that will first attempt to find a route for the message
2524    /// in the router's table; otherwise post the message to the provided fallback
2525    /// sender.
2526    pub fn fallback(&self, default: BoxedMailboxSender) -> impl MailboxSender {
2527        FallbackMailboxRouter {
2528            router: self.clone(),
2529            default,
2530        }
2531    }
2532
2533    /// Bind the provided sender to the given reference. The destination
2534    /// is treated as a prefix to which messages can be routed, and
2535    /// messages are routed to their longest matching prefix.
2536    pub fn bind(&self, dest: reference::Reference, sender: impl MailboxSender + 'static) {
2537        let mut w = self.entries.write().unwrap();
2538        w.insert(dest, Arc::new(sender));
2539    }
2540
2541    fn sender(
2542        &self,
2543        actor_id: &reference::ActorId,
2544    ) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
2545        match self
2546            .entries
2547            .read()
2548            .unwrap()
2549            .lower_bound(Excluded(&actor_id.clone().into()))
2550            .prev()
2551        {
2552            None => None,
2553            Some((key, sender)) if key.is_prefix_of(&actor_id.clone().into()) => {
2554                Some(sender.clone())
2555            }
2556            Some(_) => None,
2557        }
2558    }
2559}
2560
2561impl MailboxSender for MailboxRouter {
2562    fn post_unchecked(
2563        &self,
2564        envelope: MessageEnvelope,
2565        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2566    ) {
2567        match self.sender(envelope.dest().actor_id()) {
2568            None => envelope.undeliverable(
2569                DeliveryError::Unroutable(
2570                    "no destination found for actor in routing table".to_string(),
2571                ),
2572                return_handle,
2573            ),
2574            Some(sender) => sender.post(envelope, return_handle),
2575        }
2576    }
2577}
2578
2579#[derive(Clone)]
2580struct FallbackMailboxRouter {
2581    router: MailboxRouter,
2582    default: BoxedMailboxSender,
2583}
2584
2585impl MailboxSender for FallbackMailboxRouter {
2586    fn post_unchecked(
2587        &self,
2588        envelope: MessageEnvelope,
2589        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2590    ) {
2591        match self.router.sender(envelope.dest().actor_id()) {
2592            Some(sender) => sender.post(envelope, return_handle),
2593            None => self.default.post(envelope, return_handle),
2594        }
2595    }
2596}
2597
/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
/// routing table. This allows router references to be circular: an entity holding
/// a reference to the router may also contain the router itself, without the
/// cycle keeping the table alive.
///
/// Use [`WeakMailboxRouter::upgrade`] to obtain a usable [`MailboxRouter`].
/// Posting through a `WeakMailboxRouter` whose table has already been dropped
/// reports the message as undeliverable with [`DeliveryError::BrokenLink`].
///
/// TODO: this currently holds a weak reference to the entire routing table.
/// This helps prevent cycle leaks, but can cause excess memory usage because
/// cycles form through individual entries while the weak link covers the whole
/// table. Possibly the router should allow weak references on a per-entry basis.
#[derive(Debug, Clone)]
pub struct WeakMailboxRouter(
    /// Non-owning handle to the table shared by [`MailboxRouter`] clones.
    Weak<RwLock<BTreeMap<reference::Reference, Arc<dyn MailboxSender + Send + Sync>>>>,
);
2610
2611impl WeakMailboxRouter {
2612    /// Upgrade the weak router to a strong reference router.
2613    pub fn upgrade(&self) -> Option<MailboxRouter> {
2614        self.0.upgrade().map(|entries| MailboxRouter { entries })
2615    }
2616}
2617
2618impl MailboxSender for WeakMailboxRouter {
2619    fn post_unchecked(
2620        &self,
2621        envelope: MessageEnvelope,
2622        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2623    ) {
2624        match self.upgrade() {
2625            Some(router) => router.post(envelope, return_handle),
2626            None => envelope.undeliverable(
2627                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
2628                return_handle,
2629            ),
2630        }
2631    }
2632}
2633
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// If a message destination is not bound, but is a "direct mode" address
/// (i.e., its proc id contains the channel address through which the proc
/// is reachable), then DialMailboxRouter dials the proc directly.
///
/// Messages whose destination cannot be resolved to any address are posted
/// to the `default` sender. With the current lookup this only happens when
/// direct dialing is restricted to remote transports and the destination's
/// direct address is not remote.
///
/// Clones share the same address book and connection cache.
#[derive(Clone)]
pub struct DialMailboxRouter {
    /// Reference-prefix -> channel address bindings (see `bind` / `unbind`).
    address_book: Arc<RwLock<BTreeMap<reference::Reference, ChannelAddr>>>,
    /// Clients dialed so far, keyed by channel address, so repeated sends to
    /// the same address reuse one `MailboxClient`.
    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,

    // The default sender, to which messages for unknown recipients
    // are sent. (This is like a default route in a routing table.)
    default: BoxedMailboxSender,

    // When true, only dial direct-addressed procs if their transport
    // type is remote. Otherwise, fall back to the default sender.
    direct_addressed_remote_only: bool,
}
2660
2661impl Default for DialMailboxRouter {
2662    fn default() -> Self {
2663        Self::new()
2664    }
2665}
2666
2667impl DialMailboxRouter {
2668    /// Create a new [`DialMailboxRouter`] with an empty routing table.
2669    pub fn new() -> Self {
2670        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
2671    }
2672
2673    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2674    /// and a default sender. Any message with an unknown destination is
2675    /// dispatched on this default sender, unless the destination is
2676    /// direct-addressed, in which case it is dialed directly.
2677    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
2678        Self {
2679            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2680            sender_cache: Arc::new(DashMap::new()),
2681            default,
2682            direct_addressed_remote_only: false,
2683        }
2684    }
2685
2686    /// Create a new [`DialMailboxRouter`] with an empty routing table,
2687    /// and a default sender. Any message with an unknown destination is
2688    /// dispatched on this default sender, unless the destination is
2689    /// direct-addressed *and* has a remote channel transport type.
2690    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
2691        Self {
2692            address_book: Arc::new(RwLock::new(BTreeMap::new())),
2693            sender_cache: Arc::new(DashMap::new()),
2694            default,
2695            direct_addressed_remote_only: true,
2696        }
2697    }
2698
2699    /// Binds a [`Reference`] to a [`ChannelAddr`], replacing any
2700    /// existing binding.
2701    ///
2702    /// If the address changes, the old sender is evicted from the
2703    /// cache to ensure fresh routing on next use.
2704    pub fn bind(&self, dest: reference::Reference, addr: ChannelAddr) {
2705        if let Ok(mut w) = self.address_book.write() {
2706            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
2707                && old_addr != addr
2708            {
2709                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
2710                self.sender_cache.remove(&old_addr);
2711            }
2712        } else {
2713            tracing::error!("address book poisoned during bind of {:?}", dest);
2714        }
2715    }
2716
2717    /// Removes all address mappings with the given prefix from the
2718    /// router.
2719    ///
2720    /// Also evicts any corresponding cached senders to prevent reuse
2721    /// of stale connections.
2722    pub fn unbind(&self, dest: &reference::Reference) {
2723        if let Ok(mut w) = self.address_book.write() {
2724            let to_remove: Vec<(reference::Reference, ChannelAddr)> = w
2725                .range(dest..)
2726                .take_while(|(key, _)| dest.is_prefix_of(key))
2727                .map(|(key, addr)| (key.clone(), addr.clone()))
2728                .collect();
2729
2730            for (key, addr) in to_remove {
2731                tracing::info!("unbinding {:?} from {:?}", key, addr);
2732                w.remove(&key);
2733                self.sender_cache.remove(&addr);
2734            }
2735        } else {
2736            tracing::error!("address book poisoned during unbind of {:?}", dest);
2737        }
2738    }
2739
2740    /// Lookup an actor's channel in the router's address bok.
2741    pub fn lookup_addr(&self, actor_id: &reference::ActorId) -> Option<ChannelAddr> {
2742        let address_book = self.address_book.read().unwrap();
2743        let found = address_book
2744            .lower_bound(Excluded(&actor_id.clone().into()))
2745            .prev();
2746
2747        // First try to look up the address in our address book; failing that,
2748        // extract the address from the ProcId (all procs are direct-addressed now).
2749        if let Some((key, addr)) = found
2750            && key.is_prefix_of(&actor_id.clone().into())
2751        {
2752            Some(addr.clone())
2753        } else {
2754            let addr = actor_id.proc_id().addr().clone();
2755            if self.direct_addressed_remote_only {
2756                addr.transport().is_remote().then_some(addr)
2757            } else {
2758                Some(addr)
2759            }
2760        }
2761    }
2762
2763    /// Return all covering prefixes of this router. That is, all references that are not
2764    /// prefixed by another reference in the routing table
2765    pub fn prefixes(&self) -> BTreeSet<reference::Reference> {
2766        let addrs = self.address_book.read().unwrap();
2767        let mut prefixes: BTreeSet<reference::Reference> = BTreeSet::new();
2768        for (reference, _) in addrs.iter() {
2769            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
2770                Some(candidate) if candidate.is_prefix_of(reference) => (),
2771                _ => {
2772                    prefixes.insert(reference.clone());
2773                }
2774            }
2775        }
2776
2777        prefixes
2778    }
2779
2780    fn dial(
2781        &self,
2782        addr: &ChannelAddr,
2783        actor_id: &reference::ActorId,
2784    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
2785        // Get the sender. Create it if needed. Do not send the
2786        // messages inside this block so we do not hold onto the
2787        // reference of the dashmap entries.
2788        match self.sender_cache.entry(addr.clone()) {
2789            Entry::Occupied(entry) => Ok(entry.get().clone()),
2790            Entry::Vacant(entry) => {
2791                let tx = channel::dial(addr.clone()).map_err(|err| {
2792                    MailboxSenderError::new_unbound_type(
2793                        actor_id.clone(),
2794                        MailboxSenderErrorKind::Channel(err),
2795                        "unknown",
2796                    )
2797                })?;
2798                let sender = MailboxClient::new(tx);
2799                Ok(entry.insert(Arc::new(sender)).value().clone())
2800            }
2801        }
2802    }
2803}
2804
2805impl MailboxSender for DialMailboxRouter {
2806    fn post_unchecked(
2807        &self,
2808        envelope: MessageEnvelope,
2809        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2810    ) {
2811        let Some(addr) = self.lookup_addr(envelope.dest().actor_id()) else {
2812            self.default.post(envelope, return_handle);
2813            return;
2814        };
2815
2816        match self.dial(&addr, envelope.dest().actor_id()) {
2817            Err(err) => envelope.undeliverable(
2818                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
2819                return_handle,
2820            ),
2821            Ok(sender) => sender.post(envelope, return_handle),
2822        }
2823    }
2824}
2825
2826/// A MailboxSender that reports any envelope as undeliverable due to
2827/// routing failure.
2828#[derive(Debug)]
2829pub struct UnroutableMailboxSender;
2830
2831impl MailboxSender for UnroutableMailboxSender {
2832    fn post_unchecked(
2833        &self,
2834        envelope: MessageEnvelope,
2835        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2836    ) {
2837        envelope.undeliverable(
2838            DeliveryError::Unroutable("destination not found in routing table".to_string()),
2839            return_handle,
2840        );
2841    }
2842}
2843
2844#[cfg(test)]
2845mod tests {
2846
2847    use std::assert_matches::assert_matches;
2848    use std::mem::drop;
2849    use std::str::FromStr;
2850    use std::sync::atomic::AtomicUsize;
2851    use std::time::Duration;
2852
2853    use timed_test::async_timed_test;
2854
2855    use super::*;
2856    use crate::Actor;
2857    use crate::ActorHandle;
2858    use crate::Instance;
2859    use crate::accum;
2860    use crate::accum::ReducerMode;
2861    use crate::channel::ChannelTransport;
2862    use crate::context::Mailbox as MailboxContext;
2863    use crate::proc::Proc;
2864    use crate::testing::ids::test_actor_id;
2865    use crate::testing::ids::test_port_id;
2866    use crate::testing::ids::test_proc_id;
2867
2868    fn test_proc_ref(name: &str) -> reference::Reference {
2869        reference::Reference::Proc(test_proc_id(name))
2870    }
2871
2872    fn test_actor_ref(proc_name: &str, actor_name: &str) -> reference::Reference {
2873        reference::Reference::Actor(test_actor_id(proc_name, actor_name))
2874    }
2875
2876    #[test]
2877    fn test_error() {
2878        use crate::testing::ids::test_actor_id_with_pid;
2879        let err = MailboxError::new(
2880            test_actor_id_with_pid("myworld_2", "myactor", 5),
2881            MailboxErrorKind::Closed,
2882        );
2883        // The format is: "{proc_id},{actor_name}[{pid}]: {error}"
2884        // proc_id = "{addr},{proc_name}" so overall: "{addr},{proc_name},{actor_name}[{pid}]"
2885        assert!(format!("{}", err).ends_with(",test_myworld_2,myactor[5]: mailbox closed"));
2886    }
2887
2888    #[tokio::test]
2889    async fn test_mailbox_basic() {
2890        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2891        let (port, mut receiver) = mbox.open_port::<u64>();
2892        let port = port.bind();
2893
2894        mbox.serialize_and_send(&port, 123, monitored_return_handle())
2895            .unwrap();
2896        mbox.serialize_and_send(&port, 321, monitored_return_handle())
2897            .unwrap();
2898        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2899        assert_eq!(receiver.recv().await.unwrap(), 321u64);
2900
2901        let serialized = wirevalue::Any::serialize(&999u64).unwrap();
2902        mbox.post(
2903            MessageEnvelope::new_unknown(port.port_id().clone(), serialized),
2904            monitored_return_handle(),
2905        );
2906        assert_eq!(receiver.recv().await.unwrap(), 999u64);
2907    }
2908
2909    #[tokio::test]
2910    async fn test_mailbox_accum() {
2911        let proc = Proc::local();
2912        let (client, _) = proc.instance("client").unwrap();
2913        let (port, mut receiver) = client
2914            .mailbox()
2915            .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());
2916
2917        for i in -3..4 {
2918            port.send(&client, accum::Max(i)).unwrap();
2919            let received: accum::Max<i64> = receiver.recv().await.unwrap();
2920            let msg = received.get();
2921            assert_eq!(msg, &i);
2922        }
2923        // Send a smaller or same value. Should still receive the previous max.
2924        for i in -3..4 {
2925            port.send(&client, accum::Max(i)).unwrap();
2926            assert_eq!(receiver.recv().await.unwrap().get(), &3);
2927        }
2928        // send a larger value. Should receive the new max.
2929        port.send(&client, accum::Max(4)).unwrap();
2930        assert_eq!(receiver.recv().await.unwrap().get(), &4);
2931
2932        // Send multiple updates. Should only receive the final change.
2933        for i in 5..10 {
2934            port.send(&client, accum::Max(i)).unwrap();
2935        }
2936        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2937        port.send(&client, accum::Max(1)).unwrap();
2938        port.send(&client, accum::Max(3)).unwrap();
2939        port.send(&client, accum::Max(2)).unwrap();
2940        assert_eq!(receiver.recv().await.unwrap().get(), &9);
2941    }
2942
2943    #[test]
2944    fn test_port_and_reducer() {
2945        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2946        // accum port could have reducer typehash
2947        {
2948            let accumulator = accum::join_semilattice::<accum::Max<u64>>();
2949            let reducer_spec = accumulator.reducer_spec().unwrap();
2950            let (port, _) = mbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
2951            assert_eq!(port.reducer_spec, Some(reducer_spec.clone()));
2952            let port_ref = port.bind();
2953            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
2954        }
2955        // normal port should not have reducer typehash
2956        {
2957            let (port, _) = mbox.open_port::<u64>();
2958            assert_eq!(port.reducer_spec, None);
2959            let port_ref = port.bind();
2960            assert_eq!(port_ref.reducer_spec(), &None);
2961        }
2962    }
2963
2964    #[tokio::test]
2965    #[ignore] // error behavior changed, but we will bring it back
2966    async fn test_mailbox_once() {
2967        let proc = Proc::local();
2968        let (client, _) = proc.instance("client").unwrap();
2969
2970        let (port, receiver) = client.open_once_port::<u64>();
2971
2972        // let port_id = port.port_id().clone();
2973
2974        port.send(&client, 123u64).unwrap();
2975        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2976
2977        // // The borrow checker won't let us send again on the port
2978        // // (good!), but we stashed the port-id and so we can try on the
2979        // // serialized interface.
2980        // let Err(err) = mbox
2981        //     .send_serialized(&port_id, &wirevalue::Any(Vec::new()))
2982        //     .await
2983        // else {
2984        //     unreachable!()
2985        // };
2986        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
2987    }
2988
2989    #[tokio::test]
2990    #[ignore] // changed error behavior
2991    async fn test_mailbox_receiver_drop() {
2992        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
2993        let (port, mut receiver) = mbox.open_port::<u64>();
2994        // Make sure we go through "remote" path.
2995        let port = port.bind();
2996        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
2997            .unwrap();
2998        assert_eq!(receiver.recv().await.unwrap(), 123u64);
2999        drop(receiver);
3000        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
3001            panic!();
3002        };
3003
3004        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
3005        assert_matches!(err.location(), PortLocation::Bound(bound) if bound == port.port_id());
3006    }
3007
3008    #[tokio::test]
3009    async fn test_drain() {
3010        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
3011
3012        let (port, mut receiver) = mbox.open_port();
3013        let port = port.bind();
3014
3015        for i in 0..10 {
3016            mbox.serialize_and_send(&port, i, monitored_return_handle())
3017                .unwrap();
3018        }
3019
3020        for i in 0..10 {
3021            assert_eq!(receiver.recv().await.unwrap(), i);
3022        }
3023
3024        assert!(receiver.drain().is_empty());
3025    }
3026
3027    #[tokio::test]
3028    async fn test_mailbox_muxer() {
3029        let muxer = MailboxMuxer::new();
3030
3031        let mbox0 = Mailbox::new_detached(test_actor_id("0", "actor1"));
3032        let mbox1 = Mailbox::new_detached(test_actor_id("0", "actor2"));
3033
3034        muxer.bind(mbox0.actor_id().clone(), mbox0.clone());
3035        muxer.bind(mbox1.actor_id().clone(), mbox1.clone());
3036
3037        let (port, receiver) = mbox0.open_once_port::<u64>();
3038
3039        let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
3040        let (client, _) = proc.instance("client").unwrap();
3041
3042        port.send(&client, 123u64).unwrap();
3043        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3044
3045        /*
3046        let (tx, rx) = channel::local::new::<u64>();
3047        let (port, _) = mbox0.open_port::<u64>();
3048        let handle = muxer.clone().serve_port(port, rx).unwrap();
3049        muxer.unbind(mbox0.actor_id());
3050        tx.send(123u64).await.unwrap();
3051        let Ok(Err(err)) = handle.await else { panic!() };
3052        assert_eq!(err.actor_id(), &actor_id(0));
3053        */
3054    }
3055
3056    #[tokio::test]
3057    async fn test_local_client_server() {
3058        let mbox = Mailbox::new_detached(test_actor_id("0", "actor0"));
3059        let (tx, rx) = channel::local::new();
3060        let serve_handle = mbox.clone().serve(rx);
3061        let client = MailboxClient::new(tx);
3062
3063        let (port, receiver) = mbox.open_once_port::<u64>();
3064        let port = port.bind();
3065
3066        client
3067            .serialize_and_send_once(port, 123u64, monitored_return_handle())
3068            .unwrap();
3069        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3070        serve_handle.stop("fromt test");
3071        serve_handle.await.unwrap().unwrap();
3072    }
3073
3074    #[tokio::test]
3075    async fn test_mailbox_router() {
3076        let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
3077        let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
3078        let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
3079        let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));
3080
3081        let comms: Vec<(reference::OncePortRef<u64>, OncePortReceiver<u64>)> =
3082            [&mbox0, &mbox1, &mbox2, &mbox3]
3083                .into_iter()
3084                .map(|mbox| {
3085                    let (port, receiver) = mbox.open_once_port::<u64>();
3086                    (port.bind(), receiver)
3087                })
3088                .collect();
3089
3090        let router = MailboxRouter::new();
3091
3092        router.bind(test_proc_id("world0_0").into(), mbox0);
3093        router.bind(test_proc_id("world1_0").into(), mbox1);
3094        router.bind(test_proc_id("world1_1").into(), mbox2);
3095        router.bind(test_actor_id("world1_1", "actor1").into(), mbox3);
3096
3097        for (i, (port, receiver)) in comms.into_iter().enumerate() {
3098            router
3099                .serialize_and_send_once(port, i as u64, monitored_return_handle())
3100                .unwrap();
3101            assert_eq!(receiver.recv().await.unwrap(), i as u64);
3102        }
3103
3104        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
3105
3106        let mbox4 = Mailbox::new_detached(test_actor_id("fallback_0", "actor"));
3107
3108        let (return_handle, mut return_receiver) =
3109            crate::mailbox::undeliverable::new_undeliverable_port();
3110        let (port, _receiver) = mbox4.open_once_port();
3111        router
3112            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
3113            .unwrap();
3114        assert!(return_receiver.recv().await.is_ok());
3115
3116        let router = router.fallback(mbox4.clone().into_boxed());
3117        let (port, receiver) = mbox4.open_once_port();
3118        router
3119            .serialize_and_send_once(port.bind(), 0, return_handle)
3120            .unwrap();
3121        assert_eq!(receiver.recv().await.unwrap(), 0);
3122    }
3123
3124    #[tokio::test]
3125    async fn test_dial_mailbox_router() {
3126        let router = DialMailboxRouter::new();
3127
3128        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
3129        router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
3130        router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
3131        router.bind(
3132            test_actor_ref("world1_1", "actor1"),
3133            "unix!@4".parse().unwrap(),
3134        );
3135        // Bind a direct address -- we should use its bound address!
3136        router.bind(
3137            "unix:@4,my_proc,my_actor".parse().unwrap(),
3138            "unix:@5".parse().unwrap(),
3139        );
3140
3141        // We should be able to lookup the ids
3142        router
3143            .lookup_addr(&test_actor_id("world0_0", "actor"))
3144            .unwrap();
3145        router
3146            .lookup_addr(&test_actor_id("world1_0", "actor"))
3147            .unwrap();
3148
3149        let actor_id = reference::Reference::from_str("unix:@4,my_proc,my_actor")
3150            .unwrap()
3151            .into_actor()
3152            .unwrap();
3153        assert_eq!(
3154            router.lookup_addr(&actor_id).unwrap(),
3155            "unix!@5".parse().unwrap(),
3156        );
3157        router.unbind(&actor_id.clone().into());
3158        assert_eq!(
3159            router.lookup_addr(&actor_id).unwrap(),
3160            "unix!@4".parse().unwrap(),
3161        );
3162
3163        // Unbind procs so lookups fall back to the proc's direct address
3164        // (all procs are direct-addressed now, so lookup_addr always returns
3165        // Some; we verify the bound address is gone by checking the returned
3166        // address is the local fallback, not the originally bound one).
3167        let fallback = ChannelAddr::any(ChannelTransport::Local);
3168        router.unbind(&test_proc_ref("world1_0"));
3169        router.unbind(&test_proc_ref("world1_1"));
3170        assert_eq!(
3171            router
3172                .lookup_addr(&test_actor_id("world1_0", "actor1"))
3173                .unwrap(),
3174            fallback,
3175        );
3176        assert_eq!(
3177            router
3178                .lookup_addr(&test_actor_id("world1_1", "actor1"))
3179                .unwrap(),
3180            fallback,
3181        );
3182        router
3183            .lookup_addr(&test_actor_id("world0_0", "actor"))
3184            .unwrap();
3185        router.unbind(&test_proc_ref("world0_0"));
3186        assert_eq!(
3187            router
3188                .lookup_addr(&test_actor_id("world0_0", "actor"))
3189                .unwrap(),
3190            fallback,
3191        );
3192    }
3193
3194    #[tokio::test]
3195    #[ignore] // TODO: there's a leak here, fix it
3196    async fn test_dial_mailbox_router_default() {
3197        let mbox0 = Mailbox::new_detached(test_actor_id("world0_0", "actor0"));
3198        let mbox1 = Mailbox::new_detached(test_actor_id("world1_0", "actor0"));
3199        let mbox2 = Mailbox::new_detached(test_actor_id("world1_1", "actor0"));
3200        let mbox3 = Mailbox::new_detached(test_actor_id("world1_1", "actor1"));
3201
3202        // We don't need to dial here, since we gain direct access to the
3203        // underlying routers.
3204        let root = MailboxRouter::new();
3205        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3206        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3207
3208        root.bind(test_proc_ref("world0"), world0_router.clone());
3209        root.bind(test_proc_ref("world1"), world1_router.clone());
3210
3211        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3212
3213        let mut handles = Vec::new(); // hold on to handles, or channels get closed
3214        for mbox in mailboxes.iter() {
3215            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3216            let handle = (*mbox).clone().serve(rx);
3217            handles.push(handle);
3218
3219            eprintln!("{}: {}", mbox.actor_id(), addr);
3220            if mbox.actor_id().proc_id().name().starts_with("world0") {
3221                world0_router.bind(mbox.actor_id().clone().into(), addr);
3222            } else {
3223                world1_router.bind(mbox.actor_id().clone().into(), addr);
3224            }
3225        }
3226
3227        // Make sure nodes are fully connected.
3228        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3229            for mbox in mailboxes.iter() {
3230                let (port, receiver) = mbox.open_once_port::<u64>();
3231                let port = port.bind();
3232                router
3233                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
3234                    .unwrap();
3235                assert_eq!(receiver.recv().await.unwrap(), 123u64);
3236            }
3237        }
3238    }
3239
3240    #[tokio::test]
3241    async fn test_enqueue_port() {
3242        let proc = Proc::local();
3243        let (client, _) = proc.instance("client").unwrap();
3244
3245        let count = Arc::new(AtomicUsize::new(0));
3246        let count_clone = count.clone();
3247        let port = client.mailbox().open_enqueue_port(move |_, n| {
3248            count_clone.fetch_add(n, Ordering::SeqCst);
3249            Ok(())
3250        });
3251
3252        port.send(&client, 10).unwrap();
3253        port.send(&client, 5).unwrap();
3254        port.send(&client, 1).unwrap();
3255        port.send(&client, 0).unwrap();
3256
3257        assert_eq!(count.load(Ordering::SeqCst), 16);
3258    }
3259
3260    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3261    struct TestMessage;
3262
3263    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
3264    #[named(name = "some::custom::path")]
3265    struct TestMessage2;
3266
3267    #[test]
3268    fn test_remote_message_macros() {
3269        assert_eq!(
3270            TestMessage::typename(),
3271            "hyperactor::mailbox::tests::TestMessage"
3272        );
3273        assert_eq!(TestMessage2::typename(), "some::custom::path");
3274    }
3275
3276    #[test]
3277    fn test_message_envelope_display() {
3278        #[derive(typeuri::Named, Serialize, Deserialize)]
3279        struct MyTest {
3280            a: u64,
3281            b: String,
3282        }
3283        wirevalue::register_type!(MyTest);
3284
3285        let envelope = MessageEnvelope::serialize(
3286            test_actor_id("source_0", "actor"),
3287            test_port_id("dest_1", "actor", 123),
3288            &MyTest {
3289                a: 123,
3290                b: "hello".into(),
3291            },
3292            Flattrs::new(),
3293        )
3294        .unwrap();
3295
3296        // Note: display format changed from "source[0].actor" to direct format
3297        assert!(format!("{}", envelope).contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
3298    }
3299
3300    #[derive(Debug, Default)]
3301    struct Foo;
3302
3303    impl Actor for Foo {}
3304
3305    // Test that a message delivery failure causes the sending actor
3306    // to stop running.
3307    #[tokio::test]
3308    async fn test_actor_delivery_failure() {
3309        // This test involves making an actor fail and so we must set
3310        // a supervision coordinator.
3311        use crate::actor::ActorStatus;
3312        use crate::testing::proc_supervison::ProcSupervisionCoordinator;
3313
3314        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
3315            BOXED_PANICKING_MAILBOX_SENDER.clone(),
3316        ));
3317        let proc_id = test_proc_id("quux_0");
3318        let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
3319        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3320        let (client, _) = proc.instance("client").unwrap();
3321
3322        let foo = proc.spawn("foo", Foo).unwrap();
3323        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
3324        let message = MessageEnvelope::new(
3325            foo.actor_id().clone(),
3326            test_port_id("corge_0", "bar", 9999),
3327            wirevalue::Any::serialize(&1u64).unwrap(),
3328            Flattrs::new(),
3329        );
3330        return_handle.send(&client, Undeliverable(message)).unwrap();
3331
3332        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3333
3334        let foo_status = foo.status();
3335        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
3336        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
3337            unreachable!()
3338        };
3339        let msg_str = msg.to_string();
3340        assert!(msg_str.contains("undeliverable message error"));
3341        // Updated assertions for direct format
3342        assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
3343        assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));
3344
3345        proc.destroy_and_wait::<()>(tokio::time::Duration::from_secs(1), None, "test cleanup")
3346            .await
3347            .unwrap();
3348    }
3349
3350    #[tokio::test]
3351    async fn test_detached_return_handle() {
3352        let (return_handle, mut return_receiver) =
3353            crate::mailbox::undeliverable::new_undeliverable_port();
3354        // Simulate an undelivered message return.
3355        let envelope = MessageEnvelope::new(
3356            test_actor_id("foo_0", "bar"),
3357            test_port_id("baz_0", "corge", 9999),
3358            wirevalue::Any::serialize(&1u64).unwrap(),
3359            Flattrs::new(),
3360        );
3361        let proc = Proc::local();
3362        let (client, _) = proc.instance("client").unwrap();
3363        return_handle
3364            .send(&client, Undeliverable(envelope.clone()))
3365            .unwrap();
3366        // Check we receive the undelivered message.
3367        assert!(
3368            tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
3369                .await
3370                .is_ok()
3371        );
3372        // Setup a monitor for the receiver and show that if there are
3373        // no outstanding return handles it terminates.
3374        let monitor_handle = tokio::spawn(async move {
3375            while let Ok(Undeliverable(mut envelope)) = return_receiver.recv().await {
3376                envelope.set_error(DeliveryError::BrokenLink(
3377                    "returned in unit test".to_string(),
3378                ));
3379                UndeliverableMailboxSender
3380                    .post(envelope, /*unused */ monitored_return_handle());
3381            }
3382        });
3383        drop(return_handle);
3384        assert!(
3385            tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
3386                .await
3387                .is_ok()
3388        );
3389    }
3390
3391    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
3392        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
3393            // Create dummy state and port_id to create PortReceiver. They are
3394            // not used in the test.
3395            let dummy_actor_id = test_actor_id("world_0", "actor");
3396            let dummy_state = State::new(
3397                dummy_actor_id.clone(),
3398                BOXED_PANICKING_MAILBOX_SENDER.clone(),
3399            );
3400            let dummy_port_id = reference::PortId::new(dummy_actor_id, 0);
3401            let (sender, receiver) = mpsc::unbounded_channel::<M>();
3402            let receiver = PortReceiver {
3403                receiver,
3404                port_id: dummy_port_id,
3405                coalesce,
3406                mailbox: Mailbox {
3407                    inner: Arc::new(dummy_state),
3408                },
3409            };
3410            (sender, receiver)
3411        }
3412
3413        // verify fn drain
3414        {
3415            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3416            assert!(receiver.drain().is_empty());
3417
3418            sender.send(0).unwrap();
3419            sender.send(1).unwrap();
3420            sender.send(2).unwrap();
3421            sender.send(3).unwrap();
3422            sender.send(4).unwrap();
3423            sender.send(5).unwrap();
3424            sender.send(6).unwrap();
3425            sender.send(7).unwrap();
3426
3427            if drop_sender {
3428                drop(sender);
3429            }
3430
3431            if !coalesce {
3432                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
3433            } else {
3434                assert_eq!(receiver.drain(), vec![7]);
3435            }
3436
3437            assert!(receiver.drain().is_empty());
3438            assert!(receiver.drain().is_empty());
3439        }
3440
3441        // verify fn try_recv
3442        {
3443            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3444            assert!(receiver.try_recv().unwrap().is_none());
3445
3446            sender.send(0).unwrap();
3447            sender.send(1).unwrap();
3448            sender.send(2).unwrap();
3449            sender.send(3).unwrap();
3450
3451            if drop_sender {
3452                drop(sender);
3453            }
3454
3455            if !coalesce {
3456                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
3457                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
3458                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
3459            }
3460            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
3461            if drop_sender {
3462                assert_matches!(
3463                    receiver.try_recv().unwrap_err().kind(),
3464                    MailboxErrorKind::Closed
3465                );
3466                // Still Closed error
3467                assert_matches!(
3468                    receiver.try_recv().unwrap_err().kind(),
3469                    MailboxErrorKind::Closed
3470                );
3471            } else {
3472                assert!(receiver.try_recv().unwrap().is_none());
3473                // Still empty
3474                assert!(receiver.try_recv().unwrap().is_none());
3475            }
3476        }
3477        // verify fn recv
3478        {
3479            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
3480            assert!(
3481                tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3482                    .await
3483                    .is_err()
3484            );
3485
3486            sender.send(4).unwrap();
3487            sender.send(5).unwrap();
3488            sender.send(6).unwrap();
3489            sender.send(7).unwrap();
3490
3491            if drop_sender {
3492                drop(sender);
3493            }
3494
3495            if !coalesce {
3496                assert_eq!(receiver.recv().await.unwrap(), 4);
3497                assert_eq!(receiver.recv().await.unwrap(), 5);
3498                assert_eq!(receiver.recv().await.unwrap(), 6);
3499            }
3500            assert_eq!(receiver.recv().await.unwrap(), 7);
3501            if drop_sender {
3502                assert_matches!(
3503                    receiver.recv().await.unwrap_err().kind(),
3504                    MailboxErrorKind::Closed
3505                );
3506                // Still None
3507                assert_matches!(
3508                    receiver.recv().await.unwrap_err().kind(),
3509                    MailboxErrorKind::Closed
3510                );
3511            } else {
3512                assert!(
3513                    tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
3514                        .await
3515                        .is_err()
3516                );
3517            }
3518        }
3519    }
3520
3521    #[tokio::test]
3522    async fn test_receiver_basic_default() {
3523        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
3524    }
3525
3526    #[tokio::test]
3527    async fn test_receiver_basic_latest() {
3528        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
3529    }
3530
3531    #[tokio::test]
3532    async fn test_receiver_after_sender_drop_default() {
3533        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
3534    }
3535
3536    #[tokio::test]
3537    async fn test_receiver_after_sender_drop_latest() {
3538        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
3539    }
3540
    /// Fixture produced by `setup_split_port_ids`: a `u64` port opened on
    /// `actor0` together with several split port ids that feed into it.
    struct Setup {
        /// Receiver for the port opened on `actor0`.
        receiver: PortReceiver<u64>,
        actor0: Instance<()>,
        actor1: Instance<()>,
        // The actor handles are kept in the fixture (otherwise unused) so they
        // live as long as the fixture does.
        _actor0_handle: ActorHandle<()>,
        _actor1_handle: ActorHandle<()>,
        /// Id of the original, unsplit port.
        port_id: reference::PortId,
        /// First split of `port_id`, created via `actor1`.
        port_id1: reference::PortId,
        /// Second split of `port_id`, created via `actor1`.
        port_id2: reference::PortId,
        /// A split of `port_id2` (a split of a split), created via `actor1`.
        port_id2_1: reference::PortId,
    }
3552
3553    async fn setup_split_port_ids(
3554        reducer_spec: Option<ReducerSpec>,
3555        reducer_mode: ReducerMode,
3556    ) -> Setup {
3557        let proc = Proc::local();
3558        let (actor0, actor0_handle) = proc.instance("actor0").unwrap();
3559        let (actor1, actor1_handle) = proc.instance("actor1").unwrap();
3560
3561        // Open a port on actor0
3562        let (port_handle, receiver) = actor0.open_port::<u64>();
3563        let port_id = port_handle.bind().port_id().clone();
3564
3565        // Split it twice on actor1
3566        let port_id1 = port_id
3567            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3568            .unwrap();
3569        let port_id2 = port_id
3570            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
3571            .unwrap();
3572
3573        // A split port id can also be split
3574        let port_id2_1 = port_id2
3575            .split(&actor1, reducer_spec, reducer_mode.clone(), true)
3576            .unwrap();
3577
3578        Setup {
3579            receiver,
3580            actor0,
3581            actor1,
3582            _actor0_handle: actor0_handle,
3583            _actor1_handle: actor1_handle,
3584            port_id,
3585            port_id1,
3586            port_id2,
3587            port_id2_1,
3588        }
3589    }
3590
3591    fn post(cx: &impl context::Actor, port_id: reference::PortId, msg: u64) {
3592        let serialized = wirevalue::Any::serialize(&msg).unwrap();
3593        port_id.send(cx, serialized);
3594    }
3595
3596    #[async_timed_test(timeout_secs = 30)]
3597    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3598    #[cfg_attr(not(fbcode_build), ignore)]
3599    async fn test_split_port_id_no_reducer() {
3600        let Setup {
3601            mut receiver,
3602            actor0,
3603            actor1,
3604            port_id,
3605            port_id1,
3606            port_id2,
3607            port_id2_1,
3608            ..
3609        } = setup_split_port_ids(None, ReducerMode::default()).await;
3610        // Can send messages to receiver from all port handles
3611        post(&actor0, port_id.clone(), 1);
3612        assert_eq!(receiver.recv().await.unwrap(), 1);
3613        post(&actor1, port_id1.clone(), 2);
3614        assert_eq!(receiver.recv().await.unwrap(), 2);
3615        post(&actor1, port_id2.clone(), 3);
3616        assert_eq!(receiver.recv().await.unwrap(), 3);
3617        post(&actor1, port_id2_1.clone(), 4);
3618        assert_eq!(receiver.recv().await.unwrap(), 4);
3619
3620        // no more messages
3621        tokio::time::sleep(Duration::from_secs(2)).await;
3622        let msg = receiver.try_recv().unwrap();
3623        assert_eq!(msg, None);
3624    }
3625
3626    async fn wait_for(
3627        receiver: &mut PortReceiver<u64>,
3628        expected_size: usize,
3629        timeout_duration: Duration,
3630    ) -> anyhow::Result<Vec<u64>> {
3631        let mut messeges = vec![];
3632
3633        tokio::time::timeout(timeout_duration, async {
3634            loop {
3635                let msg = receiver.recv().await.unwrap();
3636                messeges.push(msg);
3637                if messeges.len() == expected_size {
3638                    break;
3639                }
3640            }
3641        })
3642        .await?;
3643        Ok(messeges)
3644    }
3645
3646    #[async_timed_test(timeout_secs = 30)]
3647    async fn test_split_port_id_sum_reducer() {
3648        let config = hyperactor_config::global::lock();
3649        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
3650
3651        let sum_accumulator = accum::sum::<u64>();
3652        let reducer_spec = sum_accumulator.reducer_spec();
3653        let Setup {
3654            mut receiver,
3655            actor0,
3656            actor1,
3657            port_id,
3658            port_id1,
3659            port_id2,
3660            port_id2_1,
3661            ..
3662        } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
3663        post(&actor0, port_id.clone(), 4);
3664        post(&actor1, port_id1.clone(), 2);
3665        post(&actor1, port_id2.clone(), 3);
3666        post(&actor1, port_id2_1.clone(), 1);
3667        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
3668            .await
3669            .unwrap();
3670        // Message might be received out of their sending out. So we sort the
3671        // messages here.
3672        messages.sort();
3673        assert_eq!(messages, vec![1, 2, 3, 4]);
3674
3675        // no more messages
3676        tokio::time::sleep(Duration::from_secs(2)).await;
3677        let msg = receiver.try_recv().unwrap();
3678        assert_eq!(msg, None);
3679    }
3680
3681    #[async_timed_test(timeout_secs = 30)]
3682    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
3683    #[cfg_attr(not(fbcode_build), ignore)]
3684    async fn test_split_port_id_every_n_messages() {
3685        let config = hyperactor_config::global::lock();
3686        let _config_guard =
3687            config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
3688        let proc = Proc::local();
3689        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3690        let (port_handle, mut receiver) = actor.open_port::<u64>();
3691        let port_id = port_handle.bind().port_id().clone();
3692        // Split it
3693        let reducer_spec = accum::sum::<u64>().reducer_spec();
3694        let split_port_id = port_id
3695            .split(
3696                &actor,
3697                reducer_spec,
3698                ReducerMode::Streaming(accum::StreamingReducerOpts {
3699                    max_update_interval: Some(Duration::from_mins(10)),
3700                    initial_update_interval: Some(Duration::from_mins(10)),
3701                }),
3702                true,
3703            )
3704            .unwrap();
3705
3706        // Send 9 messages.
3707        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3708            post(&actor, split_port_id.clone(), msg);
3709        }
3710        // The first 5 should be batched and reduced once due
3711        // to every_n_msgs = 5.
3712        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3713            .await
3714            .unwrap();
3715        assert_eq!(messages, vec![15]);
3716
3717        // the last message unfortranately will never come because they do not
3718        // reach batch size.
3719        tokio::time::sleep(Duration::from_secs(2)).await;
3720        let msg = receiver.try_recv().unwrap();
3721        assert_eq!(msg, None);
3722    }
3723
3724    #[async_timed_test(timeout_secs = 30)]
3725    async fn test_split_port_timeout_flush() {
3726        let config = hyperactor_config::global::lock();
3727        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
3728
3729        let Setup {
3730            mut receiver,
3731            actor0: _,
3732            actor1,
3733            port_id: _,
3734            port_id1,
3735            port_id2: _,
3736            port_id2_1: _,
3737            ..
3738        } = setup_split_port_ids(
3739            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3740            ReducerMode::Streaming(accum::StreamingReducerOpts {
3741                max_update_interval: Some(Duration::from_millis(50)),
3742                initial_update_interval: Some(Duration::from_millis(50)),
3743            }),
3744        )
3745        .await;
3746
3747        post(&actor1, port_id1.clone(), 10);
3748        post(&actor1, port_id1.clone(), 20);
3749        post(&actor1, port_id1.clone(), 30);
3750
3751        // Messages should accumulate for 50ms.
3752        tokio::time::sleep(Duration::from_millis(10)).await;
3753        let msg = receiver.try_recv().unwrap();
3754        assert_eq!(msg, None);
3755
3756        // Wait until we are flushed.
3757        tokio::time::sleep(Duration::from_millis(100)).await;
3758
3759        // Now we are reduced and accumulated:
3760        let msg = receiver.recv().await.unwrap();
3761        assert_eq!(msg, 60); // 10 + 20 + 30
3762
3763        // No further messages:
3764        let msg = receiver.try_recv().unwrap();
3765        assert_eq!(msg, None);
3766    }
3767
3768    #[async_timed_test(timeout_secs = 30)]
3769    async fn test_split_port_timeout_and_size_flush() {
3770        let config = hyperactor_config::global::lock();
3771        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
3772
3773        let Setup {
3774            mut receiver,
3775            actor0: _,
3776            actor1,
3777            port_id: _,
3778            port_id1,
3779            port_id2: _,
3780            port_id2_1: _,
3781            ..
3782        } = setup_split_port_ids(
3783            Some(accum::sum::<u64>().reducer_spec().unwrap()),
3784            ReducerMode::Streaming(accum::StreamingReducerOpts {
3785                max_update_interval: Some(Duration::from_millis(50)),
3786                initial_update_interval: Some(Duration::from_millis(50)),
3787            }),
3788        )
3789        .await;
3790
3791        post(&actor1, port_id1.clone(), 10);
3792        post(&actor1, port_id1.clone(), 20);
3793        post(&actor1, port_id1.clone(), 30);
3794        post(&actor1, port_id1.clone(), 40);
3795
3796        // Should have flushed at the third message.
3797        let msg = receiver.recv().await.unwrap();
3798        assert_eq!(msg, 60);
3799
3800        // After 50ms, the next reduce will flush:
3801        let msg = receiver.recv().await.unwrap();
3802        assert_eq!(msg, 40);
3803
3804        // No further messages
3805        let msg = receiver.try_recv().unwrap();
3806        assert_eq!(msg, None);
3807    }
3808
3809    #[async_timed_test(timeout_secs = 30)]
3810    async fn test_split_port_once_mode_basic() {
3811        let proc = Proc::local();
3812        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3813        let (port_handle, mut receiver) = actor.open_port::<u64>();
3814        let port_id = port_handle.bind().port_id().clone();
3815
3816        // Split with Once(3) mode - accumulate 3 values then emit
3817        let reducer_spec = accum::sum::<u64>().reducer_spec();
3818        let split_port_id = port_id
3819            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3820            .unwrap();
3821
3822        // Send 3 messages
3823        post(&actor, split_port_id.clone(), 10);
3824        post(&actor, split_port_id.clone(), 20);
3825        post(&actor, split_port_id.clone(), 30);
3826
3827        // Should receive a single reduced message
3828        let msg = receiver.recv().await.unwrap();
3829        assert_eq!(msg, 60); // 10 + 20 + 30
3830
3831        // No further messages
3832        tokio::time::sleep(Duration::from_millis(100)).await;
3833        let msg = receiver.try_recv().unwrap();
3834        assert_eq!(msg, None);
3835    }
3836
3837    #[async_timed_test(timeout_secs = 30)]
3838    async fn test_split_port_once_mode_teardown() {
3839        let proc = Proc::local();
3840        let (actor, _actor_handle) = proc.instance("actor").unwrap();
3841        let (port_handle, mut receiver) = actor.open_port::<u64>();
3842        let port_id = port_handle.bind().port_id().clone();
3843
3844        // Set up an undeliverable receiver to capture messages sent to torn-down ports
3845        let (undeliverable_handle, mut undeliverable_receiver) =
3846            undeliverable::new_undeliverable_port();
3847
3848        // Split with Once(3) mode - accumulate 3 values then emit and tear down
3849        let reducer_spec = accum::sum::<u64>().reducer_spec();
3850        let split_port_id = port_id
3851            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
3852            .unwrap();
3853
3854        // Send 3 messages to trigger reduction
3855        post(&actor, split_port_id.clone(), 10);
3856        post(&actor, split_port_id.clone(), 20);
3857        post(&actor, split_port_id.clone(), 30);
3858
3859        // Should receive a single reduced message
3860        let msg = receiver.recv().await.unwrap();
3861        assert_eq!(msg, 60); // 10 + 20 + 30
3862
3863        // Now send another message - it should fail because the port is torn down
3864        let serialized = wirevalue::Any::serialize(&100u64).unwrap();
3865        let envelope = MessageEnvelope::new(
3866            actor.mailbox().actor_id().clone(),
3867            split_port_id.clone(),
3868            serialized,
3869            Flattrs::new(),
3870        );
3871        actor.mailbox().post(envelope, undeliverable_handle);
3872
3873        // Verify the message was returned as undeliverable
3874        let undeliverable =
3875            tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
3876                .await
3877                .expect("should receive undeliverable message")
3878                .expect("receiver should not be closed");
3879
3880        // Verify the undeliverable message has the correct destination
3881        assert_eq!(undeliverable.0.dest(), &split_port_id);
3882
3883        // Verify no additional messages arrived at the original receiver
3884        let msg = receiver.try_recv().unwrap();
3885        assert_eq!(msg, None);
3886    }
3887
3888    #[test]
3889    fn test_dial_mailbox_router_prefixes_empty() {
3890        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
3891    }
3892
3893    #[test]
3894    fn test_dial_mailbox_router_prefixes_single_entry() {
3895        let router = DialMailboxRouter::new();
3896        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3897
3898        let prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3899        assert_eq!(prefixes.len(), 1);
3900        assert_eq!(prefixes[0], test_proc_ref("world0"));
3901    }
3902
3903    #[test]
3904    fn test_dial_mailbox_router_prefixes_no_overlap() {
3905        let router = DialMailboxRouter::new();
3906        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3907        router.bind(test_proc_ref("world1"), "unix!@2".parse().unwrap());
3908        router.bind(test_proc_ref("world2"), "unix!@3".parse().unwrap());
3909
3910        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3911        prefixes.sort();
3912
3913        let mut expected = vec![
3914            test_proc_ref("world0"),
3915            test_proc_ref("world1"),
3916            test_proc_ref("world2"),
3917        ];
3918        expected.sort();
3919
3920        assert_eq!(prefixes, expected);
3921    }
3922
3923    #[test]
3924    fn test_dial_mailbox_router_prefixes_with_overlaps() {
3925        let router = DialMailboxRouter::new();
3926        // Proc refs are all independent since they have different names.
3927        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3928        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
3929        router.bind(test_proc_ref("world0_1"), "unix!@3".parse().unwrap());
3930        router.bind(test_proc_ref("world1"), "unix!@4".parse().unwrap());
3931        router.bind(test_proc_ref("world1_0"), "unix!@5".parse().unwrap());
3932
3933        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3934        prefixes.sort();
3935
3936        let mut expected = vec![
3937            test_proc_ref("world0"),
3938            test_proc_ref("world0_0"),
3939            test_proc_ref("world0_1"),
3940            test_proc_ref("world1"),
3941            test_proc_ref("world1_0"),
3942        ];
3943        expected.sort();
3944
3945        assert_eq!(prefixes, expected);
3946    }
3947
3948    #[test]
3949    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
3950        let router = DialMailboxRouter::new();
3951        // Proc refs still cover their own actors (same proc_id).
3952        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
3953        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
3954        router.bind(
3955            test_actor_ref("world0_0", "actor1"),
3956            "unix!@3".parse().unwrap(),
3957        );
3958        router.bind(test_proc_ref("world1_0"), "unix!@4".parse().unwrap());
3959        router.bind(test_proc_ref("world1_1"), "unix!@5".parse().unwrap());
3960        router.bind(
3961            test_actor_ref("world2_0", "actor0"),
3962            "unix!@6".parse().unwrap(),
3963        );
3964
3965        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3966        prefixes.sort();
3967
3968        // Covering prefixes:
3969        // - world0 (independent proc ref)
3970        // - world0_0 (covers world0_0.actor1 since same proc_id)
3971        // - world1_0 (not covered by anything else)
3972        // - world1_1 (not covered by anything else)
3973        // - world2_0.actor0 (not covered by anything else)
3974        let mut expected = vec![
3975            test_proc_ref("world0"),
3976            test_proc_ref("world0_0"),
3977            test_proc_ref("world1_0"),
3978            test_proc_ref("world1_1"),
3979            test_actor_ref("world2_0", "actor0"),
3980        ];
3981        expected.sort();
3982
3983        assert_eq!(prefixes, expected);
3984    }
3985
3986    #[test]
3987    fn test_dial_mailbox_router_prefixes_same_level() {
3988        let router = DialMailboxRouter::new();
3989        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
3990        router.bind(test_proc_ref("world0_1"), "unix!@2".parse().unwrap());
3991        router.bind(test_proc_ref("world0_2"), "unix!@3".parse().unwrap());
3992
3993        let mut prefixes: Vec<reference::Reference> = router.prefixes().into_iter().collect();
3994        prefixes.sort();
3995
3996        // All should be covering prefixes since none is a prefix of another
3997        let mut expected = vec![
3998            test_proc_ref("world0_0"),
3999            test_proc_ref("world0_1"),
4000            test_proc_ref("world0_2"),
4001        ];
4002        expected.sort();
4003
4004        assert_eq!(prefixes, expected);
4005    }
4006
4007    /// A forwarder that bounces messages back to the **same**
4008    /// mailbox, but does so on a task to avoid recursive stack
4009    /// growth.
4010    #[derive(Clone, Debug)]
4011    struct AsyncLoopForwarder;
4012
4013    impl MailboxSender for AsyncLoopForwarder {
4014        fn post_unchecked(
4015            &self,
4016            envelope: MessageEnvelope,
4017            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4018        ) {
4019            let me = self.clone();
4020            tokio::spawn(async move {
4021                // Call `post` so each hop applies TTL exactly once.
4022                me.post(envelope, return_handle);
4023            });
4024        }
4025    }
4026
4027    #[tokio::test]
4028    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4029        let actor_id = test_actor_id("world_0", "ttl_actor");
4030        let mailbox = Mailbox::new(
4031            actor_id.clone(),
4032            BoxedMailboxSender::new(AsyncLoopForwarder),
4033        );
4034        let (ret_port, mut ret_rx) = mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4035
4036        // Create a destination not owned by this mailbox to force
4037        // forwarding.
4038        let remote_actor = test_actor_id("remote_world_1", "remote");
4039        let dest = reference::PortId::new(remote_actor.clone(), /*port index*/ 4242);
4040
4041        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
4042        // `::serialize`).
4043        let payload = 1234_u64;
4044        let envelope =
4045            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4046                .expect("serialize");
4047
4048        // Post it. This will start bouncing between forwarder and
4049        // mailbox until TTL hits 0.
4050        let return_handle = ret_port.clone();
4051        mailbox.post(envelope, return_handle);
4052
4053        // We expect the undeliverable to come back once TTL expires.
4054        let Undeliverable(undelivered) =
4055            tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4056                .await
4057                .expect("timed out waiting for undeliverable")
4058                .expect("channel closed");
4059
4060        // Sanity: round-trip payload still deserializes.
4061        let got: u64 = undelivered.deserialized().expect("deserialize");
4062        assert_eq!(got, payload, "payload preserved");
4063    }
4064
4065    #[tokio::test]
4066    async fn message_ttl_success_local_delivery() {
4067        let actor_id = test_actor_id("world_0", "ttl_actor");
4068        let mailbox = Mailbox::new(
4069            actor_id.clone(),
4070            BoxedMailboxSender::new(PanickingMailboxSender),
4071        );
4072        let (_undeliverable_tx, mut undeliverable_rx) =
4073            mailbox.bind_actor_port::<Undeliverable<MessageEnvelope>>();
4074
4075        // Open a local user u64 port.
4076        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4077
4078        // Build an envelope destined for this mailbox's own port.
4079        let payload = 0xC0FFEE_u64;
4080        let envelope = MessageEnvelope::serialize(
4081            actor_id.clone(),
4082            user_port.bind().port_id().clone(),
4083            &payload,
4084            Flattrs::new(),
4085        )
4086        .expect("serialize");
4087
4088        // Post the message using the mailbox (local path). TTL will
4089        // not expire.
4090        let return_handle = mailbox
4091            .bound_return_handle()
4092            .unwrap_or(monitored_return_handle());
4093        mailbox.post(envelope, return_handle);
4094
4095        // We should receive the payload locally.
4096        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4097            .await
4098            .expect("timed out waiting for local delivery")
4099            .expect("user port closed");
4100        assert_eq!(got, payload);
4101
4102        // There should be no undeliverables arriving.
4103        let no_undeliverable =
4104            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4105        assert!(
4106            no_undeliverable.is_err(),
4107            "unexpected undeliverable returned on successful local delivery"
4108        );
4109    }
4110
4111    #[tokio::test]
4112    async fn test_port_contramap() {
4113        let proc = Proc::local();
4114        let (client, _) = proc.instance("client").unwrap();
4115        let (handle, mut rx) = client.open_port();
4116
4117        handle
4118            .contramap(|m| (1, m))
4119            .send(&client, "hello".to_string())
4120            .unwrap();
4121        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4122    }
4123
4124    #[test]
4125    #[should_panic(expected = "already bound")]
4126    fn test_bind_port_handle_to_actor_port_twice() {
4127        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4128        let (handle, _rx) = mbox.open_port::<String>();
4129        handle.bind_actor_port();
4130        handle.bind_actor_port();
4131    }
4132
4133    #[test]
4134    fn test_bind_port_handle_to_actor_port() {
4135        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4136        let default_port = mbox.actor_id().port_id(String::port());
4137        let (handle, _rx) = mbox.open_port::<String>();
4138        // Handle's port index is allocated by mailbox, not the actor port.
4139        assert_ne!(default_port.index(), handle.port_index);
4140        // Bind the handle to the actor port.
4141        handle.bind_actor_port();
4142        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4143        // bind() can still be used, just it will not change handle's state.
4144        handle.bind();
4145        handle.bind();
4146        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4147    }
4148
4149    #[test]
4150    #[should_panic(expected = "already bound")]
4151    fn test_bind_port_handle_to_actor_port_when_already_bound() {
4152        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
4153        let (handle, _rx) = mbox.open_port::<String>();
4154        // Bound handle to the port allocated by mailbox.
4155        handle.bind();
4156        assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.port_index);
4157        // Since handle is already bound, call bind_to() on it will cause panic.
4158        handle.bind_actor_port();
4159    }
4160
4161    #[tokio::test]
4162    async fn test_mailbox_post_fails_when_actor_stopped() {
4163        let actor_id = test_actor_id("0", "stopped_actor");
4164
4165        let mailbox = Mailbox::new(
4166            actor_id.clone(),
4167            BoxedMailboxSender::new(PanickingMailboxSender),
4168        );
4169
4170        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4171
4172        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4173
4174        // Use a separate detached mailbox for the return handle since
4175        // the main mailbox is stopped and won't accept messages.
4176        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4177
4178        let envelope = MessageEnvelope::serialize(
4179            actor_id.clone(),
4180            user_port.bind().port_id().clone(),
4181            &42u64,
4182            Flattrs::new(),
4183        )
4184        .expect("serialize");
4185
4186        mailbox.post(envelope, return_handle);
4187
4188        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4189            .await
4190            .expect("timed out waiting for undeliverable")
4191            .expect("return port closed");
4192
4193        let err = undeliverable.0.error_msg().expect("expected error");
4194        assert!(
4195            err.contains(&format!("owner {} is stopped", actor_id)),
4196            "error should indicate actor stopped: {}",
4197            err
4198        );
4199    }
4200
4201    #[tokio::test]
4202    async fn test_mailbox_post_fails_when_actor_failed() {
4203        use crate::actor::ActorErrorKind;
4204
4205        let actor_id = test_actor_id("0", "failed_actor");
4206
4207        let mailbox = Mailbox::new(
4208            actor_id.clone(),
4209            BoxedMailboxSender::new(PanickingMailboxSender),
4210        );
4211
4212        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4213
4214        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4215            "test failure".to_string(),
4216        )));
4217
4218        // Use a separate detached mailbox for the return handle since
4219        // the main mailbox is failed and won't accept messages.
4220        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4221
4222        let envelope = MessageEnvelope::serialize(
4223            actor_id.clone(),
4224            user_port.bind().port_id().clone(),
4225            &42u64,
4226            Flattrs::new(),
4227        )
4228        .expect("serialize");
4229
4230        mailbox.post(envelope, return_handle);
4231
4232        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4233            .await
4234            .expect("timed out waiting for undeliverable")
4235            .expect("return port closed");
4236
4237        let err = undeliverable.0.error_msg().expect("expected error");
4238        assert!(
4239            err.contains(&format!("owner {} failed", actor_id)),
4240            "error should indicate actor failed: {}",
4241            err
4242        );
4243    }
4244
4245    #[tokio::test]
4246    async fn test_port_handle_send_fails_when_actor_stopped() {
4247        let actor_id = test_actor_id("0", "stopped_actor");
4248
4249        let mailbox = Mailbox::new(
4250            actor_id.clone(),
4251            BoxedMailboxSender::new(PanickingMailboxSender),
4252        );
4253
4254        let (port_handle, _rx) = mailbox.open_port::<u64>();
4255        let proc = Proc::local();
4256        let (client, _) = proc.instance("client").unwrap();
4257
4258        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4259
4260        let result = port_handle.send(&client, 42u64);
4261
4262        assert!(result.is_err(), "send should fail when actor is stopped");
4263        let err = result.unwrap_err();
4264        assert_matches!(
4265            err.kind(),
4266            MailboxSenderErrorKind::Mailbox(mailbox_err)
4267                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4268        );
4269    }
4270
4271    #[tokio::test]
4272    async fn test_port_handle_send_fails_when_actor_failed() {
4273        use crate::actor::ActorErrorKind;
4274
4275        let actor_id = test_actor_id("0", "failed_actor");
4276
4277        let mailbox = Mailbox::new(
4278            actor_id.clone(),
4279            BoxedMailboxSender::new(PanickingMailboxSender),
4280        );
4281
4282        let (port_handle, _rx) = mailbox.open_port::<u64>();
4283        let proc = Proc::local();
4284        let (client, _) = proc.instance("client").unwrap();
4285
4286        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4287            "test failure".to_string(),
4288        )));
4289
4290        let result = port_handle.send(&client, 42u64);
4291
4292        assert!(result.is_err(), "send should fail when actor is failed");
4293        let err = result.unwrap_err();
4294        assert_matches!(
4295            err.kind(),
4296            MailboxSenderErrorKind::Mailbox(mailbox_err)
4297                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4298        );
4299    }
4300
4301    #[async_timed_test(timeout_secs = 30)]
4302    async fn test_open_reduce_port() {
4303        let proc = Proc::local();
4304        let (client, _) = proc.instance("client").unwrap();
4305
4306        // Open an accumulator port with sum reducer
4307        let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4308
4309        // Verify the reducer_spec is set
4310        let port_ref = port_handle.bind();
4311        assert!(port_ref.reducer_spec().is_some());
4312
4313        // Send a single value via the bound port
4314        port_ref.send(&client, 42).unwrap();
4315
4316        // Should receive the value
4317        let result = receiver.recv().await.unwrap();
4318        assert_eq!(result, 42);
4319    }
4320
4321    #[async_timed_test(timeout_secs = 30)]
4322    async fn test_open_reduce_port_reducer_spec_preserved() {
4323        let proc = Proc::local();
4324        let (client, _) = proc.instance("client").unwrap();
4325
4326        // Test that different accumulators produce different reducer_specs
4327        let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4328        let sum_ref = sum_handle.bind();
4329        let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
4330
4331        let (max_handle, _) = client
4332            .mailbox()
4333            .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
4334        let max_ref = max_handle.bind();
4335        let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
4336
4337        // Different accumulators should have different reducer typehashes
4338        assert_ne!(sum_typehash, max_typehash);
4339    }
4340}