hyperactor/mailbox/
undeliverable.rs1use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate::Message;
17use crate::actor::ActorStatus;
18use crate::id;
19use crate::mailbox::DeliveryError;
20use crate::mailbox::MailboxSender;
21use crate::mailbox::MessageEnvelope;
22use crate::mailbox::PortHandle;
23use crate::mailbox::PortReceiver;
24use crate::mailbox::UndeliverableMailboxSender;
25use crate::supervision::ActorSupervisionEvent;
26
27#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
30pub struct Undeliverable<M: Message>(pub M);
31
32impl<M: Message> Undeliverable<M> {
33 pub fn into_inner(self) -> M {
35 self.0
36 }
37}
38
39pub(crate) fn new_undeliverable_port() -> (
41 PortHandle<Undeliverable<MessageEnvelope>>,
42 PortReceiver<Undeliverable<MessageEnvelope>>,
43) {
44 crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
45 .open_port::<Undeliverable<MessageEnvelope>>()
46}
47
48static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
52 OnceLock::new();
53pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
58 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
59 let (return_handle, mut rx) = new_undeliverable_port();
60 let (h, _) = new_undeliverable_port();
63 crate::init::get_runtime().spawn(async move {
64 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
65 envelope.set_error(DeliveryError::BrokenLink(
66 "message returned to undeliverable port".to_string(),
67 ));
68 super::UndeliverableMailboxSender.post(envelope, h.clone());
69 }
70 });
71 return_handle
72 });
73
74 return_handle.clone()
75}
76
77#[track_caller]
81pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
82 let caller = caller.to_owned();
83 let (return_handle, mut rx) = new_undeliverable_port();
84 tokio::task::spawn(async move {
85 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
86 envelope.set_error(DeliveryError::BrokenLink(
87 "message returned to undeliverable port".to_string(),
88 ));
89 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
90 }
91 });
92 return_handle
93}
94
95pub(crate) fn return_undeliverable(
97 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
98 envelope: MessageEnvelope,
99) {
100 if envelope.return_undeliverable() {
101 let envelope_copy = envelope.clone();
102 if (return_handle.send(Undeliverable(envelope))).is_err() {
103 UndeliverableMailboxSender.post(envelope_copy, return_handle)
104 }
105 }
106}
107
/// Errors describing what went wrong with an undeliverable message.
///
/// Both variants carry the affected [`MessageEnvelope`]; the rendered
/// message includes the envelope's sender, its recorded error message and
/// the envelope itself.
#[derive(Debug, Error)]
pub enum UndeliverableMessageError {
    /// A message could not be delivered to its destination and was returned.
    #[error(
        "a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
        .envelope.sender(),
        .envelope.dest(),
        .envelope.error_msg()
    )]
    DeliveryFailure {
        /// The envelope of the message that was returned.
        envelope: MessageEnvelope,
    },

    /// Returning an undeliverable message to its sender failed.
    #[error(
        "returning an undeliverable message to sender {} failed: {:?}: {envelope}",
        .envelope.sender(),
        .envelope.error_msg()
    )]
    ReturnFailure {
        /// The envelope of the message that could not be returned.
        envelope: MessageEnvelope,
    },
}
135
136pub fn supervise_undeliverable_messages_with<R, F>(
141 mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
142 mut resolve_sink: R,
143 on_undeliverable: F,
144) where
145 R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
146 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
147{
148 crate::init::get_runtime().spawn(async move {
149 while let Ok(Undeliverable(mut env)) = rx.recv().await {
150 on_undeliverable(&env);
152
153 match resolve_sink() {
158 Some(sink) => {
159 env.set_error(DeliveryError::BrokenLink(
160 "message returned to supervised undeliverable port".to_string(),
161 ));
162 let actor_id = env.dest().actor_id().clone();
163 let headers = env.headers().clone();
164
165 if let Err(e) = sink.send(ActorSupervisionEvent::new(
166 actor_id,
167 None,
168 ActorStatus::generic_failure(format!("message not delivered: {}", env)),
169 Some(headers),
170 )) {
171 tracing::warn!(
172 %e,
173 actor=%env.dest().actor_id(),
174 headers=?env.headers(),
175 "failed to forward supervision event; logging undeliverable"
176 );
177 UndeliverableMailboxSender.post(env, monitored_return_handle());
178 }
179 }
180 None => {
181 tracing::warn!(
182 actor=%env.dest().actor_id(),
183 headers=?env.headers(),
184 "no supervision sink yet; logging undeliverable"
185 );
186 UndeliverableMailboxSender.post(env, monitored_return_handle());
187 }
188 }
189 }
190 });
191}
192
193pub fn supervise_undeliverable_messages<F>(
197 supervision_port: PortHandle<ActorSupervisionEvent>,
198 rx: PortReceiver<Undeliverable<MessageEnvelope>>,
199 on_deliverable: F,
200) where
201 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
202{
203 supervise_undeliverable_messages_with(
204 rx,
205 move || Some(supervision_port.clone()),
206 on_deliverable,
207 );
208}