1use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::time::SystemTime;
16
17use async_trait::async_trait;
18use futures::pin_mut;
19use hyperactor_telemetry::TelemetryClock;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::channel::ChannelAddr;
24use crate::simnet::Event;
25use crate::simnet::SimNetError;
26use crate::simnet::simnet_handle;
27
28struct SimTime {
29 start: tokio::time::Instant,
30 now: Mutex<tokio::time::Instant>,
31 system_start: SystemTime,
32}
33
34#[allow(clippy::disallowed_methods)]
35static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
36 let now = tokio::time::Instant::now();
37 SimTime {
38 start: now.clone(),
39 now: Mutex::new(now),
40 system_start: SystemTime::now(),
41 }
42});
43
44#[derive(Debug)]
45pub struct TimeoutError;
50
51impl fmt::Display for TimeoutError {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 write!(f, "deadline has elapsed")
54 }
55}
56
57impl Error for TimeoutError {}
58
59pub trait Clock {
61 fn sleep(
63 &self,
64 duration: tokio::time::Duration,
65 ) -> impl std::future::Future<Output = ()> + Send + Sync;
66 fn non_advancing_sleep(
68 &self,
69 duration: tokio::time::Duration,
70 ) -> impl std::future::Future<Output = ()> + Send + Sync;
71 fn now(&self) -> tokio::time::Instant;
73 fn sleep_until(
75 &self,
76 deadline: tokio::time::Instant,
77 ) -> impl std::future::Future<Output = ()> + Send + Sync;
78 fn system_time_now(&self) -> SystemTime;
80 fn timeout<F, T>(
85 &self,
86 duration: tokio::time::Duration,
87 f: F,
88 ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
89 where
90 F: std::future::Future<Output = T> + Send;
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
96pub enum ClockKind {
97 Sim(SimClock),
99 Real(RealClock),
101}
102
103impl Clock for ClockKind {
104 async fn sleep(&self, duration: tokio::time::Duration) {
105 match self {
106 Self::Sim(clock) => clock.sleep(duration).await,
107 Self::Real(clock) => clock.sleep(duration).await,
108 }
109 }
110 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
111 match self {
112 Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
113 Self::Real(clock) => clock.non_advancing_sleep(duration).await,
114 }
115 }
116 async fn sleep_until(&self, deadline: tokio::time::Instant) {
117 match self {
118 Self::Sim(clock) => clock.sleep_until(deadline).await,
119 Self::Real(clock) => clock.sleep_until(deadline).await,
120 }
121 }
122 fn now(&self) -> tokio::time::Instant {
123 match self {
124 Self::Sim(clock) => clock.now(),
125 Self::Real(clock) => clock.now(),
126 }
127 }
128 fn system_time_now(&self) -> SystemTime {
129 match self {
130 Self::Sim(clock) => clock.system_time_now(),
131 Self::Real(clock) => clock.system_time_now(),
132 }
133 }
134 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
135 where
136 F: std::future::Future<Output = T> + Send,
137 {
138 match self {
139 Self::Sim(clock) => clock.timeout(duration, f).await,
140 Self::Real(clock) => clock.timeout(duration, f).await,
141 }
142 }
143}
144
145impl TelemetryClock for ClockKind {
146 fn now(&self) -> tokio::time::Instant {
147 match self {
148 Self::Sim(clock) => clock.now(),
149 Self::Real(clock) => clock.now(),
150 }
151 }
152
153 fn system_time_now(&self) -> std::time::SystemTime {
154 match self {
155 Self::Sim(clock) => clock.system_time_now(),
156 Self::Real(clock) => clock.system_time_now(),
157 }
158 }
159}
160
161impl Default for ClockKind {
162 fn default() -> Self {
163 Self::Real(RealClock)
164 }
165}
166
167impl ClockKind {
168 pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
171 match channel_addr {
172 ChannelAddr::Sim(_) => Self::Sim(SimClock),
173 _ => Self::Real(RealClock),
174 }
175 }
176}
177
178#[derive(Debug)]
179struct SleepEvent {
180 done_tx: Option<tokio::sync::oneshot::Sender<()>>,
181 duration: tokio::time::Duration,
182}
183
184impl SleepEvent {
185 pub(crate) fn new(
186 done_tx: tokio::sync::oneshot::Sender<()>,
187 duration: tokio::time::Duration,
188 ) -> Box<Self> {
189 Box::new(Self {
190 done_tx: Some(done_tx),
191 duration,
192 })
193 }
194}
195
196#[async_trait]
197impl Event for SleepEvent {
198 async fn handle(&mut self) -> Result<(), SimNetError> {
199 self.done_tx
200 .take()
201 .unwrap()
202 .send(())
203 .map_err(|_| SimNetError::PanickedTask)?;
204
205 Ok(())
206 }
207
208 fn duration(&self) -> tokio::time::Duration {
209 self.duration
210 }
211
212 fn summary(&self) -> String {
213 format!("Sleeping for {} ms", self.duration.as_millis())
214 }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct SimClock;
222
223impl Clock for SimClock {
224 async fn sleep(&self, duration: tokio::time::Duration) {
226 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
227
228 simnet_handle()
229 .unwrap()
230 .send_event(SleepEvent::new(tx, duration))
231 .unwrap();
232
233 rx.await.unwrap();
234 }
235
236 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
237 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
238
239 simnet_handle()
240 .unwrap()
241 .send_nonadvanceable_event(SleepEvent::new(tx, duration))
242 .unwrap();
243
244 rx.await.unwrap();
245 }
246
247 async fn sleep_until(&self, deadline: tokio::time::Instant) {
248 let now = self.now();
249 if deadline <= now {
250 return;
251 }
252 self.sleep(deadline - now).await;
253 }
254 fn now(&self) -> tokio::time::Instant {
256 SIM_TIME.now.lock().unwrap().clone()
257 }
258
259 fn system_time_now(&self) -> SystemTime {
260 SIM_TIME.system_start.clone() + self.now().duration_since(SIM_TIME.start)
261 }
262
263 #[allow(clippy::disallowed_methods)]
264 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
265 where
266 F: std::future::Future<Output = T>,
267 {
268 let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>();
269
270 simnet_handle()
271 .unwrap()
272 .send_event(SleepEvent::new(tx, duration))
273 .unwrap();
274
275 let fut = f;
276 pin_mut!(fut);
277
278 tokio::select! {
279 _ = deadline_rx => {
280 Err(TimeoutError)
281 }
282 res = &mut fut => Ok(res)
283 }
284 }
285}
286
287impl SimClock {
288 pub fn advance_to(&self, time: tokio::time::Instant) {
290 let mut guard = SIM_TIME.now.lock().unwrap();
291 *guard = time;
292 }
293
294 pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
296 instant.duration_since(SIM_TIME.start)
297 }
298
299 pub fn start(&self) -> tokio::time::Instant {
301 SIM_TIME.start.clone()
302 }
303}
304
305#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct RealClock;
308
309impl Clock for RealClock {
310 #[allow(clippy::disallowed_methods)]
311 async fn sleep(&self, duration: tokio::time::Duration) {
312 tokio::time::sleep(duration).await;
313 }
314 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
315 Self::sleep(self, duration).await;
316 }
317 #[allow(clippy::disallowed_methods)]
318 async fn sleep_until(&self, deadline: tokio::time::Instant) {
319 tokio::time::sleep_until(deadline).await;
320 }
321 #[allow(clippy::disallowed_methods)]
323 fn now(&self) -> tokio::time::Instant {
324 tokio::time::Instant::now()
325 }
326 #[allow(clippy::disallowed_methods)]
327 fn system_time_now(&self) -> SystemTime {
328 SystemTime::now()
329 }
330 #[allow(clippy::disallowed_methods)]
331 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
332 where
333 F: std::future::Future<Output = T>,
334 {
335 tokio::time::timeout(duration, f)
336 .await
337 .map_err(|_| TimeoutError)
338 }
339}
340
341#[cfg(test)]
342mod tests {
343
344 use crate::clock::Clock;
345 use crate::clock::SimClock;
346 use crate::simnet;
347
348 #[tokio::test]
349 async fn test_sim_clock_simple() {
350 let start = SimClock.now();
351 assert_eq!(
352 SimClock.duration_since_start(start),
353 tokio::time::Duration::ZERO
354 );
355 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
356 let end = SimClock.now();
357 assert_eq!(
358 SimClock.duration_since_start(end),
359 tokio::time::Duration::from_millis(10000)
360 );
361 assert_eq!(
362 end.duration_since(start),
363 tokio::time::Duration::from_secs(10)
364 );
365 }
366
367 #[tokio::test]
368 async fn test_sim_clock_system_time() {
369 let start = SimClock.system_time_now();
370 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
371 let end = SimClock.system_time_now();
372 assert_eq!(
373 end.duration_since(start).unwrap(),
374 tokio::time::Duration::from_secs(10)
375 );
376 }
377
378 #[tokio::test]
379 async fn test_sim_timeout() {
380 simnet::start();
381 let res = SimClock
382 .timeout(tokio::time::Duration::from_secs(10), async {
383 SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
384 5
385 })
386 .await;
387 assert_eq!(res.unwrap(), 5);
388
389 let res = SimClock
390 .timeout(tokio::time::Duration::from_secs(10), async {
391 SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
392 5
393 })
394 .await;
395 assert!(res.is_err());
396 }
397}