1use std::error::Error;
12use std::fmt;
13use std::sync::LazyLock;
14use std::sync::Mutex;
15use std::sync::OnceLock;
16use std::time::SystemTime;
17
18use futures::pin_mut;
19use hyperactor_telemetry::TelemetryClock;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::Mailbox;
24use crate::channel::ChannelAddr;
25use crate::data::Named;
26use crate::id;
27use crate::mailbox::DeliveryError;
28use crate::mailbox::MailboxSender;
29use crate::mailbox::MessageEnvelope;
30use crate::mailbox::Undeliverable;
31use crate::mailbox::UndeliverableMailboxSender;
32use crate::mailbox::monitored_return_handle;
33use crate::simnet::SleepEvent;
34use crate::simnet::simnet_handle;
35
36struct SimTime {
37 start: tokio::time::Instant,
38 now: Mutex<tokio::time::Instant>,
39 system_start: SystemTime,
40}
41
42#[allow(clippy::disallowed_methods)]
43static SIM_TIME: LazyLock<SimTime> = LazyLock::new(|| {
44 let now = tokio::time::Instant::now();
45 SimTime {
46 start: now,
47 now: Mutex::new(now),
48 system_start: SystemTime::now(),
49 }
50});
51
52#[derive(Debug)]
53pub struct TimeoutError;
58
59impl fmt::Display for TimeoutError {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(f, "deadline has elapsed")
62 }
63}
64
65impl Error for TimeoutError {}
66
67pub trait Clock {
69 fn sleep(
71 &self,
72 duration: tokio::time::Duration,
73 ) -> impl std::future::Future<Output = ()> + Send + Sync;
74 fn non_advancing_sleep(
76 &self,
77 duration: tokio::time::Duration,
78 ) -> impl std::future::Future<Output = ()> + Send + Sync;
79 fn now(&self) -> tokio::time::Instant;
81 fn sleep_until(
83 &self,
84 deadline: tokio::time::Instant,
85 ) -> impl std::future::Future<Output = ()> + Send + Sync;
86 fn system_time_now(&self) -> SystemTime;
88 fn timeout<F, T>(
93 &self,
94 duration: tokio::time::Duration,
95 f: F,
96 ) -> impl std::future::Future<Output = Result<T, TimeoutError>> + Send
97 where
98 F: std::future::Future<Output = T> + Send;
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
104pub enum ClockKind {
105 Sim(SimClock),
107 Real(RealClock),
109}
110
111impl Clock for ClockKind {
112 async fn sleep(&self, duration: tokio::time::Duration) {
113 match self {
114 Self::Sim(clock) => clock.sleep(duration).await,
115 Self::Real(clock) => clock.sleep(duration).await,
116 }
117 }
118 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
119 match self {
120 Self::Sim(clock) => clock.non_advancing_sleep(duration).await,
121 Self::Real(clock) => clock.non_advancing_sleep(duration).await,
122 }
123 }
124 async fn sleep_until(&self, deadline: tokio::time::Instant) {
125 match self {
126 Self::Sim(clock) => clock.sleep_until(deadline).await,
127 Self::Real(clock) => clock.sleep_until(deadline).await,
128 }
129 }
130 fn now(&self) -> tokio::time::Instant {
131 match self {
132 Self::Sim(clock) => clock.now(),
133 Self::Real(clock) => clock.now(),
134 }
135 }
136 fn system_time_now(&self) -> SystemTime {
137 match self {
138 Self::Sim(clock) => clock.system_time_now(),
139 Self::Real(clock) => clock.system_time_now(),
140 }
141 }
142 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
143 where
144 F: std::future::Future<Output = T> + Send,
145 {
146 match self {
147 Self::Sim(clock) => clock.timeout(duration, f).await,
148 Self::Real(clock) => clock.timeout(duration, f).await,
149 }
150 }
151}
152
153impl TelemetryClock for ClockKind {
154 fn now(&self) -> tokio::time::Instant {
155 match self {
156 Self::Sim(clock) => clock.now(),
157 Self::Real(clock) => clock.now(),
158 }
159 }
160
161 fn system_time_now(&self) -> std::time::SystemTime {
162 match self {
163 Self::Sim(clock) => clock.system_time_now(),
164 Self::Real(clock) => clock.system_time_now(),
165 }
166 }
167}
168
169impl Default for ClockKind {
170 fn default() -> Self {
171 Self::Real(RealClock)
172 }
173}
174
175impl ClockKind {
176 pub fn for_channel_addr(channel_addr: &ChannelAddr) -> Self {
179 match channel_addr {
180 ChannelAddr::Sim(_) => Self::Sim(SimClock),
181 _ => Self::Real(RealClock),
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct SimClock;
191
192impl Clock for SimClock {
193 async fn sleep(&self, duration: tokio::time::Duration) {
195 let mailbox = SimClock::mailbox().clone();
196 let (tx, rx) = mailbox.open_once_port::<()>();
197
198 simnet_handle()
199 .unwrap()
200 .send_event(SleepEvent::new(tx.bind(), mailbox, duration))
201 .unwrap();
202 rx.recv().await.unwrap();
203 }
204
205 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
206 let mailbox = SimClock::mailbox().clone();
207 let (tx, rx) = mailbox.open_once_port::<()>();
208
209 simnet_handle()
210 .unwrap()
211 .send_nonadvanceable_event(SleepEvent::new(tx.bind(), mailbox, duration))
212 .unwrap();
213 rx.recv().await.unwrap();
214 }
215
216 async fn sleep_until(&self, deadline: tokio::time::Instant) {
217 let now = self.now();
218 if deadline <= now {
219 return;
220 }
221 self.sleep(deadline - now).await;
222 }
223 fn now(&self) -> tokio::time::Instant {
225 *SIM_TIME.now.lock().unwrap()
226 }
227
228 fn system_time_now(&self) -> SystemTime {
229 SIM_TIME.system_start + self.now().duration_since(SIM_TIME.start)
230 }
231
232 #[allow(clippy::disallowed_methods)]
233 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
234 where
235 F: std::future::Future<Output = T>,
236 {
237 let mailbox = SimClock::mailbox().clone();
238 let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239
240 simnet_handle()
241 .unwrap()
242 .send_event(SleepEvent::new(tx.bind(), mailbox, duration))
243 .unwrap();
244
245 let fut = f;
246 pin_mut!(fut);
247
248 tokio::select! {
249 _ = deadline_rx.recv() => {
250 Err(TimeoutError)
251 }
252 res = &mut fut => Ok(res)
253 }
254 }
255}
256
257impl SimClock {
258 fn mailbox() -> &'static Mailbox {
261 static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
262 SIMCLOCK_MAILBOX.get_or_init(|| {
263 let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
264 let (undeliverable_messages, mut rx) =
265 mailbox.open_port::<Undeliverable<MessageEnvelope>>();
266 undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
267 tokio::spawn(async move {
268 while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
269 envelope.set_error(DeliveryError::BrokenLink(
270 "message returned to undeliverable port".to_string(),
271 ));
272 UndeliverableMailboxSender
273 .post(envelope, monitored_return_handle())
274 }
275 });
276 mailbox
277 })
278 }
279
280 pub fn advance_to(&self, time: tokio::time::Instant) {
282 let mut guard = SIM_TIME.now.lock().unwrap();
283 *guard = time;
284 }
285
286 pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
288 instant.duration_since(SIM_TIME.start)
289 }
290
291 pub fn start(&self) -> tokio::time::Instant {
293 SIM_TIME.start
294 }
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct RealClock;
300
301impl Clock for RealClock {
302 #[allow(clippy::disallowed_methods)]
303 async fn sleep(&self, duration: tokio::time::Duration) {
304 tokio::time::sleep(duration).await;
305 }
306 async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
307 Self::sleep(self, duration).await;
308 }
309 #[allow(clippy::disallowed_methods)]
310 async fn sleep_until(&self, deadline: tokio::time::Instant) {
311 tokio::time::sleep_until(deadline).await;
312 }
313 #[allow(clippy::disallowed_methods)]
315 fn now(&self) -> tokio::time::Instant {
316 tokio::time::Instant::now()
317 }
318 #[allow(clippy::disallowed_methods)]
319 fn system_time_now(&self) -> SystemTime {
320 SystemTime::now()
321 }
322 #[allow(clippy::disallowed_methods)]
323 async fn timeout<F, T>(&self, duration: tokio::time::Duration, f: F) -> Result<T, TimeoutError>
324 where
325 F: std::future::Future<Output = T>,
326 {
327 tokio::time::timeout(duration, f)
328 .await
329 .map_err(|_| TimeoutError)
330 }
331}
332
333#[cfg(test)]
334mod tests {
335
336 use crate::clock::Clock;
337 use crate::clock::SimClock;
338 use crate::simnet;
339
340 #[tokio::test]
341 async fn test_sim_clock_simple() {
342 let start = SimClock.now();
343 assert_eq!(
344 SimClock.duration_since_start(start),
345 tokio::time::Duration::ZERO
346 );
347 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
348 let end = SimClock.now();
349 assert_eq!(
350 SimClock.duration_since_start(end),
351 tokio::time::Duration::from_millis(10000)
352 );
353 assert_eq!(
354 end.duration_since(start),
355 tokio::time::Duration::from_secs(10)
356 );
357 }
358
359 #[tokio::test]
360 async fn test_sim_clock_system_time() {
361 let start = SimClock.system_time_now();
362 SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
363 let end = SimClock.system_time_now();
364 assert_eq!(
365 end.duration_since(start).unwrap(),
366 tokio::time::Duration::from_secs(10)
367 );
368 }
369
370 #[tokio::test]
371 async fn test_sim_timeout() {
372 simnet::start();
373 let res = SimClock
374 .timeout(tokio::time::Duration::from_secs(10), async {
375 SimClock.sleep(tokio::time::Duration::from_secs(5)).await;
376 5
377 })
378 .await;
379 assert_eq!(res.unwrap(), 5);
380
381 let res = SimClock
382 .timeout(tokio::time::Duration::from_secs(10), async {
383 SimClock.sleep(tokio::time::Duration::from_secs(15)).await;
384 5
385 })
386 .await;
387 assert!(res.is_err());
388 }
389}