hyperactor/mailbox/
headers.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Message headers and latency tracking functionality for the mailbox system.
10//!
11//! This module provides header attributes and utilities for message metadata,
12//! including latency tracking timestamps used to measure message processing times.
13
14use std::any::type_name;
15use std::time::SystemTime;
16
17use hyperactor_config::Flattrs;
18use hyperactor_config::attrs::declare_attrs;
19use hyperactor_config::global;
20
21use crate::metrics::MESSAGE_LATENCY_MICROS;
22
declare_attrs! {
    /// Send timestamp for message latency tracking.
    ///
    /// Written by `set_send_timestamp` (only when not already present) and
    /// read by `log_message_latency_if_sampling`.
    pub attr SEND_TIMESTAMP: SystemTime;

    /// The Rust type name of the message, as written by
    /// `set_rust_message_type`.
    pub attr RUST_MESSAGE_TYPE: String;

    /// Hashed ActorId of the message sender, injected in post_unchecked().
    pub attr SENDER_ACTOR_ID_HASH: u64;

    /// Telemetry message ID for correlating lifecycle events, injected in post_unchecked().
    pub attr TELEMETRY_MESSAGE_ID: u64;

    /// Port index the message was delivered to, injected in post_unchecked().
    pub attr TELEMETRY_PORT_ID: u64;
}
39
40/// Set the send timestamp for latency tracking if timestamp not already set.
41pub fn set_send_timestamp(headers: &mut Flattrs) {
42    if !headers.contains_key(SEND_TIMESTAMP) {
43        let time = std::time::SystemTime::now();
44        headers.set(SEND_TIMESTAMP, time);
45    }
46}
47
48/// Set the send timestamp for latency tracking if timestamp not already set.
49pub fn set_rust_message_type<M>(headers: &mut Flattrs) {
50    headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
51}
52
53/// This function checks the configured sampling rate and, if the random sample passes,
54/// calculates the latency between the send timestamp and the current time, then records
55/// the latency metric with the associated actor ID.
56pub fn log_message_latency_if_sampling(headers: &Flattrs, actor_id: String) {
57    if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
58        return;
59    }
60
61    if !headers.contains_key(SEND_TIMESTAMP) {
62        tracing::debug!(
63            actor_id = actor_id,
64            "SEND_TIMESTAMP missing from message headers, cannot measure latency"
65        );
66        return;
67    }
68
69    let metric_pairs = hyperactor_telemetry::kv_pairs!(
70        "actor_id" => actor_id
71    );
72    let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
73        return;
74    };
75    let now = std::time::SystemTime::now();
76    let latency = now.duration_since(send_timestamp).unwrap_or_default();
77    MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
78}