hyperactor/
clock.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The clock allows us to control the behaviour of all time dependent events in both real and simulated time throughout the system
10
11use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::time::SystemTime;
16
17use futures::pin_mut;
18use hyperactor_telemetry::TelemetryClock;
19use serde::Deserialize;
20use serde::Serialize;
21use tokio::sync::oneshot;
22
23use crate::channel::ChannelAddr;
24use crate::simnet::SleepEvent;
25use crate::simnet::simnet_handle;
26
27struct SimTime {
28    start: tokio::time::Instant,
29    now: Mutex<tokio::time::Instant>,
30    system_start: SystemTime,
31}
32
33#[allow(clippy::disallowed_methods)]
34static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
35    let now = tokio::time::Instant::now();
36    SimTime {
37        start: now,
38        now: Mutex::new(now),
39        system_start: SystemTime::now(),
40    }
41});
42
43#[derive(Debug)]
44/// Errors returned by `Timeout`.
45///
46/// This error is returned when a timeout expires before the function was able
47/// to finish.
48pub struct TimeoutError;
49
50impl fmt::Display for TimeoutError {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        write!(f, "deadline has elapsed")
53    }
54}
55
56impl Error for TimeoutError {}
57
58/// The Sleeps trait allows different implementations to control the behavior of sleep.
59pub trait Clock {
60    /// Initiates a sleep for the specified duration
61    fn sleep(
62        &self,
63        duration: tokio::time::Duration,
64    ) -> impl std::future::Future<Output = ()> + Send + Sync;
65    /// Initiates a sleep for the specified duration
66    fn non_advancing_sleep(
67        &self,
68        duration: tokio::time::Duration,
69    ) -> impl std::future::Future<Output = ()> + Send + Sync;
70    /// Get the current time according to the clock
71    fn now(&self) -> tokio::time::Instant;
72    /// Sleep until the specified deadline.
73    fn sleep_until(
74        &self,
75        deadline: tokio::time::Instant,
76    ) -> impl std::future::Future<Output = ()> + Send + Sync;
77    /// Get the current system time according to the clock
78    fn system_time_now(&self) -> SystemTime;
79    /// Require a future to complete within the specified duration
80    ///
81    /// if the future completes before the duration has elapsed, then the completed value is returned.
82    /// Otherwise, an error is returned and the future is canceled.
83    fn timeout<F, T>(
84        &self,
85        duration: tokio::time::Duration,
86        f: F,
87    ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
88    where
89        F: std::future::Future<Output = T> + Send;
90}
91
92/// An adapter that allows us to control the behaviour of sleep between performing a real sleep
93/// and a sleep on the simnet
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub enum ClockKind {
96    /// Simulates a clock that uses the simnet's current time as the source of truth
97    Sim(SimClock),
98    /// Represents a real clock using tokio's sleep functionality for production use.
99    Real(RealClock),
100}
101
102impl Clock for ClockKind {
103    async fn sleep(&self, duration: tokio::time::Duration) {
104        match self {
105            Self::Sim(clock) => clock.sleep(duration).await,
106            Self::Real(clock) => clock.sleep(duration).await,
107        }
108    }
109    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
110        match self {
111            Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
112            Self::Real(clock) => clock.non_advancing_sleep(duration).await,
113        }
114    }
115    async fn sleep_until(&self, deadline: tokio::time::Instant) {
116        match self {
117            Self::Sim(clock) => clock.sleep_until(deadline).await,
118            Self::Real(clock) => clock.sleep_until(deadline).await,
119        }
120    }
121    fn now(&self) -> tokio::time::Instant {
122        match self {
123            Self::Sim(clock) => clock.now(),
124            Self::Real(clock) => clock.now(),
125        }
126    }
127    fn system_time_now(&self) -> SystemTime {
128        match self {
129            Self::Sim(clock) => clock.system_time_now(),
130            Self::Real(clock) => clock.system_time_now(),
131        }
132    }
133    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
134    where
135        F: std::future::Future<Output = T> + Send,
136    {
137        match self {
138            Self::Sim(clock) => clock.timeout(duration, f).await,
139            Self::Real(clock) => clock.timeout(duration, f).await,
140        }
141    }
142}
143
144impl TelemetryClock for ClockKind {
145    fn now(&self) -> tokio::time::Instant {
146        match self {
147            Self::Sim(clock) => clock.now(),
148            Self::Real(clock) => clock.now(),
149        }
150    }
151
152    fn system_time_now(&self) -> std::time::SystemTime {
153        match self {
154            Self::Sim(clock) => clock.system_time_now(),
155            Self::Real(clock) => clock.system_time_now(),
156        }
157    }
158}
159
160impl Default for ClockKind {
161    fn default() -> Self {
162        Self::Real(RealClock)
163    }
164}
165
166impl ClockKind {
167    /// Returns the appropriate clock given the channel address kind
168    /// a proc is being served on
169    pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
170        match channel_addr {
171            ChannelAddr::Sim(_) => Self::Sim(SimClock),
172            _ => Self::Real(RealClock),
173        }
174    }
175}
176
177/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
178/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
179/// time to the wakeup time and use the transmitter to wake up this green thread
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct SimClock;
182
183impl Clock for SimClock {
184    /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
185    async fn sleep(&self, duration: tokio::time::Duration) {
186        let (tx, rx) = oneshot::channel();
187
188        simnet_handle()
189            .unwrap()
190            .send_event(SleepEvent::new(tx, duration))
191            .unwrap();
192        rx.await.unwrap();
193    }
194
195    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
196        let (tx, rx) = oneshot::channel();
197
198        simnet_handle()
199            .unwrap()
200            .send_nonadvanceable_event(SleepEvent::new(tx, duration))
201            .unwrap();
202        rx.await.unwrap();
203    }
204
205    async fn sleep_until(&self, deadline: tokio::time::Instant) {
206        let now = self.now();
207        if deadline <= now {
208            return;
209        }
210        self.sleep(deadline - now).await;
211    }
212    /// Get the current time according to the simnet
213    fn now(&self) -> tokio::time::Instant {
214        *SIM_TIME.now.lock().unwrap()
215    }
216
217    fn system_time_now(&self) -> SystemTime {
218        SIM_TIME.system_start + self.now().duration_since(SIM_TIME.start)
219    }
220
221    #[allow(clippy::disallowed_methods)]
222    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
223    where
224        F: std::future::Future<Output = T>,
225    {
226        let (tx, deadline_rx) = oneshot::channel();
227
228        simnet_handle()
229            .unwrap()
230            .send_event(SleepEvent::new(tx, duration))
231            .unwrap();
232
233        let fut = f;
234        pin_mut!(fut);
235
236        tokio::select! {
237            _ = deadline_rx => {
238                Err(TimeoutError)
239            }
240            res = &mut fut => Ok(res)
241        }
242    }
243}
244
245impl SimClock {
246    /// Advance the sumulator's time to the specified instant
247    pub fn advance_to(&self, time: tokio::time::Instant) {
248        let mut guard = SIM_TIME.now.lock().unwrap();
249        *guard = time;
250    }
251
252    /// Get the number of milliseconds elapsed since the start of the simulation
253    pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
254        instant.duration_since(SIM_TIME.start)
255    }
256
257    /// Instant marking the start of the simulation
258    pub fn start(&self) -> tokio::time::Instant {
259        SIM_TIME.start
260    }
261}
262
263/// An adapter for tokio::time::sleep to be used in production
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct RealClock;
266
267impl Clock for RealClock {
268    #[allow(clippy::disallowed_methods)]
269    async fn sleep(&self, duration: tokio::time::Duration) {
270        tokio::time::sleep(duration).await;
271    }
272    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
273        Self::sleep(self, duration).await;
274    }
275    #[allow(clippy::disallowed_methods)]
276    async fn sleep_until(&self, deadline: tokio::time::Instant) {
277        tokio::time::sleep_until(deadline).await;
278    }
279    /// Get the current time using tokio::time::Instant
280    #[allow(clippy::disallowed_methods)]
281    fn now(&self) -> tokio::time::Instant {
282        tokio::time::Instant::now()
283    }
284    #[allow(clippy::disallowed_methods)]
285    fn system_time_now(&self) -> SystemTime {
286        SystemTime::now()
287    }
288    #[allow(clippy::disallowed_methods)]
289    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
290    where
291        F: std::future::Future<Output = T>,
292    {
293        tokio::time::timeout(duration, f)
294            .await
295            .map_err(|_| TimeoutError)
296    }
297}
298
299#[cfg(test)]
300mod tests {
301
302    use crate::clock::Clock;
303    use crate::clock::SimClock;
304    use crate::simnet;
305
306    #[tokio::test]
307    async fn test_sim_clock_simple() {
308        let start = SimClock.now();
309        assert_eq!(
310            SimClock.duration_since_start(start),
311            tokio::time::Duration::ZERO
312        );
313        SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
314        let end = SimClock.now();
315        assert_eq!(
316            SimClock.duration_since_start(end),
317            tokio::time::Duration::from_millis(10000)
318        );
319        assert_eq!(
320            end.duration_since(start),
321            tokio::time::Duration::from_secs(10)
322        );
323    }
324
325    #[tokio::test]
326    async fn test_sim_clock_system_time() {
327        let start = SimClock.system_time_now();
328        SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
329        let end = SimClock.system_time_now();
330        assert_eq!(
331            end.duration_since(start).unwrap(),
332            tokio::time::Duration::from_secs(10)
333        );
334    }
335
336    #[tokio::test]
337    async fn test_sim_timeout() {
338        simnet::start();
339        let res = SimClock
340            .timeout(tokio::time::Duration::from_secs(10), async {
341                SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
342                5
343            })
344            .await;
345        assert_eq!(res.unwrap(), 5);
346
347        let res = SimClock
348            .timeout(tokio::time::Duration::from_secs(10), async {
349                SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
350                5
351            })
352            .await;
353        assert!(res.is_err());
354    }
355}