1use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
26#[derive(Clone, Debug, Serialize, Named)]
28#[named("hyperactor::config::Pem")]
29pub enum Pem {
30 Value(Vec<u8>),
32 File(PathBuf),
34 StaticPath(&'static str),
36}
37
38impl<'de> Deserialize<'de> for Pem {
40 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41 where
42 D: serde::Deserializer<'de>,
43 {
44 #[derive(Deserialize)]
45 enum PemDeserialize {
46 Value(Vec<u8>),
47 File(PathBuf),
48 StaticPath(String),
50 }
51
52 match PemDeserialize::deserialize(deserializer)? {
53 PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54 PemDeserialize::File(p) => Ok(Pem::File(p)),
55 PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57 }
58 }
59}
60
61impl AttrValue for Pem {
62 fn display(&self) -> String {
63 match self {
64 Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65 Pem::File(path) => path.display().to_string(),
66 Pem::StaticPath(path) => path.to_string(),
67 }
68 }
69
70 fn parse(value: &str) -> Result<Self, anyhow::Error> {
71 if value.trim_start().starts_with("-----BEGIN") {
73 Ok(Pem::Value(value.as_bytes().to_vec()))
74 } else {
75 Ok(Pem::File(PathBuf::from(value)))
76 }
77 }
78}
79
80impl Pem {
81 pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83 match self {
84 Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85 Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86 Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87 }
88 }
89}
90
91#[derive(Clone, Debug)]
93pub struct PemBundle {
94 pub ca: Pem,
96 pub cert: Pem,
98 pub key: Pem,
100}
101
102declare_attrs! {
104 @meta(CONFIG = ConfigAttr::new(
106 Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
107 Some("codec_max_frame_length".to_string()),
108 ))
109 pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; @meta(CONFIG = ConfigAttr::new(
113 Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
114 Some("message_delivery_timeout".to_string()),
115 ))
116 pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
117
118 @meta(CONFIG = ConfigAttr::new(
122 Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
123 Some("terminated_snapshot_retention".to_string()),
124 ))
125 pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;
126
127 @meta(CONFIG = ConfigAttr::new(
129 Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
130 Some("process_exit_timeout".to_string()),
131 ))
132 pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
133
134 @meta(CONFIG = ConfigAttr::new(
136 Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
137 Some("message_ack_time_interval".to_string()),
138 ))
139 pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
140
141 @meta(CONFIG = ConfigAttr::new(
143 Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
144 Some("message_ack_every_n_messages".to_string()),
145 ))
146 pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
147
148 @meta(CONFIG = ConfigAttr::new(
150 Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
151 Some("message_ttl_default".to_string()),
152 ))
153 pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
154
155 @meta(CONFIG = ConfigAttr::new(
157 Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
158 Some("split_max_buffer_size".to_string()),
159 ))
160 pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
161
162 @meta(CONFIG = ConfigAttr::new(
164 Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
165 Some("split_max_buffer_age".to_string()),
166 ))
167 pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
168
169 @meta(CONFIG = ConfigAttr::new(
171 Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
172 Some("stop_actor_timeout".to_string()),
173 ))
174 pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);
175
176 @meta(CONFIG = ConfigAttr::new(
179 Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
180 Some("cleanup_timeout".to_string()),
181 ))
182 pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);
183
184 @meta(CONFIG = ConfigAttr::new(
188 Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
189 Some("remote_allocator_heartbeat_interval".to_string()),
190 ))
191 pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);
192
193 @meta(CONFIG = ConfigAttr::new(
195 Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
196 Some("channel_net_rx_buffer_full_check_interval".to_string()),
197 ))
198 pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
199
200 @meta(CONFIG = ConfigAttr::new(
203 Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
204 Some("message_latency_sampling_rate".to_string()),
205 ))
206 pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
207
208 @meta(CONFIG = ConfigAttr::new(
210 Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
211 Some("enable_dest_actor_reordering_buffer".to_string()),
212 ))
213 pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = false;
214
215 @meta(CONFIG = ConfigAttr::new(
220 Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
221 Some("host_spawn_ready_timeout".to_string()),
222 ))
223 pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
224
225 @meta(CONFIG = ConfigAttr::new(
228 Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
229 Some("server_heartbeat_interval".to_string()),
230 ))
231 pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
232
233 @meta(CONFIG = ConfigAttr::new(
235 Some("HYPERACTOR_TLS_CERT".to_string()),
236 Some("hyperactor_tls_cert".to_string()),
237 ).process_local())
238 pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");
239
240 @meta(CONFIG = ConfigAttr::new(
242 Some("HYPERACTOR_TLS_KEY".to_string()),
243 Some("hyperactor_tls_key".to_string()),
244 ).process_local())
245 pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");
246
247 @meta(CONFIG = ConfigAttr::new(
249 Some("HYPERACTOR_TLS_CA".to_string()),
250 Some("hyperactor_tls_ca".to_string()),
251 ).process_local())
252 pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
253}
254
255#[cfg(test)]
256mod tests {
257 use std::collections::HashSet;
258
259 use hyperactor_config::Attrs;
260 use hyperactor_config::from_env;
261 use indoc::indoc;
262
263 use super::*;
264
265 fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
269 let buf = tracing_test::internal::global_buf().lock().unwrap();
270 let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
271 let lines: Vec<&str> = logs_str.lines().collect();
272 match f(&lines) {
273 Ok(()) => {}
274 Err(msg) => panic!("{}", msg),
275 }
276 }
277
278 const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;
279
280 #[test]
281 fn test_default_config() {
282 let config = Attrs::new();
283 assert_eq!(
284 config[CODEC_MAX_FRAME_LENGTH],
285 CODEC_MAX_FRAME_LENGTH_DEFAULT
286 );
287 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
288 assert_eq!(
289 config[MESSAGE_ACK_TIME_INTERVAL],
290 Duration::from_millis(500)
291 );
292 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
293 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
294 assert_eq!(
295 config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
296 Duration::from_mins(5)
297 );
298 }
299
300 #[tracing_test::traced_test]
301 #[test]
302 #[cfg_attr(not(fbcode_build), ignore)]
304 fn test_from_env() {
305 unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
308 unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };
310
311 let config = from_env();
312
313 assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
314 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
315 assert_eq!(
316 config[MESSAGE_ACK_TIME_INTERVAL],
317 Duration::from_millis(500)
318 ); let expected_lines: HashSet<&str> = indoc! {"
321 # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
322 # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
323 # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
324 # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
325 # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
326 # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
327 # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
328 # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
329 # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
330 # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
331 export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
332 # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
333 export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
334 # export HYPERACTOR_CLEANUP_TIMEOUT=3s
335 # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
336 # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
337 # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
338 "}
339 .trim_end()
340 .lines()
341 .collect();
342
343 logs_assert_unscoped(|logged_lines: &[&str]| {
347 let mut expected_lines = expected_lines.clone(); for logged in logged_lines {
349 expected_lines.remove(logged);
350 }
351
352 if expected_lines.is_empty() {
353 Ok(())
354 } else {
355 Err(format!("missing log lines: {:?}", expected_lines))
356 }
357 });
358
359 unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
362 unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") };
364 }
365
366 #[test]
367 fn test_defaults() {
368 let config = Attrs::new();
370
371 assert!(config.is_empty());
373
374 assert_eq!(
376 config[CODEC_MAX_FRAME_LENGTH],
377 CODEC_MAX_FRAME_LENGTH_DEFAULT
378 );
379 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
380 assert_eq!(
381 config[MESSAGE_ACK_TIME_INTERVAL],
382 Duration::from_millis(500)
383 );
384 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
385 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
386
387 assert!(CODEC_MAX_FRAME_LENGTH.has_default());
389 assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
390 assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
391 assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
392 assert!(SPLIT_MAX_BUFFER_SIZE.has_default());
393
394 assert_eq!(
396 CODEC_MAX_FRAME_LENGTH.default(),
397 Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
398 );
399 assert_eq!(
400 MESSAGE_DELIVERY_TIMEOUT.default(),
401 Some(&Duration::from_secs(30))
402 );
403 assert_eq!(
404 MESSAGE_ACK_TIME_INTERVAL.default(),
405 Some(&Duration::from_millis(500))
406 );
407 assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
408 assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
409 }
410
411 #[test]
412 fn test_serialization_only_includes_set_values() {
413 let mut config = Attrs::new();
414
415 let serialized = serde_json::to_string(&config).unwrap();
417 assert_eq!(serialized, "{}");
418
419 config[CODEC_MAX_FRAME_LENGTH] = 1024;
420
421 let serialized = serde_json::to_string(&config).unwrap();
422 assert!(serialized.contains("codec_max_frame_length"));
423 assert!(!serialized.contains("message_delivery_timeout")); let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();
427
428 assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);
430
431 assert_eq!(
433 restored_config[MESSAGE_DELIVERY_TIMEOUT],
434 Duration::from_secs(30)
435 );
436 }
437}