hyperactor/mailbox/
undeliverable.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15// for macros
16use crate::Message;
17use crate::actor::ActorStatus;
18use crate::id;
19use crate::mailbox::DeliveryError;
20use crate::mailbox::MailboxSender;
21use crate::mailbox::MessageEnvelope;
22use crate::mailbox::PortHandle;
23use crate::mailbox::PortReceiver;
24use crate::mailbox::UndeliverableMailboxSender;
25use crate::supervision::ActorSupervisionEvent;
26
27/// An undeliverable `M`-typed message (in practice `M` is
28/// [MessageEnvelope]).
29#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
30pub struct Undeliverable<M: Message>(pub M);
31
32impl<M: Message> Undeliverable<M> {
33    /// Return the inner M-typed message.
34    pub fn into_inner(self) -> M {
35        self.0
36    }
37}
38
39// Port handle and receiver for undeliverable messages.
40pub(crate) fn new_undeliverable_port() -> (
41    PortHandle<Undeliverable<MessageEnvelope>>,
42    PortReceiver<Undeliverable<MessageEnvelope>>,
43) {
44    crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
45        .open_port::<Undeliverable<MessageEnvelope>>()
46}
47
48// An undeliverable message port handle to be shared amongst multiple
49// producers. Messages sent here are forwarded to the undeliverable
50// mailbox sender.
51static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
52    OnceLock::new();
53/// Accessor to the shared monitored undeliverable message port
54/// handle. Initialization spawns the undeliverable message port
55/// monitor that forwards incoming messages to the undeliverable
56/// mailbox sender.
57pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
58    let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
59        let (return_handle, mut rx) = new_undeliverable_port();
60        // Don't reuse `return_handle` for `h`: else it will never get
61        // dropped and the task will never return.
62        let (h, _) = new_undeliverable_port();
63        crate::init::get_runtime().spawn(async move {
64            while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
65                envelope.set_error(DeliveryError::BrokenLink(
66                    "message returned to undeliverable port".to_string(),
67                ));
68                super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
69            }
70        });
71        return_handle
72    });
73
74    return_handle.clone()
75}
76
77/// Now that monitored return handles are rare, it's becoming helpful
78/// to get insights into where they are getting used (so that they can
79/// be eliminated and replaced with something better).
80#[track_caller]
81pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
82    let caller = caller.to_owned();
83    let (return_handle, mut rx) = new_undeliverable_port();
84    tokio::task::spawn(async move {
85        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
86            envelope.set_error(DeliveryError::BrokenLink(
87                "message returned to undeliverable port".to_string(),
88            ));
89            tracing::error!("{caller} took back an undeliverable message: {}", envelope);
90        }
91    });
92    return_handle
93}
94
95/// Returns a message envelope to its original sender.
96pub(crate) fn return_undeliverable(
97    return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
98    envelope: MessageEnvelope,
99) {
100    if envelope.return_undeliverable() {
101        let envelope_copy = envelope.clone();
102        if (return_handle.send(Undeliverable(envelope))).is_err() {
103            UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
104        }
105    }
106}
107
108#[derive(Debug, Error)]
109/// Errors that occur during message delivery and return.
110pub enum UndeliverableMessageError {
111    /// Delivery of a message to its destination failed.
112    #[error(
113        "a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
114        .envelope.sender(),
115        .envelope.dest(),
116        .envelope.error_msg()
117    )]
118    DeliveryFailure {
119        /// The undelivered message.
120        envelope: MessageEnvelope,
121    },
122
123    /// Delivery of an undeliverable message back to its sender
124    /// failed.
125    #[error(
126        "returning an undeliverable message to sender {} failed: {:?}: {envelope}",
127        .envelope.sender(),
128        .envelope.error_msg()
129    )]
130    ReturnFailure {
131        /// The undelivered message.
132        envelope: MessageEnvelope,
133    },
134}
135
136/// Drain undeliverables and convert them into
137/// `ActorSupervisionEvent`, using a caller-provided resolver to
138/// obtain the (possibly late) sink. If the resolver returns `None`,
139/// we **log and drop** the undeliverable.
140pub fn supervise_undeliverable_messages_with<R, F>(
141    mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
142    mut resolve_sink: R,
143    on_undeliverable: F,
144) where
145    R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
146    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
147{
148    crate::init::get_runtime().spawn(async move {
149        while let Ok(Undeliverable(mut env)) = rx.recv().await {
150            // Let caller log/trace before we mutate.
151            on_undeliverable(&env);
152
153            // `resolve_sink` provides the current supervision sink,
154            // which may appear later (e.g., after a ProcMesh finishes
155            // allocation). We call it on each message to ensure we
156            // always target the latest sink.
157            match resolve_sink() {
158                Some(sink) => {
159                    env.set_error(DeliveryError::BrokenLink(
160                        "message returned to supervised undeliverable port".to_string(),
161                    ));
162                    let actor_id = env.dest().actor_id().clone();
163                    let headers = env.headers().clone();
164
165                    if let Err(e) = sink.send(ActorSupervisionEvent::new(
166                        actor_id,
167                        None,
168                        ActorStatus::generic_failure(format!("message not delivered: {}", env)),
169                        Some(headers),
170                    )) {
171                        tracing::warn!(
172                            %e,
173                            actor=%env.dest().actor_id(),
174                            headers=?env.headers(),
175                            "failed to forward supervision event; logging undeliverable"
176                        );
177                        UndeliverableMailboxSender.post(env, monitored_return_handle());
178                    }
179                }
180                None => {
181                    tracing::warn!(
182                        actor=%env.dest().actor_id(),
183                        headers=?env.headers(),
184                        "no supervision sink yet; logging undeliverable"
185                    );
186                    UndeliverableMailboxSender.post(env, monitored_return_handle());
187                }
188            }
189        }
190    });
191}
192
193/// Spawns a task that listens for undeliverable messages and posts a
194/// corresponding `ActorSupervisionEvent` to the given supervision
195/// port.
196pub fn supervise_undeliverable_messages<F>(
197    supervision_port: PortHandle<ActorSupervisionEvent>,
198    rx: PortReceiver<Undeliverable<MessageEnvelope>>,
199    on_deliverable: F,
200) where
201    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
202{
203    supervise_undeliverable_messages_with(
204        rx,
205        move || Some(supervision_port.clone()),
206        on_deliverable,
207    );
208}