hyperactor/mailbox/
undeliverable.rs1use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate as hyperactor; use crate::ActorId;
17use crate::Message;
18use crate::Named;
19use crate::PortId;
20use crate::actor::ActorStatus;
21use crate::id;
22use crate::mailbox::DeliveryError;
23use crate::mailbox::MailboxSender;
24use crate::mailbox::MessageEnvelope;
25use crate::mailbox::PortHandle;
26use crate::mailbox::PortReceiver;
27use crate::mailbox::UndeliverableMailboxSender;
28use crate::supervision::ActorSupervisionEvent;
29
30#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
33pub struct Undeliverable<M: Message>(pub M);
34
35pub(crate) fn new_undeliverable_port() -> (
37 PortHandle<Undeliverable<MessageEnvelope>>,
38 PortReceiver<Undeliverable<MessageEnvelope>>,
39) {
40 crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
41 .open_port::<Undeliverable<MessageEnvelope>>()
42}
43
44static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
48 OnceLock::new();
49pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
54 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
55 let (return_handle, mut rx) = new_undeliverable_port();
56 let (h, _) = new_undeliverable_port();
59 crate::init::get_runtime().spawn(async move {
60 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
61 envelope.set_error(DeliveryError::BrokenLink(
62 "message returned to undeliverable port".to_string(),
63 ));
64 super::UndeliverableMailboxSender.post(envelope, h.clone());
65 }
66 });
67 return_handle
68 });
69
70 return_handle.clone()
71}
72
73#[track_caller]
77pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
78 let caller = caller.to_owned();
79 let (return_handle, mut rx) = new_undeliverable_port();
80 tokio::task::spawn(async move {
81 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
82 envelope.set_error(DeliveryError::BrokenLink(
83 "message returned to undeliverable port".to_string(),
84 ));
85 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
86 }
87 });
88 return_handle
89}
90
91pub(crate) fn return_undeliverable(
93 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
94 envelope: MessageEnvelope,
95) {
96 let envelope_copy = envelope.clone();
97 if (return_handle.send(Undeliverable(envelope))).is_err() {
98 UndeliverableMailboxSender.post(envelope_copy, return_handle)
99 }
100}
101
102#[derive(Debug, Error)]
103pub enum UndeliverableMessageError {
105 #[error("a message from {from} to {to} was undeliverable and returned: {error:?}")]
107 DeliveryFailure {
108 from: ActorId,
110 to: PortId,
112 error: Option<String>,
114 },
115
116 #[error("returning an undeliverable message to sender {sender} failed: {error:?}")]
119 ReturnFailure {
120 sender: ActorId,
122
123 error: Option<String>,
125 },
126}
127
128impl UndeliverableMessageError {
129 pub fn delivery_failure(envelope: &MessageEnvelope) -> Self {
131 UndeliverableMessageError::DeliveryFailure {
132 from: envelope.sender().clone(),
133 to: envelope.dest().clone(),
134 error: envelope.error_msg(),
135 }
136 }
137
138 pub fn return_failure(envelope: &MessageEnvelope) -> Self {
140 UndeliverableMessageError::ReturnFailure {
141 sender: envelope.sender().clone(),
142 error: envelope.error_msg(),
143 }
144 }
145}
146
147pub fn supervise_undeliverable_messages_with<R, F>(
152 mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
153 mut resolve_sink: R,
154 on_undeliverable: F,
155) where
156 R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
157 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
158{
159 crate::init::get_runtime().spawn(async move {
160 while let Ok(Undeliverable(mut env)) = rx.recv().await {
161 on_undeliverable(&env);
163
164 match resolve_sink() {
169 Some(sink) => {
170 env.set_error(DeliveryError::BrokenLink(
171 "message returned to supervised undeliverable port".to_string(),
172 ));
173 let actor_id = env.dest().actor_id().clone();
174 let headers = env.headers().clone();
175
176 if let Err(e) = sink.send(ActorSupervisionEvent::new(
177 actor_id,
178 ActorStatus::Failed(format!("message not delivered: {}", env)),
179 Some(headers),
180 None,
181 )) {
182 tracing::warn!(
183 %e,
184 actor=%env.dest().actor_id(),
185 headers=?env.headers(),
186 "failed to forward supervision event; logging undeliverable"
187 );
188 UndeliverableMailboxSender.post(env, monitored_return_handle());
189 }
190 }
191 None => {
192 tracing::warn!(
193 actor=%env.dest().actor_id(),
194 headers=?env.headers(),
195 "no supervision sink yet; logging undeliverable"
196 );
197 UndeliverableMailboxSender.post(env, monitored_return_handle());
198 }
199 }
200 }
201 });
202}
203
204pub fn supervise_undeliverable_messages<F>(
208 supervision_port: PortHandle<ActorSupervisionEvent>,
209 rx: PortReceiver<Undeliverable<MessageEnvelope>>,
210 on_deliverable: F,
211) where
212 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
213{
214 supervise_undeliverable_messages_with(
215 rx,
216 move || Some(supervision_port.clone()),
217 on_deliverable,
218 );
219}