hyperactor/mailbox/
headers.rs1use std::time::SystemTime;
15
16use crate::attrs::Attrs;
17use crate::attrs::declare_attrs;
18use crate::clock::Clock;
19use crate::clock::RealClock;
20use crate::config::global;
21use crate::metrics::MESSAGE_LATENCY_MICROS;
22
declare_attrs! {
    /// Header attribute holding a `SystemTime` for a message. It is written by
    /// `set_send_timestamp` and read back by `log_message_latency_if_sampling`
    /// to compute how long the message took to arrive.
    pub attr SEND_TIMESTAMP: SystemTime;
}
27
28pub fn set_send_timestamp(headers: &mut Attrs) {
30 if !headers.contains_key(SEND_TIMESTAMP) {
31 let time = RealClock.system_time_now();
32 headers.set(SEND_TIMESTAMP, time);
33 }
34}
35
36pub fn log_message_latency_if_sampling(headers: &Attrs, actor_id: String) {
40 if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
41 return;
42 }
43
44 if !headers.contains_key(SEND_TIMESTAMP) {
45 tracing::warn!(
46 actor_id = actor_id,
47 "SEND_TIMESTAMP missing from message headers, cannot measure latency"
48 );
49 return;
50 }
51
52 let metric_pairs = hyperactor_telemetry::kv_pairs!(
53 "actor_id" => actor_id
54 );
55 let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
56 return;
57 };
58 let now = RealClock.system_time_now();
59 let latency = now.duration_since(*send_timestamp).unwrap();
60 MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
61}