hyperactor/
config.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Configuration keys and I/O for hyperactor.
10//!
11//! This module declares all config keys (`declare_attrs!`) and
//! provides helpers to load/save `Attrs`: load from the environment
//! via `from_env`, load from YAML via `from_yaml`, and save to YAML via
//! `to_yaml`. It also declares the process-wide layered store as the
//! submodule [`crate::config::global`].
15//!
16//! For reading/writing the process-global configuration (layered
17//! resolution, test overrides), see [`crate::config::global`].
18
/// Global layered configuration store.
///
/// This submodule defines the process-wide configuration layers
/// (`File`, `Env`, `Runtime`, and `TestOverride`), their resolution
/// order, and the guard types (`ConfigLock`, `ConfigValueGuard`) used
/// in tests. Use it when you need to read values from, or temporarily
/// override values in, the global configuration state.
pub mod global;
27
28use std::env;
29use std::fs::File;
30use std::io::Read;
31use std::path::Path;
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::sync::RwLock;
35use std::time::Duration;
36
37use shell_quote::QuoteRefExt;
38
39use crate::attrs::AttrKeyInfo;
40use crate::attrs::Attrs;
41use crate::attrs::SerializableValue;
42use crate::attrs::declare_attrs;
43use crate::data::Encoding;
44
// Declare configuration keys using the attrs system. Each key carries its
// default value; keys annotated with `@meta(CONFIG_ENV_VAR = ...)` can be
// overridden from the process environment by `from_env`.
declare_attrs! {
    /// Meta-attribute naming the environment variable that can be used to
    /// override a configuration key (consumed by `from_env`).
    pub attr CONFIG_ENV_VAR: String;

    /// Meta-attribute naming the kwarg to pass to `monarch.configure()`
    /// to set the attribute value in the global config.
    pub attr PYTHON_CONFIG_KEY: String;

    /// Maximum frame length for the codec, in bytes.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string())
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB; NOTE(review): exceeds usize::MAX on 32-bit targets

    /// Message delivery timeout.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string())
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Timeout used by the allocator when stopping a proc.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string())
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Message acknowledgment interval.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string())
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    /// Number of messages after which to send an acknowledgment.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string())
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    /// Default hop Time-To-Live for message envelopes.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string())
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    /// Maximum buffer size for split port messages.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string())
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    /// The maximum time an update can be buffered before being reduced.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string())
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    /// Timeout used by the proc mesh when stopping an actor.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string())
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1);

    /// Heartbeat interval for the remote allocator.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string())
    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

    /// The default encoding to be used.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_DEFAULT_ENCODING".to_string())
    pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;

    /// Whether to use multipart encoding for network channel communications.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_MULTIPART".to_string())
    pub attr CHANNEL_MULTIPART: bool = true;

    /// How often to check for a full MPSC channel on NetRx.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string())
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Sampling rate for logging message latency, as a fraction of messages.
    /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string())
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    /// Whether to enable client sequence assignment.
    ///
    /// This key has no `CONFIG_ENV_VAR` meta-attribute, so `from_env`
    /// does not read it from the environment.
    pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;

    /// Timeout for [`Host::spawn`] to await proc readiness.
    ///
    /// Default: 10 seconds. If set to zero, disables the timeout and
    /// waits indefinitely.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string())
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10);
}
122
123/// Load configuration from environment variables
124pub fn from_env() -> Attrs {
125    let mut config = Attrs::new();
126    let mut output = String::new();
127
128    fn export(env_var: &str, value: Option<&dyn SerializableValue>) -> String {
129        let env_var: String = env_var.quoted(shell_quote::Bash);
130        let value: String = value
131            .map_or("".to_string(), SerializableValue::display)
132            .quoted(shell_quote::Bash);
133        format!("export {}={}\n", env_var, value)
134    }
135
136    for key in inventory::iter::<AttrKeyInfo>() {
137        let Some(env_var) = key.meta.get(CONFIG_ENV_VAR) else {
138            continue;
139        };
140        let Ok(val) = env::var(env_var) else {
141            // Default value
142            output.push_str("# ");
143            output.push_str(&export(env_var, key.default));
144            continue;
145        };
146
147        match (key.parse)(&val) {
148            Err(e) => {
149                tracing::error!(
150                    "failed to override config key {} from value \"{}\" in ${}: {})",
151                    key.name,
152                    val,
153                    env_var,
154                    e
155                );
156                output.push_str("# ");
157                output.push_str(&export(env_var, key.default));
158            }
159            Ok(parsed) => {
160                output.push_str("# ");
161                output.push_str(&export(env_var, key.default));
162                output.push_str(&export(env_var, Some(parsed.as_ref())));
163                config.insert_value_by_name_unchecked(key.name, parsed);
164            }
165        }
166    }
167
168    tracing::info!(
169        "loaded configuration from environment:\n{}",
170        output.trim_end()
171    );
172
173    config
174}
175
176/// Load configuration from a YAML file
177pub fn from_yaml<P: AsRef<Path>>(path: P) -> Result<Attrs, anyhow::Error> {
178    let mut file = File::open(path)?;
179    let mut contents = String::new();
180    file.read_to_string(&mut contents)?;
181    Ok(serde_yaml::from_str(&contents)?)
182}
183
184/// Save configuration to a YAML file
185pub fn to_yaml<P: AsRef<Path>>(attrs: &Attrs, path: P) -> Result<(), anyhow::Error> {
186    let yaml = serde_yaml::to_string(attrs)?;
187    std::fs::write(path, yaml)?;
188    Ok(())
189}
190
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use indoc::indoc;

    use super::*;

    /// Expected default for `CODEC_MAX_FRAME_LENGTH` (10 GiB).
    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    /// Removes the listed environment variables when dropped, so the
    /// process environment is restored even if an assertion panics.
    struct EnvVarGuard(&'static [&'static str]);

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            for &var in self.0 {
                // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
                unsafe { std::env::remove_var(var) };
            }
        }
    }

    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_secs(5)
        );
    }

    #[tracing_test::traced_test]
    #[test]
    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
    #[cfg_attr(not(feature = "fb"), ignore)]
    fn test_from_env() {
        // Created before the variables are set so they are removed on every
        // exit path, including assertion failures. (The previous manual
        // cleanup also removed `..._TIMEOUT_SECS` instead of `..._TIMEOUT`,
        // leaking the override into other tests.)
        let _env_guard = EnvVarGuard(&[
            "HYPERACTOR_CODEC_MAX_FRAME_LENGTH",
            "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT",
        ]);

        // Set environment variables
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        ); // Default value

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_CHANNEL_MULTIPART=true
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5s
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=1s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
        "}
        .trim_end()
        .lines()
        .collect();

        // For some reason, logs_contain fails to find these lines individually
        // (possibly to do with the fact that we have newlines in our log entries);
        // instead, we test it manually.
        logs_assert(|logged_lines: &[&str]| {
            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });
    }

    #[test]
    fn test_defaults() {
        // Test that empty config now returns defaults via get_or_default
        let config = Attrs::new();

        // Verify that the config is empty (no values explicitly set)
        assert!(config.is_empty());

        // But getters should still return the defaults from the keys
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Verify the keys have defaults
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // Verify we can get defaults directly from keys
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Initially empty, serialization should be empty
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized

        // Deserialize back
        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // Custom value should be preserved
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // Defaults should still work for other values
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}