1pub mod global;
27
28use std::env;
29use std::fs::File;
30use std::io::Read;
31use std::path::Path;
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::sync::RwLock;
35use std::time::Duration;
36
37use serde::Deserialize;
38use serde::Serialize;
39use shell_quote::QuoteRefExt;
40
41use crate as hyperactor;
42use crate::attrs::AttrKeyInfo;
43use crate::attrs::AttrValue;
44use crate::attrs::Attrs;
45use crate::attrs::SerializableValue;
46use crate::attrs::declare_attrs;
47use crate::data::Encoding; #[derive(Clone, Debug, Serialize, Deserialize, hyperactor::Named)]
62pub struct ConfigAttr {
63 pub env_name: Option<String>,
65
66 pub py_name: Option<String>,
69}
70
71impl AttrValue for ConfigAttr {
72 fn display(&self) -> String {
73 serde_json::to_string(self).unwrap_or_else(|_| "<invalid ConfigAttr>".into())
74 }
75 fn parse(s: &str) -> Result<Self, anyhow::Error> {
76 Ok(serde_json::from_str(s)?)
77 }
78}
79
80declare_attrs! {
82 pub attr CONFIG: ConfigAttr;
94
95 @meta(CONFIG = ConfigAttr {
97 env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
98 py_name: None,
99 })
100 pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; @meta(CONFIG = ConfigAttr {
104 env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
105 py_name: None,
106 })
107 pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
108
109 @meta(CONFIG = ConfigAttr {
111 env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
112 py_name: None,
113 })
114 pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
115
116 @meta(CONFIG = ConfigAttr {
118 env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
119 py_name: None,
120 })
121 pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
122
123 @meta(CONFIG = ConfigAttr {
125 env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
126 py_name: None,
127 })
128 pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
129
130 @meta(CONFIG = ConfigAttr {
132 env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
133 py_name: None,
134 })
135 pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
136
137 @meta(CONFIG = ConfigAttr {
139 env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
140 py_name: None,
141 })
142 pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
143
144 @meta(CONFIG = ConfigAttr {
146 env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
147 py_name: None,
148 })
149 pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
150
151 @meta(CONFIG = ConfigAttr {
153 env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
154 py_name: None,
155 })
156 pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);
157
158 @meta(CONFIG = ConfigAttr {
161 env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
162 py_name: None,
163 })
164 pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);
165
166 @meta(CONFIG = ConfigAttr {
170 env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
171 py_name: None,
172 })
173 pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(300);
174
175 @meta(CONFIG = ConfigAttr {
177 env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()),
178 py_name: None,
179 })
180 pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;
181
182 @meta(CONFIG = ConfigAttr {
184 env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()),
185 py_name: None,
186 })
187 pub attr CHANNEL_MULTIPART: bool = true;
188
189 @meta(CONFIG = ConfigAttr {
191 env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
192 py_name: None,
193 })
194 pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
195
196 @meta(CONFIG = ConfigAttr {
199 env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
200 py_name: None,
201 })
202 pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
203
204 pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
206
207 @meta(CONFIG = ConfigAttr {
212 env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
213 py_name: None,
214 })
215 pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
216}
217
218pub fn from_env() -> Attrs {
220 let mut config = Attrs::new();
221 let mut output = String::new();
222
223 fn export(env_var: &str, value: Option<&dyn SerializableValue>) -> String {
224 let env_var: String = env_var.quoted(shell_quote::Bash);
225 let value: String = value
226 .map_or("".to_string(), SerializableValue::display)
227 .quoted(shell_quote::Bash);
228 format!("export {}={}\n", env_var, value)
229 }
230
231 for key in inventory::iter::<AttrKeyInfo>() {
232 let Some(cfg_meta) = key.meta.get(CONFIG) else {
237 continue;
238 };
239 let Some(env_var) = cfg_meta.env_name.as_deref() else {
240 continue;
241 };
242
243 let Ok(val) = env::var(env_var) else {
244 output.push_str("# ");
246 output.push_str(&export(env_var, key.default));
247 continue;
248 };
249
250 match (key.parse)(&val) {
251 Err(e) => {
252 tracing::error!(
253 "failed to override config key {} from value \"{}\" in ${}: {})",
254 key.name,
255 val,
256 env_var,
257 e
258 );
259 output.push_str("# ");
260 output.push_str(&export(env_var, key.default));
261 }
262 Ok(parsed) => {
263 output.push_str("# ");
264 output.push_str(&export(env_var, key.default));
265 output.push_str(&export(env_var, Some(parsed.as_ref())));
266 config.insert_value_by_name_unchecked(key.name, parsed);
267 }
268 }
269 }
270
271 tracing::info!(
272 "loaded configuration from environment:\n{}",
273 output.trim_end()
274 );
275
276 config
277}
278
279pub fn from_yaml<P: AsRef<Path>>(path: P) -> Result<Attrs, anyhow::Error> {
281 let mut file = File::open(path)?;
282 let mut contents = String::new();
283 file.read_to_string(&mut contents)?;
284 Ok(serde_yaml::from_str(&contents)?)
285}
286
287pub fn to_yaml<P: AsRef<Path>>(attrs: &Attrs, path: P) -> Result<(), anyhow::Error> {
289 let yaml = serde_yaml::to_string(attrs)?;
290 std::fs::write(path, yaml)?;
291 Ok(())
292}
293
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use indoc::indoc;

    use super::*;

    // Mirrors the default declared for `CODEC_MAX_FRAME_LENGTH`.
    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    // Reading keys from an empty `Attrs` yields the declared defaults.
    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_secs(300)
        );
    }

    // Environment overrides are parsed into the returned `Attrs`, untouched
    // keys keep their defaults, and the logged summary lists both.
    #[tracing_test::traced_test]
    #[test]
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // SAFETY: mutating the process environment is only sound while no
        // other thread reads or writes it concurrently.
        // NOTE(review): this relies on the test harness not running other
        // environment-touching code in parallel — confirm.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: see above.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
        // Not overridden, so the default is returned.
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );

        // Lines that must appear in the logged summary. The check below is a
        // subset test, so order and additional log lines do not matter.
        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_CHANNEL_MULTIPART=1
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
        "}
        .trim_end()
        .lines()
        .collect();

        logs_assert(|logged_lines: &[&str]| {
            // Work on a copy: cross off every expected line that was logged
            // and report whatever is left over.
            let mut expected_lines = expected_lines.clone();

            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });

        // Clean up exactly the variables set above so the overrides do not
        // leak into other tests running in this process.
        // SAFETY: see the note on `set_var` at the top of this test.
        unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
        // SAFETY: see above.
        unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT") };
    }

    // Defaults are served without being stored in the `Attrs`, and the keys
    // themselves report those defaults.
    #[test]
    fn test_defaults() {
        let config = Attrs::new();

        // Nothing has been set explicitly.
        assert!(config.is_empty());

        // Indexing still resolves to the declared defaults.
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Every key advertises that it has a default...
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // ...and exposes the expected value.
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    // Only explicitly set keys are serialized; defaults are re-derived after
    // a round trip.
    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // An empty config serializes to an empty JSON object.
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout"));

        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // The explicitly set value survives the round trip.
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // A key that was never set still resolves to its default.
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}