hyperactor/mailbox/
undeliverable.rs1use std::sync::OnceLock;
10
11use serde::Deserialize;
12use serde::Serialize;
13use thiserror::Error;
14
15use crate::ActorHandle;
16use crate::Instance;
17use crate::Message;
19use crate::Proc;
20use crate::mailbox::DeliveryError;
21use crate::mailbox::MailboxSender;
22use crate::mailbox::MessageEnvelope;
23use crate::mailbox::PortHandle;
24use crate::mailbox::PortReceiver;
25use crate::mailbox::UndeliverableMailboxSender;
26
27#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
30pub struct Undeliverable<M: Message>(pub M);
31
32impl<M: Message> Undeliverable<M> {
33 pub fn into_inner(self) -> M {
35 self.0
36 }
37}
38
39pub(crate) fn new_undeliverable_port() -> (
41 PortHandle<Undeliverable<MessageEnvelope>>,
42 PortReceiver<Undeliverable<MessageEnvelope>>,
43) {
44 let proc = Proc::local();
45 crate::mailbox::Mailbox::new_detached(proc.proc_id().actor_id("undeliverable", 0))
46 .open_port::<Undeliverable<MessageEnvelope>>()
47}
48
49static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
53 OnceLock::new();
54pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
59 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
60 let (return_handle, mut rx) = new_undeliverable_port();
61 let (h, _) = new_undeliverable_port();
64 crate::init::get_runtime().spawn(async move {
65 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
66 envelope.set_error(DeliveryError::BrokenLink(
67 "message returned to undeliverable port".to_string(),
68 ));
69 super::UndeliverableMailboxSender.post(envelope, h.clone());
70 }
71 });
72 return_handle
73 });
74
75 return_handle.clone()
76}
77
78#[track_caller]
82pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
83 let caller = caller.to_owned();
84 let (return_handle, mut rx) = new_undeliverable_port();
85 tokio::task::spawn(async move {
86 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
87 envelope.set_error(DeliveryError::BrokenLink(
88 "message returned to undeliverable port".to_string(),
89 ));
90 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
91 }
92 });
93 return_handle
94}
95
96pub(crate) fn return_undeliverable(
98 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
99 envelope: MessageEnvelope,
100) {
101 if envelope.return_undeliverable() {
102 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
104 let client = &CLIENT
105 .get_or_init(|| Proc::runtime().instance("global_return_client").unwrap())
106 .0;
107 let envelope_copy = envelope.clone();
108 if (return_handle.send(client, Undeliverable(envelope))).is_err() {
109 UndeliverableMailboxSender.post(envelope_copy, return_handle)
110 }
111 }
112}
113
114#[derive(Debug, Error)]
115pub enum UndeliverableMessageError {
117 DeliveryFailure {
119 envelope: MessageEnvelope,
121 },
122
123 ReturnFailure {
126 envelope: MessageEnvelope,
128 },
129}
130
131impl std::fmt::Display for UndeliverableMessageError {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 match self {
134 UndeliverableMessageError::DeliveryFailure { envelope } => {
135 writeln!(f, "undeliverable message error:")?;
136 writeln!(
137 f,
138 "\tdescription: delivery of message from sender to dest failed"
139 )?;
140 writeln!(f, "\tsender: {}", envelope.sender())?;
141 writeln!(f, "\tdest: {}", envelope.dest())?;
142 writeln!(f, "\theaders: {}", envelope.headers())?;
143 writeln!(f, "\tdata: {}", envelope.data())?;
144 writeln!(
145 f,
146 "\terror: {}",
147 envelope.error_msg().unwrap_or("<none>".to_string())
148 )
149 }
150 UndeliverableMessageError::ReturnFailure { envelope } => {
151 writeln!(f, "undeliverable message error:")?;
152 writeln!(
153 f,
154 "\tdescription: returning undeliverable message to original sender failed"
155 )?;
156 writeln!(f, "\toriginal sender: {}", envelope.sender())?;
157 writeln!(f, "\toriginal dest: {}", envelope.dest())?;
158 writeln!(f, "\theaders: {}", envelope.headers())?;
159 writeln!(f, "\tdata: {}", envelope.data())?;
160 writeln!(
161 f,
162 "\terror: {}",
163 envelope.error_msg().unwrap_or("<none>".to_string())
164 )
165 }
166 }
167 }
168}