hyperactor/mailbox/
headers.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Message headers and latency tracking functionality for the mailbox system.
10//!
11//! This module provides header attributes and utilities for message metadata,
12//! including latency tracking timestamps used to measure message processing times.
13
14use std::time::SystemTime;
15
16use crate::attrs::Attrs;
17use crate::attrs::declare_attrs;
18use crate::clock::Clock;
19use crate::clock::RealClock;
20use crate::config::global;
21use crate::metrics::MESSAGE_LATENCY_MICROS;
22
declare_attrs! {
    /// Header attribute holding the system time at which a message was sent; read back to compute message latency.
    pub attr SEND_TIMESTAMP: SystemTime;
}
27
28/// Set the send timestamp for latency tracking if timestamp not already set.
29pub fn set_send_timestamp(headers: &mut Attrs) {
30    if !headers.contains_key(SEND_TIMESTAMP) {
31        let time = RealClock.system_time_now();
32        headers.set(SEND_TIMESTAMP, time);
33    }
34}
35
36/// This function checks the configured sampling rate and, if the random sample passes,
37/// calculates the latency between the send timestamp and the current time, then records
38/// the latency metric with the associated actor ID.
39pub fn log_message_latency_if_sampling(headers: &Attrs, actor_id: String) {
40    if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
41        return;
42    }
43
44    if !headers.contains_key(SEND_TIMESTAMP) {
45        tracing::warn!(
46            actor_id = actor_id,
47            "SEND_TIMESTAMP missing from message headers, cannot measure latency"
48        );
49        return;
50    }
51
52    let metric_pairs = hyperactor_telemetry::kv_pairs!(
53        "actor_id" => actor_id
54    );
55    let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
56        return;
57    };
58    let now = RealClock.system_time_now();
59    let latency = now.duration_since(*send_timestamp).unwrap();
60    MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
61}