hyperactor/mailbox/
undeliverable.rs1use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate as hyperactor; use crate::Message;
17use crate::Named;
18use crate::actor::ActorStatus;
19use crate::id;
20use crate::mailbox::DeliveryError;
21use crate::mailbox::MailboxSender;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::PortHandle;
24use crate::mailbox::PortReceiver;
25use crate::mailbox::UndeliverableMailboxSender;
26use crate::supervision::ActorSupervisionEvent;
27
28#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
31pub struct Undeliverable<M: Message>(pub M);
32
33impl<M: Message> Undeliverable<M> {
34 pub fn into_inner(self) -> M {
36 self.0
37 }
38}
39
40pub(crate) fn new_undeliverable_port() -> (
42 PortHandle<Undeliverable<MessageEnvelope>>,
43 PortReceiver<Undeliverable<MessageEnvelope>>,
44) {
45 crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
46 .open_port::<Undeliverable<MessageEnvelope>>()
47}
48
49static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
53 OnceLock::new();
54pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
59 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
60 let (return_handle, mut rx) = new_undeliverable_port();
61 let (h, _) = new_undeliverable_port();
64 crate::init::get_runtime().spawn(async move {
65 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
66 envelope.set_error(DeliveryError::BrokenLink(
67 "message returned to undeliverable port".to_string(),
68 ));
69 super::UndeliverableMailboxSender.post(envelope, h.clone());
70 }
71 });
72 return_handle
73 });
74
75 return_handle.clone()
76}
77
78#[track_caller]
82pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
83 let caller = caller.to_owned();
84 let (return_handle, mut rx) = new_undeliverable_port();
85 tokio::task::spawn(async move {
86 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
87 envelope.set_error(DeliveryError::BrokenLink(
88 "message returned to undeliverable port".to_string(),
89 ));
90 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
91 }
92 });
93 return_handle
94}
95
96pub(crate) fn return_undeliverable(
98 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
99 envelope: MessageEnvelope,
100) {
101 if envelope.return_undeliverable() {
102 let envelope_copy = envelope.clone();
103 if (return_handle.send(Undeliverable(envelope))).is_err() {
104 UndeliverableMailboxSender.post(envelope_copy, return_handle)
105 }
106 }
107}
108
109#[derive(Debug, Error)]
110pub enum UndeliverableMessageError {
112 #[error(
114 "a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
115 .envelope.sender(),
116 .envelope.dest(),
117 .envelope.error_msg()
118 )]
119 DeliveryFailure {
120 envelope: MessageEnvelope,
122 },
123
124 #[error(
127 "returning an undeliverable message to sender {} failed: {:?}: {envelope}",
128 .envelope.sender(),
129 .envelope.error_msg()
130 )]
131 ReturnFailure {
132 envelope: MessageEnvelope,
134 },
135}
136
137pub fn supervise_undeliverable_messages_with<R, F>(
142 mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
143 mut resolve_sink: R,
144 on_undeliverable: F,
145) where
146 R: FnMut() -> Option<PortHandle<ActorSupervisionEvent>> + Send + 'static,
147 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
148{
149 crate::init::get_runtime().spawn(async move {
150 while let Ok(Undeliverable(mut env)) = rx.recv().await {
151 on_undeliverable(&env);
153
154 match resolve_sink() {
159 Some(sink) => {
160 env.set_error(DeliveryError::BrokenLink(
161 "message returned to supervised undeliverable port".to_string(),
162 ));
163 let actor_id = env.dest().actor_id().clone();
164 let headers = env.headers().clone();
165
166 if let Err(e) = sink.send(ActorSupervisionEvent::new(
167 actor_id,
168 None,
169 ActorStatus::generic_failure(format!("message not delivered: {}", env)),
170 Some(headers),
171 )) {
172 tracing::warn!(
173 %e,
174 actor=%env.dest().actor_id(),
175 headers=?env.headers(),
176 "failed to forward supervision event; logging undeliverable"
177 );
178 UndeliverableMailboxSender.post(env, monitored_return_handle());
179 }
180 }
181 None => {
182 tracing::warn!(
183 actor=%env.dest().actor_id(),
184 headers=?env.headers(),
185 "no supervision sink yet; logging undeliverable"
186 );
187 UndeliverableMailboxSender.post(env, monitored_return_handle());
188 }
189 }
190 }
191 });
192}
193
194pub fn supervise_undeliverable_messages<F>(
198 supervision_port: PortHandle<ActorSupervisionEvent>,
199 rx: PortReceiver<Undeliverable<MessageEnvelope>>,
200 on_deliverable: F,
201) where
202 F: Fn(&MessageEnvelope) + Send + Sync + 'static,
203{
204 supervise_undeliverable_messages_with(
205 rx,
206 move || Some(supervision_port.clone()),
207 on_deliverable,
208 );
209}