Skip to main content

hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mailboxes are the central message-passing mechanism in Hyperactor.
10//!
11//! Each actor owns a mailbox to which other actors can deliver messages.
12//! An actor can open one or more typed _ports_ in the mailbox; messages
13//! are in turn delivered to specific ports.
14//!
15//! Mailboxes are associated with an [`ActorAddr`] (given by `actor_id`
16//! in the following example):
17//!
18//! ```
19//! # use hyperactor::mailbox::Mailbox;
20//! # use hyperactor::Endpoint as _;
21//! # use hyperactor::Proc;
22//! # use hyperactor::{ActorAddr, ProcAddr};
23//! # tokio_test::block_on(async {
24//! # let proc = Proc::isolated();
25//! # let (client, _) = proc.client("client").unwrap();
26//! # let actor_id = proc.proc_addr().actor_addr("actor");
27//! let mbox = Mailbox::new(actor_id);
28//! let (port, mut receiver) = mbox.open_port::<u64>();
29//!
30//! port.post(&client, 123);
31//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
32//! # })
33//! ```
34//!
35//! Mailboxes also provide a form of one-shot ports, called [`OncePort`],
36//! that permits at most one message transmission:
37//!
38//! ```
39//! # use hyperactor::mailbox::Mailbox;
40//! # use hyperactor::Endpoint as _;
41//! # use hyperactor::Proc;
42//! # use hyperactor::{ActorAddr, ProcAddr};
43//! # tokio_test::block_on(async {
44//! # let proc = Proc::isolated();
45//! # let (client, _) = proc.client("client").unwrap();
46//! # let actor_id = proc.proc_addr().actor_addr("actor");
47//! let mbox = Mailbox::new(actor_id);
48//!
49//! let (port, receiver) = mbox.open_once_port::<u64>();
50//!
51//! port.post(&client, 123u64);
52//! assert_eq!(receiver.recv().await.unwrap(), 123u64);
53//! # })
54//! ```
55//!
56//! [`OncePort`]s are correspondingly used for RPC replies in the actor
57//! system.
58//!
59//! ## Remote ports and serialization
60//!
61//! Mailboxes allow delivery of serialized messages to named ports:
62//!
63//! 1) Ports restrict message types to (serializable) [`Message`]s.
64//! 2) Each [`Port`] is associated with a [`PortAddr`] which globally names the port.
65//! 3) [`Mailbox`] provides interfaces to deliver serialized
66//!    messages to ports named by their [`PortAddr`].
67//!
68//! While this complicates the interface somewhat, it allows the
69//! implementation to avoid a serialization roundtrip when passing
70//! messages locally.
71//!
72//! ## Undeliverable-message log invariants (UM-*)
73//!
74//! The `undelivered_message_abandoned` log at
75//! `UndeliverableMailboxSender::post_unchecked` is a user-facing
76//! surface: it fires when a message could not be delivered *and*
77//! could not be returned to its sender. The following invariants
78//! govern its shape so the log stays scannable and its downstream
79//! consumers (Scuba, alerts) stay stable.
80//!
81//! - **UM-1 (bounded abandoned-message log).** The log must not emit
82//!   unbounded `envelope.headers().to_string()` or
83//!   `envelope.data().to_string()`. Payload observability is provided
84//!   by `message_type` (`data.typename()`) and `data_len`
85//!   (`data.len()`) — cheap, bounded, and type-safe.
86//!
87//! - **UM-2 (stable compatibility fields).** The `actor_name` and
88//!   `actor_id` fields stay on the log with their current values and
89//!   types. Readability improvements are strictly additive on this
90//!   surface; renames or removals require a separate migration diff
91//!   that coordinates with downstream consumers.
92//!
93//! - **UM-3a (destination naming).** When the envelope carries no
94//!   `OPERATION_ENDPOINT`, the format string names the transport
95//!   destination: `"message not delivered to <dest>"`.
96//!
97//! - **UM-3b (operation naming).** When the envelope carries
98//!   `OPERATION_ENDPOINT`, the format string names the operation:
99//!   `"abandoned message for <endpoint>"`.
100//!
101//!   `OPERATION_*` keys live in `hyperactor::mailbox::headers`
102//!   because the readers (this log, the undeliverable formatter)
103//!   live in `hyperactor` and can't depend upward on
104//!   `monarch_hyperactor`. Keys whose consumers are not at this
105//!   layer don't belong here.
106
107use std::any::Any;
108use std::collections::BTreeMap;
109use std::collections::BTreeSet;
110use std::fmt;
111use std::fmt::Debug;
112use std::future;
113use std::future::Future;
114use std::ops::Bound::Excluded;
115use std::pin::Pin;
116use std::sync::Arc;
117use std::sync::Condvar;
118use std::sync::Mutex;
119use std::sync::RwLock;
120use std::sync::Weak;
121use std::sync::atomic::AtomicU64;
122use std::sync::atomic::AtomicUsize;
123use std::sync::atomic::Ordering;
124use std::task::Context;
125use std::task::Poll;
126
127use async_trait::async_trait;
128use dashmap::DashMap;
129use dashmap::mapref::entry::Entry;
130use futures::Sink;
131use futures::Stream;
132use hyperactor_config::Flattrs;
133use hyperactor_telemetry::hash_to_u64;
134use serde::Deserialize;
135use serde::Serialize;
136use serde::de::DeserializeOwned;
137use tokio::sync::mpsc;
138use tokio::sync::oneshot;
139use tokio::sync::watch;
140use tokio::task::JoinHandle;
141use tokio_util::sync::CancellationToken;
142use typeuri::Named;
143
144use crate::ActorAddr;
145use crate::Addr;
146use crate::Endpoint;
147use crate::EndpointLocation;
148// for macros
149use crate::OncePortRef;
150use crate::PortAddr;
151use crate::PortRef;
152use crate::Proc;
153use crate::ProcAddr;
154use crate::accum::Accumulator;
155use crate::accum::ReducerSpec;
156use crate::accum::StreamingReducerOpts;
157use crate::actor::ActorStatus;
158use crate::actor::Signal;
159use crate::actor::remote::USER_PORT_OFFSET;
160use crate::channel;
161use crate::channel::ChannelAddr;
162use crate::channel::ChannelError;
163use crate::channel::ChannelTransport;
164use crate::channel::SendError;
165use crate::channel::TxStatus;
166use crate::context;
167use crate::gateway::Gateway;
168use crate::id::ActorId;
169use crate::metrics;
170use crate::ordering::SEQ_INFO;
171use crate::ordering::SeqInfo;
172use crate::port::Port;
173
174mod undeliverable;
175/// For [`Undeliverable`], a message type for delivery failures.
176pub use undeliverable::LostMessage;
177pub use undeliverable::Undeliverable;
178pub use undeliverable::UndeliverableMessageError;
179pub use undeliverable::custom_monitored_return_handle;
180pub use undeliverable::monitored_return_handle; // TODO: Audit
181/// For [`MailboxAdminMessage`], a message type for mailbox administration.
182pub mod mailbox_admin_message;
183pub use mailbox_admin_message::MailboxAdminMessage;
184pub use mailbox_admin_message::MailboxAdminMessageHandler;
185/// For message headers and latency tracking.
186pub mod headers;
187
/// The baseline bound for anything deposited into a mailbox: the value must
/// be `Send`, `Sync`, and free of non-`'static` borrows.
pub trait Message: 'static + Send + Sync {}

/// Blanket implementation: every `Send + Sync + 'static` type is a [`Message`].
impl<M> Message for M where M: 'static + Send + Sync {}
192
193/// RemoteMessage extends [`Message`] by requiring that the messages
194/// also be serializable, and can thus traverse process boundaries.
195/// RemoteMessages must also specify a globally unique type name (a URI).
196pub trait RemoteMessage: Message + Named + Serialize + DeserializeOwned {}
197
198impl<M: Message + Named + Serialize + DeserializeOwned> RemoteMessage for M {}
199
/// Byte-string payload type used throughout the system (an owned `Vec<u8>`).
pub type Data = Vec<u8>;
202
203/// Delivery errors occur during message posting.
204#[derive(
205    thiserror::Error,
206    Debug,
207    Serialize,
208    Deserialize,
209    typeuri::Named,
210    Clone,
211    PartialEq,
212    Eq
213)]
214pub enum DeliveryError {
215    /// The destination address is not reachable.
216    #[error("address not routable: {0}")]
217    Unroutable(String),
218
219    /// A broken link indicates that a link in the message
220    /// delivery path has failed.
221    #[error("broken link: {0}")]
222    BrokenLink(String),
223
224    /// A (local) mailbox delivery error.
225    #[error("mailbox error: {0}")]
226    Mailbox(String),
227
228    /// A multicast related delivery error.
229    #[error("multicast error: {0}")]
230    Multicast(String),
231
232    /// The message went through too many hops and has expired.
233    #[error("ttl expired")]
234    TtlExpired,
235}
236
237/// An envelope that carries a message destined to a remote actor.
238/// The envelope contains a serialized message along with its destination
239/// and sender.
240#[derive(Debug, Serialize, Deserialize, Clone, typeuri::Named)]
241pub struct MessageEnvelope {
242    /// The sender of this message.
243    sender: ActorAddr,
244
245    /// The destination of the message.
246    dest: PortAddr,
247
248    /// The serialized message.
249    data: wirevalue::Any,
250
251    /// Error contains a delivery error when message delivery failed.
252    errors: Vec<DeliveryError>,
253
254    /// Additional context for this message.
255    headers: Flattrs,
256
257    /// Decremented at every `MailboxSender` hop.
258    ttl: u8,
259
260    /// If true, undeliverable messages should be returned to sender. Else, they
261    /// are dropped.
262    return_undeliverable: bool,
263    // TODO: add typename, source, seq, etc.
264}
265wirevalue::register_type!(MessageEnvelope);
266
267impl MessageEnvelope {
268    /// Create a new envelope with the provided sender, destination, and message.
269    pub fn new(
270        sender: impl Into<ActorAddr>,
271        dest: impl Into<PortAddr>,
272        data: wirevalue::Any,
273        headers: Flattrs,
274    ) -> Self {
275        let sender = sender.into();
276        let dest = dest.into();
277        Self {
278            sender,
279            dest,
280            data,
281            errors: Vec::new(),
282            headers,
283            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
284            // By default, all undeliverable messages should be returned to the sender.
285            return_undeliverable: true,
286        }
287    }
288
289    /// Create a new envelope whose sender ID is unknown.
290    pub(crate) fn new_unknown(dest: impl Into<PortAddr>, data: wirevalue::Any) -> Self {
291        // Create a synthetic "unknown" actor ID for messages with no known sender
292        let unknown_addr = ChannelAddr::any(ChannelTransport::Local);
293        let unknown_proc_ref = ProcAddr::instance(unknown_addr, "unknown");
294        let unknown_actor_ref =
295            ActorAddr::root(unknown_proc_ref, crate::id::Label::strip("unknown"));
296        Self::new(unknown_actor_ref, dest, data, Flattrs::new())
297    }
298
299    /// Construct a new serialized value by serializing the provided T-typed value.
300    pub fn serialize<T: Serialize + Named>(
301        source: impl Into<ActorAddr>,
302        dest: impl Into<PortAddr>,
303        value: &T,
304        headers: Flattrs,
305    ) -> Result<Self, wirevalue::Error> {
306        Ok(Self {
307            headers,
308            data: wirevalue::Any::serialize(value)?,
309            sender: source.into(),
310            dest: dest.into(),
311            errors: Vec::new(),
312            ttl: hyperactor_config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
313            // By default, all undeliverable messages should be returned to the sender.
314            return_undeliverable: true,
315        })
316    }
317
318    /// Returns the remaining time-to-live (TTL) for this message.
319    ///
320    /// The TTL is decremented at each `MailboxSender` hop. When it
321    /// reaches 0, the message is considered expired and is returned
322    /// to the sender as undeliverable.
323    pub fn ttl(&self) -> u8 {
324        self.ttl
325    }
326
327    /// Overrides the message’s time-to-live (TTL).
328    ///
329    /// This replaces the current TTL value (normally initialized from
330    /// `config::MESSAGE_TTL_DEFAULT`) with the provided `ttl`. The
331    /// updated envelope is returned for chaining.
332    ///
333    /// # Note
334    /// The TTL is decremented at each `MailboxSender` hop, and when
335    /// it reaches 0 the message will be treated as undeliverable.
336    pub fn set_ttl(mut self, ttl: u8) -> Self {
337        self.ttl = ttl;
338        self
339    }
340
341    /// Decrements the message's TTL by one hop.
342    ///
343    /// Returns `Ok(())` if the TTL was greater than zero and
344    /// successfully decremented. If the TTL was already zero, no
345    /// decrement occurs and `Err(DeliveryError::TtlExpired)` is
346    /// returned, indicating that the message has expired and should
347    /// be treated as undeliverable.
348    fn dec_ttl_or_err(&mut self) -> Result<(), DeliveryError> {
349        if self.ttl == 0 {
350            Err(DeliveryError::TtlExpired)
351        } else {
352            self.ttl -= 1;
353            Ok(())
354        }
355    }
356
357    /// Deserialize the message in the envelope to the provided type T.
358    pub fn deserialized<T: DeserializeOwned + Named>(&self) -> Result<T, anyhow::Error> {
359        Ok(self.data.deserialized()?)
360    }
361
362    /// The serialized message.
363    pub fn data(&self) -> &wirevalue::Any {
364        &self.data
365    }
366
367    /// The message sender.
368    pub fn sender(&self) -> &ActorAddr {
369        &self.sender
370    }
371
372    /// The destination of the message.
373    pub fn dest(&self) -> &PortAddr {
374        &self.dest
375    }
376
377    /// The message headers.
378    pub fn headers(&self) -> &Flattrs {
379        &self.headers
380    }
381
382    /// Tells whether this is a signal message.
383    pub fn is_signal(&self) -> bool {
384        self.dest.index() == Signal::port()
385    }
386
387    /// Set a delivery error for the message. If errors are already set, append
388    /// it to the existing errors.
389    pub fn set_error(&mut self, error: DeliveryError) {
390        self.errors.push(error)
391    }
392
393    /// Change the sender on the envelope in case it was set incorrectly. This
394    /// should only be used by CommActor since it is forwarding from another
395    /// sender.
396    pub fn update_sender(&mut self, sender: impl Into<ActorAddr>) {
397        self.sender = sender.into();
398    }
399
400    /// Set to true if you want this message to be returned to sender if it cannot
401    /// reach dest. This is the default.
402    /// Set to false if you want the message to be dropped instead.
403    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
404        self.return_undeliverable = return_undeliverable;
405    }
406
407    /// The message has been determined to be undeliverable with the
408    /// provided error. Mark the envelope with the error and return to
409    /// sender.
410    pub fn undeliverable(
411        mut self,
412        error: DeliveryError,
413        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
414    ) {
415        tracing::debug!(
416            name = "undelivered_message_attempt",
417            sender = self.sender.to_string(),
418            dest = self.dest.to_string(),
419            error = error.to_string(),
420            return_handle = %return_handle,
421        );
422        metrics::MAILBOX_UNDELIVERABLE_MESSAGES.add(
423            1,
424            hyperactor_telemetry::kv_pairs!(
425                "sender_actor_id" => self.sender.to_string(),
426                "dest_actor_id" => self.dest.to_string(),
427                "message_type" => self.data.typename().unwrap_or("unknown"),
428                "error_type" =>  error.to_string(),
429            ),
430        );
431
432        self.set_error(error);
433        undeliverable::return_undeliverable(return_handle, self);
434    }
435
436    /// Get the errors of why this message was undeliverable. Empty means this
437    /// message was not determined as undeliverable.
438    pub fn errors(&self) -> &Vec<DeliveryError> {
439        &self.errors
440    }
441
442    /// Get the string representation of the errors of this message was
443    /// undeliverable. None means this message was not determined as
444    /// undeliverable.
445    pub fn error_msg(&self) -> Option<String> {
446        if self.errors.is_empty() {
447            None
448        } else {
449            Some(
450                self.errors
451                    .iter()
452                    .map(|e| e.to_string())
453                    .collect::<Vec<_>>()
454                    .join("; "),
455            )
456        }
457    }
458
459    fn open(self) -> (MessageMetadata, wirevalue::Any) {
460        let Self {
461            sender,
462            dest,
463            data,
464            errors,
465            headers,
466            ttl,
467            return_undeliverable,
468        } = self;
469
470        (
471            MessageMetadata {
472                sender,
473                dest,
474                errors,
475                headers,
476                ttl,
477                return_undeliverable,
478            },
479            data,
480        )
481    }
482
483    fn seal(metadata: MessageMetadata, data: wirevalue::Any) -> Self {
484        let MessageMetadata {
485            sender,
486            dest,
487            errors,
488            headers,
489            ttl,
490            return_undeliverable,
491        } = metadata;
492
493        Self {
494            sender,
495            dest,
496            data,
497            errors,
498            headers,
499            ttl,
500            return_undeliverable,
501        }
502    }
503
504    fn return_undeliverable(&self) -> bool {
505        self.return_undeliverable
506    }
507
508    /// Set a header value on this envelope.
509    pub fn set_header<T: Serialize>(&mut self, key: hyperactor_config::attrs::Key<T>, value: T) {
510        self.headers.set(key, value);
511    }
512}
513
514impl fmt::Display for MessageEnvelope {
515    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516        match &self.error_msg() {
517            None => write!(
518                f,
519                "{} > {}: {} {{{}}}",
520                self.sender, self.dest, self.data, self.headers
521            ),
522            Some(err) => write!(
523                f,
524                "{} > {}: {} {{{}}}: delivery error: {}",
525                self.sender, self.dest, self.data, self.headers, err
526            ),
527        }
528    }
529}
530
531/// Metadata about a message sent via a MessageEnvelope.
532#[derive(Clone)]
533pub struct MessageMetadata {
534    sender: ActorAddr,
535    dest: PortAddr,
536    errors: Vec<DeliveryError>,
537    headers: Flattrs,
538    ttl: u8,
539    return_undeliverable: bool,
540}
541
542/// Errors that occur during mailbox operations. Each error is associated
543/// with the mailbox's actor id.
544#[derive(Debug)]
545pub struct MailboxError {
546    actor_id: ActorAddr,
547    kind: MailboxErrorKind,
548}
549
550/// The kinds of mailbox errors. This enum is marked non-exhaustive to
551/// allow for extensibility.
552#[derive(thiserror::Error, Debug)]
553#[non_exhaustive]
554pub enum MailboxErrorKind {
555    /// An operation was attempted on a closed mailbox.
556    #[error("mailbox closed")]
557    Closed,
558
559    /// The port associated with an operation was invalid.
560    #[error("invalid port: {0}")]
561    InvalidPort(PortAddr),
562
563    /// There was no sender associated with the port.
564    #[error("no sender for port: {0}")]
565    NoSenderForPort(PortAddr),
566
567    /// There was no local sender associated with the port.
568    /// Returned by operations that require a local port.
569    #[error("no local sender for port: {0}")]
570    NoLocalSenderForPort(PortAddr),
571
572    /// The port was closed.
573    #[error("{0}: port closed")]
574    PortClosed(PortAddr),
575
576    /// An error occured during a send operation.
577    #[error("send {0}: {1}")]
578    Send(PortAddr, #[source] anyhow::Error),
579
580    /// An error occured during a receive operation.
581    #[error("recv {0}: {1}")]
582    Recv(PortAddr, #[source] anyhow::Error),
583
584    /// There was a serialization failure.
585    #[error("serialize: {0}")]
586    Serialize(#[source] anyhow::Error),
587
588    /// There was a deserialization failure.
589    #[error("deserialize {0}: {1}")]
590    Deserialize(&'static str, anyhow::Error),
591
592    /// There was an error during a channel operation.
593    #[error(transparent)]
594    Channel(#[from] ChannelError),
595
596    /// The owning actor terminated (either stopped or failed).
597    #[error("owner terminated: {0}")]
598    OwnerTerminated(ActorStatus),
599}
600
601impl MailboxError {
602    /// Create a new mailbox error associated with the provided actor
603    /// id and of the given kind.
604    pub fn new(actor_id: impl Into<ActorAddr>, kind: MailboxErrorKind) -> Self {
605        Self {
606            actor_id: actor_id.into(),
607            kind,
608        }
609    }
610
611    /// The address of the mailbox producing this error.
612    pub fn actor_addr(&self) -> &ActorAddr {
613        &self.actor_id
614    }
615
616    /// The error's kind.
617    pub fn kind(&self) -> &MailboxErrorKind {
618        &self.kind
619    }
620}
621
622impl fmt::Display for MailboxError {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        write!(f, "{}: ", self.actor_id)?;
625        fmt::Display::fmt(&self.kind, f)
626    }
627}
628
629impl std::error::Error for MailboxError {
630    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
631        self.kind.source()
632    }
633}
634
635/// PortLocation describes the location of a port.
636/// This is used in errors to provide a uniform data type
637/// for ports that may or may not be bound.
638#[derive(Debug, Clone)]
639pub enum PortLocation {
640    /// The port was bound: the location is its underlying bound ID.
641    Bound(PortAddr),
642    /// The port was not bound: we provide the actor ID and the message type.
643    Unbound(ActorAddr, &'static str),
644}
645
646impl PortLocation {
647    fn new_unbound<M: Message>(actor_id: ActorAddr) -> Self {
648        PortLocation::Unbound(actor_id, std::any::type_name::<M>())
649    }
650
651    #[allow(dead_code)]
652    fn new_unbound_type(actor_id: ActorAddr, ty: &'static str) -> Self {
653        PortLocation::Unbound(actor_id, ty)
654    }
655
656    /// The actor address of the location.
657    pub fn actor_addr(&self) -> ActorAddr {
658        match self {
659            PortLocation::Bound(port_addr) => port_addr.actor_addr(),
660            PortLocation::Unbound(actor_addr, _) => actor_addr.clone(),
661        }
662    }
663}
664
665impl fmt::Display for PortLocation {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        match self {
668            PortLocation::Bound(port_ref) => write!(f, "{}", port_ref),
669            PortLocation::Unbound(actor_ref, name) => write!(f, "{}<{}>", actor_ref, name),
670        }
671    }
672}
673
674/// Errors that that occur during mailbox sending operations. Each error
675/// is associated with the port ID of the operation.
676#[derive(Debug)]
677pub struct MailboxSenderError {
678    location: Box<PortLocation>,
679    kind: Box<MailboxSenderErrorKind>,
680}
681
682/// The kind of mailbox sending errors.
683#[derive(thiserror::Error, Debug)]
684pub enum MailboxSenderErrorKind {
685    /// Error during serialization.
686    #[error("serialization error: {0}")]
687    Serialize(anyhow::Error),
688
689    /// Error during deserialization.
690    #[error("deserialization error for type {0}: {1}")]
691    Deserialize(&'static str, anyhow::Error),
692
693    /// A send to an invalid port.
694    #[error("invalid port")]
695    Invalid,
696
697    /// A send to a closed port.
698    #[error("port closed")]
699    Closed,
700
701    // The following pass through underlying errors:
702    /// An underlying mailbox error.
703    #[error(transparent)]
704    Mailbox(#[from] MailboxError),
705
706    /// An underlying channel error.
707    #[error(transparent)]
708    Channel(#[from] ChannelError),
709
710    /// An other, uncategorized error.
711    #[error("send error: {0}")]
712    Other(#[from] anyhow::Error),
713
714    /// The destination was unreachable.
715    #[error("unreachable: {0}")]
716    Unreachable(anyhow::Error),
717}
718
719impl MailboxSenderError {
720    /// Create a new mailbox sender error to an unbound port.
721    pub fn new_unbound<M>(actor_id: impl Into<ActorAddr>, kind: MailboxSenderErrorKind) -> Self {
722        Self {
723            location: Box::new(PortLocation::Unbound(
724                actor_id.into(),
725                std::any::type_name::<M>(),
726            )),
727            kind: Box::new(kind),
728        }
729    }
730
731    /// Create a new mailbox sender, manually providing the type.
732    pub fn new_unbound_type(
733        actor_id: impl Into<ActorAddr>,
734        kind: MailboxSenderErrorKind,
735        ty: &'static str,
736    ) -> Self {
737        Self {
738            location: Box::new(PortLocation::Unbound(actor_id.into(), ty)),
739            kind: Box::new(kind),
740        }
741    }
742
743    /// Create a new mailbox sender error with the provided port ID and kind.
744    pub fn new_bound(port_id: impl Into<PortAddr>, kind: MailboxSenderErrorKind) -> Self {
745        Self {
746            location: Box::new(PortLocation::Bound(port_id.into())),
747            kind: Box::new(kind),
748        }
749    }
750
751    /// The location at which the error occured.
752    pub fn location(&self) -> &PortLocation {
753        &self.location
754    }
755
756    /// The kind associated with the error.
757    pub fn kind(&self) -> &MailboxSenderErrorKind {
758        &self.kind
759    }
760}
761
762impl fmt::Display for MailboxSenderError {
763    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764        write!(f, "{}: ", self.location)?;
765        fmt::Display::fmt(&self.kind, f)
766    }
767}
768
769impl std::error::Error for MailboxSenderError {
770    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
771        self.kind.source()
772    }
773}
774
775/// MailboxSenders can send messages through ports to mailboxes. It
776/// provides a unified interface for message delivery in the system.
777#[async_trait]
778pub trait MailboxSender: Send + Sync + Any {
779    /// Apply hop semantics (TTL decrement; undeliverable on 0), then
780    /// delegate to transport.
781    fn post(
782        &self,
783        mut envelope: MessageEnvelope,
784        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
785    ) {
786        if let Err(err) = envelope.dec_ttl_or_err() {
787            envelope.undeliverable(err, return_handle);
788            return;
789        }
790        self.post_unchecked(envelope, return_handle);
791    }
792
793    /// Raw transport: **no** policy.
794    fn post_unchecked(
795        &self,
796        envelope: MessageEnvelope,
797        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
798    );
799
800    /// Wait until all messages previously posted through this sender
801    /// have been delivered (wire-acked) or confirmed undeliverable.
802    /// The default implementation is a no-op, appropriate for senders
803    /// whose `post` is synchronous (e.g. local in-process delivery).
804    async fn flush(&self) -> Result<(), anyhow::Error> {
805        Ok(())
806    }
807}
808
809/// PortSender extends [`MailboxSender`] by providing typed endpoints
810/// for sending messages over ports
811pub trait PortSender: MailboxSender {
812    /// Deliver a message to the provided port.
813    fn serialize_and_send<M: RemoteMessage>(
814        &self,
815        port: &PortRef<M>,
816        message: M,
817        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
818    ) -> Result<(), MailboxSenderError> {
819        // TODO: convert this to a undeliverable error also
820        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
821            MailboxSenderError::new_bound(
822                port.port_addr().clone(),
823                MailboxSenderErrorKind::Serialize(err.into()),
824            )
825        })?;
826        self.post(
827            MessageEnvelope::new_unknown(port.port_addr().clone(), serialized),
828            return_handle,
829        );
830        Ok(())
831    }
832
833    /// Deliver a message to a one-shot port, consuming the provided port,
834    /// which is not reusable.
835    fn serialize_and_send_once<M: RemoteMessage>(
836        &self,
837        once_port: OncePortRef<M>,
838        message: M,
839        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
840    ) -> Result<(), MailboxSenderError> {
841        let serialized = wirevalue::Any::serialize(&message).map_err(|err| {
842            MailboxSenderError::new_bound(
843                once_port.port_addr().clone(),
844                MailboxSenderErrorKind::Serialize(err.into()),
845            )
846        })?;
847        self.post(
848            MessageEnvelope::new_unknown(once_port.port_addr().clone(), serialized),
849            return_handle,
850        );
851        Ok(())
852    }
853}
854
855impl<T: ?Sized + MailboxSender> PortSender for T {}
856
857/// A perpetually closed mailbox sender. Panics if any messages are posted.
858/// Useful for tests, or where there is no meaningful mailbox sender
859/// implementation available.
860#[derive(Debug, Clone)]
861pub struct PanickingMailboxSender;
862
863#[async_trait]
864impl MailboxSender for PanickingMailboxSender {
865    fn post_unchecked(
866        &self,
867        envelope: MessageEnvelope,
868        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
869    ) {
870        panic!("panic! in the mailbox! attempted post: {}", envelope)
871    }
872}
873
874/// A mailbox sender for undeliverable messages. This will simply record
875/// any undelivered messages.
876#[derive(Debug)]
877pub struct UndeliverableMailboxSender;
878
879#[async_trait]
880impl MailboxSender for UndeliverableMailboxSender {
881    fn post_unchecked(
882        &self,
883        envelope: MessageEnvelope,
884        _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
885    ) {
886        let sender_name = envelope
887            .sender
888            .label()
889            .map_or("?".to_string(), |l| l.to_string());
890        let error_str = envelope.error_msg().unwrap_or("".to_string());
891        let operation_endpoint = envelope.headers().get(headers::OPERATION_ENDPOINT);
892        let operation_adverb = envelope.headers().get(headers::OPERATION_ADVERB);
893        // See UM-1..UM-3b in module docs.
894        match &operation_endpoint {
895            Some(endpoint) => tracing::error!(
896                name = "undelivered_message_abandoned",
897                actor_name = sender_name,
898                actor_id = envelope.sender.to_string(),
899                dest = envelope.dest.to_string(),
900                message_type = envelope.data().typename().unwrap_or("unknown"),
901                data_len = envelope.data().len(),
902                endpoint = %endpoint,
903                adverb = operation_adverb.as_deref().unwrap_or(""),
904                error = %error_str,
905                "abandoned message for {}",
906                endpoint,
907            ),
908            None => tracing::error!(
909                name = "undelivered_message_abandoned",
910                actor_name = sender_name,
911                actor_id = envelope.sender.to_string(),
912                dest = envelope.dest.to_string(),
913                message_type = envelope.data().typename().unwrap_or("unknown"),
914                data_len = envelope.data().len(),
915                error = %error_str,
916                "message not delivered to {}",
917                envelope.dest,
918            ),
919        }
920    }
921}
922
/// Convenience boxing implementation for MailboxSender. Most APIs
/// are parameterized on MailboxSender implementations, and it's thus
/// difficult to work with dyn values.  BoxedMailboxSender bridges this
/// gap by providing a concrete MailboxSender which dispatches using an
/// underlying (boxed) dyn.
///
/// The wrapped sender is held in an [`Arc`], so clones of a
/// `BoxedMailboxSender` share the same underlying sender.
#[derive(Clone)]
pub struct BoxedMailboxSender(Arc<dyn MailboxSender + Send + Sync + 'static>);
930
931impl fmt::Debug for BoxedMailboxSender {
932    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
933        f.debug_struct("BoxedMailboxSender")
934            .field("sender", &"<dyn MailboxSender>")
935            .finish()
936    }
937}
938
939impl BoxedMailboxSender {
940    /// Create a new boxed sender given the provided sender implementation.
941    pub fn new(sender: impl MailboxSender + 'static) -> Self {
942        Self(Arc::new(sender))
943    }
944
945    /// Attempts to downcast the inner sender to the given concrete
946    /// type.
947    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
948        (&*self.0 as &dyn Any).downcast_ref::<T>()
949    }
950}
951
952/// Extension trait that creates a boxed clone of a MailboxSender.
953pub trait BoxableMailboxSender: MailboxSender + Clone + 'static {
954    /// A boxed clone of this MailboxSender.
955    fn boxed(&self) -> BoxedMailboxSender;
956}
957impl<T: MailboxSender + Clone + 'static> BoxableMailboxSender for T {
958    fn boxed(&self) -> BoxedMailboxSender {
959        BoxedMailboxSender::new(self.clone())
960    }
961}
962
963/// Extension trait that rehomes a MailboxSender into a BoxedMailboxSender.
964pub trait IntoBoxedMailboxSender: MailboxSender {
965    /// Rehome this MailboxSender into a BoxedMailboxSender.
966    fn into_boxed(self) -> BoxedMailboxSender;
967}
968impl<T: MailboxSender + 'static> IntoBoxedMailboxSender for T {
969    fn into_boxed(self) -> BoxedMailboxSender {
970        BoxedMailboxSender::new(self)
971    }
972}
973
974#[async_trait]
975impl MailboxSender for BoxedMailboxSender {
976    fn post_unchecked(
977        &self,
978        envelope: MessageEnvelope,
979        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
980    ) {
981        self.0.post_unchecked(envelope, return_handle);
982    }
983
984    async fn flush(&self) -> Result<(), anyhow::Error> {
985        self.0.flush().await
986    }
987}
988
/// Errors that occur during mailbox serving.
///
/// Both variants are transparent wrappers: their `Display` output and
/// error source come from the wrapped error, and each can be created
/// from that error with `From` / `?`.
#[derive(thiserror::Error, Debug)]
pub enum MailboxServerError {
    /// An underlying channel error.
    #[error(transparent)]
    Channel(#[from] ChannelError),

    /// An underlying mailbox sender error.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),
}
1000
/// Represents a running [`MailboxServer`]. The handle composes a
/// [`tokio::task::JoinHandle`] and may be joined in the same manner.
#[derive(Debug)]
pub struct MailboxServerHandle {
    // The serving task; this is what gets polled when the handle
    // itself is awaited.
    join_handle: JoinHandle<Result<(), MailboxServerError>>,
    // Stop signal: `stop` sends `true` on this watch channel.
    stopped_tx: watch::Sender<bool>,
}
1008
1009impl MailboxServerHandle {
1010    /// Signal the server to stop serving the mailbox. The caller should
1011    /// join the handle by awaiting the [`MailboxServerHandle`] future.
1012    ///
1013    /// Stop should be called at most once.
1014    pub fn stop(&self, reason: &str) {
1015        tracing::info!("stopping mailbox server; reason: {}", reason);
1016        self.stopped_tx.send(true).expect("stop called twice");
1017    }
1018
1019    /// Construct a handle from an already-spawned server task and a
1020    /// stop signal. The task must observe `stopped_rx` (the receiver
1021    /// paired with `stopped_tx`) and complete once stop is requested,
1022    /// so callers can join the handle to confirm shutdown.
1023    pub fn from_parts(
1024        join_handle: JoinHandle<Result<(), MailboxServerError>>,
1025        stopped_tx: watch::Sender<bool>,
1026    ) -> Self {
1027        Self {
1028            join_handle,
1029            stopped_tx,
1030        }
1031    }
1032}
1033
1034/// Forward future implementation to underlying handle.
1035impl Future for MailboxServerHandle {
1036    type Output = <JoinHandle<Result<(), MailboxServerError>> as Future>::Output;
1037
1038    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1039        // SAFETY: This is safe to do because self is pinned.
1040        let join_handle_pinned =
1041            unsafe { self.map_unchecked_mut(|container| &mut container.join_handle) };
1042        join_handle_pinned.poll(cx)
1043    }
1044}
1045
/// Serve a port on the provided [`channel::Rx`]. This dispatches all
/// channel messages directly to the port.
pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
    /// Serve the provided port on the given channel on this sender on
    /// a background task which may be joined with the returned handle.
    ///
    /// The serving task finishes with `Ok(())` when a stop is requested
    /// through the handle or when the channel reports
    /// [`ChannelError::Closed`]; any other channel error finishes it
    /// with a [`MailboxServerError`]. Before finishing, the task joins
    /// `rx`.
    ///
    /// A second, unjoined task handles messages that come back as
    /// undeliverable: it tries to return them to their original sender
    /// and logs the ones that cannot be returned.
    fn serve(
        self,
        mut rx: impl channel::Rx<MessageEnvelope> + Send + 'static,
    ) -> MailboxServerHandle {
        // A `MailboxServer` can receive a message that couldn't
        // reach its destination. We can use the fact that servers are
        // `MailboxSender`s to attempt to forward them back to their
        // senders.
        let (return_handle, mut undeliverable_rx) = undeliverable::new_undeliverable_port();
        let server = self.clone();
        tokio::task::spawn(async move {
            // Create a client for this task.
            // The counter is shared by all `serve` calls so that each
            // one gets a distinct `mailbox_server_{rank}` proc name.
            static NEXT_RANK: AtomicUsize = AtomicUsize::new(0);
            let rank = NEXT_RANK.fetch_add(1, Ordering::Relaxed);
            let addr = ChannelAddr::any(ChannelTransport::Local);
            let proc_id = ProcAddr::instance(addr, format!("mailbox_server_{}", rank));
            // Use this mailbox server as the forwarder, so we can use it to
            // return message back to the sender.
            //
            // TODO(meriksen): use a global proc here when it materializes
            let proc = Proc::builder()
                .proc_id(proc_id.id().clone())
                .shared_gateway(Gateway::configured(
                    proc_id.location().clone(),
                    BoxedMailboxSender::new(server),
                ))
                .build()
                .expect("mailbox server proc builder is valid");
            let (client, _) = proc.client("undeliverable_supervisor").unwrap();
            // Runs until receiving from the undeliverable port fails.
            while let Ok(undeliverable) = undeliverable_rx.recv().await {
                match undeliverable {
                    Undeliverable::Message(mut envelope) => {
                        // If the payload is itself an `Undeliverable`, it
                        // is not bounced again: it is logged and dropped.
                        match envelope.deserialized::<Undeliverable<MessageEnvelope>>() {
                            Ok(Undeliverable::Message(e)) => {
                                // A non-returnable undeliverable.
                                UndeliverableMailboxSender.post(e, monitored_return_handle());
                                continue;
                            }
                            Ok(Undeliverable::Lost(lost)) => {
                                tracing::error!(
                                    sender = %lost.sender,
                                    dest = %lost.dest,
                                    message_type = lost.message_type.as_deref().unwrap_or("unknown"),
                                    error = %lost.error,
                                    "lost message was undeliverable"
                                );
                                continue;
                            }
                            // Any other payload: fall through and return
                            // the envelope to its sender.
                            Err(_) => {}
                        }
                        envelope.set_error(DeliveryError::BrokenLink(
                            "message was undeliverable".to_owned(),
                        ));
                        // Address the sender's handler port for
                        // `Undeliverable<MessageEnvelope>` and post the
                        // wrapped envelope there.
                        let sender_id: ActorAddr = envelope.sender().clone();
                        let return_port =
                            PortRef::<Undeliverable<MessageEnvelope>>::attest_handler_port(
                                &sender_id,
                            );
                        return_port.post_serialized(
                            &client,
                            Flattrs::new(),
                            wirevalue::Any::serialize(&Undeliverable::Message(envelope)).unwrap(),
                        );
                    }
                    // Nothing to return for a lost message; log it.
                    Undeliverable::Lost(lost) => {
                        tracing::error!(
                            sender = %lost.sender,
                            dest = %lost.dest,
                            message_type = lost.message_type.as_deref().unwrap_or("unknown"),
                            error = %lost.error,
                            "lost message was undeliverable"
                        );
                    }
                }
            }
        });

        let (stopped_tx, mut stopped_rx) = watch::channel(false);
        let join_handle = tokio::spawn(async move {
            // Set once the stop sender has been dropped; from then on
            // only the channel is served.
            let mut detached = false;

            let result = loop {
                // Checked on every iteration, so a stop observed by the
                // `changed()` branch below ends the loop here.
                if *stopped_rx.borrow_and_update() {
                    break Ok(());
                }

                tokio::select! {
                    message = rx.recv() => {
                        match message {
                            // Relay the message to the port directly.
                            Ok(envelope) => self.post(envelope, return_handle.clone()),

                            // Closed is a "graceful" error in this case.
                            // We simply stop serving.
                            Err(ChannelError::Closed) => break Ok(()),
                            Err(channel_err) => break Err(MailboxServerError::from(channel_err)),
                        }
                    }
                    // `changed()` returns an error when the stop sender
                    // is gone; the guard then disables this branch.
                    result = stopped_rx.changed(), if !detached  => {
                        detached = result.is_err();
                        if detached {
                            tracing::debug!(
                                "the mailbox server is detached for Rx {}", rx.addr()
                            );
                        } else {
                            tracing::debug!(
                                "the mailbox server is stopped for Rx {}", rx.addr()
                            );
                        }
                    }
                }
            };

            // Join the channel receiver to ensure pending acks are
            // sent before the underlying channel server is torn down.
            rx.join().await;

            result
        });

        MailboxServerHandle {
            join_handle,
            stopped_tx,
        }
    }
}
1178
// Blanket implementation: every cloneable, thread-safe `MailboxSender`
// gets the default `serve` method.
impl<T: MailboxSender + Clone + Sized + Sync + Send + 'static> MailboxServer for T {}
1180
// A queue in front of a processing task: `send` enqueues an item and
// the task spawned by `new` passes queued items to the `process`
// callback one at a time.
struct Buffer<T: Message> {
    // Sending half of the queue drained by the processing task.
    queue: mpsc::UnboundedSender<(T, PortHandle<Undeliverable<T>>)>,
    // Receives the running count of processed items published by the
    // processing task; currently marked `dead_code`.
    #[allow(dead_code)]
    processed: watch::Receiver<usize>,
    // Incremented on every `send` call, before the enqueue is attempted.
    seq: AtomicUsize,
}
1187
1188impl<T: Message> Buffer<T> {
1189    fn new<Fut>(
1190        process: impl Fn(T, PortHandle<Undeliverable<T>>) -> Fut + Send + Sync + 'static,
1191    ) -> Self
1192    where
1193        Fut: Future<Output = ()> + Send + 'static,
1194    {
1195        let (queue, mut next) = mpsc::unbounded_channel();
1196        let (last_processed, processed) = watch::channel(0);
1197        crate::init::get_runtime().spawn(async move {
1198            let mut seq = 0;
1199            while let Some((msg, return_handle)) = next.recv().await {
1200                process(msg, return_handle).await;
1201                seq += 1;
1202                let _ = last_processed.send(seq);
1203            }
1204        });
1205        Self {
1206            queue,
1207            processed,
1208            seq: AtomicUsize::new(0),
1209        }
1210    }
1211
1212    fn send(
1213        &self,
1214        item: (T, PortHandle<Undeliverable<T>>),
1215    ) -> Result<(), Box<mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>>> {
1216        self.seq.fetch_add(1, Ordering::SeqCst);
1217        self.queue.send(item).map_err(Box::new)?;
1218        Ok(())
1219    }
1220}
1221
/// A mailbox server client that transmits messages on a Tx channel.
///
/// Messages are queued on an internal buffer and handed to the channel
/// from there; `flush` waits until every message submitted so far has
/// been acked or has failed.
pub struct MailboxClient {
    // The unbounded sender.
    buffer: Buffer<MessageEnvelope>,

    // To cancel monitoring tx health.
    _tx_monitoring: CancellationToken,

    // Flush tracking: counts messages successfully submitted to the buffer.
    submitted: Arc<AtomicUsize>,
    // Flush tracking: counts messages whose delivery oneshot has resolved
    // (acked or failed).
    completed: Arc<AtomicUsize>,
    // Notifies flush waiters when `completed` changes.
    completed_notify: Arc<tokio::sync::Notify>,
}
1238
1239impl fmt::Debug for MailboxClient {
1240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1241        f.debug_struct("MailboxClient")
1242            .field("buffer", &"<Buffer>")
1243            .finish()
1244    }
1245}
1246
1247impl MailboxClient {
1248    /// Create a new client that sends messages destined for a
1249    /// [`MailboxServer`] on the provided Tx channel.
1250    pub fn new(tx: impl channel::Tx<MessageEnvelope> + Send + Sync + 'static) -> Self {
1251        let addr = tx.addr();
1252        let tx = Arc::new(tx);
1253        let tx_status = tx.status().clone();
1254        let tx_monitoring = CancellationToken::new();
1255        let completed = Arc::new(AtomicUsize::new(0));
1256        let completed_notify = Arc::new(tokio::sync::Notify::new());
1257        let buffer = {
1258            let completed = completed.clone();
1259            let completed_notify = completed_notify.clone();
1260            Buffer::new(move |envelope, return_handle| {
1261                let tx = Arc::clone(&tx);
1262                let (return_channel, return_receiver) =
1263                    oneshot::channel::<SendError<MessageEnvelope>>();
1264                // Set up for delivery failure.
1265                let return_handle_0 = return_handle.clone();
1266                let completed = completed.clone();
1267                let completed_notify = completed_notify.clone();
1268                tokio::spawn(async move {
1269                    match return_receiver.await {
1270                        Ok(SendError {
1271                            error,
1272                            message,
1273                            reason,
1274                        }) => {
1275                            message.undeliverable(
1276                                DeliveryError::BrokenLink(format!(
1277                                    "failed to enqueue in MailboxClient when processing buffer: {error} with reason {reason:?}"
1278                                )),
1279                                return_handle_0,
1280                            );
1281                        }
1282                        Err(_) => {
1283                            // Oneshot sender was dropped — message was acked.
1284                        }
1285                    }
1286                    completed.fetch_add(1, Ordering::SeqCst);
1287                    completed_notify.notify_waiters();
1288                });
1289                // Send the message for transmission.
1290                tx.try_post(envelope, return_channel);
1291                future::ready(())
1292            })
1293        };
1294        let this = Self {
1295            buffer,
1296            _tx_monitoring: tx_monitoring.clone(),
1297            submitted: Arc::new(AtomicUsize::new(0)),
1298            completed,
1299            completed_notify,
1300        };
1301        Self::monitor_tx_health(tx_status, tx_monitoring, addr);
1302        this
1303    }
1304
1305    /// Convenience constructor, to set up a mailbox client that forwards messages
1306    /// to the provided address.
1307    pub fn dial(addr: ChannelAddr) -> Result<MailboxClient, ChannelError> {
1308        Ok(MailboxClient::new(channel::dial(addr)?))
1309    }
1310
1311    // Set up a watch for the tx's health.
1312    fn monitor_tx_health(
1313        mut rx: watch::Receiver<TxStatus>,
1314        cancel_token: CancellationToken,
1315        addr: ChannelAddr,
1316    ) {
1317        crate::init::get_runtime().spawn(async move {
1318            loop {
1319                tokio::select! {
1320                    changed = rx.changed() => {
1321                        if changed.is_err() || rx.borrow().is_closed() {
1322                            let reason = rx.borrow().as_closed().map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string());
1323                            tracing::warn!("connection to {} lost: {}", addr, reason);
1324                            // TODO: Potential for supervision event
1325                            // interaction here.
1326                            break;
1327                        }
1328                    }
1329                    _ = cancel_token.cancelled() => {
1330                        break;
1331                    }
1332                }
1333            }
1334        });
1335    }
1336}
1337
1338#[async_trait]
1339impl MailboxSender for MailboxClient {
1340    #[tracing::instrument(level = "debug", skip_all)]
1341    fn post_unchecked(
1342        &self,
1343        envelope: MessageEnvelope,
1344        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1345    ) {
1346        tracing::event!(target:"messages", tracing::Level::TRACE,  "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_addr(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
1347        if let Err(err) = self.buffer.send((envelope, return_handle)) {
1348            let mpsc::error::SendError((envelope, return_handle)) = *err;
1349            let err = DeliveryError::BrokenLink(
1350                "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
1351            );
1352
1353            // Failed to enqueue.
1354            envelope.undeliverable(err, return_handle);
1355        } else {
1356            self.submitted.fetch_add(1, Ordering::SeqCst);
1357        }
1358    }
1359
1360    async fn flush(&self) -> Result<(), anyhow::Error> {
1361        let target = self.submitted.load(Ordering::SeqCst);
1362        loop {
1363            if self.completed.load(Ordering::SeqCst) >= target {
1364                return Ok(());
1365            }
1366            self.completed_notify.notified().await;
1367        }
1368    }
1369}
1370
/// Wrapper to turn a [`PortRef`] into a `Sink`.
///
/// Items sent into the sink are posted to `port` using the context `cx`.
pub struct PortSink<C: context::Actor, M: RemoteMessage> {
    // Context used when posting.
    cx: C,
    // Destination port for every item.
    port: PortRef<M>,
}
1376
1377impl<C: context::Actor, M: RemoteMessage> PortSink<C, M> {
1378    /// Create new PortSink
1379    pub fn new(cx: C, port: PortRef<M>) -> Self {
1380        Self { cx, port }
1381    }
1382}
1383
1384impl<C: context::Actor, M: RemoteMessage> Sink<M> for PortSink<C, M> {
1385    type Error = MailboxSenderError;
1386
1387    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1388        Poll::Ready(Ok(()))
1389    }
1390
1391    fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
1392        crate::Endpoint::post(&self.port, &self.cx, item);
1393        Ok(())
1394    }
1395
1396    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1397        Poll::Ready(Ok(()))
1398    }
1399
1400    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1401        Poll::Ready(Ok(()))
1402    }
1403}
1404
/// A mailbox coordinates message delivery to actors through typed
/// [`Port`]s associated with the mailbox.
///
/// The state is held behind an [`Arc`], so clones of a `Mailbox` refer
/// to the same underlying mailbox.
#[derive(Clone, Debug)]
pub struct Mailbox {
    // State shared by all clones of this mailbox.
    inner: Arc<State>,
}
1411
1412impl Mailbox {
1413    /// Create a mailbox associated with the provided actor ID.
1414    pub fn new(actor_id: impl Into<ActorAddr>) -> Self {
1415        Self {
1416            inner: Arc::new(State::new(actor_id.into())),
1417        }
1418    }
1419
1420    /// The actor address associated with this mailbox.
1421    pub fn actor_addr(&self) -> &ActorAddr {
1422        &self.inner.actor_id
1423    }
1424
1425    /// Open a new port that accepts M-typed messages. The returned
1426    /// port may be freely cloned, serialized, and passed around. The
1427    /// returned receiver should only be retained by the actor responsible
1428    /// for processing the delivered messages.
1429    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1430        let port_index = self.inner.allocate_port();
1431        let (sender, receiver) = mpsc::unbounded_channel::<M>();
1432        let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1433        tracing::trace!(
1434            name = "open_port",
1435            "opening port for {} at {}",
1436            self.inner.actor_id,
1437            port_id
1438        );
1439        (
1440            PortHandle::new(self.clone(), port_index, UnboundedPortSender::Mpsc(sender)),
1441            PortReceiver::new(receiver, port_id, /*coalesce=*/ false, self.clone()),
1442        )
1443    }
1444
1445    /// Bind the handler port for message type `M` to this mailbox.
1446    /// This method is normally used:
1447    ///   1. when we need to intercept a message sent to a handler, and re-route
1448    ///      that message to the returned receiver;
1449    ///   2. mock this message's handler when it is not implemented for this actor
1450    ///      type, with the returned receiver.
1451    pub(crate) fn bind_handler_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1452        let (handle, receiver) = self.open_port();
1453        handle.bind_handler_port();
1454        (handle, receiver)
1455    }
1456
1457    /// Open a new port with an accumulator with default reduce options.
1458    /// See [`open_accum_port_opts`] for more details.
1459    pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1460    where
1461        A: Accumulator + Send + Sync + 'static,
1462        A::Update: Message,
1463        A::State: Message + Default + Clone,
1464    {
1465        self.open_accum_port_opts(accum, StreamingReducerOpts::default())
1466    }
1467
1468    /// Open a new port with an accumulator. This port accepts A::Update type
1469    /// messages, accumulate them into A::State with the given accumulator.
1470    /// The latest changed state can be received from the returned receiver as
1471    /// a single A::State message. If there is no new update, the receiver will
1472    /// not receive any message.
1473    ///
1474    /// If provided, reducer mode controls reduce operations.
1475    pub fn open_accum_port_opts<A>(
1476        &self,
1477        accum: A,
1478        streaming_opts: StreamingReducerOpts,
1479    ) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1480    where
1481        A: Accumulator + Send + Sync + 'static,
1482        A::Update: Message,
1483        A::State: Message + Default + Clone,
1484    {
1485        let port_index = self.inner.allocate_port();
1486        let (sender, receiver) = mpsc::unbounded_channel::<A::State>();
1487        let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1488        let state = Mutex::new(A::State::default());
1489        let reducer_spec = accum.reducer_spec();
1490        let enqueue = move |_, update: A::Update| {
1491            let mut state = state.lock().unwrap();
1492            accum.accumulate(&mut state, update)?;
1493            let _ = sender.send(state.clone());
1494            Ok(())
1495        };
1496        (
1497            PortHandle::new_full(
1498                self.clone(),
1499                port_index,
1500                UnboundedPortSender::Func(Arc::new(enqueue)),
1501                reducer_spec,
1502                streaming_opts,
1503            ),
1504            PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
1505        )
1506    }
1507
1508    /// Open a port that accepts M-typed messages, using the provided function
1509    /// to enqueue.
1510    // TODO: consider making lifetime bound to Self instead.
1511    pub(crate) fn open_enqueue_port<M: Message>(
1512        &self,
1513        enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1514    ) -> PortHandle<M> {
1515        PortHandle::new_full(
1516            self.clone(),
1517            self.inner.allocate_port(),
1518            UnboundedPortSender::Func(Arc::new(enqueue)),
1519            None,
1520            StreamingReducerOpts::default(),
1521        )
1522    }
1523
1524    /// Open a runtime-dispatched handler port that accepts M-typed
1525    /// messages using the provided enqueue function.
1526    pub(crate) fn open_handler_enqueue_port<M: Message>(
1527        &self,
1528        enqueue: impl Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync + 'static,
1529    ) -> PortHandle<M> {
1530        let port_index = self.inner.allocate_port();
1531        let enqueue = Arc::new(enqueue);
1532        let sender = Arc::new(HandlerPortSender::new(
1533            UnboundedPortSender::Func(enqueue),
1534            self.inner.handler_ingress.clone(),
1535        ));
1536        PortHandle::new_full(
1537            self.clone(),
1538            port_index,
1539            UnboundedPortSender::Handler(sender),
1540            None,
1541            StreamingReducerOpts::default(),
1542        )
1543    }
1544
1545    /// Open a new one-shot port that accepts M-typed messages. The
1546    /// returned port may be used to send a single message; ditto the
1547    /// receiver may receive a single message.
1548    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1549        let port_index = self.inner.allocate_port();
1550        let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1551        let (sender, receiver) = oneshot::channel::<M>();
1552        (
1553            OncePortHandle {
1554                mailbox: self.clone(),
1555                port_index,
1556                port_id: port_id.clone(),
1557                sender,
1558                reducer_spec: None,
1559            },
1560            OncePortReceiver {
1561                receiver: Some(receiver),
1562                port_id,
1563                mailbox: self.clone(),
1564            },
1565        )
1566    }
1567
1568    /// Open a new one-shot port with a reducer. This port is designed
1569    /// to be used with casting, where the port is split across multiple
1570    /// destinations and responses are accumulated using the reducer.
1571    /// The accumulator type must have a ReducerSpec.
1572    ///
1573    /// The returned handle can be bound and embedded in cast messages.
1574    /// When the message is split by CommActor, each destination receives a
1575    /// split port. Responses to split ports are accumulated using the
1576    /// accumulator's reducer, and the final accumulated result is delivered
1577    /// to the returned receiver.
1578    ///
1579    /// Note: For accumulators used with casting, `Update` and `State` types
1580    /// must be the same (e.g., `sum<u64>` where both are `u64`).
1581    pub fn open_reduce_port<A, T>(
1582        &self,
1583        accum: A,
1584    ) -> (OncePortHandle<A::State>, OncePortReceiver<A::State>)
1585    where
1586        A: Accumulator<State = T, Update = T> + Send + Sync + 'static,
1587        T: Message + Default + Clone,
1588    {
1589        let port_index = self.inner.allocate_port();
1590        let (sender, receiver) = oneshot::channel::<T>();
1591        let port_id = self.inner.actor_id.port_addr(Port::from(port_index));
1592        let reducer_spec = accum.reducer_spec();
1593        assert!(
1594            reducer_spec.is_some(),
1595            "cannot use a reduce port without a ReducerSpec"
1596        );
1597
1598        (
1599            OncePortHandle {
1600                mailbox: self.clone(),
1601                port_index,
1602                port_id: port_id.clone(),
1603                sender,
1604                reducer_spec,
1605            },
1606            OncePortReceiver {
1607                receiver: Some(receiver),
1608                port_id,
1609                mailbox: self.clone(),
1610            },
1611        )
1612    }
1613
1614    #[allow(dead_code)]
1615    fn error(&self, err: MailboxErrorKind) -> MailboxError {
1616        MailboxError::new(self.inner.actor_id.clone(), err)
1617    }
1618
1619    fn lookup_sender<M: RemoteMessage>(&self) -> Option<UnboundedPortSender<M>> {
1620        let port_index = M::port();
1621        self.inner.ports.get(&port_index).and_then(|boxed| {
1622            boxed
1623                .as_any()
1624                .downcast_ref::<UnboundedSender<M>>()
1625                .map(|s| {
1626                    assert_eq!(
1627                        s.port_id,
1628                        self.actor_addr().port_addr(Port::from(port_index)),
1629                        "port_id mismatch in downcasted UnboundedSender"
1630                    );
1631                    s.sender.clone()
1632                })
1633        })
1634    }
1635
1636    /// Retrieve the bound undeliverable handler port handle.
1637    pub fn bound_return_handle(&self) -> Option<PortHandle<Undeliverable<MessageEnvelope>>> {
1638        self.lookup_sender::<Undeliverable<MessageEnvelope>>()
1639            .map(|sender| PortHandle::new(self.clone(), self.inner.allocate_port(), sender))
1640    }
1641
1642    pub(crate) fn allocate_port(&self) -> u64 {
1643        self.inner.allocate_port()
1644    }
1645
1646    fn bind<M: RemoteMessage>(&self, handle: &PortHandle<M>) -> PortRef<M> {
1647        assert_eq!(
1648            handle.inner.mailbox.actor_addr(),
1649            self.actor_addr(),
1650            "port does not belong to mailbox"
1651        );
1652
1653        // TODO: don't even allocate a port until the port is bound. Possibly
1654        // have handles explicitly staged (unbound, bound).
1655        let port_ref = self
1656            .actor_addr()
1657            .port_addr(Port::from(handle.inner.port_index));
1658        match self.inner.ports.entry(handle.inner.port_index) {
1659            Entry::Vacant(entry) => {
1660                entry.insert(Arc::new(UnboundedSender::new(
1661                    handle.inner.sender.clone(),
1662                    port_ref.clone(),
1663                )));
1664            }
1665            Entry::Occupied(_entry) => {}
1666        }
1667
1668        PortRef::attest(port_ref)
1669    }
1670
1671    fn bind_to_handler_port<M: RemoteMessage>(&self, handle: &PortHandle<M>) {
1672        assert_eq!(
1673            handle.inner.mailbox.actor_addr(),
1674            self.actor_addr(),
1675            "port does not belong to mailbox"
1676        );
1677
1678        let port_index = M::port();
1679        let port_ref = self.actor_addr().port_addr(Port::from(port_index));
1680        match self.inner.ports.entry(port_index) {
1681            Entry::Vacant(entry) => {
1682                entry.insert(Arc::new(UnboundedSender::new(
1683                    handle.inner.sender.clone(),
1684                    port_ref,
1685                )));
1686            }
1687            Entry::Occupied(_entry) => panic!("port {} already bound", port_ref),
1688        }
1689    }
1690
1691    fn bind_once<M: RemoteMessage>(&self, handle: OncePortHandle<M>) {
1692        let port_id = handle.port_addr().clone();
1693        match self.inner.ports.entry(handle.port_index) {
1694            Entry::Vacant(entry) => {
1695                entry.insert(Arc::new(OnceSender::new(handle.sender, port_id.clone())));
1696            }
1697            Entry::Occupied(_entry) => {}
1698        }
1699    }
1700
1701    pub(crate) fn bind_untyped(&self, port_id: &PortAddr, sender: UntypedUnboundedSender) {
1702        assert_eq!(
1703            port_id.actor_addr(),
1704            *self.actor_addr(),
1705            "port does not belong to mailbox"
1706        );
1707
1708        match self.inner.ports.entry(port_id.index()) {
1709            Entry::Vacant(entry) => {
1710                entry.insert(Arc::new(sender));
1711            }
1712            Entry::Occupied(_entry) => {}
1713        }
1714    }
1715
1716    pub(crate) fn close(&self, status: ActorStatus) {
1717        let mut closed = self.inner.closed.write().unwrap();
1718        if closed.is_some() {
1719            panic!("mailbox with owner {} already closed", self.actor_addr());
1720        }
1721        let _ = closed.insert(status);
1722    }
1723
    /// Start draining handler ingress for this mailbox.
    ///
    /// Draining is a mailbox lifecycle property, but it is enforced
    /// only by runtime-dispatched handler ports. New handler work is
    /// rejected at the handler-port sender. Work that already entered
    /// a handler-port sender before draining began is allowed to finish
    /// enqueueing, and this method waits for those in-flight enqueue
    /// attempts before it returns. After this method returns, no handler
    /// work can still be entering the actor queue through a handler
    /// port.
    ///
    /// Runtime/control ports and ordinary mailbox ports remain usable
    /// while draining, so shutdown can continue and already accepted
    /// work can flush.
    ///
    /// This is distinct from [`Mailbox::close`], which marks the
    /// mailbox terminal and rejects all subsequent local delivery.
    ///
    /// The implementation simply delegates to the mailbox's handler
    /// ingress gate.
    pub(crate) fn drain(&self) {
        self.inner.handler_ingress.drain();
    }
1744}
1745
/// A [`Mailbox`] is trivially its own mailbox capability.
impl context::Mailbox for Mailbox {
    /// Return this mailbox itself.
    fn mailbox(&self) -> &Mailbox {
        self
    }
}
1751
1752// TODO: figure out what to do with these interfaces -- possibly these caps
1753// do not have to be private.
1754
1755/// Open a port given a capability.
1756pub fn open_port<M: Message>(cx: &impl context::Mailbox) -> (PortHandle<M>, PortReceiver<M>) {
1757    cx.mailbox().open_port()
1758}
1759
1760/// Open a one-shot port given a capability. This is a public method primarily to
1761/// enable macro-generated clients.
1762pub fn open_once_port<M: Message>(
1763    cx: &impl context::Mailbox,
1764) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1765    cx.mailbox().open_once_port()
1766}
1767
1768#[async_trait]
1769impl MailboxSender for Mailbox {
1770    /// Deliver a serialized message to the provided port ID. This method fails
1771    /// if the message does not deserialize into the expected type.
1772    fn post_unchecked(
1773        &self,
1774        envelope: MessageEnvelope,
1775        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1776    ) {
1777        metrics::MAILBOX_POSTS.add(
1778            1,
1779            hyperactor_telemetry::kv_pairs!(
1780                "actor_id" => envelope.sender.to_string(),
1781                "dest_actor_id" => envelope.dest.actor_addr().to_string(),
1782            ),
1783        );
1784        tracing::trace!(
1785            name = "post",
1786            actor_name = envelope.sender.label().map_or("?", |l| l.as_str()),
1787            actor_id = envelope.sender.to_string(),
1788            "posting message to {}",
1789            envelope.dest
1790        );
1791
1792        if envelope.dest().actor_id() != self.inner.actor_id.id() {
1793            let err = DeliveryError::Mailbox(format!(
1794                "mailbox owner {} cannot deliver to {}",
1795                self.inner.actor_id,
1796                envelope.dest().actor_addr()
1797            ));
1798            return envelope.undeliverable(err, return_handle);
1799        }
1800
1801        let port_index = envelope.dest().index();
1802
1803        // Clone the Arc<dyn SerializedSender> out of the DashMap while holding
1804        // only a short-lived read lock, then release the lock before calling
1805        // send_serialized. This prevents a deadlock that occurs when
1806        // send_serialized itself tries to post a message to another port on the
1807        // same mailbox: if both ports hash to the same DashMap shard, acquiring
1808        // the shard's write lock a second time on the same thread deadlocks
1809        // (RwLock is not reentrant). DashMap uses a random per-process hasher,
1810        // so whether two port indices collide in the same shard varies across
1811        // test runs, explaining the longstanding flaky timeout failures.
1812        let port_sender = match self.inner.ports.get(&port_index) {
1813            None => {
1814                let err = unbound_port_delivery_error(port_index, envelope.data());
1815                return envelope.undeliverable(err, return_handle);
1816            }
1817            Some(ref_) => {
1818                let closed = self.inner.closed.read().unwrap();
1819                if let Some(status) = &*closed {
1820                    match status {
1821                        ActorStatus::Stopped(reason) => {
1822                            let err = format!(
1823                                "mailbox owner {} is stopped: {}",
1824                                self.inner.actor_id, reason
1825                            );
1826                            return envelope
1827                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1828                        }
1829                        ActorStatus::Failed(actor_error) => {
1830                            let err = format!(
1831                                "mailbox owner {} failed: {}",
1832                                self.inner.actor_id, actor_error
1833                            );
1834                            return envelope
1835                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1836                        }
1837                        _ => {
1838                            let err = format!(
1839                                "mailbox owner {} closed unexpectedly: {:?}",
1840                                self.inner.actor_id, status
1841                            );
1842                            return envelope
1843                                .undeliverable(DeliveryError::Mailbox(err), return_handle);
1844                        }
1845                    }
1846                }
1847                // Clone the Arc so we can release the shard read lock before
1848                // calling send_serialized, which may re-enter post_unchecked.
1849                Arc::clone(&*ref_)
1850            }
1851        };
1852        // Shard read lock is released here when `ref_` is dropped.
1853
1854        let (metadata, data) = envelope.open();
1855        let MessageMetadata {
1856            mut headers,
1857            sender,
1858            dest,
1859            errors: metadata_errors,
1860            ttl,
1861            return_undeliverable,
1862        } = metadata;
1863
1864        let to_actor_id = hash_to_u64(&dest);
1865        let message_id = hyperactor_telemetry::generate_message_id(to_actor_id);
1866        headers.set(crate::mailbox::headers::TELEMETRY_MESSAGE_ID, message_id);
1867        // Only set sender hash if not already present (cast path
1868        // pre-sets it with the originating actor).
1869        if !headers.contains_key(crate::mailbox::headers::SENDER_ACTOR_ID_HASH) {
1870            headers.set(
1871                crate::mailbox::headers::SENDER_ACTOR_ID_HASH,
1872                hash_to_u64(&sender),
1873            );
1874        }
1875        headers.set(crate::mailbox::headers::TELEMETRY_PORT_ID, dest.index());
1876
1877        match port_sender.send_serialized(headers, data) {
1878            Ok(disposition) => {
1879                hyperactor_telemetry::notify_message_status(
1880                    hyperactor_telemetry::MessageStatusEvent {
1881                        timestamp: std::time::SystemTime::now(),
1882                        id: hyperactor_telemetry::generate_status_event_id(message_id),
1883                        message_id,
1884                        status: "queued".to_string(),
1885                    },
1886                );
1887
1888                if disposition == SerializedSendDisposition::DeliveredAndExhausted {
1889                    self.inner.ports.remove(&port_index);
1890                }
1891            }
1892            Err(SerializedSendFailure::Dead { data, headers }) => {
1893                self.inner.ports.remove(&port_index);
1894                let err = unbound_port_delivery_error(port_index, &data);
1895
1896                MessageEnvelope::seal(
1897                    MessageMetadata {
1898                        headers,
1899                        sender,
1900                        dest,
1901                        errors: metadata_errors,
1902                        ttl,
1903                        return_undeliverable,
1904                    },
1905                    data,
1906                )
1907                .undeliverable(err, return_handle)
1908            }
1909            Err(SerializedSendFailure::Error(SerializedSendError {
1910                data,
1911                error: sender_error,
1912                headers,
1913            })) => {
1914                let err = DeliveryError::Mailbox(format!("{}", sender_error));
1915
1916                MessageEnvelope::seal(
1917                    MessageMetadata {
1918                        headers,
1919                        sender,
1920                        dest,
1921                        errors: metadata_errors,
1922                        ttl,
1923                        return_undeliverable,
1924                    },
1925                    data,
1926                )
1927                .undeliverable(err, return_handle)
1928            }
1929        }
1930    }
1931}
1932
1933fn unbound_port_delivery_error(port_index: u64, data: &wirevalue::Any) -> DeliveryError {
1934    DeliveryError::Unroutable(format!(
1935        "port not bound in mailbox; port id: {}; message type: {}",
1936        port_index,
1937        data.typename().map_or_else(
1938            || format!("unregistered type hash {}", data.typehash()),
1939            |name| name.to_string(),
1940        )
1941    ))
1942}
1943
/// Inner state of a [`PortHandle`], shared via `Arc` to make cloning cheap
/// (single atomic refcount bump instead of cloning each field).
struct PortHandleInner<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The port index this handle was created with.
    port_index: u64,
    /// The sender used to enqueue messages posted through the handle.
    sender: UnboundedPortSender<M>,
    // We would like this to be a Arc<RwLock<Option<PortAddr<M>>>>, but we cannot
    // write down the type PortAddr<M> (M: Message), even though we cannot
    // legally construct such a value without M: RemoteMessage. We could consider
    // making PortAddr<M> valid for M: Message, but constructible only for
    // M: RemoteMessage, but the guarantees offered by the impossibility of even
    // writing down the type are appealing.
    /// The address this handle is bound to; `None` until the handle is bound.
    bound: Arc<RwLock<Option<PortAddr>>>,
    /// Spec of an optional reducer. When it's defined, we include it in port
    /// references to optionally enable incremental accumulation.
    reducer_spec: Option<ReducerSpec>,
    /// Streaming reducer options.
    streaming_opts: StreamingReducerOpts,
}
1963
1964impl<M: Message> fmt::Debug for PortHandleInner<M> {
1965    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1966        f.debug_struct("PortHandleInner")
1967            .field("mailbox", &self.mailbox)
1968            .field("port_index", &self.port_index)
1969            .field("sender", &self.sender)
1970            .field("bound", &self.bound)
1971            .field("reducer_spec", &self.reducer_spec)
1972            .field("streaming_opts", &self.streaming_opts)
1973            .finish()
1974    }
1975}
1976
/// A port to which M-typed messages can be delivered. Ports may be
/// serialized to be sent to other actors. However, when a port is
/// deserialized, it may no longer be used to send messages directly
/// to a mailbox since it is no longer associated with a local mailbox
/// ([`Mailbox::send`] will fail). The runtime may nevertheless accept
/// remote Ports, and arrange for these messages to be delivered
/// indirectly through inter-node message passing.
///
/// Cloning a handle is cheap: all clones share one reference-counted
/// inner state.
#[derive(Debug)]
pub struct PortHandle<M: Message> {
    /// Shared state of this handle.
    inner: Arc<PortHandleInner<M>>,
}
1988
impl<M: Message> PortHandle<M> {
    /// Create an unbound handle from all of its parts, including an optional
    /// reducer spec and the streaming reducer options.
    fn new_full(
        mailbox: Mailbox,
        port_index: u64,
        sender: UnboundedPortSender<M>,
        reducer_spec: Option<ReducerSpec>,
        streaming_opts: StreamingReducerOpts,
    ) -> Self {
        Self {
            inner: Arc::new(PortHandleInner {
                mailbox,
                port_index,
                sender,
                bound: Arc::new(RwLock::new(None)),
                reducer_spec,
                streaming_opts,
            }),
        }
    }

    /// Create an unbound handle with no reducer and default streaming
    /// reducer options.
    fn new(mailbox: Mailbox, port_index: u64, sender: UnboundedPortSender<M>) -> Self {
        Self::new_full(
            mailbox,
            port_index,
            sender,
            None,
            StreamingReducerOpts::default(),
        )
    }

    /// The current location of this port: the bound address when the handle
    /// has been bound, otherwise an unbound location on the owning actor.
    pub(crate) fn location(&self) -> PortLocation {
        match self.inner.bound.read().unwrap().as_ref() {
            Some(port_id) => PortLocation::Bound(port_id.clone()),
            None => PortLocation::new_unbound::<M>(self.inner.mailbox.actor_addr().clone()),
        }
    }

    /// Try to send `message` directly through this handle's sender.
    ///
    /// Fails with an `OwnerTerminated` mailbox error when the owning mailbox
    /// has been closed, and with a classified sender error when the
    /// underlying sender rejects the message.
    ///
    /// The send timestamp and Rust message type are recorded in the headers.
    /// When the handle is bound, a sequence number is assigned from the
    /// sending instance's sequencer; otherwise the message is marked
    /// `SeqInfo::Direct`.
    pub(crate) fn try_send<C>(&self, cx: &C, message: M) -> Result<(), MailboxSenderError>
    where
        C: context::Actor,
    {
        // This read guard stays alive until the function returns, so it is
        // held across the send below.
        let closed = self.inner.mailbox.inner.closed.read().unwrap();

        if let Some(status) = &*closed {
            let err = MailboxError {
                actor_id: self.inner.mailbox.actor_addr().clone(),
                kind: MailboxErrorKind::OwnerTerminated(status.clone()),
            };
            return Err(MailboxSenderError::new_unbound::<M>(
                self.inner.mailbox.actor_addr().clone(),
                MailboxSenderErrorKind::Mailbox(err),
            ));
        }
        let mut headers = Flattrs::new();

        crate::mailbox::headers::set_send_timestamp(&mut headers);
        crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
        // Hold read lock while checking and sending to prevent race with bind().
        // Message sent from handle is delivered immediately. It could race with
        // messages from refs. So we need to assign seq if the handle is bound.
        let bound_guard = self.inner.bound.read().unwrap();
        if let Some(bound_port) = bound_guard.as_ref() {
            let sequencer = cx.instance().sequencer();
            let bound_ref: PortAddr = bound_port.clone();
            let seq_info = sequencer.assign_seq(&bound_ref);
            headers.set(SEQ_INFO, seq_info);
        } else {
            // Because the port is not bound, messages can only be sent through
            // this port handle's underlying tokio channel directly. As a result,
            // we do not need to assign seq to the message. We do not need to
            // worry about race condition due to bound_guard.
            headers.set(SEQ_INFO, SeqInfo::Direct);
        }
        // Encountering error means the port is closed. So we do not need to
        // rollback the seq, because no message can be delivered to it, and
        // subsequently do not need to worry about out-of-sequence for messages
        // after this seq.
        //
        // Theoretically, we could have deadlock if
        //   1. `sender.send` attempts to hold read lock of this PortHandle's
        //      `bound` field, and in the meantime,
        //   2.  another thread is trying to bind and thus waiting for write lock.
        // But we do not expect `sender.send` to use the same PortHandle, so this
        // deadlock scenario should not happen.
        self.inner.sender.send(headers, message).map_err(|err| {
            MailboxSenderError::new_unbound::<M>(
                self.inner.mailbox.actor_addr().clone(),
                classify_sender_error(err),
            )
        })
    }
}
2081
2082impl<M> Endpoint<M> for &PortHandle<M>
2083where
2084    M: Message,
2085{
2086    fn endpoint_location(&self) -> EndpointLocation {
2087        self.location().into()
2088    }
2089
2090    fn post<C>(self, cx: &C, message: M)
2091    where
2092        C: context::Actor,
2093    {
2094        if let Err(err) = self.try_send(cx, message) {
2095            cx.instance()
2096                .report_lost_message(LostMessage::from_send_error::<M>(
2097                    cx.mailbox().actor_addr().clone(),
2098                    self.endpoint_location(),
2099                    &err,
2100                ));
2101        }
2102    }
2103}
2104
2105impl<M: Message> PortHandle<M> {
2106    /// A contravariant map: using the provided function to translate
2107    /// `R`-typed messages to `M`-typed ones, delivered on this port.
2108    pub fn contramap<R, F>(&self, unmap: F) -> PortHandle<R>
2109    where
2110        R: Message,
2111        F: Fn(R) -> M + Send + Sync + 'static,
2112    {
2113        let port_index = self.inner.mailbox.inner.allocate_port();
2114        let sender = self.inner.sender.clone();
2115        PortHandle::new(
2116            self.inner.mailbox.clone(),
2117            port_index,
2118            UnboundedPortSender::Func(Arc::new(move |headers, value: R| {
2119                sender.send(headers, unmap(value))
2120            })),
2121        )
2122    }
2123}
2124
2125impl<M: RemoteMessage> PortHandle<M> {
2126    /// Bind this port, making it accessible to remote actors.
2127    pub fn bind(&self) -> PortRef<M> {
2128        let port_ref = {
2129            let mut guard = self.inner.bound.write().unwrap();
2130            guard
2131                .get_or_insert_with(|| self.inner.mailbox.bind(self).into_port_addr())
2132                .clone()
2133        };
2134        PortRef::attest_reducible(
2135            port_ref,
2136            self.inner.reducer_spec.clone(),
2137            self.inner.streaming_opts.clone(),
2138        )
2139    }
2140
2141    /// Bind this handle to the handler port for message type `M`. This method
2142    /// will panic if the handle is already bound.
2143    ///
2144    /// This is used by [`actor::Binder`] implementations to bind actor refs.
2145    /// This is not intended for general use.
2146    pub(crate) fn bind_handler_port(&self) {
2147        let port_id = self
2148            .inner
2149            .mailbox
2150            .actor_addr()
2151            .port_addr(Port::from(M::port()));
2152        {
2153            let mut guard = self.inner.bound.write().unwrap();
2154            if guard.is_some() {
2155                panic!(
2156                    "could not bind port handle {} as {port_id}: already bound",
2157                    self.inner.port_index
2158                );
2159            }
2160            *guard = Some(port_id);
2161        }
2162        self.inner.mailbox.bind_to_handler_port(self);
2163    }
2164}
2165
2166impl<M: Message> Clone for PortHandle<M> {
2167    fn clone(&self) -> Self {
2168        Self {
2169            inner: Arc::clone(&self.inner),
2170        }
2171    }
2172}
2173
2174impl<M: Message> fmt::Display for PortHandle<M> {
2175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2176        fmt::Display::fmt(&self.location(), f)
2177    }
2178}
2179
/// A one-shot port handle to which M-typed messages can be delivered.
/// Posting through the handle and binding it both consume it.
#[derive(Debug)]
pub struct OncePortHandle<M: Message> {
    /// The mailbox this port belongs to.
    mailbox: Mailbox,
    /// The index under which the port is registered when bound.
    port_index: u64,
    /// The address of this port.
    port_id: PortAddr,
    /// The one-shot channel sender that delivers the message.
    sender: oneshot::Sender<M>,
    /// Optional reducer spec, carried into the ref produced by binding.
    reducer_spec: Option<ReducerSpec>,
}
2189
2190impl<M: Message> OncePortHandle<M> {
2191    /// This port's address.
2192    // TODO: make value
2193    pub fn port_addr(&self) -> &PortAddr {
2194        &self.port_id
2195    }
2196}
2197
2198impl<M> Endpoint<M> for OncePortHandle<M>
2199where
2200    M: Message,
2201{
2202    fn endpoint_location(&self) -> EndpointLocation {
2203        EndpointLocation::Port(self.port_id.clone())
2204    }
2205
2206    fn post<C>(self, _cx: &C, message: M)
2207    where
2208        C: context::Actor,
2209    {
2210        // TODO: Assign seq to the message if the port is bound to a handler port
2211        // in the future.
2212        assert!(
2213            !self.port_addr().is_handler_port(),
2214            "OncePortHandle currently does not support handler ports; a \
2215            prerequisite of that support is to assign seq to messages \
2216            if the port is a handler port."
2217        );
2218
2219        let actor_id = self.mailbox.actor_addr().clone();
2220        let endpoint_location = self.endpoint_location();
2221        if let Err(err) = self.sender.send(message).map_err(|_| {
2222            // Here, the value is returned when the port is
2223            // closed.  We should consider having a similar
2224            // API for send_once, though arguably it makes less
2225            // sense in this context.
2226            MailboxSenderError::new_unbound::<M>(actor_id, MailboxSenderErrorKind::Closed)
2227        }) {
2228            _cx.instance()
2229                .report_lost_message(LostMessage::from_send_error::<M>(
2230                    _cx.mailbox().actor_addr().clone(),
2231                    endpoint_location,
2232                    &err,
2233                ));
2234        }
2235    }
2236}
2237
2238impl<M: RemoteMessage> OncePortHandle<M> {
2239    /// Turn this handle into a ref that may be passed to
2240    /// a remote actor. The remote actor can then use the
2241    /// ref to send a message to the port. Creating a ref also
2242    /// binds the port, so that it is remotely writable.
2243    pub fn bind(self) -> OncePortRef<M> {
2244        let port_id: PortAddr = self.port_addr().clone();
2245        let reducer_spec = self.reducer_spec.clone();
2246        self.mailbox.clone().bind_once(self);
2247        OncePortRef::attest_reducible(port_id, reducer_spec)
2248    }
2249}
2250
2251impl<M: Message> fmt::Display for OncePortHandle<M> {
2252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2253        fmt::Display::fmt(&self.port_addr(), f)
2254    }
2255}
2256
/// A receiver of M-typed messages, used by actors to receive messages
/// on open ports.
#[derive(Debug)]
pub struct PortReceiver<M> {
    /// The underlying channel receiver.
    receiver: mpsc::UnboundedReceiver<M>,
    /// The address of the port this receiver serves.
    port_id: PortAddr,
    /// When multiple messages are put in channel, only receive the latest one
    /// if coalesce is true. Other messages will be discarded.
    coalesce: bool,
    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
2270
2271impl<M> PortReceiver<M> {
2272    fn new(
2273        receiver: mpsc::UnboundedReceiver<M>,
2274        port_id: PortAddr,
2275        coalesce: bool,
2276        mailbox: Mailbox,
2277    ) -> Self {
2278        Self {
2279            receiver,
2280            port_id,
2281            coalesce,
2282            mailbox,
2283        }
2284    }
2285
2286    /// Tries to receive the next value for this receiver.
2287    /// This function returns `Ok(None)` if the receiver is empty
2288    /// and returns a MailboxError if the receiver is disconnected.
2289    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxError`.
2290    pub fn try_recv(&mut self) -> Result<Option<M>, MailboxError> {
2291        let mut next = self.receiver.try_recv();
2292        // To coalesce, drain the mpsc queue and only keep the last one.
2293        if self.coalesce
2294            && let Some(latest) = self.drain().pop()
2295        {
2296            next = Ok(latest);
2297        }
2298        match next {
2299            Ok(msg) => Ok(Some(msg)),
2300            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
2301            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxError::new(
2302                self.actor_addr().clone(),
2303                MailboxErrorKind::Closed,
2304            )),
2305        }
2306    }
2307
2308    /// Receive the next message from the port corresponding with this
2309    /// receiver.
2310    pub async fn recv(&mut self) -> Result<M, MailboxError> {
2311        let mut next = self.receiver.recv().await;
2312        // To coalesce, get the last message from the queue if there are
2313        // more on the mspc queue.
2314        if self.coalesce
2315            && let Some(latest) = self.drain().pop()
2316        {
2317            next = Some(latest);
2318        }
2319        next.ok_or(MailboxError::new(
2320            self.actor_addr().clone(),
2321            MailboxErrorKind::Closed,
2322        ))
2323    }
2324
2325    /// Drains all available messages from the port.
2326    pub fn drain(&mut self) -> Vec<M> {
2327        let mut drained: Vec<M> = Vec::new();
2328        while let Ok(msg) = self.receiver.try_recv() {
2329            // To coalesce, discard the old message if there is any.
2330            if self.coalesce {
2331                drained.pop();
2332            }
2333            drained.push(msg);
2334        }
2335        drained
2336    }
2337
2338    fn port(&self) -> u64 {
2339        self.port_id.index()
2340    }
2341
2342    fn actor_addr(&self) -> ActorAddr {
2343        self.port_id.actor_addr()
2344    }
2345}
2346
2347impl<M> Drop for PortReceiver<M> {
2348    fn drop(&mut self) {
2349        // MARIUS: do we need to tombstone these? or should we
2350        // error out if we have removed the receiver before serializing the port ref?
2351        // ("no longer live")?
2352        self.mailbox.inner.ports.remove(&self.port());
2353    }
2354}
2355
2356impl<M> Stream for PortReceiver<M> {
2357    type Item = Result<M, MailboxError>;
2358
2359    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2360        std::pin::pin!(self.recv()).poll(cx).map(Some)
2361    }
2362}
2363
/// A receiver of M-typed messages from [`OncePort`]s.
pub struct OncePortReceiver<M> {
    /// The underlying one-shot receiver. It is an `Option` so that `recv`
    /// can take it out of the struct.
    receiver: Option<oneshot::Receiver<M>>,
    /// The address of the port this receiver serves.
    port_id: PortAddr,

    /// Mailbox is used to remove the port from service when the receiver
    /// is dropped.
    mailbox: Mailbox,
}
2373
2374impl<M> OncePortReceiver<M> {
2375    /// Receive message from the one-shot port associated with this
2376    /// receiver.  Recv consumes the receiver: it is no longer valid
2377    /// after this call.
2378    pub async fn recv(mut self) -> Result<M, MailboxError> {
2379        std::mem::take(&mut self.receiver)
2380            .unwrap()
2381            .await
2382            .map_err(|err| {
2383                MailboxError::new(
2384                    self.actor_addr().clone(),
2385                    MailboxErrorKind::Recv(self.port_id.clone(), err.into()),
2386                )
2387            })
2388    }
2389
2390    fn port(&self) -> u64 {
2391        self.port_id.index()
2392    }
2393
2394    fn actor_addr(&self) -> ActorAddr {
2395        self.port_id.actor_addr()
2396    }
2397}
2398
2399impl<M> Drop for OncePortReceiver<M> {
2400    fn drop(&mut self) {
2401        // MARIUS: do we need to tombstone these? or should we
2402        // error out if we have removed the receiver before serializing the port ref?
2403        // ("no longer live")?
2404        self.mailbox.inner.ports.remove(&self.port());
2405    }
2406}
2407
/// Outcome of a successful `SerializedSender::send_serialized` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum SerializedSendDisposition {
    /// The message was delivered; the sender stays bound.
    Delivered,
    /// The message was delivered and the sender is now exhausted; the
    /// mailbox removes the port entry after seeing this disposition.
    DeliveredAndExhausted,
}
2413
/// Error that can occur during `SerializedSender::send_serialized`. It hands
/// the message back so the caller can return it as undeliverable.
pub(crate) struct SerializedSendError {
    /// The headers associated with the message.
    pub(crate) headers: Flattrs,
    /// The message that could not be sent.
    pub(crate) data: wirevalue::Any,
    /// The mailbox sender error that occurred.
    pub(crate) error: MailboxSenderError,
}
2423
/// Failure of a `SerializedSender::send_serialized` call. Both variants hand
/// the message back to the caller.
pub(crate) enum SerializedSendFailure {
    /// The sender is no longer live. The mailbox removes the port entry and
    /// reports the message as sent to an unbound port.
    Dead {
        /// The headers associated with the message.
        headers: Flattrs,
        /// The message that could not be sent.
        data: wirevalue::Any,
    },
    /// The send failed with an error; the mailbox keeps the port entry and
    /// returns the message with that error.
    Error(SerializedSendError),
}
2431
/// SerializedSender encapsulates senders:
///   - It performs type erasure (and thus it is object-safe).
///   - It abstracts over [`Port`]s and [`OncePort`]s, by dynamically tracking the
///     validity of the underlying port.
///
/// The mailbox stores its bound ports as `Arc<dyn SerializedSender>` values.
trait SerializedSender: Send + Sync {
    /// Enables downcasting from `&dyn SerializedSender` to concrete
    /// types.
    ///
    /// Used by `Mailbox::lookup_sender` to downcast to
    /// `&UnboundedSender<M>` via `Any::downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Send a serialized message. SerializedSender will deserialize the
    /// message (failing if it fails to deserialize), and then send the
    /// resulting message on the underlying port.
    ///
    /// The returned disposition describes successful delivery. Errors
    /// report both the failed message and whether the sender remains live
    /// (see [`SerializedSendFailure`]).
    fn send_serialized(
        &self,
        headers: Flattrs,
        serialized: wirevalue::Any,
    ) -> Result<SerializedSendDisposition, SerializedSendFailure>;
}
2456
/// Marker error that `classify_sender_error` recognizes and maps to
/// `MailboxSenderErrorKind::Closed`.
#[derive(Debug, thiserror::Error)]
#[error("handler port closed")]
struct HandlerPortClosedError;
2460
2461fn classify_sender_error(err: anyhow::Error) -> MailboxSenderErrorKind {
2462    if err.is::<HandlerPortClosedError>() {
2463        MailboxSenderErrorKind::Closed
2464    } else {
2465        MailboxSenderErrorKind::Other(err)
2466    }
2467}
2468
/// A sender to an M-typed unbounded port. Each variant is a different way of
/// enqueueing a message; `send` dispatches on the variant.
enum UnboundedPortSender<M: Message> {
    /// Send directly to the mpsc queue. The message headers are not
    /// forwarded on this path.
    Mpsc(mpsc::UnboundedSender<M>),
    /// Use the provided function to enqueue the item. The function receives
    /// both the headers and the message.
    Func(Arc<dyn Fn(Flattrs, M) -> Result<(), anyhow::Error> + Send + Sync>),
    /// A runtime-dispatched handler port that observes mailbox drain state.
    Handler(Arc<HandlerPortSender<M>>),
}
2478
2479impl<M: Message> UnboundedPortSender<M> {
2480    fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2481        match self {
2482            Self::Mpsc(sender) => sender.send(message).map_err(anyhow::Error::from),
2483            Self::Func(func) => func(headers, message),
2484            Self::Handler(sender) => sender.send(headers, message),
2485        }
2486    }
2487}
2488
2489// We implement Clone manually as derive(Clone) places unnecessarily
2490// strict bounds on the type parameter M.
2491impl<M: Message> Clone for UnboundedPortSender<M> {
2492    fn clone(&self) -> Self {
2493        match self {
2494            Self::Mpsc(sender) => Self::Mpsc(sender.clone()),
2495            Self::Func(func) => Self::Func(func.clone()),
2496            Self::Handler(sender) => Self::Handler(sender.clone()),
2497        }
2498    }
2499}
2500
2501impl<M: Message> Debug for UnboundedPortSender<M> {
2502    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2503        match self {
2504            Self::Mpsc(q) => f.debug_tuple("UnboundedPortSender::Mpsc").field(q).finish(),
2505            Self::Func(_) => f
2506                .debug_tuple("UnboundedPortSender::Func")
2507                .field(&"..")
2508                .finish(),
2509            Self::Handler(_) => f
2510                .debug_tuple("UnboundedPortSender::Handler")
2511                .field(&"..")
2512                .finish(),
2513        }
2514    }
2515}
2516
/// Flag bit (the most significant bit of the gate state) marking the
/// gate as draining.
const HANDLER_INGRESS_DRAINING: usize = !(usize::MAX >> 1);
/// Mask selecting the active-sender count: every bit except the
/// draining flag.
const HANDLER_INGRESS_ACTIVE_MASK: usize = usize::MAX >> 1;
2519
/// Admission gate for handler ingress sends.
///
/// `state` packs the draining flag and the count of active senders into
/// one word. `wait_lock` and `drained` let `drain` sleep until that count
/// reaches zero.
struct HandlerIngressGate {
    /// Draining flag plus active-sender count.
    state: AtomicUsize,
    /// Mutex paired with `drained`; it guards no data of its own.
    wait_lock: Mutex<()>,
    /// Notified when the last active sender leaves during a drain.
    drained: Condvar,
}
2525
2526struct HandlerIngressGuard {
2527    gate: Arc<HandlerIngressGate>,
2528}
2529
2530impl HandlerIngressGate {
2531    fn new() -> Self {
2532        Self {
2533            state: AtomicUsize::new(0),
2534            wait_lock: Mutex::new(()),
2535            drained: Condvar::new(),
2536        }
2537    }
2538
2539    fn try_enter(self: &Arc<Self>) -> Result<HandlerIngressGuard, HandlerPortClosedError> {
2540        let mut state = self.state.load(Ordering::Acquire);
2541        loop {
2542            if state & HANDLER_INGRESS_DRAINING != 0 {
2543                return Err(HandlerPortClosedError);
2544            }
2545
2546            let active = state & HANDLER_INGRESS_ACTIVE_MASK;
2547            assert!(
2548                active < HANDLER_INGRESS_ACTIVE_MASK,
2549                "too many active handler ingress sends"
2550            );
2551
2552            match self.state.compare_exchange_weak(
2553                state,
2554                state + 1,
2555                Ordering::AcqRel,
2556                Ordering::Acquire,
2557            ) {
2558                Ok(_) => {
2559                    return Ok(HandlerIngressGuard {
2560                        gate: Arc::clone(self),
2561                    });
2562                }
2563                Err(next_state) => state = next_state,
2564            }
2565        }
2566    }
2567
2568    fn drain(&self) {
2569        let mut state = self.state.load(Ordering::Acquire);
2570        loop {
2571            if state & HANDLER_INGRESS_DRAINING != 0 {
2572                break;
2573            }
2574            match self.state.compare_exchange_weak(
2575                state,
2576                state | HANDLER_INGRESS_DRAINING,
2577                Ordering::AcqRel,
2578                Ordering::Acquire,
2579            ) {
2580                Ok(_) => break,
2581                Err(next_state) => state = next_state,
2582            }
2583        }
2584
2585        let mut wait_guard = self.wait_lock.lock().unwrap();
2586        while self.state.load(Ordering::Acquire) & HANDLER_INGRESS_ACTIVE_MASK != 0 {
2587            wait_guard = self.drained.wait(wait_guard).unwrap();
2588        }
2589    }
2590}
2591
2592impl Drop for HandlerIngressGuard {
2593    fn drop(&mut self) {
2594        let previous = self.gate.state.fetch_sub(1, Ordering::AcqRel);
2595        assert!(
2596            previous & HANDLER_INGRESS_ACTIVE_MASK != 0,
2597            "handler ingress active count underflow"
2598        );
2599        if previous & HANDLER_INGRESS_DRAINING != 0 && previous & HANDLER_INGRESS_ACTIVE_MASK == 1 {
2600            // Pair only the final active-count decrement during drain
2601            // with the drain waiter's condvar mutex. Ordinary send
2602            // completion stays on the atomic fast path, but the final
2603            // sender still cannot notify between the waiter's state
2604            // check and its transition to sleep.
2605            let _wait_guard = self.gate.wait_lock.lock().unwrap();
2606            self.gate.drained.notify_all();
2607        }
2608    }
2609}
2610
2611struct HandlerPortSender<M: Message> {
2612    sender: UnboundedPortSender<M>,
2613    gate: Arc<HandlerIngressGate>,
2614}
2615
2616impl<M: Message> HandlerPortSender<M> {
2617    fn new(sender: UnboundedPortSender<M>, gate: Arc<HandlerIngressGate>) -> Self {
2618        Self { sender, gate }
2619    }
2620
2621    fn send(&self, headers: Flattrs, message: M) -> Result<(), anyhow::Error> {
2622        let _guard = self.gate.try_enter()?;
2623        self.sender.send(headers, message)
2624    }
2625}
2626
2627struct UnboundedSender<M: Message> {
2628    sender: UnboundedPortSender<M>,
2629    port_id: PortAddr,
2630}
2631
2632impl<M: Message> UnboundedSender<M> {
2633    /// Create a new UnboundedSender encapsulating the provided
2634    /// sender.
2635    fn new(sender: UnboundedPortSender<M>, port_id: PortAddr) -> Self {
2636        Self { sender, port_id }
2637    }
2638
2639    #[allow(dead_code)]
2640    fn send(&self, headers: Flattrs, message: M) -> Result<(), MailboxSenderError> {
2641        self.sender.send(headers, message).map_err(|err| {
2642            MailboxSenderError::new_bound(self.port_id.clone(), classify_sender_error(err))
2643        })
2644    }
2645}
2646
2647// Clone is implemented explicitly because the derive macro demands M:
2648// Clone directly. In this case, it isn't needed because Arc<T> can
2649// clone for any T.
2650impl<M: Message> Clone for UnboundedSender<M> {
2651    fn clone(&self) -> Self {
2652        Self {
2653            sender: self.sender.clone(),
2654            port_id: self.port_id.clone(),
2655        }
2656    }
2657}
2658
2659impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
2660    fn as_any(&self) -> &dyn Any {
2661        self
2662    }
2663
2664    fn send_serialized(
2665        &self,
2666        headers: Flattrs,
2667        serialized: wirevalue::Any,
2668    ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2669        // Here, the stack ensures that this port is only instantiated for M-typed messages.
2670        // This does not protect against bad senders (e.g., encoding wrongly-typed messages),
2671        // but it is required as we have some usages that rely on representational equivalence
2672        // to provide type indexing, specifically in `IndexedErasedUnbound` which is used to
2673        // support port aggregation.
2674        match serialized.deserialized_unchecked() {
2675            Ok(message) => match self.sender.send(headers.clone(), message) {
2676                Ok(()) => Ok(SerializedSendDisposition::Delivered),
2677                Err(_) if matches!(&self.sender, UnboundedPortSender::Mpsc(_)) => {
2678                    Err(SerializedSendFailure::Dead {
2679                        data: serialized,
2680                        headers,
2681                    })
2682                }
2683                Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2684                    data: serialized,
2685                    error: MailboxSenderError::new_bound(
2686                        self.port_id.clone(),
2687                        classify_sender_error(err),
2688                    ),
2689                    headers,
2690                })),
2691            },
2692            Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2693                data: serialized,
2694                error: MailboxSenderError::new_bound(
2695                    self.port_id.clone(),
2696                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2697                ),
2698                headers,
2699            })),
2700        }
2701    }
2702}
2703
2704/// OnceSender encapsulates an underlying one-shot sender, dynamically
2705/// tracking its validity.
2706#[derive(Debug)]
2707struct OnceSender<M: Message> {
2708    sender: Arc<Mutex<Option<oneshot::Sender<M>>>>,
2709    port_id: PortAddr,
2710}
2711
2712impl<M: Message> OnceSender<M> {
2713    /// Create a new OnceSender encapsulating the provided one-shot
2714    /// sender.
2715    fn new(sender: oneshot::Sender<M>, port_id: PortAddr) -> Self {
2716        Self {
2717            sender: Arc::new(Mutex::new(Some(sender))),
2718            port_id,
2719        }
2720    }
2721
2722    fn send_once(&self, message: M) -> Result<SerializedSendDisposition, MailboxSenderError> {
2723        // TODO: we should replace the sender on error
2724        match self.sender.lock().unwrap().take() {
2725            None => Err(MailboxSenderError::new_bound(
2726                self.port_id.clone(),
2727                MailboxSenderErrorKind::Closed,
2728            )),
2729            Some(sender) => {
2730                sender.send(message).map_err(|_| {
2731                    // Here, the value is returned when the port is
2732                    // closed.  We should consider having a similar
2733                    // API for send_once, though arguably it makes less
2734                    // sense in this context.
2735                    MailboxSenderError::new_bound(
2736                        self.port_id.clone(),
2737                        MailboxSenderErrorKind::Closed,
2738                    )
2739                })?;
2740                Ok(SerializedSendDisposition::DeliveredAndExhausted)
2741            }
2742        }
2743    }
2744}
2745
2746// Clone is implemented explicitly because the derive macro demands M:
2747// Clone directly. In this case, it isn't needed because Arc<T> can
2748// clone for any T.
2749impl<M: Message> Clone for OnceSender<M> {
2750    fn clone(&self) -> Self {
2751        Self {
2752            sender: self.sender.clone(),
2753            port_id: self.port_id.clone(),
2754        }
2755    }
2756}
2757
2758impl<M: RemoteMessage> SerializedSender for OnceSender<M> {
2759    fn as_any(&self) -> &dyn Any {
2760        self
2761    }
2762
2763    fn send_serialized(
2764        &self,
2765        headers: Flattrs,
2766        serialized: wirevalue::Any,
2767    ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2768        match serialized.deserialized() {
2769            Ok(message) => self
2770                .send_once(message)
2771                .map_err(|_| SerializedSendFailure::Dead {
2772                    data: serialized,
2773                    headers,
2774                }),
2775            Err(err) => Err(SerializedSendFailure::Error(SerializedSendError {
2776                data: serialized,
2777                error: MailboxSenderError::new_bound(
2778                    self.port_id.clone(),
2779                    MailboxSenderErrorKind::Deserialize(M::typename(), err.into()),
2780                ),
2781                headers,
2782            })),
2783        }
2784    }
2785}
2786
2787/// Use the provided function to send untyped messages (i.e. Any objects).
2788pub(crate) struct UntypedUnboundedSender {
2789    pub(crate) sender: Box<
2790        dyn Fn(Flattrs, wirevalue::Any) -> Result<SerializedSendDisposition, SerializedSendFailure>
2791            + Send
2792            + Sync,
2793    >,
2794}
2795
2796impl SerializedSender for UntypedUnboundedSender {
2797    fn as_any(&self) -> &dyn Any {
2798        self
2799    }
2800
2801    fn send_serialized(
2802        &self,
2803        headers: Flattrs,
2804        serialized: wirevalue::Any,
2805    ) -> Result<SerializedSendDisposition, SerializedSendFailure> {
2806        (self.sender)(headers, serialized)
2807    }
2808}
2809
2810/// State is the internal state of the mailbox.
2811struct State {
2812    /// The ID of the mailbox owner.
2813    actor_id: ActorAddr,
2814
2815    // insert if it's serializable; otherwise don't.
2816    /// The set of active ports in the mailbox. All currently
2817    /// allocated ports are
2818    ports: DashMap<u64, Arc<dyn SerializedSender>>,
2819
2820    /// The next port ID to allocate.
2821    next_port: AtomicU64,
2822
2823    /// If a value is present, the mailbox has been closed with the provided
2824    /// status, and any subsequent `Mailbox::post_unchecked` calls will fail.
2825    closed: RwLock<Option<ActorStatus>>,
2826
2827    /// Gate that closes and drains runtime-dispatched handler ingress.
2828    handler_ingress: Arc<HandlerIngressGate>,
2829}
2830
2831impl State {
2832    /// Create a new state with the provided owning ActorAddr.
2833    fn new(actor_id: ActorAddr) -> Self {
2834        Self {
2835            actor_id,
2836            ports: DashMap::new(),
2837            // The first 1024 ports are allocated to actor handlers.
2838            // Other port IDs are ephemeral.
2839            next_port: AtomicU64::new(USER_PORT_OFFSET),
2840            closed: RwLock::new(None),
2841            handler_ingress: Arc::new(HandlerIngressGate::new()),
2842        }
2843    }
2844
2845    /// Allocate a fresh port.
2846    fn allocate_port(&self) -> u64 {
2847        self.next_port.fetch_add(1, Ordering::SeqCst)
2848    }
2849}
2850
2851impl fmt::Debug for State {
2852    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
2853        f.debug_struct("State")
2854            .field("actor_id", &self.actor_id)
2855            .field(
2856                "open_ports",
2857                &self.ports.iter().map(|e| *e.key()).collect::<Vec<_>>(),
2858            )
2859            .field("next_port", &self.next_port)
2860            .finish()
2861    }
2862}
2863
2864// TODO: mux based on some parameterized type. (mux key).
2865/// An in-memory mailbox muxer. This is used to route messages to
2866/// different underlying senders.
2867#[derive(Clone)]
2868pub struct MailboxMuxer {
2869    mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
2870}
2871
2872impl Default for MailboxMuxer {
2873    fn default() -> Self {
2874        Self::new()
2875    }
2876}
2877
2878impl MailboxMuxer {
2879    /// Create a new, empty, muxer.
2880    pub fn new() -> Self {
2881        Self {
2882            mailboxes: Arc::new(DashMap::new()),
2883        }
2884    }
2885
2886    /// Route messages destined for the provided actor id to the provided
2887    /// sender. Returns false if there is already a sender associated
2888    /// with the actor. In this case, the sender is not replaced, and
2889    /// the caller must [`MailboxMuxer::unbind`] it first.
2890    pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
2891        match self.mailboxes.entry(actor_id) {
2892            Entry::Occupied(_) => false,
2893            Entry::Vacant(entry) => {
2894                entry.insert(Box::new(sender));
2895                true
2896            }
2897        }
2898    }
2899
2900    /// Convenience function to bind a mailbox.
2901    pub fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
2902        self.bind(mailbox.actor_addr().id().clone(), mailbox)
2903    }
2904
2905    /// Unbind the sender associated with the provided actor ID. After
2906    /// unbinding, the muxer will no longer be able to send messages to
2907    /// that actor.
2908    #[allow(dead_code)]
2909    pub(crate) fn unbind(&self, actor_id: &ActorId) {
2910        self.mailboxes.remove(actor_id);
2911    }
2912}
2913
2914#[async_trait]
2915impl MailboxSender for MailboxMuxer {
2916    fn post_unchecked(
2917        &self,
2918        envelope: MessageEnvelope,
2919        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
2920    ) {
2921        let dest_actor_ref = envelope.dest().actor_addr();
2922        match self.mailboxes.get(dest_actor_ref.id()) {
2923            None => {
2924                let err = format!(
2925                    "no mailbox for actor {} registered in muxer",
2926                    dest_actor_ref
2927                );
2928                envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
2929            }
2930            Some(sender) => sender.post(envelope, return_handle),
2931        }
2932    }
2933
2934    async fn flush(&self) -> Result<(), anyhow::Error> {
2935        let keys: Vec<_> = self
2936            .mailboxes
2937            .iter()
2938            .map(|entry| entry.key().clone())
2939            .collect();
2940        for key in keys {
2941            if let Some(sender) = self.mailboxes.get(&key) {
2942                sender.value().flush().await?;
2943            }
2944        }
2945        Ok(())
2946    }
2947}
2948
2949/// MailboxRouter routes messages to the sender that is bound to its
2950/// nearest prefix.
2951#[derive(Clone)]
2952pub struct MailboxRouter {
2953    entries: Arc<RwLock<BTreeMap<Addr, Arc<dyn MailboxSender + Send + Sync>>>>,
2954}
2955
2956impl Default for MailboxRouter {
2957    fn default() -> Self {
2958        Self::new()
2959    }
2960}
2961
2962impl MailboxRouter {
2963    /// Create a new, empty router.
2964    pub fn new() -> Self {
2965        Self {
2966            entries: Arc::new(RwLock::new(BTreeMap::new())),
2967        }
2968    }
2969
2970    /// Downgrade this router to a [`WeakMailboxRouter`].
2971    pub fn downgrade(&self) -> WeakMailboxRouter {
2972        WeakMailboxRouter(Arc::downgrade(&self.entries))
2973    }
2974
2975    /// Returns a boxed sender that first attempts to find a route in
2976    /// this router's table; otherwise posts the message to the provided
2977    /// fallback sender.
2978    pub fn fallback(&self, default: BoxedMailboxSender) -> BoxedMailboxSender {
2979        FallbackMailboxRouter {
2980            router: self.clone(),
2981            default,
2982        }
2983        .into_boxed()
2984    }
2985
2986    /// Bind the provided sender to the given reference. The destination
2987    /// is treated as a prefix to which messages can be routed, and
2988    /// messages are routed to their longest matching prefix.
2989    pub fn bind(&self, dest: impl Into<Addr>, sender: impl MailboxSender + 'static) {
2990        let dest = dest.into();
2991        let mut w = self.entries.write().unwrap();
2992        w.insert(dest, Arc::new(sender));
2993    }
2994
2995    /// Remove the binding for the given reference. Only the exact
2996    /// point is removed; other bindings under the same prefix are
2997    /// unaffected.
2998    pub fn unbind(&self, dest: &Addr) {
2999        let mut w = self.entries.write().unwrap();
3000        w.remove(dest);
3001    }
3002
3003    fn sender(&self, actor_ref: &ActorAddr) -> Option<Arc<dyn MailboxSender + Send + Sync>> {
3004        let reference = Addr::from(actor_ref.clone());
3005        match self
3006            .entries
3007            .read()
3008            .unwrap()
3009            .lower_bound(Excluded(&reference))
3010            .prev()
3011        {
3012            None => None,
3013            Some((key, sender)) if key.is_prefix_of(&reference) => Some(sender.clone()),
3014            Some(_) => None,
3015        }
3016    }
3017}
3018
3019#[async_trait]
3020impl MailboxSender for MailboxRouter {
3021    fn post_unchecked(
3022        &self,
3023        envelope: MessageEnvelope,
3024        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3025    ) {
3026        let dest_actor_ref = envelope.dest().actor_addr();
3027        match self.sender(&dest_actor_ref) {
3028            None => envelope.undeliverable(
3029                DeliveryError::Unroutable(
3030                    "no destination found for actor in routing table".to_string(),
3031                ),
3032                return_handle,
3033            ),
3034            Some(sender) => sender.post(envelope, return_handle),
3035        }
3036    }
3037
3038    async fn flush(&self) -> Result<(), anyhow::Error> {
3039        let senders: Vec<_> = self.entries.read().unwrap().values().cloned().collect();
3040        let futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
3041        futures::future::try_join_all(futs).await?;
3042        Ok(())
3043    }
3044}
3045
3046/// A router that first checks a [`MailboxRouter`] for a matching
3047/// prefix route, falling back to a default sender when none is found.
3048#[derive(Clone)]
3049pub struct FallbackMailboxRouter {
3050    router: MailboxRouter,
3051    default: BoxedMailboxSender,
3052}
3053
3054impl FallbackMailboxRouter {
3055    /// The fallback sender used when the router has no match.
3056    pub fn default_sender(&self) -> &BoxedMailboxSender {
3057        &self.default
3058    }
3059}
3060
3061#[async_trait]
3062impl MailboxSender for FallbackMailboxRouter {
3063    fn post_unchecked(
3064        &self,
3065        envelope: MessageEnvelope,
3066        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3067    ) {
3068        let dest_actor_ref = envelope.dest().actor_addr();
3069        match self.router.sender(&dest_actor_ref) {
3070            Some(sender) => sender.post(envelope, return_handle),
3071            None => self.default.post(envelope, return_handle),
3072        }
3073    }
3074
3075    async fn flush(&self) -> Result<(), anyhow::Error> {
3076        let (r1, r2) = futures::future::join(self.router.flush(), self.default.flush()).await;
3077        r1?;
3078        r2?;
3079        Ok(())
3080    }
3081}
3082
3083/// A version of [`MailboxRouter`] that holds a weak reference to the underlying
3084/// state. This allows router references to be circular: an entity holding a reference
3085/// to the router may also contain the router itself.
3086///
3087/// TODO: this currently holds a weak reference to the entire router. This helps
3088/// prevent cycle leaks, but can cause excess memory usage as the cycle is at
3089/// the granularity of each entry. Possibly the router should allow weak references
3090/// on a per-entry basis.
3091#[derive(Debug, Clone)]
3092pub struct WeakMailboxRouter(Weak<RwLock<BTreeMap<Addr, Arc<dyn MailboxSender + Send + Sync>>>>);
3093
3094impl WeakMailboxRouter {
3095    /// Upgrade the weak router to a strong reference router.
3096    pub fn upgrade(&self) -> Option<MailboxRouter> {
3097        self.0.upgrade().map(|entries| MailboxRouter { entries })
3098    }
3099}
3100
3101#[async_trait]
3102impl MailboxSender for WeakMailboxRouter {
3103    fn post_unchecked(
3104        &self,
3105        envelope: MessageEnvelope,
3106        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3107    ) {
3108        match self.upgrade() {
3109            Some(router) => router.post(envelope, return_handle),
3110            None => envelope.undeliverable(
3111                DeliveryError::BrokenLink("failed to upgrade WeakMailboxRouter".to_string()),
3112                return_handle,
3113            ),
3114        }
3115    }
3116
3117    async fn flush(&self) -> Result<(), anyhow::Error> {
3118        match self.upgrade() {
3119            Some(router) => router.flush().await,
3120            None => Ok(()),
3121        }
3122    }
3123}
3124
3125/// A dynamic mailbox router that supports remote delivery.
3126///
3127/// `DialMailboxRouter` maintains a runtime address book mapping
3128/// references to `ChannelAddr`s. It holds a cache of active
3129/// connections and forwards messages to the appropriate
3130/// `MailboxClient`.
3131///
3132/// If a message destination is not bound, but is a "direct mode" address
3133/// (i.e., its proc id contains the channel address through which the proc
3134/// is reachable), then DialMailboxRouter dials the proc directly.
3135///
3136/// Messages sent to unknown destinations are routed to the `default`
3137/// sender, if present.
3138#[derive(Clone)]
3139pub struct DialMailboxRouter {
3140    address_book: Arc<RwLock<BTreeMap<Addr, ChannelAddr>>>,
3141    sender_cache: Arc<DashMap<ChannelAddr, Arc<MailboxClient>>>,
3142
3143    // The default sender, to which messages for unknown recipients
3144    // are sent. (This is like a default route in a routing table.)
3145    default: BoxedMailboxSender,
3146
3147    // When true, only dial direct-addressed procs if their transport
3148    // type is remote. Otherwise, fall back to the default sender.
3149    direct_addressed_remote_only: bool,
3150}
3151
3152impl Default for DialMailboxRouter {
3153    fn default() -> Self {
3154        Self::new()
3155    }
3156}
3157
3158impl DialMailboxRouter {
3159    /// Create a new [`DialMailboxRouter`] with an empty routing table.
3160    pub fn new() -> Self {
3161        Self::new_with_default(BoxedMailboxSender::new(UnroutableMailboxSender))
3162    }
3163
3164    /// Create a new [`DialMailboxRouter`] with an empty routing table,
3165    /// and a default sender. Any message with an unknown destination is
3166    /// dispatched on this default sender, unless the destination is
3167    /// direct-addressed, in which case it is dialed directly.
3168    pub fn new_with_default(default: BoxedMailboxSender) -> Self {
3169        Self {
3170            address_book: Arc::new(RwLock::new(BTreeMap::new())),
3171            sender_cache: Arc::new(DashMap::new()),
3172            default,
3173            direct_addressed_remote_only: false,
3174        }
3175    }
3176
3177    /// Create a new [`DialMailboxRouter`] with an empty routing table,
3178    /// and a default sender. Any message with an unknown destination is
3179    /// dispatched on this default sender, unless the destination is
3180    /// direct-addressed *and* has a remote channel transport type.
3181    pub fn new_with_default_direct_addressed_remote_only(default: BoxedMailboxSender) -> Self {
3182        Self {
3183            address_book: Arc::new(RwLock::new(BTreeMap::new())),
3184            sender_cache: Arc::new(DashMap::new()),
3185            default,
3186            direct_addressed_remote_only: true,
3187        }
3188    }
3189
3190    /// Binds a [`Addr`] to a [`ChannelAddr`], replacing any
3191    /// existing binding.
3192    ///
3193    /// If the address changes, the old sender is evicted from the
3194    /// cache to ensure fresh routing on next use.
3195    pub fn bind(&self, dest: impl Into<Addr>, addr: ChannelAddr) {
3196        let dest = dest.into();
3197        if let Ok(mut w) = self.address_book.write() {
3198            if let Some(old_addr) = w.insert(dest.clone(), addr.clone())
3199                && old_addr != addr
3200            {
3201                tracing::info!("rebinding {:?} from {:?} to {:?}", dest, old_addr, addr);
3202                self.sender_cache.remove(&old_addr);
3203            }
3204        } else {
3205            tracing::error!("address book poisoned during bind of {:?}", dest);
3206        }
3207    }
3208
3209    /// Removes all address mappings with the given prefix from the
3210    /// router.
3211    ///
3212    /// Also evicts any corresponding cached senders to prevent reuse
3213    /// of stale connections.
3214    pub fn unbind(&self, dest: &Addr) {
3215        if let Ok(mut w) = self.address_book.write() {
3216            let to_remove: Vec<(Addr, ChannelAddr)> = w
3217                .range(dest..)
3218                .take_while(|(key, _)| dest.is_prefix_of(key))
3219                .map(|(key, addr)| (key.clone(), addr.clone()))
3220                .collect();
3221
3222            for (key, addr) in to_remove {
3223                tracing::info!("unbinding {:?} from {:?}", key, addr);
3224                w.remove(&key);
3225                self.sender_cache.remove(&addr);
3226            }
3227        } else {
3228            tracing::error!("address book poisoned during unbind of {:?}", dest);
3229        }
3230    }
3231
3232    /// Lookup an actor's channel in the router's address bok.
3233    pub fn lookup_addr(&self, actor_ref: &ActorAddr) -> Option<ChannelAddr> {
3234        let address_book = self.address_book.read().unwrap();
3235        let reference = Addr::from(actor_ref.clone());
3236        let found = address_book.lower_bound(Excluded(&reference)).prev();
3237
3238        // First try to look up the address in our address book; failing that,
3239        // extract the address from the ProcAddr (all procs are direct-addressed now).
3240        if let Some((key, addr)) = found
3241            && key.is_prefix_of(&reference)
3242        {
3243            Some(addr.clone())
3244        } else {
3245            let addr = actor_ref.addr().clone();
3246            if self.direct_addressed_remote_only {
3247                addr.transport().is_remote().then_some(addr)
3248            } else {
3249                Some(addr)
3250            }
3251        }
3252    }
3253
3254    /// Return all covering prefixes of this router. That is, all references that are not
3255    /// prefixed by another reference in the routing table
3256    pub fn prefixes(&self) -> BTreeSet<Addr> {
3257        let addrs = self.address_book.read().unwrap();
3258        let mut prefixes: BTreeSet<Addr> = BTreeSet::new();
3259        for (reference, _) in addrs.iter() {
3260            match prefixes.lower_bound(Excluded(reference)).peek_prev() {
3261                Some(candidate) if candidate.is_prefix_of(reference) => (),
3262                _ => {
3263                    prefixes.insert(reference.clone());
3264                }
3265            }
3266        }
3267
3268        prefixes
3269    }
3270
3271    fn dial(
3272        &self,
3273        addr: &ChannelAddr,
3274        actor_ref: &ActorAddr,
3275    ) -> Result<Arc<MailboxClient>, MailboxSenderError> {
3276        // Get the sender. Create it if needed. Do not send the
3277        // messages inside this block so we do not hold onto the
3278        // reference of the dashmap entries.
3279        match self.sender_cache.entry(addr.clone()) {
3280            Entry::Occupied(entry) => Ok(entry.get().clone()),
3281            Entry::Vacant(entry) => {
3282                let tx = channel::dial(addr.clone()).map_err(|err| {
3283                    MailboxSenderError::new_unbound_type(
3284                        actor_ref.clone(),
3285                        MailboxSenderErrorKind::Channel(err),
3286                        "unknown",
3287                    )
3288                })?;
3289                let sender = MailboxClient::new(tx);
3290                Ok(entry.insert(Arc::new(sender)).value().clone())
3291            }
3292        }
3293    }
3294}
3295
3296#[async_trait]
3297impl MailboxSender for DialMailboxRouter {
3298    fn post_unchecked(
3299        &self,
3300        envelope: MessageEnvelope,
3301        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3302    ) {
3303        let dest_actor_ref = envelope.dest().actor_addr();
3304        let Some(addr) = self.lookup_addr(&dest_actor_ref) else {
3305            self.default.post(envelope, return_handle);
3306            return;
3307        };
3308
3309        match self.dial(&addr, &dest_actor_ref) {
3310            Err(err) => envelope.undeliverable(
3311                DeliveryError::Unroutable(format!("cannot dial destination: {err}")),
3312                return_handle,
3313            ),
3314            Ok(sender) => sender.post(envelope, return_handle),
3315        }
3316    }
3317
3318    async fn flush(&self) -> Result<(), anyhow::Error> {
3319        let senders: Vec<_> = self
3320            .sender_cache
3321            .iter()
3322            .map(|entry| entry.value().clone())
3323            .collect();
3324        let mut futs: Vec<_> = senders.iter().map(|s| s.flush()).collect();
3325        futs.push(self.default.flush());
3326        futures::future::try_join_all(futs).await?;
3327        Ok(())
3328    }
3329}
3330
3331/// A MailboxSender that reports any envelope as undeliverable due to
3332/// routing failure.
3333#[derive(Debug)]
3334pub struct UnroutableMailboxSender;
3335
3336#[async_trait]
3337impl MailboxSender for UnroutableMailboxSender {
3338    fn post_unchecked(
3339        &self,
3340        envelope: MessageEnvelope,
3341        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
3342    ) {
3343        envelope.undeliverable(
3344            DeliveryError::Unroutable("destination not found in routing table".to_string()),
3345            return_handle,
3346        );
3347    }
3348}
3349
3350#[cfg(test)]
3351mod tests {
3352
3353    use std::assert_matches;
3354    use std::mem::drop;
3355    use std::sync::atomic::AtomicUsize;
3356    use std::time::Duration;
3357
3358    use timed_test::async_timed_test;
3359
3360    use super::*;
3361    use crate::Actor;
3362    use crate::ActorHandle;
3363    use crate::Instance;
3364    use crate::accum;
3365    use crate::accum::ReducerMode;
3366    use crate::channel::ChannelTransport;
3367    use crate::context::Mailbox as MailboxContext;
3368    use crate::endpoint::Endpoint as _;
3369    use crate::proc::Proc;
3370    use crate::testing::ids::test_actor_id;
3371    use crate::testing::ids::test_port_id;
3372    use crate::testing::ids::test_proc_id;
3373
3374    fn test_proc_ref(name: &str) -> Addr {
3375        Addr::Proc(test_proc_id(name))
3376    }
3377
3378    fn test_actor_ref(proc_name: &str, actor_name: &str) -> Addr {
3379        Addr::Actor(test_actor_id(proc_name, actor_name))
3380    }
3381
3382    #[test]
3383    fn test_error() {
3384        use crate::testing::ids::test_actor_id;
3385        let err = MailboxError::new(
3386            test_actor_id("myworld_2", "myactor"),
3387            MailboxErrorKind::Closed,
3388        );
3389        // ActorAddr display is now "actor_uid.proc_uid@location"
3390        let err_str = format!("{err}");
3391        assert!(
3392            err_str.contains("mailbox closed"),
3393            "expected error: {}",
3394            err_str
3395        );
3396        assert!(
3397            err_str.contains("@"),
3398            "expected ref-style location separator in {err_str}"
3399        );
3400    }
3401
3402    #[tokio::test]
3403    async fn test_mailbox_basic() {
3404        let mbox = Mailbox::new(test_actor_id("0", "test"));
3405        let (port, mut receiver) = mbox.open_port::<u64>();
3406        let port = port.bind();
3407
3408        mbox.serialize_and_send(&port, 123, monitored_return_handle())
3409            .unwrap();
3410        mbox.serialize_and_send(&port, 321, monitored_return_handle())
3411            .unwrap();
3412        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3413        assert_eq!(receiver.recv().await.unwrap(), 321u64);
3414
3415        let serialized = wirevalue::Any::serialize(&999u64).unwrap();
3416        mbox.post(
3417            MessageEnvelope::new_unknown(port.port_addr().clone(), serialized),
3418            monitored_return_handle(),
3419        );
3420        assert_eq!(receiver.recv().await.unwrap(), 999u64);
3421    }
3422
3423    #[tokio::test]
3424    async fn test_mailbox_rejects_messages_for_other_actors() {
3425        let mbox = Mailbox::new(test_actor_id("0", "owner"));
3426        let dest = test_actor_id("0", "other").port_addr(Port::from(1234));
3427        let envelope =
3428            MessageEnvelope::serialize(mbox.actor_addr().clone(), dest, &42u64, Flattrs::new())
3429                .expect("serialize");
3430        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
3431
3432        mbox.post(envelope, return_handle);
3433
3434        let Undeliverable::Message(undelivered) =
3435            tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
3436                .await
3437                .expect("timed out waiting for undeliverable")
3438                .expect("return port closed")
3439        else {
3440            panic!("expected returned message");
3441        };
3442        assert!(
3443            undelivered
3444                .error_msg()
3445                .expect("expected error")
3446                .contains("cannot deliver to")
3447        );
3448    }
3449
3450    #[tokio::test]
3451    async fn test_mailbox_accum() {
3452        let proc = Proc::isolated();
3453        let (client, _) = proc.client("client").unwrap();
3454        let (port, mut receiver) = client
3455            .mailbox()
3456            .open_accum_port(accum::join_semilattice::<accum::Max<i64>>());
3457
3458        for i in -3..4 {
3459            port.post(&client, accum::Max(i));
3460            let received: accum::Max<i64> = receiver.recv().await.unwrap();
3461            let msg = received.get();
3462            assert_eq!(msg, &i);
3463        }
3464        // Send a smaller or same value. Should still receive the previous max.
3465        for i in -3..4 {
3466            port.post(&client, accum::Max(i));
3467            assert_eq!(receiver.recv().await.unwrap().get(), &3);
3468        }
3469        // send a larger value. Should receive the new max.
3470        port.post(&client, accum::Max(4));
3471        assert_eq!(receiver.recv().await.unwrap().get(), &4);
3472
3473        // Send multiple updates. Should only receive the final change.
3474        for i in 5..10 {
3475            port.post(&client, accum::Max(i));
3476        }
3477        assert_eq!(receiver.recv().await.unwrap().get(), &9);
3478        port.post(&client, accum::Max(1));
3479        port.post(&client, accum::Max(3));
3480        port.post(&client, accum::Max(2));
3481        assert_eq!(receiver.recv().await.unwrap().get(), &9);
3482    }
3483
3484    #[test]
3485    fn test_port_and_reducer() {
3486        let mbox = Mailbox::new(test_actor_id("0", "test"));
3487        // accum port could have reducer typehash
3488        {
3489            let accumulator = accum::join_semilattice::<accum::Max<u64>>();
3490            let reducer_spec = accumulator.reducer_spec().unwrap();
3491            let (port, _) = mbox.open_accum_port(accum::join_semilattice::<accum::Max<u64>>());
3492            assert_eq!(port.inner.reducer_spec, Some(reducer_spec.clone()));
3493            let port_ref = port.bind();
3494            assert_eq!(port_ref.reducer_spec(), &Some(reducer_spec));
3495        }
3496        // normal port should not have reducer typehash
3497        {
3498            let (port, _) = mbox.open_port::<u64>();
3499            assert_eq!(port.inner.reducer_spec, None);
3500            let port_ref = port.bind();
3501            assert_eq!(port_ref.reducer_spec(), &None);
3502        }
3503    }
3504
3505    #[tokio::test]
3506    #[ignore] // error behavior changed, but we will bring it back
3507    async fn test_mailbox_once() {
3508        let proc = Proc::isolated();
3509        let (client, _) = proc.client("client").unwrap();
3510
3511        let (port, receiver) = client.open_once_port::<u64>();
3512
3513        // let port_id = port.port_addr().clone();
3514
3515        port.post(&client, 123u64);
3516        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3517
3518        // // The borrow checker won't let us send again on the port
3519        // // (good!), but we stashed the port-id and so we can try on the
3520        // // serialized interface.
3521        // let Err(err) = mbox
3522        //     .send_serialized(&port_id, &wirevalue::Any(Vec::new()))
3523        //     .await
3524        // else {
3525        //     unreachable!()
3526        // };
3527        // assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
3528    }
3529
3530    #[tokio::test]
3531    #[ignore] // changed error behavior
3532    async fn test_mailbox_receiver_drop() {
3533        let mbox = Mailbox::new(test_actor_id("0", "test"));
3534        let (port, mut receiver) = mbox.open_port::<u64>();
3535        // Make sure we go through "remote" path.
3536        let port = port.bind();
3537        mbox.serialize_and_send(&port, 123u64, monitored_return_handle())
3538            .unwrap();
3539        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3540        drop(receiver);
3541        let Err(err) = mbox.serialize_and_send(&port, 123u64, monitored_return_handle()) else {
3542            panic!();
3543        };
3544
3545        assert_matches!(err.kind(), MailboxSenderErrorKind::Closed);
3546        assert_matches!(err.location(), PortLocation::Bound(bound) if *bound == *port.port_addr());
3547    }
3548
3549    #[tokio::test]
3550    async fn test_mailbox_type_mismatch_does_not_evict_unbounded_port() {
3551        let mbox = Mailbox::new(test_actor_id("0", "test"));
3552        let (port, mut receiver) = mbox.open_port::<u64>();
3553        let port = port.bind();
3554        let port_index = port.port_addr().index();
3555        let (return_handle, mut return_receiver) =
3556            crate::mailbox::undeliverable::new_undeliverable_port();
3557
3558        let wrong_message = wirevalue::Any::serialize(&TestMessage).unwrap();
3559        mbox.post(
3560            MessageEnvelope::new_unknown(port.port_addr().clone(), wrong_message),
3561            return_handle.clone(),
3562        );
3563
3564        let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
3565            .await
3566            .expect("undeliverable mismatch should arrive")
3567            .unwrap()
3568            .into_message()
3569            .expect("expected returned envelope");
3570        assert!(
3571            envelope
3572                .error_msg()
3573                .is_some_and(|message| message.contains("deserialization error")),
3574            "expected deserialization error in {envelope}",
3575        );
3576        assert!(
3577            mbox.inner.ports.contains_key(&port_index),
3578            "deserialization mismatch should not evict reusable port",
3579        );
3580
3581        mbox.serialize_and_send(&port, 123u64, return_handle)
3582            .unwrap();
3583        assert_eq!(
3584            tokio::time::timeout(Duration::from_secs(1), receiver.recv())
3585                .await
3586                .expect("valid message should still be delivered")
3587                .unwrap(),
3588            123u64
3589        );
3590    }
3591
3592    #[tokio::test]
3593    async fn test_mailbox_closed_unbounded_port_is_removed_after_send_failure() {
3594        let mbox = Mailbox::new(test_actor_id("0", "test"));
3595        let port_index = mbox.allocate_port();
3596        let port_id = mbox.actor_addr().port_addr(Port::from(port_index));
3597        let port = crate::PortRef::attest(port_id.clone());
3598        let (return_handle, mut return_receiver) =
3599            crate::mailbox::undeliverable::new_undeliverable_port();
3600        let (sender, receiver) = mpsc::unbounded_channel::<u64>();
3601
3602        drop(receiver);
3603
3604        mbox.inner.ports.insert(
3605            port_index,
3606            Arc::new(UnboundedSender::new(
3607                UnboundedPortSender::Mpsc(sender),
3608                port_id,
3609            )),
3610        );
3611
3612        mbox.serialize_and_send(&port, 123u64, return_handle.clone())
3613            .unwrap();
3614
3615        let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
3616            .await
3617            .expect("closed port should produce undeliverable")
3618            .unwrap()
3619            .into_message()
3620            .expect("expected returned envelope");
3621        let first_error = envelope.error_msg().expect("expected delivery error");
3622        assert!(
3623            first_error.contains("port not bound in mailbox"),
3624            "expected unbound-port error in {envelope}",
3625        );
3626        assert!(
3627            !mbox.inner.ports.contains_key(&port_index),
3628            "dead reusable port should be removed after send failure",
3629        );
3630
3631        mbox.serialize_and_send(&port, 456u64, return_handle)
3632            .unwrap();
3633        let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
3634            .await
3635            .expect("removed port should produce unbound undeliverable")
3636            .unwrap()
3637            .into_message()
3638            .expect("expected returned envelope");
3639        let second_error = envelope.error_msg().expect("expected delivery error");
3640        assert_eq!(
3641            first_error, second_error,
3642            "dead-port undeliverable should match unbound-port undeliverable exactly",
3643        );
3644    }
3645
3646    #[tokio::test]
3647    async fn test_mailbox_once_type_mismatch_preserves_sender_until_delivery() {
3648        let mbox = Mailbox::new(test_actor_id("0", "test"));
3649        let (port, receiver) = mbox.open_once_port::<u64>();
3650        let port = port.bind();
3651        let port_index = port.port_addr().index();
3652        let (return_handle, mut return_receiver) =
3653            crate::mailbox::undeliverable::new_undeliverable_port();
3654
3655        let wrong_message = wirevalue::Any::serialize(&TestMessage).unwrap();
3656        mbox.post(
3657            MessageEnvelope::new_unknown(port.port_addr().clone(), wrong_message),
3658            return_handle.clone(),
3659        );
3660
3661        let envelope = tokio::time::timeout(Duration::from_secs(1), return_receiver.recv())
3662            .await
3663            .expect("once-port mismatch should arrive")
3664            .unwrap()
3665            .into_message()
3666            .expect("expected returned envelope");
3667        assert!(
3668            envelope
3669                .error_msg()
3670                .is_some_and(|message| message.contains("deserialization error")),
3671            "expected deserialization error in {envelope}",
3672        );
3673        assert!(
3674            mbox.inner.ports.contains_key(&port_index),
3675            "once port should survive deserialization mismatch before delivery",
3676        );
3677
3678        mbox.serialize_and_send_once(port, 123u64, return_handle)
3679            .unwrap();
3680        assert_eq!(
3681            tokio::time::timeout(Duration::from_secs(1), receiver.recv())
3682                .await
3683                .expect("valid once message should still be delivered")
3684                .unwrap(),
3685            123u64
3686        );
3687        assert!(
3688            !mbox.inner.ports.contains_key(&port_index),
3689            "successful once send should remove the sender entry",
3690        );
3691    }
3692
3693    #[tokio::test]
3694    async fn test_drain() {
3695        let mbox = Mailbox::new(test_actor_id("0", "test"));
3696
3697        let (port, mut receiver) = mbox.open_port();
3698        let port = port.bind();
3699
3700        for i in 0..10 {
3701            mbox.serialize_and_send(&port, i, monitored_return_handle())
3702                .unwrap();
3703        }
3704
3705        for i in 0..10 {
3706            assert_eq!(receiver.recv().await.unwrap(), i);
3707        }
3708
3709        assert!(receiver.drain().is_empty());
3710    }
3711
3712    #[tokio::test]
3713    async fn test_mailbox_muxer() {
3714        let muxer = MailboxMuxer::new();
3715
3716        let mbox0 = Mailbox::new(test_actor_id("0", "actor1"));
3717        let mbox1 = Mailbox::new(test_actor_id("0", "actor2"));
3718
3719        muxer.bind(mbox0.actor_addr().id().clone(), mbox0.clone());
3720        muxer.bind(mbox1.actor_addr().id().clone(), mbox1.clone());
3721
3722        let (port, receiver) = mbox0.open_once_port::<u64>();
3723
3724        let proc = Proc::configured(test_proc_id("0"), BoxedMailboxSender::new(muxer));
3725        let (client, _) = proc.client("client").unwrap();
3726
3727        port.post(&client, 123u64);
3728        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3729
3730        /*
3731        let (tx, rx) = channel::local::new::<u64>();
3732        let (port, _) = mbox0.open_port::<u64>();
3733        let handle = muxer.clone().serve_port(port, rx).unwrap();
3734        muxer.unbind(mbox0.actor_addr());
3735        tx.send(123u64).await.unwrap();
3736        let Ok(Err(err)) = handle.await else { panic!() };
3737        assert_eq!(err.actor_addr(), &actor_id(0));
3738        */
3739    }
3740
3741    #[tokio::test]
3742    async fn test_local_client_server() {
3743        let mbox = Mailbox::new(test_actor_id("0", "actor0"));
3744        let (tx, rx) = channel::local::new();
3745        let serve_handle = mbox.clone().serve(rx);
3746        let client = MailboxClient::new(tx);
3747
3748        let (port, receiver) = mbox.open_once_port::<u64>();
3749        let port = port.bind();
3750
3751        client
3752            .serialize_and_send_once(port, 123u64, monitored_return_handle())
3753            .unwrap();
3754        assert_eq!(receiver.recv().await.unwrap(), 123u64);
3755        serve_handle.stop("fromt test");
3756        serve_handle.await.unwrap().unwrap();
3757    }
3758
3759    #[tokio::test]
3760    async fn test_mailbox_router() {
3761        let mbox0 = Mailbox::new(test_actor_id("world0_0", "actor0"));
3762        let mbox1 = Mailbox::new(test_actor_id("world1_0", "actor0"));
3763        let mbox2 = Mailbox::new(test_actor_id("world1_1", "actor0"));
3764        let mbox3 = Mailbox::new(test_actor_id("world1_1", "actor1"));
3765
3766        let comms: Vec<(OncePortRef<u64>, OncePortReceiver<u64>)> =
3767            [&mbox0, &mbox1, &mbox2, &mbox3]
3768                .into_iter()
3769                .map(|mbox| {
3770                    let (port, receiver) = mbox.open_once_port::<u64>();
3771                    (port.bind(), receiver)
3772                })
3773                .collect();
3774
3775        let router = MailboxRouter::new();
3776
3777        router.bind(test_proc_ref("world0_0"), mbox0);
3778        router.bind(test_proc_ref("world1_0"), mbox1);
3779        router.bind(test_proc_ref("world1_1"), mbox2);
3780        router.bind(test_actor_ref("world1_1", "actor1"), mbox3);
3781
3782        for (i, (port, receiver)) in comms.into_iter().enumerate() {
3783            router
3784                .serialize_and_send_once(port, i as u64, monitored_return_handle())
3785                .unwrap();
3786            assert_eq!(receiver.recv().await.unwrap(), i as u64);
3787        }
3788
3789        // Test undeliverable messages, and that it is delivered with the appropriate fallback.
3790
3791        let mbox4 = Mailbox::new(test_actor_id("fallback_0", "actor"));
3792
3793        let (return_handle, mut return_receiver) =
3794            crate::mailbox::undeliverable::new_undeliverable_port();
3795        let (port, _receiver) = mbox4.open_once_port();
3796        router
3797            .serialize_and_send_once(port.bind(), 0, return_handle.clone())
3798            .unwrap();
3799        assert!(return_receiver.recv().await.is_ok());
3800
3801        let router = router.fallback(mbox4.clone().into_boxed());
3802        let (port, receiver) = mbox4.open_once_port();
3803        router
3804            .serialize_and_send_once(port.bind(), 0, return_handle)
3805            .unwrap();
3806        assert_eq!(receiver.recv().await.unwrap(), 0);
3807    }
3808
3809    #[tokio::test]
3810    async fn test_dial_mailbox_router() {
3811        let router = DialMailboxRouter::new();
3812
3813        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
3814        router.bind(test_proc_ref("world1_0"), "unix!@2".parse().unwrap());
3815        router.bind(test_proc_ref("world1_1"), "unix!@3".parse().unwrap());
3816        router.bind(
3817            test_actor_ref("world1_1", "actor1"),
3818            "unix!@4".parse().unwrap(),
3819        );
3820        // Bind a direct address -- we should use its bound address!
3821        // The actor must be on unix:@4 so that after unbinding, the prefix
3822        // route for world1_1 (unix!@3) is the fallback, not world1_1/actor1 (unix!@4).
3823        let direct_actor_ref: ActorAddr =
3824            ProcAddr::singleton("unix:@4".parse().unwrap(), "my_proc").actor_addr("my_actor");
3825        router.bind(
3826            Addr::Actor(direct_actor_ref.clone()),
3827            "unix:@5".parse().unwrap(),
3828        );
3829
3830        // We should be able to lookup the ids
3831        router
3832            .lookup_addr(&test_actor_id("world0_0", "actor"))
3833            .unwrap();
3834        router
3835            .lookup_addr(&test_actor_id("world1_0", "actor"))
3836            .unwrap();
3837
3838        let actor_id = direct_actor_ref;
3839        assert_eq!(
3840            router.lookup_addr(&actor_id).unwrap(),
3841            "unix!@5".parse().unwrap(),
3842        );
3843        router.unbind(&actor_id.clone().into());
3844        assert_eq!(
3845            router.lookup_addr(&actor_id).unwrap(),
3846            "unix!@4".parse().unwrap(),
3847        );
3848
3849        // Unbind procs so lookups fall back to the proc's direct address
3850        // (all procs are direct-addressed now, so lookup_addr always returns
3851        // Some; we verify the bound address is gone by checking the returned
3852        // address is the local fallback, not the originally bound one).
3853        let fallback = ChannelAddr::any(ChannelTransport::Local);
3854        router.unbind(&test_proc_ref("world1_0"));
3855        router.unbind(&test_proc_ref("world1_1"));
3856        assert_eq!(
3857            router
3858                .lookup_addr(&test_actor_id("world1_0", "actor1"))
3859                .unwrap(),
3860            fallback,
3861        );
3862        assert_eq!(
3863            router
3864                .lookup_addr(&test_actor_id("world1_1", "actor1"))
3865                .unwrap(),
3866            fallback,
3867        );
3868        router
3869            .lookup_addr(&test_actor_id("world0_0", "actor"))
3870            .unwrap();
3871        router.unbind(&test_proc_ref("world0_0"));
3872        assert_eq!(
3873            router
3874                .lookup_addr(&test_actor_id("world0_0", "actor"))
3875                .unwrap(),
3876            fallback,
3877        );
3878    }
3879
3880    #[tokio::test]
3881    #[ignore] // TODO: there's a leak here, fix it
3882    async fn test_dial_mailbox_router_default() {
3883        let mbox0 = Mailbox::new(test_actor_id("world0_0", "actor0"));
3884        let mbox1 = Mailbox::new(test_actor_id("world1_0", "actor0"));
3885        let mbox2 = Mailbox::new(test_actor_id("world1_1", "actor0"));
3886        let mbox3 = Mailbox::new(test_actor_id("world1_1", "actor1"));
3887
3888        // We don't need to dial here, since we gain direct access to the
3889        // underlying routers.
3890        let root = MailboxRouter::new();
3891        let world0_router = DialMailboxRouter::new_with_default(root.boxed());
3892        let world1_router = DialMailboxRouter::new_with_default(root.boxed());
3893
3894        root.bind(test_proc_ref("world0"), world0_router.clone());
3895        root.bind(test_proc_ref("world1"), world1_router.clone());
3896
3897        let mailboxes = [&mbox0, &mbox1, &mbox2, &mbox3];
3898
3899        let mut handles = Vec::new(); // hold on to handles, or channels get closed
3900        for mbox in mailboxes.iter() {
3901            let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
3902            let handle = (*mbox).clone().serve(rx);
3903            handles.push(handle);
3904
3905            eprintln!("{}: {}", mbox.actor_addr(), addr);
3906            if mbox
3907                .actor_addr()
3908                .proc_addr()
3909                .label()
3910                .is_some_and(|l| l.as_str().starts_with("world0"))
3911            {
3912                world0_router.bind(Addr::from(mbox.actor_addr().clone()), addr);
3913            } else {
3914                world1_router.bind(Addr::from(mbox.actor_addr().clone()), addr);
3915            }
3916        }
3917
3918        // Make sure nodes are fully connected.
3919        for router in [root.boxed(), world0_router.boxed(), world1_router.boxed()] {
3920            for mbox in mailboxes.iter() {
3921                let (port, receiver) = mbox.open_once_port::<u64>();
3922                let port = port.bind();
3923                router
3924                    .serialize_and_send_once(port, 123u64, monitored_return_handle())
3925                    .unwrap();
3926                assert_eq!(receiver.recv().await.unwrap(), 123u64);
3927            }
3928        }
3929    }
3930
3931    #[tokio::test]
3932    async fn test_enqueue_port() {
3933        let proc = Proc::isolated();
3934        let (client, _) = proc.client("client").unwrap();
3935
3936        let count = Arc::new(AtomicUsize::new(0));
3937        let count_clone = count.clone();
3938        let port = client.mailbox().open_enqueue_port(move |_, n| {
3939            count_clone.fetch_add(n, Ordering::SeqCst);
3940            Ok(())
3941        });
3942
3943        port.post(&client, 10);
3944        port.post(&client, 5);
3945        port.post(&client, 1);
3946        port.post(&client, 0);
3947
3948        assert_eq!(count.load(Ordering::SeqCst), 16);
3949    }
3950
    /// Unit message type that uses the default derived type name.
    /// The type-mismatch tests in this module also send it as a
    /// wrongly-typed payload to `u64` ports.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    struct TestMessage;
3953
    /// Unit message type whose type name is overridden through the
    /// `#[named(name = ...)]` attribute.
    #[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
    #[named(name = "some::custom::path")]
    struct TestMessage2;
3957
3958    #[test]
3959    fn test_remote_message_macros() {
3960        assert_eq!(
3961            TestMessage::typename(),
3962            "hyperactor::mailbox::tests::TestMessage"
3963        );
3964        assert_eq!(TestMessage2::typename(), "some::custom::path");
3965    }
3966
3967    #[test]
3968    fn test_message_envelope_display() {
3969        #[derive(typeuri::Named, Serialize, Deserialize)]
3970        struct MyTest {
3971            a: u64,
3972            b: String,
3973        }
3974        wirevalue::register_type!(MyTest);
3975
3976        let envelope = MessageEnvelope::serialize(
3977            test_actor_id("source_0", "actor"),
3978            test_port_id("dest_1", "actor", 123),
3979            &MyTest {
3980                a: 123,
3981                b: "hello".into(),
3982            },
3983            Flattrs::new(),
3984        )
3985        .unwrap();
3986
3987        // Note: display format changed from "source[0].actor" to direct format
3988        assert!(format!("{}", envelope).contains("MyTest{\"a\":123,\"b\":\"hello\"}"));
3989    }
3990
    /// Minimal actor that relies entirely on the `Actor` trait's
    /// default behavior; spawned by `test_actor_delivery_failure`.
    #[derive(Debug, Default)]
    struct Foo;

    // No overrides: every `Actor` method keeps its default.
    impl Actor for Foo {}
3995
3996    // Test that a message delivery failure causes the sending actor
3997    // to stop running.
3998    #[tokio::test]
3999    async fn test_actor_delivery_failure() {
4000        // This test involves making an actor fail and so we must set
4001        // a supervision coordinator.
4002        use crate::actor::ActorStatus;
4003        use crate::testing::proc_supervison::ProcSupervisionCoordinator;
4004
4005        let proc_forwarder = BoxedMailboxSender::new(DialMailboxRouter::new_with_default(
4006            BoxedMailboxSender::new(PanickingMailboxSender),
4007        ));
4008        let proc_id = test_proc_id("quux_0");
4009        let mut proc = Proc::configured(proc_id.clone(), proc_forwarder);
4010        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
4011        let (client, _) = proc.client("client").unwrap();
4012
4013        let foo = proc.spawn("foo", Foo).unwrap();
4014        let return_handle = foo.port::<Undeliverable<MessageEnvelope>>();
4015        let message = MessageEnvelope::new(
4016            foo.actor_addr().clone(),
4017            test_port_id("corge_0", "bar", 9999),
4018            wirevalue::Any::serialize(&1u64).unwrap(),
4019            Flattrs::new(),
4020        );
4021        return_handle.post(&client, Undeliverable::Message(message));
4022
4023        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
4024
4025        let foo_status = foo.status();
4026        assert!(matches!(*foo_status.borrow(), ActorStatus::Failed(_)));
4027        let ActorStatus::Failed(ref msg) = *foo_status.borrow() else {
4028            unreachable!()
4029        };
4030        let msg_str = msg.to_string();
4031        // UE-3 (top-line shape): with no operation context, the top
4032        // line names the destination — `undeliverable message to
4033        // {dest}`. The retired neutral `undeliverable message error`
4034        // wording is no longer emitted.
4035        assert!(
4036            msg_str.contains("undeliverable message to"),
4037            "expected destination-named top line, got:\n{msg_str}"
4038        );
4039        assert!(
4040            !msg_str.contains("undeliverable message error"),
4041            "retired neutral fallback must not appear, got:\n{msg_str}"
4042        );
4043        assert!(msg_str.contains("sender:") && msg_str.contains("quux_0"));
4044        assert!(msg_str.contains("dest:") && msg_str.contains("corge_0"));
4045
4046        proc.destroy_and_wait(tokio::time::Duration::from_secs(1), "test cleanup")
4047            .await
4048            .unwrap();
4049    }
4050
4051    #[tokio::test]
4052    async fn test_detached_return_handle() {
4053        let (return_handle, mut return_receiver) =
4054            crate::mailbox::undeliverable::new_undeliverable_port();
4055        // Simulate an undelivered message return.
4056        let envelope = MessageEnvelope::new(
4057            test_actor_id("foo_0", "bar"),
4058            test_port_id("baz_0", "corge", 9999),
4059            wirevalue::Any::serialize(&1u64).unwrap(),
4060            Flattrs::new(),
4061        );
4062        let proc = Proc::isolated();
4063        let (client, _) = proc.client("client").unwrap();
4064        return_handle.post(&client, Undeliverable::Message(envelope.clone()));
4065        // Check we receive the undelivered message.
4066        assert!(
4067            tokio::time::timeout(tokio::time::Duration::from_secs(1), return_receiver.recv())
4068                .await
4069                .is_ok()
4070        );
4071        // Setup a monitor for the receiver and show that if there are
4072        // no outstanding return handles it terminates.
4073        let monitor_handle = tokio::spawn(async move {
4074            while let Ok(Undeliverable::Message(mut envelope)) = return_receiver.recv().await {
4075                envelope.set_error(DeliveryError::BrokenLink(
4076                    "returned in unit test".to_string(),
4077                ));
4078                UndeliverableMailboxSender
4079                    .post(envelope, /*unused */ monitored_return_handle());
4080            }
4081        });
4082        drop(return_handle);
4083        assert!(
4084            tokio::time::timeout(tokio::time::Duration::from_secs(1), monitor_handle)
4085                .await
4086                .is_ok()
4087        );
4088    }
4089
4090    async fn verify_receiver(coalesce: bool, drop_sender: bool) {
4091        fn create_receiver<M>(coalesce: bool) -> (mpsc::UnboundedSender<M>, PortReceiver<M>) {
4092            // Create dummy state and port_id to create PortReceiver. They are
4093            // not used in the test.
4094            let dummy_actor_ref: ActorAddr = test_actor_id("world_0", "actor");
4095            let dummy_state = State::new(dummy_actor_ref.clone());
4096            let dummy_port_id = dummy_actor_ref.port_addr(Port::from(0));
4097            let (sender, receiver) = mpsc::unbounded_channel::<M>();
4098            let receiver = PortReceiver {
4099                receiver,
4100                port_id: dummy_port_id,
4101                coalesce,
4102                mailbox: Mailbox {
4103                    inner: Arc::new(dummy_state),
4104                },
4105            };
4106            (sender, receiver)
4107        }
4108
4109        // verify fn drain
4110        {
4111            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4112            assert!(receiver.drain().is_empty());
4113
4114            sender.send(0).unwrap();
4115            sender.send(1).unwrap();
4116            sender.send(2).unwrap();
4117            sender.send(3).unwrap();
4118            sender.send(4).unwrap();
4119            sender.send(5).unwrap();
4120            sender.send(6).unwrap();
4121            sender.send(7).unwrap();
4122
4123            if drop_sender {
4124                drop(sender);
4125            }
4126
4127            if !coalesce {
4128                assert_eq!(receiver.drain(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
4129            } else {
4130                assert_eq!(receiver.drain(), vec![7]);
4131            }
4132
4133            assert!(receiver.drain().is_empty());
4134            assert!(receiver.drain().is_empty());
4135        }
4136
4137        // verify fn try_recv
4138        {
4139            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4140            assert!(receiver.try_recv().unwrap().is_none());
4141
4142            sender.send(0).unwrap();
4143            sender.send(1).unwrap();
4144            sender.send(2).unwrap();
4145            sender.send(3).unwrap();
4146
4147            if drop_sender {
4148                drop(sender);
4149            }
4150
4151            if !coalesce {
4152                assert_eq!(receiver.try_recv().unwrap().unwrap(), 0);
4153                assert_eq!(receiver.try_recv().unwrap().unwrap(), 1);
4154                assert_eq!(receiver.try_recv().unwrap().unwrap(), 2);
4155            }
4156            assert_eq!(receiver.try_recv().unwrap().unwrap(), 3);
4157            if drop_sender {
4158                assert_matches!(
4159                    receiver.try_recv().unwrap_err().kind(),
4160                    MailboxErrorKind::Closed
4161                );
4162                // Still Closed error
4163                assert_matches!(
4164                    receiver.try_recv().unwrap_err().kind(),
4165                    MailboxErrorKind::Closed
4166                );
4167            } else {
4168                assert!(receiver.try_recv().unwrap().is_none());
4169                // Still empty
4170                assert!(receiver.try_recv().unwrap().is_none());
4171            }
4172        }
4173        // verify fn recv
4174        {
4175            let (sender, mut receiver) = create_receiver::<u64>(coalesce);
4176            assert!(
4177                tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
4178                    .await
4179                    .is_err()
4180            );
4181
4182            sender.send(4).unwrap();
4183            sender.send(5).unwrap();
4184            sender.send(6).unwrap();
4185            sender.send(7).unwrap();
4186
4187            if drop_sender {
4188                drop(sender);
4189            }
4190
4191            if !coalesce {
4192                assert_eq!(receiver.recv().await.unwrap(), 4);
4193                assert_eq!(receiver.recv().await.unwrap(), 5);
4194                assert_eq!(receiver.recv().await.unwrap(), 6);
4195            }
4196            assert_eq!(receiver.recv().await.unwrap(), 7);
4197            if drop_sender {
4198                assert_matches!(
4199                    receiver.recv().await.unwrap_err().kind(),
4200                    MailboxErrorKind::Closed
4201                );
4202                // Still None
4203                assert_matches!(
4204                    receiver.recv().await.unwrap_err().kind(),
4205                    MailboxErrorKind::Closed
4206                );
4207            } else {
4208                assert!(
4209                    tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv())
4210                        .await
4211                        .is_err()
4212                );
4213            }
4214        }
4215    }
4216
4217    #[tokio::test]
4218    async fn test_receiver_basic_default() {
4219        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ false).await
4220    }
4221
4222    #[tokio::test]
4223    async fn test_receiver_basic_latest() {
4224        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ false).await
4225    }
4226
4227    #[tokio::test]
4228    async fn test_receiver_after_sender_drop_default() {
4229        verify_receiver(/*coalesce=*/ false, /*drop_sender=*/ true).await
4230    }
4231
4232    #[tokio::test]
4233    async fn test_receiver_after_sender_drop_latest() {
4234        verify_receiver(/*coalesce=*/ true, /*drop_sender=*/ true).await
4235    }
4236
    /// Fixture returned by `setup_split_port_ids` for the split-port
    /// tests: two client instances on one proc, a `u64` port opened on
    /// the first, and several split addresses derived from that port.
    struct Setup {
        /// Receiving end of the `u64` port opened on `actor0`.
        receiver: PortReceiver<u64>,
        /// Client instance that owns the original port.
        actor0: Instance<()>,
        /// Client instance on which all splits are performed.
        actor1: Instance<()>,
        /// Handle returned alongside `actor0`; stored in the fixture but
        /// not read by the tests in this section.
        _actor0_handle: ActorHandle<()>,
        /// Handle returned alongside `actor1`; stored in the fixture but
        /// not read by the tests in this section.
        _actor1_handle: ActorHandle<()>,
        /// Address of the original (bound) port on `actor0`.
        port_id: PortAddr,
        /// First split of `port_id`.
        port_id1: PortAddr,
        /// Second split of `port_id`.
        port_id2: PortAddr,
        /// Split of `port_id2`, i.e. a split of a split.
        port_id2_1: PortAddr,
    }
4248
4249    async fn setup_split_port_ids(
4250        reducer_spec: Option<ReducerSpec>,
4251        reducer_mode: ReducerMode,
4252    ) -> Setup {
4253        let proc = Proc::isolated();
4254        let (actor0, actor0_handle) = proc.client("actor0").unwrap();
4255        let (actor1, actor1_handle) = proc.client("actor1").unwrap();
4256
4257        // Open a port on actor0
4258        let (port_handle, receiver) = actor0.open_port::<u64>();
4259        let port_id = port_handle.bind().port_addr().clone();
4260
4261        // Split it twice on actor1
4262        let port_id1 = port_id
4263            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
4264            .unwrap();
4265        let port_id2 = port_id
4266            .split(&actor1, reducer_spec.clone(), reducer_mode.clone(), true)
4267            .unwrap();
4268
4269        // A split port id can also be split
4270        let port_id2_1 = port_id2
4271            .split(&actor1, reducer_spec, reducer_mode.clone(), true)
4272            .unwrap();
4273
4274        Setup {
4275            receiver,
4276            actor0,
4277            actor1,
4278            _actor0_handle: actor0_handle,
4279            _actor1_handle: actor1_handle,
4280            port_id,
4281            port_id1,
4282            port_id2,
4283            port_id2_1,
4284        }
4285    }
4286
4287    fn post(cx: &impl context::Actor, port_id: PortAddr, msg: u64) {
4288        let serialized = wirevalue::Any::serialize(&msg).unwrap();
4289        port_id.send(cx, serialized);
4290    }
4291
4292    #[async_timed_test(timeout_secs = 30)]
4293    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
4294    #[cfg_attr(not(fbcode_build), ignore)]
4295    async fn test_split_port_id_no_reducer() {
4296        let Setup {
4297            mut receiver,
4298            actor0,
4299            actor1,
4300            port_id,
4301            port_id1,
4302            port_id2,
4303            port_id2_1,
4304            ..
4305        } = setup_split_port_ids(None, ReducerMode::default()).await;
4306        // Can send messages to receiver from all port handles
4307        post(&actor0, port_id.clone(), 1);
4308        assert_eq!(receiver.recv().await.unwrap(), 1);
4309        post(&actor1, port_id1.clone(), 2);
4310        assert_eq!(receiver.recv().await.unwrap(), 2);
4311        post(&actor1, port_id2.clone(), 3);
4312        assert_eq!(receiver.recv().await.unwrap(), 3);
4313        post(&actor1, port_id2_1.clone(), 4);
4314        assert_eq!(receiver.recv().await.unwrap(), 4);
4315
4316        // no more messages
4317        tokio::time::sleep(Duration::from_secs(2)).await;
4318        let msg = receiver.try_recv().unwrap();
4319        assert_eq!(msg, None);
4320    }
4321
4322    async fn wait_for(
4323        receiver: &mut PortReceiver<u64>,
4324        expected_size: usize,
4325        timeout_duration: Duration,
4326    ) -> anyhow::Result<Vec<u64>> {
4327        let mut messeges = vec![];
4328
4329        tokio::time::timeout(timeout_duration, async {
4330            loop {
4331                let msg = receiver.recv().await.unwrap();
4332                messeges.push(msg);
4333                if messeges.len() == expected_size {
4334                    break;
4335                }
4336            }
4337        })
4338        .await?;
4339        Ok(messeges)
4340    }
4341
4342    #[async_timed_test(timeout_secs = 30)]
4343    async fn test_split_port_id_sum_reducer() {
4344        let config = hyperactor_config::global::lock();
4345        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 1);
4346
4347        let sum_accumulator = accum::sum::<u64>();
4348        let reducer_spec = sum_accumulator.reducer_spec();
4349        let Setup {
4350            mut receiver,
4351            actor0,
4352            actor1,
4353            port_id,
4354            port_id1,
4355            port_id2,
4356            port_id2_1,
4357            ..
4358        } = setup_split_port_ids(reducer_spec, ReducerMode::default()).await;
4359        post(&actor0, port_id.clone(), 4);
4360        post(&actor1, port_id1.clone(), 2);
4361        post(&actor1, port_id2.clone(), 3);
4362        post(&actor1, port_id2_1.clone(), 1);
4363        let mut messages = wait_for(&mut receiver, 4, Duration::from_secs(2))
4364            .await
4365            .unwrap();
4366        // Message might be received out of their sending out. So we sort the
4367        // messages here.
4368        messages.sort();
4369        assert_eq!(messages, vec![1, 2, 3, 4]);
4370
4371        // no more messages
4372        tokio::time::sleep(Duration::from_secs(2)).await;
4373        let msg = receiver.try_recv().unwrap();
4374        assert_eq!(msg, None);
4375    }
4376
4377    #[async_timed_test(timeout_secs = 30)]
4378    // TODO: OSS: this test is flaky in OSS. Need to repo and fix it.
4379    #[cfg_attr(not(fbcode_build), ignore)]
4380    async fn test_split_port_id_every_n_messages() {
4381        let config = hyperactor_config::global::lock();
4382        let _config_guard =
4383            config.override_key(crate::config::SPLIT_MAX_BUFFER_AGE, Duration::from_mins(10));
4384        let proc = Proc::isolated();
4385        let (actor, _actor_handle) = proc.client("actor").unwrap();
4386        let (port_handle, mut receiver) = actor.open_port::<u64>();
4387        let port_id = port_handle.bind().port_addr().clone();
4388        // Split it
4389        let reducer_spec = accum::sum::<u64>().reducer_spec();
4390        let split_port_id = port_id
4391            .split(
4392                &actor,
4393                reducer_spec,
4394                ReducerMode::Streaming(accum::StreamingReducerOpts {
4395                    max_update_interval: Some(Duration::from_mins(10)),
4396                    initial_update_interval: Some(Duration::from_mins(10)),
4397                }),
4398                true,
4399            )
4400            .unwrap();
4401
4402        // Send 9 messages.
4403        for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
4404            post(&actor, split_port_id.clone(), msg);
4405        }
4406        // The first 5 should be batched and reduced once due
4407        // to every_n_msgs = 5.
4408        let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
4409            .await
4410            .unwrap();
4411        assert_eq!(messages, vec![15]);
4412
4413        // the last message unfortranately will never come because they do not
4414        // reach batch size.
4415        tokio::time::sleep(Duration::from_secs(2)).await;
4416        let msg = receiver.try_recv().unwrap();
4417        assert_eq!(msg, None);
4418    }
4419
4420    #[async_timed_test(timeout_secs = 30)]
4421    async fn test_split_port_timeout_flush() {
4422        let config = hyperactor_config::global::lock();
4423        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 100);
4424
4425        let Setup {
4426            mut receiver,
4427            actor0: _,
4428            actor1,
4429            port_id: _,
4430            port_id1,
4431            port_id2: _,
4432            port_id2_1: _,
4433            ..
4434        } = setup_split_port_ids(
4435            Some(accum::sum::<u64>().reducer_spec().unwrap()),
4436            ReducerMode::Streaming(accum::StreamingReducerOpts {
4437                max_update_interval: Some(Duration::from_millis(50)),
4438                initial_update_interval: Some(Duration::from_millis(50)),
4439            }),
4440        )
4441        .await;
4442
4443        post(&actor1, port_id1.clone(), 10);
4444        post(&actor1, port_id1.clone(), 20);
4445        post(&actor1, port_id1.clone(), 30);
4446
4447        // Messages should accumulate for 50ms.
4448        tokio::time::sleep(Duration::from_millis(10)).await;
4449        let msg = receiver.try_recv().unwrap();
4450        assert_eq!(msg, None);
4451
4452        // Wait until we are flushed.
4453        tokio::time::sleep(Duration::from_millis(100)).await;
4454
4455        // Now we are reduced and accumulated:
4456        let msg = receiver.recv().await.unwrap();
4457        assert_eq!(msg, 60); // 10 + 20 + 30
4458
4459        // No further messages:
4460        let msg = receiver.try_recv().unwrap();
4461        assert_eq!(msg, None);
4462    }
4463
4464    #[async_timed_test(timeout_secs = 30)]
4465    async fn test_split_port_timeout_and_size_flush() {
4466        let config = hyperactor_config::global::lock();
4467        let _config_guard = config.override_key(crate::config::SPLIT_MAX_BUFFER_SIZE, 3);
4468
4469        let Setup {
4470            mut receiver,
4471            actor0: _,
4472            actor1,
4473            port_id: _,
4474            port_id1,
4475            port_id2: _,
4476            port_id2_1: _,
4477            ..
4478        } = setup_split_port_ids(
4479            Some(accum::sum::<u64>().reducer_spec().unwrap()),
4480            ReducerMode::Streaming(accum::StreamingReducerOpts {
4481                max_update_interval: Some(Duration::from_millis(50)),
4482                initial_update_interval: Some(Duration::from_millis(50)),
4483            }),
4484        )
4485        .await;
4486
4487        post(&actor1, port_id1.clone(), 10);
4488        post(&actor1, port_id1.clone(), 20);
4489        post(&actor1, port_id1.clone(), 30);
4490        post(&actor1, port_id1.clone(), 40);
4491
4492        // Should have flushed at the third message.
4493        let msg = receiver.recv().await.unwrap();
4494        assert_eq!(msg, 60);
4495
4496        // After 50ms, the next reduce will flush:
4497        let msg = receiver.recv().await.unwrap();
4498        assert_eq!(msg, 40);
4499
4500        // No further messages
4501        let msg = receiver.try_recv().unwrap();
4502        assert_eq!(msg, None);
4503    }
4504
4505    #[async_timed_test(timeout_secs = 30)]
4506    async fn test_split_port_once_mode_basic() {
4507        let proc = Proc::isolated();
4508        let (actor, _actor_handle) = proc.client("actor").unwrap();
4509        let (port_handle, mut receiver) = actor.open_port::<u64>();
4510        let port_id = port_handle.bind().port_addr().clone();
4511
4512        // Split with Once(3) mode - accumulate 3 values then emit
4513        let reducer_spec = accum::sum::<u64>().reducer_spec();
4514        let split_port_id = port_id
4515            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
4516            .unwrap();
4517
4518        // Send 3 messages
4519        post(&actor, split_port_id.clone(), 10);
4520        post(&actor, split_port_id.clone(), 20);
4521        post(&actor, split_port_id.clone(), 30);
4522
4523        // Should receive a single reduced message
4524        let msg = receiver.recv().await.unwrap();
4525        assert_eq!(msg, 60); // 10 + 20 + 30
4526
4527        // No further messages
4528        tokio::time::sleep(Duration::from_millis(100)).await;
4529        let msg = receiver.try_recv().unwrap();
4530        assert_eq!(msg, None);
4531    }
4532
4533    #[async_timed_test(timeout_secs = 30)]
4534    async fn test_split_port_once_mode_teardown() {
4535        let proc = Proc::isolated();
4536        let (actor, _actor_handle) = proc.client("actor").unwrap();
4537        let (port_handle, mut receiver) = actor.open_port::<u64>();
4538        let port_id = port_handle.bind().port_addr().clone();
4539
4540        // Set up an undeliverable receiver to capture messages sent to torn-down ports
4541        let (undeliverable_handle, mut undeliverable_receiver) =
4542            undeliverable::new_undeliverable_port();
4543
4544        // Split with Once(3) mode - accumulate 3 values then emit and tear down
4545        let reducer_spec = accum::sum::<u64>().reducer_spec();
4546        let split_port_id = port_id
4547            .split(&actor, reducer_spec, ReducerMode::Once(3), true)
4548            .unwrap();
4549
4550        // Send 3 messages to trigger reduction
4551        post(&actor, split_port_id.clone(), 10);
4552        post(&actor, split_port_id.clone(), 20);
4553        post(&actor, split_port_id.clone(), 30);
4554
4555        // Should receive a single reduced message
4556        let msg = receiver.recv().await.unwrap();
4557        assert_eq!(msg, 60); // 10 + 20 + 30
4558
4559        // Now send another message - it should fail because the port is torn down
4560        let serialized = wirevalue::Any::serialize(&100u64).unwrap();
4561        let envelope = MessageEnvelope::new(
4562            actor.mailbox().actor_addr().clone(),
4563            split_port_id.clone(),
4564            serialized,
4565            Flattrs::new(),
4566        );
4567        actor.mailbox().post(envelope, undeliverable_handle);
4568
4569        // Verify the message was returned as undeliverable
4570        let undeliverable =
4571            tokio::time::timeout(Duration::from_secs(2), undeliverable_receiver.recv())
4572                .await
4573                .expect("should receive undeliverable message")
4574                .expect("receiver should not be closed");
4575
4576        // Verify the undeliverable message has the correct destination
4577        let split_port_ref: PortAddr = split_port_id;
4578        assert_eq!(
4579            undeliverable
4580                .into_message()
4581                .expect("expected returned envelope")
4582                .dest(),
4583            &split_port_ref
4584        );
4585
4586        // Verify no additional messages arrived at the original receiver
4587        let msg = receiver.try_recv().unwrap();
4588        assert_eq!(msg, None);
4589    }
4590
4591    #[test]
4592    fn test_dial_mailbox_router_prefixes_empty() {
4593        assert_eq!(DialMailboxRouter::new().prefixes().len(), 0);
4594    }
4595
4596    #[test]
4597    fn test_dial_mailbox_router_prefixes_single_entry() {
4598        let router = DialMailboxRouter::new();
4599        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4600
4601        let prefixes: Vec<Addr> = router.prefixes().into_iter().collect();
4602        assert_eq!(prefixes.len(), 1);
4603        assert_eq!(prefixes[0], test_proc_ref("world0"));
4604    }
4605
4606    #[test]
4607    fn test_dial_mailbox_router_prefixes_no_overlap() {
4608        let router = DialMailboxRouter::new();
4609        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4610        router.bind(test_proc_ref("world1"), "unix!@2".parse().unwrap());
4611        router.bind(test_proc_ref("world2"), "unix!@3".parse().unwrap());
4612
4613        let mut prefixes: Vec<Addr> = router.prefixes().into_iter().collect();
4614        prefixes.sort();
4615
4616        let mut expected = vec![
4617            test_proc_ref("world0"),
4618            test_proc_ref("world1"),
4619            test_proc_ref("world2"),
4620        ];
4621        expected.sort();
4622
4623        assert_eq!(prefixes, expected);
4624    }
4625
4626    #[test]
4627    fn test_dial_mailbox_router_prefixes_with_overlaps() {
4628        let router = DialMailboxRouter::new();
4629        // Proc refs are all independent since they have different names.
4630        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4631        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4632        router.bind(test_proc_ref("world0_1"), "unix!@3".parse().unwrap());
4633        router.bind(test_proc_ref("world1"), "unix!@4".parse().unwrap());
4634        router.bind(test_proc_ref("world1_0"), "unix!@5".parse().unwrap());
4635
4636        let mut prefixes: Vec<Addr> = router.prefixes().into_iter().collect();
4637        prefixes.sort();
4638
4639        let mut expected = vec![
4640            test_proc_ref("world0"),
4641            test_proc_ref("world0_0"),
4642            test_proc_ref("world0_1"),
4643            test_proc_ref("world1"),
4644            test_proc_ref("world1_0"),
4645        ];
4646        expected.sort();
4647
4648        assert_eq!(prefixes, expected);
4649    }
4650
4651    #[test]
4652    fn test_dial_mailbox_router_prefixes_complex_hierarchy() {
4653        let router = DialMailboxRouter::new();
4654        // Proc refs still cover their own actors (same proc_id).
4655        router.bind(test_proc_ref("world0"), "unix!@1".parse().unwrap());
4656        router.bind(test_proc_ref("world0_0"), "unix!@2".parse().unwrap());
4657        router.bind(
4658            test_actor_ref("world0_0", "actor1"),
4659            "unix!@3".parse().unwrap(),
4660        );
4661        router.bind(test_proc_ref("world1_0"), "unix!@4".parse().unwrap());
4662        router.bind(test_proc_ref("world1_1"), "unix!@5".parse().unwrap());
4663        router.bind(
4664            test_actor_ref("world2_0", "actor0"),
4665            "unix!@6".parse().unwrap(),
4666        );
4667
4668        let mut prefixes: Vec<Addr> = router.prefixes().into_iter().collect();
4669        prefixes.sort();
4670
4671        // Covering prefixes:
4672        // - world0 (independent proc ref)
4673        // - world0_0 (covers world0_0.actor1 since same proc_id)
4674        // - world1_0 (not covered by anything else)
4675        // - world1_1 (not covered by anything else)
4676        // - world2_0.actor0 (not covered by anything else)
4677        let mut expected = vec![
4678            test_proc_ref("world0"),
4679            test_proc_ref("world0_0"),
4680            test_proc_ref("world1_0"),
4681            test_proc_ref("world1_1"),
4682            test_actor_ref("world2_0", "actor0"),
4683        ];
4684        expected.sort();
4685
4686        assert_eq!(prefixes, expected);
4687    }
4688
4689    #[test]
4690    fn test_dial_mailbox_router_prefixes_same_level() {
4691        let router = DialMailboxRouter::new();
4692        router.bind(test_proc_ref("world0_0"), "unix!@1".parse().unwrap());
4693        router.bind(test_proc_ref("world0_1"), "unix!@2".parse().unwrap());
4694        router.bind(test_proc_ref("world0_2"), "unix!@3".parse().unwrap());
4695
4696        let mut prefixes: Vec<Addr> = router.prefixes().into_iter().collect();
4697        prefixes.sort();
4698
4699        // All should be covering prefixes since none is a prefix of another
4700        let mut expected = vec![
4701            test_proc_ref("world0_0"),
4702            test_proc_ref("world0_1"),
4703            test_proc_ref("world0_2"),
4704        ];
4705        expected.sort();
4706
4707        assert_eq!(prefixes, expected);
4708    }
4709
4710    /// A forwarder that bounces messages back to the **same**
4711    /// mailbox, but does so on a task to avoid recursive stack
4712    /// growth.
4713    #[derive(Clone, Debug)]
4714    struct AsyncLoopForwarder;
4715
4716    #[async_trait]
4717    impl MailboxSender for AsyncLoopForwarder {
4718        fn post_unchecked(
4719            &self,
4720            envelope: MessageEnvelope,
4721            return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4722        ) {
4723            let me = self.clone();
4724            tokio::spawn(async move {
4725                // Call `post` so each hop applies TTL exactly once.
4726                me.post(envelope, return_handle);
4727            });
4728        }
4729    }
4730
4731    #[tokio::test]
4732    async fn message_ttl_expires_in_routing_loop_returns_to_sender() {
4733        let actor_id = test_actor_id("world_0", "ttl_actor");
4734        let (ret_port, mut ret_rx) = undeliverable::new_undeliverable_port();
4735
4736        let remote_actor = test_actor_id("remote_world_1", "remote");
4737        let dest = remote_actor.port_addr(4242.into());
4738
4739        // Build an envelope (TTL is seeded in `MessageEnvelope::new` /
4740        // `::serialize`).
4741        let payload = 1234_u64;
4742        let envelope =
4743            MessageEnvelope::serialize(actor_id.clone(), dest.clone(), &payload, Flattrs::new())
4744                .expect("serialize");
4745
4746        AsyncLoopForwarder.post(envelope, ret_port.clone());
4747
4748        // We expect the undeliverable to come back once TTL expires.
4749        let undelivered = tokio::time::timeout(Duration::from_secs(5), ret_rx.recv())
4750            .await
4751            .expect("timed out waiting for undeliverable")
4752            .expect("channel closed")
4753            .into_message()
4754            .expect("expected returned envelope");
4755
4756        // Sanity: round-trip payload still deserializes.
4757        let got: u64 = undelivered.deserialized().expect("deserialize");
4758        assert_eq!(got, payload, "payload preserved");
4759    }
4760
4761    #[tokio::test]
4762    async fn message_ttl_success_local_delivery() {
4763        let actor_id = test_actor_id("world_0", "ttl_actor");
4764        let mailbox = Mailbox::new(actor_id.clone());
4765        let (_undeliverable_tx, mut undeliverable_rx) =
4766            mailbox.bind_handler_port::<Undeliverable<MessageEnvelope>>();
4767
4768        // Open a local user u64 port.
4769        let (user_port, mut user_rx) = mailbox.open_port::<u64>();
4770
4771        // Build an envelope destined for this mailbox's own port.
4772        let payload = 0xC0FFEE_u64;
4773        let envelope = MessageEnvelope::serialize(
4774            actor_id.clone(),
4775            user_port.bind().port_addr().clone(),
4776            &payload,
4777            Flattrs::new(),
4778        )
4779        .expect("serialize");
4780
4781        // Post the message using the mailbox (local path). TTL will
4782        // not expire.
4783        let return_handle = mailbox
4784            .bound_return_handle()
4785            .unwrap_or(monitored_return_handle());
4786        mailbox.post(envelope, return_handle);
4787
4788        // We should receive the payload locally.
4789        let got = tokio::time::timeout(Duration::from_secs(1), user_rx.recv())
4790            .await
4791            .expect("timed out waiting for local delivery")
4792            .expect("user port closed");
4793        assert_eq!(got, payload);
4794
4795        // There should be no undeliverables arriving.
4796        let no_undeliverable =
4797            tokio::time::timeout(Duration::from_millis(100), undeliverable_rx.recv()).await;
4798        assert!(
4799            no_undeliverable.is_err(),
4800            "unexpected undeliverable returned on successful local delivery"
4801        );
4802    }
4803
4804    #[tokio::test]
4805    async fn test_port_contramap() {
4806        let proc = Proc::isolated();
4807        let (client, _) = proc.client("client").unwrap();
4808        let (handle, mut rx) = client.open_port();
4809
4810        handle
4811            .contramap(|m| (1, m))
4812            .post(&client, "hello".to_string());
4813        assert_eq!(rx.recv().await.unwrap(), (1, "hello".to_string()));
4814    }
4815
4816    #[test]
4817    #[should_panic(expected = "already bound")]
4818    fn test_bind_port_handle_to_handler_port_twice() {
4819        let mbox = Mailbox::new(test_actor_id("0", "test"));
4820        let (handle, _rx) = mbox.open_port::<String>();
4821        handle.bind_handler_port();
4822        handle.bind_handler_port();
4823    }
4824
4825    #[test]
4826    fn test_bind_port_handle_to_handler_port() {
4827        let mbox = Mailbox::new(test_actor_id("0", "test"));
4828        let default_port = mbox.actor_addr().port_addr(Port::from(String::port()));
4829        let (handle, _rx) = mbox.open_port::<String>();
4830        // Handle's port index is allocated by mailbox, not the handler port.
4831        assert_ne!(default_port.index(), handle.inner.port_index);
4832        // Bind the handle to the handler port.
4833        handle.bind_handler_port();
4834        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4835        // bind() can still be used, just it will not change handle's state.
4836        handle.bind();
4837        handle.bind();
4838        assert_matches!(handle.location(), PortLocation::Bound(port) if port == default_port);
4839    }
4840
4841    #[test]
4842    #[should_panic(expected = "already bound")]
4843    fn test_bind_port_handle_to_handler_port_when_already_bound() {
4844        let mbox = Mailbox::new(test_actor_id("0", "test"));
4845        let (handle, _rx) = mbox.open_port::<String>();
4846        // Bound handle to the port allocated by mailbox.
4847        handle.bind();
4848        assert_matches!(handle.location(), PortLocation::Bound(port) if port.index() == handle.inner.port_index);
4849        // Since handle is already bound, call bind_to() on it will cause panic.
4850        handle.bind_handler_port();
4851    }
4852
4853    #[tokio::test]
4854    async fn test_mailbox_post_fails_when_actor_stopped() {
4855        let actor_id = test_actor_id("0", "stopped_actor");
4856
4857        let mailbox = Mailbox::new(actor_id.clone());
4858
4859        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4860
4861        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4862
4863        // Use a separate return mailbox since
4864        // the main mailbox is stopped and won't accept messages.
4865        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4866
4867        let envelope = MessageEnvelope::serialize(
4868            actor_id.clone(),
4869            user_port.bind().port_addr().clone(),
4870            &42u64,
4871            Flattrs::new(),
4872        )
4873        .expect("serialize");
4874
4875        mailbox.post(envelope, return_handle);
4876
4877        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4878            .await
4879            .expect("timed out waiting for undeliverable")
4880            .expect("return port closed");
4881
4882        let err = undeliverable
4883            .into_message()
4884            .expect("expected returned envelope")
4885            .error_msg()
4886            .expect("expected error");
4887        assert!(
4888            err.contains(&format!("owner {} is stopped", actor_id)),
4889            "error should indicate actor stopped: {}",
4890            err
4891        );
4892    }
4893
4894    #[tokio::test]
4895    async fn test_mailbox_post_fails_when_actor_failed() {
4896        use crate::actor::ActorErrorKind;
4897
4898        let actor_id = test_actor_id("0", "failed_actor");
4899
4900        let mailbox = Mailbox::new(actor_id.clone());
4901
4902        let (user_port, _user_rx) = mailbox.open_port::<u64>();
4903
4904        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4905            "test failure".to_string(),
4906        )));
4907
4908        // Use a separate return mailbox since
4909        // the main mailbox is failed and won't accept messages.
4910        let (return_handle, mut return_rx) = undeliverable::new_undeliverable_port();
4911
4912        let envelope = MessageEnvelope::serialize(
4913            actor_id.clone(),
4914            user_port.bind().port_addr().clone(),
4915            &42u64,
4916            Flattrs::new(),
4917        )
4918        .expect("serialize");
4919
4920        mailbox.post(envelope, return_handle);
4921
4922        let undeliverable = tokio::time::timeout(Duration::from_secs(1), return_rx.recv())
4923            .await
4924            .expect("timed out waiting for undeliverable")
4925            .expect("return port closed");
4926
4927        let err = undeliverable
4928            .into_message()
4929            .expect("expected returned envelope")
4930            .error_msg()
4931            .expect("expected error");
4932        assert!(
4933            err.contains(&format!("owner {} failed", actor_id)),
4934            "error should indicate actor failed: {}",
4935            err
4936        );
4937    }
4938
4939    #[tokio::test]
4940    async fn test_port_handle_send_fails_when_actor_stopped() {
4941        let actor_id = test_actor_id("0", "stopped_actor");
4942
4943        let mailbox = Mailbox::new(actor_id.clone());
4944
4945        let (port_handle, _rx) = mailbox.open_port::<u64>();
4946        let proc = Proc::isolated();
4947        let (client, _) = proc.client("client").unwrap();
4948
4949        mailbox.close(ActorStatus::Stopped("test stop".to_string()));
4950
4951        let err = port_handle.try_send(&client, 42u64).unwrap_err();
4952        assert_matches!(
4953            err.kind(),
4954            MailboxSenderErrorKind::Mailbox(mailbox_err)
4955                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "test stop")
4956        );
4957    }
4958
4959    #[tokio::test]
4960    async fn test_port_handle_send_fails_when_actor_failed() {
4961        use crate::actor::ActorErrorKind;
4962
4963        let actor_id = test_actor_id("0", "failed_actor");
4964
4965        let mailbox = Mailbox::new(actor_id.clone());
4966
4967        let (port_handle, _rx) = mailbox.open_port::<u64>();
4968        let proc = Proc::isolated();
4969        let (client, _) = proc.client("client").unwrap();
4970
4971        mailbox.close(ActorStatus::Failed(ActorErrorKind::Generic(
4972            "test failure".to_string(),
4973        )));
4974
4975        let err = port_handle.try_send(&client, 42u64).unwrap_err();
4976        assert_matches!(
4977            err.kind(),
4978            MailboxSenderErrorKind::Mailbox(mailbox_err)
4979                if matches!(mailbox_err.kind(), MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg))) if msg == "test failure")
4980        );
4981    }
4982
4983    #[async_timed_test(timeout_secs = 30)]
4984    async fn test_open_reduce_port() {
4985        let proc = Proc::isolated();
4986        let (client, _) = proc.client("client").unwrap();
4987
4988        // Open an accumulator port with sum reducer
4989        let (port_handle, receiver) = client.mailbox().open_reduce_port(accum::sum::<u64>());
4990
4991        // Verify the reducer_spec is set
4992        let port_ref = port_handle.bind();
4993        assert!(port_ref.reducer_spec().is_some());
4994
4995        // Send a single value via the bound port
4996        port_ref.post(&client, 42);
4997
4998        // Should receive the value
4999        let result = receiver.recv().await.unwrap();
5000        assert_eq!(result, 42);
5001    }
5002
5003    #[async_timed_test(timeout_secs = 30)]
5004    async fn test_open_reduce_port_reducer_spec_preserved() {
5005        let proc = Proc::isolated();
5006        let (client, _) = proc.client("client").unwrap();
5007
5008        // Test that different accumulators produce different reducer_specs
5009        let (sum_handle, _) = client.mailbox().open_reduce_port(accum::sum::<u64>());
5010        let sum_ref = sum_handle.bind();
5011        let sum_typehash = sum_ref.reducer_spec().as_ref().unwrap().typehash;
5012
5013        let (max_handle, _) = client
5014            .mailbox()
5015            .open_reduce_port(accum::join_semilattice::<accum::Max<u64>>());
5016        let max_ref = max_handle.bind();
5017        let max_typehash = max_ref.reducer_spec().as_ref().unwrap().typehash;
5018
5019        // Different accumulators should have different reducer typehashes
5020        assert_ne!(sum_typehash, max_typehash);
5021    }
5022
5023    /// Test that `MailboxClient::flush()` waits until messages are wire-acked
5024    /// over a unix domain socket channel. We send messages, flush, and then
5025    /// confirm that the messages have already been delivered to the receiving
5026    /// mailbox.
5027    #[tokio::test]
5028    async fn test_flush_over_unix_channel() {
5029        let mbox = Mailbox::new(test_actor_id("0", "actor0"));
5030
5031        // Serve the mailbox on a unix domain socket channel.
5032        let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
5033        let serve_handle = mbox.clone().serve(rx);
5034
5035        // Dial the unix address to get a MailboxClient.
5036        let client = MailboxClient::dial(addr).unwrap();
5037
5038        // Open a streaming port so we can receive multiple messages.
5039        let (port, mut receiver) = mbox.open_port::<u64>();
5040        let port = port.bind();
5041
5042        // Send several messages without awaiting delivery.
5043        for i in 0..10u64 {
5044            client
5045                .serialize_and_send(&port, i, monitored_return_handle())
5046                .unwrap();
5047        }
5048
5049        // Flush: this should block until all 10 messages are wire-acked,
5050        // meaning they've been enqueued into the receiving mailbox.
5051        client.flush().await.unwrap();
5052
5053        // After flush, all messages should already be available.
5054        for i in 0..10u64 {
5055            let msg = receiver
5056                .try_recv()
5057                .expect("message should be available after flush")
5058                .expect("receiver should not be empty after flush");
5059            assert_eq!(msg, i);
5060        }
5061
5062        serve_handle.stop("test done");
5063        serve_handle.await.unwrap().unwrap();
5064    }
5065
5066    #[test]
5067    fn test_drain_waits_for_active_handler_enqueue() {
5068        let mailbox = Mailbox::new(test_actor_id("drain", "actor"));
5069        let (entered_tx, entered_rx) = std::sync::mpsc::channel();
5070        let release = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
5071        let delivered = Arc::new(AtomicUsize::new(0));
5072
5073        let port = mailbox.open_handler_enqueue_port({
5074            let release = Arc::clone(&release);
5075            let delivered = Arc::clone(&delivered);
5076            move |_headers, _message: u64| {
5077                entered_tx.send(()).unwrap();
5078                let (lock, cvar) = &*release;
5079                let mut released = lock.lock().unwrap();
5080                while !*released {
5081                    released = cvar.wait(released).unwrap();
5082                }
5083                delivered.fetch_add(1, Ordering::SeqCst);
5084                Ok(())
5085            }
5086        });
5087
5088        let sender = port.inner.sender.clone();
5089        let sender_thread = std::thread::spawn(move || sender.send(Flattrs::new(), 1u64).unwrap());
5090        entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();
5091
5092        let (drained_tx, drained_rx) = std::sync::mpsc::channel();
5093        let drain_thread = std::thread::spawn({
5094            let mailbox = mailbox.clone();
5095            move || {
5096                mailbox.drain();
5097                drained_tx.send(()).unwrap();
5098            }
5099        });
5100
5101        let deadline = std::time::Instant::now() + Duration::from_secs(1);
5102        while mailbox.inner.handler_ingress.state.load(Ordering::Acquire) & HANDLER_INGRESS_DRAINING
5103            == 0
5104        {
5105            assert!(std::time::Instant::now() < deadline, "drain did not start");
5106            std::thread::yield_now();
5107        }
5108        assert_matches!(
5109            drained_rx.try_recv(),
5110            Err(std::sync::mpsc::TryRecvError::Empty)
5111        );
5112
5113        let (lock, cvar) = &*release;
5114        *lock.lock().unwrap() = true;
5115        cvar.notify_all();
5116
5117        sender_thread.join().unwrap();
5118        drained_rx.recv_timeout(Duration::from_secs(1)).unwrap();
5119        drain_thread.join().unwrap();
5120        assert_eq!(delivered.load(Ordering::SeqCst), 1);
5121
5122        let err = port.inner.sender.send(Flattrs::new(), 2u64).unwrap_err();
5123        assert!(err.is::<HandlerPortClosedError>());
5124    }
5125
5126    /// Helper: build a `MessageEnvelope` with a recognizable payload
5127    /// and non-empty headers, then feed it through
5128    /// `UndeliverableMailboxSender::post_unchecked`. Returns the
5129    /// sender + destination so tests can assert against the values we
5130    /// know will end up on the log.
5131    fn drive_abandonment_log(payload_sentinel: &str) -> (crate::ActorAddr, crate::PortAddr) {
5132        use hyperactor_config::declare_attrs;
5133
5134        declare_attrs! {
5135            // Any non-empty entry works; UM-1 only asserts the log
5136            // does not dump `headers` inline.
5137            attr UM_TEST_HEADER: u64;
5138        }
5139
5140        let sender = test_actor_id("um_proc", "um_sender");
5141        let dest = test_port_id("um_dest_proc", "um_dest", 42);
5142
5143        let mut headers = Flattrs::new();
5144        headers.set(UM_TEST_HEADER, 0xC0FFEEu64);
5145
5146        let envelope = MessageEnvelope::new(
5147            sender.clone(),
5148            dest.clone(),
5149            wirevalue::Any::serialize(&payload_sentinel.to_string()).unwrap(),
5150            headers,
5151        );
5152
5153        let (return_handle, _rx) = crate::mailbox::undeliverable::new_undeliverable_port();
5154        UndeliverableMailboxSender.post_unchecked(envelope, return_handle);
5155        (sender, dest)
5156    }
5157
5158    /// UM-1: the log does not render unbounded `headers` or `data`
5159    /// fields, and reports bounded `message_type` + `data_len`
5160    /// summaries instead.
5161    #[tracing_test::traced_test]
5162    #[test]
5163    fn test_um1_bounded_fields() {
5164        let payload_sentinel = "um1_payload_sentinel_5b7a9c3d";
5165        let (_sender, _dest) = drive_abandonment_log(payload_sentinel);
5166
5167        let buf = tracing_test::internal::global_buf().lock().unwrap();
5168        let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5169
5170        // Bounded summaries are present.
5171        assert!(
5172            logs.contains("message_type="),
5173            "UM-1: expected message_type field, got:\n{logs}"
5174        );
5175        assert!(
5176            logs.contains("data_len="),
5177            "UM-1: expected data_len field, got:\n{logs}"
5178        );
5179        // The payload body must not appear (no `data=<full body>`
5180        // dump).
5181        assert!(
5182            !logs.contains(payload_sentinel),
5183            "UM-1: payload body leaked into the log:\n{logs}"
5184        );
5185        // The `headers=` field must not appear. Use a space-prefixed
5186        // match to avoid matching e.g. the word "headers" in free
5187        // prose.
5188        assert!(
5189            !logs.contains(" headers="),
5190            "UM-1: unbounded headers field leaked into the log:\n{logs}"
5191        );
5192    }
5193
5194    /// UM-2: the `actor_name` and `actor_id` fields are preserved
5195    /// on the log for downstream Scuba / alert compatibility — same
5196    /// field names, same values, same types as the prior shape.
5197    #[tracing_test::traced_test]
5198    #[test]
5199    fn test_um2_compat_fields_preserved() {
5200        let (sender, _dest) = drive_abandonment_log("um2_payload");
5201
5202        let buf = tracing_test::internal::global_buf().lock().unwrap();
5203        let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5204
5205        // Decouple field-presence from value-presence so the test
5206        // does not depend on the tracing formatter's quoting rules.
5207        let actor_name = sender.log_name();
5208        let actor_id = sender.to_string();
5209        assert!(
5210            logs.contains("actor_name=") && logs.contains(actor_name),
5211            "UM-2: expected actor_name={actor_name} on the log, got:\n{logs}"
5212        );
5213        assert!(
5214            logs.contains("actor_id=") && logs.contains(&actor_id),
5215            "UM-2: expected actor_id={actor_id} on the log, got:\n{logs}"
5216        );
5217    }
5218
5219    /// UM-3: the format string names the transport destination.
5220    #[tracing_test::traced_test]
5221    #[test]
5222    fn test_um3_destination_format() {
5223        let (_sender, dest) = drive_abandonment_log("um3_payload");
5224
5225        let buf = tracing_test::internal::global_buf().lock().unwrap();
5226        let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5227
5228        assert!(
5229            logs.contains(&format!("message not delivered to {}", dest)),
5230            "UM-3: expected destination-naming format string, got:\n{logs}"
5231        );
5232    }
5233
5234    /// UM-3b: when the envelope carries operation-context headers, the
5235    /// format string names the user operation and the log carries
5236    /// the structured `endpoint` / `adverb` fields.
5237    #[tracing_test::traced_test]
5238    #[test]
5239    fn test_um3b_operation_format_with_operation_context() {
5240        let sender = test_actor_id("um_proc", "um_sender");
5241        let dest = test_port_id("um_dest_proc", "um_dest", 42);
5242
5243        let mut headers = Flattrs::new();
5244        headers.set(
5245            headers::OPERATION_ENDPOINT,
5246            "training.buffer.sample()".to_string(),
5247        );
5248        headers.set(headers::OPERATION_ADVERB, "call_one".to_string());
5249
5250        let envelope = MessageEnvelope::new(
5251            sender,
5252            dest,
5253            wirevalue::Any::serialize(&"um3b_payload".to_string()).unwrap(),
5254            headers,
5255        );
5256
5257        let (return_handle, _rx) = crate::mailbox::undeliverable::new_undeliverable_port();
5258        UndeliverableMailboxSender.post_unchecked(envelope, return_handle);
5259
5260        let buf = tracing_test::internal::global_buf().lock().unwrap();
5261        let logs = std::str::from_utf8(&buf).expect("logs are utf-8");
5262
5263        assert!(
5264            logs.contains("abandoned message for training.buffer.sample()"),
5265            "UM-3b: expected operation-naming format string, got:\n{logs}"
5266        );
5267        assert!(
5268            logs.contains("endpoint=") && logs.contains("training.buffer.sample()"),
5269            "UM-3b: expected endpoint field with the caller's operation, got:\n{logs}"
5270        );
5271        assert!(
5272            logs.contains("adverb=") && logs.contains("call_one"),
5273            "UM-3b: expected adverb field, got:\n{logs}"
5274        );
5275        // The UM-3a destination-naming shape must not appear when
5276        // operation-context headers are stamped on the envelope.
5277        assert!(
5278            !logs.contains("message not delivered to"),
5279            "UM-3b: unexpected destination-naming format string:\n{logs}"
5280        );
5281    }
5282}