1use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
26#[derive(Clone, Debug, Serialize, Named)]
28#[named("hyperactor::config::Pem")]
29pub enum Pem {
30 Value(Vec<u8>),
32 File(PathBuf),
34 StaticPath(&'static str),
36}
37
38impl<'de> Deserialize<'de> for Pem {
40 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41 where
42 D: serde::Deserializer<'de>,
43 {
44 #[derive(Deserialize)]
45 enum PemDeserialize {
46 Value(Vec<u8>),
47 File(PathBuf),
48 StaticPath(String),
50 }
51
52 match PemDeserialize::deserialize(deserializer)? {
53 PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54 PemDeserialize::File(p) => Ok(Pem::File(p)),
55 PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57 }
58 }
59}
60
61impl AttrValue for Pem {
62 fn display(&self) -> String {
63 match self {
64 Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65 Pem::File(path) => path.display().to_string(),
66 Pem::StaticPath(path) => path.to_string(),
67 }
68 }
69
70 fn parse(value: &str) -> Result<Self, anyhow::Error> {
71 if value.trim_start().starts_with("-----BEGIN") {
73 Ok(Pem::Value(value.as_bytes().to_vec()))
74 } else {
75 Ok(Pem::File(PathBuf::from(value)))
76 }
77 }
78}
79
80impl Pem {
81 pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83 match self {
84 Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85 Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86 Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87 }
88 }
89}
90
91#[derive(Clone, Debug)]
93pub struct PemBundle {
94 pub ca: Pem,
96 pub cert: Pem,
98 pub key: Pem,
100}
101
102declare_attrs! {
104 @meta(CONFIG = ConfigAttr::new(
106 Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
107 Some("codec_max_frame_length".to_string()),
108 ))
109 pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; @meta(CONFIG = ConfigAttr::new(
113 Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
114 Some("message_delivery_timeout".to_string()),
115 ))
116 pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
117
118 @meta(CONFIG = ConfigAttr::new(
122 Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
123 Some("terminated_snapshot_retention".to_string()),
124 ))
125 pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;
126
127 @meta(CONFIG = ConfigAttr::new(
129 Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
130 Some("process_exit_timeout".to_string()),
131 ))
132 pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
133
134 @meta(CONFIG = ConfigAttr::new(
136 Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
137 Some("message_ack_time_interval".to_string()),
138 ))
139 pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
140
141 @meta(CONFIG = ConfigAttr::new(
143 Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
144 Some("message_ack_every_n_messages".to_string()),
145 ))
146 pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
147
148 @meta(CONFIG = ConfigAttr::new(
150 Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
151 Some("message_ttl_default".to_string()),
152 ))
153 pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
154
155 @meta(CONFIG = ConfigAttr::new(
157 Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
158 Some("split_max_buffer_size".to_string()),
159 ))
160 pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
161
162 @meta(CONFIG = ConfigAttr::new(
164 Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
165 Some("split_max_buffer_age".to_string()),
166 ))
167 pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
168
169 @meta(CONFIG = ConfigAttr::new(
171 Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
172 Some("stop_actor_timeout".to_string()),
173 ))
174 pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);
175
176 @meta(CONFIG = ConfigAttr::new(
179 Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
180 Some("cleanup_timeout".to_string()),
181 ))
182 pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);
183
184 @meta(CONFIG = ConfigAttr::new(
188 Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
189 Some("remote_allocator_heartbeat_interval".to_string()),
190 ))
191 pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);
192
193 @meta(CONFIG = ConfigAttr::new(
195 Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
196 Some("channel_net_rx_buffer_full_check_interval".to_string()),
197 ))
198 pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
199
200 @meta(CONFIG = ConfigAttr::new(
203 Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
204 Some("message_latency_sampling_rate".to_string()),
205 ))
206 pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
207
208 @meta(CONFIG = ConfigAttr::new(
210 Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
211 Some("enable_dest_actor_reordering_buffer".to_string()),
212 ))
213 pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = false;
214
215 @meta(CONFIG = ConfigAttr::new(
220 Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
221 Some("host_spawn_ready_timeout".to_string()),
222 ))
223 pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
224
225 @meta(CONFIG = ConfigAttr::new(
228 Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
229 Some("server_heartbeat_interval".to_string()),
230 ))
231 pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
232
233 @meta(CONFIG = ConfigAttr::new(
238 Some("HYPERACTOR_FORWARDER_FLUSH_TIMEOUT".to_string()),
239 Some("forwarder_flush_timeout".to_string()),
240 ))
241 pub attr FORWARDER_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);
242
243 @meta(CONFIG = ConfigAttr::new(
245 Some("HYPERACTOR_TLS_CERT".to_string()),
246 Some("hyperactor_tls_cert".to_string()),
247 ).process_local())
248 pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");
249
250 @meta(CONFIG = ConfigAttr::new(
252 Some("HYPERACTOR_TLS_KEY".to_string()),
253 Some("hyperactor_tls_key".to_string()),
254 ).process_local())
255 pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");
256
257 @meta(CONFIG = ConfigAttr::new(
259 Some("HYPERACTOR_TLS_CA".to_string()),
260 Some("hyperactor_tls_ca".to_string()),
261 ).process_local())
262 pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
263}
264
265#[cfg(test)]
266mod tests {
267 use std::collections::HashSet;
268
269 use hyperactor_config::Attrs;
270 use hyperactor_config::from_env;
271 use indoc::indoc;
272
273 use super::*;
274
275 fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
279 let buf = tracing_test::internal::global_buf().lock().unwrap();
280 let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
281 let lines: Vec<&str> = logs_str.lines().collect();
282 match f(&lines) {
283 Ok(()) => {}
284 Err(msg) => panic!("{}", msg),
285 }
286 }
287
288 const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;
289
290 #[test]
291 fn test_default_config() {
292 let config = Attrs::new();
293 assert_eq!(
294 config[CODEC_MAX_FRAME_LENGTH],
295 CODEC_MAX_FRAME_LENGTH_DEFAULT
296 );
297 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
298 assert_eq!(
299 config[MESSAGE_ACK_TIME_INTERVAL],
300 Duration::from_millis(500)
301 );
302 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
303 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
304 assert_eq!(
305 config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
306 Duration::from_mins(5)
307 );
308 }
309
310 #[tracing_test::traced_test]
311 #[test]
312 #[cfg_attr(not(fbcode_build), ignore)]
314 fn test_from_env() {
315 unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
318 unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };
320
321 let config = from_env();
322
323 assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
324 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
325 assert_eq!(
326 config[MESSAGE_ACK_TIME_INTERVAL],
327 Duration::from_millis(500)
328 ); let expected_lines: HashSet<&str> = indoc! {"
331 # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
332 # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
333 # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
334 # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
335 # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
336 # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
337 # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
338 # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
339 # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
340 # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
341 export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
342 # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
343 export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
344 # export HYPERACTOR_CLEANUP_TIMEOUT=3s
345 # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
346 # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
347 # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
348 "}
349 .trim_end()
350 .lines()
351 .collect();
352
353 logs_assert_unscoped(|logged_lines: &[&str]| {
357 let mut expected_lines = expected_lines.clone(); for logged in logged_lines {
359 expected_lines.remove(logged);
360 }
361
362 if expected_lines.is_empty() {
363 Ok(())
364 } else {
365 Err(format!("missing log lines: {:?}", expected_lines))
366 }
367 });
368
369 unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
372 unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") };
374 }
375
376 #[test]
377 fn test_defaults() {
378 let config = Attrs::new();
380
381 assert!(config.is_empty());
383
384 assert_eq!(
386 config[CODEC_MAX_FRAME_LENGTH],
387 CODEC_MAX_FRAME_LENGTH_DEFAULT
388 );
389 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
390 assert_eq!(
391 config[MESSAGE_ACK_TIME_INTERVAL],
392 Duration::from_millis(500)
393 );
394 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
395 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
396
397 assert!(CODEC_MAX_FRAME_LENGTH.has_default());
399 assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
400 assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
401 assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
402 assert!(SPLIT_MAX_BUFFER_SIZE.has_default());
403
404 assert_eq!(
406 CODEC_MAX_FRAME_LENGTH.default(),
407 Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
408 );
409 assert_eq!(
410 MESSAGE_DELIVERY_TIMEOUT.default(),
411 Some(&Duration::from_secs(30))
412 );
413 assert_eq!(
414 MESSAGE_ACK_TIME_INTERVAL.default(),
415 Some(&Duration::from_millis(500))
416 );
417 assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
418 assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
419 }
420
421 #[test]
422 fn test_serialization_only_includes_set_values() {
423 let mut config = Attrs::new();
424
425 let serialized = serde_json::to_string(&config).unwrap();
427 assert_eq!(serialized, "{}");
428
429 config[CODEC_MAX_FRAME_LENGTH] = 1024;
430
431 let serialized = serde_json::to_string(&config).unwrap();
432 assert!(serialized.contains("codec_max_frame_length"));
433 assert!(!serialized.contains("message_delivery_timeout")); let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();
437
438 assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);
440
441 assert_eq!(
443 restored_config[MESSAGE_DELIVERY_TIMEOUT],
444 Duration::from_secs(30)
445 );
446 }
447}