Skip to main content

hyperactor/mailbox/
undeliverable.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Undeliverable-message port helpers and the user-visible
10//! `UndeliverableMessageError` type.
11//!
12//! ## Undeliverable-error text invariants (UE-*)
13//!
14//! - **UE-1 (bounded rendering).** `UndeliverableMessageError`
15//!   `Display` must not render `envelope.headers()` or
16//!   `envelope.data()` via their `Display` impls.
17//!
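//! - **UE-2 (core diagnostics preserved).** The rendered text keeps
//!   `sender`, `dest`, `message type`, `data_len`, and `error`. The
//!   `Lost` variant carries no payload, so it keeps every one of these
//!   fields except `data_len`.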
20//!
//! - **UE-3 (top-line shape).** For envelope-carrying variants, the top
//!   line names the operation when the envelope carries
//!   `OPERATION_ENDPOINT` (a missing `OPERATION_ADVERB` renders as `?`).
//!   Otherwise it names the actual failing hop, derived from the variant
//!   of `UndeliverableMessageError`:
//!   - `DeliveryFailure` → `"undeliverable message to {dest}"`,
//!     mirroring the abandonment-log surface in `mailbox.rs`.
//!   - `ReturnFailure` → `"undeliverable return to original sender
//!     {sender}"`. Sender/dest in this variant refer to the *original*
//!     envelope, so headlining `dest` would misstate the failing hop;
//!     the return-to-sender hop is the one that actually failed.
//!
//!   The `Lost` variant has no envelope (and therefore no operation
//!   headers), so its top line is always `"lost message to {dest}"`.
//!
//!   All of these shapes only relocate UE-2 stable rendered fields into
//!   the headline; no unbounded surface is introduced.
//!
//! - **UE-4 (neutral wording).** Top-line wording is neutral with
//!   respect to request/reply classification. The envelope shapes —
//!   `"undeliverable message for {operation} ({adverb})"`,
//!   `"undeliverable message to {dest}"`, and `"undeliverable return to
//!   original sender {sender}"` — and the `Lost` shape `"lost message to
//!   {dest}"` describe the failure without claiming send-kind.
40//!
41//! - **UE-5 (message-type fallback).** When wirevalue type resolution
42//!   is unavailable (`envelope.data().typename()` returns `None`), the
43//!   `message type:` field falls back to the stamped
44//!   `RUST_MESSAGE_TYPE` header (planted at every send by the
45//!   `PortHandle`/`PortRef` paths in `mailbox.rs` / `ref_.rs`) before
46//!   rendering the literal `"unknown"`. `"unknown"` is reserved for
47//!   envelopes lacking both.
48
49use std::sync::OnceLock;
50
51use enum_as_inner::EnumAsInner;
52use serde::Deserialize;
53use serde::Serialize;
54use thiserror::Error;
55
56use crate::ActorAddr;
57use crate::ActorHandle;
58use crate::EndpointLocation;
59use crate::Instance;
60// for macros
61use crate::Message;
62use crate::Proc;
63use crate::mailbox::DeliveryError;
64use crate::mailbox::MailboxSender;
65use crate::mailbox::MailboxSenderError;
66use crate::mailbox::MessageEnvelope;
67use crate::mailbox::PortHandle;
68use crate::mailbox::PortReceiver;
69use crate::mailbox::UndeliverableMailboxSender;
70use crate::mailbox::headers::OPERATION_ADVERB;
71use crate::mailbox::headers::OPERATION_ENDPOINT;
72use crate::mailbox::headers::RUST_MESSAGE_TYPE;
73
74/// Metadata for a message that could not be delivered and could not be
75/// returned.
76#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
77pub struct LostMessage {
78    /// The actor that attempted the send.
79    pub sender: ActorAddr,
80    /// The destination that rejected the message.
81    pub dest: EndpointLocation,
82    /// The message type, if known.
83    pub message_type: Option<String>,
84    /// The delivery failure.
85    pub error: String,
86}
87
88impl LostMessage {
89    /// Construct lost-message metadata from a local send error.
90    pub(crate) fn from_send_error<M: Message>(
91        sender: ActorAddr,
92        dest: EndpointLocation,
93        error: &MailboxSenderError,
94    ) -> Self {
95        Self {
96            sender,
97            dest,
98            message_type: Some(std::any::type_name::<M>().to_string()),
99            error: error.to_string(),
100        }
101    }
102}
103
104/// An undeliverable `M`-typed message.
105#[expect(
106    clippy::large_enum_variant,
107    reason = "returned messages stay inline so callers can recover the original payload without extra allocation"
108)]
109#[derive(
110    Debug,
111    EnumAsInner,
112    Serialize,
113    Deserialize,
114    Clone,
115    PartialEq,
116    typeuri::Named
117)]
118pub enum Undeliverable<M: Message> {
119    /// The message was returned intact.
120    Message(M),
121    /// The message was lost before it could be returned.
122    Lost(LostMessage),
123}
124
125impl<M: Message> Undeliverable<M> {
126    /// Construct an undeliverable message that preserves the original payload.
127    pub fn message(message: M) -> Self {
128        Self::Message(message)
129    }
130
131    /// Construct an undeliverable message that carries only lost-message
132    /// metadata.
133    pub fn lost(message: LostMessage) -> Self {
134        Self::Lost(message)
135    }
136}
137
138// Port handle and receiver for undeliverable messages.
139pub(crate) fn new_undeliverable_port() -> (
140    PortHandle<Undeliverable<MessageEnvelope>>,
141    PortReceiver<Undeliverable<MessageEnvelope>>,
142) {
143    let proc = Proc::isolated();
144    crate::mailbox::Mailbox::new(proc.proc_addr().actor_addr("undeliverable"))
145        .open_port::<Undeliverable<MessageEnvelope>>()
146}
147
148// An undeliverable message port handle to be shared amongst multiple
149// producers. Messages sent here are forwarded to the undeliverable
150// mailbox sender.
151static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
152    OnceLock::new();
153/// Accessor to the shared monitored undeliverable message port
154/// handle. Initialization spawns the undeliverable message port
155/// monitor that forwards incoming messages to the undeliverable
156/// mailbox sender.
157pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
158    let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
159        let (return_handle, mut rx) = new_undeliverable_port();
160        // Don't reuse `return_handle` for `h`: else it will never get
161        // dropped and the task will never return.
162        let (h, _) = new_undeliverable_port();
163        crate::init::get_runtime().spawn(async move {
164            while let Ok(undeliverable) = rx.recv().await {
165                match undeliverable {
166                    Undeliverable::Message(mut envelope) => {
167                        envelope.set_error(DeliveryError::BrokenLink(
168                            "message returned to undeliverable port".to_string(),
169                        ));
170                        super::UndeliverableMailboxSender
171                            .post(envelope, /*unused */ h.clone());
172                    }
173                    Undeliverable::Lost(lost) => {
174                        tracing::error!(
175                            sender = %lost.sender,
176                            dest = %lost.dest,
177                            message_type = lost.message_type.as_deref().unwrap_or("unknown"),
178                            error = %lost.error,
179                            "lost message returned to undeliverable port"
180                        );
181                    }
182                }
183            }
184        });
185        return_handle
186    });
187
188    return_handle.clone()
189}
190
191/// Now that monitored return handles are rare, it's becoming helpful
192/// to get insights into where they are getting used (so that they can
193/// be eliminated and replaced with something better).
194#[track_caller]
195pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
196    let caller = caller.to_owned();
197    let (return_handle, mut rx) = new_undeliverable_port();
198    tokio::task::spawn(async move {
199        while let Ok(undeliverable) = rx.recv().await {
200            match undeliverable {
201                Undeliverable::Message(mut envelope) => {
202                    envelope.set_error(DeliveryError::BrokenLink(
203                        "message returned to undeliverable port".to_string(),
204                    ));
205                    tracing::error!("{caller} took back an undeliverable message: {}", envelope);
206                }
207                Undeliverable::Lost(lost) => {
208                    tracing::error!(
209                        sender = %lost.sender,
210                        dest = %lost.dest,
211                        message_type = lost.message_type.as_deref().unwrap_or("unknown"),
212                        error = %lost.error,
213                        "{caller} took back a lost message"
214                    );
215                }
216            }
217        }
218    });
219    return_handle
220}
221
222/// Returns a message envelope to its original sender.
223pub(crate) fn return_undeliverable(
224    return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
225    envelope: MessageEnvelope,
226) {
227    if envelope.return_undeliverable() {
228        // A global client for returning undeliverable messages.
229        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
230        let client = &CLIENT
231            .get_or_init(|| Proc::runtime().client("global_return_client").unwrap())
232            .0;
233        let envelope_copy = envelope.clone();
234        if return_handle
235            .try_send(client, Undeliverable::message(envelope))
236            .is_err()
237        {
238            UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
239        }
240    }
241}
242
243#[derive(Debug, Error)]
244/// Errors that occur during message delivery and return.
245pub enum UndeliverableMessageError {
246    /// Delivery of a message to its destination failed.
247    DeliveryFailure {
248        /// The undelivered message.
249        envelope: MessageEnvelope,
250    },
251
252    /// Delivery of an undeliverable message back to its sender
253    /// failed.
254    ReturnFailure {
255        /// The undelivered message.
256        envelope: MessageEnvelope,
257    },
258
259    /// A message was lost before it could be returned.
260    Lost {
261        /// The lost-message metadata.
262        lost: LostMessage,
263    },
264}
265
266/// Compute the top-line prefix for a bounced envelope (UE-3, UE-4).
267///
268/// When `OPERATION_ENDPOINT` is present, name the operation. Otherwise
269/// name the actual failing hop, which differs between the two variants:
270/// `DeliveryFailure` failed at `sender → dest`, while `ReturnFailure`
271/// failed at the return hop `system → original sender` (sender/dest
272/// in that variant still describe the original envelope, not the
273/// failing return).
274fn undeliverable_prefix(error: &UndeliverableMessageError) -> String {
275    let envelope = match error {
276        UndeliverableMessageError::DeliveryFailure { envelope }
277        | UndeliverableMessageError::ReturnFailure { envelope } => envelope,
278        UndeliverableMessageError::Lost { lost } => {
279            return format!("lost message to {}", lost.dest);
280        }
281    };
282    if let Some(endpoint) = envelope.headers().get(OPERATION_ENDPOINT) {
283        let adverb = envelope
284            .headers()
285            .get(OPERATION_ADVERB)
286            .unwrap_or_else(|| "?".to_string());
287        return format!("undeliverable message for {} ({})", endpoint, adverb);
288    }
289    match error {
290        UndeliverableMessageError::DeliveryFailure { .. } => {
291            format!("undeliverable message to {}", envelope.dest())
292        }
293        UndeliverableMessageError::ReturnFailure { .. } => {
294            format!(
295                "undeliverable return to original sender {}",
296                envelope.sender()
297            )
298        }
299        UndeliverableMessageError::Lost { lost } => format!("lost message to {}", lost.dest),
300    }
301}
302
303impl std::fmt::Display for UndeliverableMessageError {
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        // For `DeliveryFailure`, the sender/dest fields describe the
306        // failing hop. For `ReturnFailure`, they describe the
307        // *original* envelope — the return hop failed, but the
308        // identity fields still refer to the original delivery. Keep
309        // the labels distinct so readers know which one they're
310        // looking at.
311        let (envelope, description, sender_label, dest_label) = match self {
312            UndeliverableMessageError::DeliveryFailure { envelope } => (
313                envelope,
314                "delivery of message from sender to dest failed",
315                "sender",
316                "dest",
317            ),
318            UndeliverableMessageError::ReturnFailure { envelope } => (
319                envelope,
320                "returning undeliverable message to original sender failed",
321                "original sender",
322                "original dest",
323            ),
324            UndeliverableMessageError::Lost { lost } => {
325                writeln!(f, "{}:", undeliverable_prefix(self))?;
326                writeln!(
327                    f,
328                    "\tdescription: message was lost before it could be returned"
329                )?;
330                writeln!(f, "\tsender: {}", lost.sender)?;
331                writeln!(f, "\tdest: {}", lost.dest)?;
332                writeln!(
333                    f,
334                    "\tmessage type: {}",
335                    lost.message_type.as_deref().unwrap_or("unknown")
336                )?;
337                writeln!(f, "\terror: {}", lost.error)?;
338                return Ok(());
339            }
340        };
341
342        writeln!(f, "{}:", undeliverable_prefix(self))?;
343        writeln!(f, "\tdescription: {}", description)?;
344        writeln!(f, "\t{}: {}", sender_label, envelope.sender())?;
345        writeln!(f, "\t{}: {}", dest_label, envelope.dest())?;
346        // UE-5: prefer the wirevalue-resolved typename; fall back to
347        // the static `RUST_MESSAGE_TYPE` stamped at send time before
348        // resorting to the literal "unknown".
349        let message_type = envelope
350            .data()
351            .typename()
352            .map(|s| s.to_string())
353            .or_else(|| envelope.headers().get(RUST_MESSAGE_TYPE))
354            .unwrap_or_else(|| "unknown".to_string());
355        writeln!(f, "\tmessage type: {}", message_type)?;
356        writeln!(f, "\tdata_len: {}", envelope.data().len())?;
357        writeln!(
358            f,
359            "\terror: {}",
360            envelope.error_msg().unwrap_or("<none>".to_string())
361        )
362    }
363}
364
#[cfg(test)]
mod tests {
    use hyperactor_config::Flattrs;

    use super::*;
    use crate::mailbox::MessageEnvelope;
    use crate::testing::ids::test_actor_id;
    use crate::testing::ids::test_port_id;

    /// Build an envelope from a fixed test sender to a fixed test port,
    /// carrying `payload` serialized as a `String` plus the given headers.
    fn make_envelope(payload: &str, headers: Flattrs) -> MessageEnvelope {
        let sender = test_actor_id("ue_proc", "ue_sender");
        let dest = test_port_id("ue_dest_proc", "ue_dest", 42);
        let data = wirevalue::Any::serialize(&payload.to_string()).unwrap();
        MessageEnvelope::new(sender, dest, data, headers)
    }

    /// UE-1: `DeliveryFailure` Display is bounded — no unbounded
    /// `headers: ...` or `data: ...` dumps. `data_len` replaces
    /// the payload body.
    #[test]
    fn test_ue1_delivery_failure_bounded() {
        let payload: String = std::iter::repeat_n('x', 10_000).collect();
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        let envelope = make_envelope(&payload, headers);
        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        assert!(
            rendered.contains("message type:"),
            "UE-1: message type field must be present, got:\n{rendered}"
        );
        assert!(
            rendered.contains("data_len:"),
            "UE-1: data_len field must be present, got:\n{rendered}"
        );
        assert!(
            rendered.contains("sender:"),
            "UE-2: sender field must be preserved, got:\n{rendered}"
        );
        assert!(
            rendered.contains("dest:"),
            "UE-2: dest field must be preserved, got:\n{rendered}"
        );
        assert!(
            rendered.contains("error:"),
            "UE-2: error field must be preserved, got:\n{rendered}"
        );
        // UE-1: the unbounded raw dumps must not appear.
        assert!(
            !rendered.contains("\theaders: "),
            "UE-1: raw headers dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tdata: "),
            "UE-1: raw data dump leaked, got:\n{rendered}"
        );
        // The 10_000-byte payload body must not appear verbatim.
        assert!(
            !rendered.contains(&payload),
            "UE-1: payload body leaked into rendered text"
        );
    }

    /// UE-1: `ReturnFailure` Display is bounded — same rule as
    /// `DeliveryFailure`. Covers the other match arm.
    #[test]
    fn test_ue1_return_failure_bounded() {
        let payload: String = std::iter::repeat_n('y', 10_000).collect();
        let envelope = make_envelope(&payload, Flattrs::new());
        let rendered = format!("{}", UndeliverableMessageError::ReturnFailure { envelope });

        assert!(
            rendered.contains("data_len:"),
            "UE-1: data_len field must be present, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\theaders: "),
            "UE-1: raw headers dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tdata: "),
            "UE-1: raw data dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains(&payload),
            "UE-1: payload body leaked into rendered text"
        );
    }

    /// UE-2: `ReturnFailure` keeps the core identity fields but labels
    /// them as belonging to the *original* envelope, and describes the
    /// failed return hop.
    #[test]
    fn test_ue2_return_failure_labels_original_identities() {
        let envelope = make_envelope("payload", Flattrs::new());
        let sender_str = envelope.sender().to_string();
        let dest_str = envelope.dest().to_string();
        let rendered = format!("{}", UndeliverableMessageError::ReturnFailure { envelope });

        assert!(
            rendered.contains(&format!("\toriginal sender: {sender_str}\n")),
            "UE-2: return failure must label the original sender, got:\n{rendered}"
        );
        assert!(
            rendered.contains(&format!("\toriginal dest: {dest_str}\n")),
            "UE-2: return failure must label the original dest, got:\n{rendered}"
        );
        assert!(
            rendered.contains(
                "\tdescription: returning undeliverable message to original sender failed\n"
            ),
            "UE-2: return failure must describe the failed return hop, got:\n{rendered}"
        );
    }

    /// UE-3 / UE-4: when the envelope carries an operation endpoint,
    /// the top line is `"undeliverable message for <endpoint>
    /// (<adverb>)"`. Neutral wording — no claim about send vs reply
    /// kind.
    #[test]
    fn test_ue3_operation_endpoint_names_top_line() {
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        headers.set(OPERATION_ADVERB, "call_one".to_string());
        let envelope = make_envelope("payload", headers);
        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        let expected_line = "undeliverable message for training.buffer.sample() (call_one):";
        assert!(
            rendered.starts_with(expected_line),
            "UE-3/UE-4: expected top line `{expected_line}`, got:\n{rendered}"
        );
        // UE-4 specifically: the wording must be neutral — it must
        // not claim "reply" or "send" when we only know that
        // operation context is present.
        assert!(
            !rendered.contains("undeliverable reply"),
            "UE-4: must not claim reply-kind from header presence alone, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("undeliverable send"),
            "UE-4: must not claim send-kind from header presence alone, got:\n{rendered}"
        );
    }

    /// UE-3: with `OPERATION_ENDPOINT` present but `OPERATION_ADVERB`
    /// absent, the operation headline still wins and the adverb is
    /// rendered as the `?` placeholder.
    #[test]
    fn test_ue3_operation_endpoint_without_adverb_uses_placeholder() {
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        let envelope = make_envelope("payload", headers);
        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        let expected_line = "undeliverable message for training.buffer.sample() (?):";
        assert!(
            rendered.starts_with(expected_line),
            "UE-3: missing adverb must render as `?`, expected `{expected_line}`, got:\n{rendered}"
        );
    }

    /// UE-3: operation context takes precedence over the hop-based
    /// fallback for `ReturnFailure` too.
    #[test]
    fn test_ue3_return_failure_operation_endpoint_names_top_line() {
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        headers.set(OPERATION_ADVERB, "call_one".to_string());
        let envelope = make_envelope("payload", headers);
        let rendered = format!("{}", UndeliverableMessageError::ReturnFailure { envelope });

        let expected_line = "undeliverable message for training.buffer.sample() (call_one):";
        assert!(
            rendered.starts_with(expected_line),
            "UE-3: return failure with operation context → `{expected_line}`, got:\n{rendered}"
        );
        assert!(
            !rendered.starts_with("undeliverable return to original sender"),
            "UE-3: operation headline must win over the return-hop fallback, got:\n{rendered}"
        );
    }

    /// UE-3: `DeliveryFailure` with no operation context falls back to
    /// naming the destination (the actual failing hop), mirroring the
    /// abandonment-log surface in `mailbox.rs`.
    #[test]
    fn test_ue3_delivery_failure_no_context_names_destination() {
        let envelope = make_envelope("payload", Flattrs::new());
        let dest_str = envelope.dest().to_string();
        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        let expected_prefix = format!("undeliverable message to {}", dest_str);
        assert!(
            rendered.starts_with(&expected_prefix),
            "UE-3: delivery failure no context → destination prefix `{expected_prefix}`, got:\n{rendered}"
        );
        // The retired neutral wording must not return.
        assert!(
            !rendered.contains("undeliverable message error"),
            "UE-3: neutral fallback must not be re-introduced, got:\n{rendered}"
        );
    }

    /// UE-3: `ReturnFailure` with no operation context names the
    /// original sender, because in this variant `sender`/`dest` refer
    /// to the original envelope and the actual failing hop is
    /// `system → original sender`. Headlining `dest` here would
    /// misstate the failure.
    #[test]
    fn test_ue3_return_failure_no_context_names_original_sender() {
        let envelope = make_envelope("payload", Flattrs::new());
        let sender_str = envelope.sender().to_string();
        let dest_str = envelope.dest().to_string();
        let rendered = format!("{}", UndeliverableMessageError::ReturnFailure { envelope });

        let expected_prefix = format!("undeliverable return to original sender {}", sender_str);
        assert!(
            rendered.starts_with(&expected_prefix),
            "UE-3: return failure no context → original-sender prefix `{expected_prefix}`, got:\n{rendered}"
        );
        // Must not headline the original destination — the failing hop
        // is the return to the original sender, not the original
        // delivery.
        assert!(
            !rendered.starts_with(&format!("undeliverable message to {}", dest_str)),
            "UE-3: return failure must not headline the original destination, got:\n{rendered}"
        );
        // The retired neutral wording must not return.
        assert!(
            !rendered.contains("undeliverable message error"),
            "UE-3: neutral fallback must not be re-introduced, got:\n{rendered}"
        );
    }

    /// UE-5: when wirevalue type resolution is unavailable
    /// (`typename()` is `None`), the formatter falls back to the
    /// static `RUST_MESSAGE_TYPE` stamped at send time.
    #[test]
    fn test_ue5_message_type_falls_back_to_rust_message_type() {
        // `Any::new_broken()` carries `BROKEN_TYPEHASH` (0), which is
        // not in the wirevalue type registry, so `typename()` is None.
        // Mirrors the `test_broken_any` pattern in wirevalue itself.
        let sender = test_actor_id("ue_proc", "ue_sender");
        let dest = test_port_id("ue_dest_proc", "ue_dest", 42);
        let mut headers = Flattrs::new();
        headers.set(RUST_MESSAGE_TYPE, "my::Foo".to_string());
        let envelope = MessageEnvelope::new(sender, dest, wirevalue::Any::new_broken(), headers);
        assert!(
            envelope.data().typename().is_none(),
            "test fixture invariant: broken Any must have no typename()"
        );

        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        assert!(
            rendered.contains("\tmessage type: my::Foo\n"),
            "UE-5: must surface RUST_MESSAGE_TYPE when typename() is absent, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tmessage type: unknown"),
            "UE-5: must not render \"unknown\" when RUST_MESSAGE_TYPE is present, got:\n{rendered}"
        );
    }

    /// UE-5 (negative): when both `typename()` and `RUST_MESSAGE_TYPE`
    /// are absent, the formatter falls all the way through to the
    /// literal `"unknown"`.
    #[test]
    fn test_ue5_unknown_when_typename_and_rust_message_type_both_absent() {
        let sender = test_actor_id("ue_proc", "ue_sender");
        let dest = test_port_id("ue_dest_proc", "ue_dest", 42);
        let envelope =
            MessageEnvelope::new(sender, dest, wirevalue::Any::new_broken(), Flattrs::new());
        assert!(
            envelope.data().typename().is_none(),
            "test fixture invariant: broken Any must have no typename()"
        );

        let rendered = format!(
            "{}",
            UndeliverableMessageError::DeliveryFailure { envelope }
        );

        assert!(
            rendered.contains("\tmessage type: unknown\n"),
            "UE-5: with no typename and no RUST_MESSAGE_TYPE, must render \"unknown\", got:\n{rendered}"
        );
    }
}