hyperactor/
clock.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
//! The clock lets us control the behaviour of all time-dependent events, in both real and simulated time, throughout the system.
10
11use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::sync::OnceLock;
16use std::time::SystemTime;
17
18use futures::pin_mut;
19use hyperactor_telemetry::TelemetryClock;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::Mailbox;
24use crate::channel::ChannelAddr;
25use crate::data::Named;
26use crate::id;
27use crate::mailbox::DeliveryError;
28use crate::mailbox::MailboxSender;
29use crate::mailbox::MessageEnvelope;
30use crate::mailbox::Undeliverable;
31use crate::mailbox::UndeliverableMailboxSender;
32use crate::mailbox::monitored_return_handle;
33use crate::simnet::SleepEvent;
34use crate::simnet::simnet_handle;
35
36struct SimTime {
37    start: tokio::time::Instant,
38    now: Mutex<tokio::time::Instant>,
39    system_start: SystemTime,
40}
41
42#[allow(clippy::disallowed_methods)]
43static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
44    let now = tokio::time::Instant::now();
45    SimTime {
46        start: now,
47        now: Mutex::new(now),
48        system_start: SystemTime::now(),
49    }
50});
51
/// Error returned by `Clock::timeout`.
///
/// It is produced when the timeout expires before the wrapped future was able
/// to finish. The type carries no data, so it is cheap to clone and can be
/// compared directly (e.g. `assert_eq!(res, Err(TimeoutError))`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeoutError;

impl fmt::Display for TimeoutError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The message is constant, so no formatting machinery is needed.
        f.write_str("deadline has elapsed")
    }
}

impl Error for TimeoutError {}
66
67/// The Sleeps trait allows different implementations to control the behavior of sleep.
68pub trait Clock {
69    /// Initiates a sleep for the specified duration
70    fn sleep(
71        &self,
72        duration: tokio::time::Duration,
73    ) -> impl std::future::Future<Output = ()> + Send + Sync;
74    /// Initiates a sleep for the specified duration
75    fn non_advancing_sleep(
76        &self,
77        duration: tokio::time::Duration,
78    ) -> impl std::future::Future<Output = ()> + Send + Sync;
79    /// Get the current time according to the clock
80    fn now(&self) -> tokio::time::Instant;
81    /// Sleep until the specified deadline.
82    fn sleep_until(
83        &self,
84        deadline: tokio::time::Instant,
85    ) -> impl std::future::Future<Output = ()> + Send + Sync;
86    /// Get the current system time according to the clock
87    fn system_time_now(&self) -> SystemTime;
88    /// Require a future to complete within the specified duration
89    ///
90    /// if the future completes before the duration has elapsed, then the completed value is returned.
91    /// Otherwise, an error is returned and the future is canceled.
92    fn timeout<F, T>(
93        &self,
94        duration: tokio::time::Duration,
95        f: F,
96    ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
97    where
98        F: std::future::Future<Output = T> + Send;
99}
100
101/// An adapter that allows us to control the behaviour of sleep between performing a real sleep
102/// and a sleep on the simnet
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub enum ClockKind {
105    /// Simulates a clock that uses the simnet's current time as the source of truth
106    Sim(SimClock),
107    /// Represents a real clock using tokio's sleep functionality for production use.
108    Real(RealClock),
109}
110
111impl Clock for ClockKind {
112    async fn sleep(&self, duration: tokio::time::Duration) {
113        match self {
114            Self::Sim(clock) => clock.sleep(duration).await,
115            Self::Real(clock) => clock.sleep(duration).await,
116        }
117    }
118    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
119        match self {
120            Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
121            Self::Real(clock) => clock.non_advancing_sleep(duration).await,
122        }
123    }
124    async fn sleep_until(&self, deadline: tokio::time::Instant) {
125        match self {
126            Self::Sim(clock) => clock.sleep_until(deadline).await,
127            Self::Real(clock) => clock.sleep_until(deadline).await,
128        }
129    }
130    fn now(&self) -> tokio::time::Instant {
131        match self {
132            Self::Sim(clock) => clock.now(),
133            Self::Real(clock) => clock.now(),
134        }
135    }
136    fn system_time_now(&self) -> SystemTime {
137        match self {
138            Self::Sim(clock) => clock.system_time_now(),
139            Self::Real(clock) => clock.system_time_now(),
140        }
141    }
142    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
143    where
144        F: std::future::Future<Output = T> + Send,
145    {
146        match self {
147            Self::Sim(clock) => clock.timeout(duration, f).await,
148            Self::Real(clock) => clock.timeout(duration, f).await,
149        }
150    }
151}
152
153impl TelemetryClock for ClockKind {
154    fn now(&self) -> tokio::time::Instant {
155        match self {
156            Self::Sim(clock) => clock.now(),
157            Self::Real(clock) => clock.now(),
158        }
159    }
160
161    fn system_time_now(&self) -> std::time::SystemTime {
162        match self {
163            Self::Sim(clock) => clock.system_time_now(),
164            Self::Real(clock) => clock.system_time_now(),
165        }
166    }
167}
168
169impl Default for ClockKind {
170    fn default() -> Self {
171        Self::Real(RealClock)
172    }
173}
174
175impl ClockKind {
176    /// Returns the appropriate clock given the channel address kind
177    /// a proc is being served on
178    pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
179        match channel_addr {
180            ChannelAddr::Sim(_) => Self::Sim(SimClock),
181            _ => Self::Real(RealClock),
182        }
183    }
184}
185
186/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
187/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
188/// time to the wakeup time and use the transmitter to wake up this green thread
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct SimClock;
191
192impl Clock for SimClock {
193    /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
194    async fn sleep(&self, duration: tokio::time::Duration) {
195        let mailbox = SimClock::mailbox().clone();
196        let (tx, rx) = mailbox.open_once_port::<()>();
197
198        simnet_handle()
199            .unwrap()
200            .send_event(SleepEvent::new(tx.bind(), mailbox, duration))
201            .unwrap();
202        rx.recv().await.unwrap();
203    }
204
205    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
206        let mailbox = SimClock::mailbox().clone();
207        let (tx, rx) = mailbox.open_once_port::<()>();
208
209        simnet_handle()
210            .unwrap()
211            .send_nonadvanceable_event(SleepEvent::new(tx.bind(), mailbox, duration))
212            .unwrap();
213        rx.recv().await.unwrap();
214    }
215
216    async fn sleep_until(&self, deadline: tokio::time::Instant) {
217        let now = self.now();
218        if deadline <= now {
219            return;
220        }
221        self.sleep(deadline - now).await;
222    }
223    /// Get the current time according to the simnet
224    fn now(&self) -> tokio::time::Instant {
225        *SIM_TIME.now.lock().unwrap()
226    }
227
228    fn system_time_now(&self) -> SystemTime {
229        SIM_TIME.system_start + self.now().duration_since(SIM_TIME.start)
230    }
231
232    #[allow(clippy::disallowed_methods)]
233    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
234    where
235        F: std::future::Future<Output = T>,
236    {
237        let mailbox = SimClock::mailbox().clone();
238        let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239
240        simnet_handle()
241            .unwrap()
242            .send_event(SleepEvent::new(tx.bind(), mailbox, duration))
243            .unwrap();
244
245        let fut = f;
246        pin_mut!(fut);
247
248        tokio::select! {
249            _ = deadline_rx.recv() => {
250                Err(TimeoutError)
251            }
252            res = &mut fut => Ok(res)
253        }
254    }
255}
256
257impl SimClock {
258    // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
259    // from upstack and handle undeliverable messages properly.
260    fn mailbox() -> &'static Mailbox {
261        static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
262        SIMCLOCK_MAILBOX.get_or_init(|| {
263            let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
264            let (undeliverable_messages, mut rx) =
265                mailbox.open_port::<Undeliverable<MessageEnvelope>>();
266            undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
267            tokio::spawn(async move {
268                while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
269                    envelope.set_error(DeliveryError::BrokenLink(
270                        "message returned to undeliverable port".to_string(),
271                    ));
272                    UndeliverableMailboxSender
273                        .post(envelope, /*unused */ monitored_return_handle())
274                }
275            });
276            mailbox
277        })
278    }
279
280    /// Advance the sumulator's time to the specified instant
281    pub fn advance_to(&self, time: tokio::time::Instant) {
282        let mut guard = SIM_TIME.now.lock().unwrap();
283        *guard = time;
284    }
285
286    /// Get the number of milliseconds elapsed since the start of the simulation
287    pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
288        instant.duration_since(SIM_TIME.start)
289    }
290
291    /// Instant marking the start of the simulation
292    pub fn start(&self) -> tokio::time::Instant {
293        SIM_TIME.start
294    }
295}
296
297/// An adapter for tokio::time::sleep to be used in production
298#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct RealClock;
300
301impl Clock for RealClock {
302    #[allow(clippy::disallowed_methods)]
303    async fn sleep(&self, duration: tokio::time::Duration) {
304        tokio::time::sleep(duration).await;
305    }
306    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
307        Self::sleep(self, duration).await;
308    }
309    #[allow(clippy::disallowed_methods)]
310    async fn sleep_until(&self, deadline: tokio::time::Instant) {
311        tokio::time::sleep_until(deadline).await;
312    }
313    /// Get the current time using tokio::time::Instant
314    #[allow(clippy::disallowed_methods)]
315    fn now(&self) -> tokio::time::Instant {
316        tokio::time::Instant::now()
317    }
318    #[allow(clippy::disallowed_methods)]
319    fn system_time_now(&self) -> SystemTime {
320        SystemTime::now()
321    }
322    #[allow(clippy::disallowed_methods)]
323    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
324    where
325        F: std::future::Future<Output = T>,
326    {
327        tokio::time::timeout(duration, f)
328            .await
329            .map_err(|_| TimeoutError)
330    }
331}
332
#[cfg(test)]
mod tests {

    use crate::clock::Clock;
    use crate::clock::SimClock;
    use crate::simnet;

    // NOTE(review): the clock tests below read and write the process-wide
    // simulated time through `SimClock`, so their outcome may depend on test
    // ordering when several run in one process; confirm how the test runner
    // isolates them.

    /// Advancing the simulated clock is reflected by `now` and
    /// `duration_since_start`.
    #[tokio::test]
    async fn test_sim_clock_simple() {
        let before = SimClock.now();
        assert_eq!(
            SimClock.duration_since_start(before),
            tokio::time::Duration::ZERO
        );

        let target = SimClock.start() + tokio::time::Duration::from_millis(10000);
        SimClock.advance_to(target);

        let after = SimClock.now();
        assert_eq!(
            SimClock.duration_since_start(after),
            tokio::time::Duration::from_millis(10000)
        );
        assert_eq!(
            after.duration_since(before),
            tokio::time::Duration::from_secs(10)
        );
    }

    /// Simulated system time moves forward by the same amount as the
    /// simulated instant.
    #[tokio::test]
    async fn test_sim_clock_system_time() {
        let before = SimClock.system_time_now();

        let target = SimClock.start() + tokio::time::Duration::from_millis(10000);
        SimClock.advance_to(target);

        let after = SimClock.system_time_now();
        let elapsed = after.duration_since(before).unwrap();
        assert_eq!(elapsed, tokio::time::Duration::from_secs(10));
    }

    /// A future that finishes before the deadline yields its value; one that
    /// sleeps past the deadline yields an error.
    #[tokio::test]
    async fn test_sim_timeout() {
        simnet::start();
        let limit = tokio::time::Duration::from_secs(10);

        let within = SimClock
            .timeout(limit, async {
                SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
                5
            })
            .await;
        assert_eq!(within.unwrap(), 5);

        let overdue = SimClock
            .timeout(limit, async {
                SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
                5
            })
            .await;
        assert!(overdue.is_err());
    }
}
387        assert!(res.is_err());
388    }
389}