hyperactor/mailbox/
headers.rs1use std::any::type_name;
15use std::time::SystemTime;
16
17use crate::attrs::Attrs;
18use crate::attrs::declare_attrs;
19use crate::clock::Clock;
20use crate::clock::RealClock;
21use crate::config::global;
22use crate::metrics::MESSAGE_LATENCY_MICROS;
23
24declare_attrs! {
25 pub attr SEND_TIMESTAMP: SystemTime;
27
28 pub attr RUST_MESSAGE_TYPE: String;
30}
31
32pub fn set_send_timestamp(headers: &mut Attrs) {
34 if !headers.contains_key(SEND_TIMESTAMP) {
35 let time = RealClock.system_time_now();
36 headers.set(SEND_TIMESTAMP, time);
37 }
38}
39
40pub fn set_rust_message_type<M>(headers: &mut Attrs) {
42 headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
43}
44
45pub fn log_message_latency_if_sampling(headers: &Attrs, actor_id: String) {
49 if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
50 return;
51 }
52
53 if !headers.contains_key(SEND_TIMESTAMP) {
54 tracing::debug!(
55 actor_id = actor_id,
56 "SEND_TIMESTAMP missing from message headers, cannot measure latency"
57 );
58 return;
59 }
60
61 let metric_pairs = hyperactor_telemetry::kv_pairs!(
62 "actor_id" => actor_id
63 );
64 let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
65 return;
66 };
67 let now = RealClock.system_time_now();
68 let latency = now.duration_since(*send_timestamp).unwrap_or_default();
69 MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
70}