1use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::time::SystemTime;
16
17use futures::pin_mut;
18use hyperactor_telemetry::TelemetryClock;
19use serde::Deserialize;
20use serde::Serialize;
21use tokio::sync::oneshot;
22
23use crate::channel::ChannelAddr;
24use crate::simnet::SleepEvent;
25use crate::simnet::simnet_handle;
26
27struct SimTime {
28 start: tokio::time::Instant,
29 now: Mutex<tokio::time::Instant>,
30 system_start: SystemTime,
31}
32
33#[allow(clippy::disallowed_methods)]
34static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
35 let now = tokio::time::Instant::now();
36 SimTime {
37 start: now,
38 now: Mutex::new(now),
39 system_start: SystemTime::now(),
40 }
41});
42
43#[derive(Debug)]
44pub struct TimeoutError;
49
50impl fmt::Display for TimeoutError {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(f, "deadline has elapsed")
53 }
54}
55
56impl Error for TimeoutError {}
57
58pub trait Clock {
60 fn sleep(
62 &self,
63 duration: tokio::time::Duration,
64 ) -> impl std::future::Future<Output = ()> + Send + Sync;
65 fn non_advancing_sleep(
67 &self,
68 duration: tokio::time::Duration,
69 ) -> impl std::future::Future<Output = ()> + Send + Sync;
70 fn now(&self) -> tokio::time::Instant;
72 fn sleep_until(
74 &self,
75 deadline: tokio::time::Instant,
76 ) -> impl std::future::Future<Output = ()> + Send + Sync;
77 fn system_time_now(&self) -> SystemTime;
79 fn timeout<F, T>(
84 &self,
85 duration: tokio::time::Duration,
86 f: F,
87 ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
88 where
89 F: std::future::Future<Output = T> + Send;
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
95pub enum ClockKind {
96 Sim(SimClock),
98 Real(RealClock),
100}
101
102impl Clock for ClockKind {
103 async fn sleep(&self, duration: tokio::time::Duration) {
104 match self {
105 Self::Sim(clock) => clock.sleep(duration).await,
106 Self::Real(clock) => clock.sleep(duration).await,
107 }
108 }
109 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
110 match self {
111 Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
112 Self::Real(clock) => clock.non_advancing_sleep(duration).await,
113 }
114 }
115 async fn sleep_until(&self, deadline: tokio::time::Instant) {
116 match self {
117 Self::Sim(clock) => clock.sleep_until(deadline).await,
118 Self::Real(clock) => clock.sleep_until(deadline).await,
119 }
120 }
121 fn now(&self) -> tokio::time::Instant {
122 match self {
123 Self::Sim(clock) => clock.now(),
124 Self::Real(clock) => clock.now(),
125 }
126 }
127 fn system_time_now(&self) -> SystemTime {
128 match self {
129 Self::Sim(clock) => clock.system_time_now(),
130 Self::Real(clock) => clock.system_time_now(),
131 }
132 }
133 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
134 where
135 F: std::future::Future<Output = T> + Send,
136 {
137 match self {
138 Self::Sim(clock) => clock.timeout(duration, f).await,
139 Self::Real(clock) => clock.timeout(duration, f).await,
140 }
141 }
142}
143
144impl TelemetryClock for ClockKind {
145 fn now(&self) -> tokio::time::Instant {
146 match self {
147 Self::Sim(clock) => clock.now(),
148 Self::Real(clock) => clock.now(),
149 }
150 }
151
152 fn system_time_now(&self) -> std::time::SystemTime {
153 match self {
154 Self::Sim(clock) => clock.system_time_now(),
155 Self::Real(clock) => clock.system_time_now(),
156 }
157 }
158}
159
160impl Default for ClockKind {
161 fn default() -> Self {
162 Self::Real(RealClock)
163 }
164}
165
166impl ClockKind {
167 pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
170 match channel_addr {
171 ChannelAddr::Sim(_) => Self::Sim(SimClock),
172 _ => Self::Real(RealClock),
173 }
174 }
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct SimClock;
182
183impl Clock for SimClock {
184 async fn sleep(&self, duration: tokio::time::Duration) {
186 let (tx, rx) = oneshot::channel();
187
188 simnet_handle()
189 .unwrap()
190 .send_event(SleepEvent::new(tx, duration))
191 .unwrap();
192 rx.await.unwrap();
193 }
194
195 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
196 let (tx, rx) = oneshot::channel();
197
198 simnet_handle()
199 .unwrap()
200 .send_nonadvanceable_event(SleepEvent::new(tx, duration))
201 .unwrap();
202 rx.await.unwrap();
203 }
204
205 async fn sleep_until(&self, deadline: tokio::time::Instant) {
206 let now = self.now();
207 if deadline <= now {
208 return;
209 }
210 self.sleep(deadline - now).await;
211 }
212 fn now(&self) -> tokio::time::Instant {
214 *SIM_TIME.now.lock().unwrap()
215 }
216
217 fn system_time_now(&self) -> SystemTime {
218 SIM_TIME.system_start + self.now().duration_since(SIM_TIME.start)
219 }
220
221 #[allow(clippy::disallowed_methods)]
222 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
223 where
224 F: std::future::Future<Output = T>,
225 {
226 let (tx, deadline_rx) = oneshot::channel();
227
228 simnet_handle()
229 .unwrap()
230 .send_event(SleepEvent::new(tx, duration))
231 .unwrap();
232
233 let fut = f;
234 pin_mut!(fut);
235
236 tokio::select! {
237 _ = deadline_rx => {
238 Err(TimeoutError)
239 }
240 res = &mut fut => Ok(res)
241 }
242 }
243}
244
245impl SimClock {
246 pub fn advance_to(&self, time: tokio::time::Instant) {
248 let mut guard = SIM_TIME.now.lock().unwrap();
249 *guard = time;
250 }
251
252 pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
254 instant.duration_since(SIM_TIME.start)
255 }
256
257 pub fn start(&self) -> tokio::time::Instant {
259 SIM_TIME.start
260 }
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct RealClock;
266
267impl Clock for RealClock {
268 #[allow(clippy::disallowed_methods)]
269 async fn sleep(&self, duration: tokio::time::Duration) {
270 tokio::time::sleep(duration).await;
271 }
272 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
273 Self::sleep(self, duration).await;
274 }
275 #[allow(clippy::disallowed_methods)]
276 async fn sleep_until(&self, deadline: tokio::time::Instant) {
277 tokio::time::sleep_until(deadline).await;
278 }
279 #[allow(clippy::disallowed_methods)]
281 fn now(&self) -> tokio::time::Instant {
282 tokio::time::Instant::now()
283 }
284 #[allow(clippy::disallowed_methods)]
285 fn system_time_now(&self) -> SystemTime {
286 SystemTime::now()
287 }
288 #[allow(clippy::disallowed_methods)]
289 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
290 where
291 F: std::future::Future<Output = T>,
292 {
293 tokio::time::timeout(duration, f)
294 .await
295 .map_err(|_| TimeoutError)
296 }
297}
298
299#[cfg(test)]
300mod tests {
301
302 use crate::clock::Clock;
303 use crate::clock::SimClock;
304 use crate::simnet;
305
306 #[tokio::test]
307 async fn test_sim_clock_simple() {
308 let start = SimClock.now();
309 assert_eq!(
310 SimClock.duration_since_start(start),
311 tokio::time::Duration::ZERO
312 );
313 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
314 let end = SimClock.now();
315 assert_eq!(
316 SimClock.duration_since_start(end),
317 tokio::time::Duration::from_millis(10000)
318 );
319 assert_eq!(
320 end.duration_since(start),
321 tokio::time::Duration::from_secs(10)
322 );
323 }
324
325 #[tokio::test]
326 async fn test_sim_clock_system_time() {
327 let start = SimClock.system_time_now();
328 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
329 let end = SimClock.system_time_now();
330 assert_eq!(
331 end.duration_since(start).unwrap(),
332 tokio::time::Duration::from_secs(10)
333 );
334 }
335
336 #[tokio::test]
337 async fn test_sim_timeout() {
338 simnet::start();
339 let res = SimClock
340 .timeout(tokio::time::Duration::from_secs(10), async {
341 SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
342 5
343 })
344 .await;
345 assert_eq!(res.unwrap(), 5);
346
347 let res = SimClock
348 .timeout(tokio::time::Duration::from_secs(10), async {
349 SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
350 5
351 })
352 .await;
353 assert!(res.is_err());
354 }
355}