hyperactor/
config.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
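//! Configuration keys and related types for hyperactor.
//!
//! This module declares hyperactor-specific config keys, together with
//! [`Pem`] and [`PemBundle`], which carry PEM-encoded TLS material either
//! inline or by file path.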
12
13use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
26/// Stores a PEM-encoded value, either specified directly or read from a file.
27#[derive(Clone, Debug, Serialize, Named)]
28#[named("hyperactor::config::Pem")]
29pub enum Pem {
30    /// Raw PEM value stored directly.
31    Value(Vec<u8>),
32    /// Path to a file containing the PEM data.
33    File(PathBuf),
34    /// Static path string (for use in const/static contexts).
35    StaticPath(&'static str),
36}
37
38// Custom Deserialize implementation because StaticPath contains &'static str
39impl<'de> Deserialize<'de> for Pem {
40    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41    where
42        D: serde::Deserializer<'de>,
43    {
44        #[derive(Deserialize)]
45        enum PemDeserialize {
46            Value(Vec<u8>),
47            File(PathBuf),
48            // StaticPath deserializes as a File since we can't deserialize &'static str
49            StaticPath(String),
50        }
51
52        match PemDeserialize::deserialize(deserializer)? {
53            PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54            PemDeserialize::File(p) => Ok(Pem::File(p)),
55            // Convert StaticPath string to File during deserialization
56            PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57        }
58    }
59}
60
61impl AttrValue for Pem {
62    fn display(&self) -> String {
63        match self {
64            Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65            Pem::File(path) => path.display().to_string(),
66            Pem::StaticPath(path) => path.to_string(),
67        }
68    }
69
70    fn parse(value: &str) -> Result<Self, anyhow::Error> {
71        // PEM data starts with "-----BEGIN"
72        if value.trim_start().starts_with("-----BEGIN") {
73            Ok(Pem::Value(value.as_bytes().to_vec()))
74        } else {
75            Ok(Pem::File(PathBuf::from(value)))
76        }
77    }
78}
79
80impl Pem {
81    /// Returns a reader for the PEM data.
82    pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83        match self {
84            Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85            Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86            Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87        }
88    }
89}
90
91/// A bundle of PEM files for TLS configuration: CA certificate, server/client certificate, and private key.
92#[derive(Clone, Debug)]
93pub struct PemBundle {
94    /// The CA certificate used to verify peer certificates.
95    pub ca: Pem,
96    /// The certificate to present to peers.
97    pub cert: Pem,
98    /// The private key for the certificate.
99    pub key: Pem,
100}
101
// Declare hyperactor-specific configuration keys. Each key pairs a default
// value with a `ConfigAttr` naming the environment variable that can override
// it and a snake_case key name.
declare_attrs! {
    /// Maximum frame length, in bytes, for the codec.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
        Some("codec_max_frame_length".to_string()),
    ))
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB; NOTE(review): exceeds usize::MAX on 32-bit targets

    /// Message delivery timeout.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
        Some("message_delivery_timeout".to_string()),
    ))
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Maximum number of terminated actor snapshots retained per
    /// proc for post-mortem introspection. When the limit is
    /// exceeded, the oldest entries are evicted.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
        Some("terminated_snapshot_retention".to_string()),
    ))
    pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;

    /// Timeout used by the allocator when stopping a proc.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
        Some("process_exit_timeout".to_string()),
    ))
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Message acknowledgment time interval.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
        Some("message_ack_time_interval".to_string()),
    ))
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    /// Number of messages after which an acknowledgment is sent.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
        Some("message_ack_every_n_messages".to_string()),
    ))
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    /// Default hop Time-To-Live for message envelopes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
        Some("message_ttl_default".to_string()),
    ))
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    /// Maximum buffer size for split port messages.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
        Some("split_max_buffer_size".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    /// The maximum time an update can be buffered before being reduced.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
        Some("split_max_buffer_age".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    /// Timeout used by the proc mesh when stopping an actor.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
        Some("stop_actor_timeout".to_string()),
    ))
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);

    /// Timeout used by a proc when running an actor's cleanup callback.
    /// Should be less than STOP_ACTOR_TIMEOUT.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
        Some("cleanup_timeout".to_string()),
    ))
    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);

    /// Heartbeat interval for the remote allocator. v1 no longer relies on
    /// this heartbeat; it should be removed once the v0 deprecation is
    /// finished.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()),
        Some("remote_allocator_heartbeat_interval".to_string()),
    ))
    pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5);

    /// How often NetRx checks whether its MPSC channel is full.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
        Some("channel_net_rx_buffer_full_check_interval".to_string()),
    ))
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Sampling rate for logging message latency, as a fraction.
    /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
        Some("message_latency_sampling_rate".to_string()),
    ))
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    /// Whether to enable the destination-actor reordering buffer.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
        Some("enable_dest_actor_reordering_buffer".to_string()),
    ))
    pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = false;

    /// Timeout for [`Host::spawn`] to await proc readiness.
    ///
    /// Default: 30 seconds. If set to zero, disables the timeout and
    /// waits indefinitely.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
        Some("host_spawn_ready_timeout".to_string()),
    ))
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Heartbeat interval for server health metrics. The server emits a
    /// heartbeat metric at this interval to indicate it is alive.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
        Some("server_heartbeat_interval".to_string()),
    ))
    pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

    /// Timeout for the best-effort forwarder flush during proc/actor
    /// teardown. If the remote side has already torn down its
    /// networking, acks may never arrive; this timeout prevents the
    /// flush from hanging indefinitely.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORWARDER_FLUSH_TIMEOUT".to_string()),
        Some("forwarder_flush_timeout".to_string()),
    ))
    pub attr FORWARDER_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

    /// TLS certificate (PEM) for the 'tls' transport. Defaults to a file path.
    /// When parsed from a string (via `Pem`'s `AttrValue::parse`), a value whose
    /// first non-whitespace characters are `-----BEGIN` is treated as inline
    /// PEM data; any other value is treated as a file path.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CERT".to_string()),
        Some("hyperactor_tls_cert".to_string()),
    ).process_local())
    pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");

    /// TLS private key (PEM) for the 'tls' transport. Defaults to a file path.
    /// When parsed from a string, a value starting with `-----BEGIN` is inline
    /// PEM data; any other value is treated as a file path.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_KEY".to_string()),
        Some("hyperactor_tls_key".to_string()),
    ).process_local())
    pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");

    /// TLS CA certificate (PEM) for the 'tls' transport. Defaults to a file
    /// path. When parsed from a string, a value starting with `-----BEGIN` is
    /// inline PEM data; any other value is treated as a file path.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CA".to_string()),
        Some("hyperactor_tls_ca".to_string()),
    ).process_local())
    pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
}
264
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use hyperactor_config::Attrs;
    use hyperactor_config::from_env;
    use indoc::indoc;

    use super::*;

    /// Like the `logs_assert` injected by `#[traced_test]`, but without scope
    /// filtering. Use when asserting on events emitted outside the test's span
    /// (e.g. from spawned tasks or panic hooks).
    fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
        let buf = tracing_test::internal::global_buf().lock().unwrap();
        let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
        let lines: Vec<&str> = logs_str.lines().collect();
        match f(&lines) {
            Ok(()) => {}
            Err(msg) => panic!("{}", msg),
        }
    }

    /// Mirrors the declared default of `CODEC_MAX_FRAME_LENGTH` (10 GiB).
    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
        assert_eq!(
            config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
            Duration::from_mins(5)
        );
    }

    #[tracing_test::traced_test]
    #[test]
    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // Override two keys through the environment.
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };

        let config = from_env();

        // Clean up as soon as the environment has been read, so a failing
        // assertion below cannot leak these overrides into other tests in the
        // same process. The names must match the variables set above exactly.
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT") };

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        ); // Default value

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
            # export HYPERACTOR_CLEANUP_TIMEOUT=3s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
        "}
        .trim_end()
        .lines()
        .collect();

        // For some reason, logs_contain fails to find these lines individually
        // (possibly because our log entries contain newlines), so we check
        // them manually.
        logs_assert_unscoped(|logged_lines: &[&str]| {
            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });
    }

    #[test]
    fn test_defaults() {
        // Test that empty config now returns defaults via get_or_default
        let config = Attrs::new();

        // Verify that the config is empty (no values explicitly set)
        assert!(config.is_empty());

        // But getters should still return the defaults from the keys
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Verify the keys have defaults
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // Verify we can get defaults directly from keys
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Initially empty, serialization should be empty
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized

        // Deserialize back
        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // Custom value should be preserved
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // Defaults should still work for other values
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}