hyperactor/
clock.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The clock allows us to control the behaviour of all time dependent events in both real and simulated time throughout the system
10
11use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::time::SystemTime;
16
17use async_trait::async_trait;
18use futures::pin_mut;
19use hyperactor_telemetry::TelemetryClock;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::channel::ChannelAddr;
24use crate::simnet::Event;
25use crate::simnet::SimNetError;
26use crate::simnet::simnet_handle;
27
28struct SimTime {
29    start: tokio::time::Instant,
30    now: Mutex<tokio::time::Instant>,
31    system_start: SystemTime,
32}
33
34#[allow(clippy::disallowed_methods)]
35static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
36    let now = tokio::time::Instant::now();
37    SimTime {
38        start: now.clone(),
39        now: Mutex::new(now),
40        system_start: SystemTime::now(),
41    }
42});
43
44#[derive(Debug)]
45/// Errors returned by `Timeout`.
46///
47/// This error is returned when a timeout expires before the function was able
48/// to finish.
49pub struct TimeoutError;
50
51impl fmt::Display for TimeoutError {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        write!(f, "deadline has elapsed")
54    }
55}
56
57impl Error for TimeoutError {}
58
59/// The Sleeps trait allows different implementations to control the behavior of sleep.
60pub trait Clock {
61    /// Initiates a sleep for the specified duration
62    fn sleep(
63        &self,
64        duration: tokio::time::Duration,
65    ) -> impl std::future::Future<Output = ()> + Send + Sync;
66    /// Initiates a sleep for the specified duration
67    fn non_advancing_sleep(
68        &self,
69        duration: tokio::time::Duration,
70    ) -> impl std::future::Future<Output = ()> + Send + Sync;
71    /// Get the current time according to the clock
72    fn now(&self) -> tokio::time::Instant;
73    /// Sleep until the specified deadline.
74    fn sleep_until(
75        &self,
76        deadline: tokio::time::Instant,
77    ) -> impl std::future::Future<Output = ()> + Send + Sync;
78    /// Get the current system time according to the clock
79    fn system_time_now(&self) -> SystemTime;
80    /// Require a future to complete within the specified duration
81    ///
82    /// if the future completes before the duration has elapsed, then the completed value is returned.
83    /// Otherwise, an error is returned and the future is canceled.
84    fn timeout<F, T>(
85        &self,
86        duration: tokio::time::Duration,
87        f: F,
88    ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
89    where
90        F: std::future::Future<Output = T> + Send;
91}
92
93/// An adapter that allows us to control the behaviour of sleep between performing a real sleep
94/// and a sleep on the simnet
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub enum ClockKind {
97    /// Simulates a clock that uses the simnet's current time as the source of truth
98    Sim(SimClock),
99    /// Represents a real clock using tokio's sleep functionality for production use.
100    Real(RealClock),
101}
102
103impl Clock for ClockKind {
104    async fn sleep(&self, duration: tokio::time::Duration) {
105        match self {
106            Self::Sim(clock) => clock.sleep(duration).await,
107            Self::Real(clock) => clock.sleep(duration).await,
108        }
109    }
110    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
111        match self {
112            Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
113            Self::Real(clock) => clock.non_advancing_sleep(duration).await,
114        }
115    }
116    async fn sleep_until(&self, deadline: tokio::time::Instant) {
117        match self {
118            Self::Sim(clock) => clock.sleep_until(deadline).await,
119            Self::Real(clock) => clock.sleep_until(deadline).await,
120        }
121    }
122    fn now(&self) -> tokio::time::Instant {
123        match self {
124            Self::Sim(clock) => clock.now(),
125            Self::Real(clock) => clock.now(),
126        }
127    }
128    fn system_time_now(&self) -> SystemTime {
129        match self {
130            Self::Sim(clock) => clock.system_time_now(),
131            Self::Real(clock) => clock.system_time_now(),
132        }
133    }
134    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
135    where
136        F: std::future::Future<Output = T> + Send,
137    {
138        match self {
139            Self::Sim(clock) => clock.timeout(duration, f).await,
140            Self::Real(clock) => clock.timeout(duration, f).await,
141        }
142    }
143}
144
145impl TelemetryClock for ClockKind {
146    fn now(&self) -> tokio::time::Instant {
147        match self {
148            Self::Sim(clock) => clock.now(),
149            Self::Real(clock) => clock.now(),
150        }
151    }
152
153    fn system_time_now(&self) -> std::time::SystemTime {
154        match self {
155            Self::Sim(clock) => clock.system_time_now(),
156            Self::Real(clock) => clock.system_time_now(),
157        }
158    }
159}
160
161impl Default for ClockKind {
162    fn default() -> Self {
163        Self::Real(RealClock)
164    }
165}
166
167impl ClockKind {
168    /// Returns the appropriate clock given the channel address kind
169    /// a proc is being served on
170    pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
171        match channel_addr {
172            ChannelAddr::Sim(_) => Self::Sim(SimClock),
173            _ => Self::Real(RealClock),
174        }
175    }
176}
177
178#[derive(Debug)]
179struct SleepEvent {
180    done_tx: Option<tokio::sync::oneshot::Sender<()>>,
181    duration: tokio::time::Duration,
182}
183
184impl SleepEvent {
185    pub(crate) fn new(
186        done_tx: tokio::sync::oneshot::Sender<()>,
187        duration: tokio::time::Duration,
188    ) -> Box<Self> {
189        Box::new(Self {
190            done_tx: Some(done_tx),
191            duration,
192        })
193    }
194}
195
196#[async_trait]
197impl Event for SleepEvent {
198    async fn handle(&mut self) -> Result<(), SimNetError> {
199        self.done_tx
200            .take()
201            .unwrap()
202            .send(())
203            .map_err(|_| SimNetError::PanickedTask)?;
204
205        Ok(())
206    }
207
208    fn duration(&self) -> tokio::time::Duration {
209        self.duration
210    }
211
212    fn summary(&self) -> String {
213        format!("Sleeping for {} ms", self.duration.as_millis())
214    }
215}
216
217/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
218/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
219/// time to the wakeup time and use the transmitter to wake up this green thread
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct SimClock;
222
223impl Clock for SimClock {
224    /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
225    async fn sleep(&self, duration: tokio::time::Duration) {
226        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
227
228        simnet_handle()
229            .unwrap()
230            .send_event(SleepEvent::new(tx, duration))
231            .unwrap();
232
233        rx.await.unwrap();
234    }
235
236    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
237        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
238
239        simnet_handle()
240            .unwrap()
241            .send_nonadvanceable_event(SleepEvent::new(tx, duration))
242            .unwrap();
243
244        rx.await.unwrap();
245    }
246
247    async fn sleep_until(&self, deadline: tokio::time::Instant) {
248        let now = self.now();
249        if deadline <= now {
250            return;
251        }
252        self.sleep(deadline - now).await;
253    }
254    /// Get the current time according to the simnet
255    fn now(&self) -> tokio::time::Instant {
256        SIM_TIME.now.lock().unwrap().clone()
257    }
258
259    fn system_time_now(&self) -> SystemTime {
260        SIM_TIME.system_start.clone() + self.now().duration_since(SIM_TIME.start)
261    }
262
263    #[allow(clippy::disallowed_methods)]
264    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
265    where
266        F: std::future::Future<Output = T>,
267    {
268        let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>();
269
270        simnet_handle()
271            .unwrap()
272            .send_event(SleepEvent::new(tx, duration))
273            .unwrap();
274
275        let fut = f;
276        pin_mut!(fut);
277
278        tokio::select! {
279            _ = deadline_rx => {
280                Err(TimeoutError)
281            }
282            res = &mut fut => Ok(res)
283        }
284    }
285}
286
287impl SimClock {
288    /// Advance the sumulator's time to the specified instant
289    pub fn advance_to(&self, time: tokio::time::Instant) {
290        let mut guard = SIM_TIME.now.lock().unwrap();
291        *guard = time;
292    }
293
294    /// Get the number of milliseconds elapsed since the start of the simulation
295    pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
296        instant.duration_since(SIM_TIME.start)
297    }
298
299    /// Instant marking the start of the simulation
300    pub fn start(&self) -> tokio::time::Instant {
301        SIM_TIME.start.clone()
302    }
303}
304
305/// An adapter for tokio::time::sleep to be used in production
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct RealClock;
308
309impl Clock for RealClock {
310    #[allow(clippy::disallowed_methods)]
311    async fn sleep(&self, duration: tokio::time::Duration) {
312        tokio::time::sleep(duration).await;
313    }
314    async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
315        Self::sleep(self, duration).await;
316    }
317    #[allow(clippy::disallowed_methods)]
318    async fn sleep_until(&self, deadline: tokio::time::Instant) {
319        tokio::time::sleep_until(deadline).await;
320    }
321    /// Get the current time using tokio::time::Instant
322    #[allow(clippy::disallowed_methods)]
323    fn now(&self) -> tokio::time::Instant {
324        tokio::time::Instant::now()
325    }
326    #[allow(clippy::disallowed_methods)]
327    fn system_time_now(&self) -> SystemTime {
328        SystemTime::now()
329    }
330    #[allow(clippy::disallowed_methods)]
331    async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
332    where
333        F: std::future::Future<Output = T>,
334    {
335        tokio::time::timeout(duration, f)
336            .await
337            .map_err(|_| TimeoutError)
338    }
339}
340
341#[cfg(test)]
342mod tests {
343
344    use crate::clock::Clock;
345    use crate::clock::SimClock;
346    use crate::simnet;
347
348    #[tokio::test]
349    async fn test_sim_clock_simple() {
350        let start = SimClock.now();
351        assert_eq!(
352            SimClock.duration_since_start(start),
353            tokio::time::Duration::ZERO
354        );
355        SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
356        let end = SimClock.now();
357        assert_eq!(
358            SimClock.duration_since_start(end),
359            tokio::time::Duration::from_millis(10000)
360        );
361        assert_eq!(
362            end.duration_since(start),
363            tokio::time::Duration::from_secs(10)
364        );
365    }
366
367    #[tokio::test]
368    async fn test_sim_clock_system_time() {
369        let start = SimClock.system_time_now();
370        SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
371        let end = SimClock.system_time_now();
372        assert_eq!(
373            end.duration_since(start).unwrap(),
374            tokio::time::Duration::from_secs(10)
375        );
376    }
377
378    #[tokio::test]
379    async fn test_sim_timeout() {
380        simnet::start();
381        let res = SimClock
382            .timeout(tokio::time::Duration::from_secs(10), async {
383                SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
384                5
385            })
386            .await;
387        assert_eq!(res.unwrap(), 5);
388
389        let res = SimClock
390            .timeout(tokio::time::Duration::from_secs(10), async {
391                SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
392                5
393            })
394            .await;
395        assert!(res.is_err());
396    }
397}