hyperactor/
config.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Configuration keys and I/O for hyperactor.
10//!
11//! This module declares hyperactor-specific config keys.
12
13use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
/// Stores a PEM-encoded value, either specified directly or read from a file.
///
/// `Serialize` is derived, but `Deserialize` is implemented by hand in this
/// module: `StaticPath` holds a `&'static str`, which cannot be produced from
/// deserialized input, so a serialized `StaticPath` comes back as `File`.
#[derive(Clone, Debug, Serialize, Named)]
#[named("hyperactor::config::Pem")]
pub enum Pem {
    /// Raw PEM value stored directly.
    Value(Vec<u8>),
    /// Path to a file containing the PEM data.
    File(PathBuf),
    /// Static path string (for use in const/static contexts).
    ///
    /// Used for the default values of the TLS keys declared in this module.
    StaticPath(&'static str),
}
37
38// Custom Deserialize implementation because StaticPath contains &'static str
39impl<'de> Deserialize<'de> for Pem {
40    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41    where
42        D: serde::Deserializer<'de>,
43    {
44        #[derive(Deserialize)]
45        enum PemDeserialize {
46            Value(Vec<u8>),
47            File(PathBuf),
48            // StaticPath deserializes as a File since we can't deserialize &'static str
49            StaticPath(String),
50        }
51
52        match PemDeserialize::deserialize(deserializer)? {
53            PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54            PemDeserialize::File(p) => Ok(Pem::File(p)),
55            // Convert StaticPath string to File during deserialization
56            PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57        }
58    }
59}
60
61impl AttrValue for Pem {
62    fn display(&self) -> String {
63        match self {
64            Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65            Pem::File(path) => path.display().to_string(),
66            Pem::StaticPath(path) => path.to_string(),
67        }
68    }
69
70    fn parse(value: &str) -> Result<Self, anyhow::Error> {
71        // PEM data starts with "-----BEGIN"
72        if value.trim_start().starts_with("-----BEGIN") {
73            Ok(Pem::Value(value.as_bytes().to_vec()))
74        } else {
75            Ok(Pem::File(PathBuf::from(value)))
76        }
77    }
78}
79
80impl Pem {
81    /// Returns a reader for the PEM data.
82    pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83        match self {
84            Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85            Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86            Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87        }
88    }
89}
90
/// A bundle of PEM files for TLS configuration: CA certificate, server/client certificate, and private key.
///
/// Each field is a [`Pem`], so it may hold either inline PEM data or a path
/// to a file containing it.
#[derive(Clone, Debug)]
pub struct PemBundle {
    /// The CA certificate used to verify peer certificates.
    pub ca: Pem,
    /// The certificate to present to peers.
    pub cert: Pem,
    /// The private key for the certificate.
    pub key: Pem,
}
101
// Declare hyperactor-specific configuration keys.
//
// Every key carries a `CONFIG` meta-attribute built from two names: an
// upper-case `HYPERACTOR_*` name and a lower-case name.
// NOTE(review): the first looks like the environment variable consulted by
// `from_env` and the second like the key used by other config front-ends —
// confirm against `ConfigAttr::new`.
declare_attrs! {
    /// Maximum frame length for codec.
    ///
    /// Default: 10 GiB.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
        Some("codec_max_frame_length".to_string()),
    ))
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB

    /// Message delivery timeout.
    ///
    /// Default: 30 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
        Some("message_delivery_timeout".to_string()),
    ))
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Maximum number of terminated actor snapshots retained per
    /// proc for post-mortem introspection. When the limit is
    /// exceeded, the oldest entries are evicted.
    ///
    /// Default: 100.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
        Some("terminated_snapshot_retention".to_string()),
    ))
    pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;

    /// Timeout used by allocator for stopping a proc.
    ///
    /// Default: 10 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
        Some("process_exit_timeout".to_string()),
    ))
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Message acknowledgment interval.
    ///
    /// Default: 500 milliseconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
        Some("message_ack_time_interval".to_string()),
    ))
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    /// Number of messages after which to send an acknowledgment.
    ///
    /// Default: 1000.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
        Some("message_ack_every_n_messages".to_string()),
    ))
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    /// Default hop Time-To-Live for message envelopes.
    ///
    /// Default: 64.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
        Some("message_ttl_default".to_string()),
    ))
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    /// Maximum buffer size for split port messages.
    ///
    /// Default: 5.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
        Some("split_max_buffer_size".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    /// The maximum time an update can be buffered before being reduced.
    ///
    /// Default: 50 milliseconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
        Some("split_max_buffer_age".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    /// Timeout used by proc mesh for stopping an actor.
    ///
    /// Default: 10 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
        Some("stop_actor_timeout".to_string()),
    ))
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);

    /// Timeout used by proc for running the cleanup callback on an actor.
    /// Should be less than `STOP_ACTOR_TIMEOUT`.
    ///
    /// Default: 3 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
        Some("cleanup_timeout".to_string()),
    ))
    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);

    /// Heartbeat interval for remote allocator. We no longer rely on this
    /// heartbeat in v1, and it should be removed once the v0 deprecation
    /// is finished.
    ///
    /// Default: 5 minutes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
        Some("remote_allocator_heartbeat_interval".to_string()),
    ))
    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);

    /// How often to check for full MPSC channel on NetRx.
    ///
    /// Default: 5 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
        Some("channel_net_rx_buffer_full_check_interval".to_string()),
    ))
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Sampling rate for logging message latency.
    /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
    ///
    /// Default: 0.01 (1%).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
        Some("message_latency_sampling_rate".to_string()),
    ))
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    /// Whether to enable dest actor reordering buffer.
    ///
    /// Default: disabled.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
        Some("enable_dest_actor_reordering_buffer".to_string()),
    ))
    pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = false;

    /// Timeout for [`Host::spawn`] to await proc readiness.
    ///
    /// Default: 30 seconds. If set to zero, disables the timeout and
    /// waits indefinitely.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
        Some("host_spawn_ready_timeout".to_string()),
    ))
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Heartbeat interval for server health metrics. The server emits a
    /// heartbeat metric at this interval to indicate it is alive.
    ///
    /// Default: 1 second.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
        Some("server_heartbeat_interval".to_string()),
    ))
    pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

    // NOTE(review): the three TLS keys below are the only ones marked
    // `.process_local()`; its exact semantics are defined by `ConfigAttr` —
    // confirm there before relying on them.

    /// Path to TLS certificate file for the 'tls' transport.
    ///
    /// Default: `/etc/hyperactor/tls/tls.crt`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CERT".to_string()),
        Some("hyperactor_tls_cert".to_string()),
    ).process_local())
    pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");

    /// Path to TLS private key file for the 'tls' transport.
    ///
    /// Default: `/etc/hyperactor/tls/tls.key`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_KEY".to_string()),
        Some("hyperactor_tls_key".to_string()),
    ).process_local())
    pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");

    /// Path to TLS CA certificate file for the 'tls' transport.
    ///
    /// Default: `/etc/hyperactor/tls/ca.crt`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CA".to_string()),
        Some("hyperactor_tls_ca".to_string()),
    ).process_local())
    pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
}
254
255#[cfg(test)]
256mod tests {
257    use std::collections::HashSet;
258
259    use hyperactor_config::Attrs;
260    use hyperactor_config::from_env;
261    use indoc::indoc;
262
263    use super::*;
264
265    /// Like the `logs_assert` injected by `#[traced_test]`, but without scope
266    /// filtering. Use when asserting on events emitted outside the test's span
267    /// (e.g. from spawned tasks or panic hooks).
268    fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
269        let buf = tracing_test::internal::global_buf().lock().unwrap();
270        let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
271        let lines: Vec<&str> = logs_str.lines().collect();
272        match f(&lines) {
273            Ok(()) => {}
274            Err(msg) => panic!("{}", msg),
275        }
276    }
277
278    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;
279
280    #[test]
281    fn test_default_config() {
282        let config = Attrs::new();
283        assert_eq!(
284            config[CODEC_MAX_FRAME_LENGTH],
285            CODEC_MAX_FRAME_LENGTH_DEFAULT
286        );
287        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
288        assert_eq!(
289            config[MESSAGE_ACK_TIME_INTERVAL],
290            Duration::from_millis(500)
291        );
292        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
293        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
294        assert_eq!(
295            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
296            Duration::from_mins(5)
297        );
298    }
299
300    #[tracing_test::traced_test]
301    #[test]
302    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
303    #[cfg_attr(not(fbcode_build), ignore)]
304    fn test_from_env() {
305        // Set environment variables
306        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
307        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
308        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
309        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };
310
311        let config = from_env();
312
313        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
314        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
315        assert_eq!(
316            config[MESSAGE_ACK_TIME_INTERVAL],
317            Duration::from_millis(500)
318        ); // Default value
319
320        let expected_lines: HashSet<&str> = indoc! {"
321            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
322            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
323            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
324            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
325            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
326            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
327            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
328            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
329            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
330            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
331            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
332            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
333            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
334            # export HYPERACTOR_CLEANUP_TIMEOUT=3s
335            # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
336            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
337            # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
338        "}
339        .trim_end()
340        .lines()
341        .collect();
342
343        // For some reason, logs_contaqin fails to find these lines individually
344        // (possibly to do with the fact that we have newlines in our log entries);
345        // instead, we test it manually.
346        logs_assert_unscoped(|logged_lines: &[&str]| {
347            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
348            for logged in logged_lines {
349                expected_lines.remove(logged);
350            }
351
352            if expected_lines.is_empty() {
353                Ok(())
354            } else {
355                Err(format!("missing log lines: {:?}", expected_lines))
356            }
357        });
358
359        // Clean up
360        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
361        unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
362        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
363        unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT_SECS") };
364    }
365
366    #[test]
367    fn test_defaults() {
368        // Test that empty config now returns defaults via get_or_default
369        let config = Attrs::new();
370
371        // Verify that the config is empty (no values explicitly set)
372        assert!(config.is_empty());
373
374        // But getters should still return the defaults from the keys
375        assert_eq!(
376            config[CODEC_MAX_FRAME_LENGTH],
377            CODEC_MAX_FRAME_LENGTH_DEFAULT
378        );
379        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
380        assert_eq!(
381            config[MESSAGE_ACK_TIME_INTERVAL],
382            Duration::from_millis(500)
383        );
384        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
385        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
386
387        // Verify the keys have defaults
388        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
389        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
390        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
391        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
392        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());
393
394        // Verify we can get defaults directly from keys
395        assert_eq!(
396            CODEC_MAX_FRAME_LENGTH.default(),
397            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
398        );
399        assert_eq!(
400            MESSAGE_DELIVERY_TIMEOUT.default(),
401            Some(&Duration::from_secs(30))
402        );
403        assert_eq!(
404            MESSAGE_ACK_TIME_INTERVAL.default(),
405            Some(&Duration::from_millis(500))
406        );
407        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
408        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
409    }
410
411    #[test]
412    fn test_serialization_only_includes_set_values() {
413        let mut config = Attrs::new();
414
415        // Initially empty, serialization should be empty
416        let serialized = serde_json::to_string(&config).unwrap();
417        assert_eq!(serialized, "{}");
418
419        config[CODEC_MAX_FRAME_LENGTH] = 1024;
420
421        let serialized = serde_json::to_string(&config).unwrap();
422        assert!(serialized.contains("codec_max_frame_length"));
423        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized
424
425        // Deserialize back
426        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();
427
428        // Custom value should be preserved
429        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);
430
431        // Defaults should still work for other values
432        assert_eq!(
433            restored_config[MESSAGE_DELIVERY_TIMEOUT],
434            Duration::from_secs(30)
435        );
436    }
437}