hyperactor/
config.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
//! Configuration keys and I/O for hyperactor.
//!
//! This module declares hyperactor-specific config keys.
12
13use std::time::Duration;
14
15use hyperactor_config::CONFIG;
16use hyperactor_config::ConfigAttr;
17use hyperactor_config::attrs::declare_attrs;
18
19// Declare hyperactor-specific configuration keys
20declare_attrs! {
21    /// Maximum frame length for codec
22    @meta(CONFIG = ConfigAttr {
23        env_name: Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
24        py_name: Some("codec_max_frame_length".to_string()),
25    })
26    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB
27
28    /// Message delivery timeout
29    @meta(CONFIG = ConfigAttr {
30        env_name: Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
31        py_name: Some("message_delivery_timeout".to_string()),
32    })
33    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
34
35    /// Timeout used by allocator for stopping a proc.
36    @meta(CONFIG = ConfigAttr {
37        env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
38        py_name: Some("process_exit_timeout".to_string()),
39    })
40    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
41
42    /// Message acknowledgment interval
43    @meta(CONFIG = ConfigAttr {
44        env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
45        py_name: Some("message_ack_time_interval".to_string()),
46    })
47    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
48
49    /// Number of messages after which to send an acknowledgment
50    @meta(CONFIG = ConfigAttr {
51        env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
52        py_name: Some("message_ack_every_n_messages".to_string()),
53    })
54    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;
55
56    /// Default hop Time-To-Live for message envelopes.
57    @meta(CONFIG = ConfigAttr {
58        env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
59        py_name: Some("message_ttl_default".to_string()),
60    })
61    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;
62
63    /// Maximum buffer size for split port messages
64    @meta(CONFIG = ConfigAttr {
65        env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
66        py_name: Some("split_max_buffer_size".to_string()),
67    })
68    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;
69
70    /// The maximum time an update can be buffered before being reduced.
71    @meta(CONFIG = ConfigAttr {
72        env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
73        py_name: Some("split_max_buffer_age".to_string()),
74    })
75    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);
76
77    /// Timeout used by proc mesh for stopping an actor.
78    @meta(CONFIG = ConfigAttr {
79        env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
80        py_name: Some("stop_actor_timeout".to_string()),
81    })
82    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);
83
84    /// Timeout used by proc for running the cleanup callback on an actor.
85    /// Should be less than the timeout for STOP_ACTOR_TIMEOUT.
86    @meta(CONFIG = ConfigAttr {
87        env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
88        py_name: Some("cleanup_timeout".to_string()),
89    })
90    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);
91
92    /// Heartbeat interval for remote allocator. We do not rely on this heartbeat
93    /// anymore in v1, and it should be removed after we finishing the v0
94    /// deprecation.
95    @meta(CONFIG = ConfigAttr {
96        env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
97        py_name: Some("remote_allocator_heartbeat_interval".to_string()),
98    })
99    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);
100
101    /// How often to check for full MPSC channel on NetRx.
102    @meta(CONFIG = ConfigAttr {
103        env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
104        py_name: Some("channel_net_rx_buffer_full_check_interval".to_string()),
105    })
106    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
107
108    /// Sampling rate for logging message latency
109    /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
110    @meta(CONFIG = ConfigAttr {
111        env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
112        py_name: Some("message_latency_sampling_rate".to_string()),
113    })
114    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
115
116    /// Whether to enable client sequence assignment.
117    @meta(CONFIG = ConfigAttr {
118        env_name: Some("HYPERACTOR_ENABLE_CLIENT_SEQ_ASSIGNMENT".to_string()),
119        py_name: Some("enable_client_seq_assignment".to_string()),
120    })
121    pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
122
123    /// Timeout for [`Host::spawn`] to await proc readiness.
124    ///
125    /// Default: 30 seconds. If set to zero, disables the timeout and
126    /// waits indefinitely.
127    @meta(CONFIG = ConfigAttr {
128        env_name: Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
129        py_name: Some("host_spawn_ready_timeout".to_string()),
130    })
131    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);
132}
133
/// Unit tests for the hyperactor configuration keys: default values,
/// environment overrides, and serialization of explicitly-set values.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use hyperactor_config::Attrs;
    use hyperactor_config::from_env;
    use indoc::indoc;

    use super::*;

    /// Expected default of `CODEC_MAX_FRAME_LENGTH` (10 GiB), spelled out
    /// independently of the key declaration.
    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    /// Indexing an empty `Attrs` yields each key's declared default.
    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_mins(5)
        );
    }

    /// Environment variables override defaults in the config returned by
    /// `from_env`, and the expected `export` lines show up in the captured
    /// logs.
    #[tracing_test::traced_test]
    #[test]
    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // Name each overridden variable exactly once so that set-up and
        // clean-up cannot drift apart. (Clean-up previously removed a
        // non-existent `..._TIMEOUT_SECS` variable, leaking the override into
        // the rest of the test process.)
        const FRAME_LENGTH_ENV: &str = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH";
        const DELIVERY_TIMEOUT_ENV: &str = "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT";

        // Set environment variables
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var(FRAME_LENGTH_ENV, "1024") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var(DELIVERY_TIMEOUT_ENV, "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        ); // Default value

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
            # export HYPERACTOR_CLEANUP_TIMEOUT=3s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
        "}
        .trim_end()
        .lines()
        .collect();

        // For some reason, logs_contain fails to find these lines individually
        // (possibly to do with the fact that we have newlines in our log entries);
        // instead, we test it manually.
        logs_assert(|logged_lines: &[&str]| {
            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });

        // Clean up
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::remove_var(FRAME_LENGTH_ENV) };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::remove_var(DELIVERY_TIMEOUT_ENV) };
    }

    /// An empty `Attrs` stores nothing, yet lookups and the keys themselves
    /// still report the declared defaults.
    #[test]
    fn test_defaults() {
        // Test that empty config now returns defaults via get_or_default
        let config = Attrs::new();

        // Verify that the config is empty (no values explicitly set)
        assert!(config.is_empty());

        // But getters should still return the defaults from the keys
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Verify the keys have defaults
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // Verify we can get defaults directly from keys
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    /// Only explicitly-set values are serialized; defaults remain available
    /// after a JSON round-trip.
    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Initially empty, serialization should be empty
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized

        // Deserialize back
        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // Custom value should be preserved
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // Defaults should still work for other values
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}