hyperactor/mailbox/
headers.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Message headers and latency tracking functionality for the mailbox system.
10//!
11//! This module provides header attributes and utilities for message metadata,
12//! including latency tracking timestamps used to measure message processing times.
13
14use std::any::type_name;
15use std::time::SystemTime;
16
17use hyperactor_config::attrs::Attrs;
18use hyperactor_config::attrs::declare_attrs;
19use hyperactor_config::global;
20
21use crate::clock::Clock;
22use crate::clock::RealClock;
23use crate::metrics::MESSAGE_LATENCY_MICROS;
24
25declare_attrs! {
26    /// Send timestamp for message latency tracking
27    pub attr SEND_TIMESTAMP: SystemTime;
28
29    /// The rust type of the message.
30    pub attr RUST_MESSAGE_TYPE: String;
31}
32
33/// Set the send timestamp for latency tracking if timestamp not already set.
34pub fn set_send_timestamp(headers: &mut Attrs) {
35    if !headers.contains_key(SEND_TIMESTAMP) {
36        let time = RealClock.system_time_now();
37        headers.set(SEND_TIMESTAMP, time);
38    }
39}
40
41/// Set the send timestamp for latency tracking if timestamp not already set.
42pub fn set_rust_message_type<M>(headers: &mut Attrs) {
43    headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
44}
45
46/// This function checks the configured sampling rate and, if the random sample passes,
47/// calculates the latency between the send timestamp and the current time, then records
48/// the latency metric with the associated actor ID.
49pub fn log_message_latency_if_sampling(headers: &Attrs, actor_id: String) {
50    if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
51        return;
52    }
53
54    if !headers.contains_key(SEND_TIMESTAMP) {
55        tracing::debug!(
56            actor_id = actor_id,
57            "SEND_TIMESTAMP missing from message headers, cannot measure latency"
58        );
59        return;
60    }
61
62    let metric_pairs = hyperactor_telemetry::kv_pairs!(
63        "actor_id" => actor_id
64    );
65    let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
66        return;
67    };
68    let now = RealClock.system_time_now();
69    let latency = now.duration_since(*send_timestamp).unwrap_or_default();
70    MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
71}