hyperactor_telemetry/
rate_limit.rs1#[doc(hidden)]
/// Shared implementation behind the `*_every_ms!` macros.
///
/// Expands to a block that forwards `$args` to `tracing::$level!` at most
/// once per `$interval_ms` milliseconds. Every expansion owns its own
/// timestamp, so the limit applies per call site, not globally.
///
/// * `$interval_ms` - minimum spacing between events, in milliseconds. It is
///   checked inside a `const` block, so it must be a constant expression
///   greater than zero.
/// * `$level` - name of the `tracing` event macro to call (`trace`, `debug`,
///   `info`, `warn` or `error`).
/// * `$args` - tokens passed through unchanged to `tracing::$level!`.
///
/// The first call from a call site always logs. Time is read from the wall
/// clock (`SystemTime`); if the clock steps backwards, elapsed time saturates
/// to zero and nothing is logged until the clock has advanced by a full
/// interval past the last logged timestamp.
#[macro_export]
macro_rules! log_every_ms_impl {
    ($interval_ms:expr, $level:ident, $($args:tt)+) => {{
        // Reject a non-positive interval at compile time.
        const { assert!($interval_ms > 0, "interval_ms must be greater than 0"); }

        let interval = $interval_ms as u64;

        // All bookkeeping lives in its own block: items declared by a macro
        // are not hygienic, so a `use ...::Ordering` or a `static` placed next
        // to the caller's tokens would shadow caller names such as
        // `std::cmp::Ordering` inside `$args`.
        let should_log = {
            use ::std::sync::atomic::{AtomicU64, Ordering};
            use ::std::time::{SystemTime, UNIX_EPOCH};

            // Milliseconds since the Unix epoch of the last emitted event for
            // this call site; 0 means "nothing logged yet".
            static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);

            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;

            let last = LAST_LOG_MS.load(Ordering::Relaxed);
            let due = last == 0 || now_ms.saturating_sub(last) >= interval;

            // Claim the slot atomically. If another thread updated the
            // timestamp since we read it, that thread is the one logging for
            // this interval and we stay quiet.
            due && LAST_LOG_MS
                .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        };

        if should_log {
            tracing::$level!($($args)+)
        }
    }};
}
40
/// Time-rate-limited `tracing::trace!`.
///
/// Forwards to [`log_every_ms_impl!`] with the `trace` level: the event is
/// emitted on the first call from this call site and afterwards only once at
/// least `$interval_ms` milliseconds have passed since the last emitted one.
/// Everything after the interval is handed to `tracing::trace!` unchanged.
#[macro_export]
macro_rules! trace_every_ms {
    ($interval_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($interval_ms, trace, $($event)+)
    };
}
48
/// Time-rate-limited `tracing::debug!`.
///
/// Forwards to [`log_every_ms_impl!`] with the `debug` level: the event is
/// emitted on the first call from this call site and afterwards only once at
/// least `$interval_ms` milliseconds have passed since the last emitted one.
/// Everything after the interval is handed to `tracing::debug!` unchanged.
#[macro_export]
macro_rules! debug_every_ms {
    ($interval_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($interval_ms, debug, $($event)+)
    };
}
56
/// Time-rate-limited `tracing::info!`.
///
/// Forwards to [`log_every_ms_impl!`] with the `info` level: the event is
/// emitted on the first call from this call site and afterwards only once at
/// least `$interval_ms` milliseconds have passed since the last emitted one.
/// Everything after the interval is handed to `tracing::info!` unchanged.
#[macro_export]
macro_rules! info_every_ms {
    ($interval_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($interval_ms, info, $($event)+)
    };
}
78
/// Time-rate-limited `tracing::warn!`.
///
/// Forwards to [`log_every_ms_impl!`] with the `warn` level: the event is
/// emitted on the first call from this call site and afterwards only once at
/// least `$interval_ms` milliseconds have passed since the last emitted one.
/// Everything after the interval is handed to `tracing::warn!` unchanged.
#[macro_export]
macro_rules! warn_every_ms {
    ($interval_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($interval_ms, warn, $($event)+)
    };
}
86
/// Time-rate-limited `tracing::error!`.
///
/// Forwards to [`log_every_ms_impl!`] with the `error` level: the event is
/// emitted on the first call from this call site and afterwards only once at
/// least `$interval_ms` milliseconds have passed since the last emitted one.
/// Everything after the interval is handed to `tracing::error!` unchanged.
#[macro_export]
macro_rules! error_every_ms {
    ($interval_ms:expr, $($event:tt)+) => {
        $crate::log_every_ms_impl!($interval_ms, error, $($event)+)
    };
}
94
/// Shared implementation behind the `*_every_n!` macros.
///
/// Expands to a block that counts how often this call site is reached and
/// forwards `$args` to `tracing::$level!` on the 1st, `(n + 1)`-th,
/// `(2n + 1)`-th, ... visit. Every expansion owns its own counter, so the
/// limit applies per call site.
///
/// * `$n` - emit one event per `$n` visits. It is checked inside a `const`
///   block, so it must be a constant expression greater than zero.
/// * `$level` - name of the `tracing` event macro to call (`trace`, `debug`,
///   `info`, `warn` or `error`).
/// * `$args` - tokens passed through unchanged to `tracing::$level!`.
#[doc(hidden)]
#[macro_export]
macro_rules! log_every_n_impl {
    ($n:expr, $level:ident, $($args:tt)+) => {{
        // Reject a non-positive `n` at compile time.
        const { assert!($n > 0, "n must be greater than 0"); }

        // The counter and its imports live in their own block: items declared
        // by a macro are not hygienic, so a `use ...::Ordering` or a `static`
        // placed next to the caller's tokens would shadow caller names such
        // as `std::cmp::Ordering` inside `$n` or `$args`.
        let count = {
            use ::std::sync::atomic::{AtomicU64, Ordering};

            // Number of times this call site has been reached so far.
            static COUNTER: AtomicU64 = AtomicU64::new(0);

            // `fetch_add` returns the value before the increment, so the very
            // first visit observes 0 and therefore logs.
            COUNTER.fetch_add(1, Ordering::Relaxed)
        };

        if count % ($n as u64) == 0 {
            tracing::$level!($($args)+)
        }
    }};
}
113
/// Count-rate-limited `tracing::trace!`.
///
/// Forwards to [`log_every_n_impl!`] with the `trace` level: of every `$n`
/// visits to this call site only the first one emits an event (visits 1,
/// `n + 1`, `2n + 1`, ...). Everything after `$n` is handed to
/// `tracing::trace!` unchanged.
#[macro_export]
macro_rules! trace_every_n {
    ($n:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($n, trace, $($event)+)
    };
}
121
/// Count-rate-limited `tracing::debug!`.
///
/// Forwards to [`log_every_n_impl!`] with the `debug` level: of every `$n`
/// visits to this call site only the first one emits an event (visits 1,
/// `n + 1`, `2n + 1`, ...). Everything after `$n` is handed to
/// `tracing::debug!` unchanged.
#[macro_export]
macro_rules! debug_every_n {
    ($n:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($n, debug, $($event)+)
    };
}
129
/// Count-rate-limited `tracing::info!`.
///
/// Forwards to [`log_every_n_impl!`] with the `info` level: of every `$n`
/// visits to this call site only the first one emits an event (visits 1,
/// `n + 1`, `2n + 1`, ...). Everything after `$n` is handed to
/// `tracing::info!` unchanged.
#[macro_export]
macro_rules! info_every_n {
    ($n:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($n, info, $($event)+)
    };
}
152
/// Count-rate-limited `tracing::warn!`.
///
/// Forwards to [`log_every_n_impl!`] with the `warn` level: of every `$n`
/// visits to this call site only the first one emits an event (visits 1,
/// `n + 1`, `2n + 1`, ...). Everything after `$n` is handed to
/// `tracing::warn!` unchanged.
#[macro_export]
macro_rules! warn_every_n {
    ($n:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($n, warn, $($event)+)
    };
}
160
/// Count-rate-limited `tracing::error!`.
///
/// Forwards to [`log_every_n_impl!`] with the `error` level: of every `$n`
/// visits to this call site only the first one emits an event (visits 1,
/// `n + 1`, `2n + 1`, ...). Everything after `$n` is handed to
/// `tracing::error!` unchanged.
#[macro_export]
macro_rules! error_every_n {
    ($n:expr, $($event:tt)+) => {
        $crate::log_every_n_impl!($n, error, $($event)+)
    };
}
168
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use tracing_subscriber::prelude::*;

    /// Time-based limiting: a burst of calls from one call site logs once,
    /// and the same call site logs once more after the interval has elapsed.
    #[test]
    fn test_rate_limited_logging_macros() {
        // Bumped every time the fmt layer asks the closure below for a
        // writer; the test uses it as its measure of emitted events.
        static CALL_COUNT: AtomicU32 = AtomicU32::new(0);

        let subscriber = tracing_subscriber::registry().with(
            tracing_subscriber::fmt::layer()
                .with_writer(move || {
                    CALL_COUNT.fetch_add(1, Ordering::SeqCst);
                    std::io::stdout()
                })
                .with_filter(tracing_subscriber::filter::LevelFilter::TRACE),
        );

        tracing::subscriber::with_default(subscriber, || {
            // Each macro expansion owns its own timestamp, so both phases
            // must go through ONE call site. With two separate expansions the
            // second would log on its first call regardless of the sleep and
            // the post-interval assertion would prove nothing.
            let log_iteration = |i: i32| {
                info_every_ms!(50, "test message iteration {}", i);
            };

            let baseline = CALL_COUNT.load(Ordering::SeqCst);

            for i in 0..5 {
                log_iteration(i);
            }
            let after_burst = CALL_COUNT.load(Ordering::SeqCst);
            assert_eq!(
                after_burst - baseline,
                1,
                "only first iteration should log: {} -> {}",
                baseline,
                after_burst
            );

            // Wait past the 50 ms interval so the call site may log again.
            std::thread::sleep(Duration::from_millis(60));
            for i in 5..8 {
                log_iteration(i);
            }
            let after_wait = CALL_COUNT.load(Ordering::SeqCst);
            assert_eq!(
                after_wait - after_burst,
                1,
                "only first iteration after sleep should log: {} -> {}",
                after_burst,
                after_wait
            );
        });

        // Exercise the remaining levels and the field syntaxes they accept.
        // These run outside the scoped subscriber and nothing is asserted on
        // them.
        let name = "test";
        trace_every_ms!(100, "trace message");
        debug_every_ms!(100, "debug message");
        warn_every_ms!(100, key = "value", %name, "warn with fields");
        error_every_ms!(100, count = 42, ?name, "error with format: {}", "test");
    }

    /// Count-based limiting: with `n = 3`, calls 1 and 4 out of six log and
    /// the others are dropped.
    #[test]
    fn test_count_based_rate_limited_logging() {
        use std::io::Write;
        use std::sync::Arc;
        use std::sync::Mutex;

        /// `Write` sink that appends into a shared in-memory buffer so the
        /// test can read back what the fmt layer wrote.
        #[derive(Clone)]
        struct SharedWriter(Arc<Mutex<Vec<u8>>>);

        impl Write for SharedWriter {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.lock().unwrap().extend_from_slice(buf);
                Ok(buf.len())
            }

            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let buffer = Arc::new(Mutex::new(Vec::new()));
        let writer = SharedWriter(Arc::clone(&buffer));

        // ANSI colours and timestamps are disabled on the formatter.
        let subscriber = tracing_subscriber::registry().with(
            tracing_subscriber::fmt::layer()
                .with_writer(move || writer.clone())
                .with_ansi(false)
                .without_time()
                .with_filter(tracing_subscriber::filter::LevelFilter::TRACE),
        );

        tracing::subscriber::with_default(subscriber, || {
            for i in 1..=6 {
                info_every_n!(3, "message {}", i);
            }
        });

        let captured = buffer.lock().unwrap().clone();
        let output = String::from_utf8(captured).unwrap();

        // The counter starts at 0, so visits 1 and 4 are the multiples of 3.
        assert!(
            output.contains("message 1"),
            "should log message 1: {output}"
        );
        assert!(
            !output.contains("message 2"),
            "should NOT log message 2: {output}"
        );
        assert!(
            !output.contains("message 3"),
            "should NOT log message 3: {output}"
        );
        assert!(
            output.contains("message 4"),
            "should log message 4: {output}"
        );
        assert!(
            !output.contains("message 5"),
            "should NOT log message 5: {output}"
        );
        assert!(
            !output.contains("message 6"),
            "should NOT log message 6: {output}"
        );

        // Exercise the remaining levels and the field syntaxes they accept.
        // These run outside the scoped subscriber and nothing is asserted on
        // them.
        let name = "test";
        trace_every_n!(5, "trace message");
        debug_every_n!(5, "debug message");
        warn_every_n!(5, key = "value", %name, "warn with fields");
        error_every_n!(5, count = 42, ?name, "error with format: {}", "test");
    }
}