1use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
/// A source of PEM data: either the bytes themselves or a filesystem path
/// they can be read from.
///
/// Use [`Pem::reader`] to obtain a `Read` over the data regardless of which
/// variant is held.
#[derive(Clone, Debug, Serialize, Named)]
#[named("hyperactor::config::Pem")]
pub enum Pem {
    /// The data held in memory as raw bytes.
    Value(Vec<u8>),
    /// Path to a file holding the data; the file is opened when
    /// [`Pem::reader`] is called.
    File(PathBuf),
    /// Path given as a `&'static str`, which is how the built-in TLS defaults
    /// in this file are written. The hand-written `Deserialize` impl never
    /// yields this variant: a serialized `StaticPath` is decoded as
    /// [`Pem::File`].
    StaticPath(&'static str),
}
37
38impl<'de> Deserialize<'de> for Pem {
40 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41 where
42 D: serde::Deserializer<'de>,
43 {
44 #[derive(Deserialize)]
45 enum PemDeserialize {
46 Value(Vec<u8>),
47 File(PathBuf),
48 StaticPath(String),
50 }
51
52 match PemDeserialize::deserialize(deserializer)? {
53 PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54 PemDeserialize::File(p) => Ok(Pem::File(p)),
55 PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57 }
58 }
59}
60
61impl AttrValue for Pem {
62 fn display(&self) -> String {
63 match self {
64 Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65 Pem::File(path) => path.display().to_string(),
66 Pem::StaticPath(path) => path.to_string(),
67 }
68 }
69
70 fn parse(value: &str) -> Result<Self, anyhow::Error> {
71 if value.trim_start().starts_with("-----BEGIN") {
73 Ok(Pem::Value(value.as_bytes().to_vec()))
74 } else {
75 Ok(Pem::File(PathBuf::from(value)))
76 }
77 }
78}
79
80impl Pem {
81 pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83 match self {
84 Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85 Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86 Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87 }
88 }
89}
90
/// Three [`Pem`] sources grouped together.
///
/// NOTE(review): going by the field names and the `TLS_CA` / `TLS_CERT` /
/// `TLS_KEY` attributes declared in this file, this is presumably the
/// material for one TLS identity. The code that consumes it is not in this
/// file, so confirm at the use site.
#[derive(Clone, Debug)]
pub struct PemBundle {
    /// Presumably the CA certificate(s) used to verify peers.
    pub ca: Pem,
    /// Presumably this endpoint's own certificate.
    pub cert: Pem,
    /// Presumably the private key matching `cert`.
    pub key: Pem,
}
101
// Configuration keys for hyperactor.
//
// Every key is declared with a `ConfigAttr` built from two names. The first
// is the environment variable that `from_env` reads (exercised by
// `test_from_env` below). The second is a lowercase key, presumably used by
// non-environment configuration front ends (confirm against
// `hyperactor_config`). The value after `=` is the default returned when the
// key has not been set.
//
// The one-line notes on individual keys are read off the key names and
// defaults only; the code that consumes them is not in this file, so confirm
// exact semantics at the use site.
declare_attrs! {
    // Codec frame-length limit. Default: 10 * 1024^3 (10 GiB if the unit is
    // bytes).
    // NOTE(review): this constant expression does not fit in a 32-bit `usize`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
        Some("codec_max_frame_length".to_string()),
    ))
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024;

    // Message delivery timeout. Default: 30 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
        Some("message_delivery_timeout".to_string()),
    ))
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    // Retention count for terminated snapshots. Default: 100.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
        Some("terminated_snapshot_retention".to_string()),
    ))
    pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;

    // Process exit timeout. Default: 10 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
        Some("process_exit_timeout".to_string()),
    ))
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    // Time-based message acknowledgement interval. Default: 500 ms.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
        Some("message_ack_time_interval".to_string()),
    ))
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    // Count-based message acknowledgement interval. Default: every 1000
    // messages.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
        Some("message_ack_every_n_messages".to_string()),
    ))
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    // Default message TTL. Default: 64 (unit not visible here; presumably
    // hops — confirm).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
        Some("message_ttl_default".to_string()),
    ))
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    // Split buffer size limit. Default: 5.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
        Some("split_max_buffer_size".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    // Split buffer age limit. Default: 50 ms.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
        Some("split_max_buffer_age".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    // Actor stop timeout. Default: 10 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
        Some("stop_actor_timeout".to_string()),
    ))
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);

    // Cleanup timeout. Default: 3 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
        Some("cleanup_timeout".to_string()),
    ))
    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);

    // Interval between "receive buffer full" checks on net channels.
    // Default: 5 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
        Some("channel_net_rx_buffer_full_check_interval".to_string()),
    ))
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    // Message latency sampling rate. Default: 0.01 (presumably a fraction of
    // messages — confirm).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
        Some("message_latency_sampling_rate".to_string()),
    ))
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    // Switch for the destination-actor reordering buffer. Default: enabled.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
        Some("enable_dest_actor_reordering_buffer".to_string()),
    ))
    pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = true;

    // Host spawn readiness timeout. Default: 30 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
        Some("host_spawn_ready_timeout".to_string()),
    ))
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);

    // Server heartbeat interval. Default: 1 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
        Some("server_heartbeat_interval".to_string()),
    ))
    pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

    // Forwarder flush timeout. Default: 5 s.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORWARDER_FLUSH_TIMEOUT".to_string()),
        Some("forwarder_flush_timeout".to_string()),
    ))
    pub attr FORWARDER_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

    // TLS certificate source. Default: the file /etc/hyperactor/tls/tls.crt.
    // Marked `process_local()`, as are the two TLS keys below; presumably
    // that keeps the value from being propagated to other processes — confirm
    // against `ConfigAttr`. Note that the lowercase keys of the three TLS
    // entries carry a `hyperactor_` prefix, unlike every other key here.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CERT".to_string()),
        Some("hyperactor_tls_cert".to_string()),
    ).process_local())
    pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");

    // TLS key source. Default: the file /etc/hyperactor/tls/tls.key.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_KEY".to_string()),
        Some("hyperactor_tls_key".to_string()),
    ).process_local())
    pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");

    // TLS CA source. Default: the file /etc/hyperactor/tls/ca.crt.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CA".to_string()),
        Some("hyperactor_tls_ca".to_string()),
    ).process_local())
    pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
}
255
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use hyperactor_config::Attrs;
    use hyperactor_config::from_env;
    use indoc::indoc;

    use super::*;

    /// Runs `f` over every line currently held in `tracing_test`'s global log
    /// buffer and panics with the returned message if `f` reports an error.
    ///
    /// The buffer is read directly, so the lines are not filtered by test
    /// scope.
    fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
        let buf = tracing_test::internal::global_buf().lock().unwrap();
        let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
        let lines: Vec<&str> = logs_str.lines().collect();
        match f(&lines) {
            Ok(()) => {}
            Err(msg) => panic!("{}", msg),
        }
    }

    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    // Environment variables overridden by `test_from_env`. They are named
    // once so that the set and remove calls cannot drift apart.
    const FRAME_LENGTH_ENV: &str = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH";
    const DELIVERY_TIMEOUT_ENV: &str = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT";

    /// An empty `Attrs` yields the declared default for each key.
    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
    }

    /// Values set through the environment override the defaults, and
    /// `from_env` logs one `export` line per key (commented out with `#` for
    /// the default value).
    #[tracing_test::traced_test]
    #[test]
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // SAFETY: changing the process environment is only sound while no
        // other thread reads or writes it; this test relies on that holding
        // for the test process. TODO: audit.
        unsafe { std::env::set_var(FRAME_LENGTH_ENV, "1024") };
        // SAFETY: as above.
        unsafe { std::env::set_var(DELIVERY_TIMEOUT_ENV, "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
        // Not overridden, so the default must still apply.
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
            # export HYPERACTOR_CLEANUP_TIMEOUT=3s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
        "}
        .trim_end()
        .lines()
        .collect();

        logs_assert_unscoped(|logged_lines: &[&str]| {
            // The closure is `Fn`, so work on a copy of the expected set and
            // strike out every line that was actually logged.
            let mut expected_lines = expected_lines.clone();
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });

        // Undo both overrides so they do not leak into other tests running in
        // this process. The second call previously removed a differently
        // named variable (`..._TIMEOUT_SECS`) and left the override in place.
        // SAFETY: same single-threaded-environment assumption as the
        // `set_var` calls above.
        unsafe { std::env::remove_var(FRAME_LENGTH_ENV) };
        // SAFETY: as above.
        unsafe { std::env::remove_var(DELIVERY_TIMEOUT_ENV) };
    }

    /// Defaults are served without being stored: a fresh `Attrs` is empty,
    /// indexing returns the declared default, and each key reports that
    /// default through `has_default` / `default`.
    #[test]
    fn test_defaults() {
        let config = Attrs::new();

        // Nothing has been set explicitly.
        assert!(config.is_empty());

        // Indexing falls back to the declared defaults.
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Each key advertises that it has a default...
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // ...and exposes its value.
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    /// Only explicitly set keys are written when serializing; keys absent
    /// from the serialized form read back as their defaults.
    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // With nothing set, the JSON form is an empty object.
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout"));

        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // The explicitly set value survives the round trip.
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // A key that was never set still resolves to its default.
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}