hyperactor/mailbox/
headers.rs1use std::any::type_name;
15use std::time::SystemTime;
16
17use hyperactor_config::Flattrs;
18use hyperactor_config::attrs::declare_attrs;
19use hyperactor_config::global;
20
21use crate::metrics::MESSAGE_LATENCY_MICROS;
22
23declare_attrs! {
24 pub attr SEND_TIMESTAMP: SystemTime;
26
27 pub attr RUST_MESSAGE_TYPE: String;
29
30 pub attr SENDER_ACTOR_ID_HASH: u64;
32
33 pub attr TELEMETRY_MESSAGE_ID: u64;
35
36 pub attr TELEMETRY_PORT_ID: u64;
38}
39
40pub fn set_send_timestamp(headers: &mut Flattrs) {
42 if !headers.contains_key(SEND_TIMESTAMP) {
43 let time = std::time::SystemTime::now();
44 headers.set(SEND_TIMESTAMP, time);
45 }
46}
47
48pub fn set_rust_message_type<M>(headers: &mut Flattrs) {
50 headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
51}
52
53pub fn log_message_latency_if_sampling(headers: &Flattrs, actor_id: String) {
57 if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
58 return;
59 }
60
61 if !headers.contains_key(SEND_TIMESTAMP) {
62 tracing::debug!(
63 actor_id = actor_id,
64 "SEND_TIMESTAMP missing from message headers, cannot measure latency"
65 );
66 return;
67 }
68
69 let metric_pairs = hyperactor_telemetry::kv_pairs!(
70 "actor_id" => actor_id
71 );
72 let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
73 return;
74 };
75 let now = std::time::SystemTime::now();
76 let latency = now.duration_since(send_timestamp).unwrap_or_default();
77 MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
78}