hyperactor_telemetry/
in_memory_reader.rs1use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::Weak;
12use std::time::Duration;
13
14use opentelemetry_sdk::error::OTelSdkResult;
15use opentelemetry_sdk::metrics::InstrumentKind;
16use opentelemetry_sdk::metrics::ManualReader;
17use opentelemetry_sdk::metrics::Pipeline;
18use opentelemetry_sdk::metrics::SdkMeterProvider;
19use opentelemetry_sdk::metrics::Temporality;
20use opentelemetry_sdk::metrics::data::AggregatedMetrics;
21use opentelemetry_sdk::metrics::data::MetricData;
22use opentelemetry_sdk::metrics::data::ResourceMetrics;
23use opentelemetry_sdk::metrics::reader::MetricReader;
24
25#[derive(Debug, Clone)]
27pub struct InMemoryReader {
28 manual_reader: Arc<ManualReader>,
29}
30
31impl InMemoryReader {
32 pub fn new(manual_reader: Arc<ManualReader>) -> Self {
34 Self { manual_reader }
35 }
36
37 pub fn get_all_counters(&self) -> HashMap<String, i64> {
39 let mut rm = ResourceMetrics::default();
40 let _ = self.manual_reader.collect(&mut rm);
41
42 let mut counters = HashMap::new();
44 for scope in rm.scope_metrics() {
45 for metric in scope.metrics() {
46 let data = metric.data();
47
48 if let AggregatedMetrics::U64(MetricData::Sum(sum_u64)) = data {
49 for data_point in sum_u64.data_points() {
50 let metric_name = metric.name().to_owned();
51 counters.insert(metric_name, data_point.value() as i64);
52 }
53 } else if let AggregatedMetrics::I64(MetricData::Sum(sum_i64)) = data {
54 for data_point in sum_i64.data_points() {
55 let metric_name = metric.name().to_owned();
56 counters.insert(metric_name, data_point.value());
57 }
58 }
59 }
60 }
61 counters
62 }
63}
64
65impl MetricReader for InMemoryReader {
66 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
67 self.manual_reader.register_pipeline(pipeline);
68 }
69
70 fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
71 self.manual_reader.collect(rm)
72 }
73
74 fn force_flush(&self) -> OTelSdkResult {
75 self.manual_reader.force_flush()
76 }
77
78 fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
79 self.manual_reader.shutdown_with_timeout(timeout)
80 }
81
82 fn temporality(&self, kind: InstrumentKind) -> Temporality {
83 self.manual_reader.temporality(kind)
84 }
85
86 fn shutdown(&self) -> OTelSdkResult {
87 self.manual_reader.shutdown()
88 }
89}
90
91pub struct InMemoryMetrics {
103 in_memory_reader: InMemoryReader,
104 _provider: SdkMeterProvider,
105}
106
107impl InMemoryMetrics {
108 pub fn new() -> Self {
118 let manual_reader = Arc::new(
121 ManualReader::builder()
122 .with_temporality(Temporality::Cumulative)
123 .build(),
124 );
125
126 let in_memory_reader = InMemoryReader::new(Arc::clone(&manual_reader));
128
129 let provider = SdkMeterProvider::builder()
131 .with_reader(in_memory_reader)
132 .build();
133
134 opentelemetry::global::set_meter_provider(provider.clone());
136
137 Self {
138 in_memory_reader: InMemoryReader::new(Arc::clone(&manual_reader)),
139 _provider: provider,
140 }
141 }
142
143 pub fn get_counters(&self) -> HashMap<String, i64> {
145 self.in_memory_reader.get_all_counters()
146 }
147
148 pub fn get_counter(&self, name: &str) -> Option<i64> {
150 self.get_counters().get(name).copied()
151 }
152
153 pub fn reader(&self) -> &InMemoryReader {
155 &self.in_memory_reader
156 }
157}
158
159impl Drop for InMemoryMetrics {
160 fn drop(&mut self) {
161 let _ = self._provider.shutdown();
163
164 let noop_provider = SdkMeterProvider::builder().build();
167 opentelemetry::global::set_meter_provider(noop_provider);
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174
175 #[test]
176 fn test_in_memory_metrics_guard() {
177 let guard = InMemoryMetrics::new();
179
180 crate::declare_static_counter!(GUARD_TEST_COUNTER, "guard_test_counter");
182 GUARD_TEST_COUNTER.add(42, &[]);
183
184 let counters = guard.get_counters();
186 assert_eq!(counters.get("guard_test_counter"), Some(&42));
187
188 assert_eq!(guard.get_counter("guard_test_counter"), Some(42));
190 assert_eq!(guard.get_counter("nonexistent_counter"), None);
191
192 }
194
195 #[test]
196 fn test_multiple_guards_sequential() {
197 {
199 let guard1 = InMemoryMetrics::new();
200 crate::declare_static_counter!(COUNTER_1, "counter_1");
201 COUNTER_1.add(10, &[]);
202 assert_eq!(guard1.get_counter("counter_1"), Some(10));
203 } {
206 let guard2 = InMemoryMetrics::new();
207 crate::declare_static_counter!(COUNTER_2, "counter_2");
208 COUNTER_2.add(20, &[]);
209 assert_eq!(guard2.get_counter("counter_2"), Some(20));
210 assert_eq!(guard2.get_counter("counter_1"), None);
212 } }
214
215 #[test]
216 fn test_counter_accumulation() {
217 let guard = InMemoryMetrics::new();
218
219 crate::declare_static_counter!(ACCUMULATING_COUNTER, "accumulating_counter");
220
221 ACCUMULATING_COUNTER.add(1, &[]);
223 assert_eq!(guard.get_counter("accumulating_counter"), Some(1));
224
225 ACCUMULATING_COUNTER.add(2, &[]);
226 assert_eq!(guard.get_counter("accumulating_counter"), Some(3));
227
228 ACCUMULATING_COUNTER.add(7, &[]);
229 assert_eq!(guard.get_counter("accumulating_counter"), Some(10));
230 }
231
232 #[test]
233 fn test_guard_isolation() {
234 let _guard1 = InMemoryMetrics::new();
236 let _guard2 = InMemoryMetrics::new();
237
238 {
240 let _temp_guard1 = InMemoryMetrics::new(); crate::declare_static_counter!(ISOLATED_COUNTER_1, "isolated_counter_1");
243 ISOLATED_COUNTER_1.add(100, &[]);
244 }
245
246 {
247 let _temp_guard2 = InMemoryMetrics::new(); crate::declare_static_counter!(ISOLATED_COUNTER_2, "isolated_counter_2");
250 ISOLATED_COUNTER_2.add(200, &[]);
251 }
252 }
253}