Skip to main content

hyperactor/mailbox/
headers.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Message headers and latency tracking functionality for the mailbox system.
10//!
11//! This module provides header attributes and utilities for message metadata,
12//! including latency tracking timestamps used to measure message processing times.
13
14use std::any::type_name;
15use std::time::SystemTime;
16
17use hyperactor_config::Flattrs;
18use hyperactor_config::attrs::OPERATION_CONTEXT_HEADER;
19use hyperactor_config::attrs::declare_attrs;
20use hyperactor_config::global;
21
22use crate::metrics::MESSAGE_LATENCY_MICROS;
23
// Header attribute keys carried on mailbox messages. Each `pub attr` below
// declares one typed key; the functions later in this file read and write
// `SEND_TIMESTAMP` and `RUST_MESSAGE_TYPE` through a `Flattrs` header set.
declare_attrs! {
    /// Send timestamp for message latency tracking. Written by
    /// `set_send_timestamp` and read by `log_message_latency_if_sampling`.
    pub attr SEND_TIMESTAMP: SystemTime;

    /// The Rust type name of the message, as produced by
    /// `std::any::type_name` in `set_rust_message_type`.
    pub attr RUST_MESSAGE_TYPE: String;

    /// Hashed ActorAddr of the message sender, injected in post_unchecked().
    pub attr SENDER_ACTOR_ID_HASH: u64;

    /// Telemetry message ID for correlating lifecycle events, injected in post_unchecked().
    pub attr TELEMETRY_MESSAGE_ID: u64;

    /// Port index the message was delivered to, injected in post_unchecked().
    pub attr TELEMETRY_PORT_ID: u64;

    // Operation-context headers (see `OPERATION_CONTEXT_HEADER` in
    // `hyperactor_config::attrs`). Carried from the caller's outgoing
    // request onto the reply envelope by a consumer-side helper that
    // filters on `OPERATION_CONTEXT_HEADER`. Read at the
    // undeliverable-abandonment log site in
    // `hyperactor/src/mailbox.rs` to name the user operation a
    // dropped reply belonged to (UM-3b).
    //
    // Layering note: these keys belong semantically to a higher layer
    // (Monarch endpoint / adverb / method). They live in `hyperactor`
    // as a tactical compromise because the log reader lives here and
    // cannot depend upward on `monarch_hyperactor`. Scope narrowly;
    // do not grow this vocabulary without revisiting whether the
    // reader should move up a layer or a generic substrate-owned
    // operation-context abstraction should replace these keys.

    /// Qualified endpoint name of the caller's operation, e.g.
    /// "<mesh>.<method>()". Stamped by the request-send site.
    @meta(OPERATION_CONTEXT_HEADER = true)
    pub attr OPERATION_ENDPOINT: String;

    /// Endpoint adverb describing the call shape. Typical values from
    /// current Monarch producers: "call", "call_one", "choose",
    /// "stream".
    @meta(OPERATION_CONTEXT_HEADER = true)
    pub attr OPERATION_ADVERB: String;
}
67
68/// Set the send timestamp for latency tracking if timestamp not already set.
69pub fn set_send_timestamp(headers: &mut Flattrs) {
70    if !headers.contains_key(SEND_TIMESTAMP) {
71        let time = std::time::SystemTime::now();
72        headers.set(SEND_TIMESTAMP, time);
73    }
74}
75
76/// Set the send timestamp for latency tracking if timestamp not already set.
77pub fn set_rust_message_type<M>(headers: &mut Flattrs) {
78    headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
79}
80
81/// This function checks the configured sampling rate and, if the random sample passes,
82/// calculates the latency between the send timestamp and the current time, then records
83/// the latency metric with the associated actor ID.
84pub fn log_message_latency_if_sampling(headers: &Flattrs, actor_id: String) {
85    if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
86        return;
87    }
88
89    if !headers.contains_key(SEND_TIMESTAMP) {
90        tracing::debug!(
91            actor_id = actor_id,
92            "SEND_TIMESTAMP missing from message headers, cannot measure latency"
93        );
94        return;
95    }
96
97    let metric_pairs = hyperactor_telemetry::kv_pairs!(
98        "actor_id" => actor_id
99    );
100    let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
101        return;
102    };
103    let now = std::time::SystemTime::now();
104    let latency = now.duration_since(send_timestamp).unwrap_or_default();
105    MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
106}