hyperactor/mailbox/undeliverable.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate as hyperactor; // for macros
16use crate::ActorId;
17use crate::Message;
18use crate::Named;
19use crate::PortId;
20use crate::actor::ActorStatus;
21use crate::id;
22use crate::mailbox::DeliveryError;
23use crate::mailbox::MailboxSender;
24use crate::mailbox::MessageEnvelope;
25use crate::mailbox::PortHandle;
26use crate::mailbox::PortReceiver;
27use crate::mailbox::UndeliverableMailboxSender;
28use crate::supervision::ActorSupervisionEvent;
29
30/// An undeliverable `M`-typed message (in practice `M` is
31/// [MessageEnvelope]).
32#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
33pub struct Undeliverable<M: Message>(pub M);
34
35// Port handle and receiver for undeliverable messages.
36pub(crate) fn new_undeliverable_port() -> (
37    PortHandle<Undeliverable<MessageEnvelope>>,
38    PortReceiver<Undeliverable<MessageEnvelope>>,
39) {
40    crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
41        .open_port::<Undeliverable<MessageEnvelope>>()
42}
43
44// An undeliverable message port handle to be shared amongst multiple
45// producers. Messages sent here are forwarded to the undeliverable
46// mailbox sender.
47static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
48    OnceLock::new();
49/// Accessor to the shared monitored undeliverable message port
50/// handle. Initialization spawns the undeliverable message port
51/// monitor that forwards incoming messages to the undeliverable
52/// mailbox sender.
53pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
54    let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
55        let (return_handle, mut rx) = new_undeliverable_port();
56        // Don't reuse `return_handle` for `h`: else it will never get
57        // dropped and the task will never return.
58        let (h, _) = new_undeliverable_port();
59        crate::init::get_runtime().spawn(async move {
60            while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
61                envelope.set_error(DeliveryError::BrokenLink(
62                    "message returned to undeliverable port".to_string(),
63                ));
64                super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
65            }
66        });
67        return_handle
68    });
69
70    return_handle.clone()
71}
72
73/// Now that monitored return handles are rare, it's becoming helpful
74/// to get insights into where they are getting used (so that they can
75/// be eliminated and replaced with something better).
76#[track_caller]
77pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
78    let caller = caller.to_owned();
79    let (return_handle, mut rx) = new_undeliverable_port();
80    tokio::task::spawn(async move {
81        while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
82            envelope.set_error(DeliveryError::BrokenLink(
83                "message returned to undeliverable port".to_string(),
84            ));
85            tracing::error!("{caller} took back an undeliverable message: {}", envelope);
86        }
87    });
88    return_handle
89}
90
91/// Returns a message envelope to its original sender.
92pub(crate) fn return_undeliverable(
93    return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
94    envelope: MessageEnvelope,
95) {
96    let envelope_copy = envelope.clone();
97    if (return_handle.send(Undeliverable(envelope))).is_err() {
98        UndeliverableMailboxSender.post(envelope_copy, /*unsued*/ return_handle)
99    }
100}
101
102#[derive(Debug, Error)]
103/// Errors that occur during message delivery and return.
104pub enum UndeliverableMessageError {
105    /// Delivery of a message to its destination failed.
106    #[error("a message from {from} to {to} was undeliverable and returned: {error:?}")]
107    DeliveryFailure {
108        /// The sender of the message.
109        from: ActorId,
110        /// The destination of the message.
111        to: PortId,
112        /// Details of why the message couldn't be delivered.
113        error: Option<String>,
114    },
115
116    /// Delivery of an undeliverable message back to its sender
117    /// failed.
118    #[error("returning an undeliverable message to sender {sender} failed: {error:?}")]
119    ReturnFailure {
120        /// The actor the message was to be returned to.
121        sender: ActorId,
122
123        /// Details of why the return failed.
124        error: Option<String>,
125    },
126}
127
128impl UndeliverableMessageError {
129    /// Constructs `DeliveryFailure` from a failed delivery attempt.
130    pub fn delivery_failure(envelope: &MessageEnvelope) -> Self {
131        UndeliverableMessageError::DeliveryFailure {
132            from: envelope.sender().clone(),
133            to: envelope.dest().clone(),
134            error: envelope.error_msg(),
135        }
136    }
137
138    /// Constructs a `ReturnFailure` from a failed return attempt.
139    pub fn return_failure(envelope: &MessageEnvelope) -> Self {
140        UndeliverableMessageError::ReturnFailure {
141            sender: envelope.sender().clone(),
142            error: envelope.error_msg(),
143        }
144    }
145}
146
147/// Drain undeliverables and convert them into
148/// `ActorSupervisionEvent`, using a caller-provided resolver to
149/// obtain the (possibly late) sink. If the resolver returns `None`,
150/// we **log and drop** the undeliverable.
151pub fn supervise_undeliverable_messages_with<R, F>(
152    mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
153    mut resolve_sink: R,
154    on_undeliverable: F,
155) where
156    R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
157    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
158{
159    crate::init::get_runtime().spawn(async move {
160        while let Ok(Undeliverable(mut env)) = rx.recv().await {
161            // Let caller log/trace before we mutate.
162            on_undeliverable(&env);
163
164            // `resolve_sink` provides the current supervision sink,
165            // which may appear later (e.g., after a ProcMesh finishes
166            // allocation). We call it on each message to ensure we
167            // always target the latest sink.
168            match resolve_sink() {
169                Some(sink) => {
170                    env.set_error(DeliveryError::BrokenLink(
171                        "message returned to supervised undeliverable port".to_string(),
172                    ));
173                    let actor_id = env.dest().actor_id().clone();
174                    let headers = env.headers().clone();
175
176                    if let Err(e) = sink.send(ActorSupervisionEvent::new(
177                        actor_id,
178                        ActorStatus::Failed(format!("message not delivered: {}", env)),
179                        Some(headers),
180                        None,
181                    )) {
182                        tracing::warn!(
183                            %e,
184                            actor=%env.dest().actor_id(),
185                            headers=?env.headers(),
186                            "failed to forward supervision event; logging undeliverable"
187                        );
188                        UndeliverableMailboxSender.post(env, monitored_return_handle());
189                    }
190                }
191                None => {
192                    tracing::warn!(
193                        actor=%env.dest().actor_id(),
194                        headers=?env.headers(),
195                        "no supervision sink yet; logging undeliverable"
196                    );
197                    UndeliverableMailboxSender.post(env, monitored_return_handle());
198                }
199            }
200        }
201    });
202}
203
204/// Spawns a task that listens for undeliverable messages and posts a
205/// corresponding `ActorSupervisionEvent` to the given supervision
206/// port.
207pub fn supervise_undeliverable_messages<F>(
208    supervision_port: PortHandle<ActorSupervisionEvent>,
209    rx: PortReceiver<Undeliverable<MessageEnvelope>>,
210    on_deliverable: F,
211) where
212    F: Fn(&MessageEnvelope) + Send + Sync + 'static,
213{
214    supervise_undeliverable_messages_with(
215        rx,
216        move || Some(supervision_port.clone()),
217        on_deliverable,
218    );
219}