hyperactor/mailbox/
undeliverable.rs1use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate as hyperactor; use crate::ActorId;
17use crate::Message;
18use crate::Named;
19use crate::PortId;
20use crate::actor::ActorStatus;
21use crate::id;
22use crate::mailbox::DeliveryError;
23use crate::mailbox::MailboxSender;
24use crate::mailbox::MessageEnvelope;
25use crate::mailbox::PortHandle;
26use crate::mailbox::PortReceiver;
27use crate::mailbox::UndeliverableMailboxSender;
28use crate::supervision::ActorSupervisionEvent;
29
30#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
33pub struct Undeliverable<M: Message>(pub M);
34
35pub(crate) fn new_undeliverable_port() -> (
37 PortHandle<Undeliverable<MessageEnvelope>>,
38 PortReceiver<Undeliverable<MessageEnvelope>>,
39) {
40 crate::mailbox::Mailbox::new_detached(id!(world[0].proc))
41 .open_port::<Undeliverable<MessageEnvelope>>()
42}
43
44static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
48 OnceLock::new();
49pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
54 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
55 let (return_handle, mut rx) = new_undeliverable_port();
56 let (h, _) = new_undeliverable_port();
59 crate::init::get_runtime().spawn(async move {
60 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
61 envelope.try_set_error(DeliveryError::BrokenLink(
62 "message returned to undeliverable port".to_string(),
63 ));
64 super::UndeliverableMailboxSender.post(envelope, h.clone());
65 }
66 });
67 return_handle
68 });
69
70 return_handle.clone()
71}
72
73#[track_caller]
77pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
78 let caller = caller.to_owned();
79 let (return_handle, mut rx) = new_undeliverable_port();
80 tokio::task::spawn(async move {
81 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
82 envelope.try_set_error(DeliveryError::BrokenLink(
83 "message returned to undeliverable port".to_string(),
84 ));
85 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
86 }
87 });
88 return_handle
89}
90
91pub(crate) fn return_undeliverable(
93 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
94 envelope: MessageEnvelope,
95) {
96 let envelope_copy = envelope.clone();
97 if (return_handle.send(Undeliverable(envelope))).is_err() {
98 UndeliverableMailboxSender.post(envelope_copy, return_handle)
99 }
100}
101
102#[derive(Debug, Error)]
103pub enum UndeliverableMessageError {
105 #[error("a message from {from} to {to} was undeliverable and returned: {error:?}")]
107 DeliveryFailure {
108 from: ActorId,
110 to: PortId,
112 error: Option<String>,
114 },
115
116 #[error("returning an undeliverable message to sender {sender} failed: {error:?}")]
119 ReturnFailure {
120 sender: ActorId,
122
123 error: Option<String>,
125 },
126}
127
128impl UndeliverableMessageError {
129 pub fn delivery_failure(envelope: &MessageEnvelope) -> Self {
131 UndeliverableMessageError::DeliveryFailure {
132 from: envelope.sender().clone(),
133 to: envelope.dest().clone(),
134 error: envelope.error().map(|e| format!("{:?}", e)),
135 }
136 }
137
138 pub fn return_failure(envelope: &MessageEnvelope) -> Self {
140 UndeliverableMessageError::ReturnFailure {
141 sender: envelope.sender().clone(),
142 error: envelope.error().map(|e| format!("{:?}", e)),
143 }
144 }
145}
146
147pub fn supervise_undeliverable_messages(
151 supervision_port: PortHandle<ActorSupervisionEvent>,
152 mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
153) {
154 tokio::spawn(async move {
155 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
156 envelope.try_set_error(DeliveryError::BrokenLink(
157 "message returned to undeliverable port".to_string(),
158 ));
159 if supervision_port
160 .send(ActorSupervisionEvent {
161 actor_id: envelope.dest().actor_id().clone(),
162 actor_status: ActorStatus::Failed(format!(
163 "message not delivered: {}",
164 envelope
165 )),
166 message_headers: Some(envelope.headers().clone()),
167 caused_by: None,
168 })
169 .is_err()
170 {
171 UndeliverableMailboxSender
172 .post(envelope.clone(), monitored_return_handle())
173 }
174 }
175 });
176}