Skip to main content

hyperactor/
config.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Configuration keys and I/O for hyperactor.
10//!
11//! This module declares hyperactor-specific config keys.
12
13use std::io::Cursor;
14use std::io::Read;
15use std::path::PathBuf;
16use std::time::Duration;
17
18use hyperactor_config::AttrValue;
19use hyperactor_config::CONFIG;
20use hyperactor_config::ConfigAttr;
21use hyperactor_config::attrs::declare_attrs;
22use serde::Deserialize;
23use serde::Serialize;
24use typeuri::Named;
25
26/// Stores a PEM-encoded value, either specified directly or read from a file.
27#[derive(Clone, Debug, Serialize, Named)]
28#[named("hyperactor::config::Pem")]
29pub enum Pem {
30    /// Raw PEM value stored directly.
31    Value(Vec<u8>),
32    /// Path to a file containing the PEM data.
33    File(PathBuf),
34    /// Static path string (for use in const/static contexts).
35    StaticPath(&'static str),
36}
37
38// Custom Deserialize implementation because StaticPath contains &'static str
39impl<'de> Deserialize<'de> for Pem {
40    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
41    where
42        D: serde::Deserializer<'de>,
43    {
44        #[derive(Deserialize)]
45        enum PemDeserialize {
46            Value(Vec<u8>),
47            File(PathBuf),
48            // StaticPath deserializes as a File since we can't deserialize &'static str
49            StaticPath(String),
50        }
51
52        match PemDeserialize::deserialize(deserializer)? {
53            PemDeserialize::Value(v) => Ok(Pem::Value(v)),
54            PemDeserialize::File(p) => Ok(Pem::File(p)),
55            // Convert StaticPath string to File during deserialization
56            PemDeserialize::StaticPath(s) => Ok(Pem::File(PathBuf::from(s))),
57        }
58    }
59}
60
61impl AttrValue for Pem {
62    fn display(&self) -> String {
63        match self {
64            Pem::Value(data) => String::from_utf8_lossy(data).to_string(),
65            Pem::File(path) => path.display().to_string(),
66            Pem::StaticPath(path) => path.to_string(),
67        }
68    }
69
70    fn parse(value: &str) -> Result<Self, anyhow::Error> {
71        // PEM data starts with "-----BEGIN"
72        if value.trim_start().starts_with("-----BEGIN") {
73            Ok(Pem::Value(value.as_bytes().to_vec()))
74        } else {
75            Ok(Pem::File(PathBuf::from(value)))
76        }
77    }
78}
79
80impl Pem {
81    /// Returns a reader for the PEM data.
82    pub fn reader(&self) -> std::io::Result<Box<dyn Read + '_>> {
83        match self {
84            Pem::Value(data) => Ok(Box::new(Cursor::new(data))),
85            Pem::File(path) => Ok(Box::new(std::fs::File::open(path)?)),
86            Pem::StaticPath(path) => Ok(Box::new(std::fs::File::open(path)?)),
87        }
88    }
89}
90
91/// A bundle of PEM files for TLS configuration: CA certificate, server/client certificate, and private key.
92#[derive(Clone, Debug)]
93pub struct PemBundle {
94    /// The CA certificate used to verify peer certificates.
95    pub ca: Pem,
96    /// The certificate to present to peers.
97    pub cert: Pem,
98    /// The private key for the certificate.
99    pub key: Pem,
100}
101
// Declare hyperactor-specific configuration keys.
//
// Each key's `CONFIG` meta names two things:
// - the environment variable it can be read from (e.g. `from_env` picks up
//   `HYPERACTOR_CODEC_MAX_FRAME_LENGTH`);
// - the key name used when an `Attrs` is serialized (e.g.
//   `codec_max_frame_length`).
// The value after `=` is the key's default.
declare_attrs! {
    /// Maximum frame length for the codec, in bytes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CODEC_MAX_FRAME_LENGTH".to_string()),
        Some("codec_max_frame_length".to_string()),
    ))
    pub attr CODEC_MAX_FRAME_LENGTH: usize = 10 * 1024 * 1024 * 1024; // 10 GiB

    /// Message delivery timeout.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT".to_string()),
        Some("message_delivery_timeout".to_string()),
    ))
    pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Maximum number of terminated actor snapshots retained per
    /// proc for post-mortem introspection. When the limit is
    /// exceeded, the oldest entries are evicted.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TERMINATED_SNAPSHOT_RETENTION".to_string()),
        Some("terminated_snapshot_retention".to_string()),
    ))
    pub attr TERMINATED_SNAPSHOT_RETENTION: usize = 100;

    /// Timeout to stop a proc.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()),
        Some("process_exit_timeout".to_string()),
    ))
    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Time interval for message acknowledgments.
    ///
    /// See also `MESSAGE_ACK_EVERY_N_MESSAGES`, the count-based
    /// acknowledgment setting.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()),
        Some("message_ack_time_interval".to_string()),
    ))
    pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

    /// Number of messages after which to send an acknowledgment.
    ///
    /// See also `MESSAGE_ACK_TIME_INTERVAL`, the time-based
    /// acknowledgment setting.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()),
        Some("message_ack_every_n_messages".to_string()),
    ))
    pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000;

    /// Default hop Time-To-Live for message envelopes.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()),
        Some("message_ttl_default".to_string()),
    ))
    pub attr MESSAGE_TTL_DEFAULT : u8 = 64;

    /// Maximum buffer size for split port messages.
    // NOTE(review): the unit (message count vs. bytes) is not visible in
    // this file; the small default suggests a count — confirm.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()),
        Some("split_max_buffer_size".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

    /// The maximum time an update can be buffered before being reduced.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()),
        Some("split_max_buffer_age".to_string()),
    ))
    pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50);

    /// Timeout used by proc mesh for stopping an actor.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()),
        Some("stop_actor_timeout".to_string()),
    ))
    pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10);

    /// Timeout used by proc for running the cleanup callback on an actor.
    ///
    /// Should be less than `STOP_ACTOR_TIMEOUT`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()),
        Some("cleanup_timeout".to_string()),
    ))
    pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3);

    /// How often to check for a full MPSC channel on `NetRx`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
        Some("channel_net_rx_buffer_full_check_interval".to_string()),
    ))
    pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Sampling rate for logging message latency, as a fraction of messages.
    ///
    /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()),
        Some("message_latency_sampling_rate".to_string()),
    ))
    pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;

    /// Whether to enable the destination-actor reordering buffer.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_ENABLE_DEST_ACTOR_REORDERING_BUFFER".to_string()),
        Some("enable_dest_actor_reordering_buffer".to_string()),
    ))
    pub attr ENABLE_DEST_ACTOR_REORDERING_BUFFER: bool = true;

    /// Timeout for [`Host::spawn`] to await proc readiness.
    ///
    /// Default: 30 seconds. If set to zero, disables the timeout and
    /// waits indefinitely.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT".to_string()),
        Some("host_spawn_ready_timeout".to_string()),
    ))
    pub attr HOST_SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(30);

    /// Heartbeat interval for server health metrics. The server emits a
    /// heartbeat metric at this interval to indicate it is alive.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_SERVER_HEARTBEAT_INTERVAL".to_string()),
        Some("server_heartbeat_interval".to_string()),
    ))
    pub attr SERVER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);

    /// Timeout for best-effort forwarder flush during proc/actor
    /// teardown. If the remote side has already torn down its
    /// networking, acks may never arrive; this timeout prevents the
    /// flush from hanging indefinitely.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_FORWARDER_FLUSH_TIMEOUT".to_string()),
        Some("forwarder_flush_timeout".to_string()),
    ))
    pub attr FORWARDER_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

    /// TLS certificate for the 'tls' transport.
    ///
    /// Accepts either a path to a PEM file or inline PEM data (a value whose
    /// first non-whitespace text is `-----BEGIN`). Defaults to
    /// `/etc/hyperactor/tls/tls.crt`.
    // NOTE(review): `.process_local()` presumably keeps this value from being
    // propagated to other processes — confirm against hyperactor_config.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CERT".to_string()),
        Some("hyperactor_tls_cert".to_string()),
    ).process_local())
    pub attr TLS_CERT: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.crt");

    /// TLS private key for the 'tls' transport.
    ///
    /// Accepts either a path to a PEM file or inline PEM data (a value whose
    /// first non-whitespace text is `-----BEGIN`). Defaults to
    /// `/etc/hyperactor/tls/tls.key`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_KEY".to_string()),
        Some("hyperactor_tls_key".to_string()),
    ).process_local())
    pub attr TLS_KEY: Pem = Pem::StaticPath("/etc/hyperactor/tls/tls.key");

    /// TLS CA certificate for the 'tls' transport.
    ///
    /// Accepts either a path to a PEM file or inline PEM data (a value whose
    /// first non-whitespace text is `-----BEGIN`). Defaults to
    /// `/etc/hyperactor/tls/ca.crt`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_TLS_CA".to_string()),
        Some("hyperactor_tls_ca".to_string()),
    ).process_local())
    pub attr TLS_CA: Pem = Pem::StaticPath("/etc/hyperactor/tls/ca.crt");
}
255
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use hyperactor_config::Attrs;
    use hyperactor_config::from_env;
    use indoc::indoc;

    use super::*;

    /// Like the `logs_assert` injected by `#[traced_test]`, but without scope
    /// filtering. Use when asserting on events emitted outside the test's span
    /// (e.g. from spawned tasks or panic hooks).
    fn logs_assert_unscoped(f: impl Fn(&[&str]) -> Result<(), String>) {
        // Evaluate the check inside a scope so the shared log-buffer lock is
        // released before panicking; panicking while holding it would poison
        // the mutex and break every later test that reads the logs.
        let result = {
            let buf = tracing_test::internal::global_buf().lock().unwrap();
            let logs_str = std::str::from_utf8(&buf).expect("Logs contain invalid UTF8");
            let lines: Vec<&str> = logs_str.lines().collect();
            f(&lines)
        };
        if let Err(msg) = result {
            panic!("{}", msg);
        }
    }

    /// Removes the listed environment variables when dropped, so they are
    /// cleaned up even if an assertion in the test panics first.
    struct EnvVarGuard(&'static [&'static str]);

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            for &key in self.0 {
                // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
                unsafe { std::env::remove_var(key) };
            }
        }
    }

    const CODEC_MAX_FRAME_LENGTH_DEFAULT: usize = 10 * 1024 * 1024 * 1024;

    #[test]
    fn test_default_config() {
        let config = Attrs::new();
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
    }

    #[tracing_test::traced_test]
    #[test]
    // TODO: OSS: The logs_assert function returned an error: missing log lines: {"# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart", ...}
    #[cfg_attr(not(fbcode_build), ignore)]
    fn test_from_env() {
        // Clean up exactly the variables set below when the test exits,
        // including on panic, so they cannot leak into other tests.
        let _env_guard = EnvVarGuard(&[
            "HYPERACTOR_CODEC_MAX_FRAME_LENGTH",
            "HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT",
        ]);

        // Set environment variables
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024") };
        // SAFETY: TODO: Audit that the environment access only happens in single-threaded code.
        unsafe { std::env::set_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT", "60s") };

        let config = from_env();

        assert_eq!(config[CODEC_MAX_FRAME_LENGTH], 1024);
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_mins(1));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        ); // Default value

        let expected_lines: HashSet<&str> = indoc! {"
            # export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
            # export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
            # export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_SIZE=5
            # export HYPERACTOR_MESSAGE_TTL_DEFAULT=64
            # export HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES=1000
            # export HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL=500ms
            # export HYPERACTOR_PROCESS_EXIT_TIMEOUT=10s
            # export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=30s
            export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT=1m
            # export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=10737418240
            export HYPERACTOR_CODEC_MAX_FRAME_LENGTH=1024
            # export HYPERACTOR_CLEANUP_TIMEOUT=3s
            # export HYPERACTOR_SPLIT_MAX_BUFFER_AGE=50ms
            # export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
            # export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT=30s
        "}
        .trim_end()
        .lines()
        .collect();

        // For some reason, logs_contain fails to find these lines individually
        // (possibly to do with the fact that we have newlines in our log entries);
        // instead, we test it manually.
        logs_assert_unscoped(|logged_lines: &[&str]| {
            let mut expected_lines = expected_lines.clone(); // this is an `Fn` closure
            for logged in logged_lines {
                expected_lines.remove(logged);
            }

            if expected_lines.is_empty() {
                Ok(())
            } else {
                Err(format!("missing log lines: {:?}", expected_lines))
            }
        });
    }

    #[test]
    fn test_defaults() {
        // Test that empty config now returns defaults via get_or_default
        let config = Attrs::new();

        // Verify that the config is empty (no values explicitly set)
        assert!(config.is_empty());

        // But getters should still return the defaults from the keys
        assert_eq!(
            config[CODEC_MAX_FRAME_LENGTH],
            CODEC_MAX_FRAME_LENGTH_DEFAULT
        );
        assert_eq!(config[MESSAGE_DELIVERY_TIMEOUT], Duration::from_secs(30));
        assert_eq!(
            config[MESSAGE_ACK_TIME_INTERVAL],
            Duration::from_millis(500)
        );
        assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
        assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);

        // Verify the keys have defaults
        assert!(CODEC_MAX_FRAME_LENGTH.has_default());
        assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
        assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
        assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
        assert!(SPLIT_MAX_BUFFER_SIZE.has_default());

        // Verify we can get defaults directly from keys
        assert_eq!(
            CODEC_MAX_FRAME_LENGTH.default(),
            Some(&(CODEC_MAX_FRAME_LENGTH_DEFAULT))
        );
        assert_eq!(
            MESSAGE_DELIVERY_TIMEOUT.default(),
            Some(&Duration::from_secs(30))
        );
        assert_eq!(
            MESSAGE_ACK_TIME_INTERVAL.default(),
            Some(&Duration::from_millis(500))
        );
        assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
        assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
    }

    #[test]
    fn test_serialization_only_includes_set_values() {
        let mut config = Attrs::new();

        // Initially empty, serialization should be empty
        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(serialized, "{}");

        config[CODEC_MAX_FRAME_LENGTH] = 1024;

        let serialized = serde_json::to_string(&config).unwrap();
        assert!(serialized.contains("codec_max_frame_length"));
        assert!(!serialized.contains("message_delivery_timeout")); // Default not serialized

        // Deserialize back
        let restored_config: Attrs = serde_json::from_str(&serialized).unwrap();

        // Custom value should be preserved
        assert_eq!(restored_config[CODEC_MAX_FRAME_LENGTH], 1024);

        // Defaults should still work for other values
        assert_eq!(
            restored_config[MESSAGE_DELIVERY_TIMEOUT],
            Duration::from_secs(30)
        );
    }
}