1use std::time::Duration;
14
15use hyperactor_config::CONFIG;
16use hyperactor_config::ConfigAttr;
17use hyperactor_config::attrs::declare_attrs;
18
19declare_attrs! {
21 @meta(CONFIG = ConfigAttr {
23 env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
24 py_name: Some("codec_max_frame_length".to_string()),
25 })
26 pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; @meta(CONFIG = ConfigAttr {
30 env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
31 py_name: Some("message_delivery_timeout".to_string()),
32 })
33 pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
34
35 @meta(CONFIG = ConfigAttr {
37 env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
38 py_name: Some("process_exit_timeout".to_string()),
39 })
40 pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
41
42 @meta(CONFIG = ConfigAttr {
44 env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
45 py_name: Some("message_ack_time_interval".to_string()),
46 })
47 pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
48
49 @meta(CONFIG = ConfigAttr {
51 env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
52 py_name: Some("message_ack_every_n_messages".to_string()),
53 })
54 pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
55
56 @meta(CONFIG = ConfigAttr {
58 env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
59 py_name: Some("message_ttl_default".to_string()),
60 })
61 pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
62
63 @meta(CONFIG = ConfigAttr {
65 env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
66 py_name: Some("split_max_buffer_size".to_string()),
67 })
68 pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
69
70 @meta(CONFIG = ConfigAttr {
72 env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
73 py_name: Some("split_max_buffer_age".to_string()),
74 })
75 pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
76
77 @meta(CONFIG = ConfigAttr {
79 env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
80 py_name: Some("stop_actor_timeout".to_string()),
81 })
82 pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);
83
84 @meta(CONFIG = ConfigAttr {
87 env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
88 py_name: Some("cleanup_timeout".to_string()),
89 })
90 pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);
91
92 @meta(CONFIG = ConfigAttr {
96 env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
97 py_name: Some("remote_allocator_heartbeat_interval".to_string()),
98 })
99 pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);
100
101 @meta(CONFIG = ConfigAttr {
103 env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
104 py_name: Some("channel_net_rx_buffer_full_check_interval".to_string()),
105 })
106 pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
107
108 @meta(CONFIG = ConfigAttr {
111 env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
112 py_name: Some("message_latency_sampling_rate".to_string()),
113 })
114 pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
115
116 @meta(CONFIG = ConfigAttr {
118 env_name: Some("HYPERACTOR_ENABLE_CLIENT_SEQ_ASSIGNMENT".to_string()),
119 py_name: Some("enable_client_seq_assignment".to_string()),
120 })
121 pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
122
123 @meta(CONFIG = ConfigAttr {
128 env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
129 py_name: Some("host_spawn_ready_timeout".to_string()),
130 })
131 pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
132}
133
134#[cfg(test)]
135mod tests {
136 use std::collections::HashSet;
137
138 use hyperactor_config::Attrs;
139 use hyperactor_config::from_env;
140 use indoc::indoc;
141
142 use super::*;
143
144 const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;
145
146 #[test]
147 fn test_default_config() {
148 let config = Attrs::new();
149 assert_eq!(
150 config[CODEC_MAX_FRAME_LENGTH],
151 CODEC_MAX_FRAME_LENGTH_DEFAULT
152 );
153 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
154 assert_eq!(
155 config[MESSAGE_ACK_TIME_INTERVAL],
156 Duration::from_millis(500)
157 );
158 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
159 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
160 assert_eq!(
161 config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
162 Duration::from_mins(5)
163 );
164 }
165
166 #[tracing_test::traced_test]
167 #[test]
168 #[cfg_attr(not(fbcode_build), ignore)]
170 fn test_from_env() {
171 unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
174 unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };
176
177 let config = from_env();
178
179 assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
180 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
181 assert_eq!(
182 config[MESSAGE_ACK_TIME_INTERVAL],
183 Duration::from_millis(500)
184 ); let expected_lines: HashSet<&str> = indoc! {"
187 # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
188 # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
189 # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
190 # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
191 # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
192 # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
193 # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
194 # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
195 # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
196 # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
197 export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
198 # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
199 export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
200 # export HYPERACTOR_CLEANUP_TIMEOUT=3s
201 # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
202 # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
203 # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
204 "}
205 .trim_end()
206 .lines()
207 .collect();
208
209 logs_assert(|logged_lines: &[&str]| {
213 let mut expected_lines = expected_lines.clone(); for logged in logged_lines {
215 expected_lines.remove(logged);
216 }
217
218 if expected_lines.is_empty() {
219 Ok(())
220 } else {
221 Err(format!("missing log lines: {:?}", expected_lines))
222 }
223 });
224
225 unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
228 unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") };
230 }
231
232 #[test]
233 fn test_defaults() {
234 let config = Attrs::new();
236
237 assert!(config.is_empty());
239
240 assert_eq!(
242 config[CODEC_MAX_FRAME_LENGTH],
243 CODEC_MAX_FRAME_LENGTH_DEFAULT
244 );
245 assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
246 assert_eq!(
247 config[MESSAGE_ACK_TIME_INTERVAL],
248 Duration::from_millis(500)
249 );
250 assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
251 assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
252
253 assert!(CODEC_MAX_FRAME_LENGTH.has_default());
255 assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
256 assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
257 assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
258 assert!(SPLIT_MAX_BUFFER_SIZE.has_default());
259
260 assert_eq!(
262 CODEC_MAX_FRAME_LENGTH.default(),
263 Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
264 );
265 assert_eq!(
266 MESSAGE_DELIVERY_TIMEOUT.default(),
267 Some(&Duration::from_secs(30))
268 );
269 assert_eq!(
270 MESSAGE_ACK_TIME_INTERVAL.default(),
271 Some(&Duration::from_millis(500))
272 );
273 assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
274 assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
275 }
276
277 #[test]
278 fn test_serialization_only_includes_set_values() {
279 let mut config = Attrs::new();
280
281 let serialized = serde_json::to_string(&config).unwrap();
283 assert_eq!(serialized, "{}");
284
285 config[CODEC_MAX_FRAME_LENGTH] = 1024;
286
287 let serialized = serde_json::to_string(&config).unwrap();
288 assert!(serialized.contains("codec_max_frame_length"));
289 assert!(!serialized.contains("message_delivery_timeout")); let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();
293
294 assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);
296
297 assert_eq!(
299 restored_config[MESSAGE_DELIVERY_TIMEOUT],
300 Duration::from_secs(30)
301 );
302 }
303}