hyperactor/mailbox/
undeliverable.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate::ActorHandle;
16use crate::Instance;
17// for macros
18use crate::Message;
19use crate::Proc;
20use crate::mailbox::DeliveryError;
21use crate::mailbox::MailboxSender;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::PortHandle;
24use crate::mailbox::PortReceiver;
25use crate::mailbox::UndeliverableMailboxSender;
26
27/// An undeliverable `M`-typed message (in practice `M` is
28/// [MessageEnvelope]).
29#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
30pub struct Undeliverable<M: Message>(pub M);
31
32impl<M: Message> Undeliverable<M> {
33    /// Return the inner M-typed message.
34    pub fn into_inner(self) -> M {
35        self.0
36    }
37}
38
39// Port handle and receiver for undeliverable messages.
40pub(crate) fn new_undeliverable_port() -> (
41    PortHandle<Undeliverable<MessageEnvelope>>,
42    PortReceiver<Undeliverable<MessageEnvelope>>,
43) {
44    let proc = Proc::local();
45    crate::mailbox::Mailbox::new_detached(proc.proc_id().actor_id("undeliverable", 0))
46        .open_port::<Undeliverable<MessageEnvelope>>()
47}
48
49// An undeliverable message port handle to be shared amongst multiple
50// producers. Messages sent here are forwarded to the undeliverable
51// mailbox sender.
52static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
53    OnceLock::new();
54/// Accessor to the shared monitored undeliverable message port
55/// handle. Initialization spawns the undeliverable message port
56/// monitor that forwards incoming messages to the undeliverable
57/// mailbox sender.
58pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
59    let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
60        let (return_handle, mut rx) = new_undeliverable_port();
61        // Don't reuse `return_handle` for `h`: else it will never get
62        // dropped and the task will never return.
63        let (h, _) = new_undeliverable_port();
64        crate::init::get_runtime().spawn(async move {
65            while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
66                envelope.set_error(DeliveryError::BrokenLink(
67                    "message returned to undeliverable port".to_string(),
68                ));
69                super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
70            }
71        });
72        return_handle
73    });
74
75    return_handle.clone()
76}
77
78/// Now that monitored return handles are rare, it's becoming helpful
79/// to get insights into where they are getting used (so that they can
80/// be eliminated and replaced with something better).
81#[track_caller]
82pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
83    let caller = caller.to_owned();
84    let (return_handle, mut rx) = new_undeliverable_port();
85    tokio::task::spawn(async move {
86        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
87            envelope.set_error(DeliveryError::BrokenLink(
88                "message returned to undeliverable port".to_string(),
89            ));
90            tracing::error!("{caller} took back an undeliverable message: {}", envelope);
91        }
92    });
93    return_handle
94}
95
96/// Returns a message envelope to its original sender.
97pub(crate) fn return_undeliverable(
98    return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
99    envelope: MessageEnvelope,
100) {
101    if envelope.return_undeliverable() {
102        // A global client for returning undeliverable messages.
103        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
104        let client = &CLIENT
105            .get_or_init(|| Proc::runtime().instance("global_return_client").unwrap())
106            .0;
107        let envelope_copy = envelope.clone();
108        if (return_handle.send(client, Undeliverable(envelope))).is_err() {
109            UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
110        }
111    }
112}
113
114#[derive(Debug, Error)]
115/// Errors that occur during message delivery and return.
116pub enum UndeliverableMessageError {
117    /// Delivery of a message to its destination failed.
118    DeliveryFailure {
119        /// The undelivered message.
120        envelope: MessageEnvelope,
121    },
122
123    /// Delivery of an undeliverable message back to its sender
124    /// failed.
125    ReturnFailure {
126        /// The undelivered message.
127        envelope: MessageEnvelope,
128    },
129}
130
131impl std::fmt::Display for UndeliverableMessageError {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match self {
134            UndeliverableMessageError::DeliveryFailure { envelope } => {
135                writeln!(f, "undeliverable message error:")?;
136                writeln!(
137                    f,
138                    "\tdescription: delivery of message from sender to dest failed"
139                )?;
140                writeln!(f, "\tsender: {}", envelope.sender())?;
141                writeln!(f, "\tdest: {}", envelope.dest())?;
142                writeln!(f, "\theaders: {}", envelope.headers())?;
143                writeln!(f, "\tdata: {}", envelope.data())?;
144                writeln!(
145                    f,
146                    "\terror: {}",
147                    envelope.error_msg().unwrap_or("<none>".to_string())
148                )
149            }
150            UndeliverableMessageError::ReturnFailure { envelope } => {
151                writeln!(f, "undeliverable message error:")?;
152                writeln!(
153                    f,
154                    "\tdescription: returning undeliverable message to original sender failed"
155                )?;
156                writeln!(f, "\toriginal sender: {}", envelope.sender())?;
157                writeln!(f, "\toriginal dest: {}", envelope.dest())?;
158                writeln!(f, "\theaders: {}", envelope.headers())?;
159                writeln!(f, "\tdata: {}", envelope.data())?;
160                writeln!(
161                    f,
162                    "\terror: {}",
163                    envelope.error_msg().unwrap_or("<none>".to_string())
164                )
165            }
166        }
167    }
168}