hyperactor_telemetry/
in_memory_reader.rs1use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::Weak;
12
13use opentelemetry_sdk::Resource;
14use opentelemetry_sdk::error::OTelSdkResult;
15use opentelemetry_sdk::metrics::InstrumentKind;
16use opentelemetry_sdk::metrics::ManualReader;
17use opentelemetry_sdk::metrics::MetricResult;
18use opentelemetry_sdk::metrics::Pipeline;
19use opentelemetry_sdk::metrics::SdkMeterProvider;
20use opentelemetry_sdk::metrics::Temporality;
21use opentelemetry_sdk::metrics::data::ResourceMetrics;
22use opentelemetry_sdk::metrics::data::Sum;
23use opentelemetry_sdk::metrics::reader::MetricReader;
24
25#[derive(Debug, Clone)]
27pub struct InMemoryReader {
28 manual_reader: Arc<ManualReader>,
29}
30
31impl InMemoryReader {
32 pub fn new(manual_reader: Arc<ManualReader>) -> Self {
34 Self { manual_reader }
35 }
36
37 pub fn get_all_counters(&self) -> HashMap<String, i64> {
39 let mut rm = ResourceMetrics {
40 resource: Resource::builder_empty().build(),
41 scope_metrics: Vec::new(),
42 };
43 let _ = self.manual_reader.collect(&mut rm);
44
45 let mut counters = HashMap::new();
47 for scope in &rm.scope_metrics {
48 for metric in &scope.metrics {
49 let data = metric.data.as_any();
50
51 if let Some(sum_u64) = data.downcast_ref::<Sum<u64>>() {
52 for data_point in &sum_u64.data_points {
53 let metric_name = metric.name.to_string();
54 counters.insert(metric_name, data_point.value as i64);
55 }
56 } else if let Some(sum_i64) = data.downcast_ref::<Sum<i64>>() {
57 for data_point in &sum_i64.data_points {
58 let metric_name = metric.name.to_string();
59 counters.insert(metric_name, data_point.value);
60 }
61 }
62 }
63 }
64 counters
65 }
66}
67
68impl MetricReader for InMemoryReader {
69 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
70 self.manual_reader.register_pipeline(pipeline);
71 }
72
73 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
74 self.manual_reader.collect(rm)
75 }
76
77 fn force_flush(&self) -> OTelSdkResult {
78 self.manual_reader.force_flush()
79 }
80
81 fn shutdown(&self) -> OTelSdkResult {
82 self.manual_reader.shutdown()
83 }
84
85 fn temporality(&self, kind: InstrumentKind) -> Temporality {
86 self.manual_reader.temporality(kind)
87 }
88}
89
90pub struct InMemoryMetrics {
102 in_memory_reader: InMemoryReader,
103 _provider: SdkMeterProvider,
104}
105
106impl InMemoryMetrics {
107 pub fn new() -> Self {
117 let manual_reader = Arc::new(
120 ManualReader::builder()
121 .with_temporality(Temporality::Cumulative)
122 .build(),
123 );
124
125 let in_memory_reader = InMemoryReader::new(Arc::clone(&manual_reader));
127
128 let provider = SdkMeterProvider::builder()
130 .with_reader(in_memory_reader)
131 .build();
132
133 opentelemetry::global::set_meter_provider(provider.clone());
135
136 Self {
137 in_memory_reader: InMemoryReader::new(Arc::clone(&manual_reader)),
138 _provider: provider,
139 }
140 }
141
142 pub fn get_counters(&self) -> HashMap<String, i64> {
144 self.in_memory_reader.get_all_counters()
145 }
146
147 pub fn get_counter(&self, name: &str) -> Option<i64> {
149 self.get_counters().get(name).copied()
150 }
151
152 pub fn reader(&self) -> &InMemoryReader {
154 &self.in_memory_reader
155 }
156}
157
158impl Drop for InMemoryMetrics {
159 fn drop(&mut self) {
160 let _ = self._provider.shutdown();
162
163 let noop_provider = SdkMeterProvider::builder().build();
166 opentelemetry::global::set_meter_provider(noop_provider);
167 }
168}
169
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::sync::MutexGuard;

    use super::*;

    /// Serializes the tests in this module.
    ///
    /// `InMemoryMetrics::new` and its `Drop` impl both replace the global
    /// meter provider. The default test harness runs tests on parallel
    /// threads, so without this lock one test could swap the provider out
    /// from under another.
    static GLOBAL_PROVIDER_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the module-wide lock. A poisoned lock (an earlier test panicked)
    /// is still usable because the mutex protects no data.
    fn serialize_test() -> MutexGuard<'static, ()> {
        GLOBAL_PROVIDER_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// A counter recorded while a guard is alive is visible through both
    /// `get_counters` and `get_counter`; unknown names yield `None`.
    #[test]
    fn test_in_memory_metrics_guard() {
        // Declared first so it is released last, after the guard's `Drop` ran.
        let _serial = serialize_test();
        let guard = InMemoryMetrics::new();

        crate::declare_static_counter!(GUARD_TEST_COUNTER, "guard_test_counter");
        GUARD_TEST_COUNTER.add(42, &[]);

        let counters = guard.get_counters();
        assert_eq!(counters.get("guard_test_counter"), Some(&42));

        assert_eq!(guard.get_counter("guard_test_counter"), Some(42));
        assert_eq!(guard.get_counter("nonexistent_counter"), None);
    }

    /// A second guard created after the first was dropped does not see the
    /// first guard's counter.
    #[test]
    fn test_multiple_guards_sequential() {
        let _serial = serialize_test();

        {
            let guard1 = InMemoryMetrics::new();
            crate::declare_static_counter!(COUNTER_1, "counter_1");
            COUNTER_1.add(10, &[]);
            assert_eq!(guard1.get_counter("counter_1"), Some(10));
        }

        {
            let guard2 = InMemoryMetrics::new();
            crate::declare_static_counter!(COUNTER_2, "counter_2");
            COUNTER_2.add(20, &[]);
            assert_eq!(guard2.get_counter("counter_2"), Some(20));
            assert_eq!(guard2.get_counter("counter_1"), None);
        }
    }

    /// Repeated `add` calls on one counter are reported as a running total.
    #[test]
    fn test_counter_accumulation() {
        let _serial = serialize_test();
        let guard = InMemoryMetrics::new();

        crate::declare_static_counter!(ACCUMULATING_COUNTER, "accumulating_counter");

        ACCUMULATING_COUNTER.add(1, &[]);
        assert_eq!(guard.get_counter("accumulating_counter"), Some(1));

        ACCUMULATING_COUNTER.add(2, &[]);
        assert_eq!(guard.get_counter("accumulating_counter"), Some(3));

        ACCUMULATING_COUNTER.add(7, &[]);
        assert_eq!(guard.get_counter("accumulating_counter"), Some(10));
    }

    /// Creating and dropping nested guards while outer guards are alive must
    /// not panic. This test makes no value assertions.
    #[test]
    fn test_guard_isolation() {
        let _serial = serialize_test();
        let _guard1 = InMemoryMetrics::new();
        let _guard2 = InMemoryMetrics::new();

        {
            let _temp_guard1 = InMemoryMetrics::new();
            crate::declare_static_counter!(ISOLATED_COUNTER_1, "isolated_counter_1");
            ISOLATED_COUNTER_1.add(100, &[]);
        }

        {
            let _temp_guard2 = InMemoryMetrics::new();
            crate::declare_static_counter!(ISOLATED_COUNTER_2, "isolated_counter_2");
            ISOLATED_COUNTER_2.add(200, &[]);
        }
    }
}