1use std::env;
16use std::fs::File;
17use std::io::Read;
18use std::path::Path;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::RwLock;
22use std::time::Duration;
23
24use crate::attrs::Attrs;
25use crate::attrs::declare_attrs;
26
// Configuration keys and their built-in default values.
declare_attrs! {
    // Codec frame-length limit. Default: 1024 * 1024 * 1024 (1 GiB if the
    // unit is bytes — TODO confirm against the codec).
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 1024 * 1024 * 1024;

    // Message delivery timeout. Default: 30 seconds.
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    // Process exit timeout. Default: 10 seconds.
    // NOTE(review): `from_env` has no environment-variable override for this key.
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    // Time interval for message acknowledgements. Default: 500 milliseconds.
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    // Message-count interval for acknowledgements. Default: 1000 messages.
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    // Maximum split buffer size. Default: 5 (unit not visible here — presumably
    // a message count; verify against the consumer).
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    // Timeout for stopping an actor. Default: 1 second.
    // NOTE(review): `from_env` has no environment-variable override for this key.
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1);

    // Heartbeat interval for the remote allocator. Default: 5 seconds.
    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
}
53
54pub fn from_env() -> Attrs {
56 let mut config = Attrs::new();
57
58 if let Ok(val) = env::var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") {
60 if let Ok(parsed) = val.parse::<usize>() {
61 config[CODEC_MAX_FRAME_LENGTH] = parsed;
62 }
63 }
64
65 if let Ok(val) = env::var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") {
67 if let Ok(parsed) = val.parse::<u64>() {
68 config[MESSAGE_DELIVERY_TIMEOUT] = Duration::from_secs(parsed);
69 }
70 }
71
72 if let Ok(val) = env::var("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL_MS") {
74 if let Ok(parsed) = val.parse::<u64>() {
75 config[MESSAGE_ACK_TIME_INTERVAL] = Duration::from_millis(parsed);
76 }
77 }
78
79 if let Ok(val) = env::var("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES") {
81 if let Ok(parsed) = val.parse::<u64>() {
82 config[MESSAGE_ACK_EVERY_N_MESSAGES] = parsed;
83 }
84 }
85
86 if let Ok(val) = env::var("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE") {
88 if let Ok(parsed) = val.parse::<usize>() {
89 config[SPLIT_MAX_BUFFER_SIZE] = parsed;
90 }
91 }
92
93 if let Ok(val) = env::var("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL_SECS") {
95 if let Ok(parsed) = val.parse::<u64>() {
96 config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL] = Duration::from_secs(parsed);
97 }
98 }
99
100 config
101}
102
103pub fn from_yaml<P: AsRef<Path>>(path: P) -> Result<Attrs, anyhow::Error> {
105 let mut file = File::open(path)?;
106 let mut contents = String::new();
107 file.read_to_string(&mut contents)?;
108 Ok(serde_yaml::from_str(&contents)?)
109}
110
111pub fn to_yaml<P: AsRef<Path>>(attrs: &Attrs, path: P) -> Result<(), anyhow::Error> {
113 let yaml = serde_yaml::to_string(attrs)?;
114 std::fs::write(path, yaml)?;
115 Ok(())
116}
117
118pub fn merge(config: &mut Attrs, other: &Attrs) {
120 if other.contains_key(CODEC_MAX_FRAME_LENGTH) {
121 config[CODEC_MAX_FRAME_LENGTH] = other[CODEC_MAX_FRAME_LENGTH];
122 }
123 if other.contains_key(MESSAGE_DELIVERY_TIMEOUT) {
124 config[MESSAGE_DELIVERY_TIMEOUT] = other[MESSAGE_DELIVERY_TIMEOUT];
125 }
126 if other.contains_key(MESSAGE_ACK_TIME_INTERVAL) {
127 config[MESSAGE_ACK_TIME_INTERVAL] = other[MESSAGE_ACK_TIME_INTERVAL];
128 }
129 if other.contains_key(MESSAGE_ACK_EVERY_N_MESSAGES) {
130 config[MESSAGE_ACK_EVERY_N_MESSAGES] = other[MESSAGE_ACK_EVERY_N_MESSAGES];
131 }
132 if other.contains_key(SPLIT_MAX_BUFFER_SIZE) {
133 config[SPLIT_MAX_BUFFER_SIZE] = other[SPLIT_MAX_BUFFER_SIZE];
134 }
135 if other.contains_key(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) {
136 config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL] = other[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL];
137 }
138}
139
140pub mod global {
158 use std::marker::PhantomData;
159
160 use super::*;
161 use crate::attrs::Key;
162
163 static CONFIG: LazyLock<Arc<RwLock<Attrs>>> =
165 LazyLock::new(|| Arc::new(RwLock::new(from_env())));
166
167 pub fn lock() -> ConfigLock {
180 static MUTEX: LazyLock<std::sync::Mutex<()>> = LazyLock::new(|| std::sync::Mutex::new(()));
181 ConfigLock {
182 _guard: MUTEX.lock().unwrap(),
183 }
184 }
185
186 pub fn init_from_env() {
188 let config = from_env();
189 let mut global_config = CONFIG.write().unwrap();
190 *global_config = config;
191 }
192
193 pub fn init_from_yaml<P: AsRef<Path>>(path: P) -> Result<(), anyhow::Error> {
195 let config = from_yaml(path)?;
196 let mut global_config = CONFIG.write().unwrap();
197 *global_config = config;
198 Ok(())
199 }
200
201 pub fn get<
204 T: Send
205 + Sync
206 + Copy
207 + serde::Serialize
208 + serde::de::DeserializeOwned
209 + crate::data::Named
210 + 'static,
211 >(
212 key: Key<T>,
213 ) -> T {
214 *CONFIG.read().unwrap().get(key).unwrap()
215 }
216
217 pub fn attrs() -> Attrs {
219 CONFIG.read().unwrap().clone()
220 }
221
222 pub fn reset_to_defaults() {
227 let mut config = CONFIG.write().unwrap();
228 *config = Attrs::new();
229 }
230
231 pub struct ConfigLock {
237 _guard: std::sync::MutexGuard<'static, ()>,
238 }
239
240 impl ConfigLock {
241 pub fn override_key<
245 'a,
246 T: Send
247 + Sync
248 + serde::Serialize
249 + serde::de::DeserializeOwned
250 + crate::data::Named
251 + Clone
252 + 'static,
253 >(
254 &'a self,
255 key: crate::attrs::Key<T>,
256 value: T,
257 ) -> ConfigValueGuard<'a, T> {
258 let orig = {
259 let mut config = CONFIG.write().unwrap();
260 let orig = config.take_value(key);
261 config.set(key, value);
262 orig
263 };
264
265 ConfigValueGuard {
266 key,
267 orig,
268 _phantom: PhantomData,
269 }
270 }
271 }
272
273 pub struct ConfigValueGuard<'a, T: 'static> {
275 key: crate::attrs::Key<T>,
276 orig: Option<Box<dyn crate::attrs::SerializableValue>>,
277 _phantom: PhantomData<&'a ()>,
279 }
280
281 impl<T: 'static> Drop for ConfigValueGuard<'_, T> {
282 fn drop(&mut self) {
283 let mut config = CONFIG.write().unwrap();
284 if let Some(orig) = self.orig.take() {
285 config.restore_value(self.key, orig);
286 } else {
287 config.remove_value(self.key);
288 }
289 }
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty `Attrs` reads back the declared default for each key.
    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024 * 1024 * 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_secs(5)
        );
    }

    /// `from_env` picks up the variables that are set and leaves the rest at
    /// their defaults.
    #[test]
    fn test_from_env() {
        // The tests that take this lock can trigger the lazy `from_env()`
        // initialization of the global configuration, which reads the
        // environment. Holding the lock here keeps those reads from running
        // concurrently with the environment mutation below.
        let _lock = global::lock();

        // SAFETY: mutating the environment is only sound while no other thread
        // reads or writes it. The lock above serializes this test against the
        // other environment readers in this module.
        // NOTE(review): tests elsewhere in the crate that read the environment
        // without taking `global::lock()` are not covered — confirm.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: as above.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS", "60") };

        let config = from_env();

        // Clean up before asserting so a failed assertion cannot leak the
        // variables into other tests.
        // SAFETY: as above.
        unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
        // SAFETY: as above.
        unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") };

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
        // Not set in the environment, so the default applies.
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
    }

    /// Values set in the second configuration win after `merge`.
    #[test]
    fn test_merge() {
        let mut config1 = Attrs::new();
        let mut config2 = Attrs::new();
        config2[CODEC_MAX_FRAME_LENGTH] = 1024;
        config2[MESSAGE_DELIVERY_TIMEOUT] = Duration::from_secs(60);

        merge(&mut config1, &config2);

        assert_eq!(config1[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config1[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
    }

    /// An override is visible while its guard lives and undone when it drops,
    /// and the same key can be overridden again afterwards.
    #[test]
    fn test_global_config() {
        let config = global::lock();

        // Start from defaults so state left by other tests cannot interfere.
        global::reset_to_defaults();

        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);
        {
            let _guard = config.override_key(CODEC_MAX_FRAME_LENGTH, 1024);
            assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024);
        }
        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);

        {
            let _guard = config.override_key(CODEC_MAX_FRAME_LENGTH, 1024);
            assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024);
        }

        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);
    }

    /// A fresh `Attrs` stores nothing, yet every key exposes its default both
    /// through indexing and through the key itself.
    #[test]
    fn test_defaults() {
        let config = Attrs::new();

        // Nothing has been explicitly set.
        assert!(config.is_empty());

        // Indexing still yields the defaults.
        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024 * 1024 * 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Each key reports that it has a default...
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // ...and exposes the expected default value.
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(1024 * 1024 * 1024))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    /// Only explicitly set keys are serialized; defaults are filled back in
    /// when reading from the deserialized value.
    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Nothing set: serializes to an empty object.
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout"));

        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // The explicitly set value round-trips.
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // A key that was never serialized reads back as its default.
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }

    /// Overrides affect only their own key and are all undone once their
    /// guards are dropped.
    #[test]
    fn test_overrides() {
        let config = global::lock();

        // Start from defaults so state left by other tests cannot interfere.
        global::reset_to_defaults();

        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);
        assert_eq!(
            global::get(MESSAGE_DELIVERY_TIMEOUT),
            Duration::from_secs(30)
        );

        // A single override leaves other keys untouched.
        {
            let _guard = config.override_key(CODEC_MAX_FRAME_LENGTH, 2048);
            assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 2048);
            assert_eq!(
                global::get(MESSAGE_DELIVERY_TIMEOUT),
                Duration::from_secs(30)
            );
        }

        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);

        // Two overrides on different keys are active at the same time.
        {
            let _guard1 = config.override_key(CODEC_MAX_FRAME_LENGTH, 4096);
            let _guard2 = config.override_key(MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(60));

            assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 4096);
            assert_eq!(
                global::get(MESSAGE_DELIVERY_TIMEOUT),
                Duration::from_secs(60)
            );
        }

        // Both are back to their defaults.
        assert_eq!(global::get(CODEC_MAX_FRAME_LENGTH), 1024 * 1024 * 1024);
        assert_eq!(
            global::get(MESSAGE_DELIVERY_TIMEOUT),
            Duration::from_secs(30)
        );
    }
}