1pub mod global;
27
28use std::env;
29use std::fs::File;
30use std::io::Read;
31use std::path::Path;
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::sync::RwLock;
35use std::time::Duration;
36
37use shell_quote::QuoteRefExt;
38
39use crate::attrs::AttrKeyInfo;
40use crate::attrs::Attrs;
41use crate::attrs::SerializableValue;
42use crate::attrs::declare_attrs;
43use crate::data::Encoding;
44
45declare_attrs! {
47 pub attr CONFIG_ENV_VAR: String;
50
51 pub attr PYTHON_CONFIG_KEY: String;
54
55 @meta(CONFIG_ENV_VAR = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string())
57 pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string())
61 pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
62
63 @meta(CONFIG_ENV_VAR = "HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string())
65 pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
66
67 @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string())
69 pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
70
71 @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string())
73 pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
74
75 @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string())
77 pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
78
79 @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string())
81 pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
82
83 @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string())
85 pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
86
87 @meta(CONFIG_ENV_VAR = "HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string())
89 pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1);
90
91 @meta(CONFIG_ENV_VAR = "HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string())
93 pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
94
95 @meta(CONFIG_ENV_VAR = "HYPERACTOR_DEFAULT_ENCODING".to_string())
97 pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;
98
99 @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_MULTIPART".to_string())
101 pub attr CHANNEL_MULTIPART: bool = true;
102
103 @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string())
105 pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
106
107 @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string())
110 pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
111
112 pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
114
115 @meta(CONFIG_ENV_VAR = "HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string())
120 pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10);
121}
122
123pub fn from_env() -> Attrs {
125 let mut config = Attrs::new();
126 let mut output = String::new();
127
128 fn export(env_var: &str, value: Option<&dyn SerializableValue>) -> String {
129 let env_var: String = env_var.quoted(shell_quote::Bash);
130 let value: String = value
131 .map_or("".to_string(), SerializableValue::display)
132 .quoted(shell_quote::Bash);
133 format!("export {}={}\n", env_var, value)
134 }
135
136 for key in inventory::iter::<AttrKeyInfo>() {
137 let Some(env_var) = key.meta.get(CONFIG_ENV_VAR) else {
138 continue;
139 };
140 let Ok(val) = env::var(env_var) else {
141 output.push_str("# ");
143 output.push_str(&export(env_var, key.default));
144 continue;
145 };
146
147 match (key.parse)(&val) {
148 Err(e) => {
149 tracing::error!(
150 "failed to override config key {} from value \"{}\" in ${}: {})",
151 key.name,
152 val,
153 env_var,
154 e
155 );
156 output.push_str("# ");
157 output.push_str(&export(env_var, key.default));
158 }
159 Ok(parsed) => {
160 output.push_str("# ");
161 output.push_str(&export(env_var, key.default));
162 output.push_str(&export(env_var, Some(parsed.as_ref())));
163 config.insert_value_by_name_unchecked(key.name, parsed);
164 }
165 }
166 }
167
168 tracing::info!(
169 "loaded configuration from environment:\n{}",
170 output.trim_end()
171 );
172
173 config
174}
175
176pub fn from_yaml<P: AsRef<Path>>(path: P) -> Result<Attrs, anyhow::Error> {
178 let mut file = File::open(path)?;
179 let mut contents = String::new();
180 file.read_to_string(&mut contents)?;
181 Ok(serde_yaml::from_str(&contents)?)
182}
183
184pub fn to_yaml<P: AsRef<Path>>(attrs: &Attrs, path: P) -> Result<(), anyhow::Error> {
186 let yaml = serde_yaml::to_string(attrs)?;
187 std::fs::write(path, yaml)?;
188 Ok(())
189}
190
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use indoc::indoc;

    use super::*;

    /// Mirrors the default declared for `CODEC_MAX_FRAME_LENGTH`.
    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    /// Indexing an empty `Attrs` yields the declared defaults.
    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_secs(5)
        );
    }

    /// `from_env` applies overrides found in the environment and logs the
    /// resulting `export` summary.
    #[tracing_test::traced_test]
    #[test]
    #[cfg_attr(not(feature = "fb"), ignore)]
    fn test_from_env() {
        // Each variable is named exactly once so that setup and cleanup
        // cannot drift apart.
        const FRAME_LENGTH_VAR: &str = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH";
        const DELIVERY_TIMEOUT_VAR: &str = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT";

        // SAFETY: mutating the process environment is only sound while no
        // other thread reads or writes it. NOTE(review): nothing in this test
        // enforces that; audit against tests that may run concurrently.
        unsafe { std::env::set_var(FRAME_LENGTH_VAR, "1024") };
        // SAFETY: see the note on the first `set_var` above.
        unsafe { std::env::set_var(DELIVERY_TIMEOUT_VAR, "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
        // Not overridden, so the default is read back.
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_CHANNEL_MULTIPART=true
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5s
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=1s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
        "}
        .trim_end()
        .lines()
        .collect();

        logs_assert(|logged_lines: &[&str]| {
            // Every expected line must appear among the logged lines; extra
            // logged lines and ordering are ignored.
            let mut expected_lines = expected_lines.clone();
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });

        // Cleanup. This is skipped if an assertion above panics.
        // SAFETY: see the note on the first `set_var` above.
        unsafe { std::env::remove_var(FRAME_LENGTH_VAR) };
        // SAFETY: see the note on the first `set_var` above.
        unsafe { std::env::remove_var(DELIVERY_TIMEOUT_VAR) };
    }

    /// An empty `Attrs` stores nothing, yet keys still resolve to their
    /// defaults, and the keys themselves report those defaults.
    #[test]
    fn test_defaults() {
        let config = Attrs::new();

        // Nothing has been set explicitly.
        assert!(config.is_empty());

        // Indexing falls back to the declared defaults.
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // The keys report that they carry a default...
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // ...and expose its value.
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    /// Serialization emits only explicitly set keys; after a round trip,
    /// unset keys still read back as their defaults.
    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // An empty `Attrs` serializes to an empty JSON object.
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout"));

        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // The explicitly set value survives the round trip.
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // A key that was never set still resolves to its default.
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}