hyperactor/
config.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Configuration keys and I/O for hyperactor.
10//!
//! This module declares all config keys (`declare_attrs!`) and
//! provides helpers to load/save `Attrs` (from env via `from_env`,
//! from YAML via `from_yaml`, and to YAML via `to_yaml`). It also
//! declares the process-wide layered store as the submodule
//! [`crate::config::global`].
15//!
16//! For reading/writing the process-global configuration (layered
17//! resolution, test overrides), see [`crate::config::global`].
18
/// Global layered configuration store.
///
/// This submodule defines the process-wide configuration layers
/// (`File`, `Env`, `Runtime`, and `TestOverride`), their resolution
/// order, and the guard types (`ConfigLock`, `ConfigValueGuard`) used
/// for testing. Use it when you need to read, or temporarily override,
/// values in the global configuration state.
pub mod global;
27
28use std::env;
29use std::fs::File;
30use std::io::Read;
31use std::path::Path;
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::sync::RwLock;
35use std::time::Duration;
36
37use serde::Deserialize;
38use serde::Serialize;
39use shell_quote::QuoteRefExt;
40
41use crate as hyperactor;
42use crate::attrs::AttrKeyInfo;
43use crate::attrs::AttrValue;
44use crate::attrs::Attrs;
45use crate::attrs::SerializableValue;
46use crate::attrs::declare_attrs;
47use crate::data::Encoding; // for macros
48
49/// Metadata describing how a configuration key is exposed across
50/// environments.
51///
52/// Each `ConfigAttr` entry defines how a Rust configuration key maps
53/// to external representations:
54///  - `env_name`: the environment variable consulted by
55///    [`init_from_env()`] when loading configuration.
56///  - `py_name`: the Python keyword argument accepted by
57///    `monarch.configure(...)` and returned by `get_configuration()`.
58///
59/// All configuration keys should carry this meta-attribute via
60/// `@meta(CONFIG = ConfigAttr { ... })`.
61#[derive(Clone, Debug, Serialize, Deserialize, hyperactor::Named)]
62pub struct ConfigAttr {
63    /// Environment variable consulted by `init_from_env()`.
64    pub env_name: Option<String>,
65
66    /// Python kwarg name used by `monarch.configure(...)` and
67    /// `get_configuration()`.
68    pub py_name: Option<String>,
69}
70
71impl AttrValue for ConfigAttr {
72    fn display(&self) -> String {
73        serde_json::to_string(self).unwrap_or_else(|_| "<invalid ConfigAttr>".into())
74    }
75    fn parse(s: &str) -> Result<Self, anyhow::Error> {
76        Ok(serde_json::from_str(s)?)
77    }
78}
79
// Declare configuration keys using the new attrs system with defaults
declare_attrs! {
    /// This is a meta-attribute marking a configuration key.
    ///
    /// It carries metadata used to bridge Rust, environment
    /// variables, and Python:
    ///  - `env_name`: environment variable name consulted by
    ///    `init_from_env()`.
    ///  - `py_name`: keyword argument name recognized by
    ///    `monarch.configure(...)`.
    ///
    /// All configuration keys should be annotated with this
    /// attribute.
    pub attr CONFIG: ConfigAttr;

    /// Maximum frame length for codec, in bytes. Default: 10 GiB.
    // NOTE(review): 10 GiB exceeds `u32::MAX`, so this default assumes a
    // 64-bit `usize`; confirm that 32-bit targets are not supported.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
        py_name: None,
    })
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB

    /// Message delivery timeout. Default: 30 seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
        py_name: None,
    })
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Timeout used by allocator for stopping a proc. Default: 10 seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
        py_name: None,
    })
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Message acknowledgment interval. Default: 500 milliseconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
        py_name: None,
    })
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    /// Number of messages after which to send an acknowledgment.
    /// Default: 1000.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
        py_name: None,
    })
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    /// Default hop Time-To-Live for message envelopes. Default: 64.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
        py_name: None,
    })
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    /// Maximum buffer size for split port messages. Default: 5.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
        py_name: None,
    })
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    /// The maximum time an update can be buffered before being reduced.
    /// Default: 50 milliseconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
        py_name: None,
    })
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    /// Timeout used by proc mesh for stopping an actor. Default: 10 seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
        py_name: None,
    })
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);

    /// Timeout used by proc for running the cleanup callback on an actor.
    /// Should be less than `STOP_ACTOR_TIMEOUT`. Default: 3 seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
        py_name: None,
    })
    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);

    /// Heartbeat interval for remote allocator. Default: 5 minutes.
    ///
    /// This heartbeat is no longer relied on in v1; the key should be
    /// removed once the v0 deprecation is finished.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
        py_name: None,
    })
    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(300);

    /// The default encoding to be used. Default: `Encoding::Multipart`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()),
        py_name: None,
    })
    pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;

    /// Whether to use multipart encoding for network channel
    /// communications. Default: `true`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()),
        py_name: None,
    })
    pub attr CHANNEL_MULTIPART: bool = true;

    /// How often to check for a full MPSC channel on NetRx.
    /// Default: 5 seconds.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
        py_name: None,
    })
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Sampling rate for logging message latency, expressed as a fraction:
    /// set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90%
    /// sampling, etc. Default: 0.01.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
        py_name: None,
    })
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    /// Whether to enable client sequence assignment. Default: `false`.
    // NOTE(review): unlike the other keys, this one carries no
    // `@meta(CONFIG = ...)`, so `from_env()` never reads it from the
    // environment. Confirm that this is intentional.
    pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;

    /// Timeout for [`Host::spawn`] to await proc readiness.
    ///
    /// Default: 30 seconds. If set to zero, disables the timeout and
    /// waits indefinitely.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
        py_name: None,
    })
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
}
217
218/// Load configuration from environment variables
219pub fn from_env() -> Attrs {
220    let mut config = Attrs::new();
221    let mut output = String::new();
222
223    fn export(env_var: &str, value: Option<&dyn SerializableValue>) -> String {
224        let env_var: String = env_var.quoted(shell_quote::Bash);
225        let value: String = value
226            .map_or("".to_string(), SerializableValue::display)
227            .quoted(shell_quote::Bash);
228        format!("export {}={}\n", env_var, value)
229    }
230
231    for key in inventory::iter::<AttrKeyInfo>() {
232        // Skip keys that are not marked as CONFIG or that do not
233        // declare an environment variable mapping. Only CONFIG-marked
234        // keys with an `env_name` participate in environment
235        // initialization.
236        let Some(cfg_meta) = key.meta.get(CONFIG) else {
237            continue;
238        };
239        let Some(env_var) = cfg_meta.env_name.as_deref() else {
240            continue;
241        };
242
243        let Ok(val) = env::var(env_var) else {
244            // Default value
245            output.push_str("# ");
246            output.push_str(&export(env_var, key.default));
247            continue;
248        };
249
250        match (key.parse)(&val) {
251            Err(e) => {
252                tracing::error!(
253                    "failed to override config key {} from value \"{}\" in ${}: {})",
254                    key.name,
255                    val,
256                    env_var,
257                    e
258                );
259                output.push_str("# ");
260                output.push_str(&export(env_var, key.default));
261            }
262            Ok(parsed) => {
263                output.push_str("# ");
264                output.push_str(&export(env_var, key.default));
265                output.push_str(&export(env_var, Some(parsed.as_ref())));
266                config.insert_value_by_name_unchecked(key.name, parsed);
267            }
268        }
269    }
270
271    tracing::info!(
272        "loaded configuration from environment:\n{}",
273        output.trim_end()
274    );
275
276    config
277}
278
279/// Load configuration from a YAML file
280pub fn from_yaml<P: AsRef<Path>>(path: P) -> Result<Attrs, anyhow::Error> {
281    let mut file = File::open(path)?;
282    let mut contents = String::new();
283    file.read_to_string(&mut contents)?;
284    Ok(serde_yaml::from_str(&contents)?)
285}
286
287/// Save configuration to a YAML file
288pub fn to_yaml<P: AsRef<Path>>(attrs: &Attrs, path: P) -> Result<(), anyhow::Error> {
289    let yaml = serde_yaml::to_string(attrs)?;
290    std::fs::write(path, yaml)?;
291    Ok(())
292}
293
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use indoc::indoc;

    use super::*;

    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    /// Environment variables overridden by `test_from_env`.
    const TEST_FROM_ENV_VARS: &[&str] = &[
        "HYPERACTOR_CODEC_MAX_FRAME_LENGTH",
        "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT",
    ];

    /// Removes the listed environment variables when dropped. This keeps
    /// a test's overrides from leaking into other tests, even when an
    /// assertion panics part-way through.
    struct EnvVarCleanup(&'static [&'static str]);

    impl Drop for EnvVarCleanup {
        fn drop(&mut self) {
            for &var in self.0 {
                // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
                unsafe { std::env::remove_var(var) };
            }
        }
    }

    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_secs(300)
        );
    }

    #[tracing_test::traced_test]
    #[test]
    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // Remove exactly the variables set below when this test ends,
        // whether it passes or panics.
        let _cleanup = EnvVarCleanup(TEST_FROM_ENV_VARS);

        // Set environment variables
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(60));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        ); // Default value

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_CHANNEL_MULTIPART=1
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
        "}
        .trim_end()
        .lines()
        .collect();

        // For some reason, logs_contain fails to find these lines individually
        // (possibly to do with the fact that we have newlines in our log entries);
        // instead, we test it manually.
        logs_assert(|logged_lines: &[&str]| {
            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });
    }

    #[test]
    fn test_defaults() {
        // Test that empty config now returns defaults via get_or_default
        let config = Attrs::new();

        // Verify that the config is empty (no values explicitly set)
        assert!(config.is_empty());

        // But getters should still return the defaults from the keys
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Verify the keys have defaults
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // Verify we can get defaults directly from keys
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Initially empty, serialization should be empty
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized

        // Deserialize back
        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // Custom value should be preserved
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // Defaults should still work for other values
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}