1use std::sync::OnceLock;
50
51use enum_as_inner::EnumAsInner;
52use serde::Deserialize;
53use serde::Serialize;
54use thiserror::Error;
55
56use crate::ActorAddr;
57use crate::ActorHandle;
58use crate::EndpointLocation;
59use crate::Instance;
60use crate::Message;
62use crate::Proc;
63use crate::mailbox::DeliveryError;
64use crate::mailbox::MailboxSender;
65use crate::mailbox::MailboxSenderError;
66use crate::mailbox::MessageEnvelope;
67use crate::mailbox::PortHandle;
68use crate::mailbox::PortReceiver;
69use crate::mailbox::UndeliverableMailboxSender;
70use crate::mailbox::headers::OPERATION_ADVERB;
71use crate::mailbox::headers::OPERATION_ENDPOINT;
72use crate::mailbox::headers::RUST_MESSAGE_TYPE;
73
/// Metadata describing a message that was lost before it could be returned.
///
/// Unlike [`Undeliverable::Message`], this carries no payload: only who sent
/// the message, where it was addressed, its type name (when known), and the
/// error that was observed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, typeuri::Named)]
pub struct LostMessage {
    /// Address of the actor that sent the message.
    pub sender: ActorAddr,
    /// Endpoint the message was addressed to.
    pub dest: EndpointLocation,
    /// Rust type name of the message, if known; `None` when it is not.
    pub message_type: Option<String>,
    /// Rendered (`to_string`) error describing why the message was lost.
    pub error: String,
}
87
88impl LostMessage {
89 pub(crate) fn from_send_error<M: Message>(
91 sender: ActorAddr,
92 dest: EndpointLocation,
93 error: &MailboxSenderError,
94 ) -> Self {
95 Self {
96 sender,
97 dest,
98 message_type: Some(std::any::type_name::<M>().to_string()),
99 error: error.to_string(),
100 }
101 }
102}
103
/// A message handed back to a return port because it could not be delivered.
///
/// Either the original message comes back in full ([`Undeliverable::Message`]),
/// or only metadata about it is available ([`Undeliverable::Lost`]).
// The `Message` variant deliberately stores `M` inline (see the `reason`
// below) instead of boxing it, so the large-variant lint is expected.
#[expect(
    clippy::large_enum_variant,
    reason = "returned messages stay inline so callers can recover the original payload without extra allocation"
)]
#[derive(
    Debug,
    EnumAsInner,
    Serialize,
    Deserialize,
    Clone,
    PartialEq,
    typeuri::Named
)]
pub enum Undeliverable<M: Message> {
    /// The original message, returned with its full payload.
    Message(M),
    /// Only metadata about a message whose payload was not retained.
    Lost(LostMessage),
}
124
125impl<M: Message> Undeliverable<M> {
126 pub fn message(message: M) -> Self {
128 Self::Message(message)
129 }
130
131 pub fn lost(message: LostMessage) -> Self {
134 Self::Lost(message)
135 }
136}
137
138pub(crate) fn new_undeliverable_port() -> (
140 PortHandle<Undeliverable<MessageEnvelope>>,
141 PortReceiver<Undeliverable<MessageEnvelope>>,
142) {
143 let proc = Proc::isolated();
144 crate::mailbox::Mailbox::new(proc.proc_addr().actor_addr("undeliverable"))
145 .open_port::<Undeliverable<MessageEnvelope>>()
146}
147
148static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelope>>> =
152 OnceLock::new();
153pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
158 let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
159 let (return_handle, mut rx) = new_undeliverable_port();
160 let (h, _) = new_undeliverable_port();
163 crate::init::get_runtime().spawn(async move {
164 while let Ok(undeliverable) = rx.recv().await {
165 match undeliverable {
166 Undeliverable::Message(mut envelope) => {
167 envelope.set_error(DeliveryError::BrokenLink(
168 "message returned to undeliverable port".to_string(),
169 ));
170 super::UndeliverableMailboxSender
171 .post(envelope, h.clone());
172 }
173 Undeliverable::Lost(lost) => {
174 tracing::error!(
175 sender = %lost.sender,
176 dest = %lost.dest,
177 message_type = lost.message_type.as_deref().unwrap_or("unknown"),
178 error = %lost.error,
179 "lost message returned to undeliverable port"
180 );
181 }
182 }
183 }
184 });
185 return_handle
186 });
187
188 return_handle.clone()
189}
190
191#[track_caller]
195pub fn custom_monitored_return_handle(caller: &str) -> PortHandle<Undeliverable<MessageEnvelope>> {
196 let caller = caller.to_owned();
197 let (return_handle, mut rx) = new_undeliverable_port();
198 tokio::task::spawn(async move {
199 while let Ok(undeliverable) = rx.recv().await {
200 match undeliverable {
201 Undeliverable::Message(mut envelope) => {
202 envelope.set_error(DeliveryError::BrokenLink(
203 "message returned to undeliverable port".to_string(),
204 ));
205 tracing::error!("{caller} took back an undeliverable message: {}", envelope);
206 }
207 Undeliverable::Lost(lost) => {
208 tracing::error!(
209 sender = %lost.sender,
210 dest = %lost.dest,
211 message_type = lost.message_type.as_deref().unwrap_or("unknown"),
212 error = %lost.error,
213 "{caller} took back a lost message"
214 );
215 }
216 }
217 }
218 });
219 return_handle
220}
221
222pub(crate) fn return_undeliverable(
224 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
225 envelope: MessageEnvelope,
226) {
227 if envelope.return_undeliverable() {
228 static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
230 let client = &CLIENT
231 .get_or_init(|| Proc::runtime().client("global_return_client").unwrap())
232 .0;
233 let envelope_copy = envelope.clone();
234 if return_handle
235 .try_send(client, Undeliverable::message(envelope))
236 .is_err()
237 {
238 UndeliverableMailboxSender.post(envelope_copy, return_handle)
239 }
240 }
241}
242
/// Error describing a message that could not be delivered or returned.
///
/// `Display` is implemented manually for this type (no `#[error]`
/// attributes are used) and renders a bounded multi-line summary.
#[derive(Debug, Error)]
pub enum UndeliverableMessageError {
    /// Delivering the message from its sender to its destination failed.
    DeliveryFailure {
        /// The envelope that could not be delivered.
        envelope: MessageEnvelope,
    },

    /// Returning an already-undeliverable message to its original sender
    /// failed.
    ReturnFailure {
        /// The envelope that could not be returned; its sender is the
        /// original sender.
        envelope: MessageEnvelope,
    },

    /// The message was lost before it could be returned; only metadata
    /// remains.
    Lost {
        /// Metadata about the lost message.
        lost: LostMessage,
    },
}
265
266fn undeliverable_prefix(error: &UndeliverableMessageError) -> String {
275 let envelope = match error {
276 UndeliverableMessageError::DeliveryFailure { envelope }
277 | UndeliverableMessageError::ReturnFailure { envelope } => envelope,
278 UndeliverableMessageError::Lost { lost } => {
279 return format!("lost message to {}", lost.dest);
280 }
281 };
282 if let Some(endpoint) = envelope.headers().get(OPERATION_ENDPOINT) {
283 let adverb = envelope
284 .headers()
285 .get(OPERATION_ADVERB)
286 .unwrap_or_else(|| "?".to_string());
287 return format!("undeliverable message for {} ({})", endpoint, adverb);
288 }
289 match error {
290 UndeliverableMessageError::DeliveryFailure { .. } => {
291 format!("undeliverable message to {}", envelope.dest())
292 }
293 UndeliverableMessageError::ReturnFailure { .. } => {
294 format!(
295 "undeliverable return to original sender {}",
296 envelope.sender()
297 )
298 }
299 UndeliverableMessageError::Lost { lost } => format!("lost message to {}", lost.dest),
300 }
301}
302
303impl std::fmt::Display for UndeliverableMessageError {
304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305 let (envelope, description, sender_label, dest_label) = match self {
312 UndeliverableMessageError::DeliveryFailure { envelope } => (
313 envelope,
314 "delivery of message from sender to dest failed",
315 "sender",
316 "dest",
317 ),
318 UndeliverableMessageError::ReturnFailure { envelope } => (
319 envelope,
320 "returning undeliverable message to original sender failed",
321 "original sender",
322 "original dest",
323 ),
324 UndeliverableMessageError::Lost { lost } => {
325 writeln!(f, "{}:", undeliverable_prefix(self))?;
326 writeln!(
327 f,
328 "\tdescription: message was lost before it could be returned"
329 )?;
330 writeln!(f, "\tsender: {}", lost.sender)?;
331 writeln!(f, "\tdest: {}", lost.dest)?;
332 writeln!(
333 f,
334 "\tmessage type: {}",
335 lost.message_type.as_deref().unwrap_or("unknown")
336 )?;
337 writeln!(f, "\terror: {}", lost.error)?;
338 return Ok(());
339 }
340 };
341
342 writeln!(f, "{}:", undeliverable_prefix(self))?;
343 writeln!(f, "\tdescription: {}", description)?;
344 writeln!(f, "\t{}: {}", sender_label, envelope.sender())?;
345 writeln!(f, "\t{}: {}", dest_label, envelope.dest())?;
346 let message_type = envelope
350 .data()
351 .typename()
352 .map(|s| s.to_string())
353 .or_else(|| envelope.headers().get(RUST_MESSAGE_TYPE))
354 .unwrap_or_else(|| "unknown".to_string());
355 writeln!(f, "\tmessage type: {}", message_type)?;
356 writeln!(f, "\tdata_len: {}", envelope.data().len())?;
357 writeln!(
358 f,
359 "\terror: {}",
360 envelope.error_msg().unwrap_or("<none>".to_string())
361 )
362 }
363}
364
#[cfg(test)]
mod tests {
    use hyperactor_config::Flattrs;

    use super::*;
    use crate::mailbox::MessageEnvelope;
    use crate::testing::ids::test_actor_id;
    use crate::testing::ids::test_port_id;

    /// Envelope from the shared test sender (`ue_proc`/`ue_sender`) to the
    /// shared test port (`ue_dest_proc`/`ue_dest`, port 42). Its body is
    /// `payload` serialized as a `String`.
    fn make_envelope(payload: &str, headers: Flattrs) -> MessageEnvelope {
        let body = wirevalue::Any::serialize(&payload.to_string()).unwrap();
        MessageEnvelope::new(
            test_actor_id("ue_proc", "ue_sender"),
            test_port_id("ue_dest_proc", "ue_dest", 42),
            body,
            headers,
        )
    }

    /// Same endpoints as `make_envelope`, but with a broken body that has no
    /// `typename()`. Asserts that fixture invariant before returning.
    fn broken_envelope(headers: Flattrs) -> MessageEnvelope {
        let envelope = MessageEnvelope::new(
            test_actor_id("ue_proc", "ue_sender"),
            test_port_id("ue_dest_proc", "ue_dest", 42),
            wirevalue::Any::new_broken(),
            headers,
        );
        assert!(
            envelope.data().typename().is_none(),
            "test fixture invariant: broken Any must have no typename()"
        );
        envelope
    }

    /// Renders `error` through its `Display` impl.
    fn render(error: UndeliverableMessageError) -> String {
        format!("{}", error)
    }

    /// UE-1/UE-2: a delivery failure with a large payload keeps the summary
    /// fields. It must not render raw headers, raw data, or the payload body.
    #[test]
    fn test_ue1_delivery_failure_bounded() {
        let payload = "x".repeat(10_000);
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        let rendered = render(UndeliverableMessageError::DeliveryFailure {
            envelope: make_envelope(&payload, headers),
        });

        // Summary fields that must be present.
        assert!(
            rendered.contains("message type:"),
            "UE-1: message type field must be present, got:\n{rendered}"
        );
        assert!(
            rendered.contains("data_len:"),
            "UE-1: data_len field must be present, got:\n{rendered}"
        );
        assert!(
            rendered.contains("sender:"),
            "UE-2: sender field must be preserved, got:\n{rendered}"
        );
        assert!(
            rendered.contains("dest:"),
            "UE-2: dest field must be preserved, got:\n{rendered}"
        );
        assert!(
            rendered.contains("error:"),
            "UE-2: error field must be preserved, got:\n{rendered}"
        );

        // Unbounded content that must not leak.
        assert!(
            !rendered.contains("\theaders: "),
            "UE-1: raw headers dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tdata: "),
            "UE-1: raw data dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains(&payload),
            "UE-1: payload body leaked into rendered text"
        );
    }

    /// UE-1: a return failure with a large payload reports only the payload
    /// length, never the raw headers, raw data, or payload body.
    #[test]
    fn test_ue1_return_failure_bounded() {
        let payload = "y".repeat(10_000);
        let rendered = render(UndeliverableMessageError::ReturnFailure {
            envelope: make_envelope(&payload, Flattrs::new()),
        });

        assert!(
            rendered.contains("data_len:"),
            "UE-1: data_len field must be present, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\theaders: "),
            "UE-1: raw headers dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tdata: "),
            "UE-1: raw data dump leaked, got:\n{rendered}"
        );
        assert!(
            !rendered.contains(&payload),
            "UE-1: payload body leaked into rendered text"
        );
    }

    /// UE-3/UE-4: when OPERATION_ENDPOINT/OPERATION_ADVERB are set, the top
    /// line names the operation. It must not claim a send or reply kind.
    #[test]
    fn test_ue3_operation_endpoint_names_top_line() {
        let mut headers = Flattrs::new();
        headers.set(OPERATION_ENDPOINT, "training.buffer.sample()".to_string());
        headers.set(OPERATION_ADVERB, "call_one".to_string());
        let rendered = render(UndeliverableMessageError::DeliveryFailure {
            envelope: make_envelope("payload", headers),
        });

        let expected_line = "undeliverable message for training.buffer.sample() (call_one):";
        assert!(
            rendered.starts_with(expected_line),
            "UE-3/UE-4: expected top line `{expected_line}`, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("undeliverable reply"),
            "UE-4: must not claim reply-kind from header presence alone, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("undeliverable send"),
            "UE-4: must not claim send-kind from header presence alone, got:\n{rendered}"
        );
    }

    /// UE-3: without operation headers, a delivery failure headlines its
    /// destination.
    #[test]
    fn test_ue3_delivery_failure_no_context_names_destination() {
        let envelope = make_envelope("payload", Flattrs::new());
        let dest_str = envelope.dest().to_string();
        let rendered = render(UndeliverableMessageError::DeliveryFailure { envelope });

        let expected_prefix = format!("undeliverable message to {}", dest_str);
        assert!(
            rendered.starts_with(&expected_prefix),
            "UE-3: delivery failure no context → destination prefix `{expected_prefix}`, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("undeliverable message error"),
            "UE-3: neutral fallback must not be re-introduced, got:\n{rendered}"
        );
    }

    /// UE-3: without operation headers, a return failure headlines the
    /// original sender rather than the original destination.
    #[test]
    fn test_ue3_return_failure_no_context_names_original_sender() {
        let envelope = make_envelope("payload", Flattrs::new());
        let sender_str = envelope.sender().to_string();
        let dest_str = envelope.dest().to_string();
        let rendered = render(UndeliverableMessageError::ReturnFailure { envelope });

        let expected_prefix = format!("undeliverable return to original sender {}", sender_str);
        assert!(
            rendered.starts_with(&expected_prefix),
            "UE-3: return failure no context → original-sender prefix `{expected_prefix}`, got:\n{rendered}"
        );
        assert!(
            !rendered.starts_with(&format!("undeliverable message to {}", dest_str)),
            "UE-3: return failure must not headline the original destination, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("undeliverable message error"),
            "UE-3: neutral fallback must not be re-introduced, got:\n{rendered}"
        );
    }

    /// UE-5: when the payload has no `typename()`, the message type falls
    /// back to the RUST_MESSAGE_TYPE header.
    #[test]
    fn test_ue5_message_type_falls_back_to_rust_message_type() {
        let mut headers = Flattrs::new();
        headers.set(RUST_MESSAGE_TYPE, "my::Foo".to_string());
        let rendered = render(UndeliverableMessageError::DeliveryFailure {
            envelope: broken_envelope(headers),
        });

        assert!(
            rendered.contains("\tmessage type: my::Foo\n"),
            "UE-5: must surface RUST_MESSAGE_TYPE when typename() is absent, got:\n{rendered}"
        );
        assert!(
            !rendered.contains("\tmessage type: unknown"),
            "UE-5: must not render \"unknown\" when RUST_MESSAGE_TYPE is present, got:\n{rendered}"
        );
    }

    /// UE-5: with neither `typename()` nor RUST_MESSAGE_TYPE, the message
    /// type renders as "unknown".
    #[test]
    fn test_ue5_unknown_when_typename_and_rust_message_type_both_absent() {
        let rendered = render(UndeliverableMessageError::DeliveryFailure {
            envelope: broken_envelope(Flattrs::new()),
        });

        assert!(
            rendered.contains("\tmessage type: unknown\n"),
            "UE-5: with no typename and no RUST_MESSAGE_TYPE, must render \"unknown\", got:\n{rendered}"
        );
    }
}