hyperactor/mailbox/undeliverable.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate as hyperactor; // for macros
16use crate::Message;
17use crate::Named;
18use crate::actor::ActorStatus;
19use crate::id;
20use crate::mailbox::DeliveryError;
21use crate::mailbox::MailboxSender;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::PortHandle;
24use crate::mailbox::PortReceiver;
25use crate::mailbox::UndeliverableMailboxSender;
26use crate::supervision::ActorSupervisionEvent;
27
28/// An undeliverable `M`-typed message (in practice `M` is
29/// [MessageEnvelope]).
30#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
31pub struct Undeliverable<M: Message>(pub M);
32
33impl<M: Message> Undeliverable<M> {
34    /// Return the inner M-typed message.
35    pub fn into_inner(self) -> M {
36        self.0
37    }
38}
39
40// Port handle and receiver for undeliverable messages.
41pub(crate) fn new_undeliverable_port() -> (
42    PortHandle<Undeliverable<MessageEnvelope>>,
43    PortReceiver<Undeliverable<MessageEnvelope>>,
44) {
45    crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
46        .open_port::<Undeliverable<MessageEnvelope>>()
47}
48
49// An undeliverable message port handle to be shared amongst multiple
50// producers. Messages sent here are forwarded to the undeliverable
51// mailbox sender.
52static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
53    OnceLock::new();
54/// Accessor to the shared monitored undeliverable message port
55/// handle. Initialization spawns the undeliverable message port
56/// monitor that forwards incoming messages to the undeliverable
57/// mailbox sender.
58pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
59    let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
60        let (return_handle, mut rx) = new_undeliverable_port();
61        // Don't reuse `return_handle` for `h`: else it will never get
62        // dropped and the task will never return.
63        let (h, _) = new_undeliverable_port();
64        crate::init::get_runtime().spawn(async move {
65            while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
66                envelope.set_error(DeliveryError::BrokenLink(
67                    "message returned to undeliverable port".to_string(),
68                ));
69                super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
70            }
71        });
72        return_handle
73    });
74
75    return_handle.clone()
76}
77
78/// Now that monitored return handles are rare, it's becoming helpful
79/// to get insights into where they are getting used (so that they can
80/// be eliminated and replaced with something better).
81#[track_caller]
82pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
83    let caller = caller.to_owned();
84    let (return_handle, mut rx) = new_undeliverable_port();
85    tokio::task::spawn(async move {
86        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
87            envelope.set_error(DeliveryError::BrokenLink(
88                "message returned to undeliverable port".to_string(),
89            ));
90            tracing::error!("{caller} took back an undeliverable message: {}", envelope);
91        }
92    });
93    return_handle
94}
95
96/// Returns a message envelope to its original sender.
97pub(crate) fn return_undeliverable(
98    return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
99    envelope: MessageEnvelope,
100) {
101    if envelope.return_undeliverable() {
102        let envelope_copy = envelope.clone();
103        if (return_handle.send(Undeliverable(envelope))).is_err() {
104            UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
105        }
106    }
107}
108
#[derive(Debug, Error)]
/// Errors that occur during message delivery and return.
///
/// Each variant carries the affected [`MessageEnvelope`]. Its `Display`
/// text includes the envelope's sender, the envelope's `error_msg()`
/// (formatted with `{:?}`), and the envelope itself.
pub enum UndeliverableMessageError {
    /// Delivery of a message to its destination failed.
    ///
    /// The message also names the envelope's destination.
    #[error(
        "a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
        .envelope.sender(),
        .envelope.dest(),
        .envelope.error_msg()
    )]
    DeliveryFailure {
        /// The undelivered message.
        envelope: MessageEnvelope,
    },

    /// Delivery of an undeliverable message back to its sender
    /// failed.
    #[error(
        "returning an undeliverable message to sender {} failed: {:?}: {envelope}",
        .envelope.sender(),
        .envelope.error_msg()
    )]
    ReturnFailure {
        /// The undelivered message that could not be returned.
        envelope: MessageEnvelope,
    },
}
136
137/// Drain undeliverables and convert them into
138/// `ActorSupervisionEvent`, using a caller-provided resolver to
139/// obtain the (possibly late) sink. If the resolver returns `None`,
140/// we **log and drop** the undeliverable.
141pub fn supervise_undeliverable_messages_with<R, F>(
142    mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
143    mut resolve_sink: R,
144    on_undeliverable: F,
145) where
146    R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
147    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
148{
149    crate::init::get_runtime().spawn(async move {
150        while let Ok(Undeliverable(mut env)) = rx.recv().await {
151            // Let caller log/trace before we mutate.
152            on_undeliverable(&env);
153
154            // `resolve_sink` provides the current supervision sink,
155            // which may appear later (e.g., after a ProcMesh finishes
156            // allocation). We call it on each message to ensure we
157            // always target the latest sink.
158            match resolve_sink() {
159                Some(sink) => {
160                    env.set_error(DeliveryError::BrokenLink(
161                        "message returned to supervised undeliverable port".to_string(),
162                    ));
163                    let actor_id = env.dest().actor_id().clone();
164                    let headers = env.headers().clone();
165
166                    if let Err(e) = sink.send(ActorSupervisionEvent::new(
167                        actor_id,
168                        None,
169                        ActorStatus::generic_failure(format!("message not delivered: {}", env)),
170                        Some(headers),
171                    )) {
172                        tracing::warn!(
173                            %e,
174                            actor=%env.dest().actor_id(),
175                            headers=?env.headers(),
176                            "failed to forward supervision event; logging undeliverable"
177                        );
178                        UndeliverableMailboxSender.post(env, monitored_return_handle());
179                    }
180                }
181                None => {
182                    tracing::warn!(
183                        actor=%env.dest().actor_id(),
184                        headers=?env.headers(),
185                        "no supervision sink yet; logging undeliverable"
186                    );
187                    UndeliverableMailboxSender.post(env, monitored_return_handle());
188                }
189            }
190        }
191    });
192}
193
194/// Spawns a task that listens for undeliverable messages and posts a
195/// corresponding `ActorSupervisionEvent` to the given supervision
196/// port.
197pub fn supervise_undeliverable_messages<F>(
198    supervision_port: PortHandle<ActorSupervisionEvent>,
199    rx: PortReceiver<Undeliverable<MessageEnvelope>>,
200    on_deliverable: F,
201) where
202    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
203{
204    supervise_undeliverable_messages_with(
205        rx,
206        move || Some(supervision_port.clone()),
207        on_deliverable,
208    );
209}