hyperactor_telemetry/
in_memory_reader.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::Weak;
12
13use opentelemetry_sdk::Resource;
14use opentelemetry_sdk::error::OTelSdkResult;
15use opentelemetry_sdk::metrics::InstrumentKind;
16use opentelemetry_sdk::metrics::ManualReader;
17use opentelemetry_sdk::metrics::MetricResult;
18use opentelemetry_sdk::metrics::Pipeline;
19use opentelemetry_sdk::metrics::SdkMeterProvider;
20use opentelemetry_sdk::metrics::Temporality;
21use opentelemetry_sdk::metrics::data::ResourceMetrics;
22use opentelemetry_sdk::metrics::data::Sum;
23use opentelemetry_sdk::metrics::reader::MetricReader;
24
25// InMemoryReader that uses a shared ManualReader and implements MetricReader
26#[derive(Debug, Clone)]
27pub struct InMemoryReader {
28    manual_reader: Arc<ManualReader>,
29}
30
31impl InMemoryReader {
32    // Create a new InMemoryReader with a specific ManualReader
33    pub fn new(manual_reader: Arc<ManualReader>) -> Self {
34        Self { manual_reader }
35    }
36
37    // Get all counters from the shared ManualReader
38    pub fn get_all_counters(&self) -> HashMap<String, i64> {
39        let mut rm = ResourceMetrics {
40            resource: Resource::builder_empty().build(),
41            scope_metrics: Vec::new(),
42        };
43        let _ = self.manual_reader.collect(&mut rm);
44
45        // Extract counters directly from the collected metrics
46        let mut counters = HashMap::new();
47        for scope in &rm.scope_metrics {
48            for metric in &scope.metrics {
49                let data = metric.data.as_any();
50
51                if let Some(sum_u64) = data.downcast_ref::<Sum<u64>>() {
52                    for data_point in &sum_u64.data_points {
53                        let metric_name = metric.name.to_string();
54                        counters.insert(metric_name, data_point.value as i64);
55                    }
56                } else if let Some(sum_i64) = data.downcast_ref::<Sum<i64>>() {
57                    for data_point in &sum_i64.data_points {
58                        let metric_name = metric.name.to_string();
59                        counters.insert(metric_name, data_point.value);
60                    }
61                }
62            }
63        }
64        counters
65    }
66}
67
68impl MetricReader for InMemoryReader {
69    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
70        self.manual_reader.register_pipeline(pipeline);
71    }
72
73    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
74        self.manual_reader.collect(rm)
75    }
76
77    fn force_flush(&self) -> OTelSdkResult {
78        self.manual_reader.force_flush()
79    }
80
81    fn shutdown(&self) -> OTelSdkResult {
82        self.manual_reader.shutdown()
83    }
84
85    fn temporality(&self, kind: InstrumentKind) -> Temporality {
86        self.manual_reader.temporality(kind)
87    }
88}
89
90// RAII guard for in-memory metrics collection during testing
91//
92// Usage:
93//     let _guard = InMemoryMetrics::new();
94//
95//     // Your code that emits metrics
96//     my_counter.add(42, &[]);
97//
98//     // Check accumulated metrics
99//     let counters = _guard.get_counters();
100//     assert_eq!(counters.get("my_counter"), Some(&42));
101pub struct InMemoryMetrics {
102    in_memory_reader: InMemoryReader,
103    _provider: SdkMeterProvider,
104}
105
106impl InMemoryMetrics {
107    // Create a new InMemoryMetrics
108    //
109    // This will:
110    // 1. Create a ManualReader as shared state
111    // 2. Create an InMemoryReader that uses the shared ManualReader
112    // 3. Create a new SdkMeterProvider with the InMemoryReader
113    // 4. Set it as the global meter provider
114    //
115    // When the guard is dropped, the provider will be shut down.
116    pub fn new() -> Self {
117        // Create the manual reader with cumulative temporality - this state
118        // will only exists for the lifetime of the guard
119        let manual_reader = Arc::new(
120            ManualReader::builder()
121                .with_temporality(Temporality::Cumulative)
122                .build(),
123        );
124
125        // Create the in-memory reader using the shared manual reader
126        let in_memory_reader = InMemoryReader::new(Arc::clone(&manual_reader));
127
128        // Create a new provider with the in-memory reader
129        let provider = SdkMeterProvider::builder()
130            .with_reader(in_memory_reader)
131            .build();
132
133        // Set as global provider
134        opentelemetry::global::set_meter_provider(provider.clone());
135
136        Self {
137            in_memory_reader: InMemoryReader::new(Arc::clone(&manual_reader)),
138            _provider: provider,
139        }
140    }
141
142    // Get all counters accumulated since this guard was created
143    pub fn get_counters(&self) -> HashMap<String, i64> {
144        self.in_memory_reader.get_all_counters()
145    }
146
147    // Get the value of a specific counter by name
148    pub fn get_counter(&self, name: &str) -> Option<i64> {
149        self.get_counters().get(name).copied()
150    }
151
152    // Get a reference to the InMemoryReader for advanced usage
153    pub fn reader(&self) -> &InMemoryReader {
154        &self.in_memory_reader
155    }
156}
157
158impl Drop for InMemoryMetrics {
159    fn drop(&mut self) {
160        // Shutdown our provider
161        let _ = self._provider.shutdown();
162
163        // Reset to a no-op provider to prevent metrics from continuing
164        // to be collected by our in-memory reader after the guard is dropped
165        let noop_provider = SdkMeterProvider::builder().build();
166        opentelemetry::global::set_meter_provider(noop_provider);
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    #[test]
175    fn test_in_memory_metrics_guard() {
176        // Use the RAII guard
177        let guard = InMemoryMetrics::new();
178
179        // Create and use counters
180        crate::declare_static_counter!(GUARD_TEST_COUNTER, "guard_test_counter");
181        GUARD_TEST_COUNTER.add(42, &[]);
182
183        // Check that we can read the counter value
184        let counters = guard.get_counters();
185        assert_eq!(counters.get("guard_test_counter"), Some(&42));
186
187        // Test the convenience method
188        assert_eq!(guard.get_counter("guard_test_counter"), Some(42));
189        assert_eq!(guard.get_counter("nonexistent_counter"), None);
190
191        // Guard will be dropped here, cleaning up automatically
192    }
193
194    #[test]
195    fn test_multiple_guards_sequential() {
196        // Test that multiple guards work correctly when used sequentially
197        {
198            let guard1 = InMemoryMetrics::new();
199            crate::declare_static_counter!(COUNTER_1, "counter_1");
200            COUNTER_1.add(10, &[]);
201            assert_eq!(guard1.get_counter("counter_1"), Some(10));
202        } // guard1 dropped here
203
204        {
205            let guard2 = InMemoryMetrics::new();
206            crate::declare_static_counter!(COUNTER_2, "counter_2");
207            COUNTER_2.add(20, &[]);
208            assert_eq!(guard2.get_counter("counter_2"), Some(20));
209            // counter_1 should not be visible in guard2 since it's a new provider
210            assert_eq!(guard2.get_counter("counter_1"), None);
211        } // guard2 dropped here
212    }
213
214    #[test]
215    fn test_counter_accumulation() {
216        let guard = InMemoryMetrics::new();
217
218        crate::declare_static_counter!(ACCUMULATING_COUNTER, "accumulating_counter");
219
220        // Add values multiple times
221        ACCUMULATING_COUNTER.add(1, &[]);
222        assert_eq!(guard.get_counter("accumulating_counter"), Some(1));
223
224        ACCUMULATING_COUNTER.add(2, &[]);
225        assert_eq!(guard.get_counter("accumulating_counter"), Some(3));
226
227        ACCUMULATING_COUNTER.add(7, &[]);
228        assert_eq!(guard.get_counter("accumulating_counter"), Some(10));
229    }
230
231    #[test]
232    fn test_guard_isolation() {
233        // Test that each guard creates its own isolated ManualReader
234        let _guard1 = InMemoryMetrics::new();
235        let _guard2 = InMemoryMetrics::new();
236
237        // Create counters in each guard's context
238        {
239            // Switch to guard1's provider
240            let _temp_guard1 = InMemoryMetrics::new(); // This sets guard1's provider as global
241            crate::declare_static_counter!(ISOLATED_COUNTER_1, "isolated_counter_1");
242            ISOLATED_COUNTER_1.add(100, &[]);
243        }
244
245        {
246            // Switch to guard2's provider
247            let _temp_guard2 = InMemoryMetrics::new(); // This sets guard2's provider as global
248            crate::declare_static_counter!(ISOLATED_COUNTER_2, "isolated_counter_2");
249            ISOLATED_COUNTER_2.add(200, &[]);
250        }
251    }
252}