hyperactor/mailbox/headers.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Message headers and latency tracking functionality for the mailbox system.
10//!
11//! This module provides header attributes and utilities for message metadata,
//! including send timestamps used to measure message latency (the time elapsed between a message being sent and its latency being recorded).
13
14use std::any::type_name;
15use std::time::SystemTime;
16
17use hyperactor_config::Flattrs;
18use hyperactor_config::attrs::OPERATION_CONTEXT_HEADER;
19use hyperactor_config::attrs::declare_attrs;
20use hyperactor_config::global;
21
22use crate::metrics::MESSAGE_LATENCY_MICROS;
23
declare_attrs! {
    /// Wall-clock send timestamp used for message latency tracking.
    ///
    /// Written by `set_send_timestamp` (only when no value is present yet)
    /// and read by `log_message_latency_if_sampling`, which records the time
    /// elapsed since this value.
    pub attr SEND_TIMESTAMP: SystemTime;

    /// The Rust type name of the message, as produced by
    /// `std::any::type_name`. Written by `set_rust_message_type`.
    pub attr RUST_MESSAGE_TYPE: String;

    // NOTE(review): the doc below says "ActorAddr" while the attribute name
    // says "ACTOR_ID"; confirm which identifier is actually hashed.
    /// Hashed ActorAddr of the message sender, injected in post_unchecked().
    pub attr SENDER_ACTOR_ID_HASH: u64;

    /// Telemetry message ID for correlating lifecycle events, injected in post_unchecked().
    pub attr TELEMETRY_MESSAGE_ID: u64;

    /// Port index the message was delivered to, injected in post_unchecked().
    pub attr TELEMETRY_PORT_ID: u64;

    // Operation-context headers (see `OPERATION_CONTEXT_HEADER` in
    // `hyperactor_config::attrs`). Both attributes below are tagged with
    // `@meta(OPERATION_CONTEXT_HEADER = true)`. A consumer-side helper that
    // filters on that meta key carries them from the caller's outgoing
    // request onto the reply envelope. They are read at the
    // undeliverable-abandonment log site in `hyperactor/src/mailbox.rs` to
    // name the user operation that a dropped reply belonged to (UM-3b).
    //
    // Layering note: these keys belong semantically to a higher layer
    // (Monarch endpoint / adverb / method). They live in `hyperactor` as a
    // tactical compromise, because the log reader lives here and cannot
    // depend upward on `monarch_hyperactor`. Keep the scope narrow. Do not
    // grow this vocabulary without first revisiting two alternatives:
    // moving the reader up a layer, or replacing these keys with a generic,
    // substrate-owned operation-context abstraction.

    /// Qualified endpoint name of the caller's operation, e.g.
    /// "<mesh>.<method>()". Stamped by the request-send site.
    @meta(OPERATION_CONTEXT_HEADER = true)
    pub attr OPERATION_ENDPOINT: String;

    /// Endpoint adverb describing the call shape. Typical values from
    /// current Monarch producers: "call", "call_one", "choose",
    /// "stream".
    @meta(OPERATION_CONTEXT_HEADER = true)
    pub attr OPERATION_ADVERB: String;
}
67
68/// Set the send timestamp for latency tracking if timestamp not already set.
69pub fn set_send_timestamp(headers: &mut Flattrs) {
70 if !headers.contains_key(SEND_TIMESTAMP) {
71 let time = std::time::SystemTime::now();
72 headers.set(SEND_TIMESTAMP, time);
73 }
74}
75
76/// Set the send timestamp for latency tracking if timestamp not already set.
77pub fn set_rust_message_type<M>(headers: &mut Flattrs) {
78 headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
79}
80
81/// This function checks the configured sampling rate and, if the random sample passes,
82/// calculates the latency between the send timestamp and the current time, then records
83/// the latency metric with the associated actor ID.
84pub fn log_message_latency_if_sampling(headers: &Flattrs, actor_id: String) {
85 if fastrand::f32() > global::get(crate::config::MESSAGE_LATENCY_SAMPLING_RATE) {
86 return;
87 }
88
89 if !headers.contains_key(SEND_TIMESTAMP) {
90 tracing::debug!(
91 actor_id = actor_id,
92 "SEND_TIMESTAMP missing from message headers, cannot measure latency"
93 );
94 return;
95 }
96
97 let metric_pairs = hyperactor_telemetry::kv_pairs!(
98 "actor_id" => actor_id
99 );
100 let Some(send_timestamp) = headers.get(SEND_TIMESTAMP) else {
101 return;
102 };
103 let now = std::time::SystemTime::now();
104 let latency = now.duration_since(send_timestamp).unwrap_or_default();
105 MESSAGE_LATENCY_MICROS.record(latency.as_micros() as f64, metric_pairs);
106}