hyperactor_telemetry/
rate_limit.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Rate-limited logging macros.
10//!
11//! Provides macros for time-based (`*_every_ms!`) and count-based (`*_every_n!`)
12//! rate limiting of log messages per call site.
13
/// Internal helper macro for time-based rate-limited logging. Not intended for direct use.
///
/// Each expansion owns one `static` timestamp (milliseconds since `UNIX_EPOCH`, where
/// `0` means "this call site has never logged"). The event is forwarded to
/// `tracing::$level!` when the call site has never logged, or when at least
/// `$interval_ms` milliseconds have elapsed since it last did.
///
/// * `$interval_ms` - constant expression; a compile-time assertion rejects values `<= 0`.
/// * `$level` - name of the `tracing` event macro to invoke (`trace`, `debug`, ...).
/// * `$args` - tokens passed through unchanged to `tracing::$level!`.
///
/// The timestamp comes from the wall clock (`SystemTime`), so a clock that is stepped
/// backwards suppresses output until it catches up with the stored value.
#[doc(hidden)]
#[macro_export]
macro_rules! log_every_ms_impl {
    ($interval_ms:expr, $level:ident, $($args:tt)+) => {{
        const { assert!($interval_ms > 0, "interval_ms must be greater than 0"); }

        // The decision is computed in its own block with fully qualified paths.
        // `use` declarations and `static` items are not hygienic in `macro_rules!`,
        // so anything introduced next to `$args` would shadow the caller's own names
        // (for example `std::cmp::Ordering`) inside the forwarded tokens.
        let should_log = {
            // Last log time as millis since UNIX_EPOCH, 0 means never logged.
            static LAST_LOG_MS: ::std::sync::atomic::AtomicU64 =
                ::std::sync::atomic::AtomicU64::new(0);

            // Clamp to 1 so a real timestamp can never collide with the sentinel.
            let now_ms = ::std::cmp::max(
                1,
                ::std::time::SystemTime::now()
                    .duration_since(::std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64,
            );

            let last = LAST_LOG_MS.load(::std::sync::atomic::Ordering::Relaxed);

            // Claim the slot with a compare-exchange: when several threads observe
            // the same stale `last`, only the one whose exchange succeeds logs.
            (last == 0 || now_ms.saturating_sub(last) >= $interval_ms as u64)
                && LAST_LOG_MS
                    .compare_exchange(
                        last,
                        now_ms,
                        ::std::sync::atomic::Ordering::Relaxed,
                        ::std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
        };

        if should_log {
            tracing::$level!($($args)+)
        }
    }};
}
40
/// Time-based rate-limited logging at the `trace` level.
///
/// Thin wrapper that hands its arguments to the shared time-based helper with the
/// level fixed to `trace`. See [`info_every_ms!`] for details and examples.
#[macro_export]
macro_rules! trace_every_ms {
    ($period_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($period_ms, trace, $($event)+)
    };
}
48
/// Time-based rate-limited logging at the `debug` level.
///
/// Thin wrapper that hands its arguments to the shared time-based helper with the
/// level fixed to `debug`. See [`info_every_ms!`] for details and examples.
#[macro_export]
macro_rules! debug_every_ms {
    ($period_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($period_ms, debug, $($event)+)
    };
}
56
/// Time-based rate-limited logging at the `info` level.
///
/// The first argument is the minimum interval, in milliseconds, between two events
/// emitted from the same call site; everything after it is forwarded to the
/// underlying `tracing` event macro. Rate-limit state is kept per call site, so two
/// separate invocations of this macro never throttle each other.
///
/// The interval must be a constant expression greater than zero.
///
/// # Example
/// ```ignore
/// // Basic message
/// info_every_ms!(1000, "periodic status update");
///
/// // With format args
/// info_every_ms!(500, "processed {} items", count);
///
/// // With key-value pairs
/// info_every_ms!(1000, actor_id = id, "actor started");
/// ```
#[macro_export]
macro_rules! info_every_ms {
    ($period_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($period_ms, info, $($event)+)
    };
}
78
/// Time-based rate-limited logging at the `warn` level.
///
/// Thin wrapper that hands its arguments to the shared time-based helper with the
/// level fixed to `warn`. See [`info_every_ms!`] for details and examples.
#[macro_export]
macro_rules! warn_every_ms {
    ($period_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($period_ms, warn, $($event)+)
    };
}
86
/// Time-based rate-limited logging at the `error` level.
///
/// Thin wrapper that hands its arguments to the shared time-based helper with the
/// level fixed to `error`. See [`info_every_ms!`] for details and examples.
#[macro_export]
macro_rules! error_every_ms {
    ($period_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($period_ms, error, $($event)+)
    };
}
94
/// Internal helper macro for count-based rate-limited logging. Not intended for direct use.
///
/// Each expansion owns one `static` invocation counter. The event is forwarded to
/// `tracing::$level!` on the 1st, `n + 1`-th, `2n + 1`-th, ... invocation of the call
/// site (glog behavior).
///
/// * `$n` - constant expression; a compile-time assertion rejects values `<= 0`.
/// * `$level` - name of the `tracing` event macro to invoke (`trace`, `debug`, ...).
/// * `$args` - tokens passed through unchanged to `tracing::$level!`.
#[doc(hidden)]
#[macro_export]
macro_rules! log_every_n_impl {
    ($n:expr, $level:ident, $($args:tt)+) => {{
        const { assert!($n > 0, "n must be greater than 0"); }

        // The decision is computed in its own block with fully qualified paths.
        // `use` declarations and `static` items are not hygienic in `macro_rules!`,
        // so anything introduced next to `$args` would shadow the caller's own names
        // (for example `std::cmp::Ordering`) inside the forwarded tokens.
        let should_log = {
            static COUNTER: ::std::sync::atomic::AtomicU64 =
                ::std::sync::atomic::AtomicU64::new(0);

            // `fetch_add` returns the previous value, so the first call sees 0 and
            // therefore logs; after that every n-th call lands on a multiple of n.
            COUNTER.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed) % ($n as u64) == 0
        };

        if should_log {
            tracing::$level!($($args)+)
        }
    }};
}
113
/// Count-based rate-limited logging at the `trace` level.
///
/// Thin wrapper that hands its arguments to the shared count-based helper with the
/// level fixed to `trace`. See [`info_every_n!`] for details and examples.
#[macro_export]
macro_rules! trace_every_n {
    ($every:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($every, trace, $($event)+)
    };
}
121
/// Count-based rate-limited logging at the `debug` level.
///
/// Thin wrapper that hands its arguments to the shared count-based helper with the
/// level fixed to `debug`. See [`info_every_n!`] for details and examples.
#[macro_export]
macro_rules! debug_every_n {
    ($every:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($every, debug, $($event)+)
    };
}
129
/// Count-based rate-limited logging at the `info` level.
///
/// The first argument is `N`; everything after it is forwarded to the underlying
/// `tracing` event macro. A call site emits on its 1st, `N + 1`-th, `2N + 1`-th, ...
/// invocation (matching glog behavior). The invocation counter is kept per call
/// site, so two separate invocations of this macro count independently.
///
/// `N` must be a constant expression greater than zero.
///
/// # Example
/// ```ignore
/// // Basic message
/// info_every_n!(100, "periodic status");
///
/// // With format args
/// info_every_n!(50, "processed {} items", count);
///
/// // With key-value pairs
/// info_every_n!(100, actor_id = id, "actor started");
/// ```
#[macro_export]
macro_rules! info_every_n {
    ($every:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($every, info, $($event)+)
    };
}
152
/// Count-based rate-limited logging at the `warn` level.
///
/// Thin wrapper that hands its arguments to the shared count-based helper with the
/// level fixed to `warn`. See [`info_every_n!`] for details and examples.
#[macro_export]
macro_rules! warn_every_n {
    ($every:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($every, warn, $($event)+)
    };
}
160
/// Count-based rate-limited logging at the `error` level.
///
/// Thin wrapper that hands its arguments to the shared count-based helper with the
/// level fixed to `error`. See [`info_every_n!`] for details and examples.
#[macro_export]
macro_rules! error_every_n {
    ($every:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($every, error, $($event)+)
    };
}
168
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use tracing_subscriber::prelude::*;

    /// Checks that a single `info_every_ms!` call site logs once, stays quiet for the
    /// rest of the interval, and logs again once the interval has elapsed.
    #[test]
    fn test_rate_limited_logging_macros() {
        // Incremented each time the fmt layer asks for a writer; used as the number
        // of emitted events.
        static CALL_COUNT: AtomicU32 = AtomicU32::new(0);

        // Every expansion of `info_every_ms!` owns its own timestamp. Both loops
        // below therefore go through this one call site; a second textual
        // invocation would start with fresh state and log regardless of the sleep.
        fn log_iteration(i: i32) {
            info_every_ms!(50, "test message iteration {}", i);
        }

        // Using a test subscriber to capture events
        let subscriber = tracing_subscriber::registry().with(
            tracing_subscriber::fmt::layer()
                .with_writer(move || {
                    CALL_COUNT.fetch_add(1, Ordering::SeqCst);
                    std::io::stdout()
                })
                .with_filter(tracing_subscriber::filter::LevelFilter::TRACE),
        );

        tracing::subscriber::with_default(subscriber, || {
            let initial = CALL_COUNT.load(Ordering::SeqCst);

            // Hit the call site repeatedly within the interval.
            // First iteration should log, subsequent ones should be suppressed.
            for i in 0..5 {
                log_iteration(i);
            }
            let after_loop = CALL_COUNT.load(Ordering::SeqCst);
            assert_eq!(
                after_loop - initial,
                1,
                "only first iteration should log: {} -> {}",
                initial,
                after_loop
            );

            // After interval, the same call site should log again
            std::thread::sleep(Duration::from_millis(60));
            for i in 5..8 {
                log_iteration(i);
            }
            let after_interval = CALL_COUNT.load(Ordering::SeqCst);
            assert_eq!(
                after_interval - after_loop,
                1,
                "only first iteration after sleep should log: {} -> {}",
                after_loop,
                after_interval
            );
        });

        // Verify other macro levels compile with various argument patterns
        let name = "test";
        trace_every_ms!(100, "trace message");
        debug_every_ms!(100, "debug message");
        warn_every_ms!(100, key = "value", %name, "warn with fields");
        error_every_ms!(100, count = 42, ?name, "error with format: {}", "test");
    }

    /// Checks that `info_every_n!` logs on the 1st and 4th of six invocations when
    /// `n` is 3, by capturing the formatted output in an in-memory buffer.
    #[test]
    fn test_count_based_rate_limited_logging() {
        use std::io::Write;
        use std::sync::Arc;
        use std::sync::Mutex;

        // Writer that appends everything it receives to a shared byte buffer.
        #[derive(Clone)]
        struct SharedWriter(Arc<Mutex<Vec<u8>>>);

        impl Write for SharedWriter {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.lock().unwrap().extend_from_slice(buf);
                Ok(buf.len())
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let buffer = Arc::new(Mutex::new(Vec::new()));
        let writer = SharedWriter(buffer.clone());

        let subscriber = tracing_subscriber::registry().with(
            tracing_subscriber::fmt::layer()
                .with_writer(move || writer.clone())
                .with_ansi(false)
                .without_time()
                .with_filter(tracing_subscriber::filter::LevelFilter::TRACE),
        );

        tracing::subscriber::with_default(subscriber, || {
            // With n=3, should log on 1st, 4th invocations (glog behavior)
            for i in 1..=6 {
                info_every_n!(3, "message {}", i);
            }
        });

        let output = String::from_utf8(buffer.lock().unwrap().clone()).unwrap();

        // Should contain 1st and 4th messages only
        assert!(
            output.contains("message 1"),
            "should log message 1: {output}"
        );
        assert!(
            !output.contains("message 2"),
            "should NOT log message 2: {output}"
        );
        assert!(
            !output.contains("message 3"),
            "should NOT log message 3: {output}"
        );
        assert!(
            output.contains("message 4"),
            "should log message 4: {output}"
        );
        assert!(
            !output.contains("message 5"),
            "should NOT log message 5: {output}"
        );
        assert!(
            !output.contains("message 6"),
            "should NOT log message 6: {output}"
        );

        // Verify other macro levels compile with various argument patterns
        let name = "test";
        trace_every_n!(5, "trace message");
        debug_every_n!(5, "debug message");
        warn_every_n!(5, key = "value", %name, "warn with fields");
        error_every_n!(5, count = 42, ?name, "error with format: {}", "test");
    }
}